In [0]:
%sql

SELECT * FROM sc_gold.historico_de_servicos

In [0]:
%sql
SELECT * FROM sc_gold.viaturas

In [0]:
%sql
DROP TABLE IF EXISTS sc_gold.historico_de_servicos_2;

CREATE TABLE sc_gold.historico_de_servicos_2 AS
SELECT numero_do_servico_pos_venda,data_de_fecho,data_de_abertura,canal_de_venda,data_servico_pos_venda,descricao_servico_pos_venda,viatura,ordem_reparacao,tipo_de_servico,origem_registo,pedido_do_cliente
FROM sc_gold.historico_de_servicos;


In [0]:
%sql
SELECT * FROM sc_gold.historico_de_servicos_2

In [0]:
%sql

DROP TABLE IF EXISTS sc_gold.viaturas_2;

CREATE TABLE sc_gold.viaturas_2 AS
SELECT id,designacao_comercial,modelo,motorizacao,versao,data_de_matricula,cilindrada__cm3_,potencia_maxima__kw_,combustivel,gwms_engine,production_date
FROM sc_gold.viaturas;

In [0]:
%sql
SELECT * FROM sc_gold.viaturas_2

In [0]:
from pyspark.sql import functions as F

table_name = "sc_gold.viaturas_2"

# Load the table
df = spark.table(table_name)

# Get total rows
total_rows = df.count()

# Calculate percentage of nulls for each column
null_percentages = (
    df.select([
        (F.count(F.when(F.col(c).isNull(), c)) / total_rows * 100)
        .alias(c)
        for c in df.columns
    ])
)

display(null_percentages)


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType, DecimalType
from pyspark.sql.functions import split

In [0]:
#passo usado para remover linhas (excluir linhas vazias na coluna modelo -486 linhas)
table_name = "sc_gold.viaturas_2"
df0 = spark.table(table_name)


# Aplicar o filtro (excluir linhas vazias na coluna modelo -486 linhas)
df = df0.filter(
    (F.col("modelo").isNotNull())
    & (F.trim(F.col("modelo")) != "")
    & (F.lower(F.trim(F.col("modelo"))) != "null")
)


# 3) Gravar de volta sobrescrevendo a tabela original
df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(table_name)


In [0]:
#descobrir o numero de dias em media de diferen√ßa entre a data de produ√ß√£o e a data da matricula

# Carregar a tabela
df = spark.table("sc_gold.viaturas_2")

# Converter para DATE (com v√°rios formatos tolerados, se necess√°rio)
df = df.withColumn(
    "data_de_matricula_dt",
    F.coalesce(
        F.expr("try_to_date(data_de_matricula, 'dd-MM-yyyy')"),
        F.expr("try_to_date(data_de_matricula, 'yyyy-MM-dd')"),
        F.expr("try_to_date(data_de_matricula, 'dd/MM/yyyy')")
    )
).withColumn(
    "production_date_dt",
    F.coalesce(
        F.expr("try_to_date(production_date, 'dd-MM-yyyy')"),
        F.expr("try_to_date(production_date, 'yyyy-MM-dd')"),
        F.expr("try_to_date(production_date, 'dd/MM/yyyy')")
    )
)

# Remover linhas com valores nulos em qualquer das datas
df_valid = df.filter(F.col("data_de_matricula_dt").isNotNull() & F.col("production_date_dt").isNotNull())

# Calcular diferen√ßa em dias
df_valid = df_valid.withColumn("diff_days", 
                               F.datediff(F.col("data_de_matricula_dt"), F.col("production_date_dt")))

# Somat√≥rio e m√©dia
agg = df_valid.agg(
    F.sum("diff_days").alias("soma_dias"),
    F.count("diff_days").alias("n_linhas"),
    F.avg("diff_days").alias("media_dias")
).collect()[0]

print("üîπ Soma total dos dias:", agg["soma_dias"])
print("üîπ N√∫mero de linhas usadas:", agg["n_linhas"])
print("üîπ M√©dia de dias:", round(agg["media_dias"], 2))


In [0]:
#substituir data de produ√ß√£o e a data da matricula +/-131 com base no calculo anterior

df = spark.table("sc_gold.viaturas_2")

# Tenta converter em v√°rios formatos comuns
df = df.withColumn(
    "data_de_matricula",
    F.coalesce(
        F.expr("try_to_date(data_de_matricula, 'dd-MM-yyyy')"),
        F.expr("try_to_date(data_de_matricula, 'yyyy-MM-dd')"),
        F.expr("try_to_date(data_de_matricula, 'dd/MM/yyyy')"),
        F.expr("try_to_date(data_de_matricula, 'yyyy/MM/dd')")
    )
)

df = df.withColumn(
    "production_date",
    F.coalesce(
        F.expr("try_to_date(production_date, 'dd-MM-yyyy')"),
        F.expr("try_to_date(production_date, 'yyyy-MM-dd')"),
        F.expr("try_to_date(production_date, 'dd/MM/yyyy')"),
        F.expr("try_to_date(production_date, 'yyyy/MM/dd')")
    )
)

# Aplicar as regras dos ¬±131 dias
df = df.withColumn(
    "production_date",
    F.when(F.col("production_date").isNull() & F.col("data_de_matricula").isNotNull(),
           F.date_sub(F.col("data_de_matricula"), 131))
     .otherwise(F.col("production_date"))
).withColumn(
    "data_de_matricula",
    F.when(F.col("data_de_matricula").isNull() & F.col("production_date").isNotNull(),
           F.date_add(F.col("production_date"), 131))
     .otherwise(F.col("data_de_matricula"))
)

# Reescrever a tabela (permitindo schema overwrite se necess√°rio)
(df.write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))


In [0]:
#criar nova coluna que √© o ano de produ√ß√£o do carro
# Carregar tabela
df = spark.table("sc_gold.viaturas_2")

# Converter production_date para DATE (caso ainda seja string)
df = df.withColumn(
    "production_date_dt",
    F.coalesce(
        F.expr("try_to_date(production_date, 'dd-MM-yyyy')"),
        F.expr("try_to_date(production_date, 'yyyy-MM-dd')"),
     F.expr("try_to_date(production_date, 'dd/MM/yyyy')")
    )
)

# Extrair o ano de produ√ß√£o
df = df.withColumn("production_year", F.year("production_date_dt"))

# Calcular idade em anos at√© a data de hoje
df = df.withColumn(
    "age_year",
    F.floor(F.datediff(F.current_date(), F.col("production_date_dt")) / 365.25)
)

# Regravar a tabela com as novas colunas
(df.write
   .format("delta")
  .mode("overwrite")
  .option("overwriteSchema", "true")  # garante que aceita as novas colunas
   .saveAsTable("sc_gold.viaturas_2"))

In [0]:
# calcular a m√©dia de cilindrada (cilindrada__cm3_) por (gwms_engine + modelo + motoriza√ß√£o)
# 1) Ler a tabela
df = spark.table("sc_gold.viaturas_2")

# 2) Janela por atributos do grupo
w = W.partitionBy("gwms_engine", "motorizacao", "modelo")

# 3) m√©dia global (fallback) ‚Äî j√° arredondada para inteiro
global_avg = df.select(F.avg("cilindrada__cm3_").alias("g")).first()["g"]
if global_avg is not None:
    global_avg = int(round(global_avg))

# 4) Preencher nulos com a m√©dia do grupo arredondada (ou fallback global)
df_filled = (
    df
    .withColumn("avg_grupo", F.avg("cilindrada__cm3_").over(w))
    .withColumn(
        "cilindrada__cm3_",
        F.when(
            F.col("cilindrada__cm3_").isNull(),
            F.coalesce(F.round(F.col("avg_grupo")).cast(IntegerType()), F.lit(global_avg))
        ).otherwise(F.col("cilindrada__cm3_").cast(IntegerType()))
    )
    .drop("avg_grupo")
)


# 5) Escrever o RESULTADO correto
(df_filled.write
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))

In [0]:
# calcular a m√©dia de potencia (potencia_maxima__kw_) por (gwms_engine + modelo + motorizacao)
# 1) Ler a tabela
df = spark.table("sc_gold.viaturas_2")

# 2) Limpeza e cast:
#    - troca v√≠rgula decimal por ponto
#    - remove qualquer caractere n√£o num√©rico (p.ex. ' kW', espa√ßos, etc.)
pot_clean = F.regexp_replace(F.col("potencia_maxima__kw_"), ",", ".")
pot_clean = F.regexp_replace(pot_clean, r"[^0-9.]", "")
df = df.withColumn("potencia_maxima__kw_", pot_clean.cast(DoubleType()))

