
## Overview

This notebook will transform data 

## Transform fact_payment

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType, StringType

In [0]:
# Read data default taging_payments

df_payments = spark.table("taging_payments")

display(df_payments.limit(10))

payment_id,date,amount,rider_id
1,2019-05-01,9.0,1000
2,2019-06-01,9.0,1000
3,2019-07-01,9.0,1000
4,2019-08-01,9.0,1000
5,2019-09-01,9.0,1000
6,2019-10-01,9.0,1000
7,2019-11-01,9.0,1000
8,2019-12-01,9.0,1000
9,2020-01-01,9.0,1000
10,2020-02-01,9.0,1000


In [0]:

#create fact_payment table
df_payments.dropDuplicates(["payment_id"])\
    .write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("fact_payment")
     

## Transform dim_riders

In [0]:
# Read data default taging_riders

df_riders = spark.table("taging_riders")

display(df_riders.limit(10))

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True
1005,Christine,Rodriguez,224 Washington Mills Apt. 467,1974-08-27,2020-03-24,,False
1006,Alicia,Taylor,1137 Angela Locks,2004-01-30,2020-11-27,2021-12-01,True
1007,Benjamin,Fernandez,979 Phillips Ways,1988-01-11,2016-12-11,,False
1008,John,Crawford,7691 Evans Court,1987-02-21,2021-03-28,2021-07-01,True
1009,Victoria,Ritter,9922 Jim Crest Apt. 319,1981-02-07,2020-06-12,2021-11-01,True


In [0]:

#create dim_rider table
df_riders.dropDuplicates(["rider_id"])\
    .write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("dim_rider")
     

## Transform dim_station


In [0]:
# Read data default taging_stations

df_stations = spark.table("taging_stations")

display(df_stations.limit(10))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669
KP1705001026,LaSalle Dr & Huron St,41.894877,-87.632326
13253,Lincoln Ave & Waveland Ave,41.948797,-87.675278
KA1503000044,Rush St & Hubbard St,41.890173,-87.62618499999999
KA1504000140,Winchester Ave & Elston Ave,41.92403733333333,-87.67641483333334
TA1305000032,Clinton St & Madison St,41.882242,-87.64106600000001


In [0]:

#create dim_station table
df_stations.dropDuplicates(["station_id"])\
    .write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("dim_station")
     

## Transform fact_trip

In [0]:
# Read data default taging_trips

df_trips = spark.table("taging_trips")

display(df_trips.limit(10))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
F6F309843C09CAAC,classic_bike,2021-09-02 18:36:00,2021-09-02 18:48:43,TA1306000026,TA1307000041,70456
BD496FA19316E89C,classic_bike,2021-09-02 17:23:28,2021-09-02 17:32:43,TA1306000026,TA1309000019,24732
657E606A35206CC1,classic_bike,2021-09-05 23:13:43,2021-09-05 23:30:26,15642,TA1305000041,11875
B10DEEC4A5D90EB3,electric_bike,2021-09-09 18:26:00,2021-09-09 18:40:17,TA1307000001,TA1308000049,33064
268A00298A05078B,classic_bike,2021-09-06 18:49:27,2021-09-06 19:02:22,TA1306000026,TA1308000049,71976
41E9E476004232FD,electric_bike,2021-09-04 21:41:58,2021-09-04 22:03:02,13235,TA1305000041,43348
3F89D8A2BEE07478,classic_bike,2021-09-11 16:10:38,2021-09-11 16:36:36,13235,TA1307000041,1159
0BE315967E011514,electric_bike,2021-09-22 21:53:46,2021-09-22 22:11:58,13235,TA1309000019,75374
AE58DD50675A5E4E,classic_bike,2021-09-05 00:41:37,2021-09-05 00:54:15,TA1306000026,TA1308000049,3290
B435E6AE404F547E,classic_bike,2021-09-11 17:23:49,2021-09-11 17:37:19,TA1306000026,TA1307000041,31610


