# Pyspark join

In [53]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import concat, col, lit

In [54]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('fhvhv_tripdata_2021-02_homework') \
    .getOrCreate()

In [55]:
#!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

In [56]:
#!wc -l fhvhv_tripdata_2021-02.csv

In [57]:
#schema = types.StructType([
#    types.StructField('hvfhs_license_num', types.StringType(), True),
#    types.StructField('dispatching_base_num', types.StringType(), True),
#    types.StructField('pickup_datetime', types.TimestampType(), True),
#    types.StructField('dropoff_datetime', types.TimestampType(), True),
#    types.StructField('PULocationID', types.IntegerType(), True),
#    types.StructField('DOLocationID', types.IntegerType(), True),
#    types.StructField('SR_Flag', types.StringType(), True)
#])

I don't need to bother repartitioning etc. here as I've already done it before. I'll turn these code cells into markdown cells and simply start off from reading the parquet file

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

df = df.repartition(24)

df.write.parquet('fhvhv/2021/02/')

In [58]:
df_fhvhv = spark.read.parquet('fhvhv/2021/02/')
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi+_zone_lookup.csv')

In [59]:
# Check how many rows in the df_fhvhv dataframe...
df_fhvhv.createOrReplaceTempView("fhvhv_2021_02")

sqlDF = spark.sql("SELECT COUNT(1) \
                     FROM fhvhv_2021_02")
# sqlDF.show() # Comment this out for Bonus Question stages 

In [60]:
# Let's just check how many rows in the df_zones dataframe...
df_zones.createOrReplaceTempView("taxi_zones")

sqlDF = spark.sql("SELECT COUNT(1) \
                     FROM taxi_zones")
# sqlDF.show() # Comment this out for Bonus Question stages 

There are many different types of joins in PySpark that support traditional SQL joins.
However, here we want to preserve everything that is in our fhvhv dataset when we join to the zone data.
If we do this we must do an outer join. Note that the default join in pyspark is an inner
join and will this ould have the result of dropping any rows that don't have a key in common (the one that we are joining on).

In [61]:
df_join_1 = df_fhvhv.join(df_zones, df_fhvhv.PULocationID == df_zones.LocationID, how='outer')

In [62]:
# Let's check again how many rows in the joined dataframe. It should be the same as 
# fhvhv dataset before we joined. 
df_join_1.createOrReplaceTempView("df_join_PU")

sqlDF = spark.sql("SELECT COUNT(1) \
                     FROM df_join_PU")
# sqlDF.show() # Comment this out for Bonus Question stages 

Note that in the above this isn't exactly what I was expecting - there are more rows here than in the original fhvhv dataframe than when I started.
This is because I've done a full outer join. That means if there are any rows in the zone
dataframe that don't have a match it will be put here as well. This is not what we are after. Let's change this to a Left Outer Join. This will tell the join to just make sure to grab everything on the left dataset (but not the right) regardless of whether there is a match on the right data set.

In [63]:
# Do the join again but with a left outer join this time
df_join_1 = df_fhvhv.join(df_zones, \
                          df_fhvhv.PULocationID == df_zones.LocationID, \
                          how='leftouter')

In [64]:
# Let's check again how many rows in the joined dataframe. It should be the same as 
# fhvhv dataset before we joined. 
df_join_1.createOrReplaceTempView("df_join_PU")

sqlDF = spark.sql("SELECT COUNT(1) \
                     FROM df_join_PU")
# sqlDF.show() # Comment this out for Bonus Question stages 

Okay, that's better. Now let's rename the zone column so we don't get confused down the track when we are doing more joining...

In [65]:
# We will do some renaming here to save ourselves a bit of confusion
df_join_1 = df_join_1 \
    .withColumnRenamed('Zone', 'PU_Zone') 

In [66]:
# sqlDF.show() # Comment this out for Bonus Question stages 

Now we need to find out the zone names for the drop off location. We will do the same thing as we did for the pickup location. Remembering of course that this is a left outer join