# 3) M√©dia global (fallback), arredondada a 1 casa
global_avg = df.select(F.avg("potencia_maxima__kw_").alias("g")).first()["g"]
global_avg_1d = round(global_avg, 1) if global_avg is not None else None

# 4) M√©dia por grupo
keys = ["gwms_engine", "motorizacao", "modelo"]
avg_by_group = (
    df.groupBy(*keys)
      .agg(F.avg("potencia_maxima__kw_").alias("avg_grp"))
)

# 5) Preencher apenas nulos com a m√©dia do grupo (1 casa decimal);
#    se o grupo for todo nulo, usa m√©dia global
df_filled = (
    df.join(avg_by_group, on=keys, how="left")
      .withColumn(
          "potencia_maxima__kw_",
          F.when(
              F.col("potencia_maxima__kw_").isNull(),
              F.coalesce(F.round(F.col("avg_grp"), 1), F.lit(global_avg_1d))
          ).otherwise(F.col("potencia_maxima__kw_"))
      )
      .drop("avg_grp")
)

# (Opcional) Se quiser NORMALIZAR toda a coluna para 1 casa decimal, inclusive n√£o nulos:
# df_filled = df_filled.withColumn("potencia_maxima__kw_", F.round(F.col("potencia_maxima__kw_"), 1))

# (Opcional) Fixar o tipo para Decimal(10,1) no schema (em vez de double):
# df_filled = df_filled.withColumn("potencia_maxima__kw_", F.col("potencia_maxima__kw_").cast(DecimalType(10,1)))

# 6) Escrever resultado
(df_filled.write
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))

In [0]:
# Preencher 'motorizacao' com a MODA por grupo (gwms_engine, modelo, potencia_maxima__kw_, combustivel)
# 1) Ler a tabela
df = spark.table("sc_gold.viaturas_2")

# 2) Pot√™ncia: v√≠rgula -> ponto, remover ru√≠do e cast para double (porque √© chave do grupo)
pot = F.regexp_replace(F.col("potencia_maxima__kw_"), ",", ".")
pot = F.regexp_replace(pot, r"[^0-9.]", "")
df = df.withColumn("potencia_maxima__kw_", pot.cast(DoubleType()))

# 3) Higienizar texto relevante
for col in ["motorizacao", "combustivel", "gwms_engine", "modelo"]:
    df = df.withColumn(col, F.trim(F.col(col)))
df = df.withColumn("motorizacao", F.when(F.col("motorizacao") == "", None).otherwise(F.col("motorizacao")))
df = df.withColumn("combustivel", F.when(F.col("combustivel") == "", None).otherwise(F.col("combustivel")))
df = df.withColumn("gwms_engine", F.when(F.col("gwms_engine") == "", None).otherwise(F.col("gwms_engine")))

# 4) Chaves do grupo
keys_mot = ["gwms_engine", "modelo", "potencia_maxima__kw_", "combustivel"]

# 5) Moda de 'motorizacao' por grupo (desempate alfab√©tico)
counts_mot = (
    df.filter(F.col("motorizacao").isNotNull())
      .groupBy(*keys_mot, "motorizacao")
      .agg(F.count(F.lit(1)).alias("cnt"))
)
w_mot = W.partitionBy(*keys_mot).orderBy(F.col("cnt").desc(), F.col("motorizacao").asc())
mode_motorizacao = (
    counts_mot.withColumn("rn", F.row_number().over(w_mot))
              .filter(F.col("rn") == 1)
              .select(*keys_mot, F.col("motorizacao").alias("mode_motorizacao"))
)

# 6) Moda global de 'motorizacao' (fallback opcional)
row_mot = (
    df.filter(F.col("motorizacao").isNotNull())
      .groupBy("motorizacao").agg(F.count(F.lit(1)).alias("cnt"))
      .orderBy(F.col("cnt").desc(), F.col("motorizacao").asc())
      .limit(1).first()
)
global_mode_mot = row_mot["motorizacao"] if row_mot else None

# 7) Preencher APENAS nulos de 'motorizacao' com a moda do grupo (ou global)
df_filled = (
    df.join(mode_motorizacao, on=keys_mot, how="left")
      .withColumn(
          "motorizacao",
          F.when(F.col("motorizacao").isNull(),
                 F.coalesce(F.col("mode_motorizacao"), F.lit(global_mode_mot)))
           .otherwise(F.col("motorizacao"))
      )
      .drop("mode_motorizacao")
)

# 8) Escrever resultado
(df_filled.write
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))


In [0]:
# Preencher 'gwms_engine' pela MODA por grupo (cilindrada__cm3_, potencia_maxima__kw_, modelo, motorizacao)
# 1) Ler a tabela
df = spark.table("sc_gold.viaturas_2")

# 2) Higienizar num√©ricos (v√≠rgula -> ponto; remover ru√≠do) e fazer cast
cil_clean = F.regexp_replace(F.col("cilindrada__cm3_"), ",", ".")
cil_clean = F.regexp_replace(cil_clean, r"[^0-9.]", "")
df = df.withColumn("cilindrada__cm3_", cil_clean.cast(DoubleType()))

pot_clean = F.regexp_replace(F.col("potencia_maxima__kw_"), ",", ".")
pot_clean = F.regexp_replace(pot_clean, r"[^0-9.]", "")
df = df.withColumn("potencia_maxima__kw_", pot_clean.cast(DoubleType()))

# 3) Higienizar texto: trim e strings vazias -> NULL
df = df.withColumn("gwms_engine", F.when(F.trim(F.col("gwms_engine")) == "", None)
                                  .otherwise(F.trim(F.col("gwms_engine"))))
df = df.withColumn("modelo", F.trim(F.col("modelo")))
# "motorizacao" sem acento ‚Äî confirme o nome exato na tabela
df = df.withColumn("motorizacao", F.when(F.trim(F.col("motorizacao")) == "", None)
                                   .otherwise(F.trim(F.col("motorizacao"))))

# 4) Chaves do grupo
keys = ["cilindrada__cm3_", "potencia_maxima__kw_", "modelo", "motorizacao"]

# 5) Calcular a MODA de gwms_engine por grupo
counts = (
    df.filter(F.col("gwms_engine").isNotNull())
      .groupBy(*keys, "gwms_engine")
      .agg(F.count(F.lit(1)).alias("cnt"))
)

w = W.partitionBy(*keys).orderBy(F.col("cnt").desc(), F.col("gwms_engine").asc())

mode_by_group = (
    counts.withColumn("rn", F.row_number().over(w))
          .filter(F.col("rn") == 1)
          .select(*keys, F.col("gwms_engine").alias("mode_gwms_engine"))
)

# 6) (Opcional) Moda GLOBAL como fallback se o grupo n√£o tiver ocorr√™ncias v√°lidas
row_global = (
    df.filter(F.col("gwms_engine").isNotNull())
      .groupBy("gwms_engine").agg(F.count(F.lit(1)).alias("cnt"))
      .orderBy(F.col("cnt").desc(), F.col("gwms_engine").asc())
      .limit(1).first()
)
global_mode = row_global["gwms_engine"] if row_global else None

# 7) Preencher APENAS nulos com a moda do grupo (ou moda global se necess√°rio)
df_filled = (
    df.join(mode_by_group, on=keys, how="left")
      .withColumn(
          "gwms_engine",
          F.when(
              F.col("gwms_engine").isNull(),
              F.coalesce(F.col("mode_gwms_engine"), F.lit(global_mode))
          ).otherwise(F.col("gwms_engine"))
      )
      .drop("mode_gwms_engine")
)

# 8) Escrever resultado
(df_filled.write
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))

In [0]:
# MODA de combustivel por (gwms_engine, cilindrada__cm3_, modelo, motorizacao)
# 1) Ler a tabela
df = spark.table("sc_gold.viaturas_2")

# 2) Higienizar colunas num√©ricas (garantir double)
cil = F.regexp_replace(F.col("cilindrada__cm3_"), ",", ".")
cil = F.regexp_replace(cil, r"[^0-9.]", "")
df = df.withColumn("cilindrada__cm3_", cil.cast(DoubleType()))

# 3) Higienizar colunas de texto
df = df.withColumn("gwms_engine", F.when(F.length(F.trim("gwms_engine")) == 0, None)
                                   .otherwise(F.trim(F.col("gwms_engine")).cast("string")))
