## Extract

In [0]:
# Raw CSV exports to ingest, each paired with the column names to assign
# (the files are read with header=false, so names must be supplied here).
tables = [
    {
        'name': 'payments',
        'columns': ['payment_id', 'date', 'amount', 'rider_id'],
    },
    {
        'name': 'riders',
        'columns': ['rider_id', 'first', 'last', 'address', 'birthday',
                    'account_start_date', 'account_end_date', 'is_member'],
    },
    {
        'name': 'stations',
        'columns': ['station_id', 'name', 'latitude', 'longitude'],
    },
    {
        'name': 'trips',
        'columns': ['trip_id', 'rideable_type', 'started_at', 'ended_at',
                    'start_station_id', 'end_station_id', 'rider_id'],
    },
]

# For every table: read the CSV without schema inference, attach the column
# names, show the result, and overwrite the Delta copy at /delta/<name>.
for table in tables:
    csv_reader = (
        spark.read.format("csv")
        .option("inferSchema", "false")
        .option("header", "false")
        .option("sep", ",")
    )
    df = csv_reader.load("/FileStore/dlaz/{}.csv".format(table['name']))
    df = df.toDF(*table['columns'])
    display(df)

    delta_writer = df.write.format("delta").mode("overwrite")
    delta_writer.save("/delta/{}".format(table['name']))


## Load

In [0]:
# Register each Delta directory written by the Extract step as a named table.
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE TABLE raises an
# error as soon as the table has been registered by an earlier run.
tables = ['payments', 'riders', 'stations', 'trips']
for table in tables:
    spark.sql("CREATE TABLE IF NOT EXISTS {} USING DELTA LOCATION '/delta/{}'".format(table, table))

## Transform

### 1. Dimension and fact tables

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DateType, DoubleType, StringType, BooleanType, FloatType
from pyspark.sql import functions as F

# Payments fact table: load the raw Delta copy and cast each column from its
# raw form to the type listed below, then persist it under /delta/gold.
fact_payments_df = spark.read.format("delta").load("/delta/payments").alias('fact_payments')

payment_column_types = [
    ('payment_id', IntegerType()),
    ('rider_id', IntegerType()),
    ('date', DateType()),
    ('amount', DoubleType()),
]
for payment_column, payment_type in payment_column_types:
    fact_payments_df = fact_payments_df.withColumn(
        payment_column, F.col(payment_column).cast(payment_type))

display(fact_payments_df)

payments_writer = fact_payments_df.write.format("delta").mode("overwrite")
payments_writer.save("/delta/gold/{}".format('fact_payments'))

# Rider dimension: cast the raw columns to proper types and derive the rider's
# age in whole years on the day the account was opened.
riders_df = spark.read.format("delta").load("/delta/riders").alias('riders')
riders_df = (
    riders_df
    .withColumn('rider_id', F.col('rider_id').cast(IntegerType()))
    .withColumn('birthday', F.col('birthday').cast(DateType()))
    .withColumn('account_start_date', F.col('account_start_date').cast(DateType()))
    .withColumn('account_end_date', F.col('account_end_date').cast(DateType()))
    .withColumn('is_member', F.col('is_member').cast(BooleanType()))
    # Age is counted in calendar months / 12 rather than days / 365.25: the
    # day-based form can land just below a whole number on the birthday itself
    # (e.g. 2001-01-01 -> 2019-01-01 is 6574 days = 17.9986 "years") and floor
    # to one year too few.
    .withColumn('age_on_account_start',
                F.floor(F.months_between(F.col('account_start_date'),
                                         F.col('birthday')) / 12).cast(IntegerType()))
)
display(riders_df)
riders_df.write.format("delta") \
    .mode("overwrite") \
    .save("/delta/gold/{}".format('riders'))

