### Importing data from Blob storage account for green taxi to create a Fact table for analysis purpose

In [2]:
%%pyspark
blob_account_name = "taxisourceblob"
blob_container_name = "taxisource"
from pyspark.sql import SparkSession

sc = SparkSession.builder.getOrCreate()
token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
blob_sas_token = token_library.getConnectionString("AzureBlobStorage1")

spark.conf.set(
    'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
    blob_sas_token)
df_greentaxi = spark.read.load('wasbs://taxisource@taxisourceblob.blob.core.windows.net/GreenTaxis_201911.csv', format='csv'
## If header exists uncomment line below
, header=True
)
display(df_greentaxi.limit(10))

StatementMeta(sparkpool, 3, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, e15b4210-db64-47fe-8753-09a25e5f5c53)

In [3]:
df_greentaxi.printSchema

StatementMeta(sparkpool, 3, 4, Finished, Available)

<bound method DataFrame.printSchema of DataFrame[VendorID: string, lpep_pickup_datetime: string, lpep_dropoff_datetime: string, store_and_fwd_flag: string, RatecodeID: string, PULocationID: string, DOLocationID: string, passenger_count: string, trip_distance: string, fare_amount: string, extra: string, mta_tax: string, tip_amount: string, tolls_amount: string, ehail_fee: string, improvement_surcharge: string, total_amount: string, payment_type: string, trip_type: string, congestion_surcharge: string]>

In [4]:
display(df_greentaxi,summary=True)

StatementMeta(sparkpool, 3, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1d74f067-592d-4cc2-b207-a6e48c178efc)

In [6]:
df_greentaxi.count()

StatementMeta(sparkpool, 3, 7, Finished, Available)

449500

In [7]:
## selecting only require columns
df_greentaxi = df_greentaxi.select(
    "VendorID",
    "lpep_pickup_datetime",
    "lpep_dropoff_datetime",
    "RatecodeID",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "trip_type",
    "total_amount"
)

StatementMeta(sparkpool, 3, 8, Finished, Available)

In [8]:
df_greentaxi.printSchema

StatementMeta(sparkpool, 3, 9, Finished, Available)

<bound method DataFrame.printSchema of DataFrame[VendorID: string, lpep_pickup_datetime: string, lpep_dropoff_datetime: string, RatecodeID: string, PULocationID: string, DOLocationID: string, passenger_count: string, trip_distance: string, trip_type: string, total_amount: string]>

In [9]:
## updating the datatype of the columns as per the data present in it


from pyspark.sql.types import *

df_greentaxi_casted = df_greentaxi.withColumn("VendorID",df_greentaxi.VendorID.cast('int'))  \
                    .withColumn("lpep_pickup_datetime",df_greentaxi.lpep_pickup_datetime.cast(TimestampType()))     \
                    .withColumn("lpep_dropoff_datetime",df_greentaxi.lpep_dropoff_datetime.cast(TimestampType()))     \
                    .withColumn("RatecodeID",df_greentaxi.RatecodeID.cast('int'))     \
                    .withColumn("PULocationID",df_greentaxi.PULocationID.cast('int'))     \
                    .withColumn("DOLocationID",df_greentaxi.DOLocationID.cast('int'))     \
                    .withColumn("passenger_count",df_greentaxi.passenger_count.cast('int'))     \
                    .withColumn("trip_distance",df_greentaxi.trip_distance.cast('double'))     \
                    .withColumn("trip_type",df_greentaxi.trip_type.cast('int'))     \
                    .withColumn("total_amount",df_greentaxi.total_amount.cast('double'))

StatementMeta(sparkpool, 3, 10, Finished, Available)

In [10]:
df_greentaxi_casted.printSchema

StatementMeta(sparkpool, 3, 11, Finished, Available)

<bound method DataFrame.printSchema of DataFrame[VendorID: int, lpep_pickup_datetime: timestamp, lpep_dropoff_datetime: timestamp, RatecodeID: int, PULocationID: int, DOLocationID: int, passenger_count: int, trip_distance: double, trip_type: int, total_amount: double]>

In [10]:
df_greentaxi_casted.show(10)

StatementMeta(taxisparkpool, 6, 11, Finished, Available)

+--------+--------------------+---------------------+----------+------------+------------+---------------+-------------+---------+------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|trip_type|total_amount|
+--------+--------------------+---------------------+----------+------------+------------+---------------+-------------+---------+------------+
|       2| 2019-11-01 00:11:24|  2019-11-01 00:23:12|         1|          66|         148|              1|          2.8|        1|       18.66|
|       2| 2019-11-01 00:49:25|  2019-11-01 01:14:19|         1|         145|         114|              1|         5.59|        1|        36.2|
|       1| 2019-11-01 00:57:22|  2019-11-01 01:09:23|         1|         255|          37|              1|          2.1|        1|       13.55|
|       2| 2019-11-01 00:59:52|  2019-11-01 01:08:19|         1|           7|         226|              1|         1.23|        1|      