df = df.withColumn("modelo", F.trim(F.col("modelo")).cast("string"))
df = df.withColumn("motorizacao", F.when(F.length(F.trim("motorizacao")) == 0, None)
                                   .otherwise(F.trim(F.col("motorizacao")).cast("string")))
df = df.withColumn("combustivel", F.when(F.length(F.trim("combustivel")) == 0, None)
                                   .otherwise(F.trim(F.col("combustivel")).cast("string")))

# 4) Chave do grupo
keys = ["gwms_engine", "cilindrada__cm3_", "modelo", "motorizacao"]

# 5) Moda de combustivel por grupo
counts = (
    df.filter(F.col("combustivel").isNotNull())
      .groupBy(*keys, "combustivel")
      .agg(F.count(F.lit(1)).alias("cnt"))
)

w = W.partitionBy(*keys).orderBy(F.col("cnt").desc(), F.col("combustivel").asc())

mode_by_group = (
    counts.withColumn("rn", F.row_number().over(w))
          .filter(F.col("rn") == 1)
          .select(*keys, F.col("combustivel").alias("mode_combustivel"))
)

# 6) Moda global como fallback
row_global = (
    df.filter(F.col("combustivel").isNotNull())
      .groupBy("combustivel").agg(F.count(F.lit(1)).alias("cnt"))
      .orderBy(F.col("cnt").desc(), F.col("combustivel").asc())
      .limit(1).first()
)
global_mode = row_global["combustivel"] if row_global else None

# 7) Preencher apenas nulos
df_filled = (
    df.join(mode_by_group, on=keys, how="left")
      .withColumn(
          "combustivel",
          F.when(
              F.col("combustivel").isNull(),
              F.coalesce(F.col("mode_combustivel"), F.lit(global_mode))
          ).otherwise(F.col("combustivel"))
      )
      .drop("mode_combustivel")
)

# 8) Escrever resultado
(df_filled.write
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))


In [0]:
# MODA de designacao_comercial por (gwms_engine, modelo, motorizacao)
# 1) Ler a tabela
df = spark.table("sc_gold.viaturas_2")

# 2) Higienizar texto (trim + strings vazias -> NULL)
for col in ["designacao_comercial", "modelo", "gwms_engine", "motorizacao"]:
    df = df.withColumn(col, F.trim(F.col(col)))
    df = df.withColumn(col, F.when(F.col(col) == "", None).otherwise(F.col(col)))

# 3) Chaves do grupo
keys = ["gwms_engine", "modelo", "motorizacao"]

# 4) Moda por grupo
counts = (
    df.filter(F.col("designacao_comercial").isNotNull())
      .groupBy(*keys, "designacao_comercial")
      .agg(F.count(F.lit(1)).alias("cnt"))
)

w = W.partitionBy(*keys).orderBy(F.col("cnt").desc(), F.col("designacao_comercial").asc())

mode_by_group = (
    counts.withColumn("rn", F.row_number().over(w))
          .filter(F.col("rn") == 1)
          .select(*keys, F.col("designacao_comercial").alias("mode_designacao_comercial"))
)

# 5) Moda global como fallback
row_global = (
    df.filter(F.col("designacao_comercial").isNotNull())
      .groupBy("designacao_comercial").agg(F.count(F.lit(1)).alias("cnt"))
      .orderBy(F.col("cnt").desc(), F.col("designacao_comercial").asc())
      .limit(1).first()
)
global_mode = row_global["designacao_comercial"] if row_global else None

# 6) Preencher apenas nulos
df_filled = (
    df.join(mode_by_group, on=keys, how="left")
      .withColumn(
          "designacao_comercial",
          F.when(
              F.col("designacao_comercial").isNull(),
              F.coalesce(F.col("mode_designacao_comercial"), F.lit(global_mode))
          ).otherwise(F.col("designacao_comercial"))
      )
      .drop("mode_designacao_comercial")
)

# 7) Escrever resultado
(df_filled.write
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))

In [0]:
# Preencher 'versao' com a MODA por grupo (modelo, gwms_engine, motorizacao, age_year)
# 1) Ler a tabela
df = spark.table("sc_gold.viaturas_2")

# 2) Higienizar texto (trim e strings vazias -> NULL)
for col in ["versao", "modelo", "gwms_engine", "motorizacao", "age_year"]:
    df = df.withColumn(col, F.trim(F.col(col)))
    df = df.withColumn(col, F.when(F.col(col) == "", None).otherwise(F.col(col)))

# 3) Definir chaves do grupo
keys = ["modelo", "gwms_engine", "motorizacao", "age_year"]

# 4) Calcular a moda de 'versao' por grupo
counts = (
    df.filter(F.col("versao").isNotNull())
      .groupBy(*keys, "versao")
      .agg(F.count(F.lit(1)).alias("cnt"))
)

w = W.partitionBy(*keys).orderBy(F.col("cnt").desc(), F.col("versao").asc())

mode_by_group = (
    counts.withColumn("rn", F.row_number().over(w))
          .filter(F.col("rn") == 1)
          .select(*keys, F.col("versao").alias("mode_versao"))
)

# 5) Moda global como fallback
row_global = (
    df.filter(F.col("versao").isNotNull())
      .groupBy("versao").agg(F.count(F.lit(1)).alias("cnt"))
      .orderBy(F.col("cnt").desc(), F.col("versao").asc())
      .limit(1).first()
)
global_mode = row_global["versao"] if row_global else None

# 6) Preencher apenas nulos de 'versao'
df_filled = (
    df.join(mode_by_group, on=keys, how="left")
      .withColumn(
          "versao",
          F.when(
              F.col("versao").isNull(),
              F.coalesce(F.col("mode_versao"), F.lit(global_mode))
          ).otherwise(F.col("versao"))
      )
      .drop("mode_versao")
)

# 7) Escrever resultado
(df_filled.write
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))


In [0]:
# Passo usado para remover nulos das datas (confirmar) e remover coluna duplicada 'data_de_matricula_dt'
#  Ler a tabela
df = spark.table("sc_gold.viaturas_2")

# Remover linhas onde production_date √© NULL
df_clean = df.filter(F.col("age_year").isNotNull())


# (opcional) sobrescrever a tabela com o dataset limpo
(df_clean.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("sc_gold.viaturas_2"))

In [0]:
#from pyspark.sql.functions import corr
#df.select(corr("production_year", "age_year").alias("corr")).show()

In [0]:
# Passo usado para remover coluna "production_year" vai distorcer o clustering porque est√°s a ‚Äúcontar duas vezes‚Äù o mesmo fator "age_year". Alem disso removemos "data_de_matricula","production_date" pois nao √© bom usar datas para os clusters

#  Ler a tabela
df = spark.table("sc_gold.viaturas_2")

# Remover a coluna 'production_date_dt'
df_clean = df.drop("production_year","data_de_matricula","production_date","production_date_dt","designacao_comercial", 'versao','gwms_engine')

# (opcional) sobrescrever a tabela com o dataset limpo
(df_clean.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("sc_gold.viaturas_2"))


In [0]:
# Converter age_year para inteiro
df = df.withColumn("age_year", F.col("age_year").cast(IntegerType()))

# Guardar a tabela sobrescrevendo a anterior
(df.write
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))

# Confirmar no DF atualizado
df.select("age_year").distinct().show(20)
df.printSchema()


In [0]:
#colocar anos em intervalos

df = df.withColumn(
    "age_interval",
    F.when(F.col("age_year") >= 20, "20+")
     .otherwise(
         F.concat(
             (F.floor(F.col("age_year") / 5) * 5).cast("string"),
             F.lit("-"),
             ((F.floor(F.col("age_year") / 5) * 5) + 4).cast("string")
         )
     )
)


(df.write
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))


In [0]:
#escolher primeira palavra da coluna modelo para diminuir a granularidade desta coluna
#  Ler a tabela
df = spark.table("sc_gold.viaturas_2")

df = df.withColumn("modelo", F.split(F.col("modelo"), " ").getItem(0))

(df.write
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable("sc_gold.viaturas_2"))


In [0]:
#para reduzir numero de linhas
#df = df.filter(F.col("age_interval") != "20+")

#(df.write
 #  .mode("overwrite")
 #  .option("overwriteSchema", "true")
 #  .saveAsTable("sc_gold.viaturas_2"))


In [0]:
from pyspark.sql import functions as F

table_name = "sc_gold.viaturas_2"

# Load the table
df = spark.table(table_name)

# Get total rows
total_rows = df.count()

# Calculate percentage of nulls for each column
null_percentages = (
    df.select([
        (F.count(F.when(F.col(c).isNull(), c)) / total_rows * 100)
        .alias(c)
        for c in df.columns
    ])
)

