# Setup Geral

Se estiver executando este exercício no Google Colab, execute as próximas duas células.

Caso esteja executando localmente, não é necessário executar mas certifique-se de que o **pyspark** está instalado e configurado em sua máquina.

In [1]:
%%bash

# Instal Java
apt-get update && apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
pip install -q pyspark

Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Get:8 http://archive.ubuntu.com/ubuntu jammy-updates/restricted amd64 Packages [1,614 kB]
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 http://archive.ubuntu.com/ubuntu jammy-updates/multiverse amd64 Packages [50.4 kB]
Get:13 http://archive.ubuntu.com/ubuntu jammy-updates/ma

In [2]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Teste

O teste a ser realizado é composto de 3 partes:

- um exercício de programação em Python
- alguns exercícios de programação em SQL
- alguns exercícios de programação com PySpark

Você pode escolher qual das partes do exercício vai fazer primeiro. Todo o exercício deve ser completado no período de 48 horas.

# Python

In [3]:
# SETUP
nomes_alunos = [
    ('Maria', 1),
    ('João', 2),
    ('Pedro', 3),
    ('Gabriella', 4),
    ('Giovana', 5),
    ('Arthur', 6)
]

notas_alunos = {
    1: 9.5,
    2: 5.1,
    3: 8.7,
    4: 7.1,
    5: 4.8,
    6: 6.3
}

Implemente uma função que recebe uma lista de nomes de alunos, um dicionário de notas dos mesmo, sendo que essas estruturas se relacionam por um ID.

A função deve retornar em ordem alfabética, o nome dos alunos que obtiveram notas maior ou igual de uma nota de corte informada.

In [10]:
def filtra_alunos_acima_corte(alunos, notas, nota_corte):
  # Desenvolva aqui
    # Primeiro, cria-se um dicionário para mapear IDs de alunos:
    dicionario_estudantes = {id: name for name, id in alunos}

    # Verifica-se que cada ID na lista de alunos corresponda a um ID no dicionário de notas
    alunos_verificados = [id for nome, id in alunos if id in notas]

    # Filtra-se as notas do dicionário para encontrar alunos com notas acima do corte:
    alunos_aprovados = {id: nota for id, nota in notas.items() if nota >= nota_corte}

    # Mapeia-se os estudantes aprovados, referenciando-se os nomes novamente:
    nomes_alunos_aprovados = [dicionario_estudantes[id] for id in alunos_aprovados]

    # Ordena em ordem alfabética e retorna a lista com o resultado
    return sorted(nomes_alunos_aprovados)
filtra_alunos_acima_corte(nomes_alunos, notas_alunos, 6)

['Arthur', 'Gabriella', 'Maria', 'Pedro']

# SQL

**Setup**


In [11]:
%%bash
mkdir bases_teste
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/produtos.csv -o bases_teste/produtos.csv
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/vendas.csv -o bases_teste/vendas.csv
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/usuarios.csv -o bases_teste/usuarios.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  1039  100  1039    0     0   3443      0 --:--:-- --:--:-- --:--:--  3451
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0 38  533k   38  207k    0     0   359k      0  0:00:01 --:--:--  0:00:01  359k100  533k  100  533k    0     0   859k      0 --:--:-- --:--:-- --:--:--  859k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  211k  100  211k    0     0   472k      0 --:

In [12]:
# Setup Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AtividadeSQL").getOrCreate()

In [13]:
def cria_tabela(path, nome_tabela):
    df = spark.read.csv(path, inferSchema=True, header=True)
    df.createOrReplaceTempView(nome_tabela)
    return df

usuarios = cria_tabela("bases_teste/usuarios.csv", "usuarios")
produtos = cria_tabela("bases_teste/produtos.csv", "produtos")
vendas = cria_tabela("bases_teste/vendas.csv", "vendas")

**Função para execução de queries**

In [14]:
def q(query, n=30):
    return spark.sql(query).show(n=n, truncate=False)

Para executar alguma consulta, basta colocar seu código sql dentro da função q como no exemplo abaixo:

```python
q("""
    SELECT *
    FROM usuarios
""")
```

e o resultado será exibido na tela.

---

Nesta parte da atividade, você vai trabalhar com três tabelas:

- produtos
- usuarios
- vendas