In [11]:
display(df_greentaxi_casted,summary=True)

StatementMeta(sparkpool, 3, 12, Finished, Available)

SynapseWidget(Synapse.DataFrame, f9a503c1-66ae-4252-a3db-83890e0821be)

In [12]:
df_greentaxi_casted.dtypes

StatementMeta(sparkpool, 3, 13, Finished, Available)

[('VendorID', 'int'),
 ('lpep_pickup_datetime', 'timestamp'),
 ('lpep_dropoff_datetime', 'timestamp'),
 ('RatecodeID', 'int'),
 ('PULocationID', 'int'),
 ('DOLocationID', 'int'),
 ('passenger_count', 'int'),
 ('trip_distance', 'double'),
 ('trip_type', 'int'),
 ('total_amount', 'double')]

In [13]:
df_greentaxi_casted.describe().show()

StatementMeta(sparkpool, 3, 14, Finished, Available)

+-------+-------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|summary|           VendorID|        RatecodeID|      PULocationID|     DOLocationID|   passenger_count|     trip_distance|         trip_type|      total_amount|
+-------+-------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|  count|             362429|            362429|            449500|           449500|            362429|            449500|            362429|            449500|
|   mean| 1.8318649997654712|1.1028918767537919|108.07132369299221|129.3175216907675|1.3155100723176125|2.8923469410456066|1.0240516073493016| 18.70784015573655|
| stddev|0.37398664126344555|0.6636113660917435| 71.07901050482566|76.30459822530052|0.9806889475422557| 44.14225245004878|0.1532096351393596|14.519740015634326|
|    min|                  1

### Creating the lake Database for saving of the tables as Fact and Dimension for further analysis

In [None]:
%%sql

CREATE DATABASE taxis

In [None]:
## saving table into the newly created lake database

df_greentaxi_casted.write.mode("overwrite").saveAsTable("taxis.factgreentaxi")

In [19]:
# df_greentaxi_casted.write.mode("overwrite").parquet("abfss://taxioutput@taxiadlsaccount.dfs.core.windows.net/greentaxi/df_greentaxi_casted.parquet")

StatementMeta(taxisparkpool, 4, 18, Finished, Available)

In [19]:
df_result = spark.sql("SELECT * FROM taxis.factgreentaxi")

StatementMeta(sparkpool, 3, 20, Finished, Available)

In [20]:
df_result.show(10)

StatementMeta(sparkpool, 3, 21, Finished, Available)

+--------+--------------------+---------------------+----------+------------+------------+---------------+-------------+---------+------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|trip_type|total_amount|
+--------+--------------------+---------------------+----------+------------+------------+---------------+-------------+---------+------------+
|       2| 2019-11-01 00:11:24|  2019-11-01 00:23:12|         1|          66|         148|              1|          2.8|        1|       18.66|
|       2| 2019-11-01 00:49:25|  2019-11-01 01:14:19|         1|         145|         114|              1|         5.59|        1|        36.2|
|       1| 2019-11-01 00:57:22|  2019-11-01 01:09:23|         1|         255|          37|              1|          2.1|        1|       13.55|
|       2| 2019-11-01 00:59:52|  2019-11-01 01:08:19|         1|           7|         226|              1|         1.23|        1|      

### Loading data for Taxi zone using spark 

In [24]:
%%pyspark

# Creating dimension table of zones

df_taxizone = spark.read.load([
    'abfss://taxidata@taxidatastorageadls.dfs.core.windows.net/taxizones/TaxiZones1.csv',
    'abfss://taxidata@taxidatastorageadls.dfs.core.windows.net/taxizones/TaxiZones2.csv'
    ], format='csv'
## If header exists uncomment line below
, header=True
)
display(df_taxizone.limit(10))

StatementMeta(sparkpool, 3, 25, Finished, Available)

SynapseWidget(Synapse.DataFrame, 9c94ad73-9d11-4e70-bdcc-9241c90bba75)

In [25]:
df_taxizone.count()

StatementMeta(sparkpool, 3, 26, Finished, Available)

266

In [26]:
df_taxizone.printSchema

StatementMeta(sparkpool, 3, 27, Finished, Available)

<bound method DataFrame.printSchema of DataFrame[LocationID: string, Borough: string, Zone: string, service_zone: string]>

