# Configuração, Carregamento e Persistência de Dados

In [19]:
!pip install pyspark # Instala a biblioteca PySpark no ambiente



In [20]:
# Importa bibliotecas necessárias, incluindo time para medir a performance
import pyspark
import time

from pyspark.sql import SparkSession  # Importa a classe SparkSession, o ponto de entrada para o Spark
from pyspark.sql.functions import *  # Importa Funções SQL (avg, when, udf, etc.).
from pyspark.storagelevel import StorageLevel # Importa StorageLevel para definir o nível de persistência (ex: memória/disco)

In [21]:
spark = SparkSession.builder.getOrCreate() # Cria ou obtém a SparkSession

In [22]:
from google.colab import drive # Importa o módulo drive do Google Colab.
drive.mount('/content/drive')  # Monta o Google Drive para acesso aos arquivos

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [23]:
# Lê todos os DataFrames do caminho especificado
produtos = spark.read.csv("/content/drive/MyDrive/Material de apoio - M27/produtos.csv", header=True, inferSchema=True)
vendedores = spark.read.csv("/content/drive/MyDrive/Material de apoio - M27/vendedores.csv", header=True, inferSchema=True)
clientes = spark.read.csv("/content/drive/MyDrive/Material de apoio - M27/clientes.csv", header=True, inferSchema=True)
itens_pedido = spark.read.csv("/content/drive/MyDrive/Material de apoio - M27/itens_pedido.csv", header=True, inferSchema=True)
pagamentos_pedido = spark.read.csv("/content/drive/MyDrive/Material de apoio - M27/pagamentos_pedido.csv", header=True, inferSchema=True)
avaliacoes_pedido = spark.read.csv("/content/drive/MyDrive/Material de apoio - M27/avaliacoes_pedido.csv", header=True, inferSchema=True)
pedidos = spark.read.csv("/content/drive/MyDrive/Material de apoio - M27/pedidos.csv", header=True, inferSchema=True)

In [24]:
# Exibe as primeiras linhas de DataFrames selecionados para inspeção
clientes.show() #
pedidos.show() #
pagamentos_pedido.show() #

+--------------------+--------------------+-----------+--------------------+--------------+
|          id_cliente|    id_unico_cliente|cep_cliente|      cidade_cliente|estado_cliente|
+--------------------+--------------------+-----------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|      14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|       9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|       1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|       8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|      13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|      89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|       4534|           sao paulo|            SP|
|5e274e7a0c3809e14...|57b2a98a409812fe9...|      35182|             timoteo|    

In [25]:
clientes.createOrReplaceTempView("clientes") # Transforma o DataFrame em uma View SQL
spark.sql("CACHE TABLE clientes") # Armazena a View clientes no nível de armazenamento padrão (geralmente memória e disco) do warehouse do Spark SQL
spark.sql("SELECT COUNT(*) FROM clientes").show() # Executa uma contagem. Esta primeira execução força o carregamento dos dados para o cache.
spark.sql("UNCACHE TABLE clientes") # Remove a View clientes do cache do Spark SQL

+--------+
|count(1)|
+--------+
|   99441|
+--------+



DataFrame[]

In [26]:
clientes.cache() # Armazena o DataFrame clientes no nível de armazenamento padrão (equivalente a persist(StorageLevel.MEMORY_AND_DISK))

clientes.persist(StorageLevel.MEMORY_AND_DISK_2) # Armazena o DataFrame usando o nível MEMORY_AND_DISK_2, o que significa que os dados serão armazenados em memória e em disco, com duas réplicas (tolerância a falhas)

clientes_estado_sp_df = clientes.filter(clientes['estado_cliente'] == 'SP') # Cria um novo DataFrame filtrando apenas clientes do estado 'SP'. O dado do DataFrame clientes será lido da memória/disco por causa do persist()
clientes_estado_sp_df.show() # Exibe os clientes filtrados

clientes.unpersist() # Remove o DataFrame clientes da persistência (libera a memória/disco)

