In [1]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a Spark session running in local mode ("local[*]" master).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

22/02/24 06:21:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Load only the first 100 rows with pandas; this small sample is used below
# to derive the column types for the Spark schema.
df_pandas = pd.read_csv(
    '/home/rancher/data/fhvhv_tripdata_2021-02.csv',
    nrows=100,
    parse_dates=['pickup_datetime', 'dropoff_datetime'],
)

In [4]:
# Convert the 100-row pandas sample into a Spark DataFrame so that Spark
# derives a schema from it.
fhvhv_pandas = spark.createDataFrame(df_pandas)

In [6]:
# Keep the schema of the sample DataFrame; it is passed to the full CSV read.
fhvhv_schema = fhvhv_pandas.schema

In [164]:
# Read the full February 2021 file with the schema derived from the pandas sample.
# The CSV begins with a header row (pandas used it for the column names when
# sampling), so Spark must be told to skip it; without header=True the header
# line is parsed as a data record and ends up in every count.
fhvhv_df_spark = spark.read.csv(
    '/home/rancher/data/fhvhv_tripdata_2021-02.csv',
    header=True,
    schema=fhvhv_schema,
)

# Question 1 - What version of spark?

In [159]:
# Display the version string of the imported pyspark package.
pyspark.__version__

'3.0.3'

Answer: 3.0.3

# Question 2 - Total size of all partitions of FHVHV in February 2021

In [165]:
# Redistribute the data into 24 partitions before writing it out.
# Note: this rebinds fhvhv_df_spark, so later cells use the repartitioned frame.
fhvhv_df_spark = fhvhv_df_spark.repartition(24)

In [166]:
# Persist the repartitioned data as parquet, replacing any previous output.
(
    fhvhv_df_spark.write
    .mode('overwrite')
    .parquet('/home/rancher/data/fhvhv/2021/02')
)

                                                                                

In [182]:
# List the written parquet directory recursively with sizes reported in
# megabyte-sized blocks; the "total" line is used for the answer.
!ls -lR --block-size=M /home/rancher/data/fhvhv/2021/02

/home/rancher/data/fhvhv/2021/02:
total 209M
-rw-r--r-- 1 rancher rancher 0M Feb 26 00:21 _SUCCESS
-rw-r--r-- 1 rancher rancher 9M Feb 26 00:20 part-00000-9610adeb-1773-43e9-99cf-aa5a2e379980-c000.snappy.parquet
-rw-r--r-- 1 rancher rancher 9M Feb 26 00:20 part-00001-9610adeb-1773-43e9-99cf-aa5a2e379980-c000.snappy.parquet
-rw-r--r-- 1 rancher rancher 9M Feb 26 00:20 part-00002-9610adeb-1773-43e9-99cf-aa5a2e379980-c000.snappy.parquet
-rw-r--r-- 1 rancher rancher 9M Feb 26 00:20 part-00003-9610adeb-1773-43e9-99cf-aa5a2e379980-c000.snappy.parquet
-rw-r--r-- 1 rancher rancher 9M Feb 26 00:20 part-00004-9610adeb-1773-43e9-99cf-aa5a2e379980-c000.snappy.parquet
-rw-r--r-- 1 rancher rancher 9M Feb 26 00:20 part-00005-9610adeb-1773-43e9-99cf-aa5a2e379980-c000.snappy.parquet
-rw-r--r-- 1 rancher rancher 9M Feb 26 00:20 part-00006-9610adeb-1773-43e9-99cf-aa5a2e379980-c000.snappy.parquet
-rw-r--r-- 1 rancher rancher 9M Feb 26 00:20 part-00007-9610adeb-1773-43e9-99cf-aa5a2e379980-c000.sn

Answer: 209 MB

# Question 3 - How many records on Feb 15th 2021?

In [177]:
import pyspark.sql.functions as func

# Add a `day` column holding the calendar date of each pickup timestamp.
pickup_date = func.to_date(fhvhv_df_spark.pickup_datetime)
fhvhv_df_spark = fhvhv_df_spark.withColumn("day", pickup_date)

In [81]:
# Number of trips whose pickup date is 15 February 2021.
fhvhv_df_spark.where(func.col('day') == '2021-02-15').count()

                                                                                

367170

Answer: 367170

# Question 4 - What day in February 2021 had the longest trip duration?

In [281]:
# Trip duration: difference between the dropoff and pickup timestamps (each cast
# to a long), divided by 60.
pickup_as_long = func.col('pickup_datetime').cast('long')
dropoff_as_long = func.col('dropoff_datetime').cast('long')
fhvhv_df_spark = fhvhv_df_spark.withColumn(
    'trip_duration',
    (dropoff_as_long - pickup_as_long) / 60.,
)

In [282]:
# Expose the trip data (including the `day` and `trip_duration` columns) to Spark SQL.
fhvhv_df_spark.createOrReplaceTempView("fhv_feb")

