# Começando o Trabalho
---

## Utilizando o Spark no Google Colab

Para facilitar o desenvolvimento de nosso projeto neste curso vamos utilizar o Google Colab como ferramenta e para configurar o PySpark basta executar os comandos abaixo na própria célula do seu *notebook*.

In [1]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [4]:
# from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .master('local[*]') \
#     .appName("Iniciando com Spark") \
#     .getOrCreate()

In [5]:
# spark

## Acessando o [Spark UI](https://spark.apache.org/docs/3.1.2/web-ui.html) (Google Colab)

In [6]:
from pyspark.sql.functions import max, date_sub, year, month, last_day, col
from pyspark.sql.functions import sum, expr, current_date, count
from pyspark.sql import functions as F
from pyspark.sql.types import DateType
from datetime import datetime
from pyspark.sql import SparkSession

In [7]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

[Site ngrok](https://ngrok.com)

In [8]:
!wget -q https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


In [9]:
!ngrok config add-authtoken 2KhGnm3oR704AwiHamZP5DayNYG_5UqPzTqfSjTJeUDwmK1uq

/bin/bash: ngrok: command not found


In [10]:
get_ipython().system_raw('./ngrok authtoken 2KhGnm3oR704AwiHamZP5DayNYG_5UqPzTqfSjTJeUDwmK1uq')
get_ipython().system_raw('./ngrok http 4050 &')

In [11]:
!curl -s http://localhost:4040/api/tunnels

{"tunnels":[],"uri":"/api/tunnels"}


In [12]:
spark

## Carregamento de dados

### Montando drive

In [13]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Carregando os dados

### Questão 1 - Quais são os três produtos MENOS vendidos?

In [14]:
path = '/content/drive/MyDrive/Colab Notebooks/Santander/7/nortwind/'
orders_detail = spark.read.csv(path+"OrderDetails.csv", sep=',', 
                               inferSchema=True, header='true')

In [15]:
orders_detail.show(5)

+-------+---------+---------+--------+--------+
|OrderID|ProductID|UnitPrice|Quantity|Discount|
+-------+---------+---------+--------+--------+
|  10248|       11|       14|      12|       0|
|  10248|       42|      9,8|      10|       0|
|  10248|       72|     34,8|       5|       0|
|  10249|       14|     18,6|       9|       0|
|  10249|       51|     42,4|      40|       0|
+-------+---------+---------+--------+--------+
only showing top 5 rows



In [16]:
# Agrupa e seleciona os três menos vendidos
produtos_vendidos = orders_detail.groupBy("ProductID") \
    .agg(sum("Quantity").alias("TotalVendido")) \
    .orderBy("TotalVendido")

tres_menos_vendidos = produtos_vendidos.limit(3)
tres_menos_vendidos.show()

+---------+------------+
|ProductID|TotalVendido|
+---------+------------+
|        9|          95|
|       15|         122|
|       37|         125|
+---------+------------+



### Questão 2 - Quais são os cinco clientes que MAIS compras fizeram? (quantidade)

In [17]:
customers = spark.read.csv(path+"Customers.csv", sep=',', encoding= "ISO-8859-1",
                               inferSchema=True, header='true')
customers.show(5)

+----------+--------------------+------------------+--------------------+--------------------+-----------+------+----------+-------+--------------+--------------+
|CustomerID|         CompanyName|       ContactName|        ContactTitle|             Address|       City|Region|PostalCode|Country|         Phone|           Fax|
+----------+--------------------+------------------+--------------------+--------------------+-----------+------+----------+-------+--------------+--------------+
|     ALFKI| Alfreds Futterkiste|      Maria Anders|Sales Representative|       Obere Str. 57|     Berlin|  null|     12209|Germany|   030-0074321|   030-0076545|
|     ANATR|Ana Trujillo Empa...|      Ana Trujillo|               Owner|Avda. de la Const...|México D.F.|  null|      5021| Mexico|  (5) 555-4729|  (5) 555-3745|
|     ANTON|Antonio Moreno Ta...|    Antonio Moreno|               Owner|     Mataderos  2312|México D.F.|  null|      5023| Mexico|  (5) 555-3932|          null|
|     AROUT|     Aroun

In [18]:
orders = spark.read.csv(path+"Orders.csv", sep=',', encoding= "ISO-8859-1",
                               inferSchema=True, header='true')
orders.show(5)

+-------+----------+----------+--------------------+--------------------+--------------------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+
|OrderID|CustomerID|EmployeeID|           OrderDate|        RequiredDate|         ShippedDate|ShipVia|Freight|            ShipName|         ShipAddress|      ShipCity|ShipRegion|ShipPostalCode|ShipCountry|
+-------+----------+----------+--------------------+--------------------+--------------------+-------+-------+--------------------+--------------------+--------------+----------+--------------+-----------+
|  10248|     VINET|         5|1996-07-04 00:00:...|1996-08-01 00:00:...|1996-07-16 00:00:...|      3|  32,38|Vins et alcools C...|  59 rue de l'Abbaye|         Reims|      null|         51100|     France|
|  10249|     TOMSP|         6|1996-07-05 00:00:...|1996-08-16 00:00:...|1996-07-10 00:00:...|      1|  11,61|  Toms Spezialitäten|       Luisenstr. 48|       Münster|      nul

In [19]:
merged_df = customers.join(orders, "CustomerID") \
    .join(orders_detail, "OrderID")

# Agrupa pelo nome somando a quantidade comprado
total_comprado_por_cliente = merged_df.groupBy("ContactName") \
    .agg(sum("Quantity").alias("TotalComprado")) \
    .orderBy("TotalComprado", ascending=False)


clientes_mais_compraram = total_comprado_por_cliente.limit(5)
clientes_mais_compraram.show()

+----------------+-------------+
|     ContactName|TotalComprado|
+----------------+-------------+
|  Jose Pavarotti|         4958|
|   Roland Mendel|         4543|
|     Horst Kloss|         3961|
|Patricia McKenna|         1684|
|   Peter Franken|         1525|
+----------------+-------------+



### Questão 3 - Quais são os cinco clientes com MAIOR total gasto? (montante)

In [20]:
merged_df = customers.join(orders, "CustomerID") \
    .join(orders_detail, "OrderID")

total_gasto_por_cliente = merged_df.groupBy("ContactName") \
    .agg(expr("sum(Quantity * UnitPrice * (1 - Discount))").alias("TotalGasto")) \
    .orderBy("TotalGasto", ascending=False)

cinco_maior_gasto = total_gasto_por_cliente.limit(5)
cinco_maior_gasto.show()

+--------------+----------+
|   ContactName|TotalGasto|
+--------------+----------+
| Roland Mendel|   28163.0|
|Jose Pavarotti|   26887.0|
|   Horst Kloss|   17983.0|
| Maria Larsson|   14071.0|
| Philip Cramer|   10107.0|
+--------------+----------+



### 4 - Qual o melhor funcionário do último mês registrado? (total de vendas)

In [21]:
employees = spark.read.csv(path+"Employees.csv", sep=',', encoding= "ISO-8859-1",
                               inferSchema=True, header='true')
employees.show(5)

+----------+---------+---------+--------------------+---------------+--------------------+--------------------+--------------------+--------+------+----------+-------+--------------+---------+--------------------+--------------------+---------+--------------------+
|EmployeeID| LastName|FirstName|               Title|TitleOfCourtesy|           BirthDate|            HireDate|             Address|    City|Region|PostalCode|Country|     HomePhone|Extension|               Photo|               Notes|ReportsTo|           PhotoPath|
+----------+---------+---------+--------------------+---------------+--------------------+--------------------+--------------------+--------+------+----------+-------+--------------+---------+--------------------+--------------------+---------+--------------------+
|         1|  Davolio|    Nancy|Sales Representative|            Ms.|1948-12-08 00:00:...|1992-05-01 00:00:...|507 - 20th Ave. E...| Seattle|    WA|     98122|    USA|(206) 555-9857|     5467|0x151C2F00

In [22]:
# filtro de data (deve ter um jeito mais elegante)
orders_mes_anterior = orders.filter(
    (orders.OrderDate >= '1998-04-01 00:00:00.000') & (orders.OrderDate < '1998-05-01 00:00:00.000'))

qnt_vendas = orders_mes_anterior.groupBy("EmployeeID") \
    .agg(count("OrderID").alias("QntVendas")) \
    .orderBy("QntVendas", ascending=False)

funcionario_mais_vendas = employees.join(qnt_vendas, "EmployeeID") \
    .orderBy("QntVendas", ascending=False) \
    .limit(1)

funcionario_mais_vendas.select("FirstName", "LastName", "Title", "QntVendas").show()

+---------+--------+--------------------+---------+
|FirstName|LastName|               Title|QntVendas|
+---------+--------+--------------------+---------+
|   Andrew|  Fuller|Vice President, S...|       18|
+---------+--------+--------------------+---------+



### 5 - Quais as regiões com menos clientes cadastrados?

In [23]:
clientes_por_regiao = customers.groupBy("Region")\
    .agg(count("CustomerID")\
    .alias("TotalClientes"))\
    .orderBy("TotalClientes", ascending=True)

clientes_por_regiao.show(10)

+-------------+-------------+
|       Region|TotalClientes|
+-------------+-------------+
|      Táchira|            1|
|Nueva Esparta|            1|
|     Co. Cork|            1|
|Isle of Wight|            1|
|           NM|            1|
|           CA|            1|
|           ID|            1|
|           MT|            1|
|       Québec|            1|
|           WY|            1|
+-------------+-------------+
only showing top 10 rows

