# Data access

In [0]:
dbutils.secrets.list(scope='nyctaxi-scope')

[SecretMetadata(key='nyctaxi-secret-applicationid'),
 SecretMetadata(key='nyctaxi-secret-directoryid'),
 SecretMetadata(key='nyctaxi-secret-servicecredential')]

In [0]:
storage_account = "adlsnyctaxii" 

service_credential = dbutils.secrets.get(scope="nyctaxi-scope", key="nyctaxi-secret-servicecredential")

application_id = dbutils.secrets.get(scope="nyctaxi-scope", key="nyctaxi-secret-applicationid")

directory_id = dbutils.secrets.get(scope="nyctaxi-scope", key="nyctaxi-secret-directoryid")

In [0]:
print(f"Storage Account: {storage_account}")
print(f"Service Credential: {service_credential}")
print(f"Application ID: {application_id}")
print(f"Directory ID: {directory_id}")

Storage Account: adlsnyctaxii
Service Credential: [REDACTED]
Application ID: [REDACTED]
Directory ID: [REDACTED]


In [0]:
storage_account = "adlsnyctaxii" 

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account>.dfs.core.windows.net", f"{application_id}")
spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account>.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account>.dfs.core.windows.net", f"https://login.microsoftonline.com/{directory_id}/oauth2/token")

In [0]:

#OR

service_credential = dbutils.secrets.get(scope="nyctaxi-scope", key="nyctaxi-secret-servicecredential")
#application_id="bac529c762ac-a9dd-4f05-b26b-8cf307f00a19"
#directory_id="4acdee9a43ac-a52b-48a0-81ca-fa13ed007990"

spark.conf.set("fs.azure.account.auth.type.adlsnyctaxii.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.adlsnyctaxii.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.adlsnyctaxii.dfs.core.windows.net","b529c762-a9dd-4f05-b26b-8cf307f00a19")
spark.conf.set("fs.azure.account.oauth2.client.secret.adlsnyctaxii.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.adlsnyctaxii.dfs.core.windows.net", "https://login.microsoftonline.com/4dee9a43-a52b-48a0-81ca-fa13ed007990/oauth2/token")


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-8702630156509078>, line 11[0m
[1;32m      3[0m service_credential [38;5;241m=[39m dbutils[38;5;241m.[39msecrets[38;5;241m.[39mget(scope[38;5;241m=[39m[38;5;124m"[39m[38;5;124mnyctaxi-scope[39m[38;5;124m"[39m, key[38;5;241m=[39m[38;5;124m"[39m[38;5;124mnyctaxi-secret-servicecredential[39m[38;5;124m"[39m)
[1;32m      5[0m [38;5;66;03m#application_id = dbutils.secrets.get(scope="nyctaxi-scope", key="nyctaxi-secret-directoryid")[39;00m
[1;32m      6[0m [38;5;66;03m#application_id="b529c762-a9dd-4f05-b26b-8cf307f00a19"[39;00m
[1;32m      7[0m 
[1;32m      8[0m [38;5;66;03m#directory_id = dbutils.secrets.get(scope="nyctaxi-scope", key="nyctaxi-secret-directoryid")[39;00m
[1;32m      9[0m [38;5;66;03m#directory_id="[REDACTED]"[39;00m
[0;32m---> 11[0m spark

In [0]:
%sql
SELECT current_metastore();



In [0]:
%sql
SHOW Catalogs;



In [0]:
dbutils.fs.ls("abfss://bronze@adlsnyctaxii.dfs.core.windows.net/")



# Data Reading

**Importing Libraries**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *



### Reading CSV Data

**Trip Type Data**

In [0]:
df_trip_type = spark.read.format('csv') \
                    .option("inferschema",True) \
                      .option("header",True) \
                        .load("abfss://bronze@adlsnyctaxii.dfs.core.windows.net/trip_type")




In [0]:
df_trip_type.display()



**Trip Zone**

In [0]:
df_trip_zone = spark.read.format('csv') \
               .option("inferschema","true") \
                .option("header","true") \
                .load("abfss://bronze@adlsnyctaxii.dfs.core.windows.net/trip_zone")



In [0]:
df_trip_zone.display()



**Trip Data** 

using recursiveFileLookup() -> to read all files within hierarchy of folders

In [0]:
myschema = '''
                VendorID BIGINT,
                lpep_pickup_datetime TIMESTAMP,
                lpep_dropoff_datetime TIMESTAMP,
                store_and_fwd_flag STRING,
                RatecodeID BIGINT,
                PULocationID BIGINT,
                DOLocationID BIGINT,
                passenger_count BIGINT,
                trip_distance DOUBLE,
                fare_amount DOUBLE,
                extra DOUBLE,
                mta_tax DOUBLE,
                tip_amount DOUBLE,
                tolls_amount DOUBLE,
                ehail_fee DOUBLE,
                improvement_surcharge DOUBLE,
                total_amount DOUBLE,
                payment_type BIGINT,
                trip_type BIGINT,
                congestion_surcharge DOUBLE

      '''



In [0]:
df_trip = spark.read.format('parquet') \
                .schema(myschema) \
                .option("header","true") \
                .option("recursiveFileLookup","true") \
                .load("abfss://bronze@adlsnyctaxii.dfs.core.windows.net/trips2023data/")

# Instead of .option("inferschema","true") 



In [0]:
df_trip.display()



# Data Transformation

**Taxi Trip Type**

In [0]:
df_trip_type.display()



In [0]:
df_trip_type = df_trip_type.withColumnRenamed('description','trip_description')
df_trip_type.display()



In [0]:
df_trip_type.write.format('parquet')\
            .mode('append')\
            .option("path","abfss://silver@adlsnyctaxii.dfs.core.windows.net/trip_type")\
            .save()



**Trip Zone**

In [0]:
df_trip_zone.display()



In [0]:
df_trip_zone = df_trip_zone.withColumn('zone1',split(col('Zone'),'/')[0])\
                            .withColumn('zone2',split(col('Zone'),'/')[1])

df_trip_zone.display()



In [0]:
df_trip_zone.write.format('parquet')\
          .mode('append')\
          .option('path','abfss://silver@adlsnyctaxii.dfs.core.windows.net/trip_zone')\
          .save()



**Trip Data**

In [0]:
df_trip.display()



In [0]:
df_trip = df_trip.withColumn('trip_date',to_date('lpep_pickup_datetime'))\
                  .withColumn('trip_year',year('lpep_pickup_datetime'))\
                  .withColumn('trip_month',month('lpep_pickup_datetime'))



In [0]:
df_trip.display()



In [0]:
df_trip = df_trip.select('VendorID','PULocationID','DOLocationID','fare_amount','total_amount')
df_trip.display()



In [0]:
df_trip.write.format('parquet')\
            .mode('append')\
            .option('path','abfss://silver@adlsnyctaxii.dfs.core.windows.net/trips2023data')\
            .save()



# Analysis

In [0]:
display(df_trip) 