Use-as para responder às perguntas a seguir.

1) Qual foi a quantidade de vendas nos estados de Minas Gerais e São Paulo para cada ano e mês?

In [29]:
#amostras de cada tabela
q("""
SELECT * FROM usuarios LIMIT 3
""")
q("""
SELECT * FROM vendas LIMIT 3
""")
q("""
SELECT * FROM produtos LIMIT 3
""")

+-----------+-------------+------------------+---------+-------------------+
|cod_usuario|data_cadastro|faixa_etaria      |cidade   |estado             |
+-----------+-------------+------------------+---------+-------------------+
|69         |2020-07-24   |Entre 16 a 21 anos|São Romão|Minas Gerais       |
|176        |2021-03-19   |Entre 28 a 36 anos|Guanambi |Bahia              |
|185        |2021-03-11   |Entre 16 a 21 anos|Martins  |Rio Grande do Norte|
+-----------+-------------+------------------+---------+-------------------+

+-----------+-----------+-----------+----------+-------+
|cod_usuario|cod_produto|data_compra|quantidade|valor  |
+-----------+-----------+-----------+----------+-------+
|927        |10         |2020-10-19 |4         |2799.6 |
|1544       |10         |2021-03-28 |2         |1399.8 |
|2833       |2          |2020-09-13 |20        |26915.6|
+-----------+-----------+-----------+----------+-------+

+-----------+---------------------------+-----------------+-

In [31]:
q("""
SELECT
YEAR(vendas.data_compra) AS ano
,MONTH(vendas.data_compra) AS mes
,usuarios.estado
,SUM(vendas.quantidade) AS total_vendas

FROM vendas AS vendas
INNER JOIN usuarios AS usuarios ON vendas.cod_usuario = usuarios.cod_usuario
WHERE usuarios.estado IN ('Minas Gerais', 'São Paulo')
GROUP BY YEAR(vendas.data_compra), MONTH(vendas.data_compra), usuarios.estado
ORDER BY ano, mes, usuarios.estado
""")

+----+---+------------+------------+
|ano |mes|estado      |total_vendas|
+----+---+------------+------------+
|2018|6  |Minas Gerais|10          |
|2018|7  |São Paulo   |9           |
|2018|8  |Minas Gerais|16          |
|2018|10 |Minas Gerais|20          |
|2018|10 |São Paulo   |12          |
|2018|11 |Minas Gerais|10          |
|2018|12 |Minas Gerais|39          |
|2019|1  |Minas Gerais|20          |
|2019|2  |Minas Gerais|2           |
|2019|4  |Minas Gerais|26          |
|2019|5  |Minas Gerais|11          |
|2019|6  |Minas Gerais|1           |
|2019|6  |São Paulo   |10          |
|2019|7  |Minas Gerais|14          |
|2019|8  |Minas Gerais|15          |
|2019|8  |São Paulo   |17          |
|2019|9  |Minas Gerais|27          |
|2019|10 |Minas Gerais|3           |
|2019|11 |Minas Gerais|1           |
|2019|12 |Minas Gerais|25          |
|2020|1  |Minas Gerais|8           |
|2020|1  |São Paulo   |6           |
|2020|2  |Minas Gerais|10          |
|2020|3  |Minas Gerais|14          |
|

2) Quais são os usuários por Estado que mais compraram em todo o período analisado e qual foi o número de compras realizadas, a quantidade total de itens comprados e valor total pago por usuário?

In [32]:
q("""
WITH vendas_por_usuario AS (
    SELECT
    usuarios.cod_usuario
    ,usuarios.estado
    ,COUNT(*) AS total_compras
    ,SUM(vendas.quantidade) AS total_itens
    ,ROUND(SUM(vendas.valor),2) AS valor_total
    FROM vendas
    JOIN usuarios ON vendas.cod_usuario = usuarios.cod_usuario
    GROUP BY usuarios.cod_usuario, usuarios.estado
)
SELECT estado, cod_usuario, total_compras, total_itens, valor_total
FROM (
    SELECT *,
           RANK() OVER (PARTITION BY estado ORDER BY total_compras DESC, total_itens DESC, valor_total DESC) as rank
    FROM vendas_por_usuario
) ranked
WHERE ranked.rank = 1
ORDER BY estado
""")

