### Data Access

In [0]:
# Service-principal (OAuth) configuration for the ADLS Gen2 storage account.
# This cell is intentionally disabled (all lines are comments) and kept only as
# a template, exactly as the original cell was a no-op.
#
# SECURITY: the client id, tenant id and client secret used to be pasted here in
# plain text. The secret must be treated as leaked and rotated; credentials are
# now looked up from a Databricks secret scope instead of living in the notebook.
# NOTE(review): the scope and key names below are placeholders -- confirm the
# real ones before enabling this cell.
#
# storage_host = "mystorageaccountrachit.dfs.core.windows.net"
# client_id = dbutils.secrets.get(scope="nyc-taxi", key="sp-client-id")
# client_secret = dbutils.secrets.get(scope="nyc-taxi", key="sp-client-secret")
# tenant_id = dbutils.secrets.get(scope="nyc-taxi", key="sp-tenant-id")
#
# spark.conf.set(f"fs.azure.account.auth.type.{storage_host}", "OAuth")
# spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_host}", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
# spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_host}", client_id)
# spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_host}", client_secret)
# spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_host}", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")


In [0]:
# Smoke test for storage access: list the top level of the project container.
container_root = "abfss://nyc-taxi-project@mystorageaccountrachit.dfs.core.windows.net/"
dbutils.fs.ls(container_root)

### Data Reading

Importing Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

Reading CSV data

Trip Type 

In [0]:
# Load the trip-type lookup CSV from the bronze folder. The first row is the
# header and column types are inferred by Spark.
df_triptype = spark.read.csv(
    'abfss://nyc-taxi-project@mystorageaccountrachit.dfs.core.windows.net/bronze/trip_type/',
    header=True,
    inferSchema=True,
)

In [0]:
# Render the trip-type lookup as a Databricks table.
display(df_triptype)

Trip Zone

In [0]:
# Load the trip-zone lookup CSV from the bronze folder (header row present,
# column types inferred).
df_tripzone = (
    spark.read
         .options(inferSchema=True, header=True)
         .csv('abfss://nyc-taxi-project@mystorageaccountrachit.dfs.core.windows.net/bronze/trip_zone')
)

In [0]:
# Render the trip-zone lookup as a Databricks table.
display(df_tripzone)

Reading Parquet data

In [0]:
# Explicit schema for the trip records, written as a DDL-style string of
# comma-separated "column_name TYPE" pairs. It is handed to the reader via
# .schema(myschema) when the bronze trip data is loaded, so the column types
# are fixed here rather than taken from the files.
myschema = '''
                VendorID BIGINT,
                lpep_pickup_datetime TIMESTAMP,
                lpep_dropoff_datetime TIMESTAMP,
                store_and_fwd_flag STRING,
                RatecodeID BIGINT,
                PULocationID BIGINT,
                DOLocationID BIGINT,
                passenger_count BIGINT,
                trip_distance DOUBLE,
                fare_amount DOUBLE,
                extra DOUBLE,
                mta_tax DOUBLE,
                tip_amount DOUBLE,
                tolls_amount DOUBLE,
                ehail_fee DOUBLE,
                improvement_surcharge DOUBLE,
                total_amount DOUBLE,
                payment_type BIGINT,
                trip_type BIGINT,
                congestion_surcharge DOUBLE

      '''

In [0]:
# Load every trip Parquet file under the bronze trip folder.
# - The explicit schema (myschema) is applied instead of the per-file schema.
# - recursiveFileLookup makes Spark descend into nested sub-folders.
# The former .option('header', True) was dropped: 'header' is a CSV reader
# option and has no meaning for Parquet, so it only misled readers.
df_trip = (
    spark.read.format('parquet')
         .schema(myschema)
         .option('recursiveFileLookup', True)
         .load('abfss://nyc-taxi-project@mystorageaccountrachit.dfs.core.windows.net/bronze/trip2023data')
)

In [0]:
# Render the raw trip records as a Databricks table.
display(df_trip)

## SILVER LAYER

Taxi trip type

In [0]:
# Give the lookup's 'description' column a more descriptive name.
df_triptype = df_triptype.withColumnRenamed(
    existing='description',
    new='Trip Description',
)

In [0]:
# Persist the trip-type lookup to the silver folder as Parquet, replacing any
# previous output.
df_triptype.write.mode('overwrite').parquet(
    'abfss://nyc-taxi-project@mystorageaccountrachit.dfs.core.windows.net/silver/triptype/'
)

Taxi Trip Zone

In [0]:
# Split the 'Zone' column on '/' and keep the first two pieces as separate
# columns; Zone2 is taken from index 1 of the split result.
df_tripzone = (
    df_tripzone
    .withColumn('Zone1', split('Zone', '/').getItem(0))
    .withColumn('Zone2', split('Zone', '/').getItem(1))
)

In [0]:
# Inspect the zone lookup with the new Zone1 / Zone2 columns.
display(df_tripzone)

In [0]:
# Persist the zone lookup to the silver folder as Parquet, replacing any
# previous output.
df_tripzone.write.parquet(
    'abfss://nyc-taxi-project@mystorageaccountrachit.dfs.core.windows.net/silver/tripzone/',
    mode='overwrite',
)

Trip data

In [0]:
# Derive calendar columns (year, month, date) from the pickup timestamp.
df_trip = (
    df_trip
    .withColumn('Trip_year', year('lpep_pickup_datetime'))
    .withColumn('Trip_month', month('lpep_pickup_datetime'))
    .withColumn('Trip_date', to_date('lpep_pickup_datetime'))
)