In [0]:
# create data_trips that take information from df_trips and df_riders
data_trips = df_trips.alias("tr").join(df_riders.alias("ri"), col("tr.rider_id") == col("ri.rider_id"), "left") \
    .withColumn('duration', round((unix_timestamp("ended_at") - unix_timestamp('start_at'))/60)) \
    .withColumn('rider_age', round((unix_timestamp("account_start_date") - unix_timestamp('birthday'))/3600/24)) \
    .select("trip_id", "duration", "rider_age", "rideable_type", "start_station_id", "end_station_id", "start_at", "ended_at", col("tr.rider_id"))
            
display(data_trips.limit(10))

trip_id,duration,rider_age,rideable_type,start_station_id,end_station_id,start_at,ended_at,rider_id
F6F309843C09CAAC,13.0,,classic_bike,TA1306000026,TA1307000041,2021-09-02 18:36:00,2021-09-02 18:48:43,70456
BD496FA19316E89C,9.0,,classic_bike,TA1306000026,TA1309000019,2021-09-02 17:23:28,2021-09-02 17:32:43,24732
657E606A35206CC1,17.0,,classic_bike,15642,TA1305000041,2021-09-05 23:13:43,2021-09-05 23:30:26,11875
B10DEEC4A5D90EB3,14.0,,electric_bike,TA1307000001,TA1308000049,2021-09-09 18:26:00,2021-09-09 18:40:17,33064
268A00298A05078B,13.0,,classic_bike,TA1306000026,TA1308000049,2021-09-06 18:49:27,2021-09-06 19:02:22,71976
41E9E476004232FD,21.0,,electric_bike,13235,TA1305000041,2021-09-04 21:41:58,2021-09-04 22:03:02,43348
3F89D8A2BEE07478,26.0,,classic_bike,13235,TA1307000041,2021-09-11 16:10:38,2021-09-11 16:36:36,1159
0BE315967E011514,18.0,,electric_bike,13235,TA1309000019,2021-09-22 21:53:46,2021-09-22 22:11:58,75374
AE58DD50675A5E4E,13.0,,classic_bike,TA1306000026,TA1308000049,2021-09-05 00:41:37,2021-09-05 00:54:15,3290
B435E6AE404F547E,14.0,,classic_bike,TA1306000026,TA1307000041,2021-09-11 17:23:49,2021-09-11 17:37:19,31610


In [0]:

#create fact_trip table
data_trips.dropDuplicates(["trip_id"])\
    .write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("face_trip")
     

## Transform dim_time

In [0]:


# Get min date from df_trips
date_min = df_trips.selectExpr('MIN(start_at) AS started_at')\
    .first().asDict()['started_at']
# set max date
date_max = df_trips.selectExpr('DATEADD(year, 5, MAX(start_at)) AS started_at')\
    .first().asDict()['started_at']

expression = f"sequence(to_date('{date_min}'), to_date('{date_max}'), interval 1 day)"
data_time = spark.createDataFrame([(1,)], ["time_id"])

data_time = data_time.withColumn("dateinit", explode(expr(expression)))
data_time = data_time.withColumn("date", to_timestamp(data_time.dateinit, "yyyy-MM-dd"))

data_time = data_time \
            .withColumn("day_of_week", dayofweek(data_time.date)) \
            .withColumn("day_of_month", dayofmonth(data_time.date)) \
            .withColumn("year", year(data_time.date)) \
            .withColumn("quarter", quarter(data_time.date)) \
            .withColumn("month", month(data_time.date)) \
            .drop(col("dateinit"))

display(data_time.limit(10))

time_id,date,day_of_week,day_of_month,year,quarter,month
1,2021-02-01T00:00:00Z,2,1,2021,1,2
1,2021-02-02T00:00:00Z,3,2,2021,1,2
1,2021-02-03T00:00:00Z,4,3,2021,1,2
1,2021-02-04T00:00:00Z,5,4,2021,1,2
1,2021-02-05T00:00:00Z,6,5,2021,1,2
1,2021-02-06T00:00:00Z,7,6,2021,1,2
1,2021-02-07T00:00:00Z,1,7,2021,1,2
1,2021-02-08T00:00:00Z,2,8,2021,1,2
1,2021-02-09T00:00:00Z,3,9,2021,1,2
1,2021-02-10T00:00:00Z,4,10,2021,1,2


In [0]:

#create dim_time table
data_time.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("dim_time")
     

end