display(null_percentages)

prepara√ß√£o para K-MEANS

In [0]:
table_name = "sc_gold.viaturas_2"

# Load the table
df = spark.table(table_name)

df.printSchema()
df.show(5)


In [0]:
#Transformar categorias em vari√°veis num√©ricas
from pyspark.ml.feature import StringIndexer, OneHotEncoder

#categorical_cols = ["motorizacao", "combustivel", "gwms_engine", "designacao_comercial", "versao","modelo"]
categorical_cols = ["motorizacao", "combustivel", "age_interval","modelo"]
indexers = [StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid="keep") for c in categorical_cols]
encoders = [OneHotEncoder(inputCol=c+"_idx", outputCol=c+"_vec") for c in categorical_cols]


In [0]:
#Normalizar vari√°veis num√©ricas
from pyspark.ml.feature import VectorAssembler, StandardScaler

numeric_cols = ["cilindrada__cm3_", "potencia_maxima__kw_"] 


assembler_num = VectorAssembler(inputCols=numeric_cols, outputCol="numeric_features")
scaler = StandardScaler(inputCol="numeric_features", outputCol="scaled_numeric", withMean=True, withStd=True)


In [0]:
#Juntar tudo em vetor de features
feature_cols = ["scaled_numeric"] + [c+"_vec" for c in categorical_cols]
final_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")


In [0]:
from pyspark.sql import functions as F
from pyspark.ml.feature import FeatureHasher, VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# 1) Simplificar categorias: usar MARCA e colapsar raros
df = df.withColumn("marca", F.split(F.trim(F.col("modelo")), " ").getItem(0))

def collapse_rare(df, col, min_count=500, new_col=None):
    new_col = new_col or f"{col}_c"
    rare = (df.groupBy(col).count().filter(F.col("count") < min_count)
              .select(col).withColumn("is_rare", F.lit(True)))
    df = df.join(rare, on=col, how="left")
    return df.withColumn(new_col, F.when(F.col("is_rare"), "OUTROS").otherwise(F.col(col))).drop("is_rare")

df = collapse_rare(df, "marca",       500, "marca_c")
df = collapse_rare(df, "motorizacao", 500, "motorizacao_c")

cat_cols = ["motorizacao_c","combustivel","age_interval","marca_c"]
num_cols = ["cilindrada__cm3_","potencia_maxima__kw_"]

# 2) Normaliza√ß√£o manual (sem StandardScalerModel)
from pyspark.sql import functions as F
stats = df.select(
    *[F.mean(c).alias(f"{c}_mean") for c in num_cols],
    *[F.stddev_samp(c).alias(f"{c}_std") for c in num_cols]
).collect()[0]
for c in num_cols:
    mu = float(stats[f"{c}_mean"]) if stats[f"{c}_mean"] is not None else 0.0
    sd = float(stats[f"{c}_std"])  if stats[f"{c}_std"] not in (None,0) else 1.0
    df = df.withColumn(f"{c}_z", (F.col(c)-F.lit(mu))/F.lit(sd))
num_z_cols = [f"{c}_z" for c in num_cols]

# 3) Hashing (sem fit) + assemble
hasher = FeatureHasher(inputCols=cat_cols, outputCol="cat_hashed", numFeatures=1024)
df_h = hasher.transform(df)
from pyspark.ml.feature import VectorAssembler
df_ready = VectorAssembler(inputCols=["cat_hashed"]+num_z_cols, outputCol="features").transform(df_h)

# 4) KMeans + Silhouette (sem PCA, leve)
FEATURES_COL = "features"
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator(featuresCol=FEATURES_COL, predictionCol="prediction",
                                metricName="silhouette", distanceMeasure="squaredEuclidean")

best_k, best_s = None, None
for k in range(3,9):
    m = KMeans(featuresCol=FEATURES_COL, predictionCol="prediction", k=k, seed=42, maxIter=15).fit(df_ready)
    s = evaluator.evaluate(m.transform(df_ready))
    print(f"k={k} | silhouette={s:.4f}")
    if best_s is None or s>best_s or (s==best_s and (best_k is None or k<best_k)): best_k, best_s = k, s

final = KMeans(featuresCol=FEATURES_COL, predictionCol="cluster", k=best_k, seed=42, maxIter=20).fit(df_ready)
df_clusters = final.transform(df_ready)
df_clusters.groupBy("cluster").count().orderBy("cluster").show()


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

cats = ["motorizacao_c","combustivel","age_interval","marca_c"]

# Top 10 de cada categ√≥rica por cluster, com % interna
for c in cats:
    print(f"\n=== {c} ===")
    (df_clusters
     .groupBy("cluster", c)
     .count()
     .withColumn("pct", F.col("count")/F.sum("count").over(Window.partitionBy("cluster")))
     .orderBy("cluster", F.desc("count"))
     .show(10, truncate=False))

# M√©dias/medianas num√©ricas por cluster
(df_clusters
 .groupBy("cluster")
 .agg(
     F.count("*").alias("n"),
     F.avg("cilindrada__cm3_").alias("cilindrada_avg"),
     F.expr("percentile_approx(cilindrada__cm3_, 0.5)").alias("cilindrada_med"),
     F.avg("potencia_maxima__kw_").alias("pot_kw_avg"),
     F.expr("percentile_approx(potencia_maxima__kw_, 0.5)").alias("pot_kw_med")
 )
 .orderBy("cluster")
 .show(truncate=False))


üü° Cluster 0 (50 992 viaturas, ~33%)

Resumo: maioritariamente ve√≠culos antigos, cilindrada alta, gas√≥leo, motores 1.0/1.6 T-GDI? ‚Üí parece um cluster h√≠brido entre comerciais antigos + SUVs Diesel

Dimens√£o	Observa√ß√£o
Motoriza√ß√£o	70 % ‚Äú1.0 T-GDI‚Äù (curioso ‚Äî pode ser efeito do hashing colapsado) + 1.6 T-GDI + 1.6/1.7 CRDi ‚Üí mistura de gasolina turbo e diesel
Combust√≠vel	98 % gas√≥leo (clar√≠ssimo: cluster Diesel)
Idade	65 % com 20+ anos, 16 % entre 15‚Äì19 anos ‚Üí viaturas mais velhas
Marca (modelo simplificado)	Dom√≠nio de H-1, Galloper, Getz, Santa Fe, Tucson ‚Üí Hyundai/Kia antigos e comerciais
Motor	Cilindrada m√©dia 2078 cm¬≥, mediana 2328 cm¬≥ ‚Üí motores grandes
Pot√™ncia	M√©dia ~75 kW ‚Üí diesel de baixa pot√™ncia t√≠pica de comerciais/antigos

‚û°Ô∏è Label poss√≠vel: ‚ÄúDiesel antigos / comerciais / SUV anos 2000‚Äù

üü° Cluster 1 (14 454 viaturas, ~9%)

Resumo: viaturas novas / semi-novas, pot√™ncia elevada, grande presen√ßa de el√©tricos e h√≠bridos.

Dimens√£o	Observa√ß√£o
Combust√≠vel	39 % el√©trico, 19 % gasolina, 18 % h√≠brido gasolina ‚Üí cluster mais moderno / eletrificado
Idade	67 % entre 0‚Äì4 anos, 28 % entre 5‚Äì9 anos ‚Üí viaturas recentes
Motor	Cilindrada m√©dia ~1500 cm¬≥, pot√™ncia m√©dia 138 kW (alto)
Marcas	(n√£o mostrado aqui mas deve ser Hyundai Ioniq, Kona EV, Kia EV6, etc.)

‚û°Ô∏è Label poss√≠vel: ‚ÄúNovos / eletrificados / alta pot√™ncia‚Äù

üü° Cluster 2 (79 382 viaturas, ~52%)

Resumo: viaturas pequenas, baixas pot√™ncias, idade mista mas tend√™ncia interm√©dia, gasolina dominante.

Dimens√£o	Observa√ß√£o
Cilindrada	M√©dia ~1166 cm¬≥, mediana ~1086 ‚Üí segmento pequeno
Pot√™ncia	M√©dia 66 kW ‚Üí citadinos / utilit√°rios
Combust√≠vel	(n√£o mostrado, mas deve ser gasolina/1.0 MPI/etc.)
Idade	mix, mas n√£o t√£o velho como cluster 0 nem novo como cluster 1

‚û°Ô∏è Label poss√≠vel: ‚ÄúPequenos utilit√°rios / baixa pot√™ncia / econ√≥mico‚Äù

