### Bronze tables

In [0]:
# Bronze layer: load each raw CSV exactly as delivered.
# No `header` option is set, so Spark's default applies and columns arrive
# positionally as `_c0`, `_c1`, ... (the silver step renames them).
def _read_bronze_csv(path):
    """Return a DataFrame for the comma-separated file at *path*, with column types inferred."""
    csv_reader = spark.read.format("csv")
    csv_reader = csv_reader.option("inferSchema", "true")
    csv_reader = csv_reader.option("sep", ",")
    return csv_reader.load(path)


bronze_payments = _read_bronze_csv("/FileStore/tables/payments.csv")
bronze_riders = _read_bronze_csv("/FileStore/tables/riders.csv")
bronze_stations = _read_bronze_csv("/FileStore/tables/stations.csv")
bronze_trips = _read_bronze_csv("/FileStore/tables/trips.csv")

In [0]:
# Persist each bronze DataFrame as Delta files at a fixed path, replacing
# whatever a previous run wrote there.
_bronze_targets = [
    (bronze_payments, "/delta/bronze_payments"),
    (bronze_riders, "/delta/bronze_riders"),
    (bronze_stations, "/delta/bronze_stations"),
    (bronze_trips, "/delta/bronze_trips"),
]
for _frame, _delta_path in _bronze_targets:
    (
        _frame.write
        .format("delta")
        .mode("overwrite")
        .save(_delta_path)
    )

### Silver tables

In [0]:
# Remove any existing silver tables so they are rebuilt from scratch below.
for _drop_stmt in (
    "DROP TABLE IF EXISTS silver_payments",
    "DROP TABLE IF EXISTS silver_riders",
    "DROP TABLE IF EXISTS silver_stations",
    "DROP TABLE IF EXISTS silver_trips",
):
    spark.sql(_drop_stmt)


def _apply_renames(frame, renames):
    """Return *frame* with columns renamed according to *renames* (old name -> new name).

    Renames are applied one at a time, in mapping order, via
    ``withColumnRenamed``; columns not mentioned in the mapping keep their names.
    """
    for old_name, new_name in renames.items():
        frame = frame.withColumnRenamed(old_name, new_name)
    return frame


# Positional bronze columns (_c0, _c1, ...) -> descriptive silver column names.
silver_payments = _apply_renames(bronze_payments, {
    "_c0": "payment_id",
    "_c1": "date",
    "_c2": "amount",
    "_c3": "rider_id",
})

silver_riders = _apply_renames(bronze_riders, {
    "_c0": "rider_id",
    "_c1": "first",
    "_c2": "last",
    "_c3": "address",
    "_c4": "birthday",
    "_c5": "account_start_date",
    "_c6": "account_end_date",
    "_c7": "is_member",
})

silver_stations = _apply_renames(bronze_stations, {
    "_c0": "station_id",
    "_c1": "name",
    "_c2": "latitude",
    "_c3": "longitude",
})

silver_trips = _apply_renames(bronze_trips, {
    "_c0": "trip_id",
    "_c1": "rideable_type",
    "_c2": "start_at",
    "_c3": "ended_at",
    "_c4": "start_station_id",
    "_c5": "end_station_id",
    "_c6": "rider_id",
})

In [0]:
# Register each silver DataFrame as a managed Delta table of the same name.
_silver_tables = (
    (silver_payments, "silver_payments"),
    (silver_riders, "silver_riders"),
    (silver_stations, "silver_stations"),
    (silver_trips, "silver_trips"),
)
for _frame, _table_name in _silver_tables:
    _writer = _frame.write.format("delta").mode("overwrite")
    _writer.saveAsTable(_table_name)

### Gold tables

In [0]:
# Rider dimension: one row per row of silver_riders, with rider_id exposed as
# rider_key. Note: `birthday` is not carried into this dimension.
dim_rider = spark.table("silver_riders").selectExpr(
    "rider_id AS rider_key",
    "first",
    "last",
    "address",
    "account_start_date",
    "account_end_date",
    "is_member",
)

(
    dim_rider.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_rider")
)



