In [2]:
!pip install findspark
import findspark
findspark.init()

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
# Start (or reuse, via getOrCreate) a local Spark session that runs on every
# available core of this machine.
spark = (
    SparkSession.builder
    .appName('test')
    .master("local[*]")
    .getOrCreate()
)

22/03/01 05:03:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/01 05:03:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Q1 — Spark version

In [5]:
# Display the version of Spark this session is running on (answer to Q1).
spark.version

'3.0.3'

# Q2 — Download February 2021 FHVHV trips and save them as Parquet (24 partitions)

In [6]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-03-01 05:03:49--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.216.251.116
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.216.251.116|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-03-01 05:04:06 (44.0 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [7]:
# First pass over the raw CSV: column names come from the header row and no
# schema is supplied, so Spark leaves every column as a string.
df = spark.read.csv('fhvhv_tripdata_2021-02.csv', header="true")

In [11]:
# Show the schema Spark produced without an explicit schema (every field is StringType).
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))


In [9]:
from pyspark.sql import types

In [12]:
# Explicit, nullable schema for the FHVHV CSV: the two datetime columns as
# timestamps, the pickup/dropoff zone IDs as integers, everything else as strings.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('hvfhs_license_num', types.StringType()),
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
    ]
])

In [13]:
# Re-read the same CSV with the explicit schema so the datetime and location-ID
# columns are loaded with proper types instead of strings.
df = spark.read.csv('fhvhv_tripdata_2021-02.csv', schema=schema, header="true")

In [14]:
# Redistribute the rows over 24 partitions; the Parquet write that follows
# emits one file per non-empty partition.
df = df.repartition(numPartitions=24)

In [16]:
# Persist the typed, repartitioned trips as Parquet.
# mode('overwrite') replaces output from a previous run; the default save mode
# ('errorifexists') makes this cell fail on Restart & Run All once the directory exists.
df.write.mode('overwrite').parquet('fhvhv/2021/02/')

                                                                                

In [17]:
# Inspect the Parquet output directory: recursive listing with human-readable sizes.
!ls -lhR fhvhv/2021/02/

fhvhv/2021/02/:
total 208M
-rw-r--r-- 1 pop pop    0 Mar  1 05:10 _SUCCESS
-rw-r--r-- 1 pop pop 8.7M Mar  1 05:10 part-00000-4e803433-c34f-4562-b7cc-15dc2671acca-c000.snappy.parquet
-rw-r--r-- 1 pop pop 8.7M Mar  1 05:10 part-00001-4e803433-c34f-4562-b7cc-15dc2671acca-c000.snappy.parquet
-rw-r--r-- 1 pop pop 8.7M Mar  1 05:10 part-00002-4e803433-c34f-4562-b7cc-15dc2671acca-c000.snappy.parquet
-rw-r--r-- 1 pop pop 8.7M Mar  1 05:10 part-00003-4e803433-c34f-4562-b7cc-15dc2671acca-c000.snappy.parquet
-rw-r--r-- 1 pop pop 8.7M Mar  1 05:10 part-00004-4e803433-c34f-4562-b7cc-15dc2671acca-c000.snappy.parquet
-rw-r--r-- 1 pop pop 8.7M Mar  1 05:10 part-00005-4e803433-c34f-4562-b7cc-15dc2671acca-c000.snappy.parquet
-rw-r--r-- 1 pop pop 8.7M Mar  1 05:10 part-00006-4e803433-c34f-4562-b7cc-15dc2671acca-c000.snappy.parquet
-rw-r--r-- 1 pop pop 8.7M Mar  1 05:10 part-00007-4e803433-c34f-4562-b7cc-15dc2671acca-c000.snappy.parquet
-rw-r--r-- 1 pop pop 8.7M Mar  1 05:10 part-00008-4e803433

# Q3 — Number of trips picked up on 2021-02-15

In [18]:
from pyspark.sql import functions as F

In [25]:
# Reduce each trip to the calendar dates of pickup and dropoff plus the two zone IDs.
df_mod = df.select(
    F.to_date(df.pickup_datetime).alias('pickup_date'),
    F.to_date(df.dropoff_datetime).alias('dropoff_date'),
    'PULocationID',
    'DOLocationID',
)

In [33]:
# Keep only trips picked up on 2021-02-15 and report how many there are.
df_feb15 = df_mod.where(F.col('pickup_date') == '2021-02-15')
print('No. of Records Starting on Feb 15: {}'.format(df_feb15.count()))



No. of Records Starting on Feb 15: 367170


                                                                                

# Q4 — Longest trip duration (seconds) and its pickup date

In [41]:
# Append the pickup date and the trip duration in whole seconds
# (difference of the dropoff and pickup epoch timestamps) to every trip row.
df_mod_q4 = df.select(
    '*',
    F.to_date('pickup_datetime').alias('pickup_date'),
    (
        F.unix_timestamp('dropoff_datetime') - F.unix_timestamp('pickup_datetime')
    ).alias('duration'),
)

In [43]:
# Display the DataFrame repr to check the resulting schema: the original columns
# plus pickup_date (date) and duration (bigint, seconds).
df_mod_q4

DataFrame[hvfhs_license_num: string, dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string, pickup_date: date, duration: bigint]

In [44]:
# Trips ordered from longest to shortest; show() prints the first 20 rows.
(
    df_mod_q4
    .select('pickup_date', 'duration')
    .orderBy(F.desc('duration'))
    .show()
)



