### Prepare data

In [None]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet  -P ./data/raw/yellow/ 

In [9]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DecimalType,
    DoubleType,
)

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

schema = StructType(
    [
        StructField("ChargingEvent", StringType(), True),
        StructField("CPID", StringType(), True),
        StructField("StartDate", StringType(), True),
        StructField("StartTime", StringType(), True),
        StructField("EndDate", StringType(), True),
        StructField("EndTime", StringType(), True),
        StructField("Energy", DecimalType(scale=1), True),
        StructField("PluginDuration", DoubleType(), True),
    ]
)

# df = spark.read.option("header", True).schema(schema).csv("./electric-chargepoints-2017.csv")
df = spark.read.option("header", True).option("inferSchema", "true").csv("./electric-chargepoints-2017.csv")
df.show()

+-------------+-------+----------+-------------------+----------+-------------------+------+------------------+
|ChargingEvent|   CPID| StartDate|          StartTime|   EndDate|            EndTime|Energy|    PluginDuration|
+-------------+-------+----------+-------------------+----------+-------------------+------+------------------+
|     16673806|AN11719|2017-12-31|2025-03-20 14:46:00|2017-12-31|2025-03-20 18:00:00|   2.4|3.2333333333333334|
|     16670986|AN01706|2017-12-31|2025-03-20 11:25:00|2017-12-31|2025-03-20 13:14:00|   6.1|1.8166666666666667|
|      3174961|AN18584|2017-12-31|2025-03-20 11:26:11|2018-01-01|2025-03-20 12:54:11|  24.0|25.466666666666665|
|     16674334|AN00812|2017-12-31|2025-03-20 15:18:00|2018-01-01|2025-03-20 14:06:00|   6.7|              22.8|
|      3176831|AN24139|2017-12-31|2025-03-20 18:25:18|2018-01-01|2025-03-20 13:09:18|   6.1|18.733333333333334|
|     16673920|AN03984|2017-12-31|2025-03-20 14:54:00|2017-12-31|2025-03-20 19:19:00|   5.6| 4.416666666

In [10]:
df.printSchema()

root
 |-- ChargingEvent: integer (nullable = true)
 |-- CPID: string (nullable = true)
 |-- StartDate: date (nullable = true)
 |-- StartTime: timestamp (nullable = true)
 |-- EndDate: date (nullable = true)
 |-- EndTime: timestamp (nullable = true)
 |-- Energy: double (nullable = true)
 |-- PluginDuration: double (nullable = true)



In [None]:
df_yellow = spark.read \
    .parquet('./data/raw/yellow/')

In [None]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

df_yellow = spark.read \
    .parquet('./data/raw/yellow/')

In [None]:
df_yellow.show(5)

In [None]:
df_yellow.printSchema()

### HW questions

In [None]:
# Q1
spark.version

In [None]:
#Q2
df_yellow = df_yellow.repartition(4)
df_yellow.write.parquet("./data/parquet/yellow/")

In [None]:
!ls -lh ./data/parquet/yellow/

In [None]:
#Q3
from pyspark.sql import functions as F

(
    df_yellow.filter(
        F.to_date(F.col("tpep_pickup_datetime")) == "2024-10-15")
    # .orderBy("tpep_pickup_datetime", ascending=False)
).count()

In [None]:
#Q3 - pandas
from datetime import datetime

import numpy as np
import pandas as pd

df_pd = pd.read_parquet("./data/parquet/yellow/")
df_pd["pickup_date"] = df_pd["tpep_pickup_datetime"].dt.date
df_pd[df_pd["pickup_date"] == datetime.strptime("2024-10-15", "%Y-%m-%d").date()]


In [None]:
df_pd["tpep_pickup_str"] = df_pd["tpep_pickup_datetime"].astype(str).str.split(" ").str[0]
df_pd["pickup_str"] = df_pd["pickup_date"].astype(str)
df_pd[df_pd["tpep_pickup_str"] != df_pd["pickup_str"]]

In [None]:
#Q4
(
    df_yellow
        .withColumn("trip_duration", 
                    (F.to_timestamp(F.col("tpep_dropoff_datetime")).cast("long") 
                     - F.to_timestamp(F.col("tpep_pickup_datetime")).cast("long")) / 3600
                )
        .select("tpep_dropoff_datetime", "tpep_pickup_datetime", "trip_duration")
        .orderBy("trip_duration", ascending=False)
).show(3)

In [None]:
max((pd.to_datetime(df_pd["tpep_dropoff_datetime"])
 - pd.to_datetime(df_pd["tpep_pickup_datetime"])) / np.timedelta64(1, 'h'))

In [None]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv  -P ./data/raw/lookup/ 

In [None]:
df_lookup = spark.read \
        .option("header", "true") \
        .csv("./data/raw/lookup/taxi_zone_lookup.csv")

df_lookup.show()

In [None]:
(
    df_yellow
    .join(df_lookup, df_yellow.PULocationID == df_lookup.LocationID, "inner")
    .groupBy("Zone").count()
    .orderBy("count")
).head()