# `taxi_zones` is not assigned anywhere earlier in the notebook, so registering
# the views below raised NameError on a fresh kernel. Load the zone lookup first.
# NOTE(review): assumes the zone lookup CSV read later for the zone-name joins
# is the intended source of `taxi_zones` — confirm.
taxi_zones = spark.read.csv("/home/rancher/data/raw/taxi+_zone_lookup.csv", header=True)
taxi_zones.createOrReplaceTempView("taxi_zones_DO")
taxi_zones.createOrReplaceTempView("taxi_zones_PU")

In [283]:
# Find the day of the trip(s) whose duration equals the overall maximum.
longest_trip_query = """
SELECT day, trip_duration
FROM fhv_feb
WHERE trip_duration = (SELECT MAX(trip_duration)
                        FROM fhv_feb)
"""
spark.sql(longest_trip_query).show()



+----------+-------------+
|       day|trip_duration|
+----------+-------------+
|2021-02-11|       1259.0|
+----------+-------------+



                                                                                

Answer: 2021-02-11

# Question 5 - How many stages does Spark need for finding most frequent dispatching_base_num?

### Using spark.sql

In [184]:
# Trip count per dispatching base, most frequent first (lazy; nothing runs
# until an action such as show() is called).
dispatch_count_query = """
SELECT dispatching_base_num,
       count(*) AS cnt
FROM fhv_feb
GROUP BY dispatching_base_num
ORDER BY cnt DESC
"""
most_freq_disp_base_num = spark.sql(dispatch_count_query)

In [185]:
# Trigger the query and print the first rows (show() displays 20 rows by default).
most_freq_disp_base_num.show()



+--------------------+-------+
|dispatching_base_num|    cnt|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows



                                                                                

Answer: 3 stages (from inspecting the Jobs list in the Spark web UI)

### Using spark groupBy

In [186]:
# Same aggregation as the SQL version, expressed with the DataFrame API.
trips_by_base = fhvhv_df_spark.groupBy('dispatching_base_num')
most_freq_disp_base_num_2 = trips_by_base.count()

In [187]:
# Order the per-base counts from largest to smallest and print the top rows.
most_freq_disp_base_num_2.orderBy(func.col('count').desc()).show()



+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows



                                                                                

Answer: Also 3 stages (from inspecting the Jobs list in the Spark web UI)

# Question 6 - What is the most common pickup/dropoff location pair?

In [258]:
# Read the zone lookup twice, once per role (pickup / dropoff), so each join
# below has its own DataFrame to reference.
zone_lookup_csv = "/home/rancher/data/raw/taxi+_zone_lookup.csv"
zones_PU = spark.read.csv(zone_lookup_csv, header=True)
zones_DO = spark.read.csv(zone_lookup_csv, header=True)

### If using spark GroupBy

In [188]:
# Count trips per (pickup, dropoff) location pair, most frequent first.
# NOTE(review): trips that start and end in the same zone are excluded here —
# confirm that is intended for "most common pickup/dropoff pair".
most_common_pu_do_pair = (
    fhvhv_df_spark
    .where(func.col('PULocationID') != func.col('DOLocationID'))
    .groupBy('PULocationID', 'DOLocationID')
    .count()
    .orderBy(func.desc('count'))
)

In [259]:
# Attach pickup and dropoff zone names to every location pair, keeping pairs
# without a lookup match (left joins), then rank by trip count.
pickup_zone_match = most_common_pu_do_pair.PULocationID == zones_PU.LocationID
dropoff_zone_match = most_common_pu_do_pair.DOLocationID == zones_DO.LocationID

most_common_joined = (
    most_common_pu_do_pair
    .join(zones_PU, on=pickup_zone_match, how='left')
    .join(zones_DO, on=dropoff_zone_match, how='left')
    .select(
        most_common_pu_do_pair['PULocationID'],
        most_common_pu_do_pair['DOLocationID'],
        zones_PU['Zone'].alias('PU_zone'),
        zones_DO['Zone'].alias('DO_zone'),
        most_common_pu_do_pair['count'],
    )
    .orderBy(func.desc('count'))
)

In [260]:
# Run the joined query and print the highest-count pairs with their zone names.
most_common_joined.show()



+------------+------------+--------------------+--------------------+-----+
|PULocationID|DOLocationID|             PU_zone|             DO_zone|count|
+------------+------------+--------------------+--------------------+-----+
|         132|         265|         JFK Airport|                  NA|12542|
|         188|          61|Prospect-Lefferts...| Crown Heights North|11814|
|          36|          37|      Bushwick North|      Bushwick South|11491|
|          37|          36|      Bushwick South|      Bushwick North|11487|
|          61|         188| Crown Heights North|Prospect-Lefferts...|11462|
|          61|         225| Crown Heights North|  Stuyvesant Heights|11342|
|         225|          61|  Stuyvesant Heights| Crown Heights North|11293|
|          35|          76|         Brownsville|       East New York|11244|
|          39|          76|            Canarsie|       East New York|10688|
|          76|          35|       East New York|         Brownsville|10621|
|          4

                                                                                

