In [None]:
import mlflow
import mlflow.spark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import regexp_extract
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, GBTRegressor

In [None]:
# Read every Parquet file under the S3 prefix into a single DataFrame.
file_type = "parquet"
s3_bucket_name = "projeto-puc"
s3_folder_path = "ref_db/"

# Reader options forwarded unchanged to the DataFrame reader.
# NOTE(review): inferSchema/header/sep look like CSV options; presumably they
# have no effect on a Parquet read - confirm before removing them.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

source_path = f"s3://{s3_bucket_name}/{s3_folder_path}"
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(source_path)
)

In [None]:
# Data-quality check on Tempo_Resposta: count how many rows share each value,
# ordered by that count.
tempo_resposta_counts = (
    df.groupBy("Tempo_Resposta")
    .agg(count("*").alias("Quantidade_registros"))
    .orderBy("Quantidade_registros")
)

# Values that appear in a single row are treated as anomalies: keep only the
# rows whose Tempo_Resposta value occurs more than once.
tempo_resposta_counts_filtered = tempo_resposta_counts.filter("Quantidade_registros > 1")
valores_validos = (
    tempo_resposta_counts_filtered
    .select("Tempo_Resposta")
    .rdd
    .flatMap(lambda x: x)
    .collect()  # brings the accepted values to the driver as a Python list
)
df = df.filter(df["Tempo_Resposta"].isin(valores_validos))

In [None]:
# Count, per column, the rows whose value is null or NaN.
colunas_verificadas = ["Tempo_Resposta", "Nota_Consumidor", "Comprou_Contratou"]
contagem_ausentes = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in colunas_verificadas
]
df.select(contagem_ausentes).show()

+--------------+---------------+-----------------+
|Tempo_Resposta|Nota_Consumidor|Comprou_Contratou|
+--------------+---------------+-----------------+
|             0|              0|                0|
+--------------+---------------+-----------------+



In [None]:
# Turn the "Comprou_Contratou" column into numeric features.
# The dictionary maps each category value to a sequential number N; that number
# names the 0/1 indicator column "Categoria_N" created for the category.

categorias = {
    "Stand, feiras e eventos": 0,
    "Catálogo": 1,
    "Ganhei de presente": 2,
    "SMS / Mensagem de texto": 3,
    "Domicílio": 4,
    "Telefone": 5,
    "Loja física": 6,
    "Contratei": 7,
    "Internet": 8
}

# Build the indicator columns FIRST, while "Comprou_Contratou" still holds its
# original string values. Overwriting the column with the 0/1 flag before this
# loop makes every comparison against a category name fail, which leaves all
# Categoria_N columns equal to 0.
for categoria, numero in categorias.items():
    df = df.withColumn("Categoria_" + str(numero), when(col("Comprou_Contratou") == categoria, 1).otherwise(0))

# Only now replace the column with the numeric flag: 1 when the value is "Sim",
# 0 otherwise.
# NOTE(review): re-running this cell without reloading df compares the already
# converted 0/1 column against the category names again - run it once per load.
df = df.withColumn("Comprou_Contratou", when(col("Comprou_Contratou") == "Sim", 1).otherwise(0))


In [None]:
# Convert Tempo_Resposta and Nota_Consumidor from string to integer: take the
# first run of digits found in the value and cast it.
# The patterns are raw strings so that "\d" reaches the regex engine as written
# instead of being parsed as an (invalid) Python string escape.
df = df.withColumn('Tempo_Resposta', regexp_extract(df['Tempo_Resposta'], r'\d+', 0).cast('integer'))
df = df.withColumn('Nota_Consumidor', regexp_extract(df['Nota_Consumidor'], r'\d+', 0).cast('integer'))


In [None]:
# Keep only the rows whose "Situação" is "Finalizada avaliada", then project
# the columns used by the models.
# The filter runs before the select because "Situação" is not one of the
# selected columns; filtering afterwards would reference a dropped column.
colunas_categoria = ["Categoria_" + str(numero) for numero in range(len(categorias))]
data = (
    df.filter(col("Situação") == "Finalizada avaliada")
    .select("Tempo_Resposta", *colunas_categoria, "Nota_Consumidor")
)

#### Random Forest

In [None]:
# Select the MLflow experiment that receives this run.
mlflow.set_experiment("/Users/lbragalopes@gmail.com/ProjetoPuc")

# Fixed seed for the train/test split so the reported metrics can be reproduced
# on the same input data.
SPLIT_SEED = 42

# Using the run as a context manager guarantees it is closed even when a step
# below raises; an unclosed run would make the next start_run() call fail.
with mlflow.start_run():
    # Assemble the input columns into a single feature vector.
    feature_cols = ["Tempo_Resposta", *["Categoria_" + str(numero) for numero in range(len(categorias))]]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    # Regression model.
    rf = RandomForestRegressor(featuresCol="features", labelCol="Nota_Consumidor", numTrees=10, maxDepth=5)

    # Pipeline: feature assembly followed by the model.
    rf_pipeline = Pipeline(stages=[assembler, rf])

    # 70/30 train/test split.
    (trainingData, testData) = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

    # Fit on the training split and predict on the test split.
    rf_model = rf_pipeline.fit(trainingData)
    rf_predictions = rf_model.transform(testData)

    # Evaluate with MSE and RMSE.
    evaluator = RegressionEvaluator(labelCol="Nota_Consumidor", predictionCol="prediction", metricName="mse")
    rf_mse = evaluator.evaluate(rf_predictions)
    print("MSE: %.3f" % rf_mse)

    evaluator = RegressionEvaluator(labelCol="Nota_Consumidor", predictionCol="prediction", metricName="rmse")
    rf_rmse = evaluator.evaluate(rf_predictions)
    print("RMSE: %.3f" % rf_rmse)

    # Log hyper-parameters and metrics to MLflow.
    mlflow.log_param("rf_numTrees", rf.getNumTrees())
    mlflow.log_param("rf_maxDepth", rf.getMaxDepth())
    mlflow.log_metric("rf_mse", rf_mse)
    mlflow.log_metric("rf_rmse", rf_rmse)

    # Store the fitted pipeline as a run artifact.
    mlflow.spark.log_model(rf_model, "random_forest")


