In [1]:
from pyspark.sql import SparkSession, dataframe
from pyspark.sql.functions import when, col, sum, count, isnan, round
from pyspark.sql.functions import regexp_replace, concat_ws, sha2, rtrim, substring
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql import HiveContext
from pyspark.sql.functions import year, month, dayofmonth, quarter
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import trim, regexp_replace, when, col
from pyspark.sql.functions import regexp_replace

import os
import re

from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import when

spark = SparkSession.builder.master("local[*]")\
    .enableHiveSupport()\
    .getOrCreate()

In [2]:
# Carregar tabelas endereco e remover a primeira linha do cabeçalho

df_endereco = spark.read.table("desafio_curso.endereco")
rdd = df_endereco.rdd.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])
df_endereco = rdd.toDF(df_endereco.schema)

df_clientes = spark.read.table("desafio_curso.clientes")
rdd = df_clientes.rdd.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])
df_clientes = rdd.toDF(df_clientes.schema)

df_divisao = spark.read.table("desafio_curso.divisao")
rdd = df_divisao.rdd.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])
df_divisao = rdd.toDF(df_divisao.schema)

df_regiao = spark.read.table("desafio_curso.regiao")
rdd = df_regiao.rdd.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])
df_regiao = rdd.toDF(df_regiao.schema)

df_vendas = spark.read.table("desafio_curso.vendas")
rdd = df_vendas.rdd.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])
df_vendas = rdd.toDF(df_vendas.schema)

In [3]:
# 1 - soma de sales_amount
print("Total de valor de vendas:")
df_vendas = df_vendas.withColumn("sales_amount", regexp_replace("sales_amount", ",", "."))
resultado = df_vendas.agg({"sales_amount": "sum"}).withColumnRenamed("sum(sales_amount)", "total_vendas")
resultado_decimal = resultado.select(resultado["total_vendas"].cast(DecimalType(18, 2)).alias("total_vendas_decimal"))
resultado_decimal.show()

Total de valor de vendas:
+--------------------+
|total_vendas_decimal|
+--------------------+
|        186186769.05|
+--------------------+



In [4]:
# 2 - soma de sales_quantity
print("Total produtos vendidos:")
total_vendas = df_vendas.agg(sum('sales_quantity').alias('total_vendas'))
total_vendas.show()

Total produtos vendidos:
+------------+
|total_vendas|
+------------+
|   2943194.0|
+------------+



In [5]:
# 3 - produto mais vendido
print("Produto mais vendido:")
agrupado_por_item = df_vendas.groupBy('item').agg(sum('sales_quantity').alias('quantidade_total'))
produto_mais_vendido = agrupado_por_item.orderBy('quantidade_total', ascending=False).limit(1)
produto_mais_vendido.show()

Produto mais vendido:
+--------------------+----------------+
|                item|quantidade_total|
+--------------------+----------------+
|Better Large Cann...|        590343.0|
+--------------------+----------------+



In [6]:
# 4 - 5 produtos mais sales_quantity
print("5 Produtos mais vendidos:")
resultado = df_vendas.groupBy('item').agg(sum('sales_quantity').alias('quantidade_total')) \
            .orderBy('quantidade_total', ascending=False) \
            .limit(5)
resultado.show()

5 Produtos mais vendidos:
+--------------------+----------------+
|                item|quantidade_total|
+--------------------+----------------+
|Better Large Cann...|        590343.0|
|High Top Dried Mu...|        377259.0|
|Better Canned Tun...|        266996.0|
|   Walrus Chardonnay|        212022.0|
|Red Spade Pimento...|        163296.0|
+--------------------+----------------+



In [7]:
# 5 - 5 produtos mais sales_amount
print("5 Produtos com maior valor de venda:")
resultado = df_vendas.groupBy('item').agg(sum('sales_amount').alias('valor_total_vendas')) \
            .orderBy('valor_total_vendas', ascending=False) \
            .limit(5) \
            .withColumn('valor_total_vendas', col('valor_total_vendas').cast(DecimalType(18, 2)))

resultado.show()

5 Produtos com maior valor de venda:
+--------------------+------------------+
|                item|valor_total_vendas|
+--------------------+------------------+
|Better Large Cann...|       15454172.47|
|High Top Dried Mu...|       13368414.53|
|Red Spade Pimento...|        5711486.45|
|Better Canned Tun...|        5693075.12|
|        Ebony Squash|        5380727.75|
+--------------------+------------------+



In [8]:
# 6 - sales_quantity por mes e  sales_amount por mes
print("Valor de venda e quantidade de vendas por mês:")

from pyspark.sql.functions import col, sum, to_date, date_format

