## Homework

In [24]:
import pyspark
from pyspark.sql import SparkSession, types, functions as F

### Create a Spark session

In [None]:
# Local Spark session using all available cores; reuses an existing session if one is active.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

### Read the trips CSV file into a Spark DataFrame

In [9]:
# First pass: read the CSV using its header row for column names.
# No schema is supplied here, so every column comes back as a string.
df = (
    spark.read
    .option("header", "true")
    .csv('fhv_tripdata_2021-06.csv')
)

df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

### Display the schema

Without an explicit schema, Spark reads every CSV column as a string.

In [10]:
# Print column names and types of the schema-less read (all strings at this point).
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [12]:
# Display the same schema as a StructType object; handy as a template when writing an explicit schema.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

### Modify the schema

Cast `pickup_datetime` and `dropoff_datetime` to Timestamp <br>
Cast the pickup and dropoff location IDs (`PULocationID`, `DOLocationID`) to Integer

In [16]:
# Explicit schema for the FHV trips file: timestamps for the two datetime
# columns, integers for the location IDs, strings for everything else.
# Every field is nullable, matching the schema Spark produced on the raw read.
schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('dispatching_base_num', types.StringType()),
        ('pickup_datetime', types.TimestampType()),
        ('dropoff_datetime', types.TimestampType()),
        ('PULocationID', types.IntegerType()),
        ('DOLocationID', types.IntegerType()),
        ('SR_Flag', types.StringType()),
        ('Affiliated_base_number', types.StringType()),
    ]
])

### Re-read the csv with schema

In [17]:
# Second pass: read the same CSV, this time applying the explicit `schema`
# so Spark parses timestamps and integers instead of leaving them as strings.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('fhv_tripdata_2021-06.csv')
)

df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [18]:
# Confirm the explicit schema took effect: datetimes are timestamps and location IDs are integers.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



### Repartition into 12 partitions

In [20]:
# Redistribute rows into 12 partitions; subsequent writes and queries use this partitioning.
df = df.repartition(numPartitions=12)

DataFrame[dispatching_base_num: string, pickup_datetime: timestamp, dropoff_datetime: timestamp, PULocationID: int, DOLocationID: int, SR_Flag: string, Affiliated_base_number: string]

### Write to Parquet

In [22]:
# Persist the repartitioned trips as Parquet under ./fhv, replacing any previous output.
df.write.mode('overwrite').parquet('fhv')

23/03/06 05:13:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

### Derive pickup/dropoff dates and trip duration

In [38]:
# Derive per-trip columns used by the queries below:
#   pickup_date / dropoff_date - calendar date of each timestamp
#   trip_duration              - dropoff minus pickup, in seconds (casting a
#                                timestamp to long gives epoch seconds)
df_process = (
    df
    .withColumn('pickup_date', F.to_date(F.col('pickup_datetime')))
    .withColumn('dropoff_date', F.to_date(F.col('dropoff_datetime')))
    .withColumn(
        'trip_duration',
        F.col('dropoff_datetime').cast("long") - F.col('pickup_datetime').cast("long"),
    )
    .select(
        'dispatching_base_num', 'pickup_date', 'dropoff_date', 'trip_duration',
        'PULocationID', 'DOLocationID', 'SR_Flag', 'Affiliated_base_number',
    )
)

df_process.show()

+--------------------+-----------+------------+-------------+------------+------------+-------+----------------------+
|dispatching_base_num|pickup_date|dropoff_date|trip_duration|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-----------+------------+-------------+------------+------------+-------+----------------------+
|              B02764| 2021-06-01|  2021-06-01|          305|         174|          18|      N|                B02764|
|              B02764| 2021-06-01|  2021-06-01|          298|          32|         254|      N|                B02764|
|              B02764| 2021-06-01|  2021-06-01|          910|         240|         127|      N|                B02764|
|              B02764| 2021-06-01|  2021-06-01|          457|         127|         235|      N|                B02764|
|              B02510| 2021-06-01|  2021-06-01|         1071|         144|         146|      N|                  null|
|              B02510| 2021-06-01|  2021-06-01| 

### Query 1

How many taxi trips started (by pickup date) on June 15, 2021?

In [93]:
# Number of trips whose pickup date is 2021-06-15; Spark casts the string literal to a date for the comparison.
(
    df_process
    .select('pickup_date')
    .filter(F.col('pickup_date') == '2021-06-15')
    .count()
)

                                                                                

452470

### Query 2

How long, in hours, was the longest trip?

In [73]:
# Longest trip, in hours, together with the pickup date it started on.
# trip_duration is in seconds (see df_process), so divide by 3600.
# The aggregate is a per-day MAXIMUM, so the alias says so (it was previously
# labelled "trip_duration_sum", which misdescribed the value).
df_process \
    .groupBy("pickup_date") \
    .agg((F.max("trip_duration") / 3600).alias("max_trip_duration_hours")) \
    .orderBy(F.desc("max_trip_duration_hours")) \
    .show(1)

[Stage 75:>                                                         (0 + 8) / 8]

+-----------+-----------------+
|pickup_date|trip_duration_sum|
+-----------+-----------------+
| 2021-06-25| 66.8788888888889|
+-----------+-----------------+
only showing top 1 row





### Query 3

What is the name of the most frequent pickup location zone?

In [78]:
# Schema for the taxi zone lookup CSV. LocationID is an integer so it can be
# compared directly with the trips' PULocationID; all fields are nullable.
schema_zoon = types.StructType(
    [
        types.StructField(col_name, col_type, True)
        for col_name, col_type in (
            ('LocationID', types.IntegerType()),
            ('Borough', types.StringType()),
            ('Zone', types.StringType()),
            ('service_zone', types.StringType()),
        )
    ]
)

In [79]:
# Load the zone lookup table with its header row and the explicit zone schema.
df_zone = (
    spark.read
    .schema(schema_zoon)
    .option("header", "true")
    .csv('taxi+_zone_lookup.csv')
)

df_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [80]:
# Confirm LocationID is an integer so it can be matched against the trips' integer PULocationID.
df_zone.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [83]:
# Expose the zone lookup to Spark SQL as the temporary view `t_zone`.
df_zone.createOrReplaceTempView('t_zone')

In [82]:
# Expose the processed trips to Spark SQL as the temporary view `t_trips`.
df_process.createOrReplaceTempView('t_trips')

In [91]:
# Name of the zone with the most pickups.
# The inner query finds the PULocationID with the highest trip count, and the
# outer query maps it to its Zone name via t_zone.
# NULL pickup locations are excluded. Otherwise an "unknown location" bucket
# could be the most frequent group, and the scalar subquery would return NULL.
# `LocationID = NULL` never matches, so the query would silently return no rows.
# Ties for the top count are broken arbitrarily by LIMIT 1.
spark.sql("""
    SELECT 
        Zone as most_pickup_zone
    FROM t_zone 
    WHERE LocationID = (
        SELECT
            PULocationID
        FROM
            t_trips
        WHERE
            PULocationID IS NOT NULL
        GROUP BY 
            PULocationID
        ORDER BY COUNT(*) desc LIMIT 1
)
""").show()



+-------------------+
|   most_pickup_zone|
+-------------------+
|Crown Heights North|
+-------------------+



                                                                                