In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
import utils as utils
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBClassifier
from pyspark.ml import Pipeline
from builtins import min as python_min
import mlflow
import mlflow.spark

In [2]:
mlflow.set_experiment("visitas-ciudad-model")

2025/08/27 18:53:19 INFO mlflow.tracking.fluent: Experiment with name 'visitas-ciudad-model' does not exist. Creating a new experiment.


<Experiment: artifact_location='file:///d:/Quim/Documents/quim%20documents/Master/TFM/TravelMind/ml_models/mlruns/279040930689046531', creation_time=1756313599561, experiment_id='279040930689046531', last_update_time=1756313599561, lifecycle_stage='active', name='visitas-ciudad-model', tags={}>

In [3]:
spark = utils.create_context()

In [None]:
# Load weather data
#print("  Loading weather data...")
#df_weather = utils.read_iceberg_table( spark, "trusted", "aemetTrustedDiario")
#
#weather_count = df_weather.count()
#print(f"    Weather records: {weather_count}")

# Load hotel occupancy data
print("=== Loading source tables ===")

# ------------------------------
# Hoteles
print("  Loading hotel occupancy data...")
df_hotels = utils.read_iceberg_table(
    spark=spark, 
    db_name="exploitation", 
    table_name="f_ocupacion_barcelona"
)
print(f"    Hotel records: {df_hotels.count()}")

# ------------------------------
# Apartamentos turísticos
print("  Loading apartamentos data...")
df_apartamentos = utils.read_iceberg_table(
    spark=spark,
    db_name="landing",
    table_name="apartamentos_turisticos"
)
print(f"    Apartamentos records: {df_apartamentos.count()}")

# ------------------------------
# Actividades de ocio
print("  Loading actividades ocio data...")
df_ocio = utils.read_iceberg_table(
    spark=spark,
    db_name="landing",
    table_name="actividades_ocio"
)
print(f"    Ocio records: {df_ocio.count()}")

# ------------------------------
# Calidad del aire
print("  Loading calidad aire data...")
df_calidad = utils.read_iceberg_table(
    spark=spark,
    db_name="landing",
    table_name="calidad_aire"
)
print(f"    Calidad aire records: {df_calidad.count()}")

# ------------------------------
# Tráfico semanal
print("  Loading trafico semanal data...")
df_trafico = utils.read_iceberg_table(
    spark=spark,
    db_name="landing",
    table_name="trafico_semana"
)
print(f"    Trafico records: {df_trafico.count()}")

print("✅ Data loaded successfully")

  Loading hotel occupancy data...
    Hotel records: 265
✅ Data loaded successfully


In [None]:
def create_hotel_features(df_hotels):
        """Create hotel-based features."""
        print("  Creating hotel features...")
        
        df_hotel_features = df_hotels.groupBy(
            col('año'),
            col('mes')
        ).agg(
            sum('viajeros').alias('hotel_viajeros'),
            sum('pernoctaciones').alias('hotel_pernoctaciones'),
            avg('estanciaMedia').alias('hotel_estancia_media'),
            avg('gradoOcupaPlazas').alias('avg_ocupacion')
        ).withColumn(
            # Hotel availability score
            'hotel_availability_score',
            100 - col('avg_ocupacion')
        )
        
        return df_hotel_features

def create_apartment_features(df_apartments):
    """Create apartment-based features."""
    print("  Creating apartment features...")
    
    df_apartment_features = df_apartments.groupBy(
        col("AÑO"),
        col("MES")
    ).agg(
        sum("VIAJEROS").alias("apt_viajeros"),
        sum("PERNOCTACIONES").alias("apt_pernoctaciones"),
        avg("ESTANCIA_MEDIA").alias("apt_estancia_media"),
        avg("GRADO_OCUPA_PLAZAS").alias("avg_ocupa_plazas"),
        avg("GRADO_OCUPA_APART").alias("avg_ocupa_apart"),
        avg("GRADO_OCUPA_APART_FIN_SEMANA").alias("avg_ocupa_apart_weekend"),
        sum("APARTAMENTOS_ESTIMADOS").alias("apt_estimados"),
        sum("PLAZAS_ESTIMADAS").alias("plazas_estimadas"),
        sum("PERSONAL_EMPLEADO").alias("apt_personal_empleado")
    ).withColumn(
        # Apartment availability score (basado en ocupación de plazas)
        "apt_availability_score",
        100 - col("avg_ocupa_plazas")
    )
    
    return df_apartment_features

