# Exercícios Spark

Exercícios propostos <a href=https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-242445b24565>neste</a> link.

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *
from datetime import *
from pyspark.sql import SparkSession, Window, Row
from datetime import date

# Leitura dos datasets

df_products = spark.read.parquet('s3://datalake-sandbox/raquelbustamante/DatasetToCompleteTheSixSparkExercises/products_parquet/')

df_sales = spark.read.parquet('s3://datalake-sandbox/raquelbustamante/DatasetToCompleteTheSixSparkExercises/sales_parquet/')

df_sellers = spark.read.parquet('s3://datalake-sandbox/raquelbustamante/DatasetToCompleteTheSixSparkExercises/sellers_parquet/')

#### Warm-up #1

Find out how many orders, how many products and how many sellers are in the data. How many products have been sold at least once? Which is the product contained in more orders?

In [0]:
print("Quantidade de produtos:", df_products.count())

In [0]:
print("Quantidade de vendas:", df_sales.count())

In [0]:
print("Quantidade de vendedores:", df_sellers.count())

In [0]:
# How many products have been sold at least once? 

# df_sales.show(5)
print("Quantidade de produtos vendidos pelo menos uma vez")
df_sales.select(countDistinct(col("product_id"))).show()

In [0]:
# Which is the product contained in more orders?

print("Produto mais vendido")
df_sales.groupBy("product_id").agg(count("product_id")).orderBy(count("product_id").desc()).limit(1).show()

In [0]:
df_sales.show(5)

### Warm-up #2
How many distinct products have been sold in each day?

In [0]:
df_sales.groupby(col("date")).agg(countDistinct(col("product_id"))).show()

### Exercise #1

What is the average revenue of the orders?

In [0]:
df_sales_product = df_sales.join(df_products, df_sales["product_id"] == df_products["product_id"])

In [0]:
df_sales_product.show(4)

In [0]:
df_sales_product.agg(avg(df_sales_product["num_pieces_sold"] * df_sales_product["price"]).alias("media preco")).show()

### Exercise #2

For each seller, what is the average % contribution of an order to the seller's daily quota?

In [0]:
df_ratio = df_sales.join(df_sellers, df_sales["seller_id"] == df_sellers["seller_id"], "inner").withColumn("ratio", df_sales["num_pieces_sold"]/df_sellers["daily_target"]).groupBy(df_sales["seller_id"]).agg(avg("ratio"))

In [0]:
df_ratio.show()

### Exercise #3

Para cada produto, quem é segundo funcionário que mais vende e o que menos vende (sellers)? Para produtos com o product_id = 0

Vamos analisar a questão: para cada produto, precisamos do segundo funcionário que mais vende e do que menos vende (vendedores): provavelmente precisaremos de dois rankings, um para obter o segundo e outro para obter o último no gráfico de vendas . Também precisamos lidar com alguns casos extremos:
- Se um produto foi vendido por apenas um vendedor, nós o colocaremos em uma categoria especial (categoria: Vendedor unico ou vários vendedores com a mesma quantidade). 
- Se um produto foi vendido por mais de um vendedor, mas todos venderam a mesma quantidade, vamos colocá-los na mesma categoria como se fossem apenas um único vendedor para aquele produto (categoria: Apenas vendedor ou vários vendedores com a mesma quantidade). 
- Se o “menos vendido” também for o “segundo vendedor”, contaremos apenas como “segundo vendedor”.

Vamos traçar uma estratégia: 
1. Obtemos a soma das vendas de cada par de produto e vendedor. 
2. Adicionamos duas novas colunas de classificação: uma que classifica as vendas dos produtos em ordem decrescente e outra que classifica em ordem crescente. 
3. Dividimos o conjunto de dados obtido em três partes: uma para cada caso que queremos tratar (segundo mais vendido, menos vendido, venda única). Ao calcular o “menos vendido”, excluímos os produtos que têm um único vendedor e aqueles em que o funcionário que menos vende também é o segundo mais vendido
4. Unimos as peças novamente.

In [0]:
# DUVIDA: POR QUE NOS EXERCICIOS ELE "ENCAPSULA" EM "COL"?
# Resposta: só usamos se quisermos

# num_pieces_sold = df_sales.groupBy("product_id", "seller_id").agg(sum("num_pieces_sold").alias("num_pieces_sold"))

# Cáculo do número de peças vendidas por cada vendedor para cada produto

sales_table = df_sales.groupby(col("product_id"), col("seller_id")). \
    agg(sum("num_pieces_sold").alias("num_pieces_sold"))

In [0]:
sales_table.show(3)

In [0]:
# Criar as windows functions, uma ordenará de forma crescente e outra decrescente. Particionada pelo product_id 
# e classificada pelas peças vendidas

# DUVIDA: POR QUE NAO INDICAMOS QUAL É O DF AQUI?
# Resposta: Pois o que utilizamos aqui são apenas a criação de variáveis para aplicar nas funções