+--------------------+--------------------+-----------+--------------------+--------------+
|          id_cliente|    id_unico_cliente|cep_cliente|      cidade_cliente|estado_cliente|
+--------------------+--------------------+-----------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|      14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|       9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|       1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|       8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|      13056|            campinas|            SP|
|fd826e7cf63160e53...|addec96d2e059c80c...|       4534|           sao paulo|            SP|
|b2d1536598b73a9ab...|918dc87cd72cd9f6e...|      18682|    lencois paulista|            SP|
|eabebad39a88bb6f5...|295c05e81917928d7...|       5704|           sao paulo|    

DataFrame[id_cliente: string, id_unico_cliente: string, cep_cliente: int, cidade_cliente: string, estado_cliente: string]

# Otimização de Join (Broadcast)

In [27]:
print(itens_pedido.count()) # Exibe a contagem de registros dos dois DataFrames para determinar qual é o menor
print(pedidos.count())

small_pedidos_df = broadcast(pedidos) # Envolve o DataFrame pedidos com a função broadcast(). Isso sinaliza ao Spark para enviar (broadcast) todo o DataFrame menor para todos os nós do cluster antes de realizar o join

broadcast_data = itens_pedido.join(small_pedidos_df, on='id_pedido') # Realiza o join do DataFrame maior (itens_pedido) com o DataFrame menor transmitido (small_pedidos_df)
broadcast_data.show() # Exibe o resultado do join otimizado.

112650
99441
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----------+--------------------+-------------+-------------------+---------------------+-------------------------+--------------------+---------------------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|  data_limite_envio| preco|valor_frete|          id_cliente|status_pedido| data_compra_pedido|data_aprovacao_pedido|data_envio_transportadora|data_entrega_cliente|data_estimada_entrega|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----------+--------------------+-------------+-------------------+---------------------+-------------------------+--------------------+---------------------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|      13.29|3ce436f183e68e078...|    delivered|2017-09-13 08:59:02|  2017-09-13 09:45:

# Comparação de Estratégias de Filtragem

In [28]:
salario_media_valor = pagamentos_pedido.agg(avg('valor_pagamento').alias('pagamento_medio')).collect()[0][0] # Calcula a média e usa .collect() para forçar a execução da computação e transferir o valor escalar (média) para a memória do Python (driver)
pedidos_filtrados_df = pagamentos_pedido.filter(pagamentos_pedido['valor_pagamento'] > salario_media_valor) # Filtra o DataFrame usando o valor Python coletado. Desvantagem: Força uma execução e transferência de dados entre o cluster e o driver


# Realiza um join não equitativo (non-equi join) onde a condição é pagamento > media
# O Spark otimiza isso internamente como uma subconsulta
# Vantagem: Permanece totalmente no cluster do Spark
salario_medio_df = pagamentos_pedido.select(avg('valor_pagamento').alias('pagamento_medio')) # Calcula a média e a armazena em um DataFrame de uma linha
pedidos_filtrados_join_df = pagamentos_pedido.join(  # Realiza um join não equitativo (non-equi join) na condição de que o valor do pagamento é maior que o valor médio no DataFrame de uma linha. Mantém a lógica totalmente distribuída
    salario_medio_df,
    pagamentos_pedido['valor_pagamento'] > salario_medio_df['pagamento_medio']
).select(pagamentos_pedido["*"])

salario_medio_df.show() # Exibe o DataFrame de valor médio
pedidos_filtrados_join_df.show() # Exibe o resultado da filtragem usando a técnica de join

+------------------+
|   pagamento_medio|
+------------------+
|154.10038041698573|
+------------------+

+--------------------+-------------------+--------------+------------------+---------------+
|           id_pedido|sequencia_pagamento|tipo_pagamento|parcelas_pagamento|valor_pagamento|
+--------------------+-------------------+--------------+------------------+---------------+
|1f78449c87a54faf9...|                  1|   credit_card|                 6|         341.09|
|d88e0d5fa41661ce0...|                  1|   credit_card|                 8|         188.73|
|12e5cfe0e4716b59a...|                  1|   credit_card|                10|         157.45|
|8ac09207f415d55ac...|                  1|   credit_card|                 4|         244.15|
|4214cda550ece8ee6...|                  1|   credit_card|                 2|         170.57|
|4d680edbaa7d3d9be...|                  1|   credit_card|                10|         353.09|
|8cd68144cdb62dc0d...|                  1|        boleto|

