In [1]:
from pyspark.sql.functions import *

In [2]:
#dbutils.fs.ls('/mnt/storage/')
#dbutils.fs.ls('/mnt/storage/')

In [3]:
#dbutils.fs.unmount('/mnt/storage')

In [4]:
#dbutils.fs.ls('mnt/datalake/')

In [5]:
# Load the 2018-12 FHV trip file from the mounted storage.
# The first row is used as the header and Spark infers the column types.
fhv_trips_df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('/mnt/storage/fhv_tripdata_2018-12.csv')
)

In [6]:
# Quick range check: summary statistics (count / min / max) of the raw pickup timestamp column.
display(fhv_trips_df.describe("Pickup_DateTime"))

summary,Pickup_DateTime
count,23854144
mean,
stddev,
min,2018-12-01 00:00:00
max,2018-12-31 23:59:59


In [7]:
'''display(
    fhv_trips_df.describe()
)'''

In [8]:
#print('before = ', fhv_trips_df.count())

# Clean the raw trips:
#   1. drop rows that are missing either location id,
#   2. remove exact duplicate rows,
#   3. keep only trips that start on/after 2018-12-01 and end before 2019-01-01.
# The lower bound uses a real calendar date with `>=`. An impossible date such as
# 31 November cannot be parsed as a date/timestamp and would only "work" through
# lexicographic string comparison, so the filter would break as soon as the column
# is typed as a timestamp.
fhv_trips_df = (
    fhv_trips_df
    .na.drop(how='any', subset=('PUlocationID', 'DOlocationID'))
    .drop_duplicates()
    .where(
        (col('Pickup_DateTime') >= '2018-12-01')
        & (col('DropOff_datetime') < '2019-01-01')
    )
)

#print('after = ', fhv_trips_df.count())

In [9]:
# Keep only the columns used by the rest of the notebook.
# (Alternative: instead of listing the columns to keep, an unwanted column could be
#  removed with fhv_trips_df = fhv_trips_df.drop('Dispatching_base_num').)
fhv_trips_df = fhv_trips_df.select([
    'Pickup_DateTime',
    'DropOff_datetime',
    'PUlocationID',
    'DOlocationID',
    'SR_Flag',
    'Dispatching_base_number',
])

In [10]:
# Rename every selected column to snake_case; the column order is unchanged.
fhv_trips_df = fhv_trips_df.select(*[
    col(source_name).alias(target_name)
    for source_name, target_name in (
        ('Pickup_DateTime', 'pickup_date_time'),
        ('DropOff_datetime', 'dropoff_date_time'),
        ('PUlocationID', 'pickup_location'),
        ('DOlocationID', 'dropoff_location'),
        ('SR_Flag', 'sr_flag'),
        ('Dispatching_base_number', 'dispatching_base_number'),
    )
])

'''
other way to rename columns
fhvTaxiTripDataDF = fhvTaxiTripDataDF
                        .withColumnRenamed("DropOff_DateTime", "DropTime")
                        .withColumnRenamed("PUlocationID", "PickupLocationId")
                        .withColumnRenamed("DOlocationID", "DropLocationId")
                        .withColumnRenamed("Dispatching_base_number", "BaseLicenseNumber")
'''

In [11]:
# Derive the calendar year and month of the pickup timestamp.
# The chain is wrapped in parentheses so that no line ends in a dangling `\`
# continuation (a trailing backslash on the last line makes the statement depend
# on whatever physical line happens to follow it).
fhv_trips_df = (
    fhv_trips_df
    .withColumn('pickup_year', year('pickup_date_time'))
    .withColumn('pickup_month', month('pickup_date_time'))
)

In [12]:
# Append the day-of-month of the pickup timestamp as `pickup_day`,
# keeping every existing column ('*').
fhv_trips_df = fhv_trips_df.select('*', dayofmonth(col('pickup_date_time')).alias('pickup_day'))

In [13]:
#display(fhv_trips_df.describe('sr_flag'))

In [14]:

'''
display(
    fhv_trips_df.select(
        'Pickup_DateTime',
        'DropOff_datetime',
        round((unix_timestamp('DropOff_datetime') - unix_timestamp('Pickup_DateTime')) / 60).alias('trip_duration')
    )
)
'''
# Be careful with the parentheses here. An earlier version omitted the inner pair: round(unix_timestamp('dropoff_date_time') - unix_timestamp('pickup_date_time') / 60).alias('trip_duration')
# In that version only the pickup timestamp is divided by 60 ("unix_timestamp('pickup_date_time') / 60"), and that quotient is then subtracted from the dropoff timestamp, because / binds more tightly than -.
# Trip duration in whole minutes: (dropoff - pickup) in epoch seconds, divided by 60, then rounded.
# `round` is the Spark column function made available by the star import, hence the `.alias(...)`.
fhv_trips_df = fhv_trips_df.select(
    '*',
    round(
        (unix_timestamp(col('dropoff_date_time')) - unix_timestamp(col('pickup_date_time'))) / 60
    ).alias('trip_duration'),
)

In [15]:
# Translate sr_flag into a readable trip_type:
#   1 -> 'shared_trip', 0 -> 'solo_trip', any other value falls through to 'solo_trip'.
# The raw flag is dropped once the label exists.
fhv_trips_df = (
    fhv_trips_df
    .withColumn(
        'trip_type',
        when(col('sr_flag') == 1, 'shared_trip')
        .when(col('sr_flag') == 0, 'solo_trip')
        .otherwise('solo_trip'),
    )
    .drop('sr_flag')
)

In [16]:
#display(fhv_trips_df.describe('trip_type'))

In [17]:
%fs ls /mnt/storage

path,name,size
dbfs:/mnt/storage/FhvBases.json,FhvBases.json,464836


In [18]:
# Read the FHV base registry (multi-line JSON) from the mounted storage.
# NOTE(review): the same file is read again into `fhv_bases_df` in a later cell, and
# `fhvbases_df` is not referenced anywhere else in the visible notebook; this looks
# like a leftover. Confirm before removing.
fhvbases_df = (
    spark.read
    .option("inferSchema", "true")
    .option("multiline", "true")
    .json("/mnt/storage/FhvBases.json")
)

In [19]:
#dbutils.fs.head('/mnt/storage/FhvBases.json')

In [20]:
# Read the FHV base registry (multi-line JSON) from the mounted storage.
fhv_bases_df = (
    spark.read
    .option('inferSchema', 'true')
    .option('multiline', 'true')
    .json('/mnt/storage/FhvBases.json')
)

In [21]:
'''
display(fhv_bases_df)
display(fhv_bases_df.head(5))
display(fhv_bases_df.describe())
display(fhv_bases_df.describe('Telephone Number'))
'''
#display(fhv_bases_df.head(5))

In [22]:
# Flatten the base registry into plain snake_case columns: two top-level fields
# plus the fields reached through `Address.<field>`.
# The spelling 'base_liscence_number' is kept as-is because the join and the SQL
# queries further down refer to the column by exactly this name.
fhv_bases_df = fhv_bases_df.select(*[
    col(source_field).alias(column_name)
    for source_field, column_name in (
        ('License Number', 'base_liscence_number'),
        ('Type of Base', 'base_type'),
        ('Address.Building', 'building_name'),
        ('Address.Street', 'street'),
        ('Address.City', 'city'),
        ('Address.State', 'state'),
        ('Address.Postcode', 'post_code'),
    )
])

#display(fhv_bases_df)

In [23]:
'''
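The following does not work as written:

joined_df = fhv_trips_df.join(fhv_bases_df, 
                 col("fhv_trips_df.dispatching_base_number") == col("fhv_bases_df.base_liscence_number"),
                  'inner'
                 )

Reason: a qualified name such as col("fhv_trips_df.dispatching_base_number") is resolved against
DataFrame aliases, not against Python variable names. In the documentation example referenced below,
df_as1 and df_as2 are created with df.alias("df_as1") / df.alias("df_as2") before the join; here
neither DataFrame has been given an alias. Aliasing both sides first
(e.g. fhv_trips_df.alias("fhv_trips_df")) would make this form work.
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=alias (search for join in this page. you will see the following line)
# joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')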
'''
# Attach base details to each trip by matching the trip's dispatching base number
# to the base licence number. Inner join: only rows with a match on both sides remain.
fhv_trips_bases_df = fhv_trips_df.join(
    fhv_bases_df,
    on=fhv_trips_df['dispatching_base_number'] == fhv_bases_df['base_liscence_number'],
    how='inner',
)


In [24]:
#display(fhv_trips_bases_df)

In [25]:
# Total trip_duration per (city, base_type), with the generated aggregate column
# renamed to `total_trip_time` and the result sorted by the grouping keys.
aggregated_df = (
    fhv_trips_bases_df
    .groupBy('city', 'base_type')
    .sum('trip_duration')
    .withColumnRenamed('sum(trip_duration)', 'total_trip_time')
    .orderBy('city', 'base_type')
)

#display(aggregated_df)

## FHV taxi cleanup and transformation (all of the above transformations in a single step)

In [27]:
# Re-read the raw 2018-12 FHV trip file so the single-step pipeline below starts
# from the untouched input (header row used for names, column types inferred).
fhv_trips_df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('/mnt/storage/fhv_tripdata_2018-12.csv')
)

In [28]:
# The cleanup and transformation steps from the earlier cells, expressed as one chain.
# Written inside parentheses (instead of `\` continuations) so each stage can carry
# its own comment.
fhv_trips_df = (
    fhv_trips_df
    # Clean and filter: drop rows without location ids, drop exact duplicates, and keep
    # trips that start on/after 2018-12-01 and end before 2019-01-01. The lower bound is
    # a real calendar date so the comparison is valid whether the column is a string or
    # a timestamp (31 November does not exist and cannot be parsed as a date).
    .na.drop(how='any', subset=('PUlocationID', 'DOlocationID'))
    .drop_duplicates()
    .where(
        (col('Pickup_DateTime') >= '2018-12-01')
        & (col('DropOff_datetime') < '2019-01-01')
    )
    # Keep only the needed columns, renamed to snake_case.
    .select(
        col('Pickup_DateTime').alias('pickup_date_time'),
        col('DropOff_datetime').alias('dropoff_date_time'),
        col('PUlocationID').alias('pickup_location'),
        col('DOlocationID').alias('dropoff_location'),
        col('SR_Flag').alias('sr_flag'),
        col('Dispatching_base_number').alias('dispatching_base_number'),
    )
    # Derived date parts of the pickup timestamp.
    .withColumn('pickup_year', year('pickup_date_time'))
    .withColumn('pickup_month', month('pickup_date_time'))
    # trip_duration is (dropoff - pickup) in epoch seconds / 60, rounded to whole minutes.
    .select(
        '*',
        round(
            (unix_timestamp('dropoff_date_time') - unix_timestamp('pickup_date_time')) / 60
        ).alias('trip_duration'),
        dayofmonth('pickup_date_time').alias('pickup_day'),
    )
    # Label the trip from sr_flag: 1 -> shared_trip, 0 -> solo_trip, anything else falls
    # through to solo_trip. The explicit solo branch tests 0, matching the step-by-step
    # trip_type cell earlier in the notebook.
    .withColumn(
        'trip_type',
        when(col('sr_flag') == 1, 'shared_trip')
        .when(col('sr_flag') == 0, 'solo_trip')
        .otherwise('solo_trip'),
    )
    # The raw flag is no longer needed once trip_type exists.
    .drop('sr_flag')
)
                
'''
# Clean and filter data
# Select only limited columns (rename if column names are not relevant)
# Create derived columns
# Create derived cols using switch case statements and drop 'sr_flag' col as its not needed anymore
'''

In [29]:
# Summary statistics for every column of the transformed trips.
# NOTE(review): the recorded output shows a negative minimum trip_duration (-683),
# i.e. some rows have a dropoff earlier than the pickup; these are not filtered out above.
display(fhv_trips_df.describe())

summary,pickup_date_time,dropoff_date_time,pickup_location,dropoff_location,dispatching_base_number,pickup_year,pickup_month,trip_duration,pickup_day,trip_type
count,20946339,20946339,20946339.0,20946339.0,20946339,20946339.0,20946339.0,20946339.0,20946339.0,20946339
mean,,,138.94598554907375,141.6902006598862,,2018.0,12.0,20.50503784933491,15.407948949933449,
stddev,,,75.36454777711155,77.83205624891409,,0.0,0.0,15.689344112937787,8.850352057891298,
min,2018-12-01 00:00:00,2018-12-01 00:02:18,0.0,0.0,B00021,2018.0,12.0,-683.0,1.0,shared_trip
max,2018-12-31 23:58:12,2018-12-31 23:59:59,265.0,265.0,B03156,2018.0,12.0,7543.0,31.0,solo_trip


In [31]:
# Register the joined trips + bases DataFrame as a temp view so it can be queried with SQL.
# NOTE(review): fhv_trips_bases_df was built by the join cell before fhv_trips_df was
# rebuilt in the single-step section, so this view reflects the step-by-step pipeline.
fhv_trips_bases_df.createOrReplaceTempView('local_fhv_taxi_trips_data')

In [32]:
# Query the temp view through spark.sql: all joined rows for base licence 'B00021'.
sql_based_df = spark.sql("select * from local_fhv_taxi_trips_data where base_liscence_number = 'B00021'")

In [33]:
# Render the result of the SQL query above.
display(sql_based_df)

In [34]:
%sql

-- All joined trip rows for base licence 'B00021', read from the temp view.
SELECT *
FROM local_fhv_taxi_trips_data
WHERE base_liscence_number = 'B00021';

In [35]:
# Register the joined DataFrame as a global temp view named 'fact_fhv_taxi_trip_data'.
fhv_trips_bases_df.createOrReplaceGlobalTempView('fact_fhv_taxi_trip_data')

In [36]:
# NOTE(review): this statement is identical to the one in the previous cell; it simply
# re-registers the same global temp view. Likely a leftover; confirm and remove.
fhv_trips_bases_df.createOrReplaceGlobalTempView('fact_fhv_taxi_trip_data')