In [1]:
import pandas as pd
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
# Start (or reuse) a Spark session running in local mode on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

22/02/28 11:46:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/28 11:46:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Question 1: Version of PySpark 

In [7]:
# Display the version string of the imported PySpark package.
pyspark.__version__

'3.0.3'

### Question 2: Size of FHVHV February 2021

In [16]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-28 11:37:02--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.216.232.75
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.216.232.75|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv.3’


2022-02-28 11:37:24 (32.6 MB/s) - ‘fhvhv_tripdata_2021-02.csv.3’ saved [733822658/733822658]



In [3]:
# Explicit schema for the FHVHV trip CSV; every column is nullable.
fhvhv_schema = (
    types.StructType()
    .add("hvfhs_license_num", types.StringType(), True)
    .add("dispatching_base_num", types.StringType(), True)
    .add("pickup_datetime", types.TimestampType(), True)
    .add("dropoff_datetime", types.TimestampType(), True)
    .add("PULocationID", types.IntegerType(), True)
    .add("DOLocationID", types.IntegerType(), True)
    .add("SR_Flag", types.StringType(), True)
)

In [4]:
# Read the raw CSV (first row is the header) using the explicit schema.
df_fhvhv = (
    spark.read
    .option("header", "true")
    .schema(fhvhv_schema)
    .csv("fhvhv_tripdata_2021-02.csv")
)

# Repartition into 24 partitions and write them out as Parquet,
# replacing any output left by a previous run.
(
    df_fhvhv
    .repartition(24)
    .write
    .parquet("fhvhv_tripdata_2021-02.parquet", mode="overwrite")
)

                                                                                

In [5]:
# Report the on-disk size of the Parquet output in human-readable units.
!du -h fhvhv_tripdata_2021-02.parquet

210M	fhvhv_tripdata_2021-02.parquet


### Question 3: Number of records on 2021-02-15

In [12]:
# Expose the trips DataFrame to Spark SQL under the name `fhvhv_tripdata`.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; it is also safe to re-run.
df_fhvhv.createOrReplaceTempView('fhvhv_tripdata')

In [13]:
# Count the trips whose pickup date is 2021-02-15.
# `.show()` returns None, so it is called separately; this way `df_result`
# holds the query's DataFrame rather than None.
df_result = spark.sql("""
SELECT 
    COUNT(*) 
FROM 
    fhvhv_tripdata
WHERE
    DATE(pickup_datetime) = '2021-02-15'
""")
df_result.show()



+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

### Question 4: Day with the longest trip

In [9]:
# Add two derived columns:
#   trip_length - dropoff minus pickup, each timestamp cast to a long first
#   day         - date portion of the pickup timestamp
df_fhvhv = (
    df_fhvhv
    .withColumn(
        "trip_length",
        F.col("dropoff_datetime").cast("long") - F.col("pickup_datetime").cast("long"),
    )
    .withColumn("day", F.to_date("pickup_datetime"))
)

In [10]:
# Longest trip per pickup day; the result column is named "max(trip_length)".
df = df_fhvhv.groupBy("day").max("trip_length")

In [11]:
# Show the days ordered from longest to shortest maximum trip.
df.orderBy(F.desc("max(trip_length)")).show()



+----------+----------------+
|       day|max(trip_length)|
+----------+----------------+
|2021-02-11|           75540|
|2021-02-17|           57221|
|2021-02-20|           44039|
|2021-02-03|           40653|
|2021-02-19|           37577|
|2021-02-25|           35010|
|2021-02-18|           34612|
|2021-02-10|           34169|
|2021-02-21|           32223|
|2021-02-09|           32087|
|2021-02-06|           31447|
|2021-02-02|           30913|
|2021-02-05|           30511|
|2021-02-12|           30148|
|2021-02-08|           30106|
|2021-02-14|           29777|
|2021-02-22|           28278|
|2021-02-27|           27170|
|2021-02-15|           25874|
|2021-02-04|           25592|
+----------+----------------+
only showing top 20 rows



                                                                                

### Question 5: Stages for most frequent dispatching_base_num

In [16]:
# Number of trips per dispatching base.
df = (
    df_fhvhv
    .groupBy("dispatching_base_num")
    .count()
)

In [17]:
# Show the dispatching bases with the most trips first.
df.orderBy(F.desc("count")).show()



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows





### Question 6: Most common location pair

