In [None]:
from pathlib import Path

from pyspark.sql.session import SparkSession
from pyspark.sql import DataFrame

# Memory budget applied to both the executor and the driver settings below.
MAX_MEMORY = "5g"

# Local-mode session that uses every available core ("local[*]").
# NOTE(review): in local mode the work runs inside the driver JVM, so the
# driver setting is presumably the one that matters here -- confirm.
spark = (
    SparkSession.builder.master("local[*]")
    .appName("TripAnaliysis")
    # The key used to be misspelled ("spark.excutor.memory"). Spark accepts
    # unknown keys without complaint, so the setting was silently never applied.
    .config("spark.executor.memory", MAX_MEMORY)
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)


In [None]:
import os
from pathlib import Path
from pyspark.sql import DataFrame
from pyspark.sql.session import SparkSession


class FileLoadInParquet:
    """Locate and load the parquet files stored for one taxi type.

    The data directory is resolved as ``<parent of cwd>/data/<taxi_type>``.
    """

    def __init__(self, year: int, taxi_type: str) -> None:
        """Remember which data set to load.

        Args:
            year: Year of the data set. It is stored on the instance but is not
                used when the directory path is built.
            taxi_type: Name of the sub-directory under ``data/`` to read from.
        """
        self.year = year
        self.taxi_type = taxi_type

    def location(self) -> str:
        """Return the data directory as a plain string path.

        The return annotation used to be ``list[str]`` although a single
        string has always been returned.
        """
        return str(Path.cwd().parent.joinpath(f"data/{self.taxi_type}"))

    def read_parquet_data(self, spark: SparkSession) -> DataFrame:
        """Read everything directly under the data directory as parquet.

        Args:
            spark: Session whose ``read.parquet`` is used for loading.

        Returns:
            Whatever ``spark.read.parquet`` returns for the glob
            ``file:///<location>/*``.
        """
        # Parquet file directory
        data: str = self.location()
        return spark.read.parquet(f"file:///{data}/*")




In [None]:
"""
[PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-02.parquet'),
 PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-03.parquet'),
 PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-04.parquet'),
 PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-05.parquet'),
 PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-06.parquet'),
 PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-07.parquet'),
 PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-08.parquet'),
 PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-09.parquet'),
 PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-10.parquet'),
 PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-11.parquet'),
 PosixPath('/Users/imhaneul/Documents/spark-kafka-distribute/sparkAnaliysis/data/2019/fhvhv_tripdata_2019-12.parquet')]
"""
from pyspark.sql.functions import col
from pyspark.sql import functions as F


In [None]:

# Load everything under the "YellowTaxi" data directory into one Spark DataFrame.
data = FileLoadInParquet(year=2019, taxi_type="YellowTaxi").read_parquet_data(spark=spark)

# Fetch a single row as a quick check that the load worked.
data.take(1)

In [None]:
def datetime_groupby(data: DataFrame, name: str, agg_name: str) -> DataFrame:
    """Count rows per value of the leading, space-separated part of a column.

    Args:
        data: Frame that contains the column ``name``.
        name: Column to split on a single space; only the first piece is kept.
            Presumably a "YYYY-MM-DD HH:MM:SS" value, which makes the first
            piece the date -- confirm against the data.
        agg_name: Name given to the resulting count column.

    Returns:
        A frame with the key column ``pickup`` (always this name, whichever
        column was passed in) and the count column ``agg_name``. Rows whose
        key is null are dropped before counting.
    """
    leading_part = F.split(col(name), " ")[0].alias("pickup")
    keyed_rows = data.select(leading_part).dropna()
    row_count = F.count("*").alias(agg_name)
    return keyed_rows.groupBy("pickup").agg(row_count)

def datetime_miles_average(data: DataFrame) -> DataFrame:
    """Aggregate trip miles per leading part of ``pickup_datetime``.

    Args:
        data: Frame with the columns ``pickup_datetime`` and ``trip_miles``.

    Returns:
        A frame keyed by ``pickup`` (the text before the first space of
        ``pickup_datetime``) with ``pickup_total`` (count of non-null keys)
        and ``average_miles`` (mean of ``trip_miles``). Null keys are not
        dropped here.
    """
    pickup_key = F.split(col("pickup_datetime"), " ")[0].alias("pickup")
    grouped = data.select(pickup_key, col("trip_miles")).groupBy("pickup")
    return grouped.agg(
        F.count("pickup").alias("pickup_total"),
        F.avg("trip_miles").alias("average_miles"),
    )

In [None]:

# Per-key row counts for the request, pickup and drop-off timestamps, plus the
# per-key mileage statistics. Every result uses "pickup" as its key column.
request_groupby = datetime_groupby(data, name="request_datetime", agg_name="request_count")
trip_groupby = datetime_groupby(data, name="pickup_datetime", agg_name="trip_count")
drop_groupby = datetime_groupby(data, name="dropoff_datetime", agg_name="drop_count")
average_mile = datetime_miles_average(data)

# Left-join everything onto the pickup counts, so only keys that have at least
# one pickup survive, then order the rows by key.
rtd_join = trip_groupby.join(request_groupby, on="pickup", how="left")
rtd_join = rtd_join.join(drop_groupby, on="pickup", how="left")
rtd_join = rtd_join.join(average_mile, on="pickup", how="left")
rtd_join = rtd_join.orderBy("pickup")

# Add the weekday name ("EEEE") and Spark's weekday number, keep the columns
# needed downstream, and collect the result into a pandas DataFrame.
week_day_rtd_join = rtd_join.select(
    F.date_format("pickup", "EEEE").alias("week"),
    F.dayofweek("pickup").alias("week_number"),
    "pickup",
    "trip_count",
    "request_count",
    "drop_count",
    "average_miles",
).toPandas()

In [None]:

import matplotlib.pyplot as plt 
import seaborn as sns

# Keep a CSV copy of the collected table for inspection outside the notebook.
week_day_rtd_join.to_csv("test.csv", index=False)

# Mean of the per-key average miles for each weekday name.
group_date_average_mlies = week_day_rtd_join.groupby("week")["average_miles"].mean().reset_index()

# Sort key that puts the weekdays in calendar order, Sunday first.
# Series.map replaces the former Series.replace call: swapping strings for ints
# through replace() relies on implicit downcasting, which is deprecated and emits
# a FutureWarning on pandas 2.2+. Note that map() yields NaN for a label missing
# from the dict instead of leaving the label in place.
group_date_average_mlies["sort_dow"] = group_date_average_mlies["week"].map({
    "Sunday": 0,
    "Monday": 1,
    "Tuesday": 2,
    "Wednesday": 3,
    "Thursday": 4,
    "Friday": 5,
    "Saturday": 6,
})
group_date_average_mlies

In [None]:
# Mean of the per-key trip counts for each weekday name.
group_date_trip = week_day_rtd_join.groupby("week")["trip_count"].mean().reset_index()

# Sort key that puts the weekdays in calendar order, Sunday first.
# Series.map replaces the former Series.replace call: swapping strings for ints
# through replace() relies on implicit downcasting, which is deprecated and emits
# a FutureWarning on pandas 2.2+. Note that map() yields NaN for a label missing
# from the dict instead of leaving the label in place.
group_date_trip["sort_dow"] = group_date_trip["week"].map({
    "Sunday": 0,
    "Monday": 1,
    "Tuesday": 2,
    "Wednesday": 3,
    "Thursday": 4,
    "Friday": 5,
    "Saturday": 6,
})
group_date_trip

In [None]:
# Two stacked panels: average miles (line) on top, average trip count (bars) below.
fig, ax = plt.subplots(2, 1, figsize=(20, 10), constrained_layout=True)

# Plot 1: average miles per weekday, rows ordered Sunday -> Saturday via sort_dow.
# No palette is passed: without a hue variable seaborn ignores it and warns.
sns.lineplot(x="week", y="average_miles", data=group_date_average_mlies.sort_values("sort_dow"), ax=ax[0], legend=False)
ax[0].tick_params(axis='x', rotation=45)
ax[0].set_xlabel("Week")
ax[0].set_ylabel("Average Miles")
ax[0].set_title("Weekly Average Miles (2019-02)")

# Plot 2: average trip count per weekday, same ordering.
# hue="week" colours each bar from the husl palette; passing palette without hue
# is deprecated in seaborn 0.13+.
sns.barplot(x="week", y="trip_count", hue="week", data=group_date_trip.sort_values("sort_dow"), ax=ax[1], palette='husl', legend=False)
ax[1].tick_params(axis='x', rotation=45)
ax[1].set_xlabel("Week")
# This panel shows trip counts; the label and title used to be copies of the
# "Average Miles" text from the first panel.
ax[1].set_ylabel("Average Trip Count")
ax[1].set_title("Weekly Average Trip Count (2019-02)")

# Overall title
plt.suptitle("Analysis of NYC Taxi Data (February 2019)", fontsize=16)

# Show plot
plt.show()

In [None]:
# Stop the Spark session created in the first cell.
spark.stop()