#### Test running pyspark + homework

##### Q2
FHV October 2019

Read the October 2019 FHV data into a Spark DataFrame with a schema, as we did in the lessons.

Repartition the DataFrame to 6 partitions and save it to Parquet.

What is the average size (in MB) of the Parquet files (those ending with the .parquet extension) that were created? Select the answer which most closely matches.

In [6]:
# This code presents homework for question 2
# Import necessary libraries
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types

# Create (or reuse) a local SparkSession that uses all available cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

# Explicit schema for the FHV CSV so Spark parses timestamps and integer
# location IDs directly instead of treating every column as a string.
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropOff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

# Read the CSV exactly once, applying the schema defined above.
# (A schema-less read beforehand would only be thrown away.)
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

# Repartition so the write below produces 6 Parquet part files
df = df.repartition(6)

# Write the DataFrame to Parquet format.
# 'overwrite' keeps the cell re-runnable: the default save mode raises an
# error when the output directory already exists.
df.write.mode('overwrite').parquet('fhv/2019/10/')

# Read the Parquet data back into a DataFrame
df = spark.read.parquet('fhv/2019/10/')

# Print the schema to confirm the column types survived the round trip
df.printSchema()

# Display the first rows (DataFrame.show() prints 20 rows by default)
df.show()

                                                                                

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B03162|2019-10-02 17:39:02|2019-10-02 18:42:21|         201|          71|   NULL|                B03162|
|              B02293|2019-10-03 12:24:26|2019-10-03 13:25:24|          65|         222|   NULL|                B02293|
|              B02401|2019-10-06 22:32:17|2019-10-06 22:37

##### Q3
How many taxi trips were there on the 15th of October?

Consider only trips that started on the 15th of October.

In [7]:
# This code presents homework for question 3
# Import necessary libraries
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types
from pyspark.sql import functions as F

# Create (or reuse) a local SparkSession that uses all available cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

# Read Parquet data into a DataFrame
df = spark.read.parquet('fhv/2019/10/')

# Register the DataFrame as a temporary view so it can be queried with SQL.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
df.createOrReplaceTempView('fhv_2019_10')

# Count the trips whose pickup timestamp falls on 2019-10-15
spark.sql("""
SELECT
    COUNT(1)
FROM 
    fhv_2019_10
WHERE
    to_date(pickup_datetime) = '2019-10-15';
""").show()



+--------+
|count(1)|
+--------+
|   62610|
+--------+



##### Q4
What is the length of the longest trip in the dataset in hours?

In [9]:
# This code presents homework for question 4
# Import necessary libraries
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types
from pyspark.sql import functions as F

# Create (or reuse) a local SparkSession that uses all available cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

# Read Parquet data into a DataFrame
df = spark.read.parquet('fhv/2019/10/')

# Register the DataFrame as a temporary view.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0. The view is not queried in this cell; the
# registration is kept so the session state matches the original.
df.createOrReplaceTempView('fhv_2019_10')

# Longest trip per pickup date, in hours:
#   - casting a timestamp to long yields seconds, so the difference
#     divided by 3600 is the trip duration in hours;
#   - group by the pickup date and take the maximum duration per day;
#   - sort descending and keep the top 5 days, so the first row holds the
#     longest trip in the dataset.
df \
    .withColumn('duration_hours', (df.dropOff_datetime.cast('long') - df.pickup_datetime.cast('long')) / 3600) \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .groupBy('pickup_date') \
    .max('duration_hours') \
    .orderBy('max(duration_hours)', ascending=False) \
    .limit(5) \
    .show()

+-----------+-------------------+
|pickup_date|max(duration_hours)|
+-----------+-------------------+
| 2019-10-28|           631152.5|
| 2019-10-11|           631152.5|
| 2019-10-31|  87672.44083333333|
| 2019-10-01|  70128.02805555555|
| 2019-10-17|             8794.0|
+-----------+-------------------+



                                                                                

In [18]:
# This code presents homework for question 6
# Import necessary libraries

import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types
from pyspark.sql import functions as F

# Create (or reuse) a local SparkSession that uses all available cores
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

# Read the FHV Parquet data into a DataFrame and expose it to Spark SQL.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
# NOTE(review): the view name says "15" although the data is October (10);
# the name is kept unchanged in case other cells reference it.
df_fhv = spark.read.parquet('fhv/2019/10/')
df_fhv.createOrReplaceTempView('fhv_2019_15')

# Read the zone lookup CSV into a DataFrame, inferring column types
df = spark.read.csv('zones/taxi_zone_lookup.csv', header=True, inferSchema=True)

# Register the zone lookup as a temporary view
df.createOrReplaceTempView('zones')

# Preview query over the zones view; nothing is displayed unless
# result.show() is called.
result = spark.sql("SELECT * FROM zones LIMIT 10")

# Join trips to zones on the pickup location, count trips per zone, and
# sort ascending by the count (ORDER BY 1 = first selected column) so the
# five least frequent pickup zones come first.
spark.sql("""
SELECT
    COUNT(fhv.PULocationID), z.Zone
FROM 
    fhv_2019_15 fhv INNER JOIN zones z ON fhv.PULocationID = z.LocationID
GROUP BY 
    z.Zone 
ORDER BY 1     
LIMIT 5;
""").show()



+-------------------+--------------------+
|count(PULocationID)|                Zone|
+-------------------+--------------------+
|                  1|         Jamaica Bay|
|                  2|Governor's Island...|
|                  5| Green-Wood Cemetery|
|                  8|       Broad Channel|
|                 14|     Highbridge Park|
+-------------------+--------------------+



                                                                                