In [27]:
df_taxizone_casted = df_taxizone.withColumn("LocationID",df_taxizone.LocationID.cast('int'))

StatementMeta(sparkpool, 3, 28, Finished, Available)

In [28]:
df_taxizone_casted.printSchema

StatementMeta(sparkpool, 3, 29, Finished, Available)

<bound method DataFrame.printSchema of DataFrame[LocationID: int, Borough: string, Zone: string, service_zone: string]>

### saving the taxi zone dimention table

In [29]:
df_taxizone_casted.write.mode("overwrite").saveAsTable("taxis.dimtaxizone")

StatementMeta(sparkpool, 3, 30, Finished, Available)

In [30]:
%%pyspark

# creating ratecodes table

spark.sql("USE taxis;")

spark.sql("""CREATE TABLE dimRateCodes (RateCodeId INT, RateCode VARCHAR(100),IsApproved INT);""")
    
spark.sql(""" INSERT INTO dimRateCodes VALUES(1, 'Standard Rate', 1),(2, 'JFK', 1),(3, 'Newark', 1),(4, 'Westchester', 0),(5, 'Negotiated fare', 1),(6, 'GroupRide', 1);""")

StatementMeta(sparkpool, 3, 31, Finished, Available)

DataFrame[]

In [32]:
#checking the table

df_ratecodes = spark.sql("SELECT * FROM dimRateCodes")
df_ratecodes.printSchema

StatementMeta(sparkpool, 3, 33, Finished, Available)

<bound method DataFrame.printSchema of DataFrame[RateCodeId: int, RateCode: string, IsApproved: int]>

### Saving Rate code dimension table into database

In [None]:
df_ratecodes.write.mode("overwrite").saveAsTable("taxis.dimRateCode")

### Loading data for Yellow taxi into dataframe

In [33]:
## loading yellow taxi table

df_yellowtaxi = spark.read.load('abfss://taxidata@taxidatastorageadls.dfs.core.windows.net/YellowTaxis_201911.parquet', format='parquet')
display(df_yellowtaxi.limit(10))

StatementMeta(sparkpool, 3, 34, Finished, Available)

SynapseWidget(Synapse.DataFrame, 97d9195d-bc54-4af0-8465-24e1ab6d0fb1)

In [34]:
df_yellowtaxi.printSchema

StatementMeta(sparkpool, 3, 35, Finished, Available)

<bound method DataFrame.printSchema of DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: int, trip_distance: double, RatecodeID: int, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double]>

In [35]:
df_yellowtaxi_selected = df_yellowtaxi.select("VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "RatecodeID",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "total_amount")

StatementMeta(sparkpool, 3, 36, Finished, Available)

In [36]:
df_yellowtaxi_selected.printSchema

StatementMeta(sparkpool, 3, 37, Finished, Available)

<bound method DataFrame.printSchema of DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, RatecodeID: int, PULocationID: int, DOLocationID: int, passenger_count: int, trip_distance: double, total_amount: double]>

### Saving Yellow taxi as Fact table in lake database

In [37]:
df_yellowtaxi_selected.write.mode("overwrite").saveAsTable("taxis.factyellowtaxi")

StatementMeta(sparkpool, 3, 38, Finished, Available)

### loading fhvtaxis Data

In [38]:


df_fhvtaxis = spark.read.load('abfss://taxidata@taxidatastorageadls.dfs.core.windows.net/fhvtaxis_201911.csv', format='csv'
## If header exists uncomment line below
, header=True
)
display(df_fhvtaxis.limit(10))

StatementMeta(sparkpool, 3, 39, Finished, Available)

SynapseWidget(Synapse.DataFrame, d4a0b4bb-5879-4b3b-8aa1-b3ccdeb94f28)

In [39]:
df_fhvtaxis.printSchema

StatementMeta(sparkpool, 3, 40, Finished, Available)

<bound method DataFrame.printSchema of DataFrame[hvfhs_license_num: string, dispatching_base_num: string, pickup_datetime: string, dropoff_datetime: string, PULocationID: string, DOLocationID: string, SR_Flag: string]>

In [40]:
df_fhvtaxis.columns

StatementMeta(sparkpool, 3, 41, Finished, Available)

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [41]:
df_fhvtaxis = df_fhvtaxis.select('hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID')

StatementMeta(sparkpool, 3, 42, Finished, Available)

In [42]:
## modifying the column type
df_fhvtaxis_casted = df_fhvtaxis.withColumn("pickup_datetime",df_fhvtaxis.pickup_datetime.cast(TimestampType()))  \
                            .withColumn("dropoff_datetime",df_fhvtaxis.dropoff_datetime.cast(TimestampType()))    \
                            .withColumn("PULocationID",df_fhvtaxis.PULocationID.cast('int'))    \
                            .withColumn("DOLocationID",df_fhvtaxis.DOLocationID.cast('int'))    