MSE: 3.067
RMSE: 1.751


2023/05/14 21:44:44 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


#### Linear Regression

In [None]:
# Select the MLflow experiment that receives this run.
mlflow.set_experiment("/Users/lbragalopes@gmail.com/ProjetoPuc")

# Fixed seed for the train/test split so the reported metrics can be reproduced
# on the same input data.
SPLIT_SEED = 42

# Using the run as a context manager guarantees it is closed even when a step
# below raises; an unclosed run would make the next start_run() call fail.
with mlflow.start_run():
    # Assemble the input columns into a single feature vector.
    feature_cols = ["Tempo_Resposta", *["Categoria_" + str(numero) for numero in range(len(categorias))]]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    # Regression model.
    lr = LinearRegression(featuresCol="features", labelCol="Nota_Consumidor", maxIter=10, regParam=0.3, elasticNetParam=0.8)

    # Pipeline: feature assembly followed by the model.
    lr_pipeline = Pipeline(stages=[assembler, lr])

    # 70/30 train/test split.
    (trainingData, testData) = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

    # Fit on the training split and predict on the test split.
    lr_model = lr_pipeline.fit(trainingData)
    lr_predictions = lr_model.transform(testData)

    # Evaluate with MSE.
    evaluator = RegressionEvaluator(labelCol="Nota_Consumidor", predictionCol="prediction", metricName="mse")
    lr_mse = evaluator.evaluate(lr_predictions)
    print("MSE: %.3f" % lr_mse)

    # Evaluate with RMSE.
    evaluator = RegressionEvaluator(labelCol="Nota_Consumidor", predictionCol="prediction", metricName="rmse")
    lr_rmse = evaluator.evaluate(lr_predictions)
    print("RMSE: %.3f" % lr_rmse)

    # Log hyper-parameters and metrics to MLflow.
    mlflow.log_param("lr_maxIter", lr.getMaxIter())
    mlflow.log_param("lr_regParam", lr.getRegParam())
    mlflow.log_param("lr_elasticNetParam", lr.getElasticNetParam())
    mlflow.log_metric("lr_mse", lr_mse)
    mlflow.log_metric("lr_rmse", lr_rmse)

    # Store the fitted pipeline as a run artifact.
    mlflow.spark.log_model(lr_model, "regressao_linear")

MSE: 3.091
RMSE: 1.758


2023/05/14 21:49:08 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


#### Gradient Boosting Machine (GBM)

In [None]:
# Select the MLflow experiment that receives this run.
mlflow.set_experiment("/Users/lbragalopes@gmail.com/ProjetoPuc")

# Fixed seed for the train/test split so the reported metrics can be reproduced
# on the same input data.
SPLIT_SEED = 42

# Using the run as a context manager guarantees it is closed even when a step
# below raises; an unclosed run would make the next start_run() call fail.
with mlflow.start_run():
    # Assemble the input columns into a single feature vector.
    feature_cols = ["Tempo_Resposta", *["Categoria_" + str(numero) for numero in range(len(categorias))]]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    # Regression model.
    gbt = GBTRegressor(featuresCol="features", labelCol="Nota_Consumidor", maxIter=10, maxDepth=5)

    # Pipeline: feature assembly followed by the model.
    gbt_pipeline = Pipeline(stages=[assembler, gbt])

    # 70/30 train/test split.
    (trainingData, testData) = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

    # Fit on the training split and predict on the test split.
    gbt_model = gbt_pipeline.fit(trainingData)
    gbt_predictions = gbt_model.transform(testData)

    # Evaluate with MSE and RMSE.
    evaluator = RegressionEvaluator(labelCol="Nota_Consumidor", predictionCol="prediction", metricName="mse")
    gbt_mse = evaluator.evaluate(gbt_predictions)
    print("MSE: %.3f" % gbt_mse)

    evaluator = RegressionEvaluator(labelCol="Nota_Consumidor", predictionCol="prediction", metricName="rmse")
    gbt_rmse = evaluator.evaluate(gbt_predictions)
    # This value is the RMSE, so it is labelled as such (it was printed as "MSE").
    print("RMSE: %.3f" % gbt_rmse)

    # Log hyper-parameters and metrics to MLflow.
    mlflow.log_param("gbt_maxIter", gbt.getMaxIter())
    # The logged value is the tree depth, so the parameter is named accordingly
    # (it was recorded under "gbt_regParam").
    mlflow.log_param("gbt_maxDepth", gbt.getMaxDepth())
    mlflow.log_metric("gbt_mse", gbt_mse)
    mlflow.log_metric("gbt_rmse", gbt_rmse)

    # Store the fitted pipeline as a run artifact.
    mlflow.spark.log_model(gbt_model, "gbt")

MSE: 3.044
MSE: 1.745


2023/05/14 21:55:47 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
