In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, Imputer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import col

In [3]:
# Iniciar sessão Spark
spark = SparkSession.builder \
    .appName("Pipeline de Regressão GBT - Apenas Numéricos") \
    .master("local[*]") \
    .getOrCreate()

# Carregar dados
df = spark.read.csv("used_cars_data.csv", header=True, inferSchema=True)

# Imprimir o schema para ver quais colunas estão disponíveis
print("Esquema do DataFrame Original:")
df.printSchema()


Original DataFrame Schema:
root
 |-- vin: string (nullable = true)
 |-- back_legroom: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- bed_height: string (nullable = true)
 |-- bed_length: string (nullable = true)
 |-- body_type: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- city: string (nullable = true)
 |-- city_fuel_economy: string (nullable = true)
 |-- combine_fuel_economy: string (nullable = true)
 |-- daysonmarket: string (nullable = true)
 |-- dealer_zip: string (nullable = true)
 |-- description: string (nullable = true)
 |-- engine_cylinders: string (nullable = true)
 |-- engine_displacement: string (nullable = true)
 |-- engine_type: string (nullable = true)
 |-- exterior_color: string (nullable = true)
 |-- fleet: string (nullable = true)
 |-- frame_damaged: string (nullable = true)
 |-- franchise_dealer: string (nullable = true)
 |-- franchise_make: string (nullable = true)
 |-- front_legroom: string (nullable = true)
 |-- fuel_tank_vo

In [4]:
# 1. Converter franchise_dealer e is_new para 1 e 0
df = df.withColumn("franchise_dealer", col("franchise_dealer").cast("int"))
if "is_new" in df.columns:
    df = df.withColumn("is_new", col("is_new").cast("int"))

# 2. Remover colunas indesejadas, se existirem
colunas_para_remover = ["is_certified", "listing_id", "sp_id", "vehicle_damage_category", "combine_fuel_economy"]
colunas_existentes_para_remover = [c for c in colunas_para_remover if c in df.columns]
if colunas_existentes_para_remover:
    df = df.drop(*colunas_existentes_para_remover)

# Obter a lista real de colunas no DataFrame
colunas_atuais = df.columns
print("\nColunas atuais no DataFrame após remoção:")
print(colunas_atuais)

# 3. Converter para float64 (DoubleType) as colunas que existirem
colunas_float = [
    "city_fuel_economy", "highway_fuel_economy", "engine_displacement",
    "latitude", "longitude", "mileage", "price"
]
# Verificar quais colunas float existem
colunas_float_existentes = [c for c in colunas_float if c in colunas_atuais]
for nome_coluna in colunas_float_existentes:
    df = df.withColumn(nome_coluna, col(nome_coluna).cast(DoubleType()))

# 4. Converter para int64 (IntegerType) as colunas que existirem
colunas_int = ["horsepower", "savings_amount", "daysonmarket", "owner_count", "seller_rating"]
# Verificar quais colunas int existem
colunas_int_existentes = [c for c in colunas_int if c in colunas_atuais]
for nome_coluna in colunas_int_existentes:
    df = df.withColumn(nome_coluna, col(nome_coluna).cast(IntegerType()))



Actual columns in DataFrame after dropping:
['vin', 'back_legroom', 'bed', 'bed_height', 'bed_length', 'body_type', 'cabin', 'city', 'city_fuel_economy', 'daysonmarket', 'dealer_zip', 'description', 'engine_cylinders', 'engine_displacement', 'engine_type', 'exterior_color', 'fleet', 'frame_damaged', 'franchise_dealer', 'franchise_make', 'front_legroom', 'fuel_tank_volume', 'fuel_type', 'has_accidents', 'height', 'highway_fuel_economy', 'horsepower', 'interior_color', 'isCab', 'is_cpo', 'is_new', 'is_oemcpo', 'latitude', 'length', 'listed_date', 'listing_color', 'longitude', 'main_picture_url', 'major_options', 'make_name', 'maximum_seating', 'mileage', 'model_name', 'owner_count', 'power', 'price', 'salvage', 'savings_amount', 'seller_rating', 'sp_name', 'theft_title', 'torque', 'transmission', 'transmission_display', 'trimId', 'trim_name', 'wheel_system', 'wheel_system_display', 'wheelbase', 'width', 'year']


In [5]:
# Coluna alvo
coluna_alvo = "price"
if coluna_alvo not in colunas_atuais:
    raise ValueError(f"A coluna alvo '{coluna_alvo}' não existe no DataFrame")

# Definir colunas numéricas potenciais com base nos tipos de dados
colunas_numericas_potenciais = [
    "franchise_dealer", "is_new", "city_fuel_economy", "highway_fuel_economy",
    "engine_displacement", "latitude", "longitude", "seller_rating", 
    "mileage", "horsepower", "savings_amount", "daysonmarket", "owner_count"
]

# Selecionar apenas as colunas que existem no DataFrame
colunas_numericas = [c for c in colunas_numericas_potenciais if c in colunas_atuais]
print("\nColunas numéricas que existem no DataFrame:")
print(colunas_numericas)

# Exibir as primeiras linhas para visualizar os dados
print("\nAmostra dos dados:")
df.select(colunas_numericas + [coluna_alvo]).show(5)

