In [1]:
! pip install pyspark



In [3]:
import pyspark
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.storagelevel import StorageLevel

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
produtos = spark.read.csv('drive/MyDrive/Colab Notebooks/colab_ebac/spark_data/produtos.csv', header=True, inferSchema=True)
vendedores = spark.read.csv('drive/MyDrive/Colab Notebooks/colab_ebac/spark_data/vendedores.csv', header=True, inferSchema=True)
clientes = spark.read.csv('drive/MyDrive/Colab Notebooks/colab_ebac/spark_data/clientes.csv', header=True, inferSchema=True)
itens_pedido = spark.read.csv('drive/MyDrive/Colab Notebooks/colab_ebac/spark_data/itens_pedido.csv', header=True, inferSchema=True)
pagamentos_pedido = spark.read.csv('drive/MyDrive/Colab Notebooks/colab_ebac/spark_data/pagamentos_pedido.csv', header=True, inferSchema=True)
avaliacoes_pedido = spark.read.csv('drive/MyDrive/Colab Notebooks/colab_ebac/spark_data/avaliacoes_pedido.csv', header=True, inferSchema=True)
pedidos = spark.read.csv('drive/MyDrive/Colab Notebooks/colab_ebac/spark_data/pedidos.csv', header=True, inferSchema=True)

In [6]:
clientes.show()
pedidos.show()
itens_pedido.show()
pagamentos_pedido.show()

+--------------------+--------------------+-----------+--------------------+--------------+
|          id_cliente|    id_unico_cliente|cep_cliente|      cidade_cliente|estado_cliente|
+--------------------+--------------------+-----------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|      14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|       9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|       1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|       8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|      13056|            campinas|            SP|
|879864dab9bc30475...|4c93744516667ad3b...|      89254|      jaragua do sul|            SC|
|fd826e7cf63160e53...|addec96d2e059c80c...|       4534|           sao paulo|            SP|
|5e274e7a0c3809e14...|57b2a98a409812fe9...|      35182|             timoteo|    

In [7]:
clientes.createOrReplaceTempView('clientes')
spark.sql('CACHE TABLE clientes')
spark.sql('SELECT COUNT(*) FROM clientes').show()
spark.sql('UNCACHE TABLE clientes')

+--------+
|count(1)|
+--------+
|   99441|
+--------+



DataFrame[]

In [8]:
clientes.cache() # = persist(StorageLevel.MEMORY_AND_DISK)

clientes_estado_sp_df = clientes.filter(clientes.estado_cliente == 'SP')
clientes_estado_sp_df.show()

clientes.unpersist()

+--------------------+--------------------+-----------+--------------------+--------------+
|          id_cliente|    id_unico_cliente|cep_cliente|      cidade_cliente|estado_cliente|
+--------------------+--------------------+-----------+--------------------+--------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|      14409|              franca|            SP|
|18955e83d337fd6b2...|290c77bc529b7ac93...|       9790|sao bernardo do c...|            SP|
|4e7b3e00288586ebd...|060e732b5b29e8181...|       1151|           sao paulo|            SP|
|b2b6027bc5c5109e5...|259dac757896d24d7...|       8775|     mogi das cruzes|            SP|
|4f2d8ab171c80ec83...|345ecd01c38d18a90...|      13056|            campinas|            SP|
|fd826e7cf63160e53...|addec96d2e059c80c...|       4534|           sao paulo|            SP|
|b2d1536598b73a9ab...|918dc87cd72cd9f6e...|      18682|    lencois paulista|            SP|
|eabebad39a88bb6f5...|295c05e81917928d7...|       5704|           sao paulo|    

DataFrame[id_cliente: string, id_unico_cliente: string, cep_cliente: int, cidade_cliente: string, estado_cliente: string]

In [9]:
'''
Utilizar o broadcast somente quando se deseja compartilhar uma variável pequena
e imutável (como um DataFrame ou dicionário pequeno) com todos os nós de trabalho (workers)
de forma eficiente. Isso evita que o Spark envie essa variável repetidamente a cada tarefa,
economizando largura de banda e tempo de execução.
'''

print(itens_pedido.count())
print(pedidos.count())

small_pedidos_df = broadcast(pedidos)

broadcast_data = itens_pedido.join(small_pedidos_df, on='id_pedido')
broadcast_data.show()

112650
99441
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----------+--------------------+-------------+-------------------+---------------------+-------------------------+--------------------+---------------------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|  data_limite_envio| preco|valor_frete|          id_cliente|status_pedido| data_compra_pedido|data_aprovacao_pedido|data_envio_transportadora|data_entrega_cliente|data_estimada_entrega|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----------+--------------------+-------------+-------------------+---------------------+-------------------------+--------------------+---------------------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|      13.29|3ce436f183e68e078...|    delivered|2017-09-13 08:59:02|  2017-09-13 09:45:

