# Simulando Dados 


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col, sum, avg

#OBS: as 3 linhas de codigo estão comentadas pois a versão free do databricks não me permite ler arquivos do storage/DBFS
#vendas_df = spark.read.csv("dbfs:/FileStore/tables/vendas.csv", header=True, inferSchema=True)
#clientes_df = spark.read.csv("dbfs:/FileStore/tables/clientes.csv", header=True, inferSchema=True)
#produtos_df = spark.read.csv("dbfs:/FileStore/tables/produtos.csv", header=True, inferSchema=True)

#criei 3 dataframes ficticios para realização da prova pratica

#vendas.csv
vendas_data = [
    (1, "2024-06-01", 101, 200.0, 2),
    (2, "2024-06-02", 102, None, 1),
    (3, "2024-06-03", 103, 300.0, 0),
    (4, "2024-06-04", None, 150.0, 1),
    (5, "2024-06-05", 104, -50.0, 1),
    (6, "2024-06-06", 105, 500.0, 2),
]
vendas_schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Data", StringType(), True),
    StructField("ProdutoID", IntegerType(), True),
    StructField("Valor", DoubleType(), True),
    StructField("Quantidade", IntegerType(), True),
])
df_vendas = spark.createDataFrame(vendas_data, vendas_schema)

#clientes.csv
clientes_data = [
    (1, "Ana", 30, "São Paulo"),
    (2, "Bruno", 40, "Rio de Janeiro"),
    (None, "Carlos", 25, "Belo Horizonte"), 
    (4, "Daniela", 28, "Curitiba"),
]
clientes_schema = StructType([
    StructField("ClienteID", IntegerType(), True),
    StructField("Nome", StringType(), True),
    StructField("Idade", IntegerType(), True),
    StructField("Cidade", StringType(), True),
])
df_clientes = spark.createDataFrame(clientes_data, clientes_schema)

#produtos.csv
produtos_data = [
    (101, "Camiseta", "Vestuário"),
    (102, "Notebook", "Eletrônicos"),
    (103, "Cafeteira", "Eletrodomésticos"),
    (104, "Fone de Ouvido", "Eletrônicos"),
    (105, "Tênis", "Vestuário"),
]
produtos_schema = StructType([
    StructField("ProdutoID", IntegerType(), True),
    StructField("NomeProduto", StringType(), True),
    StructField("Categoria", StringType(), True),
])
df_produtos = spark.createDataFrame(produtos_data, produtos_schema)

#Visualização dos dados

In [0]:
display(df_clientes)
display(df_produtos)
display(df_vendas)

#Verificando inconsistencia de dados

In [0]:
#Verificando se há valores nulos e dados inconsistentes em vendas

df_vendas.select(
    col("ProdutoID").isNull().cast("int").alias("ProdutoID_nulo"),
    col("Valor").isNull().cast("int").alias("Valor_nulo"),
    col("Quantidade").isNull().cast("int").alias("Quantidade_nulo")
).groupBy().sum().show()

df_vendas.filter((col("Valor") <= 0) | (col("Quantidade") <= 0)).display()

In [0]:
#verificando se tem clientes com dados nulos
df_clientes.select(
    col("ClienteID").isNull().cast("int").alias("ClienteID_nulo"),
    col("Nome").isNull().cast("int").alias("Nome_nulo"),
    col("Idade").isNull().cast("int").alias("Idade_nulo"),
    col("Cidade").isNull().cast("int").alias("Cidade_nulo")
).groupBy().sum().display()

#Limpeza dos dados

In [0]:
#Removendo linhas que contem valores nulos
df_vendas = df_vendas.dropna(subset=["ProdutoID", "Valor", "Quantidade"]).filter((col("Valor") > 0) & (col("Quantidade") > 0))
df_clientes = df_clientes.dropna(subset=["ClienteID"]).filter(col("Idade") > 0)

In [0]:
#visualização final depois das linhas excluidas
display(df_vendas)
display(df_clientes)

#Adicionando coluna TotalVenda em Vendas

In [0]:
df_vendas = df_vendas.withColumn("TotalVenda", col("Valor") * col("Quantidade"))
display(df_vendas)

#junção de Dados - venda e clientes

In [0]:
df_vendas_clientes = df_vendas.join(df_clientes, df_vendas["ID"] == df_clientes["ClienteID"], how ="inner")
display(df_vendas_clientes)

#Junção de dados - Produtos

In [0]:
df_produtos_vendas_clientes = df_vendas_clientes.join(df_produtos, on="ProdutoID", how="inner")
display(df_produtos_vendas_clientes)

#Análise Simples - Total de vendas por produto

In [0]:
df_total_vendas_produto = df_produtos_vendas_clientes.groupBy("NomeProduto").agg(sum("TotalVenda").alias("TotalVendas"))
display(df_total_vendas_produto)

#Análise de Idade dos Clientes por Produto


In [0]:
df_media_idade_produto = df_produtos_vendas_clientes.groupBy("NomeProduto").agg(avg("Idade").alias("MediaIdadeClientes"))
display(df_media_idade_produto)

#Salvar os Dados Processados


In [0]:
df_media_idade_produto.write.mode("overwrite").parquet("dbfs:/FileStore/tables/vendas_clientes_produtos/")


#Consulta ao Conjunto de Dados Processados


In [0]:
df_vendas_clientes_produtos = spark.read.parquet("dbfs:/FileStore/tables/vendas_clientes_produtos/")
display(df_vendas_clientes_produtos)