In [22]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
import pandas as pd

In [5]:
# Q1: report the version of the installed PySpark package.
pyspark.__version__

'3.3.2'

In [6]:
# Create (or reuse) a Spark session that runs locally; "local[*]" asks Spark
# for one worker thread per logical core on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [8]:
# Explicit schema for the trip CSV, listed as (column name, Spark type) pairs
# in file order. Every column is declared nullable.
schema = types.StructType([
    types.StructField(column_name, spark_type, True)
    for column_name, spark_type in [
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    ]
])

In [9]:
# Load the raw trip CSV with the explicit schema; the first line of the file
# is a header row, not data.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('fhvhv_tripdata_2021-06.csv')
)

In [10]:
# Preview the first rows of the trips loaded from CSV to sanity-check the parse.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [11]:
# Redistribute the trips into 12 partitions so the Parquet write below
# produces 12 part files.
df = df.repartition(numPartitions=12)

In [16]:
# Persist the repartitioned trips as Parquet.
# The default save mode raises an error when the target directory already
# exists, so a second run of this notebook would fail here. 'overwrite'
# replaces the previous output and keeps the cell re-runnable.
df.write.mode('overwrite').parquet('fhvhv/2021/06/')

                                                                                

In [18]:
# List the files written under fhvhv/2021/06 with human-readable sizes (-h).
!ls -lh fhvhv/2021/06

total 271M
-rw-r--r-- 1 aminu aminu   0 Mar  5 22:09 _SUCCESS
-rw-r--r-- 1 aminu aminu 23M Mar  5 22:08 part-00000-9c35a00f-6aa2-4b3f-b6e2-04e93365f64a-c000.snappy.parquet
-rw-r--r-- 1 aminu aminu 23M Mar  5 22:08 part-00001-9c35a00f-6aa2-4b3f-b6e2-04e93365f64a-c000.snappy.parquet
-rw-r--r-- 1 aminu aminu 23M Mar  5 22:08 part-00002-9c35a00f-6aa2-4b3f-b6e2-04e93365f64a-c000.snappy.parquet
-rw-r--r-- 1 aminu aminu 23M Mar  5 22:08 part-00003-9c35a00f-6aa2-4b3f-b6e2-04e93365f64a-c000.snappy.parquet
-rw-r--r-- 1 aminu aminu 23M Mar  5 22:08 part-00004-9c35a00f-6aa2-4b3f-b6e2-04e93365f64a-c000.snappy.parquet
-rw-r--r-- 1 aminu aminu 23M Mar  5 22:08 part-00005-9c35a00f-6aa2-4b3f-b6e2-04e93365f64a-c000.snappy.parquet
-rw-r--r-- 1 aminu aminu 23M Mar  5 22:08 part-00006-9c35a00f-6aa2-4b3f-b6e2-04e93365f64a-c000.snappy.parquet
-rw-r--r-- 1 aminu aminu 23M Mar  5 22:09 part-00007-9c35a00f-6aa2-4b3f-b6e2-04e93365f64a-c000.snappy.parquet
-rw-r--r-- 1 aminu aminu 23M Mar  5 22:09 part-0

In [None]:
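# Q2: size of each Parquet part file written above.
# Answer: 24MB (the `ls -lh` listing shows about 23M per part file).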

In [19]:
# Reload the trips from the Parquet directory written earlier; from here on
# `df` refers to the Parquet-backed DataFrame.
df = spark.read.format('parquet').load('fhvhv/2021/06/')

In [20]:
# Print each column's name, type and nullability for the current `df`.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



Q3
Count records

How many taxi trips were there on June 15?

Consider only trips that started on June 15.

308,164
12,856
452,470
50,982 

In [21]:
# Q3: count trips that started on 2021-06-15. The two filters together form
# the half-open interval [2021-06-15, 2021-06-16), so the whole day is covered
# and midnight of the 16th is excluded.
(
    df.select("pickup_datetime")
    .where(F.col("pickup_datetime") >= '2021-06-15')
    .where(F.col("pickup_datetime") < '2021-06-16')
    .count()
)

                                                                                

452470

In [None]:
'''
Question 4:

Longest trip for each day

Now calculate the duration for each trip.
How long was the longest trip in Hours?

66.87 Hours
243.44 Hours
7.68 Hours
3.32 Hours 
'''

In [23]:
# Q4: trip duration in hours = (dropoff - pickup) in epoch seconds / 3600.
# Sort longest-first and print only the top row, untruncated.
(
    df.select(
        (
            (
                F.unix_timestamp(F.col('dropoff_datetime'))
                - F.unix_timestamp(F.col('pickup_datetime'))
            ) / 3600
        ).alias('duration')
    )
    .orderBy(F.col('duration').desc())
    .show(1, truncate=False)
)

[Stage 8:>                                                          (0 + 4) / 4]

+----------------+
|duration        |
+----------------+
|66.8788888888889|
+----------------+
only showing top 1 row





In [24]:
# Load the taxi zone lookup table; the first line of the CSV is a header row.
# No schema is supplied here.
taxi_zones = (
    spark.read
    .option("header","true")
    .csv("taxi+_zone_lookup.csv")
)

In [25]:
# Inspect the schema of the zone lookup. The CSV was read without a schema,
# and every column (including LocationID) comes back as a string.
taxi_zones.schema

StructType([StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [26]:
# Register the zone lookup as the temp view 'zones' so Spark SQL queries can reference it.
taxi_zones.createOrReplaceTempView('zones')

In [27]:
# List the column names of the zone lookup.
taxi_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [28]:
# Method A (DataFrame API): attach zone details to each trip by matching the
# trip's pickup location ID to the lookup's LocationID (inner join).
ts = df.join(
    taxi_zones,
    on=df.PULocationID == taxi_zones.LocationID,
    how='inner',
)

In [30]:
# Count trips per pickup zone and print the single most frequent one.
(
    ts.groupBy('Zone')
    .count()
    .orderBy(F.desc('count'))
    .show(1)
)



+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
+-------------------+------+
only showing top 1 row



                                                                                

## Method B: Spark SQL

In [31]:
# Register the trips DataFrame as the temp view 'fhvh' for the SQL-based join.
df.createOrReplaceTempView('fhvh')

In [37]:
# Join every trip in the 'fhvh' view to its row in the 'zones' view by
# matching the trip's pickup location ID to the lookup's LocationID.
combined_df = spark.sql("""

    SELECT * FROM fhvh
    JOIN zones
    ON  PULocationID = LocationID
    
""")

In [38]:
# Expose the joined result to Spark SQL as the temp view 'combined'.
combined_df.createOrReplaceTempView('combined')

In [39]:
# Preview the joined rows: trip columns followed by the zone lookup columns.
combined_df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+-------------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|LocationID|      Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+-------------+--------------------+------------+
|              B02617|2021-06-04 16:50:34|2021-06-04 17:01:18|         118|         109|      N|                B02617|       118|Staten Island|Heartland Village...|   Boro Zone|
|              B02875|2021-06-02 22:28:45|2021-06-02 22:37:28|         163|          79|      N|                B02875|       163|    Manhattan|       Midtown North| Yellow Zone|
|              B02871|2021-06-03 11:47:48|2021-06-03 11:52:23|         231|          13|      N|         

In [41]:
# Count trips per pickup zone in the 'combined' view and print the single
# most frequent zone.
# The column is spelled `Zone`, matching the lookup table's header. The
# previous spelling `ZOne` only resolved because Spark SQL is
# case-insensitive by default, and it showed up as the output column name.
spark.sql("""
    SELECT Zone, count(1) AS count FROM combined
    GROUP BY Zone
    ORDER BY count DESC
    LIMIT 1
""").show()

[Stage 28:>                                                         (0 + 4) / 4]

+-------------------+------+
|               ZOne| count|
+-------------------+------+
|Crown Heights North|231279|
+-------------------+------+