In [0]:
# Inspect the trip records with the derived date columns.
display(df_trip)

In [0]:
# Keep only the columns carried forward to the silver layer, then show them.
# Note that Trip_month is not in this list, so it is dropped here.
df_trip = df_trip.select(
    'VendorID',
    'DOLocationID',
    'trip_distance',
    'fare_amount',
    'total_amount',
    'Trip_year',
    'Trip_date',
)
display(df_trip)

In [0]:
# Persist the trimmed trip records to the silver folder as Parquet, replacing
# any previous output.
df_trip.write.save(
    'abfss://nyc-taxi-project@mystorageaccountrachit.dfs.core.windows.net/silver/tripdata/',
    format='parquet',
    mode='overwrite',
)

In [0]:
# Report how many partitions back the trip DataFrame.
num_partitions = df_trip.rdd.getNumPartitions()
print(num_partitions)

Visualisation

In [0]:
# Show the trip data; the chart itself is configured in the Databricks output UI.
df_trip.display()

Databricks visualization. Run in Databricks to view.


## GOLD LAYER

Storage path variables

In [0]:
# Root locations of the silver and gold layers.
# Deliberately written WITHOUT a trailing slash: every consumer builds paths as
# f'{silver}/<name>' / f'{gold}/<name>', so a trailing slash here produced a
# doubled '//' in each resulting path.
silver = 'abfss://nyc-taxi-project@mystorageaccountrachit.dfs.core.windows.net/silver'
gold = 'abfss://nyc-taxi-project@mystorageaccountrachit.dfs.core.windows.net/gold'

#### **Data Reading, writing and creating Delta tables**

In [0]:
# Read the zone lookup back from the silver layer.
# The former 'header' and 'inferSchema' options were removed: they are CSV
# reader options, and Parquet files already carry their own schema.
df_zone = spark.read.format('parquet').load(f'{silver}/tripzone/')

Database creation

In [0]:
%sql
-- Create the schema that holds the gold-layer Delta tables.
-- IF NOT EXISTS keeps the cell re-runnable: a plain CREATE DATABASE fails once
-- the database already exists, which stops a "Run All" on every later run.
CREATE DATABASE IF NOT EXISTS gold;

In [0]:
# Publish the zone lookup as the Delta table gold.tripzone, with its data
# stored under the gold folder (the 'path' option sets the table location).
# mode('overwrite') replaces the earlier mode('append'): df_zone is always the
# complete silver dataset, so appending duplicated every row on each re-run.
(
    df_zone.write.format('delta')
           .mode('overwrite')
           .option('path', f'{gold}/tripzone')
           .saveAsTable('gold.tripzone')   # Delta table name
)

In [0]:
%sql
-- Spot-check the zone table: rows whose Borough is 'EWR'.
SELECT *
FROM gold.tripzone
WHERE Borough = 'EWR';

In [0]:
# Make every column name acceptable to Delta before writing the table.
# Delta rejects names containing any of ' ,;{}()\n\t=' (unless column mapping is
# enabled). Spaces become underscores; the other offending characters are
# removed. The earlier version handled only ' ' and ';', and it gives the same
# result as this one for names limited to those two characters.
_delta_safe = str.maketrans({' ': '_', **dict.fromkeys(',;{}()\n\t=')})
df_triptype = df_triptype.select(
    [col(c).alias(c.translate(_delta_safe)) for c in df_triptype.columns]
)

In [0]:
# Publish the trip-type lookup as the Delta table gold.triptype, stored under
# the gold folder.
# mode('overwrite') replaces the earlier mode('append'): the DataFrame is the
# full lookup, so appending duplicated all rows whenever the notebook was re-run.
(
    df_triptype.write.format('delta')
               .mode('overwrite')
               .option('path', f'{gold}/triptype')
               .saveAsTable('gold.triptype')
)

In [0]:
%sql
-- Inspect the full contents of the trip-type Delta table.
SELECT *
FROM gold.triptype;

In [0]:
# Publish the trip records as the Delta table gold.trip_data, stored under the
# gold folder.
# mode('overwrite') replaces the earlier mode('append'): df_trip is rebuilt from
# the whole bronze folder on every run, so appending duplicated the entire
# dataset each time the notebook was re-run.
(
    df_trip.write.format('delta')
           .mode('overwrite')
           .option('path', f'{gold}/tripdata')
           .saveAsTable('gold.trip_data')
)

In [0]:
%sql
-- List the distinct vendor ids present in the trip table.
SELECT DISTINCT VendorID
FROM gold.trip_data;

Learning Delta Lake

In [0]:
%sql
-- Delta UPDATE demo: rewrite Borough 'EWR' as 'EMR' in the zone table.
UPDATE gold.tripzone
   SET Borough = 'EMR'
 WHERE Borough = 'EWR';

In [0]:
%sql
-- Show the table's version history (the UPDATE above should now be listed).
DESCRIBE HISTORY gold.tripzone;

In [0]:
%sql
-- Delta DELETE demo: remove the row(s) with LocationID 1.
DELETE FROM gold.tripzone
 WHERE LocationID = 1;

Versioning

In [0]:
%sql
-- Show the version history again, now including the DELETE.
DESCRIBE HISTORY
  gold.tripzone;

In [0]:
%sql
-- Time travel: roll the zone table back to its first version (version 0).
RESTORE TABLE gold.tripzone
  TO VERSION AS OF 0;

In [0]:
%sql
-- Confirm the restore: look up locationID 1, which the DELETE had removed.
SELECT *
FROM gold.tripzone
WHERE locationID = 1;