# **Cargado de archivos**

In [1]:
# =============================================================================
# Configuración optimizada de la sesión de Spark
# =============================================================================
# Esta configuración es más robusta para conjuntos de datos grandes.
# Debe ejecutarse una vez al inicio del notebook.

from pyspark.sql import SparkSession
import os

spark = SparkSession.builder \
    .appName("BlueBikes-Project") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.driver.maxResultSize", "2g") \
    .getOrCreate()

# Establecer variables de entorno para que PySpark funcione correctamente en algunos entornos
import sys
# os.environ["PYSPARK_PYTHON"] = sys.executable
# os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"

In [2]:
# =============================================================================
# Cargar DataFrame procesado desde Google Drive (para Colab)
# =============================================================================
# Esta celda activa tu Google Drive y lee los datos procesados del archivo Parquet almacenado allí.
# Usarlo en el entorno de Google Colab.

from google.colab import drive

print("Intentando montar Google Drive...")
try:
    drive.mount('/content/drive')

    gdrive_path = "/content/drive/MyDrive/BlueBikes_PRJ/df_final_bluebikes_v1.parquet"
    print(f"Cargando datos desde la ruta de Google Drive: {gdrive_path}")

    # Spark leerá la carpeta Parquet directamente
    df_final = spark.read.parquet(gdrive_path)

    print("✅ DataFrame cargado exitosamente desde Google Drive.")

    # Verify the schema and show a few rows
    print("Esquema de DataFrame:")
    df_final.printSchema()

    print("Muestra de los datos cargados:")
    df_final.show(5, truncate=False)

except Exception as e:
    print(f"❌ Error al cargar datos desde Google Drive. Asegúrate de que el archivo exista en '{gdrive_path}'.")
    print(f"Detalles del error: {e}")

