# Bronze Stages

## Payment table

In [0]:
from pyspark.sql.functions import to_timestamp

# Load the raw payments CSV. The file is read without a header row, so the
# columns arrive with Spark's positional names (_c0, _c1, ...).
payment_df = (
    spark.read.format("csv")
    .options(inferSchema="true", header="false", sep=",")
    .load('dbfs:/FileStore/bikesharing/payments.csv')
)

# Give each positional column its business name, in file order.
payment_columns = ["payment_id", "date", "amount", "rider_id"]
for position, column_name in enumerate(payment_columns):
    payment_df = payment_df.withColumnRenamed(f"_c{position}", column_name)

# Convert the `date` column to a timestamp using the given pattern.
payment_df = payment_df.withColumn(
    "date",
    to_timestamp("date", "yyyy-MM-dd HH:mm:ss"),
)

# Preview the first five rows.
display(payment_df.take(5))

payment_id,date,amount,rider_id
1,2019-05-01T00:00:00Z,9.0,1000
2,2019-06-01T00:00:00Z,9.0,1000
3,2019-07-01T00:00:00Z,9.0,1000
4,2019-08-01T00:00:00Z,9.0,1000
5,2019-09-01T00:00:00Z,9.0,1000


Save bronze_payment table

In [0]:
# Write the payments DataFrame to the Delta table `bronze_payment`,
# overwriting whatever the table already holds.
(
    payment_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('bronze_payment')
)

## Rider table

In [0]:
# Load the raw riders CSV. The file is read without a header row, so the
# columns arrive with Spark's positional names (_c0, _c1, ...).
rider_df = (
    spark.read.format("csv")
    .options(inferSchema="true", header="false", sep=",")
    .load('dbfs:/FileStore/bikesharing/riders.csv')
)

# Give each positional column its business name, in file order.
rider_columns = [
    "rider_id",
    "first",
    "last",
    "address",
    "birthday",
    "account_start_date",
    "account_end_date",
    "is_member",
]
for position, column_name in enumerate(rider_columns):
    rider_df = rider_df.withColumnRenamed(f"_c{position}", column_name)

# Preview the first five rows.
display(rider_df.take(5))

rider_id,first,last,address,birthday,account_start_date,account_end_date,is_member
1000,Diana,Clark,1200 Alyssa Squares,1989-02-13,2019-04-23,,True
1001,Jennifer,Smith,397 Diana Ferry,1976-08-10,2019-11-01,2020-09-01,True
1002,Karen,Smith,644 Brittany Row Apt. 097,1998-08-10,2022-02-04,,True
1003,Bryan,Roberts,996 Dickerson Turnpike,1999-03-29,2019-08-26,,False
1004,Jesse,Middleton,7009 Nathan Expressway,1969-04-11,2019-09-14,,True


Save bronze_rider table

In [0]:
# Write the riders DataFrame to the Delta table `bronze_rider`,
# overwriting whatever the table already holds.
(
    rider_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('bronze_rider')
)

## Station table

In [0]:
# Load the raw stations CSV. The file is read without a header row, so the
# columns arrive with Spark's positional names (_c0, _c1, ...).
station_df = (
    spark.read.format("csv")
    .options(inferSchema="true", header="false", sep=",")
    .load('dbfs:/FileStore/bikesharing/stations.csv')
)

# Give each positional column its business name, in file order.
station_columns = ["station_id", "name", "latitude", "longitude"]
for position, column_name in enumerate(station_columns):
    station_df = station_df.withColumnRenamed(f"_c{position}", column_name)

# Preview the first five rows.
display(station_df.take(5))

station_id,name,latitude,longitude
525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999
KA1503000012,Clark St & Lake St,41.88579466666667,-87.63110066666668
637,Wood St & Chicago Ave,41.895634,-87.672069
13216,State St & 33rd St,41.8347335,-87.6258275
18003,Fairbanks St & Superior St,41.89580766666667,-87.62025316666669


In [0]:
# Write the stations DataFrame to the Delta table `bronze_station`,
# overwriting whatever the table already holds.
(
    station_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('bronze_station')
)

## Trip table

In [0]:
# Load the raw trips CSV. The file is read without a header row, so the
# columns arrive with Spark's positional names (_c0, _c1, ...).
trip_df = (
    spark.read.format("csv")
    .options(inferSchema="true", header="false", sep=",")
    .load('dbfs:/FileStore/bikesharing/trips.csv')
)

# Give each positional column its business name, in file order.
trip_columns = [
    "trip_id",
    "rideable_type",
    "start_at",
    "ended_at",
    "start_station_id",
    "end_station_id",
    "rider_id",
]
for position, column_name in enumerate(trip_columns):
    trip_df = trip_df.withColumnRenamed(f"_c{position}", column_name)

# Preview the first five rows.
display(trip_df.take(5))

