##### This notebook is an assignment for the Udacity data engineering project with Azure.
##### The project requires:
- Design a star schema based on the business outcomes below <br>
- Import the data into Azure Databricks using Delta Lake to create a Bronze data store <br>
- Create a gold data store in Delta Lake tables <br>
- Transform the data into the star schema for a Gold data store <br>

##### Steps in this notebook:
- Extract data from CSV files in DBFS
- Refine tables: add column names to each table
- Write data into delta file system
- Create delta tables
- Transform data to star schema
- Write star schema into delta tables

In [0]:
from pyspark.sql import functions as f
from pyspark.sql.functions import sequence, explode, to_date

##### Extract data from CSV files in DBFS

In [0]:
# Pull the four raw CSV exports out of DBFS, one dataframe per source file.
def _read_raw_csv(csv_path):
    """Read one comma-separated file with no header row and no schema inference."""
    reader = spark.read.format("csv")
    reader = reader.option("inferSchema", "false")
    reader = reader.option("header", "false")
    reader = reader.option("sep", ",")
    return reader.load(csv_path)


df_payments = _read_raw_csv("/FileStore/tables/payments/payments.csv")
df_trips = _read_raw_csv("/FileStore/tables/trips/trips.csv")
df_riders = _read_raw_csv("/FileStore/tables/riders/riders.csv")
df_stations = _read_raw_csv("/FileStore/tables/stations/stations.csv")

In [0]:
# Sanity-check the loads, starting with the first five payments rows.
payments_preview = df_payments.take(5)
payments_preview

In [0]:
# First five riders rows, shown as the cell result.
riders_preview = df_riders.take(5)
riders_preview

In [0]:
# First five stations rows, shown as the cell result.
stations_preview = df_stations.take(5)
stations_preview

In [0]:
# First five trips rows, shown as the cell result.
trips_preview = df_trips.take(5)
trips_preview

##### Refine tables: add column names to each table

In [0]:
# payments: swap the positional _cN column names for descriptive ones
payments_column_names = {
    "_c0": "payment_id",
    "_c1": "date",
    "_c2": "amount",
    "_c3": "rider_id",
}
for positional_name, column_name in payments_column_names.items():
    df_payments = df_payments.withColumnRenamed(positional_name, column_name)

In [0]:
# riders: swap the positional _cN column names for descriptive ones
riders_column_names = {
    "_c0": "rider_id",
    "_c1": "first",
    "_c2": "last",
    "_c3": "address",
    "_c4": "birthday",
    "_c5": "startdate",
    "_c6": "enddate",
    "_c7": "member",
}
for positional_name, column_name in riders_column_names.items():
    df_riders = df_riders.withColumnRenamed(positional_name, column_name)

In [0]:
# trips: swap the positional _cN column names for descriptive ones
trips_column_names = {
    "_c0": "trip_id",
    "_c1": "rideable_type",
    "_c2": "started_at",
    "_c3": "ended_at",
    "_c4": "start_station_id",
    "_c5": "end_station_id",
    "_c6": "rider_id",
}
for positional_name, column_name in trips_column_names.items():
    df_trips = df_trips.withColumnRenamed(positional_name, column_name)

In [0]:
# stations: swap the positional _cN column names for descriptive ones
stations_column_names = {
    "_c0": "station_id",
    "_c1": "name",
    "_c2": "latitude",
    "_c3": "longitude",
}
for positional_name, column_name in stations_column_names.items():
    df_stations = df_stations.withColumnRenamed(positional_name, column_name)

In [0]:
# Confirm the renamed payments columns by looking at two rows.
renamed_payments_preview = df_payments.take(2)
renamed_payments_preview

##### Write data into delta file system

In [0]:
# Persist every renamed dataframe as Delta files, replacing any earlier output.
bronze_outputs = (
    (df_payments, "/delta/payments"),
    (df_trips, "/delta/trips"),
    (df_stations, "/delta/stations"),
    (df_riders, "/delta/riders"),
)
for bronze_df, bronze_path in bronze_outputs:
    bronze_df.write.format("delta").mode("overwrite").save(bronze_path)

##### Create delta tables

In [0]:
# Disabled alternative, kept for reference only: reload the payments Delta
# files and save them as a table through the DataFrame writer. The tables are
# registered with CREATE TABLE ... LOCATION statements instead.
#
# # load data
# df_payments = spark.read.format("delta") \
#     .load("/delta/payments")
#
# # write into tables
# df_payments.write.format("delta") \
#     .mode("overwrite") \
#     .saveAsTable("payments")

