In [0]:
from pyspark.sql.functions import *

In [0]:
def read_file_csv(filepath):
    """Load a comma-separated CSV file that has no header row.

    Schema inference is switched off, so every column is read as a string and
    has to be cast explicitly by the transform cells further down.

    Args:
        filepath: Location of the CSV file to read.

    Returns:
        A Spark DataFrame whose columns carry Spark's default names; callers
        rename them with ``toDF``.
    """
    csv_options = {"inferSchema": "false", "header": "false", "sep": ","}
    return spark.read.format('csv').options(**csv_options).load(filepath)

In [0]:
# Raw CSV locations (the files have no header row, see read_file_csv).
riders_filepath = "/FileStore/tables/projectDL/riders.csv"
stations_filepath = "/FileStore/tables/projectDL/stations.csv"
payments_filepath = "/FileStore/tables/projectDL/payments.csv"
trips_filepath = "/FileStore/tables/projectDL/trips.csv"

# Column names assigned positionally to each file on load.
riders_cols = [
    "rider_id", "first", "last", "address", "birthday",
    "account_start_date", "account_end_date", "is_member",
]
stations_cols = ["station_id", "name", "latitude", "longitude"]
payments_cols = ["payment_id", "date", "amount", "rider_id"]
trips_cols = [
    "trip_id", "rideable_type", "start_at", "ended_at",
    "start_station_id", "end_station_id", "rider_id",
]

# Read each file and replace its default column names in a single chained step.
df_riders = read_file_csv(riders_filepath).toDF(*riders_cols)
df_stations = read_file_csv(stations_filepath).toDF(*stations_cols)
df_payments = read_file_csv(payments_filepath).toDF(*payments_cols)
df_trips = read_file_csv(trips_filepath).toDF(*trips_cols)

In [0]:
# Preview the raw trips data; every column is still a string here because read_file_csv disables inferSchema.
display(df_trips)

In [0]:
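# Optional reset: uncomment the line below to recursively delete dbfs:/delta
# (where the bronze Delta tables are written) before a clean re-run.
#%fs rm -r dbfs:/delta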

In [0]:
# Persist each raw DataFrame as a bronze Delta dataset, replacing the output of any earlier run.
bronze_outputs = [
    (df_riders, "/delta/bronze_riders"),
    (df_stations, "/delta/bronze_stations"),
    (df_payments, "/delta/bronze_payments"),
    (df_trips, "/delta/bronze_trips"),
]
for bronze_df, bronze_path in bronze_outputs:
    bronze_df.write.format("delta").mode("overwrite").save(bronze_path)

In [0]:
# Register each bronze Delta location as a metastore table named silver_<entity>.
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE TABLE fails on the second
# run because the table is already registered. The registered table still reads the
# freshly overwritten data at its location.
# NOTE(review): these "silver" tables point at the bronze locations, so they expose the
# raw, all-string columns; the typing happens in the Dim/Fact cells below.
for entity in ("riders", "stations", "payments", "trips"):
    spark.sql(
        f"CREATE TABLE IF NOT EXISTS silver_{entity} USING DELTA LOCATION '/delta/bronze_{entity}'"
    )

Out[10]: DataFrame[]

In [0]:
# Load the registered silver tables back as DataFrames for the transform cells below.
df_silver_riders, df_silver_stations, df_silver_payments, df_silver_trips = (
    spark.read.table(table_name)
    for table_name in ("silver_riders", "silver_stations", "silver_payments", "silver_trips")
)

In [0]:
# Create DimStation table: the coordinates arrive as strings, so cast both to float.
df_dim_station = df_silver_stations
for coord_col in ("latitude", "longitude"):
    df_dim_station = df_dim_station.withColumn(coord_col, col(coord_col).cast("float"))

In [0]:
# Create DimRider table: type the raw string columns and derive the rider's age (in whole
# years, using a 365.25-day average year) at the time the account was opened.
df_dim_rider = (
    df_silver_riders
    .withColumn("rider_id", col("rider_id").cast("int"))
    .withColumn("birthday", to_timestamp("birthday"))
    .withColumn("account_start_date", to_timestamp("account_start_date"))
    .withColumn("account_end_date", to_timestamp("account_end_date"))
    .withColumn("is_member", col("is_member").cast("boolean"))
    # Both inputs are already timestamps (parsed just above), so no format pattern is needed.
    .withColumn(
        "rider_account_age",
        floor(datediff(col("account_start_date"), col("birthday")) / 365.25),
    )
)
# NOTE(review): this renders rider names and addresses (first/last/address); avoid
# committing the rendered output.
display(df_dim_rider)

In [0]:
# Create FactPayment table: cast the ids and amount to numeric types and parse the payment date.
# The date uses Spark's default ISO (yyyy-MM-dd) parsing. The rider and trip date columns
# in this notebook are parsed the same way.
# NOTE(review): assumes payments.csv stores ISO dates. The hard-coded DimDate begin date
# '2013-02-01' appears to come from the raw (string) min payment date; confirm against the file.
df_fact_payment = (
    df_silver_payments
    .withColumn("payment_id", col("payment_id").cast("int"))
    .withColumn("rider_id", col("rider_id").cast("int"))
    .withColumn("date", to_date(col("date")))
    .withColumn("amount", col("amount").cast("float"))
)
df_fact_payment.printSchema()

root
 |-- payment_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: float (nullable = true)
 |-- rider_id: integer (nullable = true)



In [0]:
# Create FactTrip: type the trip timestamps and rider id, then derive the rider's age at
# trip start (needs birthday from DimRider) and the trip duration in minutes.
df_fact_trip = (
    df_silver_trips
    .withColumn("start_at", to_timestamp(col("start_at")))
    .withColumn("ended_at", to_timestamp(col("ended_at")))
    .withColumn("rider_id", col("rider_id").cast("int"))
)

# Rename the key so the joined frame does not carry two columns called rider_id.
df_birthday_rider = df_dim_rider.select("rider_id", "birthday").withColumnRenamed("rider_id", "id")
# Inner join: trips whose rider_id has no match in DimRider are dropped from the fact table.
df_fact_trip = df_fact_trip.join(df_birthday_rider, df_fact_trip.rider_id == df_birthday_rider.id, "inner")
df_fact_trip = (
    df_fact_trip
    # start_at and birthday are already timestamps, so no format pattern is needed.
    .withColumn("rider_trip_age", floor(datediff(col("start_at"), col("birthday")) / 365.25))
    # Epoch-second difference converted to minutes. `round` here is
    # pyspark.sql.functions.round (it shadows the builtin via the star import).
    .withColumn(
        "total_minutes_trip",
        round((col("ended_at").cast("long") - col("start_at").cast("long")) / 60),
    )
    .select(
        "trip_id", "rideable_type", "start_at", "ended_at",
        "start_station_id", "end_station_id", "rider_id",
        "rider_trip_age", "total_minutes_trip",
    )
)
display(df_fact_trip)

In [0]:
#Create DimDate table
def generate_series(start, stop, interval):
    """Build a single-column DataFrame of evenly spaced timestamps.

    Args:
        start: First timestamp, as a string Spark can cast to timestamp.
        stop: End bound, as a castable string; excluded from the result
            because ``spark.range`` stops before its end value.
        interval: Step between consecutive rows, in seconds.

    Returns:
        A DataFrame with one timestamp column named ``date``.
    """
    # Let Spark cast the bounds so they are interpreted in the session time zone.
    bounds_df = spark.createDataFrame([(start, stop)], ("start", "stop"))
    epoch_cols = [col(bound).cast("timestamp").cast("long") for bound in ("start", "stop")]
    start_epoch, stop_epoch = bounds_df.select(epoch_cols).first()

    # Step through epoch seconds, then turn each value back into a timestamp.
    epoch_series = spark.range(start_epoch, stop_epoch, interval)
    return epoch_series.select(col("id").cast("timestamp").alias("date"))

In [0]:
# Observed date ranges in the raw silver data. `min`/`max` are the Spark aggregate
# functions from the star import, not the Python builtins. Both bounds for a table are
# computed in one aggregation, so each table is scanned once.
# NOTE(review): the columns are still strings here, so min/max compare lexicographically,
# which only matches chronological order for ISO-formatted dates.
min_payment_date, max_payment_date = df_silver_payments.agg(min("date"), max("date")).first()
# A trip's earliest instant is its start, so the lower bound comes from start_at and the
# upper bound from ended_at.
min_trip_date, max_trip_date = df_silver_trips.agg(min("start_at"), max("ended_at")).first()

In [0]:
# Hourly DimDate rows covering [beginDate, endDate). endDate itself is excluded because
# generate_series uses spark.range, which stops before its end bound.
# NOTE(review): the bounds are hard-coded; presumably they mirror the min/max dates
# computed in the previous cell. Confirm they still cover the data after each load.
beginDate = '2013-02-01'
endDate = '2022-02-02'

SECONDS_PER_HOUR = 60 * 60
# generate_series already returns `date` as a timestamp column, so it is used as-is.
# NOTE(review): 'quater' is a misspelling of 'quarter'. It is kept so the existing
# golden_dim_date Delta schema does not change; rename it together with its consumers.
df_dim_date = (
    generate_series(beginDate, endDate, SECONDS_PER_HOUR)
    .withColumn('year', year('date'))
    .withColumn('quater', quarter('date'))
    .withColumn('month', month('date'))
    .withColumn('day', dayofmonth('date'))
    .withColumn('hour', hour('date'))
    .withColumn('week', weekofyear('date'))
    .withColumn('dayofweek', dayofweek('date'))
)
display(df_dim_date)

In [0]:
#Load: persist every dimension/fact DataFrame as a Delta table, replacing previous contents.
golden_tables = {
    "golden_fact_trip": df_fact_trip,
    "golden_fact_payment": df_fact_payment,
    "golden_dim_rider": df_dim_rider,
    "golden_dim_station": df_dim_station,
    "golden_dim_date": df_dim_date,
}
for golden_name, golden_df in golden_tables.items():
    golden_df.write.format("delta").mode("overwrite").saveAsTable(golden_name)