In [0]:
from pyspark.sql.functions import col, year, month, dayofmonth, weekofyear, expr, when, dayofweek
from pyspark.sql.types import IntegerType

# Date dimension: one row per distinct, non-null calendar date referenced by
# payments (`silver_payments.date`) or trips (`silver_trips.start_at`).
# `date_key` uses the same yyyyMMdd integer encoding the payment fact uses
# (CAST(date_format(date, 'yyyyMMdd') AS INT)), so facts can join on it.
_fact_dates = (
    silver_payments.select(col("date").cast("date").alias("date"))
    .union(silver_trips.select(col("start_at").cast("date").alias("date")))
    # A null date would otherwise produce a row of all-null attributes.
    .where(col("date").isNotNull())
    .distinct()
)

dim_date = _fact_dates.select(
    expr("CAST(date_format(date, 'yyyyMMdd') AS INT)").alias("date_key"),
    col("date"),
    # All parts are derived from the already-cast DATE column for consistency.
    year(col("date")).alias("year"),
    expr("(CAST(month(date) AS INT) - 1) DIV 3 + 1").alias("quarter"),
    month(col("date")).alias("month"),
    dayofmonth(col("date")).alias("day"),
    weekofyear(col("date")).alias("week"),
    # dayofweek() returns 1 for Sunday and 7 for Saturday.
    when(dayofweek(col("date")).isin([1, 7]), 1).otherwise(0).alias("is_weekend"),
)

# overwriteSchema lets this replace a previously written dim_date whose schema
# did not yet include date_key.
(
    dim_date.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dim_date")
)


In [0]:

# Station dimension: one row per row of silver_stations, with station_id
# exposed as station_key and name as station_name.
dim_station = spark.table("silver_stations").selectExpr(
    "station_id AS station_key",
    "name AS station_name",
    "latitude",
    "longitude",
)

_station_writer = dim_station.write.format("delta").mode("overwrite")
_station_writer.saveAsTable("dim_station")

In [0]:
# Payment fact: one row per row of silver_payments. The payment date is encoded
# as a yyyyMMdd integer in date_key.
fact_payment = spark.table("silver_payments").selectExpr(
    "payment_id AS payment_key",
    "CAST(date_format(date, 'yyyyMMdd') AS INT) AS date_key",
    "rider_id AS rider_key",
    "amount",
)

_payment_writer = fact_payment.write.format("delta").mode("overwrite")
_payment_writer.saveAsTable("fact_payment")



In [0]:
# Trip fact: one row per trip that has a matching rider in silver_riders
# (assuming rider_id and station_id are unique in their silver tables).
#
# - date_key is the trip's own start date (yyyyMMdd INT). Payments are not
#   joined: joining on rider_id alone would repeat each trip once per payment
#   the rider made, and pick an unrelated payment date.
# - trip_duration is in minutes, computed from the second-resolution
#   difference between ended_at and start_at (DATEDIFF would give whole days).
# - rider_age is whole years completed at trip start. MONTHS_BETWEEN / 12,
#   floored, already accounts for whether the birthday has passed; no extra
#   correction is applied.
fact_trip = spark.sql("""
    SELECT t.trip_id AS trip_key,
           t.rider_id AS rider_key,
           CAST(DATE_FORMAT(t.start_at, 'yyyyMMdd') AS INT) AS date_key,
           start_station.station_id AS start_station_key,
           end_station.station_id AS end_station_key,
           (UNIX_TIMESTAMP(t.ended_at) - UNIX_TIMESTAMP(t.start_at)) / 60.0 AS trip_duration,
           CAST(FLOOR(MONTHS_BETWEEN(t.start_at, r.birthday) / 12) AS INT) AS rider_age
    FROM silver_trips t
    JOIN silver_riders r ON r.rider_id = t.rider_id
    LEFT JOIN silver_stations start_station ON t.start_station_id = start_station.station_id
    LEFT JOIN silver_stations end_station ON t.end_station_id = end_station.station_id
""")

# overwriteSchema because trip_duration's type differs from earlier runs'.
fact_trip.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("fact_trip")
