In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, year, month, dayofweek, datediff, max as spark_max
from pyspark.sql.types import DateType
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import seaborn as sns

In [0]:
# Initialize the SparkSession.
# NOTE(review): the builder call below is commented out, so every later cell relies on a
# `spark` object that already exists in the runtime — presumably injected by a managed
# notebook environment; confirm before running this notebook anywhere else.
# spark = SparkSession.builder.appName("RFMAnalysis").getOrCreate()

In [0]:
# Load the ticket-sales table into a Spark DataFrame.
source_table = "bus.bronze.ota_bus_ticket_sales"
df_spark = spark.table(source_table)

In [0]:
# Convert the purchase-date column to Spark's date type (the column name is kept).
df_spark = df_spark.withColumn(
    "date_purchase",
    to_date("date_purchase"),
)

In [0]:
# Derive calendar features (year, month, day of week) from the purchase date.
df_spark = (
    df_spark
    .withColumn("year", year("date_purchase"))
    .withColumn("month", month("date_purchase"))
    .withColumn("day_of_week", dayofweek("date_purchase"))
)

In [0]:
# Show a 10-row sample of the data as a quick sanity check.
print("Amostra")
sample_rows = df_spark.limit(10)
display(sample_rows)

In [0]:
# Exploratory analysis: print the schema, then show summary statistics.
print("Informações gerais:")
df_spark.printSchema()

print("\nEstatísticas descritivas:")
summary_stats = df_spark.describe()
display(summary_stats)

In [0]:
# Distribution of purchase values, collected to pandas so seaborn/matplotlib can plot it.
# NOTE(review): this pulls every `gmv_success` value to the driver — consider sampling
# or pre-aggregating in Spark if the table is large.
df_pandas = df_spark.select("gmv_success").toPandas()

# Explicit figure/axes handles instead of the implicit pyplot "current figure".
fig, ax = plt.subplots(figsize=(10, 5))
sns.histplot(df_pandas["gmv_success"], ax=ax)
ax.set_xlabel("Valor das compras R$")
ax.set_ylabel("Frequência de compras")
ax.set_title("Distribuição dos valores de compras (GMV)")
display(fig)
# Close the figure once shown: plt.clf() only clears its contents and leaves it
# registered with pyplot, so figures would pile up in memory across cells and re-runs.
plt.close(fig)

In [0]:
# Ten most frequent values of `place_destination_departure`, ranked in Spark so
# only ten rows are collected to the driver.
top_destinos = (
    df_spark.groupBy("place_destination_departure")
    .count()
    .orderBy(col("count").desc())
    .limit(10)
)
top_destinos_pandas = top_destinos.toPandas()

# Explicit figure/axes handles instead of the implicit pyplot "current figure".
fig, ax = plt.subplots(figsize=(10, 5))
sns.barplot(
    x=top_destinos_pandas["count"],
    y=top_destinos_pandas["place_destination_departure"],
    ax=ax,
)
ax.set_title("Top 10 destinos mais procurados")
display(fig)
# Close the figure once shown: plt.clf() only clears its contents and leaves it
# registered with pyplot, so figures would pile up in memory across cells and re-runs.
plt.close(fig)

In [0]:
# Per-customer statistics: one output row per `fk_contact`.
from pyspark.sql.functions import count, max as spark_max, sum as spark_sum

clientes = df_spark.groupBy("fk_contact").agg(
    # Monetary value is the SUM over all of the customer's purchases; a max()
    # here would only capture the single largest purchase, not the total spent.
    spark_sum("gmv_success").alias("total_gasto"),
    # count() of a column counts its non-null values within the group.
    count("nk_ota_localizer_id").alias("qtd_compras"),
    # Most recent purchase date; used downstream to derive recency.
    spark_max("date_purchase").alias("ultima_compra"),
)
display(clientes)



In [0]:
from pyspark.sql.functions import datediff, lit, max as spark_max

# Reference date for recency: the most recent purchase across all customers.
# It is computed in this cell so the notebook runs on a fresh kernel (the name
# `max_date` is not defined by any earlier cell).
# NOTE(review): assumes recency should be measured against the latest purchase in
# the dataset rather than against today's date — confirm with the analysis owner.
max_date = clientes.agg(spark_max("ultima_compra")).first()[0]

# Recency = days between the reference date and each customer's last purchase.
clientes = clientes.withColumn(
    "recencia",
    datediff(lit(max_date), col("ultima_compra")),
)
display(clientes)

In [0]:
# Collect the three RFM features to a pandas DataFrame so KMeans can be applied.
rfm_columns = ["recencia", "qtd_compras", "total_gasto"]
rfm = clientes.select(*rfm_columns).toPandas()

In [0]:
# Simple min-max normalization of the RFM features to the [0, 1] range.
# Only the three feature columns are selected so the cell stays re-runnable after
# later cells add `cluster` / `fk_contact` columns to `rfm`.
rfm_features = rfm[["recencia", "qtd_compras", "total_gasto"]].astype(float)
feature_min = rfm_features.min()
# A constant column has max == min; dividing by that zero range would yield NaN
# (0/0) and make KMeans fail, so use 1 instead — the column then normalizes to 0.
feature_range = (rfm_features.max() - feature_min).replace(0, 1)
rfm_norm = (rfm_features - feature_min) / feature_range

In [0]:
# Cluster customers into 4 groups on the normalized RFM features.
# n_init is set explicitly because scikit-learn changed its default (10 -> "auto"
# in 1.4); leaving it implicit makes the resulting clusters depend on the
# installed version and emits a FutureWarning on 1.2/1.3.
kmeans = KMeans(n_clusters=4, n_init=10, random_state=42)
# fit_predict returns one label per row of rfm_norm, assigned positionally to rfm.
rfm["cluster"] = kmeans.fit_predict(rfm_norm)

In [0]:
# Visualize the clusters on the frequency (x) vs. spend (y) plane.
# Explicit figure/axes handles instead of the implicit pyplot "current figure".
fig, ax = plt.subplots(figsize=(10, 5))
sns.scatterplot(
    data=rfm,
    x="qtd_compras",
    y="total_gasto",
    hue="cluster",
    palette="tab10",
    ax=ax,
)
ax.set_title("Clusterização de Clientes (Frequência x Gasto)")
display(fig)
# Close the figure once shown: plt.clf() only clears its contents and leaves it
# registered with pyplot, so figures would pile up in memory across cells and re-runs.
plt.close(fig)

In [0]:
# Cluster summary: mean RFM values and number of customers per cluster.
# The customer ids are attached as a plain 1-D array so the assignment is purely
# positional: assigning the one-column DataFrame directly aligns on the index and
# would silently fill NaN on any mismatch, whereas an array of the wrong length raises.
# NOTE(review): this is a second Spark action and assumes `clientes` yields rows in
# the same order as when `rfm` was collected — Spark does not guarantee row order
# after a groupBy. Only the per-cluster count is used below, but collecting
# `fk_contact` together with the RFM columns would be the safer long-term fix.
fk_contact_ids = clientes.select("fk_contact").toPandas()["fk_contact"].to_numpy()
rfm["fk_contact"] = fk_contact_ids
print("\nResumo dos clusters:")
rfm_grouped = rfm.groupby("cluster").agg({
    "qtd_compras": "mean",
    "total_gasto": "mean",
    "recencia": "mean",
    # count() counts non-null ids, i.e. customers per cluster.
    "fk_contact": "count"
})
display(rfm_grouped)