def create_leisure_features(df_leisure):
    """Create leisure-based features from actividades_ocio table."""
    print("  Creating leisure features...")

    df_leisure_features = df_leisure.groupBy(
        col("AÑO"),
        col("MES")
    ).agg(
        sum("ENTRADAS").alias("ocio_total_entradas"),
        sum("VISITAS_PAGINAS").alias("ocio_total_visitas_paginas"),
        sum("GASTO_TOTAL").alias("ocio_gasto_total"),
        avg("PRECIO_MEDIO_ENTRADA").alias("ocio_precio_medio_entrada"),
        sum("TRANSACCIONES").alias("ocio_total_transacciones")
    ).withColumn(
        # Engagement score: visitas / transacciones (más alto = más interés online por compra)
        "ocio_engagement_score",
        (col("ocio_total_visitas_paginas") / (col("ocio_total_transacciones") + 1))
    ).withColumn(
        # Gasto medio por entrada
        "ocio_gasto_medio_por_entrada",
        (col("ocio_gasto_total") / (col("ocio_total_entradas") + 1))
    )

    return df_leisure_features

def create_air_quality_features(df_air):
    """Create air quality features from calidad_aire table."""
    print("  Creating air quality features...")

    df_air_features = df_air.groupBy(
        col("AÑO"),
        col("MES")
    ).agg(
        # Porcentaje medio del mes de calidad de aire buena
        avg(when(col("CALIDAD_AIRE") == "Buena", col("PORCENTAJE_CALIDAD_AIRE"))).alias("aire_pct_buena"),
        # Porcentaje medio del mes de calidad de aire aceptable
        avg(when(col("CALIDAD_AIRE") == "Aceptable", col("PORCENTAJE_CALIDAD_AIRE"))).alias("aire_pct_aceptable"),
        # Porcentaje medio del mes de calidad de aire mala
        avg(when(col("CALIDAD_AIRE") == "Mala", col("PORCENTAJE_CALIDAD_AIRE"))).alias("aire_pct_mala"),
        # Número de estaciones monitorizadas
        countDistinct("ESTACION").alias("aire_num_estaciones")
    ).withColumn(
        # Índice simplificado: pondera calidad (buena=2, aceptable=1, mala=0)
        "aire_quality_index",
        (
            col("aire_pct_buena") * 2 +
            col("aire_pct_aceptable") * 1 +
            col("aire_pct_mala") * 0
        ) / (col("aire_pct_buena") + col("aire_pct_aceptable") + col("aire_pct_mala") + 1e-6)
    )

    return df_air_features

def create_trafico_features(df_trafico):
    """Create traffic features from trafico_semana table."""
    print("  Creating traffic features...")

    df_trafico_features = df_trafico.groupBy(
        col("AÑO"),
        col("MES")
    ).agg(
        # Intensidad media de vehículos ligeros en el mes
        avg("IMD_VEHICULO_LIGERO").alias("trafico_imd_ligeros"),
        # Intensidad media de vehículos pesados en el mes
        avg("IMD_VEHICULO_PESADO").alias("trafico_imd_pesados"),
        # Intensidad media total
        avg("IMD_VEHICULO_TOTAL").alias("trafico_imd_total"),
        # Total mensual de vehículos (ligeros + pesados)
        sum("IMD_VEHICULO_TOTAL").alias("trafico_total_mes"),
        # Número de estaciones activas
        countDistinct("ESTACION").alias("trafico_num_estaciones")
    ).withColumn(
        # Ratio de pesados sobre total (indicador económico-logístico)
        "trafico_pct_pesados",
        col("trafico_imd_pesados") / (col("trafico_imd_total") + 1e-6)
    )

    return df_trafico_features

In [None]:
#2. Generar features a partir de df_hotels
print("=== Generating features ===")

# Llamar a las funciones de features
df_hotels_features       = create_hotel_features(df_hotels)
df_apartamentos_features = create_apartamentos_features(df_apartamentos)
df_ocio_features         = create_ocio_features(df_ocio)
df_calidad_features      = create_calidad_aire_features(df_calidad)
df_trafico_features      = create_trafico_features(df_trafico)

print("=== Joining all feature sets ===")

# Unir por AÑO y MES (base temporal común)
df_features = df_hotels_features \
    .join(df_apartamentos_features, ["AÑO", "MES"], "outer") \
    .join(df_ocio_features, ["AÑO", "MES"], "outer") \
    .join(df_calidad_features, ["AÑO", "MES"], "outer") \
    .join(df_trafico_features, ["AÑO", "MES"], "outer")

print(f"Final feature dataset has {df_features.count()} rows")

# 🚨 Aquí necesitas una etiqueta (label).
# Ejemplo: 1 si es "buen momento", 0 si no. 
# Esto normalmente lo defines con criterios propios.
# Supongamos que un buen momento es cuando hotel_availability_score > 40
df_labeled = df_hotel_features.withColumn(
    "label", (col("hotel_availability_score") > 40).cast("int")
)

# 3. Seleccionar las features para el modelo
feature_cols = [
    "hotel_viajeros",
    "hotel_pernoctaciones",
    "hotel_estancia_media",
    "avg_ocupacion",
    "hotel_availability_score"
]

