In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import mlflow
import mlflow.spark

### Crear SparkSession

In [2]:
# Build (or reuse) the Spark session for this notebook.
# - spark.jars.packages adds the MLflow Spark connector jar to the session.
# - the catalog is kept in memory.
spark = (
    SparkSession.builder
    .appName("EntrenarModeloTaxi")
    .config("spark.jars.packages", "org.mlflow:mlflow-spark:1.30.0")
    .config("spark.sql.catalogImplementation", "in-memory")
    .getOrCreate()
)

### Config MLflow

In [3]:
# Point the MLflow client at the tracking server and select the experiment that
# the training run will be recorded under.
# Spark autologging (mlflow.spark.autolog()) is intentionally left disabled here.
mlflow.set_tracking_uri(uri="http://mlflow:5000")
mlflow.set_experiment(experiment_name="DuracionViajeNYC")

<Experiment: artifact_location='mlflow-artifacts:/1', creation_time=1745038826046, experiment_id='1', last_update_time=1745038826046, lifecycle_stage='active', name='DuracionViajeNYC', tags={}>

### Leer datos

In [4]:
# Load the trip dataset from its Parquet file into a Spark DataFrame.
df = spark.read.format("parquet").load("../data/processed/yellow_2019_raw.parquet")

### Indexación de ubicaciones

In [5]:
# One StringIndexer per location column (pickup first, then drop-off).
# handleInvalid="keep" routes values not seen while fitting into an extra index
# instead of raising at transform time.
indexers = [
    StringIndexer(inputCol=source_col, outputCol=index_col, handleInvalid="keep")
    for source_col, index_col in (
        ("PULocationID", "PULocationIndex"),
        ("DOLocationID", "DOLocationIndex"),
    )
]

### Columnas

In [6]:
# Model inputs, in the order they are packed into the feature vector.
features = [
    # columns read directly from the DataFrame
    "trip_distance",
    "passenger_count",
    "hora_recogida",
    "dia_semana",
    # columns produced by the StringIndexer stages
    "PULocationIndex",
    "DOLocationIndex",
]

# Pack the columns above into the single vector column the regressor reads.
assembler = VectorAssembler(outputCol="features", inputCols=features)

### Modelo con hiperparámetros

In [7]:
# Upper bound on the number of distinct categories in an indexed location column.
# Assumes the NYC TLC zone lookup (LocationID 1-265) plus the one extra bucket that
# StringIndexer(handleInvalid="keep") appends for unseen values.
# TODO confirm against the data, e.g. df.select("PULocationID").distinct().count().
MAX_LOCATION_CATEGORIES = 265 + 1

rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="duracion_min",
    maxDepth=7,
    numTrees=50,
    # The StringIndexer outputs reach the forest as categorical features, and Spark
    # requires maxBins to be at least the largest number of categories of any
    # categorical feature; the previous value (64) is below the location cardinality.
    maxBins=MAX_LOCATION_CATEGORIES,
    # Explicit seed so repeated runs build the same forest (the train/test split
    # is seeded with 42 as well).
    seed=42,
)

# Full pipeline: index both location columns, assemble the feature vector, fit the forest.
pipeline = Pipeline(stages=indexers + [assembler, rf])

### Entrenamiento

In [8]:
# Hold out 20% of the rows for evaluation; the fixed seed keeps the split reproducible.
train, test = df.randomSplit([0.8, 0.2], seed=42)

with mlflow.start_run():
    # Autologging is commented out in the MLflow config cell, so everything that
    # should appear in the run is recorded explicitly here.
    # Hyperparameters are logged before fitting so a failed fit still shows them.
    mlflow.log_params({
        "maxDepth": rf.getMaxDepth(),
        "numTrees": rf.getNumTrees(),
        "maxBins": rf.getMaxBins(),
    })

    model = pipeline.fit(train)
    pred = model.transform(test)

    # RMSE of the predicted trip duration on the held-out split.
    evaluator = RegressionEvaluator(labelCol="duracion_min", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(pred)

    mlflow.log_metric("rmse_manual", rmse)

    # Store the fitted pipeline (indexers + assembler + forest) as a run artifact
    # so the model can be reloaded from this run later.
    mlflow.spark.log_model(model, "model")

    print(f"✅ Entrenado y logueado en MLflow (RMSE: {rmse:.2f})")

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 247, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  

ConnectionRefusedError: [Errno 111] Connection refused