In [67]:
df_join_2 = df_fhvhv.join(df_zones, \
                          df_fhvhv.DOLocationID == df_zones.LocationID, \
                          how = 'leftouter')

In [68]:
# Let's check again how many rows are in the joined dataframe. It should again be the same as
# fhvhv dataset before we joined. 
df_join_2.createOrReplaceTempView("df_join_DO")

sqlDF = spark.sql("SELECT COUNT(1) \
                     FROM df_join_DO")
# sqlDF.show() # Comment this out for Bonus Question stages 

That looks good. But let's do some renaming again so we don't get confused when we join the two tables together (df_join_1 + df_join_2)

In [69]:
# We will again rename our zone column 
df_join_2 = df_join_2 \
    .withColumnRenamed('Zone', 'DO_Zone') 

In [70]:
# sqlDF.show() # Comment this out for Bonus Question stages 

We now do what is essentially a self-join. It would have been better if we had a unique id for each row but we don't. Hence things get a bit ugly. I'm going to say that as long as the hvfhs_license_num, dispatching_base_num, pickup_datetime, dropoff_datetime, PULocationID and DOLocationID are the same in each then it is ok to join - this is what uniquely identifies a row. This is terribly inefficient and setting up a UID would have been much better. 

In [71]:
df_join_1_and_2 = df_join_1.join(df_join_2, (
    df_join_1.hvfhs_license_num == df_join_2.hvfhs_license_num) \
    & (df_join_1.dispatching_base_num == df_join_2.dispatching_base_num) \
    & (df_join_1.pickup_datetime == df_join_2.pickup_datetime) \
    & (df_join_1.dropoff_datetime == df_join_2.dropoff_datetime) \
    & (df_join_1.PULocationID == df_join_2.PULocationID) \
    & (df_join_1.DOLocationID == df_join_2.DOLocationID) )

Again, let's check the count on this dataframe - it should still be the same as what we
originally started with in the df_fhvhv dataframe. If it's not something has gone wrong.

In [72]:
# Do a count check
df_join_1_and_2.createOrReplaceTempView("df_join_PU_DO")

sqlDF = spark.sql("SELECT COUNT(1) \
                     FROM df_join_PU_DO")
# sqlDF.show() # Comment this out for Bonus Question stages 

Now it's time to get the column we were after all along. The combination of PU_Zone/DO_Zone so that we can then see which combination is the most popular

In [73]:
df_join_3 = df_join_1_and_2 \
    .withColumn('PU_DO_Zone', concat(col('PU_Zone'), lit('/'), col('DO_Zone')))

In [74]:
#Let's have a look at the joined dataframe and see if we have our new column here
# sqlDF.show() # Comment this out for Bonus Question stages 

Now comes the moment of truth. Which are the most popular PU/DO Zone combinations? Let's do a Group By on the PU/DO Zone combo and then an Order By on the count in descending order to get the most popular first.

In [75]:
df_join_3.createOrReplaceTempView("pu_do_zones")

sqlDF = spark.sql("SELECT PU_DO_Zone, \
                          count(1) \
                     FROM pu_do_zones \
                    GROUP BY PU_DO_Zone \
                    ORDER BY count(1) DESC")
sqlDF.show(20, False) # to show the full column contents



+---------------------------------------------------+--------+
|PU_DO_Zone                                         |count(1)|
+---------------------------------------------------+--------+
|East New York/East New York                        |45041   |
|Borough Park/Borough Park                          |37329   |
|Canarsie/Canarsie                                  |28026   |
|Crown Heights North/Crown Heights North            |25976   |
|Bay Ridge/Bay Ridge                                |17934   |
|Jackson Heights/Jackson Heights                    |14688   |
|Astoria/Astoria                                    |14688   |
|Central Harlem North/Central Harlem North          |14481   |
|Bushwick South/Bushwick South                      |14424   |
|Flatbush/Ditmas Park/Flatbush/Ditmas Park          |13976   |
|South Ozone Park/South Ozone Park                  |13716   |
|Brownsville/Brownsville                            |12829   |
|JFK Airport/NA                                     |12

