# Setup Geral

Se estiver executando este exercício no Google Colab, execute as próximas duas células. 

Caso esteja executando localmente, não é necessário executar mas certifique-se de que o **pyspark** está instalado e configurado em sua máquina.

In [None]:
%%bash

# Instal Java
apt-get update && apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
pip install -q pyspark

In [None]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# PySpark

**setup**:

In [87]:
# Setup Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AtividadeSQL").getOrCreate()

In [267]:
def cria_tabela(path, nome_tabela):
    df = spark.read.csv(path, inferSchema=True, header=True)
    df.createOrReplaceTempView(nome_tabela)
    return df

usuarios = cria_tabela("bases_teste/usuarios.csv", "usuarios")
produtos = cria_tabela("bases_teste/produtos.csv", "produtos")
vendas = cria_tabela("bases_teste/vendas.csv", "vendas")

Responda às perguntas a seguir utilizando **Spark DATAFRAMES**.

1) Qual foi o total de compras realizadas, o total de itens comprados e a receita total obtida em todo o período analisado?

In [171]:
from pyspark.sql.functions import format_string

df_vendas = spark.read.table("vendas")

df_resposta = df_vendas.agg(
    count("*").alias("total_compras"),
    sum("quantidade").alias("total_itens"),
    sum("valor").alias("receita_total"),
)

df_resposta = df_resposta.withColumn(
    "receita_total", format_string("%.0f", df_resposta["receita_total"])
)


df_resposta.show()


+-------------+-----------+-------------+
|total_compras|total_itens|receita_total|
+-------------+-----------+-------------+
|        20000|     209149|    215849002|
+-------------+-----------+-------------+



2) Quais são os 3 produtos mais comprados dos estados da região Sul e Sudeste, a quantidade de itens comprados, o valor total pago e a média de preço paga?

In [269]:
from pyspark.sql.functions import col, count, dense_rank, desc, format_number, sum
from pyspark.sql.window import Window

df_usuarios = spark.read.table("usuarios")
df_vendas = spark.read.table("vendas")
df_produtos = spark.read.table("produtos")

estados_considerados = [
    ("São Paulo",),
    ("Rio de Janeiro",),
    ("Minas Gerais",),
    ("Espírito Santo",),
    ("Paraná",),
    ("Santa Catarina",),
    ("Rio Grande do Sul",),
]


df_produtos = df_produtos["cod_produto", "nome_produto"]

df_estados_considerados = spark.createDataFrame(estados_considerados, ["estado"])


df_usuarios_nos_estados = df_usuarios.join(
    df_estados_considerados,
    df_usuarios.estado == df_estados_considerados.estado,
    "inner",
)


df_compras_por_usuario = df_usuarios_nos_estados.join(
    df_vendas, df_vendas.cod_usuario == df_usuarios_nos_estados.cod_usuario, "inner"
)


df_compras_por_usuario = df_compras_por_usuario[
    "usuarios.cod_usuario",
    "usuarios.estado",
    "cod_produto",
    "quantidade",
    "valor",
]


df_agrupado_por_produto_estado = df_compras_por_usuario.groupBy(
    "cod_produto", "estado"
).agg(sum("quantidade").alias("qtd_itens"), sum("valor").alias("valor_total"))


window_spec = Window.partitionBy("estado").orderBy(desc("qtd_itens"))
df_rank_por_produto_estado = df_agrupado_por_produto_estado.select(
    "cod_produto",
    "estado",
    "qtd_itens",
    "valor_total",
    dense_rank().over(window_spec).alias("rank"),
)


df_rank_por_produto_estado = df_rank_por_produto_estado.filter(col("rank") <= 3)

df_rank_por_produto_estado = df_rank_por_produto_estado.withColumn(
    "media_preco", col("valor_total") / col("qtd_itens")
)


df_resposta = df_rank_por_produto_estado.join(
    df_produtos,
    df_produtos.cod_produto == df_rank_por_produto_estado.cod_produto,
    "inner",
)