# VectorAssembler para convertir a vector de features
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)
# Contar positivos (label=1) y negativos (label=0)
counts = df_labeled.groupBy("label").count().collect()

# Inicializamos variables
num_positivos = 0
num_negativos = 0

for row in counts:
    if row['label'] == 1:
        num_positivos = row['count']
    else:
        num_negativos = row['count']

# Ratio para scale_pos_weight
ratio_negativos_sobre_positivos = num_negativos / num_positivos
# 4. Definir modelo XGBoost
xgb = SparkXGBClassifier(
    features_col="features",       # <- snake_case
    label_col="label",
    prediction_col="prediction",
    probability_col="probability",
    num_round=50,
    max_depth=5,
    eta=0.1,
    scale_pos_weight=ratio_negativos_sobre_positivos
)

# 5. Construir pipeline
pipeline = Pipeline(stages=[assembler, xgb])

# 6. Entrenar modelo
train_df, test_df = df_labeled.randomSplit([0.8, 0.2], seed=42)
#model = pipeline.fit(train)
#
## 7. Evaluar modelo
#predictions = model.transform(test)
#predictions.select("año", "mes", "label", "probability", "prediction").show(10, truncate=False)

with mlflow.start_run() as run:
    # Entrenar
    model = pipeline.fit(train_df)
    preds = model.transform(test_df)

    # Evaluación
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy"
    )
    acc = evaluator.evaluate(preds)

    # Log de parámetros, métricas y modelo
    mlflow.log_param("max_depth", 5)
    mlflow.log_param("eta", 0.1)
    mlflow.log_param("num_round", 50)
    mlflow.log_param("scale_pos_weight", ratio_negativos_sobre_positivos)
    mlflow.log_metric("accuracy", acc)

    # Loggear modelo Spark
    mlflow.spark.log_model(model, "spark_xgb_model")

    print("Modelo registrado en MLflow con run_id:", run.info.run_id)

  Creating hotel features...


2025-08-27 18:53:42,890 INFO XGBoost-PySpark: _fit Running xgboost-3.0.4 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 5, 'scale_pos_weight': 0.9506172839506173, 'num_round': 50, 'eta': 0.1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2025-08-27 18:53:51,277 INFO XGBoost-PySpark: _fit Finished xgboost training!


Modelo registrado en MLflow con run_id: 467a75a48f5a4cf48f7b4627a9daf6df


In [None]:
from pyspark.sql import Row
from pyspark.sql.functions import col

# Supongamos que queremos predecir para Barcelona, 20/09/2025
ciudad = "Barcelona"
fecha = "2025-09-20"
year, month, day = map(int, fecha.split("-"))

# Generar features para esa fecha
# En tu caso, solo agregados históricos por mes y año
# Aquí podemos usar medias históricas del mes de septiembre
df_month_avg = df_hotel_features.filter(col("mes") == month).agg(
    avg("hotel_viajeros").alias("hotel_viajeros"),
    avg("hotel_pernoctaciones").alias("hotel_pernoctaciones"),
    avg("hotel_estancia_media").alias("hotel_estancia_media"),
    avg("avg_ocupacion").alias("avg_ocupacion"),
    avg("hotel_availability_score").alias("hotel_availability_score")
).collect()[0]

# Crear fila para predicción
future_row = Row(
    año=year,
    mes=month,
    hotel_viajeros=df_month_avg["hotel_viajeros"],
    hotel_pernoctaciones=df_month_avg["hotel_pernoctaciones"],
    hotel_estancia_media=df_month_avg["hotel_estancia_media"],
    avg_ocupacion=df_month_avg["avg_ocupacion"],
    hotel_availability_score=df_month_avg["hotel_availability_score"]
)

# Convertir a DataFrame Spark
future_df = spark.createDataFrame([future_row])

model_uri = f"file:///D:/Quim/Documents/quim documents/Master/TFM/TravelMind/ml_models/mlruns/279040930689046531/467a75a48f5a4cf48f7b4627a9daf6df/artifacts/spark_xgb_model"
loaded_model = mlflow.spark.load_model(model_uri)
# Aplicar pipeline entrenado
prediction = model.transform(future_df)

# Mostrar resultado
prediction.select(
    "año", "mes", "prediction", "probability"
).show(truncate=False)


2025/08/27 19:01:29 INFO mlflow.spark: File 'd:/Quim/Documents/quim documents/Master/TFM/TravelMind/ml_models/mlruns/279040930689046531/467a75a48f5a4cf48f7b4627a9daf6df/artifacts/spark_xgb_model/sparkml' is already on DFS, copy is not necessary.


Py4JJavaError: An error occurred while calling o800.partitions.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Ljava/lang/String;)Lorg/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat;
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX.getStat(NativeIO.java:608)
	at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfoByNativeIO(RawLocalFileSystem.java:934)
	at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:848)
	at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:816)
	at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:52)
	at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2199)
	at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2179)
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
