In [0]:
from pyspark.sql import functions as F

In [0]:
# Read the four source tables from the hive_metastore.tfm schema, marking each
# one for caching as it is loaded (cache() is lazy and hands back the same DataFrame).
fact_metro_ml = spark.table("hive_metastore.tfm.fact_metro").cache()
dim_metro = spark.table("hive_metastore.tfm.dim_metro").cache()
fact_weather_enriched = spark.table("hive_metastore.tfm.fact_weather_enriched").cache()
dim_calendar = spark.table("hive_metastore.tfm.dim_calendar").cache()

# Bare last expression: keeps the calendar schema as this cell's output.
dim_calendar

DataFrame[date: date, is_holiday: int, is_pre_holiday: int, is_weekend: int, day_of_week: int, month: int]

In [0]:
# Preview up to 10 rows of the metro fact table as loaded, before any joins.
display(fact_metro_ml.limit(10))

day,metro_station_id,total_ridership
2023-02-05,502,189.0
2023-02-05,51,5661.0
2023-02-05,52,2576.0
2023-02-05,53,1474.0
2023-02-05,54,3206.0
2023-02-05,55,3762.0
2023-02-05,56,1110.0
2023-02-05,57,650.0
2023-02-05,58,4125.0
2023-02-05,59,2026.0


1. JOIN DIM METRO

In [0]:
# Look up each metro station in the metro dimension to pick up its weather station.
# Left join: ridership rows are kept even when the station has no dimension entry.
# Only the date, the ridership and the weather station id are carried forward;
# metro_station_id is dropped here.
fact_metro_ml = fact_metro_ml.join(dim_metro, on='metro_station_id', how='left').select(
    'day',
    'total_ridership',
    'weather_station_id',
)

In [0]:
# Preview up to 10 rows after the metro dimension join (weather_station_id should now be present).
display(fact_metro_ml.limit(10))

day,total_ridership,weather_station_id
2023-02-05,189.0,USW00014734
2023-02-05,5661.0,USW00094789
2023-02-05,2576.0,USW00094789
2023-02-05,1474.0,USW00094789
2023-02-05,3206.0,USW00094789
2023-02-05,3762.0,USW00094789
2023-02-05,1110.0,USW00094789
2023-02-05,650.0,USW00094789
2023-02-05,4125.0,USW00094789
2023-02-05,2026.0,USW00094728


2. Crear columna concatenada para el JOIN con la tabla weather

In [0]:
# Build the composite join key "<weather_station_id>/<day>" used to look up
# the weather table in the next step.
fact_metro_ml = fact_metro_ml.withColumn(
    'weather_station_date',
    F.concat_ws('/', 'weather_station_id', 'day'),
)

In [0]:
# Preview up to 10 rows to check the new weather_station_date key (weather_station_id and day joined by "/").
display(fact_metro_ml.limit(10))

day,total_ridership,weather_station_id,weather_station_date
2023-02-05,189.0,USW00014734,USW00014734/2023-02-05
2023-02-05,5661.0,USW00094789,USW00094789/2023-02-05
2023-02-05,2576.0,USW00094789,USW00094789/2023-02-05
2023-02-05,1474.0,USW00094789,USW00094789/2023-02-05
2023-02-05,3206.0,USW00094789,USW00094789/2023-02-05
2023-02-05,3762.0,USW00094789,USW00094789/2023-02-05
2023-02-05,1110.0,USW00094789,USW00094789/2023-02-05
2023-02-05,650.0,USW00094789,USW00094789/2023-02-05
2023-02-05,4125.0,USW00094789,USW00094789/2023-02-05
2023-02-05,2026.0,USW00094728,USW00094728/2023-02-05


3. JOIN WEATHER

In [0]:
# Bring in the binned weather features through the composite station/date key.
# Left join: ridership rows without a matching weather record are kept with null bins.
# The join key and weather_station_id are not selected, so they are dropped here.
fact_metro_ml = fact_metro_ml.join(
    fact_weather_enriched, on='weather_station_date', how='left'
).select(
    'day',
    'total_ridership',
    'awnd_bin',
    'prcp_bin',
    'snow_bin',
    'snwd_bin',
    'tmax_bin',
    'tmin_bin',
    'trange_bin',
)

In [0]:
# Preview up to 10 rows after the weather join (the *_bin columns should now be present).
display(fact_metro_ml.limit(10))

day,total_ridership,awnd_bin,prcp_bin,snow_bin,snwd_bin,tmax_bin,tmin_bin,trange_bin
2023-02-05,189.0,cat3,cat0,cat0,cat0,cat2,cat0,cat7
2023-02-05,5661.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4
2023-02-05,2576.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4
2023-02-05,1474.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4
2023-02-05,3206.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4
2023-02-05,3762.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4
2023-02-05,1110.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4
2023-02-05,650.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4
2023-02-05,4125.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4
2023-02-05,2026.0,cat0,cat0,cat0,cat0,cat0,cat0,cat3


4. JOIN CALENDAR

In [0]:
# Alias both sides so every column can be addressed by a qualified name after the join.
fact = fact_metro_ml.alias('fact')
calendar = dim_calendar.alias('calendar')

# Match each ridership row to its calendar entry (fact.day = calendar.date).
# Left join: rows whose day is missing from the calendar are kept with null flags.
# The fact-side `day` column is exposed as `date` in the result.
fact_metro_ml = fact.join(
    calendar,
    on=F.col('fact.day') == F.col('calendar.date'),
    how='left',
).select(
    F.col('fact.day').alias('date'),
    'fact.total_ridership',
    'fact.awnd_bin',
    'fact.prcp_bin',
    'fact.snow_bin',
    'fact.snwd_bin',
    'fact.tmax_bin',
    'fact.tmin_bin',
    'fact.trange_bin',
    'calendar.is_holiday',
    'calendar.is_pre_holiday',
    'calendar.is_weekend',
    'calendar.day_of_week',
    'calendar.month',
)

In [0]:
# Preview up to 10 rows of the result after the calendar join, as it stands before being saved.
display(fact_metro_ml.limit(10))

date,total_ridership,awnd_bin,prcp_bin,snow_bin,snwd_bin,tmax_bin,tmin_bin,trange_bin,is_holiday,is_pre_holiday,is_weekend,day_of_week,month
2023-02-05,189.0,cat3,cat0,cat0,cat0,cat2,cat0,cat7,0,0,1,1,2
2023-02-05,5661.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4,0,0,1,1,2
2023-02-05,2576.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4,0,0,1,1,2
2023-02-05,1474.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4,0,0,1,1,2
2023-02-05,3206.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4,0,0,1,1,2
2023-02-05,3762.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4,0,0,1,1,2
2023-02-05,1110.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4,0,0,1,1,2
2023-02-05,650.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4,0,0,1,1,2
2023-02-05,4125.0,cat5,cat0,cat0,cat0,cat1,cat0,cat4,0,0,1,1,2
2023-02-05,2026.0,cat0,cat0,cat0,cat0,cat0,cat0,cat3,0,0,1,1,2


5. Guardar tabla

In [0]:
# Persist the feature set as a Delta table, replacing both data and schema on every run.
# The target is fully qualified with the same catalog the inputs are read from
# (hive_metastore), so the write does not depend on the session's current catalog.
(
    fact_metro_ml.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("hive_metastore.tfm.fact_metro_ml")
)