Answer: JFK Airport / NA

#### Alternative, probably more efficient, method: find the top 5 pairs first, then join them to the taxi zone lookup table.

In [261]:
# Collect the first five rows to the driver; because most_common_pu_do_pair is
# sorted by count descending, these are the five most common pairs.
top_5_pairs = most_common_pu_do_pair.take(5)

                                                                                

In [263]:
# Turn the collected rows back into a (tiny) Spark DataFrame so they can be joined.
# Note: this rebinds top_5_pairs from the collected rows to a DataFrame.
top_5_pairs = spark.createDataFrame(top_5_pairs)

In [264]:
# Print the five pairs before zone names are attached.
top_5_pairs.show()

+------------+------------+-----+
|PULocationID|DOLocationID|count|
+------------+------------+-----+
|         132|         265|12542|
|         188|          61|11814|
|          36|          37|11491|
|          37|          36|11487|
|          61|         188|11462|
+------------+------------+-----+



In [267]:
# Sanity check: confirm the column names survived the round trip through take().
top_5_pairs.columns

['PULocationID', 'DOLocationID', 'count']

In [268]:
# Attach pickup and dropoff zone names to the five most common pairs.
# Both lookups are left joins, matching the full-table join earlier in this
# section, so a pair whose location ID is absent from the lookup is kept (with
# a null zone) instead of being silently dropped by a default inner join.
# The explicit sort keeps the rows ranked by count, because a join does not
# guarantee that the input order is preserved.
top_5_joined = (
    top_5_pairs
    .join(zones_PU,
          top_5_pairs.PULocationID == zones_PU.LocationID,
          how='left')
    .join(zones_DO,
          top_5_pairs.DOLocationID == zones_DO.LocationID,
          how='left')
    .select(top_5_pairs['PULocationID'],
            top_5_pairs['DOLocationID'],
            zones_PU['Zone'].alias("PU_zone"),
            zones_DO['Zone'].alias("DO_zone"),
            top_5_pairs['count'])
    .sort(func.desc('count'))
)

In [270]:
# Print the five most common pairs together with their zone names.
top_5_joined.show()

+------------+------------+--------------------+--------------------+-----+
|PULocationID|DOLocationID|             PU_zone|             DO_zone|count|
+------------+------------+--------------------+--------------------+-----+
|         132|         265|         JFK Airport|                  NA|12542|
|         188|          61|Prospect-Lefferts...| Crown Heights North|11814|
|          36|          37|      Bushwick North|      Bushwick South|11491|
|          37|          36|      Bushwick South|      Bushwick North|11487|
|          61|         188| Crown Heights North|Prospect-Lefferts...|11462|
+------------+------------+--------------------+--------------------+-----+



Answer: JFK Airport / NA

### If using spark.sql

In [274]:
# Register the zone lookup for Spark SQL under the name "taxi_zones".
# The original referenced a `taxi_zones` variable that is never assigned in this
# notebook (NameError on a fresh kernel); `zones_PU` is the zone lookup loaded
# at the start of this question, so register the view from it instead.
zones_PU.createOrReplaceTempView("taxi_zones")

In [278]:
# Ten most common pickup/dropoff pairs (excluding same-zone trips), via Spark SQL.
# The grouping includes the location IDs, not just the zone names: zone names are
# not guaranteed to be unique per LocationID, so grouping by name alone could merge
# distinct location pairs and disagree with the DataFrame version, which groups
# by PULocationID/DOLocationID.
spark.sql(
"""
SELECT    zones_PU.Zone AS PU_zone,
          zones_DO.Zone AS DO_zone,
          COUNT(*) AS cnt
FROM      fhv_feb AS fhv
LEFT JOIN taxi_zones AS zones_PU
  ON      fhv.PULocationID = zones_PU.LocationID
LEFT JOIN taxi_zones AS zones_DO
  ON      fhv.DOLocationID = zones_DO.LocationID
WHERE     fhv.PULocationID != fhv.DOLocationID
GROUP BY  fhv.PULocationID, fhv.DOLocationID, zones_PU.Zone, zones_DO.Zone
ORDER BY  cnt DESC
LIMIT     10""").show()



+--------------------+--------------------+-----+
|             PU_zone|             DO_zone|  cnt|
+--------------------+--------------------+-----+
|         JFK Airport|                  NA|12542|
|Prospect-Lefferts...| Crown Heights North|11814|
|      Bushwick North|      Bushwick South|11491|
|      Bushwick South|      Bushwick North|11487|
| Crown Heights North|Prospect-Lefferts...|11462|
| Crown Heights North|  Stuyvesant Heights|11342|
|  Stuyvesant Heights| Crown Heights North|11293|
|         Brownsville|       East New York|11244|
|            Canarsie|       East New York|10688|
|       East New York|         Brownsville|10621|
+--------------------+--------------------+-----+



                                                                                

Answer: JFK Airport / NA