StatementMeta(sparkpool, 3, 43, Finished, Available)

In [43]:
df_fhvtaxis_casted.printSchema

StatementMeta(sparkpool, 3, 44, Finished, Available)

<bound method DataFrame.printSchema of DataFrame[hvfhs_license_num: string, dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int]>

### saving FHV taxi as fact table in lake database

In [44]:
df_fhvtaxis_casted.write.mode("overwrite").saveAsTable("taxis.factfhvtaxi")

StatementMeta(sparkpool, 3, 45, Finished, Available)

### Loading the JSON file data as dataframe

In [46]:

# importing necessary libraries
from pyspark.sql.functions import *

# specifying path
fhvBasesFilePath = 'abfss://taxidata@taxidatastorageadls.dfs.core.windows.net/FhvBases.json'

# Read FHV Bases json file
df_fhvbases = (
                spark
                  .read
                  .option("multiline", "true")
                  .json(fhvBasesFilePath)
             )

display(df_fhvbases)

# display(df_fhvbases)

StatementMeta(sparkpool, 3, 47, Finished, Available)

SynapseWidget(Synapse.DataFrame, a83183a3-842e-490f-bb2b-9db867b6db94)

### As the data is hierarchical , so flattening the file to load the data correctly

In [47]:
# flattening the file

df_fhvbases = (df_fhvbases.select(
    col("Address.Street").alias("address_street"),
    col('Address.City').alias('address_city'),
    col('Address.Postcode').alias('address_postcode'),
    col('Address.State').alias('address_state'),
    col('Address.Building').alias('address_building'),
    to_date(col('Date'),"MM/dd/yyyy").alias('date'),
    col('Entity Name').alias('entity_name'),
    col('GeoLocation.Latitude').alias('geolocation_latitude'),
    col('GeoLocation.Longitude').alias('geolocation_longitude'),
    col('GeoLocation.Location').alias('geolocation_location'),
    col('License Number').alias('license_number'),
    col('SHL Endorsed').alias('shl_endorsed'),
    col('GeoLocation.Location').alias('geolocation_location'),
    col('Telephone Number').alias('telephone_number'),
    col('Time').alias('geolocation_location'),
    col('Type of Base').alias('type_of_base')
))

StatementMeta(sparkpool, 3, 48, Finished, Available)

In [48]:
df_fhvbases.columns

StatementMeta(sparkpool, 3, 49, Finished, Available)

['address_street',
 'address_city',
 'address_postcode',
 'address_state',
 'address_building',
 'date',
 'entity_name',
 'geolocation_latitude',
 'geolocation_longitude',
 'geolocation_location',
 'license_number',
 'shl_endorsed',
 'geolocation_location',
 'telephone_number',
 'geolocation_location',
 'type_of_base']

In [49]:
df_fhvbases = df_fhvbases.select(
    'address_street',
 'address_city',
 'address_postcode',
 'address_state',
 'address_building',
 'date',
 'license_number',
 'type_of_base'
)

StatementMeta(sparkpool, 3, 50, Finished, Available)

### saving the table as Dimension table in Lake database

In [50]:
df_fhvbases.write.mode("overwrite").saveAsTable("taxis.dimbases")

StatementMeta(sparkpool, 3, 51, Finished, Available)

## Analysing the data using spark SQL

In [52]:
%%sql

SELECT month(lpep_pickup_datetime) as months, count(*) from factgreentaxi
group by month(lpep_pickup_datetime)

StatementMeta(sparkpool, 3, 53, Finished, Available)

<Spark SQL result set with 5 rows and 2 fields>

In [53]:
%%sql

SELECT year(lpep_pickup_datetime) as years, count(*) from factgreentaxi
group by year(lpep_pickup_datetime)

StatementMeta(sparkpool, 3, 54, Finished, Available)

<Spark SQL result set with 3 rows and 2 fields>

In [55]:
df = spark.sql('SELECT month(tpep_pickup_datetime) as months,year(tpep_pickup_datetime) as years,count(year(tpep_pickup_datetime)) as counts from taxis.factyellowtaxi WHERE month(tpep_pickup_datetime) BETWEEN 9 and 12 and  year(tpep_pickup_datetime) = 2019 group by month(tpep_pickup_datetime),year(tpep_pickup_datetime) order by years,months')


StatementMeta(sparkpool, 4, 2, Finished, Available)

In [None]:
df.createOrReplaceTempView('yellowmonthcount')