df_resposta = df_resposta[
    "vendas.cod_produto",
    "nome_produto",
    "estado",
    "qtd_itens",
    "valor_total",
    "rank",
    "media_preco",
]

df_resposta = df_resposta.withColumn("valor_total", format_number("valor_total", 2))
df_resposta = df_resposta.withColumn("media_preco", format_number("media_preco", 2))

df_resposta.show(truncate=False, n=100)


+-----------+--------------------------------+-----------------+---------+-----------+----+-----------+
|cod_produto|nome_produto                    |estado           |qtd_itens|valor_total|rank|media_preco|
+-----------+--------------------------------+-----------------+---------+-----------+----+-----------+
|15         |Alcool em Gel 70% Johnson       |Minas Gerais     |165      |823.35     |1   |4.99       |
|5          |Escrivaninha em L               |Minas Gerais     |78       |34,086.00  |2   |437.00     |
|1          |Notebook Asus Intel Core i7     |Minas Gerais     |71       |325,953.90 |3   |4,590.90   |
|11         |Jogo Mortal Kombat 11 PS4       |Paraná           |35       |1,746.50   |1   |49.90      |
|10         |Cadeira Gamer Xpress            |Paraná           |27       |18,897.30  |2   |699.90     |
|19         |SmartTV Samsung 4K 55 polegadas |Paraná           |25       |62,497.50  |3   |2,499.90   |
|15         |Alcool em Gel 70% Johnson       |Rio Grande do Sul|

3) Para cada produto, quantos usuários fizeram pelo menos uma compra desse produto e qual é o valor mínimo e máximo pago por eles? 

In [270]:
from pyspark.sql.functions import col, countDistinct, desc, format_number, min, max

df_vendas = spark.read.table("vendas")
df_produtos = spark.read.table("produtos")


df_vendas = df_vendas["cod_produto", "cod_usuario", "quantidade", "valor"]
df_vendas = df_vendas.withColumn("valor_unitario", col("valor") / col("quantidade"))

df_clientes_por_produto = df_vendas.groupBy("cod_produto").agg(
    min("valor_unitario").alias("valor_unitario_minimo"),
    max("valor_unitario").alias("valor_unitario_maximo"),
    countDistinct("cod_usuario").alias("qtd_usuarios"),
)

df_resposta = df_produtos.join(
    df_clientes_por_produto,
    df_produtos.cod_produto == df_clientes_por_produto.cod_produto,
    "left",
)

df_resposta = df_resposta.na.fill(
    0, subset=["qtd_usuarios", "valor_unitario_minimo", "valor_unitario_maximo"]
)

df_resposta = df_resposta.withColumn(
    "valor_unitario_minimo", format_number("valor_unitario_minimo", 2)
)
df_resposta = df_resposta.withColumn(
    "valor_unitario_maximo", format_number("valor_unitario_maximo", 2)
)

df_resposta = df_resposta[
    "produtos.cod_produto",
    "nome_produto",
    "qtd_usuarios",
    "valor_unitario_minimo",
    "valor_unitario_maximo",
]


df_resposta = df_resposta.orderBy(desc("qtd_usuarios"))
df_resposta.show(truncate=False, n=100)


+-----------+-----------------------------------+------------+---------------------+---------------------+
|cod_produto|nome_produto                       |qtd_usuarios|valor_unitario_minimo|valor_unitario_maximo|
+-----------+-----------------------------------+------------+---------------------+---------------------+
|18         |Lenço umedecido Turma da Monica    |880         |7.90                 |7.90                 |
|7          |Ar-condicionado 9500 BTUs LG       |863         |1,300.49             |1,300.49             |
|5          |Escrivaninha em L                  |860         |437.00               |437.00               |
|19         |SmartTV Samsung 4K 55 polegadas    |856         |2,499.90             |2,499.90             |
|9          |Microoondas 35L                    |847         |389.10               |389.10               |
|11         |Jogo Mortal Kombat 11 PS4          |847         |49.90                |49.90                |
|6          |Sofa 3 lugares          