association rules

In [0]:
spark.table("sc_gold.historico_de_servicos_2") \
     .select("descricao_servico_pos_venda") \
     .groupBy("descricao_servico_pos_venda") \
     .count() \
     .orderBy(F.desc("count")) \
     .show(50, truncate=False)


In [0]:
# ================== Imports ==================
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.fpm import FPGrowth

# ================== 0) Helpers ==================
def remove_acentos(col):
    # mapa b√°sico pt: √£√°√†√¢√§√©√®√™√´√≠√¨√Æ√Ø√µ√≥√≤√¥√∂√∫√π√ª√º√ß -> aaaaaeeeeiiiiooooouuuuc
    accents = "√£√°√†√¢√§√©√®√™√´√≠√¨√Æ√Ø√µ√≥√≤√¥√∂√∫√π√ª√º√ß√É√Å√Ä√Ç√Ñ√â√à√ä√ã√ç√å√é√è√ï√ì√í√î√ñ√ö√ô√õ√ú√á"
    no_acc  = "aaaaaeeeeiiiiooooouuuucAAAAAEEEEIIIIOOOOOUUUUC"
    return F.translate(col, accents, no_acc)

# ================== 1) Ler hist√≥rico e normalizar texto ==================
tabela_servicos = "sc_gold.historico_de_servicos_2"

df_serv = (spark.table(tabela_servicos)
    .select(
        F.col("viatura").alias("id_viatura"),
        F.lower(remove_acentos(F.col("descricao_servico_pos_venda"))).alias("serv_raw"),
        F.col("data_de_abertura").alias("data_servico")
    )
)

# remover nulos/strings vazias (importante)
df_serv = df_serv.filter(F.col("serv_raw").isNotNull() & (F.length(F.trim(F.col("serv_raw"))) > 0))

# ================== 2) Mapear para categorias t√©cnicas (servico_norm) ==================
# op√ß√£o C: t√©cnico + manuten√ß√£o preventiva

# palavras-chave ‚Üí categoria
mapping = [
    ("revisao_programada", ["revis", "manutencao", "inspecao", " km", "anos", "60.000", "45.000", "30.000", "15.000", "60 000", "45 000", "30 000", "15 000"]),
    ("oleo_motor",         ["oleo", "filtro oleo", "pack"]),
    ("travagem",           ["pastilh", "trav", "disco"]),
    ("pneus",              ["pneu"]),
    ("bateria",            ["bateria"]),
    ("ar_condicionado",    ["clima", "ar cond", "ac ", " ac", "climat"]),
    ("diagnostico_motor",  ["luz motor", "diagnos", "avaria"]),
    ("sinistro",           ["chapa", "pintur", "sinistr"]),
    ("ipo",                ["ipo"]),
    ("combustivel",        ["combust"])
]

# termos a excluir (ru√≠do administrativo/cosm√©tico)
excluir = ["lavag", "oferta", "check up", "tapete", "residu", "prepar", "estacao serv", "limpeza", "entrega", "pintura polimento", "chapas matric"]

# come√ßar coluna vazia
df_map = df_serv.withColumn("servico_norm", F.lit(None).cast("string"))

# aplicar mapeamento
for cat, kws in mapping:
    cond = F.lit(False)
    for kw in kws:
        cond = cond | F.col("serv_raw").contains(kw)
    df_map = df_map.withColumn(
        "servico_norm",
        F.when(cond, F.lit(cat)).otherwise(F.col("servico_norm"))
    )

# remover ru√≠do
cond_ruido = F.lit(False)
for kw in excluir:
    cond_ruido = cond_ruido | F.col("serv_raw").contains(kw)

df_clean = df_map.filter(~cond_ruido & F.col("servico_norm").isNotNull())

# (opcional) ver distribui√ß√£o das categorias
df_clean.groupBy("servico_norm").count().orderBy(F.desc("count")).show(50, truncate=False)

# ================== 3) Juntar clusters (assumindo df_clusters com colunas: id, cluster) ==================
# se o id nas viaturas tiver outro nome, adapta abaixo
df_clusters_sel = df_clusters.select(F.col("id").alias("id_viatura"), "cluster")

base = (df_clean
    .join(df_clusters_sel, on="id_viatura", how="inner")
    .select("id_viatura", "cluster", "servico_norm")
    .distinct()  # evita duplica√ß√µes do mesmo servi√ßo na mesma viatura
)

# ================== 4) Construir "cestos" por viatura (por cluster) ==================
basket = (base
    .groupBy("cluster", "id_viatura")
    .agg(F.collect_set("servico_norm").alias("items"))
)

print("Total de cestos:", basket.count())

# ================== 5) FP-Growth por cluster (minSupport adaptativo) ==================
def fp_growth_por_cluster(basket_df, cluster_id, min_conf=0.3, min_support_floor=0.01, min_abs=30):
    dfc = basket_df.filter(F.col("cluster")==cluster_id).select("items")
    n = dfc.count()
    if n == 0:
        print(f"Cluster {cluster_id}: sem cestos.")
        return None, None
    min_support = max(min_support_floor, min_abs / n)

    print(f"\n=== Cluster {cluster_id} | n={n} | minSupport={min_support:.4f} | minConf={min_conf} ===")
    fpg = FPGrowth(itemsCol="items", minSupport=min_support, minConfidence=min_conf)
    model = fpg.fit(dfc)

    itemsets = (model.freqItemsets
        .withColumn("support", F.col("freq")/F.lit(n))
        .orderBy(F.desc("freq")))

    # acrescentar lift manualmente (confidence / consequentSupport)
    rules = (model.associationRules
        .withColumn("lift", F.col("confidence")/F.col("consequentSupport"))
        .orderBy(F.desc("lift"), F.desc("confidence")))

    return itemsets, rules

# correr para todos os clusters
clusters = [r["cluster"] for r in df_clusters.select("cluster").distinct().collect()]
resultados = {}
for k in sorted(clusters):
    its, rls = fp_growth_por_cluster(basket, k, min_conf=0.30, min_support_floor=0.01, min_abs=30)
    resultados[k] = (its, rls)
    if its is not None:
        print("\n--- Frequent itemsets ---")
        its.show(20, truncate=False)
        print("\n--- Association rules ---")
        rls.select("antecedent","consequent","confidence","lift","support").show(20, truncate=False)

# ================== 6) (Opcional) guardar resultados ==================
salvar = False
if salvar:
    for k, (its, rls) in resultados.items():
        if its is not None:
            its.write.mode("overwrite").parquet(f"/tmp/fp_itemsets_cluster_{k}")
            rls.write.mode("overwrite").parquet(f"/tmp/fp_rules_cluster_{k}")


In [0]:
# ================== Imports ==================
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.fpm import FPGrowth

# ================== Helpers ==================
def remove_acentos(col):
    # Mapa PT/BR comum (mai√∫sculas inclu√≠das)
    accents = "√£√°√†√¢√§√©√®√™√´√≠√¨√Æ√Ø√µ√≥√≤√¥√∂√∫√π√ª√º√ß√É√Å√Ä√Ç√Ñ√â√à√ä√ã√ç√å√é√è√ï√ì√í√î√ñ√ö√ô√õ√ú√á"
    no_acc  = "aaaaaeeeeiiiiooooouuuucAAAAAEEEEIIIIOOOOOUUUUC"
    return F.translate(col, accents, no_acc)

# ================== 1) Ler hist√≥rico e normalizar ==================
tabela_servicos = "sc_gold.historico_de_servicos_2"
df_serv = (spark.table(tabela_servicos)
    .select(
        F.col("viatura").alias("id_viatura"),
        F.lower(remove_acentos(F.col("descricao_servico_pos_venda"))).alias("serv_raw"),
        F.col("data_de_abertura").alias("data_servico")
    )
    .filter(F.col("serv_raw").isNotNull() & (F.length(F.trim(F.col("serv_raw"))) > 0))
)

# ================== 2) Mapear para categorias t√©cnicas (servico_norm) ==================
# Op√ß√£o C: t√©cnico + manuten√ß√£o preventiva
mapping = [
    ("revisao_programada", ["revis", "manutencao", "inspecao", " km", "anos", "60.000", "45.000", "30.000", "15.000", "60 000", "45 000", "30 000", "15 000"]),
    ("oleo_motor",         ["oleo", "filtro oleo", "pack"]),
    ("travagem",           ["pastilh", "trav", "disco"]),
    ("pneus",              ["pneu"]),
    ("bateria",            ["bateria"]),
    ("ar_condicionado",    ["clima", "ar cond", " ac", "ac ", "climat"]),
    ("diagnostico_motor",  ["luz motor", "diagnos", "avaria"]),
    ("sinistro",           ["chapa", "pintur", "sinistr"]),
    ("ipo",                ["ipo"]),
    ("combustivel",        ["combust"])
]

