In [None]:
from pyspark.sql import SparkSession

MAX_MEMORY = "5g"
spark = SparkSession.builder.master("local")\
                    .appName("ml_taix")\
                    .config("spark.excutor.memory", MAX_MEMORY)\
                    .config("spark.driver.memory", MAX_MEMORY).getOrCreate()

In [None]:
import os 

trip_data = f"{os.getcwd()}/data/2020"
trip_directory = spark.read.parquet(f"file:///{trip_data}/*")
trip_directory.createOrReplaceTempView("trips")

In [None]:
trip_directory.printSchema()

In [None]:
qs = """
SELECT
    pickup_datetime,
    PULocationID as pickup_location_id,
    DOLocationID as drop_location_id,
    tips,
    driver_pay,
    trip_miles,
    trip_time
FROM 
    trips
"""
spark.sql(qs).describe().show()
tr_data = spark.sql(qs)
tr_data.createOrReplaceTempView("trip_data")



In [None]:
qs = """
SELECT 
    pickup, 
    count(*) as time
FROM
    (SELECT 
        split(pickup_datetime, " ")[0] as pickup
    FROM 
        trips    
    )
GROUP BY 
    pickup
ORDER BY 
    pickup
"""
time_data = spark.sql(qs).toPandas()

import seaborn as sns 
import matplotlib.pyplot as plt 

fig, ax = plt.subplots(figsize=(100, 9))
sns.barplot(x="pickup", y="time", data=time_data)
plt.xticks(rotation=45)
plt.title("NYC Texi 2020-01-01 ~ 2021-01-01")
plt.show()

In [None]:
#trip_data = f"{os.getcwd()}/data/2020"
# trip_directory = spark.read.parquet(f"file:///{trip_data}/*")
# trip_directory.createOrReplaceTempView("trips")
from typing import List

directory: str = [f"{os.getcwd()}/data/{i}" for i in os.listdir(f"{os.getcwd()}/data")][2]
filename: List[str] = [f"{directory}/{data}" for data in os.listdir(directory)]
filename.sort()

In [None]:
filename

In [None]:
def year_read_data() -> List:
    return [spark.read.parquet(f"file:///{data}") for data in filename]

def year_data(table_name: str, qs: str) -> List[SparkSession]:
    t_pickup_dat = []
    for data in year_read_data():
        data.createOrReplaceTempView(table_name)
        td = spark.sql(qs).toPandas()
        t_pickup_dat.append(td)
    return t_pickup_dat

In [None]:
def barplot_3to4_visualization(x: str, y:str, data: List,
                        n_rows: int = 3, n_cols: int = 4) -> None:
    
    fig, ax = plt.subplots(n_rows, n_cols, figsize=(30, 10))
    plt.subplots_adjust(wspace = 0.4, hspace = 0.4)
    fig.set_size_inches((80, 20))

    for i, axi in enumerate(ax.flat):
        sns.barplot(x=x, y=y, data=data[i], ax=axi)
        axi.set_title(f'taxi 2020 --> date {i+1}')
        axi.set_xticklabels(axi.get_xticklabels(), rotation=30)

    # 데이터 플롯 출력
    plt.show()

In [None]:
qs_2020 = """
SELECT 
    date,
    count(*) as pickup
FROM
    (SELECT 
        split(pickup_datetime, " ")[0] as date
    FROM 
        trips    
    )
GROUP BY 
    date
ORDER BY 
    date
"""
data2020_all = year_data(table_name="trips", qs=qs_2020)
barplot_3to4_visualization(x="date", y="pickup", data=data2020_all)

In [None]:
qs_2020_3 = """
SELECT 
    DISTINCT(TO_DATE(pickup_datetime) as pickup_datetime,
    PULocationID as pickup_location_id,
    DOLocationID as drop_location_id,
    tips,
    driver_pay,
    trip_miles,
    trip_time
FROM 
    trips
WHERE 
    driver_pay > 0
    AND trip_miles > 0
    AND trip_time < 300
    AND tips < 10
"""
data2020_march = year_data(table_name="trips", qs=qs_2020_3)
barplot_3to4_visualization(x="pickup_datetime", y="tips", data=data2020_march)