In [7]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2022-02-25 09:15:19--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.85.29
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.85.29|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2022-02-25 09:15:20 (46.8 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [17]:
# Load the zone lookup CSV, using its first row as the header.
# No schema is supplied and schema inference is not turned on.
df_zones = (
    spark.read
    .format("csv")
    .load("taxi+_zone_lookup.csv", header='true')
)

In [28]:
# Register both DataFrames as temporary views for Spark SQL.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; it is also safe to re-run.
df_fhvhv.createOrReplaceTempView("fhvhv_tripdata")
df_zones.createOrReplaceTempView("zone_data")

In [50]:
# Count trips per "pickup zone / dropoff zone" pair, most frequent first.
# The zone table is joined twice: once for the pickup location, once for the
# dropoff location.
# `.show()` returns None, so it is called separately; this way `df_result`
# holds the query's DataFrame rather than None.
df_result = spark.sql("""
SELECT 
    CONCAT(pu.Zone, ' / ' , do.Zone) AS `pickup-dropoff`, COUNT(*) AS count
FROM 
    fhvhv_tripdata
JOIN 
    zone_data AS pu
ON 
    PULocationID = pu.LocationID
JOIN 
    zone_data AS do
ON
    DOLocationID = do.LocationID
GROUP BY
    `pickup-dropoff`
ORDER BY 
    COUNT(*) DESC;
""")
df_result.show()



+--------------------+-----+
|      pickup-dropoff|count|
+--------------------+-----+
|East New York / E...|45041|
|Borough Park / Bo...|37329|
| Canarsie / Canarsie|28026|
|Crown Heights Nor...|25976|
|Bay Ridge / Bay R...|17934|
|Jackson Heights /...|14688|
|   Astoria / Astoria|14688|
|Central Harlem No...|14481|
|Bushwick South / ...|14424|
|Flatbush/Ditmas P...|13976|
|South Ozone Park ...|13716|
|Brownsville / Bro...|12829|
|    JFK Airport / NA|12542|
|Prospect-Lefferts...|11814|
|Forest Hills / Fo...|11548|
|Bushwick North / ...|11491|
|Bushwick South / ...|11487|
|Crown Heights Nor...|11462|
|Crown Heights Nor...|11342|
|Prospect-Lefferts...|11308|
+--------------------+-----+
only showing top 20 rows





In [14]:
# List the column names currently present on df_fhvhv.
df_fhvhv.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [42]:
# Attach the pickup zone name: join on the pickup location id, drop the other
# lookup columns, and rename "Zone" to "pickup_zone".
df = (
    df_fhvhv
    .join(df_zones, df_fhvhv.PULocationID == df_zones.LocationID)
    .drop("LocationID", "Borough", "service_zone")
    .withColumnRenamed("Zone", "pickup_zone")
)

In [43]:
# Attach the dropoff zone name the same way: join on the dropoff location id,
# drop the other lookup columns, and rename "Zone" to "dropoff_zone".
df = (
    df
    .join(df_zones, df_fhvhv.DOLocationID == df_zones.LocationID)
    .drop("LocationID", "Borough", "service_zone")
    .withColumnRenamed("Zone", "dropoff_zone")
)

In [45]:
# Build the "pickup-dropoff" label as "<pickup_zone> / <dropoff_zone>".
df = df.withColumn(
    "pickup-dropoff",
    F.concat(
        F.col("pickup_zone"),
        F.lit(" / "),
        F.col("dropoff_zone"),
    ),
)

In [46]:
# Preview the first five pair labels.
df.select("pickup-dropoff").show(n=5)

+--------------------+
|      pickup-dropoff|
+--------------------+
|Brownsville / Can...|
|Canarsie / Browns...|
|Canarsie / Flatlands|
|Flatlands / Sunse...|
|Hunts Point / Wes...|
+--------------------+
only showing top 5 rows



In [47]:
# Number of trips per pickup/dropoff pair.
df = (
    df
    .groupBy("pickup-dropoff")
    .count()
)

In [48]:
# Preview the pair counts; no ordering is applied here.
df.show()



+--------------------+-----+
|      pickup-dropoff|count|
+--------------------+-----+
|  Midtown South / NA| 3380|
|Hamilton Heights ...| 1452|
|Hamilton Heights ...|  960|
|Bedford Park / Be...| 5063|
|Bushwick South / ...|  372|
|    Inwood / Norwood|  664|
|East Flatbush/Rem...| 5659|
|Alphabet City / C...|  430|
|Park Slope / Park...| 9771|
|East Harlem South...|  107|
|Homecrest / Grave...| 1522|
|Eastchester / Hun...|  338|
|TriBeCa/Civic Cen...|  566|
|Norwood / Kensington|    4|
|Greenwich Village...|  675|
|Upper West Side N...| 1415|
|Kew Gardens Hills...|  157|
|Long Island City/...| 1487|
|Brownsville / Two...|  160|
|Soundview/Castle ...|   64|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [49]:
# Show the most frequent pickup/dropoff pairs first.
df.orderBy(F.desc("count")).show()



+--------------------+-----+
|      pickup-dropoff|count|
+--------------------+-----+
|East New York / E...|45041|
|Borough Park / Bo...|37329|
| Canarsie / Canarsie|28026|
|Crown Heights Nor...|25976|
|Bay Ridge / Bay R...|17934|
|Jackson Heights /...|14688|
|   Astoria / Astoria|14688|
|Central Harlem No...|14481|
|Bushwick South / ...|14424|
|Flatbush/Ditmas P...|13976|
|South Ozone Park ...|13716|
|Brownsville / Bro...|12829|
|    JFK Airport / NA|12542|
|Prospect-Lefferts...|11814|
|Forest Hills / Fo...|11548|
|Bushwick North / ...|11491|
|Bushwick South / ...|11487|
|Crown Heights Nor...|11462|
|Crown Heights Nor...|11342|
|Prospect-Lefferts...|11308|
+--------------------+-----+
only showing top 20 rows