+-----------+--------+
|pickup_date|duration|
+-----------+--------+
| 2021-02-11|   75540|
| 2021-02-17|   57221|
| 2021-02-20|   44039|
| 2021-02-03|   40653|
| 2021-02-19|   37577|
| 2021-02-25|   35010|
| 2021-02-20|   34806|
| 2021-02-18|   34612|
| 2021-02-18|   34555|
| 2021-02-10|   34169|
| 2021-02-10|   32476|
| 2021-02-25|   32439|
| 2021-02-21|   32223|
| 2021-02-09|   32087|
| 2021-02-06|   31447|
| 2021-02-02|   30913|
| 2021-02-10|   30856|
| 2021-02-09|   30732|
| 2021-02-21|   30660|
| 2021-02-05|   30511|
+-----------+--------+
only showing top 20 rows



                                                                                

# Q5 — Most frequent dispatching bases (Spark SQL)

In [45]:
# Expose the typed trips DataFrame to Spark SQL under the name 'trips_data'.
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0)
# and safely replaces the view when the cell is re-run.
df.createOrReplaceTempView('trips_data')

In [48]:
# Top 10 trips by duration in seconds, computed in Spark SQL over the 'trips_data'
# view. The durations match the DataFrame-API result from Q4.
spark.sql("""
SELECT
    pickup_datetime,
    dropoff_datetime,
    unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime) AS duration 
FROM
    trips_data
ORDER BY
    duration DESC
LIMIT
    10
""").show()



+-------------------+-------------------+--------+
|    pickup_datetime|   dropoff_datetime|duration|
+-------------------+-------------------+--------+
|2021-02-11 13:40:44|2021-02-12 10:39:44|   75540|
|2021-02-17 15:54:53|2021-02-18 07:48:34|   57221|
|2021-02-20 12:08:15|2021-02-21 00:22:14|   44039|
|2021-02-03 20:24:25|2021-02-04 07:41:58|   40653|
|2021-02-19 23:17:44|2021-02-20 09:44:01|   37577|
|2021-02-25 17:13:35|2021-02-26 02:57:05|   35010|
|2021-02-20 01:36:13|2021-02-20 11:16:19|   34806|
|2021-02-18 15:24:19|2021-02-19 01:01:11|   34612|
|2021-02-18 01:31:20|2021-02-18 11:07:15|   34555|
|2021-02-10 20:51:39|2021-02-11 06:21:08|   34169|
+-------------------+-------------------+--------+



                                                                                

In [49]:
# Number of trips per dispatching base, listing the 10 bases with the most trips.
spark.sql("""
SELECT
    dispatching_base_num,
    count(*) as count
FROM
    trips_data
GROUP BY 
    dispatching_base_num
ORDER BY
    count DESC
LIMIT
    10
""").show()



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
+--------------------+-------+



                                                                                

# Q6 — Most common pickup / dropoff zone pair

In [50]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2022-03-01 06:36:24--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 54.231.159.24
Connecting to s3.amazonaws.com (s3.amazonaws.com)|54.231.159.24|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2022-03-01 06:36:24 (114 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [51]:
# Load the zone lookup CSV using its header row for column names; with no
# schema given, every column (including LocationID) is read as a string.
dfz = spark.read.csv('taxi+_zone_lookup.csv', header="true")

In [52]:
# Save the zone lookup as Parquet. mode('overwrite') lets the cell be re-run;
# the default save mode ('errorifexists') fails once 'zones' already exists.
dfz.write.mode('overwrite').parquet('zones')

In [53]:
# Read the zone lookup back from the Parquet directory written above.
df_zones = spark.read.format('parquet').load('zones/')

In [55]:
# Expose the zone lookup to Spark SQL under the name 'zones_data'.
# createOrReplaceTempView supersedes registerTempTable (deprecated since Spark 2.0).
df_zones.createOrReplaceTempView('zones_data')

In [78]:
# Five most common pickup/dropoff zone pairs, labelled "<pickup zone> / <dropoff zone>".
# LEFT JOINs keep trips whose location ID has no matching row in zones_data, and
# COALESCE labels such a missing zone 'Unknown'. An inner join would silently drop
# those trips, and CONCAT returns NULL when either zone name is NULL.
# NOTE(review): zones_data.LocationID was read from CSV without a schema (string) while
# trips_data.PULocationID/DOLocationID are int; the join relies on Spark's implicit cast.
spark.sql("""
SELECT
    CONCAT(COALESCE(zpu.zone, 'Unknown'), ' / ', COALESCE(zdo.zone, 'Unknown')) AS pd,
    COUNT(*) AS count
FROM
    trips_data t
    LEFT JOIN zones_data zpu ON t.PULocationID = zpu.LocationID
    LEFT JOIN zones_data zdo ON t.DOLocationID = zdo.LocationID
GROUP BY
    pd
ORDER BY
    count DESC
LIMIT
    5
""").show(truncate=False)



+-----------------------------------------+-----+
|pd                                       |count|
+-----------------------------------------+-----+
|East New York / East New York            |45041|
|Borough Park / Borough Park              |37329|
|Canarsie / Canarsie                      |28026|
|Crown Heights North / Crown Heights North|25976|
|Bay Ridge / Bay Ridge                    |17934|
+-----------------------------------------+-----+



                                                                                