### Dependencies

In [22]:
import os
os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'

import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import from_utc_timestamp, to_utc_timestamp
import pyspark.pandas as ps
import io
import requests


### Display Options

## Spark Session

In [3]:
ps.options.display.max_rows = 10
# 공식 문서: compute.max_rows
# 1000행 이하라면 driver로 데이터를 가져와서 pandas API로 처리.
# 1000행 이상이면 pySpark로 처리
print(ps.options.compute.max_rows)

Picked up _JAVA_OPTIONS: -Xmx8g
Picked up _JAVA_OPTIONS: -Xmx8g
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/04 10:07:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


1000


## Spark Configuration

In [4]:
# spark.stop()
spark = SparkSession.builder \
        .appName("test") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .config("spark.driver.memory", "6g") \
        .config("spark.executor.memory", "6g") \
        .config("spark.driver.maxResultSize", "4g") \
        .config("spark.sql.shuffle.partitions", "100") \
        .config('spark.driver.bindAddress', '127.0.0.1') \
        .getOrCreate()
        
# 1. 모든 설정 확인
all_configs = spark.sparkContext.getConf().getAll()
for key, value in all_configs:
    print(f"{key}: {value}")

# # 3. SQL 관련 설정만 확인
# sql_configs = spark.sql("SET -v").collect()
# for row in sql_configs:
#     print(f"{row['key']}: {row['value']}")

# # 4. Runtime에 설정된 값들 확인
# runtime_conf = spark.sparkContext._conf.getAll()
# for key, value in runtime_conf:
#     print(f"{key}: {value}")

spark.driver.extraJavaOptions: -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false
spark.sql.warehouse.dir: file:/Users/admin/workspace/python/DE/spark-warehouse
spark.executor.id: driver
spark.dr

25/02/04 10:07:55 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Data Loading

In [15]:
# DataFrame으로 읽기
psdf = spark.read.parquet("materials/data_engineering_course_materials/missions/W4/tlc/yellow_tripdata_2024-02.parquet")
psdf.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [20]:
psdf = psdf.withColumn("tpep_pickup_datetime", 
                          to_utc_timestamp(from_utc_timestamp(psdf["tpep_pickup_datetime"], "America/New_York"), "America/New_York")) \
            .withColumn("tpep_dropoff_datetime", 
                          to_utc_timestamp(from_utc_timestamp(psdf["tpep_dropoff_datetime"], "America/New_York"), "America/New_York"))

psdf.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [25]:
psdf = ps.DataFrame(psdf)