df_vendas = df_vendas.withColumn("month", date_format(to_date(col("invoice_date"), "dd/MM/yyyy"), "MM"))
vendas_por_mes = df_vendas.groupBy("month")\
                    .agg(sum("sales_quantity").alias("quantidade_total"), sum("sales_amount").alias("valor_total_vendas"))\
                    .orderBy('month')\
                    .withColumn('valor_total_vendas', col('valor_total_vendas').cast(DecimalType(18, 2)))

vendas_por_mes.show()

Valor de venda e quantidade de vendas por mês:
+-----+----------------+------------------+
|month|quantidade_total|valor_total_vendas|
+-----+----------------+------------------+
| null|            null|              null|
|   01|        325560.0|       19471739.54|
|   02|        327750.0|       20497349.91|
|   03|        328798.0|       21714172.68|
|   04|        193063.0|       12112134.49|
|   05|        202787.0|       11053298.15|
|   06|        264470.0|       15852396.38|
|   07|        196932.0|       13287585.39|
|   08|        242825.0|       14590611.40|
|   09|        241863.0|       16466268.87|
|   10|        184381.0|       12829983.51|
|   11|        221260.0|       13794762.06|
|   12|        213505.0|       14516466.67|
+-----+----------------+------------------+



In [9]:
# 7 - valor de venda por country - mostrar porcentagem
print("Valor de venda em moeda e em porcentagem por país:")

from pyspark.sql.functions import col, sum

df_vendas_pais = df_vendas.join(df_endereco, df_vendas.customer_key == df_endereco.address_number)\
                         .groupBy("country")\
                         .agg(sum("sales_amount").alias("valor_total_vendas"))\
                         .withColumn('valor_total_vendas', col('valor_total_vendas').cast(DecimalType(18, 2)))

total_vendas = df_vendas.agg(sum("sales_amount")).collect()[0][0]
df_vendas_pais = df_vendas_pais.withColumn("porcentagem", col("valor_total_vendas") / total_vendas * 100)\
                    .orderBy('porcentagem', ascending=False)\
                    .withColumn('porcentagem', col('porcentagem').cast(DecimalType(18, 2)))

df_vendas_pais.show()


Valor de venda em moeda e em porcentagem por país:
+-------+------------------+-----------+
|country|valor_total_vendas|porcentagem|
+-------+------------------+-----------+
|     US|      100312451.37|      53.88|
|     UK|       15555235.25|       8.35|
|     AU|       10962238.21|       5.89|
|     CA|        6320257.36|       3.39|
|     IR|        2109229.10|       1.13|
+-------+------------------+-----------+



In [10]:
# 8 - sales_amount por ano
print("Valor de venda por ano:")

df_vendas = df_vendas.withColumn("year", date_format(to_date(col("invoice_date"), "dd/MM/yyyy"), "yyyy"))
vendas_por_ano = df_vendas.groupBy("year")\
                    .agg(sum("sales_amount").alias("valor_total_vendas"))\
                    .orderBy('year')\
                    .withColumn('valor_total_vendas', col('valor_total_vendas').cast(DecimalType(18, 2)))

vendas_por_ano.show()


Valor de venda por ano:
+----+------------------+
|year|valor_total_vendas|
+----+------------------+
|null|              null|
|2017|       77906591.65|
|2018|       87462706.40|
|2019|       20817471.00|
+----+------------------+



In [11]:
# # 9 - tabela com customer, sales_amount, country. Ordenar melhores clientes pelo valor de sales_amount 10 melhores
# # e mostrar o total de sales_amount dos 10 melhores
# from pyspark.sql.functions import sum, col


from pyspark.sql.functions import sum

df_joined = df_clientes.join(df_vendas, "customer_key")
df_joined = df_joined.join(df_endereco, df_joined.customer_key == df_endereco.address_number,"left")
df_resultado = df_joined.groupBy("customer", 'country').agg(sum("sales_amount").alias("valor_total_vendas"))
df_top10 = df_resultado.orderBy("valor_total_vendas", ascending=False).withColumn('valor_total_vendas', col('valor_total_vendas').cast(DecimalType(18, 2))).limit(10)
df_top10.show()



+------------------+-------+------------------+
|          customer|country|valor_total_vendas|
+------------------+-------+------------------+
| Paracel Gigaplace|     US|       11397206.36|
|           Pereras|     US|       10843991.23|
|         Talarians|   null|        9254771.72|
|PageWave Megastore|   null|        8707904.14|
|   Target Gigstore|     AU|        5433005.93|
|Userland Maxistore|   null|        5202201.60|
|  Tandy Superstore|     US|        3275015.91|
|          Vanstars|     US|        3251414.29|
|   Acer Superstore|   null|        3122752.50|
|    Kerite Company|   null|        3113493.93|
+------------------+-------+------------------+



----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 52956)
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/anaconda3/lib/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/opt/anaconda3/lib/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/anaconda3/lib/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/opt/spark/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/opt/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/opt/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/opt/spark/python/pyspark/seri