Intentando montar Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Cargando datos desde la ruta de Google Drive: /content/drive/MyDrive/BlueBikes_PRJ/df_final_bluebikes_v1.parquet
✅ DataFrame cargado exitosamente desde Google Drive.
Esquema de DataFrame:
root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_lat: double (nullable = true)
 |-- start_lng: double (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lng: double (nullable = true)
 |-- member_casual: string (nullable = true)
 |-- duration_sec: long (nullable = true)
 |-- schema_version: string (nullable = true)
 |-- periodo: string (nullable = true)

Muestra de los datos cargados:
+---------

# **Ingeniería de Características**

In [3]:
import pyspark.sql.functions as F

# Asegura tipos y una columna de fecha base para los flags
df_final = (
    df_final
    .withColumn("start_lat", F.col("start_lat").cast("double"))
    .withColumn("start_lng", F.col("start_lng").cast("double"))
    .withColumn("end_lat",   F.col("end_lat").cast("double"))
    .withColumn("end_lng",   F.col("end_lng").cast("double"))
    .withColumn("started_at", F.to_timestamp("started_at"))
    .withColumn("ended_at",   F.to_timestamp("ended_at"))
)

## Conversión a Zona Horaria de Boston (hora y truncado a hora)

In [4]:
import pyspark.sql.functions as F

# asegurar tipo timestamp
df_final = (df_final
    .withColumn("started_at", F.to_timestamp("started_at"))
    .withColumn("ended_at",   F.to_timestamp("ended_at"))
)

# === A) si tus timestamps están en UTC → conviértelos a Boston ===
df_final = df_final.withColumn(
    "started_at_local",
    F.from_utc_timestamp("started_at", "America/New_York")
)
# === B) si YA están en hora local, usa esta en lugar de A
df_final = (df_final
    .withColumn("ts_hour", F.date_trunc("hour", F.col("started_at_local")))  # inicio de la hora
)

## Ingeniería de características temporales

In [5]:
# Extraeremos varias características basadas en el tiempo de la marca de tiempo 'started_at'.
# Esto ayudará al modelo a capturar patrones relacionados con la hora del día, la semana y el año.

from pyspark.sql.functions import col, year, month, dayofweek, hour, dayofyear, weekofyear, when, sin, cos, radians, date_format
import numpy as np

# Extraer características temporales básicas
df_with_temporal = df_final \
    .withColumn('trip_date', col('started_at_local').cast('date')) \
    .withColumn('trip_year', year(col('started_at_local'))) \
    .withColumn('trip_month', month(col('started_at_local'))) \
    .withColumn('trip_day_of_week', dayofweek(col('started_at_local'))) \
    .withColumn('trip_hour', hour(col('started_at_local'))) \
    .withColumn('is_weekend', when(col('trip_day_of_week').isin([1, 7]), 1).otherwise(0))

# Definir estaciones según los meses
df_with_temporal = df_with_temporal.withColumn(
    'season',
    when(col('trip_month').isin([12, 1, 2]), 'winter')
    .when(col('trip_month').isin([3, 4, 5]), 'spring')
    .when(col('trip_month').isin([6, 7, 8]), 'summer')
    .otherwise('fall')
)

# Codificar características cíclicas (hora y día de la semana) mediante transformaciones de seno y coseno.
# Esto ayuda al modelo a comprender la naturaleza continua del tiempo.
# Ciclo horario (24 horas)
df_with_temporal = df_with_temporal \
    .withColumn('hour_sin', sin(2 * np.pi * col('trip_hour') / 24)) \
    .withColumn('hour_cos', cos(2 * np.pi * col('trip_hour') / 24))

# Ciclo de día de la semana (7 días)
df_with_temporal = df_with_temporal \
    .withColumn('day_of_week_sin', sin(2 * np.pi * col('trip_day_of_week') / 7)) \
    .withColumn('day_of_week_cos', cos(2 * np.pi * col('trip_day_of_week') / 7))

# Actualizar el DataFrame principal
df_final = df_with_temporal

## Ingeniería de características geoespaciales

In [6]:
# Crearemos nuevas funciones basadas en las coordenadas de la estación para capturar patrones geoespaciales.
# La distancia de Haversine es una función clave para predecir la duración del viaje.
# También crearemos funciones relacionadas con la popularidad/densidad de la estación.

from pyspark.sql.functions import udf, lit, count
from pyspark.sql.types import DoubleType
from math import radians, sin, cos, sqrt, atan2

# Distancia de Haversina UDF
def haversine(lon1, lat1, lon2, lat2):
    R = 6371.0 # Radio de la Tierra en km

    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    dlon = lon2 - lon1
    dlat = lat2 - lat1

    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    distance = R * c
    return distance

haversine_udf = udf(haversine, DoubleType())

# Calcular la distancia haversina entre las estaciones inicial y final
df_with_geospatial = df_final.withColumn(
    'haversine_distance_km',
    haversine_udf(
        col('start_lng'), col('start_lat'),
        col('end_lng'), col('end_lat')
    )
)

# Función: Calcular indicadores de "estación popular" según el historial de viajes.
# Definiremos una estación como popular si su total de viajes se encuentra en el 10% superior.
trip_counts_by_station = df_with_geospatial.groupBy('start_station_name').agg(count('*').alias('trip_count'))
percentile_threshold = trip_counts_by_station.approxQuantile('trip_count', [0.90], 0.05)[0]

popular_stations = trip_counts_by_station.filter(col('trip_count') > percentile_threshold) \
    .select('start_station_name').withColumn('is_popular_start', lit(1))

# Une el flag binario de la estación popular al DataFrame principal
df_with_geospatial = df_with_geospatial.join(
    popular_stations,
    on='start_station_name',
    how='left'
).fillna(0, subset=['is_popular_start'])

# Actualizar el DataFrame principal
df_final = df_with_geospatial

## Ingeniería de características de interacción

In [7]:
# Crearemos características de interacción que capturen relaciones entre variables existentes,
# como la velocidad del viaje y si el viaje fue de ida y vuelta.

from pyspark.sql.functions import col, when

# Crear una característica para la velocidad promedio en km/h
# Se añade una pequeña constante (1e-6) al denominador para evitar la división por cero,
df_with_interaction_features = df_final.withColumn(
    "avg_speed_kmh",
    col("haversine_distance_km") / (col("duration_sec") / 3600 + 1e-6)
)

# Crear una característica binaria para viajes de ida y vuelta (misma estación de inicio y fin)
df_with_interaction_features = df_with_interaction_features.withColumn(
    "is_round_trip",
    when(col("start_station_name") == col("end_station_name"), 1).otherwise(0)
)


# Actualizar el DataFrame principal para incluir estas nuevas características
df_final = df_with_interaction_features

## Feriados Massachusetts (USA): is_holiday

In [9]:
import pyspark.sql.functions as F
import datetime as dt

# A) rango de años presentes
mm = df_final.agg(F.min("trip_date").alias("min_d"), F.max("trip_date").alias("max_d")).first()
years = list(range(mm["min_d"].year, mm["max_d"].year + 1))

# B) construir calendario de feriados (Massachusetts)
try:
    import holidays
except ImportError as e:
    raise RuntimeError("Instala el paquete 'holidays' con: pip install holidays") from e

us_ma = holidays.US(subdiv="MA", years=years)  # incluye 'observed'
holiday_dates = sorted(us_ma.keys())

# C) tabla de feriados -> Spark
holiday_df = (
    spark.createDataFrame([(d.isoformat(),) for d in holiday_dates], ["date_str"])
         .select(F.to_date("date_str").alias("trip_date"))
         .withColumn("is_holiday", F.lit(True))
)

# D) unir al hecho por fecha
df_final = (
    df_final
    .join(holiday_df, on="trip_date", how="left")
    .withColumn("is_holiday", F.coalesce("is_holiday", F.lit(False)).cast("integer")) # Cast boolean a entero (0 o 1)
)

## Columnas creadas

In [12]:
# Lista de columnas originales basadas en el esquema
original_cols = [
    'ride_id', 'rideable_type', 'started_at', 'ended_at', 'start_station_name',
    'start_lat', 'start_lng', 'end_station_name', 'end_lat', 'end_lng',
    'member_casual', 'duration_sec', 'schema_version', 'periodo'
]

# Obtener la lista de todas las columnas en el DataFrame final
all_cols = df_final.columns

# Identificar las columnas creadas por la Ing. de Características excluyendo las columnas originales
feature_engineered_cols = [col for col in all_cols if col not in original_cols]

# Conteo del número de columnas agregadas por la Ing. de Características
num_feature_engineered_cols = len(feature_engineered_cols)
print(f"Cantidad de columnas agregadas por la Ing. de Características: {num_feature_engineered_cols}")

# Seleccionar y mostrar solo las columnas creadas por la Ing. de Características
df_final.select(feature_engineered_cols).show(5, truncate=False)

Cantidad de columnas agregadas por la Ing. de Características: 18
+----------+-----------------------+-------------------+---------+----------+----------------+---------+----------+------+----------------------+-------------------+-------------------+--------------------+---------------------+----------------+------------------+-------------+----------+
|trip_date |started_at_local       |ts_hour            |trip_year|trip_month|trip_day_of_week|trip_hour|is_weekend|season|hour_sin              |hour_cos           |day_of_week_sin    |day_of_week_cos     |haversine_distance_km|is_popular_start|avg_speed_kmh     |is_round_trip|is_holiday|
+----------+-----------------------+-------------------+---------+----------+----------------+---------+----------+------+----------------------+-------------------+-------------------+--------------------+---------------------+----------------+------------------+-------------+----------+
|2024-09-29|2024-09-29 05:33:59.648|2024-09-29 05:00:00|2024    

# **Guardado de datos a formato PARQUET**

In [11]:
# Guardar el DataFrame limpio en un archivo Parquet es un paso crucial.
# Conserva el trabajo, lo que  permite comenzar la fase de Ingeniería de Características y
# Modelado en un cuaderno aparte sin tener que volver a ejecutar todo el proceso.
# Esto ahorra mucho tiempo y recursos computacionales.

from pyspark.sql.functions import col

# --- Definir ruta de Google Drive ---

# Asegurar de crear una carpeta llamada 'BlueBikes_PRJ' en el Drive.
gdrive_path = "/content/drive/MyDrive/BlueBikes_PRJ/df_final_bluebikes_v2.parquet"

# Esta parte del código solo se ejecutará si detecta que está en un entorno de Google Colab.
try:
    from google.colab import drive

    print("\nIntentando montar Google Drive...")
    drive.mount('/content/drive')

    print(f"Intentando guardar DataFrame en Google Drive: {gdrive_path}")

    # Al escribir en un sistema de archivos distribuido como Google Drive,
    # suele ser mejor dejar que Spark administre las particiones.
    # No usaremos coalesce(1) aquí para mantener el paralelismo.
    df_final.write.mode("overwrite").parquet(gdrive_path)

    print(f"✅ DataFrame se guardó correctamente en Google Drive.")

except ImportError:
    # Esto se activará si no se encuentra 'google.colab',
    # lo que significa que probablemente se tiene un entorno local.
    print("\nOmitir el guardado de Google Drive: no se ejecuta en un entorno de Google Colab.")
except Exception as e:
    print(f"❌ Se produjo un error al guardar en Google Drive: {e}")


Intentando montar Google Drive...
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Intentando guardar DataFrame en Google Drive: /content/drive/MyDrive/BlueBikes_PRJ/df_final_bluebikes_v2.parquet
✅ DataFrame se guardó correctamente en Google Drive.