# Selecionar apenas as colunas numéricas + coluna alvo
df = df.select(colunas_numericas + [coluna_alvo])

# Verificar valores nulos em cada coluna
print("\nContagem de nulos por coluna:")
for coluna in df.columns:
    qtd_nulos = df.filter(col(coluna).isNull()).count()
    print(f"{coluna}: {qtd_nulos}")



Numerical columns that exist in the DataFrame:
['franchise_dealer', 'is_new', 'city_fuel_economy', 'highway_fuel_economy', 'engine_displacement', 'latitude', 'longitude', 'seller_rating', 'mileage', 'horsepower', 'savings_amount', 'daysonmarket', 'owner_count']

Sample data:
+----------------+------+-----------------+--------------------+-------------------+--------+---------+-------------+-------+----------+--------------+------------+-----------+-------+
|franchise_dealer|is_new|city_fuel_economy|highway_fuel_economy|engine_displacement|latitude|longitude|seller_rating|mileage|horsepower|savings_amount|daysonmarket|owner_count|  price|
+----------------+------+-----------------+--------------------+-------------------+--------+---------+-------------+-------+----------+--------------+------------+-----------+-------+
|            NULL|  NULL|             NULL|                NULL|             1300.0| 18.3988| -66.1582|            2|    7.0|       177|             0|         522|    

In [10]:
# Inicializar o Imputador
imputer = Imputer(
    inputCols=colunas_numericas,
    outputCols=[f"{c}_imputado" for c in colunas_numericas],
    strategy="mean"  # Estratégia: média
)

# Ajustar (fit) o Imputador
modelo_imputador = imputer.fit(df)
df_imputado = modelo_imputador.transform(df)

# Usar colunas imputadas para o assembler
colunas_imputadas = [f"{c}_imputado" for c in colunas_numericas]

# Montar (assemble) as features
assembler = VectorAssembler(
    inputCols=colunas_imputadas, 
    outputCol="features", 
    handleInvalid="keep"  # Manter linhas com valores inválidos
)


In [11]:
# Modelo GBT com parâmetros conservadores
gbt = GBTRegressor(
    featuresCol="features", 
    labelCol=coluna_alvo,
    maxIter=10,           # Reduzido de 20 (padrão)
    maxDepth=5,           # Profundidade padrão
    stepSize=0.1          # Taxa de aprendizado
)

# Pipeline
pipeline = Pipeline(stages=[assembler, gbt])

# Filtrar as linhas com valores nulos na coluna alvo antes de dividir
df_filtrado = df_imputado.filter(col(coluna_alvo).isNotNull())
print(f"\nContagem do DataFrame filtrado (alvo não nulo): {df_filtrado.count()}")



Filtered DataFrame count (non-null target): 1880082


In [12]:
# Divisão entre treino e teste
dados_treino, dados_teste = df_filtrado.randomSplit([0.7, 0.3], seed=42)
print(f"Contagem dos dados de treino: {dados_treino.count()}")
print(f"Contagem dos dados de teste: {dados_teste.count()}")


Training data count: 1315457
Test data count: 564625


In [13]:
# Continuar apenas se tivermos dados
if dados_treino.count() > 0:
    try:
        # Ajustar o modelo
        modelo = pipeline.fit(dados_treino)
        
        # Fazer previsões
        previsoes = modelo.transform(dados_teste)
        
        # Avaliar
        avaliador = RegressionEvaluator(labelCol=coluna_alvo, predictionCol="prediction", metricName="rmse")
        rmse = avaliador.evaluate(previsoes)
        print(f"\nErro Quadrático Médio (RMSE) nos dados de teste = {rmse}")
        
        # Mostrar amostra das previsões
        print("\nAmostra das previsões:")
        previsoes.select("price", "prediction").show(5)
        
        # Importância das features (se disponível)
        try:
            modelo_gbt = modelo.stages[-1]
            importancias_features = modelo_gbt.featureImportances
            print("\nImportância das features:")
            for i, importancia in enumerate(importancias_features):
                print(f"Feature {i}: {importancia}")
        except:
            print("Não foi possível extrair a importância das features")
            
    except Exception as e:
        print(f"Erro durante o ajuste ou avaliação do modelo: {e}")
else:
    print("ERRO: Dados de treino estão vazios. Verifique suas etapas de pré-processamento dos dados.")



Root Mean Squared Error (RMSE) on test data = 832106.9667397768

Sample predictions:
+--------+-----------------+
|   price|       prediction|
+--------+-----------------+
|-73.7742|891.4978329475962|
|    21.0|891.4978329475962|
|-83.2411|891.4978329475962|
|-83.2411|891.4978329475962|
|-83.2411|891.4978329475962|
+--------+-----------------+
only showing top 5 rows


Feature importances:
Feature 0: 0.0
Feature 1: 0.0
Feature 2: 1.229438029930215e-05
Feature 3: 4.1251612344828363e-08
Feature 4: 4.3294729515052967e-07
Feature 5: 0.0001292675463791075
Feature 6: 1.7003674033283123e-05
Feature 7: 0.07448297524794328
Feature 8: 0.8148780423708476
Feature 9: 6.839934110410476e-07
Feature 10: 0.10373828443474532
Feature 11: 0.0010658852267034027
Feature 12: 0.005675088926730042
