# Reading and Writing Data (Extract Step)

- Produce Spark code in Databricks using Jupyter Notebooks and Python scripts

  _The notebook should contain Python code to extract information from CSV files stored in Databricks and write it to the Delta file system._

- Use distributed data storage with Azure Data Storage options

  _The notebook should contain Python code that picks up files from Databricks File System (DBFS) storage and writes them out to Delta file locations._


In [0]:
# The script below assumes the ingested CSV files are located at "/FileStore/csv_files".
# Uncomment the snippets below to create that path or relocate the files if necessary.

# Create the path used by this script:
# dbutils.fs.mkdirs("/FileStore/csv_files")

# Delete or move files and paths if necessary:
# dbutils.fs.rm('/delta/staging', recurse=True)
# dbutils.fs.mv("/old_location/file.csv", "/new_location")


In [0]:
from pyspark.sql.types import *


rider_schema = StructType([
    StructField("rider_id", LongType(), True),
    StructField("first", StringType(), True),
    StructField("last", StringType(), True),
    StructField("address", StringType(), True),
    StructField("birthday", DateType(), True),
    StructField("account_start_date", DateType(), True),
    StructField("account_end_date", DateType(), True),
    StructField("is_member", BooleanType(), True)
])

payment_schema = StructType([
    StructField("payment_id", LongType(), True),
    StructField("date", DateType(), True),
    StructField("amount", DecimalType(25, 2), True),
    StructField("rider_id", LongType(), True)
])

trip_schema = StructType([
    StructField("trip_id", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("start_at", TimestampNTZType(), True),
    StructField("ended_at", TimestampNTZType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("rider_id", LongType(), True)
])

station_schema = StructType([
    StructField("station_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True)
])

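# The CSV files have no header row, so column names and types come from the schemas above
files = [("trips", trip_schema), ("stations", station_schema), ("payments", payment_schema), ("riders", rider_schema)]
for name, schema in files:
    file_path = f"/FileStore/csv_files/{name}.csv"
    out_path = f"dbfs:/delta/staging/{name}"

    # Read the raw CSV with the declared schema instead of inferring types
    spark_df = (
        spark.read
        .format("csv")
        .option("header", False)
        .option("inferSchema", False)
        .option("delimiter", ",")
        .schema(schema)
        .load(file_path)
    )

    # Write each file to its own Delta location, replacing output from previous runs
    spark_df.write.format("delta").mode("overwrite").save(out_path)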
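
To make the effect of `.schema(...)` with `header` set to `False` more concrete, here is a plain-Python sketch (standard library only, not Spark) that applies a positional column list to headerless CSV rows. `STATION_COLUMNS` and `parse_rows` are hypothetical names. Turning missing or unparseable values into `None` is only a rough stand-in for the nulls Spark's default `PERMISSIVE` read mode produces; it is not how Spark implements parsing.

```python
import csv
import io

# Hypothetical column spec mirroring station_schema: (column name, converter)
STATION_COLUMNS = [
    ("station_id", str),
    ("name", str),
    ("latitude", float),
    ("longitude", float),
]


def parse_rows(text, columns):
    """Apply a positional schema to headerless CSV text.

    Missing fields and values that fail conversion become None, a rough
    stand-in for the nulls Spark's PERMISSIVE mode produces.
    """
    rows = []
    for record in csv.reader(io.StringIO(text)):
        row = {}
        for index, (col_name, convert) in enumerate(columns):
            raw = record[index] if index < len(record) else None
            try:
                row[col_name] = None if raw is None else convert(raw)
            except ValueError:
                row[col_name] = None
        rows.append(row)
    return rows


# First row is taken from the stations output in this notebook; the second is deliberately malformed
sample = (
    "525,Glenwood Ave & Touhy Ave,42.012701,-87.66605799999999\n"
    "637,Wood St & Chicago Ave,not-a-number,-87.672069\n"
)
for row in parse_rows(sample, STATION_COLUMNS):
    print(row)
```

The second row keeps its other columns while `latitude` becomes `None`, which is why an explicit schema is usually paired with a check for unexpected nulls after loading.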

In [0]:
# Check that the data was loaded correctly into Delta
result = spark.read.format("delta").load('/delta/staging/stations')
display(result)

| station_id | name | latitude | longitude |
|---|---|---|---|
| 525 | Glenwood Ave & Touhy Ave | 42.012701 | -87.66605799999999 |
| KA1503000012 | Clark St & Lake St | 41.88579466666667 | -87.63110066666668 |
| 637 | Wood St & Chicago Ave | 41.895634 | -87.672069 |
| 13216 | State St & 33rd St | 41.8347335 | -87.6258275 |
| 18003 | Fairbanks St & Superior St | 41.89580766666667 | -87.62025316666669 |
| KP1705001026 | LaSalle Dr & Huron St | 41.894877 | -87.632326 |
| 13253 | Lincoln Ave & Waveland Ave | 41.948797 | -87.675278 |
| KA1503000044 | Rush St & Hubbard St | 41.890173 | -87.62618499999999 |
| KA1504000140 | Winchester Ave & Elston Ave | 41.92403733333333 | -87.67641483333334 |
| TA1305000032 | Clinton St & Madison St | 41.882242 | -87.64106600000001 |


# Loading Data from Delta (Load Step)

- Implement key features of data lakes on Azure

  _The notebook should contain code that creates tables and loads data from Delta files. The learner should use `spark.sql` statements to create the tables and then load data from the files that were extracted in the Extract step._


In [0]:
tables_name = ["stations", "riders", "payments", "trips"]
for name in tables_name:
    # Register each Delta folder written in the Extract step as a staging table
    spark.sql(f"CREATE TABLE IF NOT EXISTS staging_{name} USING DELTA LOCATION '/delta/staging/{name}'")
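
The loop above builds one `CREATE TABLE ... USING DELTA LOCATION` statement per staging folder. A small helper such as the hypothetical `staging_ddl` below (plain Python, not part of the original notebook) keeps the path quoting in one place and makes the generated SQL easy to inspect before it is passed to `spark.sql`. Rejecting names that are not plain identifiers is an assumption of this sketch, not a Spark requirement.

```python
def staging_ddl(name, base_path="/delta/staging"):
    """Return the CREATE TABLE statement for one staging Delta folder.

    The table name gets a staging_ prefix and the location is wrapped in
    single quotes, matching the statements the notebook sends to spark.sql.
    """
    if not name.isidentifier():
        # Names needing SQL escaping are rejected (an assumption of this sketch)
        raise ValueError(f"unexpected table name: {name!r}")
    location = f"{base_path.rstrip('/')}/{name}"
    return f"CREATE TABLE IF NOT EXISTS staging_{name} USING DELTA LOCATION '{location}'"


for table in ["stations", "riders", "payments", "trips"]:
    print(staging_ddl(table))
```

In the notebook, each generated string would then be handed to `spark.sql(...)`.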

In [0]:
# Check that the data was properly loaded into the tables
table_result = spark.sql(f"SELECT * FROM staging_{tables_name[0]} LIMIT 10")
display(table_result)

| station_id | name | latitude | longitude |
|---|---|---|---|
| 525 | Glenwood Ave & Touhy Ave | 42.012701 | -87.66605799999999 |
| KA1503000012 | Clark St & Lake St | 41.88579466666667 | -87.63110066666668 |
| 637 | Wood St & Chicago Ave | 41.895634 | -87.672069 |
| 13216 | State St & 33rd St | 41.8347335 | -87.6258275 |
| 18003 | Fairbanks St & Superior St | 41.89580766666667 | -87.62025316666669 |
| KP1705001026 | LaSalle Dr & Huron St | 41.894877 | -87.632326 |
| 13253 | Lincoln Ave & Waveland Ave | 41.948797 | -87.675278 |
| KA1503000044 | Rush St & Hubbard St | 41.890173 | -87.62618499999999 |
| KA1504000140 | Winchester Ave & Elston Ave | 41.92403733333333 | -87.67641483333334 |
| TA1305000032 | Clinton St & Madison St | 41.882242 | -87.64106600000001 |


# Transforming Data to Gold (Transform Step)

- Use Spark and Databricks to run ELT processes by creating fact tables

  _The fact table Python scripts should contain appropriate keys from the dimensions. In addition, the fact table scripts should generate the correct facts based on the diagrams provided in the first step._

- Use Spark and Databricks to run ELT processes by creating dimension tables

  _The dimension Python scripts should match the schema diagram. Dimensions should generate appropriate keys and should not contain facts._

- Produce Spark code in Databricks using Jupyter Notebooks and Python scripts

  _At minimum, the transform scripts should write to Delta, use overwrite mode, and save the result as a Delta table._

### Date Dimension Transformation

In [0]:
date_dim_df = spark.sql("""
SELECT DISTINCT *
  FROM (
        -- Trip start timestamps
        SELECT cast(start_at AS VARCHAR(20)) AS date_id,
               year(start_at)                AS year,
               quarter(start_at)             AS quarter,
               month(start_at)               AS month,
               day(start_at)                 AS day,
               weekday(start_at)             AS weekday,
               hour(start_at)                AS hour,
               minute(start_at)              AS minute,
               second(start_at)              AS second
          FROM staging_trips
        UNION ALL
        -- Trip end timestamps
        SELECT cast(ended_at AS VARCHAR(20)) AS date_id,
               year(ended_at)                AS year,
               quarter(ended_at)             AS quarter,
               month(ended_at)               AS month,
               day(ended_at)                 AS day,
               weekday(ended_at)             AS weekday,
               hour(ended_at)                AS hour,
               minute(ended_at)              AS minute,
               second(ended_at)              AS second
          FROM staging_trips
        UNION ALL
        -- Payment dates have no time component, so hour, minute and second are 0
        SELECT cast(date AS VARCHAR(20))     AS date_id,
               year(date)                    AS year,
               quarter(date)                 AS quarter,
               month(date)                   AS month,
               day(date)                     AS day,
               weekday(date)                 AS weekday,
               0                             AS hour,
               0                             AS minute,
               0                             AS second
          FROM staging_payments
       ) AS temp
""")

date_dim_df.write.format("delta").mode("overwrite").saveAsTable("date_dim")
display(date_dim_df)
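
Each `date_dim` row is keyed by the timestamp (or date) cast to a string, with its calendar parts split into columns; `UNION ALL` followed by `SELECT DISTINCT` keeps one row per key. The plain-Python sketch below reproduces that logic on a few values so the key format is easy to check. `date_row` and `build_date_dim` are hypothetical names, and the sketch assumes Spark's `weekday()` convention (0 = Monday), which matches Python's `datetime.weekday()`.

```python
from datetime import date, datetime


def date_row(value):
    """Build one date-dimension row from a datetime (trips) or a date (payments).

    Plain dates get hour, minute and second set to 0, like the
    staging_payments branch of the SQL.
    """
    is_timestamp = isinstance(value, datetime)
    return {
        "date_id": str(value),  # e.g. '2021-06-13 09:48:47' or '2019-05-01'
        "year": value.year,
        "quarter": (value.month - 1) // 3 + 1,
        "month": value.month,
        "day": value.day,
        "weekday": value.weekday(),  # 0 = Monday ... 6 = Sunday
        "hour": value.hour if is_timestamp else 0,
        "minute": value.minute if is_timestamp else 0,
        "second": value.second if is_timestamp else 0,
    }


def build_date_dim(trip_starts, trip_ends, payment_dates):
    """Union all sources, then keep one row per distinct date_id.

    Every other column is derived from date_id, so de-duplicating on the
    key gives the same rows as SELECT DISTINCT * over the whole row.
    """
    rows = {}
    for value in [*trip_starts, *trip_ends, *payment_dates]:
        row = date_row(value)
        rows[row["date_id"]] = row
    return list(rows.values())


dim = build_date_dim(
    [datetime(2021, 6, 13, 9, 48, 47)],
    [datetime(2021, 6, 13, 10, 7, 23)],
    [date(2019, 5, 1), date(2019, 5, 1)],
)
for row in dim:
    print(row)
```

Note that a payment date produces the key `'2019-05-01'`, not `'2019-05-01 00:00:00'`; this only works because the payment fact table casts its date the same way.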

### Station Dimension Transformation

In [0]:
stations_dim_df = spark.sql("""
SELECT DISTINCT station_id,
                name        AS station_name,
                latitude    AS station_latitude,
                longitude   AS station_longitude
           FROM staging_stations
""")

stations_dim_df.write.format("delta").mode("overwrite").saveAsTable("station_dim")
display(stations_dim_df)

### Rider Dimension Transformation

In [0]:
riders_dim_df = spark.sql("""
SELECT DISTINCT rider_id,
                -- Separate first and last name with a space
                concat_ws(' ', first, last)                     AS rider_name,
                -- Age in whole years when the account was opened
                datediff(YEAR, birthday, account_start_date)    AS rider_age,
                birthday                                        AS rider_birthday,
                account_start_date                              AS rider_join_date,
                is_member
           FROM staging_riders
""")

riders_dim_df.write.format("delta").mode("overwrite").saveAsTable("rider_dim")
display(riders_dim_df)

| rider_id | rider_name | rider_age | rider_birthday | rider_join_date | is_member |
|---|---|---|---|---|---|
| 1030 | Jeremy Roberts | 20 | 2000-05-13 | 2021-02-09 | True |
| 1200 | Scott Morris | 26 | 1995-10-29 | 2021-12-17 | True |
| 1495 | Mary Becker | 19 | 2001-06-15 | 2020-12-21 | True |
| 1578 | Alyssa Terrell | 12 | 2003-07-05 | 2015-12-24 | True |
| 1827 | Melanie Parker | 64 | 1955-12-18 | 2020-06-04 | False |
| 1959 | Douglas Hicks | 13 | 2003-07-25 | 2017-03-12 | False |
| 2109 | Melissa Vasquez | 23 | 1994-04-16 | 2017-09-21 | True |
| 2272 | Lisa Gonzales | 23 | 1997-03-05 | 2020-11-24 | True |
| 2535 | Betty Garner | 22 | 1996-02-20 | 2018-03-09 | True |
| 2554 | Brenda Hampton | 34 | 1985-02-13 | 2019-12-15 | True |
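
`datediff(YEAR, start, end)` is used here to count whole years between two dates. Under the assumption that a year only counts once its anniversary has been reached, a plain-Python equivalent looks like the sketch below; `whole_years` is a hypothetical helper. It reproduces the `rider_age` values in the rider output for the rows checked in the test.

```python
from datetime import date


def whole_years(start, end):
    """Count complete years from start to end (e.g. birthday to join date).

    One year is subtracted when end falls before the anniversary of start
    in end's calendar year.
    """
    years = end.year - start.year
    if (end.month, end.day) < (start.month, start.day):
        years -= 1
    return years


print(whole_years(date(2000, 5, 13), date(2021, 2, 9)))
```

The same calculation, with the trip start time as the end point, is what the trip fact table uses for its own `rider_age`.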


### Trips Fact Transformation

In [0]:
trips_fact_df = spark.sql("""
SELECT  t.trip_id,
        datediff(MINUTE, t.start_at, t.ended_at)        AS trip_duration_min,
        cast(t.start_at AS VARCHAR(20))                 AS start_date_id,
        cast(t.ended_at AS VARCHAR(20))                 AS ended_date_id,
        t.start_station_id,
        t.end_station_id,
        datediff(YEAR, r.rider_birthday, t.start_at)    AS rider_age,
        r.rider_id
   FROM staging_trips t
   -- Inner joins keep only trips whose stations and rider exist in the dimensions
   JOIN station_dim s1 ON t.start_station_id = s1.station_id
   JOIN station_dim s2 ON t.end_station_id = s2.station_id
   JOIN rider_dim r    ON t.rider_id = r.rider_id
""")

trips_fact_df.write.format("delta").mode("overwrite").saveAsTable("trip_fact")
display(trips_fact_df)

| trip_id | trip_duration_min | start_date_id | ended_date_id | start_station_id | end_station_id | rider_age | rider_id |
|---|---|---|---|---|---|---|---|
| 222BB8E5059252D7 | 18 | 2021-06-13 09:48:47 | 2021-06-13 10:07:23 | KA1503000064 | 13021 | 30 | 34062 |
| 1826E16CB5486018 | 5 | 2021-06-21 22:59:13 | 2021-06-21 23:04:29 | TA1306000010 | 13021 | 26 | 5342 |
| 3D9B6A0A5330B04D | 5 | 2021-06-18 16:06:42 | 2021-06-18 16:12:02 | TA1305000030 | 13021 | 26 | 3714 |
| 07E82F5E9C9E490F | 16 | 2021-06-17 16:46:23 | 2021-06-17 17:02:45 | TA1305000034 | 13021 | 18 | 18793 |
| A8E94BAECBF0C2DD | 54 | 2021-06-13 17:36:29 | 2021-06-13 18:30:39 | TA1308000009 | TA1308000009 | 28 | 43342 |
| 378F4AB323AA1D14 | 46 | 2021-06-13 13:20:10 | 2021-06-13 14:06:14 | TA1308000009 | TA1308000009 | 28 | 6693 |
| 38AD311DC2EB1FBE | 14 | 2021-06-16 17:14:30 | 2021-06-16 17:28:34 | KA1503000019 | KA1503000019 | 56 | 71480 |
| 1D466737F0B18097 | 34 | 2021-06-27 14:51:52 | 2021-06-27 15:26:39 | TA1308000009 | TA1308000009 | 40 | 50846 |
| 27E1142E1ACFAEFB | 0 | 2021-06-21 13:58:26 | 2021-06-21 13:58:53 | 13257 | 13257 | 21 | 18951 |
| 67F2A115DAE77924 | 16 | 2021-06-22 00:51:43 | 2021-06-22 01:08:25 | TA1308000009 | TA1308000009 | 37 | 63987 |
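
`trip_duration_min` comes from `datediff(MINUTE, start_at, ended_at)`, which, judging by the trip output, counts only complete minutes (18 minutes 36 seconds is reported as 18, and a 27-second trip as 0). A plain-Python sketch of that calculation follows; `duration_minutes` is a hypothetical name, and it assumes `ended_at` is never earlier than `start_at`, since floor division would round negative durations differently.

```python
from datetime import datetime


def duration_minutes(start_at, ended_at):
    """Whole minutes elapsed between two timestamps; partial minutes are dropped."""
    seconds = (ended_at - start_at).total_seconds()
    return int(seconds // 60)


print(duration_minutes(datetime(2021, 6, 13, 9, 48, 47), datetime(2021, 6, 13, 10, 7, 23)))
```

Dropping partial minutes means very short trips show a duration of 0, which may matter when averaging trip lengths.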


### Payments Fact Transformation

In [0]:
payments_fact_df = spark.sql("""
     SELECT p.payment_id,
            p.amount,
            r.rider_id,
            cast(p.date AS VARCHAR(20)) AS payment_date_id
       FROM staging_payments p
       JOIN rider_dim r ON p.rider_id = r.rider_id
""")

payments_fact_df.write.format("delta").mode("overwrite").saveAsTable("payment_fact")
display(payments_fact_df)

| payment_id | amount | rider_id | payment_date_id |
|---|---|---|---|
| 1 | 9.0 | 1000 | 2019-05-01 |
| 2 | 9.0 | 1000 | 2019-06-01 |
| 3 | 9.0 | 1000 | 2019-07-01 |
| 4 | 9.0 | 1000 | 2019-08-01 |
| 5 | 9.0 | 1000 | 2019-09-01 |
| 6 | 9.0 | 1000 | 2019-10-01 |
| 7 | 9.0 | 1000 | 2019-11-01 |
| 8 | 9.0 | 1000 | 2019-12-01 |
| 9 | 9.0 | 1000 | 2020-01-01 |
| 10 | 9.0 | 1000 | 2020-02-01 |
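
Because both fact queries use inner `JOIN`s against the dimensions, any staging row whose key is missing from a dimension (for example, a payment for a `rider_id` that is not in `rider_dim`) is silently dropped. The plain-Python sketch below illustrates that filtering effect on illustrative rows; `inner_join_on_rider` and `orphans` are hypothetical helpers, and rider `9999` is a made-up ID. In Spark SQL, a `LEFT JOIN` followed by a check for a null `r.rider_id` would be one way to surface such rows instead of discarding them.

```python
def inner_join_on_rider(payments, rider_ids):
    """Keep payments whose rider_id exists in the rider dimension.

    Mirrors the effect of JOIN rider_dim r ON p.rider_id = r.rider_id:
    rows without a matching rider are dropped, not flagged.
    """
    known = set(rider_ids)
    return [p for p in payments if p["rider_id"] in known]


def orphans(payments, rider_ids):
    """Return the payments that the inner join would discard."""
    known = set(rider_ids)
    return [p for p in payments if p["rider_id"] not in known]


# Illustrative rows only; rider 9999 is a hypothetical rider missing from the dimension
payments = [
    {"payment_id": 1, "amount": 9.0, "rider_id": 1000},
    {"payment_id": 2, "amount": 9.0, "rider_id": 9999},
]
print(inner_join_on_rider(payments, [1000, 1030]))
print(orphans(payments, [1000, 1030]))
```

Comparing the row count of `staging_payments` with that of `payment_fact` is a quick way to see whether any rows were lost this way.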
