In [1]:
#import de bibliotecas

import pyspark as spark
import pandas as pd
import numpy as np

In [2]:
#Inicia sessão Spark

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Vendas e-commerce").getOrCreate()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import col, min, max, unix_timestamp, count, mean, stddev, floor 
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
spark

In [None]:
#Lê arquivo e mostra resultado para visualizar tabela

df_spark = spark.read.csv(r'C:\Users\Paulo\Videos\Downloads\Projeto_Spark\2019-Nov.csv')
print('\n')
df_spark.show()



+--------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|                 _c0|       _c1|       _c2|                _c3|                 _c4|     _c5|   _c6|      _c7|                 _c8|
+--------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|
|2019-11-01 00:00:...|      view|   1003461|2053013555631882655|electronics.smart...|  xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-11-01 00:00:...|      view|   5000088|2053013566100866035|appliances.sewing...|  janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-11-01 00:00:...|      view|  17302664|2053013553853497655|                NULL|   creed| 28.31|561587266|755422e7-9040-477...|
|2019-11-01 00:00:...|      view|   3601530|2053013563810775923|app

In [4]:
#Preparação dos dados com ajuste do nome das colunas para melhor identificação

df_spark_v1 = df_spark.withColumnRenamed("_c0", "event_time") \
    .withColumnRenamed("_c1", "event_type") \
    .withColumnRenamed("_c2", "product_id") \
    .withColumnRenamed("_c3", "category_id") \
    .withColumnRenamed("_c4", "category_code") \
    .withColumnRenamed("_c5", "brand") \
    .withColumnRenamed("_c6", "price") \
    .withColumnRenamed("_c7", "user_id") \
    .withColumnRenamed("_c8", "user_session") 


#Tira primeira linha, pois o cabeçalho dentro da tabela atrapalha a análise
df_spark_v1 = df_spark_v1.filter(df_spark_v1["event_time"] != 'event_time')

#Ajusta os tipos das colunas para o tipo correspondente
from pyspark.sql.types import DateType, IntegerType

df_spark_v1 = df_spark_v1.withColumn('event_time', col('event_time').cast(DateType())) \
              .withColumn('product_id', col('product_id').cast(IntegerType())) \
              .withColumn('price', col('price').cast('float')) \
              .withColumn('user_id', col('user_id').cast(IntegerType())) \
              .withColumn('category_id', col('category_id').cast("long")) 


In [5]:
#Verificação dos tipos

df_spark_v1.printSchema()