# termos a excluir (ru√≠do administrativo/cosm√©tico)
excluir = ["lavag", "oferta", "check up", "tapete", "residu", "prepar", "estacao serv", "limpeza", "entrega", "pintura polimento", "chapas matric"]

df_map = df_serv.withColumn("servico_norm", F.lit(None).cast("string"))

for cat, kws in mapping:
    cond = F.lit(False)
    for kw in kws:
        cond = cond | F.col("serv_raw").contains(kw)
    df_map = df_map.withColumn("servico_norm", F.when(cond, F.lit(cat)).otherwise(F.col("servico_norm")))

cond_ruido = F.lit(False)
for kw in excluir:
    cond_ruido = cond_ruido | F.col("serv_raw").contains(kw)

df_clean = df_map.filter(~cond_ruido & F.col("servico_norm").isNotNull())

# (opcional) ver distribui√ß√£o das categorias
df_clean.groupBy("servico_norm").count().orderBy(F.desc("count")).show(50, truncate=False)

# ================== 3) Juntar clusters (ajusta colunas se necess√°rio) ==================
# df_clusters deve existir da fase de clustering e conter: id (viatura), cluster
df_clusters_sel = df_clusters.select(F.col("id").alias("id_viatura"), "cluster")

base = (df_clean
    .join(df_clusters_sel, on="id_viatura", how="inner")
    .select("id_viatura", "cluster", "servico_norm")
    .distinct()  # evita duplica√ß√£o do mesmo servi√ßo na mesma viatura
)

# ================== 4) Construir baskets por viatura (por cluster) ==================
basket = (base
    .groupBy("cluster", "id_viatura")
    .agg(F.collect_set("servico_norm").alias("items"))
)

print("Total de cestos:", basket.count())

# ================== 5) FP-Growth por cluster (minSupport adaptativo) ==================
def fp_growth_por_cluster(basket_df, cluster_id, min_conf=0.3, min_support_floor=0.01, min_abs=30):
    dfc = basket_df.filter(F.col("cluster")==cluster_id).select("items")
    n = dfc.count()
    if n == 0:
        print(f"Cluster {cluster_id}: sem cestos.")
        return None, None
    min_support = max(min_support_floor, min_abs / n)
    print(f"\n=== Cluster {cluster_id} | n={n} | minSupport={min_support:.4f} | minConf={min_conf} ===")

    fpg = FPGrowth(itemsCol="items", minSupport=min_support, minConfidence=min_conf)
    model = fpg.fit(dfc)

    itemsets = (model.freqItemsets
        .withColumn("support", F.col("freq")/F.lit(n))
        .orderBy(F.desc("freq")))

    rules = (model.associationRules
        .withColumn("lift", F.col("confidence")/F.col("consequentSupport"))
        .orderBy(F.desc("lift"), F.desc("confidence")))

    return itemsets, rules

clusters = [r["cluster"] for r in df_clusters.select("cluster").distinct().collect()]
resultados = {}
for k in sorted(clusters):
    its, rls = fp_growth_por_cluster(basket, k, min_conf=0.30, min_support_floor=0.01, min_abs=30)
    resultados[k] = (its, rls)
    if its is not None:
        print("\n--- Frequent itemsets ---")
        its.show(20, truncate=False)
        print("\n--- Association rules ---")
        rls.select("antecedent","consequent","confidence","lift","support").show(20, truncate=False)

# ================== 6) (Opcional) guardar resultados ==================
# salvar = True
# if salvar:
#     for k, (its, rls) in resultados.items():
#         if its is not None:
#             its.write.mode("overwrite").parquet(f"/tmp/fp_itemsets_cluster_{k}")
#             rls.write.mode("overwrite").parquet(f"/tmp/fp_rules_cluster_{k}")


FALHAS

In [0]:
#Construir pipeline
from pyspark.ml import Pipeline

stages = []
stages += indexers
stages += encoders
stages += [assembler_num, scaler, final_assembler]

pipeline = Pipeline(stages=stages)
df_ready = pipeline.fit(df).transform(df)

In [0]:
#Elbow Method
from pyspark.ml.clustering import KMeans
import matplotlib.pyplot as plt

costs = []
Ks = range(2, 11)

for k in Ks:
    km = KMeans(featuresCol="features", k=k, seed=42)
    model = km.fit(df_ready)
    # custo = soma das dist√¢ncias ao centro (in√©rcia)
    cost = model.summary.trainingCost
    costs.append(cost)

plt.plot(Ks, costs, marker="o")
plt.xlabel("k (n¬∫ clusters)")
plt.ylabel("In√©rcia (trainingCost)")
plt.title("M√©todo do Cotovelo")
plt.show()


In [0]:
#Silhouette Score
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction",
                                metricName="silhouette", distanceMeasure="squaredEuclidean")

results = []
for k in Ks:
    km = KMeans(featuresCol="features", k=k, seed=42)
    model = km.fit(df_ready)
    preds = model.transform(df_ready)
    score = evaluator.evaluate(preds)
    results.append(score)
    print(f"k={k}, silhouette={score:.4f}")

plt.plot(Ks, results, marker="o")
plt.xlabel("k (n¬∫ clusters)")
plt.ylabel("Silhouette")
plt.title("Silhouette vs k")
plt.show()

In [0]:
# Avaliar k por Elbow (trainingCost) e Silhouette e treinar modelo final

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql import functions as F
import gc

# 0) Prepara√ß√£o: df_ready tem de ter a coluna 'features'
assert "features" in df_ready.columns, "df_ready precisa da coluna 'features'."

Ks = list(range(2, 11))  # ajusta se quiseres
results = []

# Usar a coluna default 'prediction' para evitar erros no evaluator
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette",
    distanceMeasure="squaredEuclidean"
)

for k in Ks:
    km = KMeans(featuresCol="features", k=k, seed=42)  # predictionCol default = 'prediction'
    model = km.fit(df_ready)
    preds = model.transform(df_ready)
    
    # Elbow: in√©rcia (soma das dist√¢ncias ao centro)
    inertia = model.summary.trainingCost
    
    # Silhouette
    sil = evaluator.evaluate(preds)
    
    results.append((k, inertia, sil))
    print(f"k={k:2d} | inertia={inertia:.2f} | silhouette={sil:.4f}")
    
    # libertar cache de modelos/DFs (Spark Connect)
    del model, preds
    gc.collect()

# 1) Tabela de resultados
res_df = spark.createDataFrame(results, schema=["k", "inertia", "silhouette"]) \
              .orderBy("k")
res_df.show(truncate=False)

# 2) Escolher k: m√°ximo silhouette (em caso de empate, menor k)
row_best = res_df.orderBy(F.col("silhouette").desc(), F.col("k").asc()).first()
best_k = row_best["k"]
best_s = row_best["silhouette"]
print(f"\n>> Melhor k pelo Silhouette: k={best_k} (silhouette={best_s:.4f})")

# (Opcional) Se tamb√©m quiseres um "cotovelo" heur√≠stico simples:
# calcula a maior queda relativa de in√©rcia
from pyspark.sql.window import Window
w = Window.orderBy("k")
elbow_df = (res_df
    .withColumn("inertia_prev", F.lag("inertia").over(w))
    .withColumn("drop", (F.col("inertia_prev") - F.col("inertia"))/F.col("inertia_prev"))
)
elbow_row = elbow_df.orderBy(F.col("drop").desc_nulls_last()).first()
elbow_k = elbow_row["k"] if elbow_row and elbow_row["drop"] is not None else None
print(f">> Sugerido pelo 'cotovelo' (heur√≠stico): k={elbow_k}")

# 3) Treinar KMeans final com o k escolhido (Silhouette)
final_kmeans = KMeans(featuresCol="features", predictionCol="cluster", k=best_k, seed=42)
final_model = final_kmeans.fit(df_ready)
df_clusters = final_model.transform(df_ready)

# 4) Distribui√ß√£o por cluster
df_clusters.groupBy("cluster").count().orderBy("cluster").show()

# 5) (Opcional) guardar resultados
#df_clusters.write.mode("overwrite").saveAsTable("sc_gold.viaturas_2_clusters")