In [0]:
# Disabled alternative, kept for reference only: reload the riders, trips and
# stations Delta files and save each as a table through the DataFrame writer.
# The tables are registered with CREATE TABLE ... LOCATION statements instead.
#
# # load data
# df_riders = spark.read.format("delta") \
#     .load("/delta/riders")
#
# # write into tables
# df_riders.write.format("delta") \
#     .mode("overwrite") \
#     .saveAsTable("riders")
#
# # load data
# df_trips = spark.read.format("delta") \
#     .load("/delta/trips")
#
# # write into tables
# df_trips.write.format("delta") \
#     .mode("overwrite") \
#     .saveAsTable("trips")
#
# # load data
# df_stations = spark.read.format("delta") \
#     .load("/delta/stations")
#
# # write into tables
# df_stations.write.format("delta") \
#     .mode("overwrite") \
#     .saveAsTable("stations")

In [0]:
# Register the Delta directories written above as tables; no data is copied,
# each table simply points at its existing location.
# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE TABLE fails as soon
# as a table with the same name has been registered by an earlier run.
spark.sql("CREATE TABLE IF NOT EXISTS payments USING DELTA LOCATION '/delta/payments'")
spark.sql("CREATE TABLE IF NOT EXISTS trips USING DELTA LOCATION '/delta/trips'")
spark.sql("CREATE TABLE IF NOT EXISTS riders USING DELTA LOCATION '/delta/riders'")
spark.sql("CREATE TABLE IF NOT EXISTS stations USING DELTA LOCATION '/delta/stations'")

##### Transform data to star schema

In [0]:
# Reload the four source dataframes from their Delta locations.
df_payments, df_riders, df_trips, df_stations = (
    spark.read.format("delta").load(delta_path)
    for delta_path in (
        "/delta/payments",
        "/delta/riders",
        "/delta/trips",
        "/delta/stations",
    )
)

In [0]:
# process df_trips into star schema

def process_trips(in_df, riders_df=None):
    '''
    this function processes df_trips into fact_trips in the star schema
    includes casting the key and timestamp columns, adding the trip duration
    and adding the rider's age at the end of the trip
    args:
        in_df: trips dataframe with the columns trip_id, rider_id,
            rideable_type, start_station_id, end_station_id, started_at
            and ended_at
        riders_df: optional riders dataframe with the columns rider_id and
            birthday; when omitted the notebook-level df_riders is used
    returns: dataframe with the trip columns plus duration and rider_age

    '''
    # fall back to the notebook-level riders dataframe, as callers that pass
    # only in_df expect
    if riders_df is None:
        riders_df = df_riders

    # average length of a calendar year in days, leap years included
    days_per_year = 365.25

    # select needed cols
    out_df = in_df.select('trip_id', 'rider_id', 'rideable_type', 'start_station_id', 'end_station_id', 'started_at', 'ended_at')

    # convert to correct data format
    out_df = (
        out_df.withColumn('rider_id', f.col('rider_id').cast('int'))
        .withColumn('started_at', f.to_timestamp('started_at'))
        .withColumn('ended_at', f.to_timestamp('ended_at'))
    )

    # create duration column: difference of the two timestamps cast to long
    # (epoch seconds), i.e. the trip length in seconds
    out_df = out_df.withColumn('duration', f.col('ended_at').cast('long') - f.col('started_at').cast('long'))

    # create rider age column; the left join keeps trips whose rider is not
    # found, leaving their rider_age null
    merged_df = out_df.join(riders_df.select('birthday', 'rider_id'), on='rider_id', how='left')
    age_in_years = f.datediff(f.col('ended_at'), f.col('birthday')) / days_per_year
    out_df = merged_df.withColumn('rider_age', f.round(age_in_years, 0))

    # drop added birthday
    out_df = out_df.drop('birthday')

    return out_df

In [0]:
# Derive the trips fact table from the reloaded trips dataframe.
fact_trips = process_trips(in_df=df_trips)