4) Aplique um desconto de 10% em todas as vendas dos usuários que fizeram pelo menos 3 compras de produtos na mesma categoria, a partir da 4ª compra realizada. Exiba apenas os usuários que terão o desconto aplicado, mantendo todas as compras, o valor original e o valor com o desconto aplicado.

In [357]:
from pyspark.sql.functions import asc, col, date_format, dense_rank, desc, format_number, when

from pyspark.sql.window import Window

df_vendas = spark.read.table("vendas")
df_produtos = spark.read.table("produtos")

df_vendas_por_categoria = df_vendas.join(
    df_produtos, df_vendas.cod_produto == df_produtos.cod_produto, "inner"
)

df_vendas_por_categoria = df_vendas_por_categoria[
    "cod_usuario",
    "produtos.cod_produto",
    "nome_produto",
    "categoria_produto",
    "data_compra",
    "quantidade",
    "valor",
]

window_spec = Window.partitionBy("cod_usuario", "categoria_produto").orderBy(
    "data_compra"
)

df_vendas_por_categoria = df_vendas_por_categoria.select(
    "cod_usuario",
    "cod_produto",
    "nome_produto",
    "categoria_produto",
    "data_compra",
    "quantidade",
    "valor",
    dense_rank().over(window_spec).alias("ordem_compra"),
)


df_vendas_por_categoria = df_vendas_por_categoria.withColumn(
    "valor_com_desconto",
    when(df_vendas_por_categoria.ordem_compra > 3, col("valor") * 0.9).otherwise(
        col("valor")
    ),
)


df_vendas_por_categoria = df_vendas_por_categoria.withColumn(
    "valor", format_number("valor", 2)
)

df_vendas_por_categoria = df_vendas_por_categoria.withColumn(
    "valor_com_desconto", format_number("valor_com_desconto", 2)
)


df_usuarios_a_exibir = df_vendas_por_categoria[["cod_usuario", "categoria_produto"]].filter(col("ordem_compra") > 3).dropDuplicates()

df_usuarios_a_exibir = df_usuarios_a_exibir.withColumnRenamed("cod_usuario", "cod_usuario_a_exibir")
df_usuarios_a_exibir = df_usuarios_a_exibir.withColumnRenamed("categoria_produto", "categoria_produto_a_exibir")


df_resposta = df_vendas_por_categoria.join(df_usuarios_a_exibir
                                           , (df_vendas_por_categoria.cod_usuario == df_usuarios_a_exibir.cod_usuario_a_exibir) &
                                           (df_vendas_por_categoria.categoria_produto == df_usuarios_a_exibir.categoria_produto_a_exibir) 
                                           , "inner")


df_resposta = df_resposta["cod_usuario"
                        , "produtos.cod_produto"
                        , "produtos.categoria_produto"
                        , "produtos.nome_produto"
                        , "data_compra"
                        , "ordem_compra"
                        , "valor"
                        , "valor_com_desconto"
                        ,]

df_resposta = df_resposta.withColumn("data_compra", date_format("data_compra", "yyyy-MM-dd"))

df_resposta = df_resposta.orderBy(
     asc("cod_usuario"), asc("categoria_produto"), desc("ordem_compra")
)


df_resposta.show( n= 50)

+-----------+-----------+--------------------+--------------------+-----------+------------+---------+------------------+
|cod_usuario|cod_produto|   categoria_produto|        nome_produto|data_compra|ordem_compra|    valor|valor_com_desconto|
+-----------+-----------+--------------------+--------------------+-----------+------------+---------+------------------+
|          3|          1|          Tecnologia|Notebook Asus Int...| 2021-07-20|           4|36,727.20|         33,054.48|
|          3|          3|          Tecnologia|  Smartphone Samsung| 2021-07-18|           3|20,495.00|         20,495.00|
|          3|          3|          Tecnologia|  Smartphone Samsung| 2021-04-14|           2|18,445.50|         18,445.50|
|          3|          1|          Tecnologia|Notebook Asus Int...| 2021-03-24|           1|18,363.60|         18,363.60|
|          6|          8|    Eletrodomesticos| Cafeteira Nespresso| 2021-03-23|           4|   439.14|            395.23|
|          6|          9