In [0]:
from pyspark.ml.feature import PCA
import matplotlib.pyplot as plt
import pandas as pd

# 1) Reduzir features a 2 dimens√µes para visualiza√ß√£o
pca = PCA(k=2, inputCol="features", outputCol="pca2d")
pca_model = pca.fit(df_clusters)
df_pca = pca_model.transform(df_clusters).select("pca2d", "cluster")

# 2) Converter para Pandas para plotar
pdf = df_pca.toPandas()
pdf[["x", "y"]] = pdf["pca2d"].apply(lambda v: pd.Series([float(v[0]), float(v[1])]))

# 3) Desenhar gr√°fico
plt.figure(figsize=(8,6))
for c in sorted(pdf["cluster"].unique()):
    subset = pdf[pdf["cluster"] == c]
    plt.scatter(subset["x"], subset["y"], label=f"Cluster {c}", alpha=0.6)

plt.xlabel("PCA 1")
plt.ylabel("PCA 2")
plt.title("Clusters K-Means (proje√ß√£o PCA 2D)")
plt.legend()
plt.show()


**# dbscan nao consegui por causa da cache **

In [0]:
# apaga objetos de modelos j√° criados nesta sess√£o
for name in ["model", "pca_model", "pca_db_model", "pca_2d_model"]:
    if name in globals():
        del globals()[name]

import gc; gc.collect()


In [0]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("dbscan-viaturas")
         .config("spark.connect.ml.cache.size", str(2 * 1024 * 1024 * 1024))  # 2 GB
         .getOrCreate())


_**Association Rules / Market Basket Analysis**_

In [0]:
from pyspark.sql import functions as F

# Se o teu DF com o resultado do k-means/DBSCAN se chama df_clusters:
# certificar que tem as colunas id e cluster
if "cluster" in df_clusters.columns:
    df_id_cluster = df_clusters.select("id", F.col("cluster").cast("int").alias("cluster"))
elif "prediction" in df_clusters.columns:
    df_id_cluster = df_clusters.select("id", F.col("prediction").cast("int").alias("cluster"))
else:
    raise ValueError("O DF de clusters tem de ter 'cluster' ou 'prediction' e a coluna 'id'.")

# (opcional) guardar numa tabela para uso posterior
df_id_cluster.write.mode("overwrite").saveAsTable("sc_gold.viaturas_clusters")


In [0]:
#Limpar nulos no servi√ßo
df_hist = spark.table("sc_gold.historico_de_servicos_2")

# Usa o c√≥digo do servi√ßo se for mais est√°vel que a descri√ß√£o
# Aqui vou usar descricao_servico_pos_venda
df_hist = df_hist.filter(df_hist.descricao_servico_pos_venda.isNotNull())


In [0]:
# join: viatura (hist√≥rico) ‚Üî id (clusters)
df_join = df_hist.join(df_clusters, df_hist.viatura == df_clusters.id, how="inner")


# tenta usar uma chave de visita se existir na tabela
cols = df_join.columns
visit_key = None
for candidate in ["numero_do_servico_pos_venda","canal_de_venda","tipo_de_servico"]:
    if candidate in cols:
        visit_key = candidate
        break

group_cols = ["id","cluster"] + ([visit_key] if visit_key else [])

baskets = (df_join
    .groupBy(*group_cols)
    .agg(F.collect_set("descricao_servico_pos_venda").alias("items"))
    .filter(F.size("items") > 0))

# ver formato
baskets.show(truncate=False)


In [0]:
#nao sucedi cache offloading is enabled
from pyspark.sql import functions as F
from pyspark.ml.fpm import FPGrowth

# -------- Par√¢metros --------
TABLE_HIST = "sc_gold.historico_de_servicos_2"
TABLE_CLUST = "sc_gold.viaturas_clusters"   # (id, cluster)
MIN_SUP = 0.01
MIN_CONF = 0.20
TOP_N = 30

# -------- 1) Hist√≥rico -> baskets --------
df_hist = spark.table(TABLE_HIST)

# item: usa a DESCRI√á√ÉO (o n√∫mero √© a chave do cesto)
item_col = F.col("descricao_servico_pos_venda").alias("item_raw")

df_hist = (df_hist
           .select("*", item_col)
           .filter(F.col("viatura").isNotNull())
           .filter(F.col("numero_do_servico_pos_venda").isNotNull())
           .filter(F.col("item_raw").isNotNull()))

# clusters (id -> cluster)
df_clu = spark.table(TABLE_CLUST).select("id","cluster")

# join: viatura (hist√≥rico) ‚Üî id (clusters)
df_join = df_hist.join(df_clu, df_hist.viatura == df_clu.id, how="inner").drop(df_clu.id)

# CHAVE DE CESTO = numero_do_servico_pos_venda
visit_key = "numero_do_servico_pos_venda"

group_cols = ["viatura","cluster", visit_key]

baskets = (df_join
           .groupBy(*group_cols)
           .agg(F.collect_set("item_raw").alias("items"))
           .filter(F.size("items") > 0))

# -------- 2) FP-Growth por cluster --------
clusters = [r["cluster"] for r in baskets.select("cluster").distinct().collect()]
results = []

for c in clusters:
    b_c = baskets.filter(F.col("cluster")==c).select("items")
    n_trans = b_c.count()
    if n_trans == 0:
        continue

    # >>> CORRIGIDO: n√£o usamos predictionCol=None <<<
    fp = FPGrowth(minSupport=MIN_SUP,
                  minConfidence=MIN_CONF,
                  itemsCol="items")   # usa default predictionCol="prediction"
    model = fp.fit(b_c)

    rules = model.associationRules      # antecedent, consequent, confidence
    freq  = model.freqItemsets          # items, freq

    # suportes e lift
    sup_all = freq.withColumn("items_str", F.array_join("items","|"))
    sup_ab = sup_all.select(F.col("items_str").alias("ab_str"),
                            (F.col("freq")/F.lit(n_trans)).alias("support_ab"))
    sup_a  = sup_all.select(F.col("items_str").alias("a_str"),
                            (F.col("freq")/F.lit(n_trans)).alias("support_a"))
    sup_b  = sup_all.select(F.col("items_str").alias("b_str"),
                            (F.col("freq")/F.lit(n_trans)).alias("support_b"))

    rules_aug = (rules
        .withColumn("antecedent_str", F.array_join("antecedent","|"))
        .withColumn("consequent_str", F.array_join("consequent","|"))
        .join(sup_ab, F.concat_ws("|","antecedent_str","consequent_str")==F.col("ab_str"), "left")
        .join(sup_a, F.col("antecedent_str")==F.col("a_str"), "left")
        .join(sup_b, F.col("consequent_str")==F.col("b_str"), "left")
        .withColumn("lift", F.col("support_ab")/(F.col("support_a")*F.col("support_b")))
        .withColumn("cluster", F.lit(c))
        .select("cluster","antecedent","consequent",
                "support_a","support_b","support_ab","confidence","lift"))

    rules_top = (rules_aug
        .filter(F.size("consequent")==1)
        .orderBy(F.col("lift").desc(), F.col("support_ab").desc())
        .limit(TOP_N))

    results.append(rules_top)

rules_all = None
if results:
    rules_all = results[0]
    for r in results[1:]:
        rules_all = rules_all.unionByName(r)

# -------- 3) Mostrar --------
if rules_all is None or rules_all.rdd.isEmpty():
    print("Sem regras para os par√¢metros atuais. Ajusta MIN_SUP/MIN_CONF.")
else:
    rules_all.show(truncate=False)


In [0]:
%pip install mlxtend



In [0]:
#usar python (nao consegui correr)
from pyspark.sql import functions as F

# -------- Par√¢metros --------
TABLE_HIST = "sc_gold.historico_de_servicos_2"
TABLE_CLUST = "sc_gold.viaturas_clusters"   # (id, cluster)
MIN_SUP = 0.01
MIN_CONF = 0.20
TOP_N = 30

# -------- 1) Hist√≥rico -> baskets --------
df_hist = spark.table(TABLE_HIST)

# item: usa a DESCRI√á√ÉO
item_col = F.col("descricao_servico_pos_venda").alias("item_raw")

df_hist = (df_hist
           .select("*", item_col)
           .filter(F.col("viatura").isNotNull())
           .filter(F.col("numero_do_servico_pos_venda").isNotNull())
           .filter(F.col("item_raw").isNotNull()))

# clusters (id -> cluster)
df_clu = spark.table(TABLE_CLUST).select("id","cluster")