window_desc = Window.partitionBy(col("product_id")).orderBy(col("num_pieces_sold").desc())
window_asc = Window.partitionBy(col("product_id")).orderBy(col("num_pieces_sold").asc())

# CRIAR UM DF DE RANK DE ASC E DESC PRA VER COMO OS DFS FICAM (SEUS RESULTADOS)

In [0]:
# Create a Dense Rank (para evitar buracos)
# DUVIDA: O que é esse Dense Rank?
# Dense Rank é uma função que rankeia os resultados. Nós utilizamos ele quando estamos falando de grupos diferentes, por isso ele é utilizado
# Referencia https://sparkbyexamples.com/spark/spark-sql-window-functions/

sales_table = sales_table.withColumn("rank_asc", dense_rank().over(window_asc)). \
    withColumn("rank_desc", dense_rank().over(window_desc))

# df_sales_wind = df_sales.withColumn("rank_asc", dense_rank().over(window_asc)).withColumn("rank_desc", dense_rank().over(window_desc))
# df_sales_wind.show(100)

In [0]:
# Obtenha os vendedores que menos venderam e exclua aquelas linhas que já foram incluidas no primeiro passo

single_seller = sales_table.where(col("rank_asc") == col("rank_desc")).select(
    col("product_id").alias("single_seller_product_id"), col("seller_id").alias("single_seller_seller_id"),
    lit("Vendedor único ou múltiplos vendedores com os mesmos resultados").alias("type")
)

single_seller.show()
# single_seller = df_sales.where("rank_asc" == "rank_desc").select(("product_id").alias("single_seller_product_id"), ("seller_id").alias("single_seller_seller_id"),lit("Vendedor único ou múltiplos vendedores com os mesmos resultados").alias("type"))                   

In [0]:
#Obtenha o segundo vendedor que mais vendeu
second_seller = sales_table.where(col("rank_desc") == 2).select(
    col("product_id").alias("second_seller_product_id"), col("seller_id").alias("second_seller_seller_id"),
    lit("Segundo vendedor que mais vendeu").alias("type")
)

second_seller.show(3)

In [0]:
# Obtenha os vendedores que menos venderam e excluir as linhas que já estão incluídas na primeira parte
# Excluir também os "segundos que mais venderam" que são também os "que menos venderam".

least_seller = sales_table.where(col("rank_asc") == 1).select(
    col("product_id"), col("seller_id"),
    lit("Vendedores que menos venderam").alias("type")
).join(single_seller, (sales_table["seller_id"] == single_seller["single_seller_seller_id"]) & (
        sales_table["product_id"] == single_seller["single_seller_product_id"]), "left_anti"). \
    join(second_seller, (sales_table["seller_id"] == second_seller["second_seller_seller_id"]) & (
        sales_table["product_id"] == second_seller["second_seller_product_id"]), "left_anti")
least_seller.show()

In [0]:
# Unindo todas as peças
union_table = least_seller.select(
    col("product_id"),
    col("seller_id"),
    col("type")
).union(second_seller.select(
    col("second_seller_product_id").alias("product_id"),
    col("second_seller_seller_id").alias("seller_id"),
    col("type")
)).union(single_seller.select(
    col("single_seller_product_id").alias("product_id"),
    col("single_seller_seller_id").alias("seller_id"),
    col("type")
))

In [0]:
# Qual é o segundo vendedor que mais vendeu e o que menos vendeu do produto 0?
union_table.where(col("product_id") == 0).show()

### Exercise #4

Create a new column called "hashed_bill" defined as follows:
- Se o order_id for par: aplique o hash MD5 iterativamente ao campo bill_raw_text, para cada 'A' (maiúsculo 'A') presente no texto. Por exemplo. se o texto da fatura for 'nbAAnllA', você aplicaria o hash três vezes iterativamente (somente se o número do pedido for par)
- Se o order_id for ímpar: aplique o hash SHA256 ao texto da fatura Por fim, verifique se há alguma duplicata na nova coluna

In [0]:
import hashlib

In [0]:
def algo(order_id, bill_text):
  # Se o número for par
  ret = bill_text.encode("utf-8")
  print('ret: ', ret)
  
  if int(order_id) % 2 == 0:
    # Conta a quantidade de 'A'
    cnt_A = bill_text.count('A')
    print('cnt_A: ', cnt_A)
    for _c in range(0, cnt_A):
      ret = hashlib.md5(ret).hexdigest().encode("utf-8")
    ret = ret.decode('utf-8')
  else:
    ret = ret = hashlib.sha256(ret).hexdigest()
    
  return ret

# Registrar a função UDF
algo_udf = spark.udf.register("algo", algo)

# Use o `algo_udf` para aplicar o agloritmo e depois verifique se há algum hash duplicado na tabela

df_sales.withColumn("hashed_bill", algo_udf(col("order_id"), col("bill_raw_text"))).groupBy(col("hashed_bill")).agg(count("*").alias("cnt")).where(col("cnt") > 1).show()