+-------------------+-----------+-------------+-----------+-----------+
|estado             |cod_usuario|total_compras|total_itens|valor_total|
+-------------------+-----------+-------------+-----------+-----------+
|Amapá              |2186       |7            |61         |51359.34   |
|Bahia              |2981       |9            |70         |126282.4   |
|Ceará              |1787       |8            |79         |121494.67  |
|Goiás              |700        |9            |74         |142389.09  |
|Maranhão           |2811       |13           |117        |184101.89  |
|Mato Grosso        |2340       |9            |112        |165197.3   |
|Mato Grosso do Sul |679        |6            |102        |152271.96  |
|Minas Gerais       |3090       |9            |93         |95697.3    |
|Paraná             |2843       |10           |114        |137228.01  |
|Paraíba            |464        |6            |73         |73556.1    |
|Pernambuco         |2646       |7            |84         |13555

3) Quais são os usuários que não fizeram nenhuma compra?

In [33]:
q("""
SELECT
  usuarios.cod_usuario
FROM usuarios
LEFT JOIN vendas ON usuarios.cod_usuario = vendas.cod_usuario
WHERE vendas.quantidade IS NULL
""")


+-----------+
|cod_usuario|
+-----------+
|5098       |
|5229       |
|5482       |
|5504       |
|5614       |
|5923       |
|6985       |
|7901       |
|9980       |
|10080      |
|10152      |
|10170      |
|10929      |
|11002      |
|11546      |
|12298      |
|12354      |
|13117      |
|13476      |
|14486      |
|15198      |
|15313      |
|15784      |
|16570      |
|17530      |
|17779      |
|18028      |
|18034      |
|18166      |
|18366      |
+-----------+
only showing top 30 rows



4) Qual é o ticket médio (média de valor gasto) e o número total de usuários que fizeram pelo menos uma compra por faixa etária?

In [34]:
q("""
WITH total_gasto_por_usuario AS (
    SELECT
     vendas.cod_usuario
    ,SUM(vendas.valor) AS total_gasto
    FROM vendas
    GROUP BY vendas.cod_usuario
)
SELECT
usuarios.faixa_etaria
,ROUND(AVG(total_gasto_por_usuario.total_gasto), 2) AS ticket_medio
,COUNT(DISTINCT usuarios.cod_usuario) AS total_usuarios
FROM total_gasto_por_usuario
JOIN usuarios ON total_gasto_por_usuario.cod_usuario = usuarios.cod_usuario
GROUP BY usuarios.faixa_etaria
ORDER BY usuarios.faixa_etaria

""")


+------------------+------------+--------------+
|faixa_etaria      |ticket_medio|total_usuarios|
+------------------+------------+--------------+
|Entre 10 a 15 anos|54610.77    |7             |
|Entre 16 a 21 anos|57845.21    |11            |
|Entre 22 a 27 anos|78099.35    |10            |
|Entre 28 a 36 anos|51227.79    |9             |
|Entre 37 a 49 anos|117090.33   |6             |
|Entre 50 a 61 anos|72329.84    |12            |
|Entre 62 a 70 anos|55784.63    |7             |
|Mais de 70 anos   |81947.5     |4             |
+------------------+------------+--------------+



In [35]:
spark.stop()

# PySpark

**setup**:

In [36]:
%%bash
mkdir bases_teste
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/produtos.csv -o bases_teste/produtos.csv
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/vendas.csv -o bases_teste/vendas.csv
curl https://raw.githubusercontent.com/A3Data/bases_testes/main/bases_teste/usuarios.csv -o bases_teste/usuarios.csv

mkdir: cannot create directory ‘bases_teste’: File exists
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  1039  100  1039    0     0   3535      0 --:--:-- --:--:-- --:--:--  3546
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  533k  100  533k    0     0  1275k      0 --:--:-- --:--:-- --:--:-- 1277k
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:

In [37]:
# Setup Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Atividade_Spark_DataFrames").getOrCreate()

In [38]:
def cria_tabela(path, nome_tabela):
    df = spark.read.csv(path, inferSchema=True, header=True)
    df.createOrReplaceTempView(nome_tabela)
    return df

