In [None]:
%%sh
sudo pip install spark
sudo pip install pyspark

# IMPORTS 

In [None]:


#Pandas, podemos transformar um dataframe do pandas em um dataframe do spark e o contrário também
import pandas as pd

#Importando o spark e o pyspark
import spark,pyspark

#Importando as bibliotecas do pyspark.sql 
from pyspark.sql import *

#Importando as funções sql do spark
#documentação https://spark.apache.org/docs/latest/api/sql/index.html
from pyspark.sql import functions as f

#Importando os tipos de dados do spark
#documentação https://spark.apache.org/docs/latest/sql-ref-datatypes.html
from pyspark.sql import types as t 

#Biblioteca datetime
from datetime import datetime, date,timedelta

# EXERCÍCIOS

## Criar um spark context com o app name = “AC4 DataEng”

In [None]:
#Criando uma Sessão do Spark (Spark Session)
spark = SparkSession.builder.master("local").appName("AC4 DataEng").getOrCreate()
#spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

## Criar um dataframe lendo o arquivo vendas.parquet

In [None]:
# Criação do Dataframe Vendas, através do Spark
#df_vendas = spark.read.format("parquet").option("delimiter",";").option("header","true").load("/content/vendas.parquet")
df_vendas = spark.read.format("parquet").option("header","true").load("/content/vendas.parquet")

In [None]:
df_vendas.show()

+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+
|      cod_ean|cod_pessoa|cod_transacao|      data|vlr|qtd|      nome| sms|email|classe|sexo|          cidade|         des_produto|des_familia|   des_secao|des_categoria|des_sub_categoria|__index_level_0__|
+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+
|7896901200013|      4644|      8064284|2020-10-26| 67|  3|NOME_a4644|true|false|  Ouro|   F|       São Paulo|                null|       null|        null|         null|             null|                0|
|7897001010014|      3578|      8067745|2020-10-26| 54|  7|NOME_o3578|true|false|  Ouro|   M|  Belo Horizonte|                null|       null|        null|         null|  

## Converter a coluna data para o tipo DateType

In [None]:
#Convertendo uma coluna
df_vendas = df_vendas.withColumn("data",df_vendas.data.cast(t.DateType()))

## Criar as colunas: 
### A - ano = ano da coluna “data”.
### B - mes= mes da coluna “data”.
### C - dia= dia da coluna “data”.


In [None]:
df_vendas = df_vendas.withColumn("Ano",f.year("data"))
df_vendas = df_vendas.withColumn("Mes",f.month("data"))
df_vendas = df_vendas.withColumn("Dia",f.dayofmonth("data"))

In [None]:
df_vendas.show()

+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+----+---+---+
|      cod_ean|cod_pessoa|cod_transacao|      data|vlr|qtd|      nome| sms|email|classe|sexo|          cidade|         des_produto|des_familia|   des_secao|des_categoria|des_sub_categoria|__index_level_0__| Ano|Mes|Dia|
+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+----+---+---+
|7896901200013|      4644|      8064284|2020-10-26| 67|  3|NOME_a4644|true|false|  Ouro|   F|       São Paulo|                null|       null|        null|         null|             null|                0|2020| 10| 26|
|7897001010014|      3578|      8067745|2020-10-26| 54|  7|NOME_o3578|true|false|  Ouro|   M|  Belo Horizonte|          

## Na coluna sms, se caso o valor for null, troque por “False” 

In [None]:
# Trocando o valor 'Nulo' para 'False' e mantendo os outros valores

df_vendas = df_vendas.withColumn("sms",f.when(df_vendas.sms.isNull(),False).otherwise(df_vendas.sms))

## Crie uma coluna chamada “total” que deve ser a multiplicação das colunas “vlr” e “qtd”.


In [None]:
# Nova coluna "total", multiplicação da quantidade e valor
df_vendas = df_vendas.withColumn("total",df_vendas.qtd * df_vendas.vlr)

## Grave o dataframe particionado por ano, mes e dia no formato “parquet” de nome “vendas_tratadas”

In [None]:
# dataframe particionado por ano, mes e dia
df_vendas.write.partitionBy("Ano","Mes","Dia").parquet("/content/vendas_tratadas")

## Crie uma tabela temporaria chamada “vendas_table”

In [None]:
# Criando a tabela temporária "vendas_table"
df_vendas.registerTempTable("vendas_table")

In [None]:
# Visualizando dados da tabela temporária
spark.sql("select * from vendas_table").show()

## Crie um dataframe spark.sql utilizando a query spark.sql. Grave esse dataframe no formato “parquet” de nome  “cidade”.


In [None]:
# Gravando dataframe "cidade"
df_cidade = spark.sql("select distinct cidade from vendas_table")
df_cidade.write.parquet("/content/cidade")

## Criar um dataframe com a soma do vlr, soma da qtd agrupado pela data. Gravar esse dataframe no formato “parquet” de nome “vendas_total”

In [None]:
# Agrupamento de data somando o campo "vlr" e "qtd" e salvando arquivo parquet
vendas_total = df_vendas.groupBy("data").sum("vlr","qtd")
vendas_total = vendas_total.withColumnRenamed("sum(vlr)","soma_vlr")
vendas_total = vendas_total.withColumnRenamed("sum(qtd)","soma_qtd")
vendas_total.write.parquet("/content/vendas_total")

## Crie um novo dataframe chamado “produto” selecionando as colunas: “des_produto”,”des_familia”, “des_secao”, “des_categoria”,”des_sub_categoria”. Apenas valores unicos (distinct()). Grave esse dataframe como parquet “produto”


In [None]:
# Distinct das colunas e gravar arquivos parquet
df_produto = spark.sql("select distinct des_produto,des_familia,des_secao,des_categoria,des_sub_categoria from vendas_table")
df_produto.write.parquet("/content/produto")

##  Crie um dataframe chamado “venda_mulheres”, selecionando todas as colunas e filtrando apenas vendas realizadas para o sexo “F”, grave esse dataframe no formato json “venda_mulher”. 


In [None]:
# Vendas realizadas para mulheres
vendas_mulheres = df_vendas.where(df_vendas.sexo=="F")
vendas_mulheres.write.format("json").save("venda_mulher")

## Crie um dataframe chamado “venda_unicas”, selecionando todas as colunas e filtrando apenas vendas com a qtd igual 1 e o vlr menor que 10. Grave esse dataframe no formato csv“venda_unica”.

In [None]:
# Vendas realizadas para mulheres
vendas_unicas = df_vendas.where((df_vendas.qtd==1) & (df_vendas.vlr<10))
vendas_unicas.write.format("csv").save("venda_unica")