In [29]:
salario_medio_valor = pagamentos_pedido.agg(avg('valor_pagamento').alias('pagamento_medio')).collect()[0][0] # Calcula a média de valor_pagamento e usa .collect()[0][0] para forçar o cálculo imediato e transferir o valor escalar resultante para a memória do driver Python
pedidos_filtrados_df = pagamentos_pedido.filter(pagamentos_pedido['valor_pagamento'] > salario_medio_valor) # Filtra o DataFrame usando o valor salario_medio_valor que agora é uma variável nativa do Python. Esta abordagem exige uma pausa no processamento distribuído para a coleta de dados

print("plano de execução - collect") # Imprime o nome da estratégia para identificação no console
pedidos_filtrados_df.explain() # Exibe o Plano de Execução Lógico e Físico do Spark para a Estratégia 1. O plano pode mostrar uma leitura simples seguida por uma filtragem, pois o valor de corte já é conhecido e fixo

salario_medio_df = pagamentos_pedido.select(avg('valor_pagamento').alias('pagamento_medio')) # Calcula a média, mas armazena o resultado em um novo DataFrame de uma linha, mantendo a lógica no cluster do Spark
pedidos_filtrados_join_df = pagamentos_pedido.join( # Inicia a operação de join
    salario_medio_df, # Define o DataFrame de uma linha com a média como o lado a ser unido
    pagamentos_pedido['valor_pagamento'] > salario_medio_df["pagamento_medio"] # A condição de união é definida como valor_pagamento do DataFrame original sendo maior que a pagamento_medio do DataFrame da média
).select(pagamentos_pedido["*"]) # Seleciona todas as colunas do DataFrame original após a filtragem implícita pelo join.

print("plano de execução - join") # Imprime o nome da estratégia para identificação no console
pedidos_filtrados_join_df.explain() # Exibe o Plano de Execução do Spark para a Estratégia 2