usuarios = cria_tabela("bases_teste/usuarios.csv", "usuarios")
produtos = cria_tabela("bases_teste/produtos.csv", "produtos")
vendas = cria_tabela("bases_teste/vendas.csv", "vendas")

In [39]:
#amostras
usuarios.show(3, truncate = False)
produtos.show(3, truncate = False)
vendas.show(3, truncate = False)

+-----------+-------------+------------------+---------+-------------------+
|cod_usuario|data_cadastro|faixa_etaria      |cidade   |estado             |
+-----------+-------------+------------------+---------+-------------------+
|69         |2020-07-24   |Entre 16 a 21 anos|São Romão|Minas Gerais       |
|176        |2021-03-19   |Entre 28 a 36 anos|Guanambi |Bahia              |
|185        |2021-03-11   |Entre 16 a 21 anos|Martins  |Rio Grande do Norte|
+-----------+-------------+------------------+---------+-------------------+
only showing top 3 rows

+-----------+---------------------------+-----------------+-------------+
|cod_produto|nome_produto               |categoria_produto|valor_produto|
+-----------+---------------------------+-----------------+-------------+
|1          |Notebook Asus Intel Core i7|Tecnologia       |4590.9       |
|2          |Bicicleta Caloi Aro 29     |Lazer            |1345.78      |
|3          |Smartphone Samsung         |Tecnologia       |2049.5 

Responda às perguntas a seguir utilizando **Spark DATAFRAMES**.

1) Qual foi o total de compras realizadas, o total de itens comprados e a receita total obtida em todo o período analisado?

In [42]:
from pyspark.sql import functions as F

# Cálculo do total de compras, total de itens, e receita total:
resultado = vendas.agg(
    F.count(F.lit(1)).alias("Total_Compras"),
    F.sum("quantidade").alias("Total_Itens"),
    F.format_number(F.sum("valor"), 2).alias("Receita_Total")
)

resultado.show(truncate = False)

+-------------+-----------+--------------+
|Total_Compras|Total_Itens|Receita_Total |
+-------------+-----------+--------------+
|20000        |209149     |215,849,001.66|
+-------------+-----------+--------------+



2) Quais são os 3 produtos mais comprados dos estados da região Sul e Sudeste, a quantidade de itens comprados, o valor total pago e a média de preço paga?

In [45]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Listagem de Estados do Sul e Sudeste:
estados_sul_sudeste = ["Rio Grande do Sul", "Santa Catarina", "Paraná",
                       "São Paulo", "Rio de Janeiro", "Espírito Santo", "Minas Gerais"]

# Condição do JOIN entre as tabelas vendas , usuarios e produtos:
condicao_join = [vendas.cod_usuario == usuarios.cod_usuario, vendas.cod_produto == produtos.cod_produto]
df_join = vendas.join(usuarios, condicao_join[0]).join(produtos, condicao_join[1])

# Aplicando as condições de retorno solicitadas e filtradas por estado
df_agregacao = df_join.filter(usuarios.estado.isin(estados_sul_sudeste)) \
    .groupBy(produtos.nome_produto) \
    .agg(
        F.sum("quantidade").alias("Total_Itens"),
        F.format_number(F.sum("valor"), 2).alias("Valor_Total"),
        F.format_number(F.sum("valor") / F.sum("quantidade"), 2).alias("Preco_Medio")
    )

# Aplicando o conceito de window functions para rankeamento
windowSpec = Window.partitionBy().orderBy(F.desc("Total_Itens"))

# Aplicando a função de window functions e selecionando os 3 primeiros
top_produtos = df_agregacao.withColumn("rank", F.row_number().over(windowSpec)) \
    .filter(F.col("rank") <= 3)

top_produtos.show(truncate = False)


+---------------------------+-----------+-----------+-----------+----+
|nome_produto               |Total_Itens|Valor_Total|Preco_Medio|rank|
+---------------------------+-----------+-----------+-----------+----+
|Alcool em Gel 70% Johnson  |280        |1,397.20   |4.99       |1   |
|Escrivaninha em L          |141        |61,617.00  |437.00     |2   |
|Notebook Asus Intel Core i7|134        |615,180.60 |4,590.90   |3   |
+---------------------------+-----------+-----------+-----------+----+



3) Para cada produto, quantos usuários fizeram pelo menos uma compra desse produto e qual é o valor mínimo e máximo pago por eles?

