## Question 1:

Install Spark and PySpark

Install Spark
Run PySpark
Create a local spark session
Execute spark.version.
What's the output?

In [4]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

spark.version

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/02 19:29:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'3.3.2'

## Question 2:

FHV October 2019

Read the October 2019 FHV into a Spark Dataframe with a schema as we did in the lessons.

Repartition the Dataframe to 6 partitions and save it to parquet.

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

Define the schema explicitly, one StructField per column

In [5]:
from pyspark.sql import types

fhv_schema = types.StructType([
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropoff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.LongType(), True),
    types.StructField("DOLocationID", types.LongType(), True),
    types.StructField("SR_Flag", types.StringType(), True),
    types.StructField("Affiliated_base_number", types.StringType(), True)
])

In [6]:
df_fhv = spark.read \
    .option("header", "true") \
    .schema(fhv_schema) \
    .csv("fhv_tripdata_2019-10.csv")

In [7]:
df_fhv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

                                                                                

Repartition into 6 partitions and write to Parquet

In [11]:
df_fhv \
    .repartition(6) \
    .write.parquet('data/pq/fhvhv/2019/10/')

                                                                                

Check the size of the Parquet files

ls -lh
6.2M
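
The same check can be done from Python by averaging the on-disk sizes of the part files. This is a minimal sketch using only the standard library: the helper name `average_parquet_size_mb` is hypothetical, and it assumes 1 MB = 1024 * 1024 bytes, which is how `ls -lh` reports `M`.

```python
from pathlib import Path


def average_parquet_size_mb(directory):
    """Mean size in MB of the files ending in .parquet inside `directory`."""
    # Only *.parquet files count; _SUCCESS and .crc files are ignored.
    sizes = [p.stat().st_size for p in Path(directory).glob("*.parquet")]
    if not sizes:
        raise ValueError(f"no .parquet files found in {directory}")
    # Assumption: 1 MB = 1024 * 1024 bytes.
    return sum(sizes) / len(sizes) / (1024 * 1024)


# Path taken from the write step above; skipped if it does not exist yet.
out_dir = Path("data/pq/fhvhv/2019/10/")
if out_dir.is_dir():
    print(round(average_parquet_size_mb(out_dir), 1))
```

Filtering on the `.parquet` suffix matters because Spark also writes `_SUCCESS` and checksum files into the output directory.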

In [12]:
df_fhvh_oct = spark.read.parquet('data/pq/fhvhv/2019/10/')

## Question 3:

Count records

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

## Using Spark SQL

In [14]:
df_fhv.createOrReplaceTempView("df_fhv")

df_sql = spark.sql("""
SELECT 
    COUNT(1) AS number_records
FROM
    df_fhv
WHERE date_trunc('day', pickup_datetime) == "2019-10-15"
""")

df_sql.show()



+--------------+
|number_records|
+--------------+
|         62610|
+--------------+



                                                                                

## Using PySpark

In [15]:
from pyspark.sql import functions as F

df_fhv_15_october = df_fhv \
    .filter(F.date_trunc("day", "pickup_datetime") == "2019-10-15") 

df_fhv_15_october.count()


                                                                                

62610

## Question 4:

Longest trip for each day

What is the length of the longest trip in the dataset in hours?

631,152.50 Hours
243.44 Hours
7.68 Hours
3.32 Hours

In [16]:
df_fhv_longest = spark.sql("""
SELECT 
    pickup_datetime,
    dropoff_datetime,
    (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) / 3600 AS diff
FROM
    df_fhv
ORDER BY diff DESC
""")

df_fhv_longest.show()



+-------------------+-------------------+------------------+
|    pickup_datetime|   dropoff_datetime|              diff|
+-------------------+-------------------+------------------+
|2019-10-28 09:00:00|2091-10-28 09:30:00|          631152.5|
|2019-10-11 18:00:00|2091-10-11 18:30:00|          631152.5|
|2019-10-31 23:46:33|2029-11-01 00:13:00| 87672.44083333333|
|2019-10-01 21:43:42|2027-10-01 21:45:23| 70128.02805555555|
|2019-10-17 14:00:00|2020-10-18 00:00:00|            8794.0|
|2019-10-26 21:26:00|2020-10-26 21:36:00| 8784.166666666666|
|2019-10-30 12:30:04|2019-12-30 13:02:08|1464.5344444444445|
|2019-10-25 07:04:57|2019-12-08 07:54:33|1056.8266666666666|
|2019-10-25 07:04:57|2019-12-08 07:21:11|1056.2705555555556|
|2019-10-01 13:47:17|2019-11-03 15:20:28| 793.5530555555556|
|2019-10-01 07:21:12|2019-11-03 08:44:21| 793.3858333333334|
|2019-10-01 13:41:00|2019-11-03 14:58:51|          793.2975|
|2019-10-01 18:43:20|2019-11-03 19:43:13| 792.9980555555555|
|2019-10-01 18:43:46|201

                                                                                

## Using PySpark

In [19]:
df_fhv

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: bigint, DOLocationID: bigint, SR_Flag: string, Affiliated_base_number: string]

In [20]:
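from pyspark.sql import functions as F
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import col

# Trip duration in hours, longest first. show() returns None, so keep the
# DataFrame in the variable and display it in a separate call.
df_fhv_2019_october = df_fhv \
    .withColumn("diff", (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 3600) \
    .orderBy(col("diff").desc())

df_fhv_2019_october.show()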



+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|              diff|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00|         264|         264|   null|                B02832|          631152.5|
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   null|                B02832|          631152.5|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00|        null|        null|   null|                B02416| 87672.44083333333|
|     B00746         |2019-10-01 21:43:42|2027-10-01 21:45:23|         159|         264|   null|       B00746         | 70128.02805555555|
|              B02921|2019-
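
As a sanity check on the top rows, the durations can be recomputed with Python's standard library. This is a sketch with naive timestamps (no time zone or daylight-saving handling), so it only approximates what `unix_timestamp` does in Spark; `trip_hours` is a hypothetical helper name.

```python
from datetime import datetime


def trip_hours(pickup, dropoff, fmt="%Y-%m-%d %H:%M:%S"):
    """Hours between two timestamp strings, ignoring time zones."""
    delta = datetime.strptime(dropoff, fmt) - datetime.strptime(pickup, fmt)
    return delta.total_seconds() / 3600


# First row of the output above: 72 years including 18 leap days, plus 30 minutes.
print(trip_hours("2019-10-28 09:00:00", "2091-10-28 09:30:00"))  # 631152.5
```

The result agrees with the `diff` value Spark reports for that row, 631152.5 hours, which corresponds to the first answer option.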

                                                                                

## Question 6:

Least frequent pickup location zone

Load the zone lookup data into a temp view in Spark
Zone Data

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location Zone?

East Chelsea
Jamaica Bay
Union Sq
Crown Heights North

In [21]:
df_zones = spark.read \
    .option("header", "true") \
    .csv("taxi_zone_lookup.csv")

In [22]:
df_zones

DataFrame[LocationID: string, Borough: string, Zone: string, service_zone: string]

Rename the LocationID column to PULocationID so it matches the join key in the FHV data

In [23]:
df_zones = df_zones.withColumnRenamed('LocationID', 'PULocationID') 

Join both datasets

In [25]:
df_join = df_fhv.join(df_zones.select("PULocationID","Zone"), on=['PULocationID'], how='left')

Create a temp view

In [26]:
df_join.createOrReplaceTempView("df_join")

Using Spark SQL

In [27]:
df_less_fzones = spark.sql("""
SELECT 
    count(*) as count, zone
FROM
    df_join
GROUP BY zone
ORDER BY count ASC
""")
df_less_fzones.show(10)



+-----+--------------------+
|count|                zone|
+-----+--------------------+
|    1|         Jamaica Bay|
|    2|Governor's Island...|
|    5| Green-Wood Cemetery|
|    8|       Broad Channel|
|   14|     Highbridge Park|
|   15|        Battery Park|
|   23|Saint Michaels Ce...|
|   25|Breezy Point/Fort...|
|   26|Marine Park/Floyd...|
|   29|        Astoria Park|
+-----+--------------------+
only showing top 10 rows



                                                                                

Using PySpark

In [29]:
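# Count trips per zone, least frequent first. show() returns None, so keep
# the DataFrame in the variable and display it in a separate call.
less_f_zones_pyspark = df_join \
    .groupBy("zone").count() \
    .sort(col("count").asc())

less_f_zones_pyspark.show()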



+--------------------+-----+
|                zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows
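
The join, group, count and sort steps can also be illustrated on a tiny in-memory sample. This is only a sketch in plain Python: the zone names, IDs and the helper `rank_zones_by_frequency` are hypothetical and are not taken from the FHV data. It mimics the left join used above, where a pickup ID with no match in the lookup ends up with a null zone and is counted as its own group.

```python
from collections import Counter

# Hypothetical toy data, not taken from the FHV dataset.
zone_lookup = {1: "Zone A", 2: "Zone B", 3: "Zone C"}
pickup_ids = [1, 1, 2, 1, 3, 3, None, None]


def rank_zones_by_frequency(pickup_ids, zone_lookup):
    """Return (zone, count) pairs sorted by count ascending."""
    # Left-join semantics: IDs missing from the lookup map to None.
    zones = [zone_lookup.get(pid) for pid in pickup_ids]
    counts = Counter(zones)
    # Equivalent of ORDER BY count ASC.
    return sorted(counts.items(), key=lambda kv: kv[1])


ranked = rank_zones_by_frequency(pickup_ids, zone_lookup)
print(ranked[0])  # ('Zone B', 1)
```

In the real data the null group does not affect the answer as long as it is not the smallest group, which the output above suggests.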



                                                                                