In [28]:
psdf.info()

                                                                                

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 3007526 entries, 0 to 3007525
Data columns (total 19 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   VendorID               3007526 non-null  int32         
 1   tpep_pickup_datetime   3007526 non-null  datetime64[ns]
 2   tpep_dropoff_datetime  3007526 non-null  datetime64[ns]
 3   passenger_count        2821916 non-null  int64         
 4   trip_distance          3007526 non-null  float64       
 5   RatecodeID             2821916 non-null  int64         
 6   store_and_fwd_flag     2821916 non-null  object        
 7   PULocationID           3007526 non-null  int32         
 8   DOLocationID           3007526 non-null  int32         
 9   payment_type           3007526 non-null  int64         
 10  fare_amount            3007526 non-null  float64       
 11  extra                  3007526 non-null  float64       
 12  mta_tax                30

25/02/04 10:43:15 WARN AttachDistributedSequenceExec: clean up cached RDD(62) in AttachDistributedSequenceExec(235)


## Data Cleaning

### Narrow Transformations

In [35]:
from datetime import datetime
start_date = datetime.strptime("2024-02-01", "%Y-%m-%d")
end_date = datetime.strptime("2024-03-01", "%Y-%m-%d")

query = f"""
    VendorID IS NOT NULL and
    tpep_pickup_datetime IS NOT NULL and
    tpep_pickup_datetime BETWEEN "{start_date}" and "{end_date}" and
    tpep_dropoff_datetime IS NOT NULL and
    tpep_dropoff_datetime BETWEEN "{start_date}" and "{end_date}" and
    passenger_count IS NOT NULL and
    passenger_count > 0 and
    trip_distance IS NOT NULL and
    trip_distance > 0 and
    RatecodeID IS NOT NULL and
    store_and_fwd_flag IS NOT NULL and
    PULocationID IS NOT NULL and
    DOLocationID IS NOT NULL and
    payment_type IS NOT NULL and
    fare_amount IS NOT NULL and
    fare_amount > 0 and
    extra IS NOT NULL and
    mta_tax IS NOT NULL and
    tip_amount IS NOT NULL and
    tolls_amount IS NOT NULL and
    improvement_surcharge IS NOT NULL and
    total_amount IS NOT NULL and
    total_amount > 0 and
    congestion_surcharge IS NOT NULL and
    Airport_fee IS NOT NULL
"""
filtered_psdf = psdf.query(query)

In [36]:
filtered_psdf.info()

                                                                                

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 2718661 entries, 0 to 2821915
Data columns (total 19 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   VendorID               2718661 non-null  int32         
 1   tpep_pickup_datetime   2718661 non-null  datetime64[ns]
 2   tpep_dropoff_datetime  2718661 non-null  datetime64[ns]
 3   passenger_count        2718661 non-null  int64         
 4   trip_distance          2718661 non-null  float64       
 5   RatecodeID             2718661 non-null  int64         
 6   store_and_fwd_flag     2718661 non-null  object        
 7   PULocationID           2718661 non-null  int32         
 8   DOLocationID           2718661 non-null  int32         
 9   payment_type           2718661 non-null  int64         
 10  fare_amount            2718661 non-null  float64       
 11  extra                  2718661 non-null  float64       
 12  mta_tax                27

25/02/04 14:27:17 WARN AttachDistributedSequenceExec: clean up cached RDD(133) in AttachDistributedSequenceExec(574)
                                                                                

In [65]:
# 놓친 것이 있는지 확인
query = f"""
    RatecodeID < 0 or
    PULocationID < 0 or
    DOLocationID < 0 or
    payment_type < 0 or
    fare_amount < 0 or
    extra < 0 or
    mta_tax < 0 or
    tip_amount < 0 or
    tolls_amount < 0 or
    improvement_surcharge < 0 or
    total_amount < 0 or
    congestion_surcharge < 0 or
    Airport_fee < 0
"""
check_psdf = filtered_psdf.query(query)
check_psdf.head()

                                                                                

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,estimated_cost_amount


In [64]:
query = """
    RatecodeID >= 0 and
    PULocationID >= 0 and
    DOLocationID >= 0 and
    payment_type >= 0 and
    fare_amount >= 0 and
    extra >= 0 and
    mta_tax >= 0 and
    tip_amount >= 0 and
    tolls_amount >= 0 and
    improvement_surcharge >= 0 and
    total_amount >= 0 and
    congestion_surcharge >= 0 and
    Airport_fee >= 0
"""
filtered_psdf = filtered_psdf.query(query)

In [66]:
# 재확인
query = f"""
    RatecodeID < 0 or
    PULocationID < 0 or
    DOLocationID < 0 or
    payment_type < 0 or
    fare_amount < 0 or
    extra < 0 or
    mta_tax < 0 or
    tip_amount < 0 or
    tolls_amount < 0 or
    improvement_surcharge < 0 or
    total_amount < 0 or
    congestion_surcharge < 0 or
    Airport_fee < 0
"""
check_psdf = filtered_psdf.query(query)
check_psdf.head()

                                                                                

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,estimated_cost_amount


### 비현실적인 거리 찾기

In [81]:
# filtered_psdf['trip_distance'].quantile(0.999)
filtered_psdf.to_spark().createOrReplaceTempView("trips")

# 구간 나눠 SQL로 처리
sql = """
    WITH trip_distance_outliers AS (
        SELECT
            CASE
                WHEN trip_distance <= 10 THEN "0-10 miles"
                WHEN trip_distance <= 100 THEN "10-100 miles"
                WHEN trip_distance <= 1000 THEN "100-1000 miles"
                WHEN trip_distance <= 10000 THEN "1000-10000 miles"
                ELSE "10000+ miles"
            END AS trip_distance_group
        FROM trips
    )

    SELECT
        trip_distance_group,
        COUNT(*) AS trip_count
    FROM trip_distance_outliers
    GROUP BY trip_distance_group        
    ORDER BY 
        CASE trip_distance_group
                WHEN "0-10 miles" THEN 1
                WHEN "10-100 miles" THEN 2
                WHEN "100-1000 miles" THEN 3
                WHEN "1000-10000 miles" THEN 4
                ELSE 5
            END DESC
"""

result = ps.sql(sql)
result



<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 5 entries, 0 to 4
Data columns (total 2 columns):
 #   Column               Non-Null Count  Dtype 
---  ------               --------------  ----- 
 0   trip_distance_group  5 non-null      object
 1   trip_count           5 non-null      int64 
dtypes: int64(1), object(1)

                                                                                

In [90]:
filtered_psdf['trip_distance'].describe()

                                                                                

count    2.718641e+06
mean     3.217765e+00
std      4.284442e+00
min      1.000000e-02
25%      1.010000e+00
50%      1.700000e+00
75%      3.100000e+00
max      3.825000e+02
Name: trip_distance, dtype: float64

In [None]:
filtered_psdf[filtered_psdf['trip_distance'] > 100].to_pandas().head(100)

                                                                                

In [87]:
print("filtered_psdf.shape", filtered_psdf.shape)
filtered_psdf = filtered_psdf.query(
    """
    trip_distance <= 100
    or trip_distance > 100 and fare_amount > 200
    """
)
print("filtered_psdf.shape", filtered_psdf.shape)

                                                                                

filtered_psdf.shape (2718659, 20)




filtered_psdf.shape (2718641, 20)


                                                                                

## Transformation Logic:


In [94]:
# 총 매출
tot_revenue = filtered_psdf['total_amount'].sum()
print(f"Total revenue: {tot_revenue}")

# 날짜별 매출
filtered_psdf.to_spark().createOrReplaceTempView("trips")
sql = """
    WITH pickup_
    SELECT
        DATE(tpep_pickup_datetime) AS pickup_date,
        total_amount
    FROM trips
    GROUP BY pickup_date
        
        """



Total revenue: 74430725.46007907


                                                                                

## Aggregate

In [79]:
stats = filtered_rdd.aggregate(
    (0, 0.0, 0), # 초기값 (trip 수, 총 매출, 평균 거리)
    # combine value
    # # 각 partition에서 수행할 연산 (map)
    lambda acc, x: (
        acc[0] +1, 
        acc[1] + x['total_amount'],
        acc[2] + x['trip_distance'],
        ),
    # 각 partition 결과를 합치는 연산 (combine combiners)
    lambda acc1, acc2: ( # 
        acc1[0] + acc2[0], 
        acc1[1] + acc2[1],
        acc1[2] + acc2[2],        
        ),     
)

print(f"Total trips: {stats[0]}")
print(f"Total revenue: {stats[1]}")
print(f"Average distance: {stats[2] / stats[0]}")




Total trips: 2718651
Total revenue: 74431279.74007908
Average distance: 3.419221959714512


                                                                                

In [99]:
# 1. map, reduceByKey, sortByKey 사용
# number of trips per day
daily_trips = filtered_rdd.map(
    lambda x: (x['tpep_pickup_datetime'].date(), 1)
).reduceByKey(lambda x, y: x + y).sortByKey()

for date, count in daily_trips.collect():
    print(f"{date}: {count}")

# 2. groupByKey 사용 (누적하는 것이 아닌 한 번에 메모리에 올리기 때문에 메모리 부족할 수 있음)
# 2m 30s
# number of trips per day
# daily_trips = filtered_rdd.keyBy(
#     lambda x: x['tpep_pickup_datetime'].strftime('%Y-%m-%d')
# ).groupByKey().mapValues(
#     lambda x: len(list(x))
# ).collect()

# for date, count in daily_trips:
#     print(f"{date}: {count}")

# 3. Join도 가능하지만 Inner 값이기에 비효율적

                                                                                

2024-02-01: 102070
2024-02-02: 98021
2024-02-03: 100990
2024-02-04: 80983
2024-02-05: 82904
2024-02-06: 95328
2024-02-07: 95906
2024-02-08: 105306
2024-02-09: 101060
2024-02-10: 102715
2024-02-11: 85507
2024-02-12: 85609
2024-02-13: 64354
2024-02-14: 108629
2024-02-15: 107069
2024-02-16: 94776
2024-02-17: 91075
2024-02-18: 80718
2024-02-19: 70098
2024-02-20: 90266
2024-02-21: 96718
2024-02-22: 105507
2024-02-23: 94591
2024-02-24: 103426
2024-02-25: 82841
2024-02-26: 79291
2024-02-27: 100885
2024-02-28: 100272
2024-02-29: 111736


In [100]:
# Total revenue per day
daily_revenue = filtered_rdd.map(
    lambda x: (x['tpep_pickup_datetime'].date(), x['total_amount'])
).reduceByKey(lambda x, y: x + y).sortByKey()

for date, revenue in daily_revenue.collect():
    print(f"{date}: {revenue}")

                                                                                

2024-02-01: 2824015.919999942
2024-02-02: 2620733.9299999326
2024-02-03: 2491385.499999978
2024-02-04: 2237400.2499998766
2024-02-05: 2316529.6799999434
2024-02-06: 2580443.3999999897
2024-02-07: 2696189.720000007
2024-02-08: 2960078.48999986
2024-02-09: 2804011.5299998904
2024-02-10: 2608000.2499999367
2024-02-11: 2330352.6399999503
2024-02-12: 2400751.849999933
2024-02-13: 1637187.279999944
2024-02-14: 2909888.909999908
2024-02-15: 2928848.609999918
2024-02-16: 2679060.249999922
2024-02-17: 2312374.7899999036
2024-02-18: 2187241.4499999024
2024-02-19: 2080471.0499999125
2024-02-20: 2546112.629999959
2024-02-21: 2635096.6199999717
2024-02-22: 2952750.1499999287
2024-02-23: 2614497.559999943
2024-02-24: 2626553.7999999304
2024-02-25: 2336178.379999912
2024-02-26: 2398380.339999892
2024-02-27: 2768085.449999888
2024-02-28: 2819418.339999884
2024-02-29: 3129240.9699998614


## Join daily data

In [118]:
daily_trips_revenue = daily_trips.join(daily_revenue).map(
    lambda x: (x[0], x[1][0], x[1][1])
)
daily_trips_revenue.collect()

[(datetime.date(2024, 2, 4), 80983, 2237400.2499998766),
 (datetime.date(2024, 2, 13), 64354, 1637187.279999944),
 (datetime.date(2024, 2, 17), 91075, 2312374.7899999036),
 (datetime.date(2024, 2, 21), 96718, 2635096.6199999717),
 (datetime.date(2024, 2, 1), 102070, 2824015.919999942),
 (datetime.date(2024, 2, 8), 105306, 2960078.48999986),
 (datetime.date(2024, 2, 11), 85507, 2330352.6399999503),
 (datetime.date(2024, 2, 14), 108629, 2909888.909999908),
 (datetime.date(2024, 2, 16), 94776, 2679060.249999922),
 (datetime.date(2024, 2, 19), 70098, 2080471.0499999125),
 (datetime.date(2024, 2, 9), 101060, 2804011.5299998904),
 (datetime.date(2024, 2, 24), 103426, 2626553.7999999304),
 (datetime.date(2024, 2, 26), 79291, 2398380.339999892),
 (datetime.date(2024, 2, 6), 95328, 2580443.3999999897),
 (datetime.date(2024, 2, 15), 107069, 2928848.609999918),
 (datetime.date(2024, 2, 28), 100272, 2819418.339999884),
 (datetime.date(2024, 2, 18), 80718, 2187241.4499999024),
 (datetime.date(2024,

## Save Results

In [117]:
import os

base_dir = 'materials/data_engineering_course_materials/missions/W5/results/'
os.makedirs(base_dir, exist_ok=True)

# 기본적으로 partition으로 저장됨

# Save results as CSV
daily_trips_revenue = daily_trips_revenue.toDF(['date', 'count', 'revenue'])
# 파티션 1개로 저장하기
daily_trips_revenue.coalesce(1).write.csv(f'{base_dir}/daily_trips_revenues.csv', header=True, mode="overwrite")

# Save results as Parquet
stats_df = spark.createDataFrame([stats], ['total_trips', 'total_revenue', 'avg_distance'])
stats_df.coalesce(1).write.parquet(f'{base_dir}/stats.parquet', mode="overwrite")

                                                                                