In [1]:
import pyspark
from pyspark.sql import SparkSession
import os
import sys
import pandas as pd
from pyspark.sql import types
from pyspark.sql import functions as F

### Question 1:
Install Spark and PySpark 

* Install Spark
* Run PySpark
* Create a local spark session
* Execute spark.version.
  
What's the output?

In [2]:
# Local Spark session using every available core; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

24/03/05 17:18:15 WARN Utils: Your hostname, codespaces-47ef95 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
24/03/05 17:18:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/05 17:18:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/05 17:18:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Version string of the running Spark session (answer to Question 1).
spark.version

'3.3.2'

In [None]:
# Disabled cell (never executed): pandas preview of the first 100 raw CSV rows,
# presumably used while drafting the explicit Spark schema below.
# df = pd.read_csv('fhv_tripdata_2019-10.csv.gz', nrows=100)

### Question 2:
FHV October 2019 \
Read the October 2019 FHV data into a Spark DataFrame with a schema, as we did in the lessons.\
Repartition the Dataframe to 6 partitions and save it to parquet.

What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

In [4]:
# Explicit schema for the October 2019 FHV CSV so the timestamps and location
# IDs are typed on read; every column is nullable.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropOff_datetime', types.TimestampType()),
        ('PUlocationID', types.IntegerType()),
        ('DOlocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    ]
])

# First CSV line holds the column names; the gzip codec is picked up from the file extension.
df_fhv = spark.read.csv(
    'fhv_tripdata_2019-10.csv.gz',
    schema=schema,
    header=True,
)

In [5]:
# Confirm the explicit schema was applied to the CSV read.
df_fhv.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [6]:
# Disabled helper: derive a draft Spark schema from a pandas preview `df`
# (which only exists if the commented-out pd.read_csv preview is re-enabled).
# The alias restores DataFrame.iteritems on pandas versions that removed it;
# presumably needed because this PySpark's createDataFrame(pandas_df) still
# calls iteritems — TODO confirm against the installed versions.
# pd.DataFrame.iteritems = pd.DataFrame.items
# spark.createDataFrame(df).schema

In [8]:
# Question 2: write the FHV data as exactly 6 Parquet part files, then report
# the average size of those part files.
FHV_PARQUET_DIR = 'fhvhv/2019/10/'

df_fhv \
    .repartition(6) \
    .write.parquet(FHV_PARQUET_DIR, mode='overwrite')

# Only files ending in ".parquet" count; this skips the _SUCCESS marker and
# the hidden ".crc" checksum files. Sizes are in MiB (1024 * 1024 bytes).
parquet_sizes_mb = [
    os.path.getsize(os.path.join(FHV_PARQUET_DIR, file_name)) / (1024 * 1024)
    for file_name in os.listdir(FHV_PARQUET_DIR)
    if file_name.endswith('.parquet')
]
assert parquet_sizes_mb, f"no .parquet files found in {FHV_PARQUET_DIR}"
sum(parquet_sizes_mb) / len(parquet_sizes_mb)

                                                                                

In [9]:
# Read back the 6-partition Parquet output written above. Parquet files carry
# their own schema, so neither a schema nor the CSV-only "header" option is
# needed. Spark reads every part file in the directory.
df_fhv = spark.read.parquet('fhvhv/2019/10/')

In [10]:
# The column types from the explicit CSV schema survive the Parquet round trip.
df_fhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



### Question 3:
Count record

How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

In [11]:
# Expose the trips to spark.sql queries under the name fhv_trips_data.
df_fhv.createOrReplaceTempView('fhv_trips_data')

In [12]:
# Question 3: trips whose pickup falls on 2019-10-15.
# A half-open interval [2019-10-15, 2019-10-16) covers the whole day, including
# any pickup with fractional seconds after 23:59:59, which a closed
# "<= '2019-10-15 23:59:59'" bound would silently drop.
spark.sql("""
SELECT
    COUNT(*)
FROM
    fhv_trips_data
WHERE pickup_datetime >= '2019-10-15 00:00:00' AND pickup_datetime < '2019-10-16 00:00:00'
""").show()

[Stage 4:>                                                          (0 + 2) / 2]

+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

### Question 4:
Longest trip for each day.
What is the length of the longest trip in the dataset in hours?

In [13]:
# Trip length in hours: both timestamps cast to epoch seconds, subtracted,
# then divided by 3600. withColumn replaces the column if it already exists,
# so re-running this cell is harmless.
df_fhv = df_fhv.withColumn(
    "trip_duration_hours",
    (
        F.col("dropOff_datetime").cast("long")
        - F.col("pickup_datetime").cast("long")
    ) / 3600,
)

In [14]:
# Re-register the view: the earlier registration pointed at the DataFrame
# before trip_duration_hours was added, so SQL would not see the new column.
df_fhv.createOrReplaceTempView('fhv_trips_data')

In [15]:
# Spot-check the derived duration against the raw pickup/drop-off timestamps.
df_fhv.select('pickup_datetime','dropOff_datetime', 'trip_duration_hours').show(5)

+-------------------+-------------------+-------------------+
|    pickup_datetime|   dropOff_datetime|trip_duration_hours|
+-------------------+-------------------+-------------------+
|2019-10-01 09:55:38|2019-10-01 10:05:43|0.16805555555555557|
|2019-10-21 04:15:47|2019-10-21 04:36:04|0.33805555555555555|
|2019-10-19 12:00:00|2019-10-19 12:20:00| 0.3333333333333333|
|2019-10-11 14:28:00|2019-10-11 14:32:44|0.07888888888888888|
|2019-10-21 18:00:26|2019-10-21 18:07:21|0.11527777777777778|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [16]:
# Question 4: the five longest trip durations (hours). Rows with a NULL
# duration sort last, so they never crowd out real values.
spark.sql(
    """
    SELECT trip_duration_hours
    FROM fhv_trips_data
    ORDER BY trip_duration_hours DESC NULLS LAST
    LIMIT 5
    """
).show()

[Stage 8:>                                                          (0 + 2) / 2]

+-------------------+
|trip_duration_hours|
+-------------------+
|           631152.5|
|           631152.5|
|  87672.44083333333|
|  70128.02805555555|
|             8794.0|
+-------------------+



                                                                                

### Question 5:
User Interface
Spark’s User Interface which shows the application's dashboard runs on which local port?

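4040 (the default). In this particular session port 4040 was already taken, so Spark fell back to 4041, as the "could not bind on port 4040" warning above shows.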

### Question 6 

Least frequent pickup location zone. Load the zone lookup data into a temp view in Spark.

Using the zone lookup data and the FHV October 2019 data, what is the name of the LEAST frequent pickup location zone?

In [17]:
# Zone lookup table stored as Parquet. Parquet carries its own schema and
# column names, so the CSV-only "header" option would be ignored and is omitted.
df_zone = spark.read.parquet('zones/')

In [18]:
# Expose the zone lookup to spark.sql as "zones" (the join below uses the DataFrame API instead).
df_zone.createOrReplaceTempView('zones')

In [19]:
# Attach zone attributes to each trip by matching its pickup location ID to
# the lookup's LocationID. Trips without a match (e.g. NULL pickup IDs) are dropped.
df_fhv_zones = df_fhv.join(
    df_zone,
    on=df_fhv['PUlocationID'] == df_zone['LocationID'],
    how='inner',
)
df_fhv_zones.createOrReplaceTempView('FHV_ZONES')

In [20]:
# Question 6: least frequent pickup zone(s).
# ORDER BY ... LIMIT 1 would return an arbitrary zone when several zones share
# the minimum count, so every tied zone is returned, in a stable order.
# Zones with zero pickups never appear, because FHV_ZONES comes from an inner join.
spark.sql("""
WITH pickups_per_zone AS (
    SELECT
        ZONE, COUNT(*) AS TOTAL_PICKUPS
    FROM
        FHV_ZONES
    GROUP BY ZONE
)
SELECT
    ZONE, TOTAL_PICKUPS
FROM
    pickups_per_zone
WHERE TOTAL_PICKUPS = (SELECT MIN(TOTAL_PICKUPS) FROM pickups_per_zone)
ORDER BY ZONE
""").show()

[Stage 12:>                                                         (0 + 2) / 2]

+-----------+-------------+
|       ZONE|TOTAL_PICKUPS|
+-----------+-------------+
|Jamaica Bay|            1|
+-----------+-------------+



                                                                                