trip_id,rideable_type,start_at,ended_at,start_station_id,end_station_id,rider_id
89E7AA6C29227EFF,classic_bike,2021-02-12T16:14:56Z,2021-02-12T16:21:43Z,525,660,71934
0FEFDE2603568365,classic_bike,2021-02-14T17:52:38Z,2021-02-14T18:12:09Z,525,16806,47854
E6159D746B2DBB91,electric_bike,2021-02-09T19:10:18Z,2021-02-09T19:19:10Z,KA1503000012,TA1305000029,70870
B32D3199F1C2E75B,classic_bike,2021-02-02T17:49:41Z,2021-02-02T17:54:06Z,637,TA1305000034,58974
83E463F23575F4BF,electric_bike,2021-02-23T15:07:23Z,2021-02-23T15:22:37Z,13216,TA1309000055,39608


In [0]:
# Write the trips DataFrame to the Delta table `bronze_trip`,
# overwriting whatever the table already holds.
(
    trip_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable('bronze_trip')
)

# Silver Stages
Because this dataset needs no further processing, I skip the silver stage and go straight to the gold stage, which creates the data warehouse tables.

# Gold Stages

## Create and load gold payment table

In [0]:
# Build the payment fact table from the bronze layer.
#
# CREATE OR REPLACE is used instead of CREATE ... IF NOT EXISTS: the bronze
# tables are rewritten with mode('overwrite') on every run, and IF NOT EXISTS
# would silently skip this statement on a re-run and leave stale data behind.
#
# gold_payment_id is a surrogate key numbered in payment_id order; the inner
# join keeps only payments whose rider_id exists in bronze_rider.
spark.sql("""
        create or replace table gold_fact_payment
        using delta
        as
        select row_number() over(order by p.payment_id) as gold_payment_id,
          p.date as time_id,
          r.rider_id,
          p.amount
        from bronze_payment as p
        join bronze_rider as r
        on p.rider_id = r.rider_id
        """)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

## create and load gold dim rider 

In [0]:
# Build the rider dimension as a straight projection of bronze_rider.
#
# CREATE OR REPLACE is used instead of CREATE ... IF NOT EXISTS: the bronze
# tables are rewritten with mode('overwrite') on every run, and IF NOT EXISTS
# would silently skip this statement on a re-run and leave stale data behind.
spark.sql("""
        create or replace table gold_dim_rider
        using delta
        as
        select rider_id,
        first,
        last,
        address,
        birthday,
        account_start_date,
        account_end_date,
        is_member
        from bronze_rider
        """)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

## create and load gold dim station

In [0]:
# Build the station dimension as a straight projection of bronze_station.
#
# CREATE OR REPLACE is used instead of CREATE ... IF NOT EXISTS: the bronze
# tables are rewritten with mode('overwrite') on every run, and IF NOT EXISTS
# would silently skip this statement on a re-run and leave stale data behind.
spark.sql("""
        create or replace table gold_dim_station
        using delta
        as
        select station_id,
        name,
        latitude,
        longitude
        from bronze_station
        """)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

## create and load gold fact trip

In [0]:
# Build the trip fact table from the bronze layer.
#
# CREATE OR REPLACE is used instead of CREATE ... IF NOT EXISTS: the bronze
# tables are rewritten with mode('overwrite') on every run, and IF NOT EXISTS
# would silently skip this statement on a re-run and leave stale data behind.
#
# fact_trip_id is a surrogate key numbered in trip_id order; Total_Mins is the
# trip duration in minutes. The inner join keeps only trips whose rider_id
# exists in bronze_rider. Column references inside DATEDIFF are qualified with
# the `trip` alias so they always resolve to the source columns.
spark.sql("""
        create or replace table gold_fact_trip
        using delta
        as
        select ROW_NUMBER() over(order by trip.trip_id) as fact_trip_id,
              rider.rider_id,
              trip.start_at AS start_at,
              trip.ended_at AS ended_at,
              trip.start_station_id,
              trip.end_station_id,
              DATEDIFF(minute, trip.start_at, trip.ended_at) AS Total_Mins
              FROM bronze_trip as trip
              JOIN bronze_rider as rider on trip.rider_id = rider.rider_id
        """)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

## create and load gold dim time

In [0]:
# Build the time dimension: one row per distinct timestamp that appears as a
# trip start, a trip end, or a payment date, with its calendar parts.
#
# The three sources are combined with UNION (not UNION ALL). DISTINCT inside
# each branch only removed duplicates within that branch, so a timestamp that
# occurred in more than one source (e.g. one trip's end equal to another
# trip's start) was stored several times, which multiplies rows when a fact
# table is joined to this dimension. De-duplicating first also lets the date
# parts be derived once instead of three times.
#
# CREATE OR REPLACE is used instead of CREATE ... IF NOT EXISTS: the bronze
# tables are rewritten with mode('overwrite') on every run, and IF NOT EXISTS
# would silently skip this statement on a re-run and leave stale data behind.
spark.sql("""
    create or replace table gold_dim_time
    using delta
    as
    SELECT datetime,
      minute(datetime) as min,
      hour(datetime) as hour,
      dayofmonth(datetime) as day,
      month(datetime) as month,
      year(datetime) as year,
      dayofweek(datetime) as dayofweek
    from (
      SELECT start_at AS datetime from bronze_trip
      union
      SELECT ended_at AS datetime from bronze_trip
      union
      SELECT date AS datetime from bronze_payment
    ) as all_times
    """)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]