In [0]:
# process payments
def process_payments(in_df):
    '''
    this function turns df_payments into fact_payments for the star schema
    it keeps the four payment columns and casts each one to its target type
    args: dataframe
    returns: dataframe

    '''
    # target type per column, applied in this order
    payment_casts = (
        ('payment_id', 'int'),
        ('rider_id', 'int'),
        ('amount', 'float'),
        ('date', 'date'),
    )

    # keep only the columns the fact table needs
    typed_df = in_df.select('payment_id', 'date', 'amount', 'rider_id')

    # replace every column with its typed version
    for column_name, spark_type in payment_casts:
        typed_df = typed_df.withColumn(column_name, f.col(column_name).cast(spark_type))

    return typed_df

In [0]:
# Derive the payments fact table from the reloaded payments dataframe.
fact_payments = process_payments(in_df=df_payments)

In [0]:
# process riders
def process_riders(in_df):
    '''
    this function turns df_riders into dim_riders for the star schema
    it keeps the rider attributes and casts the id, the three date columns
    and the member flag to their target types
    args: dataframe
    returns: dataframe

    '''
    rider_columns = ['rider_id', 'first', 'last', 'address', 'birthday', 'startdate', 'enddate', 'member']

    # target type per column, applied in this order
    rider_casts = (
        ('rider_id', 'int'),
        ('birthday', 'date'),
        ('startdate', 'date'),
        ('enddate', 'date'),
        ('member', 'boolean'),
    )

    # keep only the columns the dimension needs
    typed_df = in_df.select(*rider_columns)

    # replace every listed column with its typed version
    for column_name, spark_type in rider_casts:
        typed_df = typed_df.withColumn(column_name, f.col(column_name).cast(spark_type))

    return typed_df

In [0]:
# Derive the riders dimension from the reloaded riders dataframe.
dim_riders = process_riders(in_df=df_riders)

In [0]:
# process stations
def process_stations(in_df):
    '''
    this function processes df_stations into dim_stations in the star schema
    includes adjusting data formats
    args: dataframe with the columns station_id, name, latitude and longitude
    returns: dataframe with latitude and longitude cast to float

    '''
    # select needed cols
    out_df = in_df.select('station_id', 'name', 'latitude', 'longitude')

    # NOTE: station_id is the primary key of this dimension, but its
    # nullability is deliberately not touched here. Assigning to
    # out_df.schema['station_id'].nullable only edits a Python-side schema
    # object and never reaches the dataframe returned below, so it gave no
    # guarantee. If a NOT NULL guarantee is needed, enforce it on the target
    # table instead.

    # convert to correct data format
    out_df = out_df.withColumn('latitude', f.col('latitude').cast('float')) \
        .withColumn('longitude', f.col('longitude').cast('float'))

    return out_df

In [0]:
# Derive the stations dimension from the reloaded stations dataframe.
dim_stations = process_stations(in_df=df_stations)

In [0]:
# generate dim_calendar table
# https://stackoverflow.com/questions/43141671/sparksql-on-pyspark-how-to-generate-time-series

def create_calendar(start_date='2010-01-01', end_date='2022-09-25'):
    '''
    this function creates a dim_calendar used as dimension table in star schema
    args:
        start_date: first day of the calendar as a 'yyyy-MM-dd' string
        end_date: last day of the calendar (inclusive) as a 'yyyy-MM-dd' string
        the defaults reproduce the range this notebook has always generated
    returns: dataframe with one row per day and the columns date, quarter,
        year and month

    '''
    # create date series: build the array of days between the two bounds on a
    # single-row dataframe, then explode it into one row per day
    first_day = to_date(f.lit(start_date))
    last_day = to_date(f.lit(end_date))
    one_day = f.expr('interval 1 day')
    out_df = spark.range(1).select(explode(sequence(first_day, last_day, one_day)).alias('date'))

    # create other columns
    out_df = out_df.withColumn('quarter', f.quarter(f.col('date'))) \
        .withColumn('year', f.year(f.col('date'))) \
        .withColumn('month', f.month(f.col('date')))
    return out_df

In [0]:
# Build the calendar dimension using create_calendar's built-in date range.
dim_calendar = create_calendar()

##### Write star schema into delta tables

In [0]:
# write star schema tables to delta
dim_stations.write.format('delta').mode('overwrite').saveAsTable('dim_stations')
dim_calendar.write.format('delta').mode('overwrite').saveAsTable('dim_calendar')
dim_riders.write.format('delta').mode('overwrite').saveAsTable('dim_riders')
fact_trips.write.format('delta').mode('overwrite').saveAsTable('fact_trips')
fact_payments.write.format('delta').mode('overwrite').saveAsTable('fact_payments')