In [0]:
# Importação das bibliotecas necessárias no código 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# 1 - Inicialize uma Spark Session
spark = SparkSession.builder \
    .appName("credit_card_transactions") \
    .getOrCreate()


In [0]:
# 2 - Ler a tabela que o arquivo foi salvo

df = spark.read.table("hive_metastore.default.vendas_oficina_csv")

In [0]:
# 3 - Checar o schema do dataframe

df.printSchema()

In [0]:
# 4 - Contar quantos registros tem o dataframe

total_vendas = df.count()
print(f"Total de vendas: {total_vendas}")


In [0]:
# 5 - Filtrar vendas por forma de pagamento PIX

df_pix = df.filter(df.forma_pagamento == "PIX")
display(df_pix)

In [0]:
# 6 - Calcular a receita total da oficina

## posso selecionar e somar, retornando um dataframe

receita_total = df.withColumn('valor_servico', col('valor_servico').cast(DecimalType(18,2)))

receita_total = receita_total.select(sum("valor_servico"))

display(receita_total)

## se eu coletar, me retorna uma valor decimal
receita_total = df.agg({"valor_servico": "sum"}).collect()[0][0]
print(f"Receita total da oficina: R$ {receita_total:.2f}")

# Observação: cuidado com o collect(). o uso do collect() com um grande volume de dados pode estourar a memória do driver

In [0]:
# 7 - Agrupar vendas por serviço e contar quantas vendas tem cada um

df_vendas_servico = df.groupBy("servico").count()

display(df_vendas_servico)

In [0]:
# 8 - Calcular a receita por cidade 

df_receita_cidade = df.withColumn('valor_servico', col('valor_servico').cast(DecimalType(18,2)))

df_receita_cidade = df_receita_cidade.groupBy("cidade").sum("valor_servico")

display(df_receita_cidade)

In [0]:
# 9 - Encontrar a maior venda 

df_maior_venda = df.withColumn('valor_servico', col('valor_servico').cast(DecimalType(18,2)))

df_maior_venda = df_maior_venda.select(max("valor_servico"))

display(df_maior_venda)

In [0]:
# 10 - Filtrar os registros apenas desse mês

df = df.withColumn('data_venda', col('data_venda').cast(DateType()))

df = df.filter(month(col('data_venda')) == 12)

display(df)

In [0]:
# 11 - Salvar o dataframe em uma tabela em parquet com somente os registros do mês de dezembro (feito no exercicio 10)

df.write.format("parquet").saveAsTable("vendas_oficina_dezembro")