# join: viatura (hist√≥rico) ‚Üî id (clusters)
df_join = df_hist.join(df_clu, df_hist.viatura == df_clu.id, how="inner").drop(df_clu.id)

# CHAVE DE CESTO = numero_do_servico_pos_venda
visit_key = "numero_do_servico_pos_venda"

group_cols = ["viatura","cluster", visit_key]

baskets = (df_join
           .groupBy(*group_cols)
           .agg(F.collect_set("item_raw").alias("items"))
           .filter(F.size("items") > 0))

# -------- 2) Converter para Pandas --------
baskets_pd = baskets.toPandas()
print("Formato dos cestos:", baskets_pd.head())

# -------- 3) Apriori com mlxtend --------
!pip install mlxtendfrom mlxtend.preprocessing import TransactionEncoder

from mlxtend.frequent_patterns import apriori, association_rules

results = []

for c in sorted(baskets_pd["cluster"].unique()):
    basket_c = baskets_pd[baskets_pd["cluster"] == c]["items"].tolist()
    if not basket_c:
        continue

    # one-hot encoding
    te = TransactionEncoder()
    te_ary = te.fit(basket_c).transform(basket_c)
    df_items = pd.DataFrame(te_ary, columns=te.columns_)

    # Apriori
    freq = apriori(df_items, min_support=MIN_SUP, use_colnames=True)

    # Regras de associa√ß√£o
    rules = association_rules(freq, metric="confidence", min_threshold=MIN_CONF)
    rules["cluster"] = c

    # Ordenar por lift e limitar
    rules_top = rules.sort_values(["lift","support"], ascending=[False,False]).head(TOP_N)
    results.append(rules_top)

# -------- 4) Concatenar resultados --------
import pandas as pd
if results:
    rules_all = pd.concat(results, ignore_index=True)
    print(rules_all[["cluster","antecedents","consequents","support","confidence","lift"]])
else:
    print("Sem regras para os par√¢metros definidos.")


In [0]:
#usar PANDAS PQ NAO DEU OS ULTIMOS 2 BLOCOS
import pandas as pd
from itertools import combinations

def apriori_from_baskets(baskets, min_support=0.01):
    n_trans = len(baskets)
    # Contar frequ√™ncias de itens √∫nicos
    item_counts = {}
    for basket in baskets:
        for item in set(basket):
            item_counts[item] = item_counts.get(item, 0) + 1
    freq_items = {frozenset([k]): v/n_trans for k,v in item_counts.items() if v/n_trans >= min_support}

    # Contar pares
    pair_counts = {}
    for basket in baskets:
        for a,b in combinations(set(basket), 2):
            pair = frozenset([a,b])
            pair_counts[pair] = pair_counts.get(pair, 0) + 1
    freq_pairs = {k:v/n_trans for k,v in pair_counts.items() if v/n_trans >= min_support}

    return freq_items, freq_pairs

# Exemplo de uso (baskets_pd j√° criado antes)
baskets_cluster0 = baskets_pd[baskets_pd["cluster"]==0]["items"].tolist()
freq1, freq2 = apriori_from_baskets(baskets_cluster0, min_support=0.01)

print("Itens frequentes:", freq1)
print("Pares frequentes:", freq2)


In [0]:
#calcular confidence e lift
from itertools import combinations
import pandas as pd

def apriori_from_baskets(baskets, min_support=0.01):
    n_trans = len(baskets)
    # Contar frequ√™ncias de itens √∫nicos
    item_counts = {}
    for basket in baskets:
        for item in set(basket):
            item_counts[item] = item_counts.get(item, 0) + 1
    freq_items = {frozenset([k]): v/n_trans for k,v in item_counts.items() if v/n_trans >= min_support}

    # Contar pares
    pair_counts = {}
    for basket in baskets:
        for a,b in combinations(set(basket), 2):
            pair = frozenset([a,b])
            pair_counts[pair] = pair_counts.get(pair, 0) + 1
    freq_pairs = {k:v/n_trans for k,v in pair_counts.items() if v/n_trans >= min_support}

    return freq_items, freq_pairs

def generate_rules(freq_items, freq_pairs):
    rules = []
    for pair, sup_ab in freq_pairs.items():   # pares frequentes
        items = list(pair)
        for i in range(2):
            A = frozenset([items[i]])
            B = frozenset([items[1-i]])
            sup_a = freq_items.get(A, 0)
            sup_b = freq_items.get(B, 0)

            if sup_a > 0 and sup_b > 0:
                conf = sup_ab / sup_a
                lift = sup_ab / (sup_a * sup_b)
                rules.append({
                    "antecedent": list(A),
                    "consequent": list(B),
                    "support_ab": sup_ab,
                    "support_a": sup_a,
                    "support_b": sup_b,
                    "confidence": conf,
                    "lift": lift
                })
    return pd.DataFrame(rules)

# --- Exemplo com os cestos de um cluster ---
baskets_cluster0 = baskets_pd[baskets_pd["cluster"]==0]["items"].tolist()

freq1, freq2 = apriori_from_baskets(baskets_cluster0, min_support=0.01)
rules_df = generate_rules(freq1, freq2)

print(rules_df.sort_values("lift", ascending=False).head(10))


In [0]:
#calcular confidence e lift (outra forma)
from itertools import combinations
import pandas as pd

COLS = ["cluster","antecedent","consequent",
        "support_ab","support_a","support_b","confidence","lift"]

def apriori_from_baskets(baskets, min_support=0.005):
    n_trans = len(baskets)
    if n_trans == 0:
        return {}, {}
    # 1-itemsets
    item_counts = {}
    for basket in baskets:
        for item in set(basket):
            item_counts[item] = item_counts.get(item, 0) + 1
    freq_items = {frozenset([k]): v/n_trans for k,v in item_counts.items() if v/n_trans >= min_support}
    # 2-itemsets
    pair_counts = {}
    for basket in baskets:
        for a,b in combinations(set(basket), 2):
            pair = frozenset([a,b])
            pair_counts[pair] = pair_counts.get(pair, 0) + 1
    freq_pairs = {k:v/n_trans for k,v in pair_counts.items() if v/n_trans >= min_support}
    return freq_items, freq_pairs

def generate_rules_df(freq_items, freq_pairs, cluster_id=None):
    # garante DataFrame com colunas mesmo se vazio
    rows = []
    for pair, sup_ab in freq_pairs.items():
        items = list(pair)
        for i in range(2):
            A = frozenset([items[i]])
            B = frozenset([items[1-i]])
            sup_a = freq_items.get(A, 0.0)
            sup_b = freq_items.get(B, 0.0)
            if sup_a > 0 and sup_b > 0:
                conf = sup_ab / sup_a
                lift = sup_ab / (sup_a * sup_b)
                rows.append({
                    "cluster": cluster_id,
                    "antecedent": list(A),
                    "consequent": list(B),
                    "support_ab": sup_ab,
                    "support_a": sup_a,
                    "support_b": sup_b,
                    "confidence": conf,
                    "lift": lift
                })
    return pd.DataFrame(rows, columns=COLS)

# ---- Minerar para todos os clusters (usando baskets_pd j√° criado) ----
def mine_all_clusters(baskets_pd, min_support=0.005, top_n=30, min_conf=None):
    all_rules = []
    for c in sorted(baskets_pd["cluster"].unique()):
        baskets_c = baskets_pd[baskets_pd["cluster"] == c]["items"].tolist()
        freq1, freq2 = apriori_from_baskets(baskets_c, min_support=min_support)
        df_rules = generate_rules_df(freq1, freq2, cluster_id=c)

        # (opcional) filtrar por confidence m√≠nima
        if min_conf is not None and not df_rules.empty:
            df_rules = df_rules[df_rules["confidence"] >= min_conf]

        # ordenar com seguran√ßa
        if not df_rules.empty:
            df_rules = df_rules.sort_values(["lift","support_ab"], ascending=[False, False]).head(top_n)
        all_rules.append(df_rules)

    # concatenar garantindo colunas mesmo que todos vazios
    if len(all_rules) == 0:
        return pd.DataFrame(columns=COLS)
    out = pd.concat(all_rules, ignore_index=True) if any([not df.empty for df in all_rules]) else pd.DataFrame(columns=COLS)
    return out

# ---- Executar ----
rules_all = mine_all_clusters(baskets_pd, min_support=0.01, top_n=30, min_conf=0.2)

if rules_all.empty:
    print("Sem regras com os par√¢metros atuais. Tenta diminuir min_support (ex.: 0.005) e/ou min_conf.")
else:
    print(rules_all.head(20))