# Trips fact table: cast the raw columns, attach each rider's birthday, and
# derive the rider's age at trip start plus the trip duration in seconds.
fact_trips_df = spark.read.format("delta").load("/delta/trips").alias('fact_trips')
fact_trips_df = (
    fact_trips_df
    .withColumn('rider_id', F.col('rider_id').cast(IntegerType()))
    .withColumn('started_at', F.col('started_at').cast(TimestampType()))
    .withColumn('ended_at', F.col('ended_at').cast(TimestampType()))
    # Inner join (the default): trips whose rider_id has no match in riders_df
    # are dropped from the fact table.
    .join(riders_df, on=['rider_id'])
    # NOTE(review): judging by the displayed output, 'fact_trips.*' expands only
    # to the trip columns that were not re-created by withColumn above, which is
    # why rider_id / started_at / ended_at are listed explicitly. Keep this
    # select exactly as is unless that behaviour is re-verified.
    .select('fact_trips.*', 'rider_id', 'started_at', 'ended_at', 'birthday')
    # Age is counted in calendar months / 12 rather than days / 365.25: the
    # day-based form can land just below a whole number on the birthday itself
    # and floor to one year too few.
    .withColumn('rider_age_on_started_at',
                F.floor(F.months_between(F.col('started_at'),
                                         F.col('birthday')) / 12).cast(IntegerType()))
    .withColumn('duration_seconds',
                F.unix_timestamp(F.col('ended_at')) - F.unix_timestamp(F.col('started_at')))
)
display(fact_trips_df)
fact_trips_df.write.format("delta") \
    .mode("overwrite") \
    .save("/delta/gold/{}".format('fact_trips'))

# Station dimension: load the raw Delta copy, cast both coordinates to float,
# and persist the result under /delta/gold.
stations_df = spark.read.format("delta").load("/delta/stations").alias("stations")
for coordinate_column in ('latitude', 'longitude'):
    stations_df = stations_df.withColumn(
        coordinate_column, F.col(coordinate_column).cast(FloatType()))

display(stations_df)

stations_writer = stations_df.write.format("delta").mode("overwrite")
stations_writer.save("/delta/gold/{}".format('stations'))


payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000


rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member,age_on_account_start
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True,30
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True,43
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True,23
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False,20
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True,50
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False,45
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True,16
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False,28
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True,34
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True,39


trip_id,rideable_type,start_station_id,end_station_id,rider_id,started_at,ended_at,birthday,rider_age_on_started_at,duration_seconds
222BB8E5059252D7,classic_bike,KA1503000064,13021,34062,2021-06-13T09:48:47.000+0000,2021-06-13T10:07:23.000+0000,1991-01-26,30,1116
1826E16CB5486018,classic_bike,TA1306000010,13021,5342,2021-06-21T22:59:13.000+0000,2021-06-21T23:04:29.000+0000,1995-06-07,26,316
3D9B6A0A5330B04D,classic_bike,TA1305000030,13021,3714,2021-06-18T16:06:42.000+0000,2021-06-18T16:12:02.000+0000,1995-04-30,26,320
07E82F5E9C9E490F,classic_bike,TA1305000034,13021,18793,2021-06-17T16:46:23.000+0000,2021-06-17T17:02:45.000+0000,2002-09-11,18,982
A8E94BAECBF0C2DD,docked_bike,TA1308000009,TA1308000009,43342,2021-06-13T17:36:29.000+0000,2021-06-13T18:30:39.000+0000,1993-03-27,28,3250
378F4AB323AA1D14,docked_bike,TA1308000009,TA1308000009,6693,2021-06-13T13:20:10.000+0000,2021-06-13T14:06:14.000+0000,1992-07-26,28,2764
38AD311DC2EB1FBE,docked_bike,KA1503000019,KA1503000019,71480,2021-06-16T17:14:30.000+0000,2021-06-16T17:28:34.000+0000,1965-05-27,56,844
1D466737F0B18097,docked_bike,TA1308000009,TA1308000009,50846,2021-06-27T14:51:52.000+0000,2021-06-27T15:26:39.000+0000,1980-12-23,40,2087
27E1142E1ACFAEFB,electric_bike,13257,13257,18951,2021-06-21T13:58:26.000+0000,2021-06-21T13:58:53.000+0000,1999-08-07,21,27
67F2A115DAE77924,classic_bike,TA1308000009,TA1308000009,63987,2021-06-22T00:51:43.000+0000,2021-06-22T01:08:25.000+0000,1984-06-21,37,1002


station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.0127,-87.66606
KA1503000012,Clark St & Lake St,41.885796,-87.6311
637,Wood St & Chicago Ave,41.895634,-87.672066
13216,State St & 33rd St,41.834732,-87.625824
18003,Fairbanks St & Superior St,41.89581,-87.620255
KP1705001026,LaSalle Dr & Huron St,41.89488,-87.632324
13253,Lincoln Ave & Waveland Ave,41.948795,-87.67528
KA1503000044,Rush St & Hubbard St,41.890175,-87.62618
KA1504000140,Winchester Ave & Elston Ave,41.924038,-87.676414
TA1305000032,Clinton St & Madison St,41.88224,-87.64107