root
 |-- event_time: date (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [6]:
#Retirada de nulos e duplicadas

df_spark_v1.dropna()
df_spark_v1.distinct()

DataFrame[event_time: date, event_type: string, product_id: int, category_id: bigint, category_code: string, brand: string, price: float, user_id: int, user_session: string]

In [8]:
#Media e desvio do preço dos produtos

df_analise = df_spark_v1.filter(col("event_type") == 'purchase').select('price').agg(
    min('price').alias('minimo'),
    max('price').alias('maximo'),
    count('price').alias('validos'),
    mean("price").alias("preco_medio"),
    stddev("price").alias("desvio_padrao")
)

df_analise.show()

media = df_analise["preco_medio"]
desvio = df_analise["desvio_padrao"]


+------+-------+-------+-----------------+-----------------+
|minimo| maximo|validos|      preco_medio|    desvio_padrao|
+------+-------+-------+-----------------+-----------------+
|  0.77|2574.07| 916939|300.1234439205989|341.3809936156113|
+------+-------+-------+-----------------+-----------------+



In [9]:
#Valores por categoria (Gemini)
#Calcula a quantidade de produtos vendidos por faixa de preço e a soma dos valores dessa mesma faixa

from pyspark.sql.functions import col, floor, count, sum

tamanho_faixa = 50

df_distribuicao = (
    df_spark_v1
    .filter("event_type == 'purchase'")
    .withColumn("faixa_preco", floor(col("price") / tamanho_faixa) * tamanho_faixa)
    .groupBy("category_code", "faixa_preco")
    .agg(
        count("*").alias("qtd_vendas"),      
        sum("price").alias("valor_total")   
    )
    .orderBy("category_code", "faixa_preco")
)

df_distribuicao.show()


+-------------+-----------+----------+------------------+
|category_code|faixa_preco|qtd_vendas|       valor_total|
+-------------+-----------+----------+------------------+
|         NULL|          0|    100046|3173516.7554556727|
|         NULL|         50|     52929|3748341.7774505615|
|         NULL|        100|     22321|2707156.0482177734|
|         NULL|        150|     16254| 2795173.765930176|
|         NULL|        200|      9246| 2069731.153060913|
|         NULL|        250|      7040|1911858.9961547852|
|         NULL|        300|      6368|2066822.5298156738|
|         NULL|        350|      5793|2200938.1008605957|
|         NULL|        400|      3821|1630068.4167480469|
|         NULL|        450|      2395|1137137.9858703613|
|         NULL|        500|      1897| 981738.4693908691|
|         NULL|        550|      1186| 675335.3883056641|
|         NULL|        600|      1048|  655533.397644043|
|         NULL|        650|       667|452836.33807373047|
|         NULL

In [10]:
#Cálculo de assimetria e curtose (Gemini)

from pyspark.sql.functions import skewness, kurtosis

df_produtos_precos = df_spark_v1.select("product_id", "price").distinct()

df_produtos_precos.select(
    skewness("price").alias("assimetria"),
    kurtosis("price").alias("curtose")
 ).show()


+-----------------+------------------+
|       assimetria|           curtose|
+-----------------+------------------+
|3.284038327571408|13.436492501016133|
+-----------------+------------------+



In [11]:
#Outlier de produtos e sessões (Gemini)

# Pede o percentil 1% (0.01) e o 99% (0.99)
# O último parâmetro (0.01) é a precisão do erro aceitável (padrão do Spark)
quantiles = df_spark_v1.approxQuantile("price", [0.05, 0.95], 0.01)

limite_inferior = quantiles[0] # O valor que delimita os 1% mais baratos
limite_superior = quantiles[1] # O valor que delimita os 1% mais caros

print(f"Abaixo de: R$ {limite_inferior} (Barato demais/Erro)")
print(f"Acima de: R$ {limite_superior} (Caro demais)")

# Filtra quem está FORA desse miolo saudável
df_outliers_percentil = df_spark_v1.filter(
    (col("price") < limite_inferior) | 
    (col("price") > limite_superior)
)

df_outliers_percentil.show()

Abaixo de: R$ 18.479999542236328 (Barato demais/Erro)
Acima de: R$ 979.4299926757812 (Caro demais)
+----------+----------+----------+-------------------+--------------------+----------------+-------+---------+--------------------+
|event_time|event_type|product_id|        category_id|       category_code|           brand|  price|  user_id|        user_session|
+----------+----------+----------+-------------------+--------------------+----------------+-------+---------+--------------------+
|2019-11-01|      view|  24900193|2053013562183385881|                NULL|            NULL|   1.09|512651494|f603c815-f51a-46f...|
|2019-11-01|      view|  27400066|2053013563391345499|                NULL|            NULL|   8.55|551061950|3f6112f1-5695-4e8...|
|2019-11-01|      view|  26022534|2053013562837697343|                NULL|            NULL|   7.07|566280860|341a87d8-8cf4-4b4...|
|2019-11-01|      view|  26019863|2053013562837697343|                NULL|            NULL|  11.79|566280860

In [12]:
#Popularidade de produto

df_prod_pop = df_spark_v1.groupBy("product_id", "category_code").agg(
    count("event_type").alias("numero de interações")
)

df_prod_pop.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- category_code: string (nullable = true)
 |-- numero de interações: long (nullable = false)



In [20]:
#Cálculo de taxa de conversão

# 1. Prepara a tabela pivotada (Transforma linhas em colunas) (Gemini)
df_metricas = df_spark_v1.groupBy("product_id") \
    .pivot("event_type") \
    .count() \
    .fillna(0) # Zeros no lugar de Nulos são OBRIGATÓRIOS pra matemática funcionar

# df_metricas.show()

# 2. Faz a divisão (Compra / Visita)
df_conversao = df_metricas.withColumn(
    "taxa_conversao", 
    col("purchase") / col("view")
)

#df_conversao.orderBy(col("taxa_conversao").desc()).show()

df_conversao.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- cart: long (nullable = true)
 |-- purchase: long (nullable = true)
 |-- view: long (nullable = true)
 |-- taxa_conversao: double (nullable = true)



In [14]:
#Métricas sem outliers

df_comportamento_real = df_spark_v1.filter(
    (col("event_type") == 'purchase') &  # Tem que ser compra
    (col("price") >= limite_inferior) &  # E tem que ser maior que o piso
    (col("price") <= limite_superior)    # E tem que ser menor que o teto
).select('price').agg(
    min('price').alias('minimo'),
    max('price').alias('maximo'),
    count('price').alias('validos'),
    mean("price").alias("preco_medio"),
    stddev("price").alias("desvio_padrao")
)

df_comportamento_real.show()

+------+------+-------+-----------------+------------------+
|minimo|maximo|validos|      preco_medio|     desvio_padrao|
+------+------+-------+-----------------+------------------+
| 18.48|979.43| 836169|249.7782162880237|232.61505677829325|
+------+------+-------+-----------------+------------------+



In [15]:
#Configuração para possibilitar criação dos arquivos com os dados trabalhados (Gemini)
import os
import sys

# Aponte para onde você salvou a pasta (tem que ter a subpasta /bin dentro com o winutils.exe)
os.environ['HADOOP_HOME'] = "C:\\hadoop"
sys.path.append("C:\\hadoop\\bin")

In [16]:
caminho_saida = r"C:\Users\Paulo\Videos\Downloads\diretorio_csv"

#df_spark_v1.coalesce(1).write.csv(caminho_saida, header=True, mode='overwrite')

In [None]:
# Definindo a base
base = r"C:\Users\Paulo\Videos\Downloads\diretorio_csv"

# Salvando cada um em sua respectiva pasta
df_analise.coalesce(1).write.csv(base + "\\analise_preco", header=True, mode='overwrite')
df_distribuicao.coalesce(1).write.csv(base + "\\distribuicao", header=True, mode='overwrite')
df_prod_pop.coalesce(1).write.csv(base + "\\prod_pop", header=True, mode='overwrite')
df_conversao.coalesce(1).write.csv(base + "\\conversao", header=True, mode='overwrite')
df_comportamento_real.coalesce(1).write.csv(base + "\\comportamento", header=True, mode='overwrite')
df_distribuicao.coalesce(1).write.csv(base + "\\distribuicao", header=True, mode='overwrite')