In [52]:
from pyspark.sql import functions as F

# dataframe com JOIN entre as tabelas vendas e produtos
df_join = vendas.join(produtos, vendas.cod_produto == produtos.cod_produto)

# Calculando o número de usuários distintos, valor mínimo e máximo pago por produto
resultado = df_join.groupBy(produtos.nome_produto) \
                   .agg(
                       F.countDistinct(vendas.cod_usuario).alias("Total_Usuarios"),
                       F.format_number(F.min(vendas.valor),2).alias("Valor_minimo"),
                       F.format_number(F.max(vendas.valor),2).alias("Valor_Maximo")
                   )

resultado.show(truncate = False)


+-----------------------------------+--------------+------------+------------+
|nome_produto                       |Total_Usuarios|Valor_minimo|Valor_Maximo|
+-----------------------------------+--------------+------------+------------+
|Bicicleta Caloi Aro 29             |775           |1,345.78    |26,915.60   |
|Cafeteira Nespresso                |832           |219.57      |4,391.40    |
|Notebook Asus Intel Core i7        |825           |4,590.90    |91,818.00   |
|Aparelho de Barbear OneBlade Philco|818           |129.90      |2,598.00    |
|SPA Intel 700 L                    |839           |5,999.10    |119,982.00  |
|Jogo Mortal Kombat 11 PS4          |847           |49.90       |998.00      |
|Umidificador de Ar Cadence         |788           |77.50       |1,550.00    |
|Blusa Lacoste                      |812           |138.99      |2,779.80    |
|Alcool em Gel 70% Johnson          |837           |4.99        |99.80       |
|Monitor LG 19 pol curvado          |842           |

4) Aplique um desconto de 10% em todas as vendas dos usuários que fizeram pelo menos 3 compras de produtos na mesma categoria, a partir da 4ª compra realizada. Exiba apenas os usuários que terão o desconto aplicado, mantendo todas as compras, o valor original e o valor com o desconto aplicado.

In [57]:
from pyspark.sql import Window
from pyspark.sql.functions import col, when, count, lit, format_number

# INNER JOIN da tabela vendas com a tabela produtos:
df_join = vendas.join(produtos, "cod_produto")

# Definição do particionamento por window functions, por usuário e categoria
windowSpec = Window.partitionBy("cod_usuario", "categoria_produto").orderBy("data_compra")

# Contando o número de compras por usuário em cada categoria e marcando compras que se qualificam para desconto
df_compras = df_join.withColumn("num_compras_categoria", count(lit(1)).over(windowSpec)) \
                    .withColumn("qualifica_desconto", when(col("num_compras_categoria") >= 4, 1).otherwise(0))

# Calculando o valor com desconto e formatando as colunas númericas:
df_com_desconto = df_compras.withColumn("valor_com_desconto", when(col("qualifica_desconto") == 1, col("valor") * 0.9).otherwise(col("valor"))) \
                            .withColumn("valor", format_number(col("valor"), 2)) \
                            .withColumn("valor_produto", format_number(col("valor_produto"), 2)) \
                            .withColumn("valor_com_desconto", format_number(col("valor_com_desconto"), 2))

# Filtrando para exibir apenas os usuários que se qualificados para o desconto
df_resultado = df_com_desconto.filter(col("qualifica_desconto") == 1)

df_resultado.show(truncate=False)
#Fim dos desafios por hora

+-----------+-----------+-----------+----------+---------+-------------------------------+----------------------+-------------+---------------------+------------------+------------------+
|cod_produto|cod_usuario|data_compra|quantidade|valor    |nome_produto                   |categoria_produto     |valor_produto|num_compras_categoria|qualifica_desconto|valor_com_desconto|
+-----------+-----------+-----------+----------+---------+-------------------------------+----------------------+-------------+---------------------+------------------+------------------+
|1          |3          |2021-07-20 |8         |36,727.20|Notebook Asus Intel Core i7    |Tecnologia            |4,590.90     |4                    |1                 |33,054.48         |
|8          |6          |2021-03-23 |2         |439.14   |Cafeteira Nespresso            |Eletrodomesticos      |219.57       |4                    |1                 |395.23            |
|1          |16         |2021-02-16 |9         |41,318.10|No

# FIM!