## Transform data into Bikeshare star schema

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType
from pyspark.sql.types import StringType
import pyspark.sql.functions as f


### Fact_payment

In [None]:
# Load the staged payment records that feed the payment fact table.
payments = spark.read.table("default.staging_payments")

In [None]:
# Preview up to ten staged payment rows.
payments_preview = payments.limit(10)
display(payments_preview)

In [None]:
# Rebuild fact_payments: drop any existing table, then save one row per
# payment_id as a Delta table.
spark.sql("drop table if exists default.fact_payments")
unique_payments = payments.dropDuplicates(["payment_id"])
(
    unique_payments.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.fact_payments")
)

### Dim_rider

In [None]:
# Load the staged rider records that feed the rider dimension.
riders = spark.read.table("default.staging_riders")

In [None]:
# Preview up to ten staged rider rows.
riders_preview = riders.limit(10)
display(riders_preview)

In [None]:
# Rebuild dim_rider: drop any existing table, then save one row per
# rider_id as a Delta table.
spark.sql("drop table if exists default.dim_rider")
unique_riders = riders.dropDuplicates(["rider_id"])
(
    unique_riders.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.dim_rider")
)

### Dim_station

In [None]:
# Load the staged station records that feed the station dimension.
stations = spark.read.table("default.staging_stations")


In [None]:
# Preview up to ten staged station rows.
stations_preview = stations.limit(10)
display(stations_preview)

In [None]:
# Rebuild dim_station: drop any existing table, then save one row per
# station_id as a Delta table.
spark.sql("drop table if exists default.dim_station")
unique_stations = stations.dropDuplicates(["station_id"])
(
    unique_stations.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.dim_station")
)

### Fact_trip

In [None]:
# Load the staged trip records that feed the trip fact table and the date dimension.
trips = spark.read.table("default.staging_trips")


In [None]:
# Preview up to ten staged trip rows.
trips_preview = trips.limit(10)
display(trips_preview)

In [None]:
# Build the trip fact rows: one row per trip, enriched with the trip duration
# and the rider's age at the time the trip started.
#
# dim_rider is saved with one row per rider_id, so join against an equally
# de-duplicated rider set; otherwise a rider_id repeated in staging would
# multiply that rider's trips in the fact table. Trips are de-duplicated on
# trip_id for the same reason the other tables de-duplicate on their keys.
unique_riders = riders.dropDuplicates(["rider_id"])
unique_trips = trips.dropDuplicates(["trip_id"])

# Use the explicit `f.` alias rather than the wildcard import so `round`
# is unambiguously the Spark column function, not the Python builtin.
# Trip length in minutes, rounded to the nearest whole minute.
duration_minutes = f.round(
    (f.unix_timestamp("ended_at") - f.unix_timestamp("start_at")) / 60
)

# Rider age in years at trip start (days between birthday and start / 365.25).
# NOTE(review): this rounds to the nearest year, so a rider can be reported
# one year older than their completed age - confirm this is intended.
rider_age_years = f.round(
    f.datediff(f.to_date("start_at"), f.to_date("birthday")) / 365.25
)

# Joining on the column name keeps a single, unambiguous rider_id column.
fact_trips = (
    unique_trips.join(unique_riders, on="rider_id", how="inner")
    .withColumn("duration", duration_minutes)
    .withColumn("rider_age", rider_age_years)
    .select(
        "trip_id",
        "rider_id",
        "rideable_type",
        "start_station_id",
        "end_station_id",
        "start_at",
        "ended_at",
        "duration",
        "rider_age",
    )
)

display(fact_trips.limit(10))

In [None]:
# Rebuild fact_trip: drop any existing table, then save the computed
# trip facts as a Delta table.
spark.sql("drop table if exists default.fact_trip")
(
    fact_trips.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.fact_trip")
)

### Dim_date

In [None]:
# Find the span covered by the trips: earliest start_at to latest ended_at.
date_bounds = trips.agg(
    f.min("start_at").alias("started_at"),
    f.max("ended_at").alias("ended_at"),
).first()
min_date = date_bounds["started_at"]
max_date = date_bounds["ended_at"]

# Spark SQL expression producing an array with one entry per day in that span.
expression = f"sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 day)"

# Start from a single-row frame and explode the day array into one row per day.
seed = spark.createDataFrame([(1,)], ["date_id"])
exploded_days = seed.withColumn("dateinit", f.explode(f.expr(expression)))

# Each day as a timestamp; every other column is derived from it.
day_ts = f.to_timestamp(f.col("dateinit"), "yyyy-MM-dd")

# date_id is the string form of the timestamp; the helper column dateinit
# is not carried into the result.
dim_date = exploded_days.select(
    day_ts.cast(StringType()).alias("date_id"),
    day_ts.alias("date"),
    f.dayofmonth(day_ts).alias("day"),
    f.month(day_ts).alias("month"),
    f.quarter(day_ts).alias("quarter"),
    f.year(day_ts).alias("year"),
    f.dayofyear(day_ts).alias("day_of_year"),
    f.dayofweek(day_ts).alias("day_of_week"),
)

display(dim_date.limit(10))

In [None]:
# Write data to dim_date
# Drop the existing dim_date table (if any) before rewriting it as a Delta
# table. The drop must name the same table that is saved below.
spark.sql("drop table if exists default.dim_date")
dim_date.write.format("delta").mode("overwrite").saveAsTable("default.dim_date")