plano de execução - collect
== Physical Plan ==
*(1) Filter (isnotnull(valor_pagamento#1255) AND (valor_pagamento#1255 > 154.10038041698573))
+- FileScan csv [id_pedido#1251,sequencia_pagamento#1252,tipo_pagamento#1253,parcelas_pagamento#1254,valor_pagamento#1255] Batched: false, DataFilters: [isnotnull(valor_pagamento#1255), (valor_pagamento#1255 > 154.10038041698573)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Material de apoio - M27/pagamentos_pedido...., PartitionFilters: [], PushedFilters: [IsNotNull(valor_pagamento), GreaterThan(valor_pagamento,154.10038041698573)], ReadSchema: struct<id_pedido:string,sequencia_pagamento:int,tipo_pagamento:string,parcelas_pagamento:int,valo...


plano de execução - join
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id_pedido#1251, sequencia_pagamento#1252, tipo_pagamento#1253, parcelas_pagamento#1254, valor_pagamento#1255]
   +- BroadcastNestedLoopJoin BuildRight, Inner, (valor_pagamento#1

# Medição e Comparação de Performance (Collect vs. Join)

In [30]:
start_time = time.time() # Marca o ponto de partida do tempo para a Estratégia 1

salario_medio_valor = pagamentos_pedido.agg(avg('valor_pagamento').alias('pagamento_medio')).collect()[0][0] # Calcula a média. O uso de .collect() força a execução do cálculo e a transferência do valor resultante para a memória do Python (driver)
pedidos_filtrados_df = pagamentos_pedido.filter(pagamentos_pedido['valor_pagamento'] > salario_medio_valor) #  Filtra o DataFrame usando o valor coletado

end_time = time.time() # Marca o tempo final da Estratégia 1
print(end_time - start_time) # Imprime o tempo total de execução gasto pela Estratégia 1 (cálculo + coleta + filtragem).

start_time = time.time() # Marca o ponto de partida do tempo para a Estratégia 2
salario_medio_df = pagamentos_pedido.select(avg("valor_pagamento").alias("pagamento-medio")) # Calcula a média e a armazena em um DataFrame de uma linha, mantendo os dados no cluster do Spark.
pedidos_filtrados_join_df = pagamentos_pedido.join( # Inicia a operação de join
    salario_medio_df, # Define o DataFrame de uma linha com a média como o lado a ser unido
    pagamentos_pedido["valor_pagamento"] > salario_medio_df["pagamento-medio"] # Realiza um join não equitativo, comparando o valor do pagamento com o valor da média diretamente no ambiente distribuído
).select(pagamentos_pedido["*"]) # Seleciona as colunas do DataFrame original que passaram na condição de filtro

end_time = time.time() # Marca o tempo final da Estratégia 2
print(end_time - start_time) # Imprime o tempo total de execução gasto pela Estratégia 2 (que é processada de ponta a ponta pelo otimizador do Spark)

0.2366342544555664
0.026787996292114258


# Comparação de Performance e Otimização (UDF vs. Nativo)

In [31]:
start_time = time.time() # Marca o ponto de partida do tempo para a Estratégia 1 (UDF)

# Define a lógica de classificação de preço em "baixo", "médio" ou "alto". Esta função será executada fora do motor otimizado do Spark
def classificar_preco(preco):
  if preco < 50: # Primeira condição de preço
    return 'baixo' # Retorno para a primeira condição
  elif preco >= 50 and preco < 500: # Segunda condição de preço (intervalo entre 50 e 500)
    return 'médio' # Retorno para a segunda condição
  else: # Condição final (preço maior ou igual a 500)
    return 'alto'  # Retorno final
classificar_preco_udf = udf(classificar_preco, StringType()) # Registra a função Python como uma UDF do Spark

pedidos_classificacao_udf_df = itens_pedido.withColumn('classificacao_preco', classificar_preco_udf(itens_pedido['preco'])) # Cria a coluna classificacao_preco aplicando a UDF sobre a coluna preco
end_time = time.time() # Marca o tempo final da Estratégia 1
print(end_time - start_time) # Imprime o tempo total de execução gasto pela UDF. Este valor será geralmente maior, refletindo a sobrecarga da execução do código Python
pedidos_classificacao_udf_df.explain() # Exibe o Plano de Execução do Spark.

start_time = time.time() # Marca o ponto de partida do tempo para a Estratégia 2 (Nativa)
pedidos_classificacao_df = itens_pedido.withColumn( 'classificacao_preco', # Cria a coluna classificacao_preco...
                                                   when(itens_pedido['preco'] < 50, 'baixo') # ..usando a função nativa when para a primeira condição
                                                   .when((itens_pedido['preco'] >= 50) & (itens_pedido['preco'] < 500), 'médio') # ..usando outra when para a segunda condição
                                                   .otherwise('alto')) # ..e a função nativa otherwise para o caso final

end_time = time.time() # Marca o tempo final da Estratégia 2
print(end_time - start_time) # Imprime o tempo total de execução gasto pelas funções nativas
pedidos_classificacao_df.explain() # Exibe o Plano de Execução do Spark. O plano não mostrará o "PythonUDF" e exibirá a lógica como uma expressão SQL otimizada

0.020650148391723633
== Physical Plan ==
*(1) Project [id_pedido#1220, item_id_pedido#1221, id_produto#1222, id_vendedor#1223, data_limite_envio#1224, preco#1225, valor_frete#1226, pythonUDF0#2185 AS classificacao_preco#2176]
+- BatchEvalPython [classificar_preco(preco#1225)#2175], [pythonUDF0#2185]
   +- FileScan csv [id_pedido#1220,item_id_pedido#1221,id_produto#1222,id_vendedor#1223,data_limite_envio#1224,preco#1225,valor_frete#1226] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Material de apoio - M27/itens_pedido.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id_pedido:string,item_id_pedido:int,id_produto:string,id_vendedor:string,data_limite_envio...


0.013969898223876953
== Physical Plan ==
*(1) Project [id_pedido#1220, item_id_pedido#1221, id_produto#1222, id_vendedor#1223, data_limite_envio#1224, preco#1225, valor_frete#1226, CASE WHEN (preco#1225 < 50.0) THEN baixo WHEN ((preco#1225 >= 50.0) 