In [10]:
pagamento_medio_valor = pagamentos_pedido.agg(avg('valor_pagamento').alias('pagamento_medio')).collect()[0][0]
pedidos_filtrados_df = pagamentos_pedido.filter(pagamentos_pedido.valor_pagamento > pagamento_medio_valor)

pagamento_medio_df = pagamentos_pedido.select(avg('valor_pagamento').alias('pagamento_medio'))
pedidos_filtrados_join_df = pagamentos_pedido.join(
    pagamento_medio_df,
    pagamentos_pedido.valor_pagamento > pagamento_medio_df.pagamento_medio
).select(pagamentos_pedido['*'])

pagamento_medio_df.show()
pedidos_filtrados_join_df.show()

+------------------+
|   pagamento_medio|
+------------------+
|154.10038041698573|
+------------------+

+--------------------+-------------------+--------------+------------------+---------------+
|           id_pedido|sequencia_pagamento|tipo_pagamento|parcelas_pagamento|valor_pagamento|
+--------------------+-------------------+--------------+------------------+---------------+
|1f78449c87a54faf9...|                  1|   credit_card|                 6|         341.09|
|d88e0d5fa41661ce0...|                  1|   credit_card|                 8|         188.73|
|12e5cfe0e4716b59a...|                  1|   credit_card|                10|         157.45|
|8ac09207f415d55ac...|                  1|   credit_card|                 4|         244.15|
|4214cda550ece8ee6...|                  1|   credit_card|                 2|         170.57|
|4d680edbaa7d3d9be...|                  1|   credit_card|                10|         353.09|
|8cd68144cdb62dc0d...|                  1|        boleto|

In [12]:
# utilizando o Explain para entender as diferenças nas etapas de execução entre os dois métodos:

pagamento_medio_valor = pagamentos_pedido.agg(avg('valor_pagamento').alias('pagamento_medio')).collect()[0][0]
pedidos_filtrados_df = pagamentos_pedido.filter(pagamentos_pedido.valor_pagamento > pagamento_medio_valor)

print('Plano de Execução - Collect')
pedidos_filtrados_df.explain()

pagamento_medio_df = pagamentos_pedido.select(avg('valor_pagamento').alias('pagamento_medio'))
pedidos_filtrados_join_df = pagamentos_pedido.join(
    pagamento_medio_df,
    pagamentos_pedido.valor_pagamento > pagamento_medio_df.pagamento_medio
).select(pagamentos_pedido['*'])

print('Plano de Execução - Join')
pedidos_filtrados_join_df.explain()

Plano de Execução - Collect
== Physical Plan ==
*(1) Filter (isnotnull(valor_pagamento#139) AND (valor_pagamento#139 > 154.10038041698573))
+- FileScan csv [id_pedido#135,sequencia_pagamento#136,tipo_pagamento#137,parcelas_pagamento#138,valor_pagamento#139] Batched: false, DataFilters: [isnotnull(valor_pagamento#139), (valor_pagamento#139 > 154.10038041698573)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/Colab Notebooks/colab_ebac/spark_data/paga..., PartitionFilters: [], PushedFilters: [IsNotNull(valor_pagamento), GreaterThan(valor_pagamento,154.10038041698573)], ReadSchema: struct<id_pedido:string,sequencia_pagamento:int,tipo_pagamento:string,parcelas_pagamento:int,valo...


Plano de Execução - Join
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id_pedido#135, sequencia_pagamento#136, tipo_pagamento#137, parcelas_pagamento#138, valor_pagamento#139]
   +- BroadcastNestedLoopJoin BuildRight, Inner, (valor_pagamento#139 > pagamento

In [16]:
# Comparando a performance utilizando a função time:

start_time = time.time()

pagamento_medio_valor = pagamentos_pedido.agg(avg('valor_pagamento').alias('pagamento_medio')).collect()[0][0]
pedidos_filtrados_df = pagamentos_pedido.filter(pagamentos_pedido.valor_pagamento > pagamento_medio_valor)

end_time = time.time()
print(f'Collect: {end_time - start_time}')

start_time = time.time()

pagamento_medio_df = pagamentos_pedido.select(avg('valor_pagamento').alias('pagamento_medio'))
pedidos_filtrados_join_df = pagamentos_pedido.join(
    pagamento_medio_df,
    pagamentos_pedido.valor_pagamento > pagamento_medio_df.pagamento_medio
).select(pagamentos_pedido['*'])

end_time = time.time()
print(f'Join: {end_time - start_time}')

Collect: 0.37997913360595703
Join: 0.03263139724731445


In [17]:
spark.stop()