### 2. Date dimension

Create a table with one row per hour, spanning from the earliest date found in the data to the latest date found in the data plus 30 years.

In [0]:
from datetime import datetime, timedelta

# How many years past the latest date in the data the date dimension extends.
num_years = 30

# Lower bound: the earliest of first trip start, first payment date and first
# account start. Date values are promoted to midnight datetimes so they can be
# compared with the trip timestamps.
earliest_trip_started_at = fact_trips_df.agg({'started_at': 'min'}).collect()[0][0]
earliest_payment_time = datetime.combine(fact_payments_df.agg({'date': 'min'}).collect()[0][0], datetime.min.time())
earliest_account_start_time = datetime.combine(riders_df.agg({'account_start_date': 'min'}).collect()[0][0], datetime.min.time())
earliest_time = min([earliest_trip_started_at, earliest_payment_time, earliest_account_start_time])
print("earliest time:", earliest_time)

# Upper bound: the latest of last trip end, last payment date and last account
# end, pushed num_years into the future.
latest_trip_ended_at = fact_trips_df.agg({'ended_at': 'max'}).collect()[0][0]
latest_payment_time = datetime.combine(fact_payments_df.agg({'date': 'max'}).collect()[0][0], datetime.min.time())
latest_account_end_time = datetime.combine(riders_df.agg({'account_end_date': 'max'}).collect()[0][0], datetime.min.time())
latest_time = max([latest_trip_ended_at, latest_payment_time, latest_account_end_time])
# NOTE: a year is approximated as 365 days here, so the end of the range falls
# roughly a week short of num_years calendar years.
future_time = latest_time + timedelta(days=365*num_years)
print("future time:", future_time)

# Start the hourly sequence on a whole hour. If the earliest value is a trip
# timestamp (e.g. 09:48:47), every generated row would otherwise carry that
# minute/second offset instead of sitting on the hour.
sequence_start = earliest_time.replace(minute=0, second=0, microsecond=0)

expr = "sequence(to_timestamp('{}'), to_timestamp('{}'), interval 1 hour)".format(
    sequence_start, future_time)

# One row per hour between the two bounds, with calendar attributes derived
# from the timestamp. date_format() returns strings, so hour_of_day and day are
# string columns while the other attributes are integers.
dates_df = spark.createDataFrame([(1,)], ['id']) \
    .withColumn(
        "datetime",
        F.explode(F.expr(expr))
    ).select('datetime') \
    .withColumn('day_of_week', F.dayofweek(F.col('datetime'))) \
    .withColumn('hour_of_day', F.date_format(F.col('datetime'), "H")) \
    .withColumn('year', F.year(F.col('datetime'))) \
    .withColumn('month', F.month(F.col('datetime'))) \
    .withColumn('day', F.date_format(F.col('datetime'), "d")) \
    .withColumn('quarter', F.quarter(F.col('datetime')))

display(dates_df)
# Persist the date dimension itself. This cell previously re-wrote stations_df
# to /delta/gold/stations here, so dates_df was never saved.
dates_df.write.format("delta") \
    .mode("overwrite") \
    .save("/delta/gold/{}".format('dates'))

datetime,day_of_week,hour_of_day,year,month,day,quarter
2013-01-31T00:00:00.000+0000,5,0,2013,1,31,1
2013-01-31T01:00:00.000+0000,5,1,2013,1,31,1
2013-01-31T02:00:00.000+0000,5,2,2013,1,31,1
2013-01-31T03:00:00.000+0000,5,3,2013,1,31,1
2013-01-31T04:00:00.000+0000,5,4,2013,1,31,1
2013-01-31T05:00:00.000+0000,5,5,2013,1,31,1
2013-01-31T06:00:00.000+0000,5,6,2013,1,31,1
2013-01-31T07:00:00.000+0000,5,7,2013,1,31,1
2013-01-31T08:00:00.000+0000,5,8,2013,1,31,1
2013-01-31T09:00:00.000+0000,5,9,2013,1,31,1
