In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max, sum, to_date

# Spark Session 시작하기

In [31]:
spark = SparkSession.builder.remote("sc://localhost").appName("W5M1").getOrCreate()

# NYC 택시 데이터 가져오기

In [32]:
taxi_trip_df = spark.read.parquet('/Users/admin/Projects/HMG_Softeer_DE/missions/W5/spark-softeer/userdata/NYC-TLC/yellow_tripdata_2024-05.parquet')
taxi_trip_df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2024-05-01 00:59:15|  2024-05-01 01:23:50|              1|          6.1|         1|                 N|         138|         145|           1|       28.2| 7.75|    0.5|       5.

In [33]:
taxi_trip_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



# 데이터 전처리
## 결측치 제거

In [34]:
numerical_columns = ['passenger_count', 'trip_distance', 'fare_amount', 
                     'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 
                     'improvement_surcharge', 'total_amount', 'congestion_surcharge', 
                     'Airport_fee']
minmax_columns = []

for column_name in numerical_columns:
    minmax_columns.append(min(column_name))
    minmax_columns.append(max(column_name))

In [35]:
taxi_trip_df.select(*minmax_columns).show()

+--------------------+--------------------+------------------+------------------+----------------+----------------+----------+----------+------------+------------+---------------+---------------+-----------------+-----------------+--------------------------+--------------------------+-----------------+-----------------+-------------------------+-------------------------+----------------+----------------+
|min(passenger_count)|max(passenger_count)|min(trip_distance)|max(trip_distance)|min(fare_amount)|max(fare_amount)|min(extra)|max(extra)|min(mta_tax)|max(mta_tax)|min(tip_amount)|max(tip_amount)|min(tolls_amount)|max(tolls_amount)|min(improvement_surcharge)|max(improvement_surcharge)|min(total_amount)|max(total_amount)|min(congestion_surcharge)|max(congestion_surcharge)|min(Airport_fee)|max(Airport_fee)|
+--------------------+--------------------+------------------+------------------+----------------+----------------+----------+----------+------------+------------+---------------+-----

In [36]:
taxi_trip_df = (taxi_trip_df.filter(col('fare_amount') > 0)
                            .filter(col('extra') > 0)
                            .filter(col('mta_tax') > 0)
                            .filter(col('tip_amount') > 0)
                            .filter(col('tolls_amount') > 0)
                            .filter(col('improvement_surcharge') > 0)
                            .filter(col('total_amount') > 0)
                            .filter(col('congestion_surcharge') > 0)
                            .filter(col('Airport_fee') > 0)
                            .filter(col('trip_distance') > 0)
                )

In [37]:
taxi_trip_df.select(*minmax_columns).show()

+--------------------+--------------------+------------------+------------------+----------------+----------------+----------+----------+------------+------------+---------------+---------------+-----------------+-----------------+--------------------------+--------------------------+-----------------+-----------------+-------------------------+-------------------------+----------------+----------------+
|min(passenger_count)|max(passenger_count)|min(trip_distance)|max(trip_distance)|min(fare_amount)|max(fare_amount)|min(extra)|max(extra)|min(mta_tax)|max(mta_tax)|min(tip_amount)|max(tip_amount)|min(tolls_amount)|max(tolls_amount)|min(improvement_surcharge)|max(improvement_surcharge)|min(total_amount)|max(total_amount)|min(congestion_surcharge)|max(congestion_surcharge)|min(Airport_fee)|max(Airport_fee)|
+--------------------+--------------------+------------------+------------------+----------------+----------------+----------+----------+------------+------------+---------------+-----

## 필요한 columns만 고르기

In [38]:
taxi_trip_df = taxi_trip_df.select('tpep_pickup_datetime', 'trip_distance', 'total_amount')
taxi_trip_df.show(5)

+--------------------+-------------+------------+
|tpep_pickup_datetime|trip_distance|total_amount|
+--------------------+-------------+------------+
| 2024-05-01 00:57:17|         9.02|       65.16|
| 2024-05-01 00:14:05|         8.53|       59.31|
| 2024-05-01 00:03:08|         27.5|      107.49|
| 2024-05-01 00:44:01|         11.0|       74.74|
| 2024-05-01 00:14:19|         8.32|       59.79|
+--------------------+-------------+------------+
only showing top 5 rows



## 날짜 기준으로 그룹화

In [39]:
taxi_trip_group_by_date_df = taxi_trip_df.withColumn('tpep_pickup_date', to_date(col('tpep_pickup_datetime'))).groupBy('tpep_pickup_date')

# Aggregation

In [41]:
taxi_trip_df.count()

84378

In [44]:
taxi_trip_df.select(sum(col('total_amount'))).show()

+-----------------+
|sum(total_amount)|
+-----------------+
| 6862441.78999949|
+-----------------+



In [45]:
num_of_trips = taxi_trip_df.count()
total_trip_distance = taxi_trip_df.select(sum(col('trip_distance'))).collect()[0]['sum(trip_distance)']

print('average of trip distance: ', total_trip_distance / num_of_trips, 'miles')

average of trip distance:  11.696007727132685 miles


In [48]:
count_by_date_df = taxi_trip_group_by_date_df.count().orderBy('tpep_pickup_date')
count_by_date_df.show(31)

+----------------+-----+
|tpep_pickup_date|count|
+----------------+-----+
|      2024-05-01| 2874|
|      2024-05-02| 3058|
|      2024-05-03| 3015|
|      2024-05-04| 1431|
|      2024-05-05| 2632|
|      2024-05-06| 3575|
|      2024-05-07| 3043|
|      2024-05-08| 3203|
|      2024-05-09| 3242|
|      2024-05-10| 2898|
|      2024-05-11| 1297|
|      2024-05-12| 2587|
|      2024-05-13| 3963|
|      2024-05-14| 3465|
|      2024-05-15| 3303|
|      2024-05-16| 3496|
|      2024-05-17| 2844|
|      2024-05-18| 1368|
|      2024-05-19| 2760|
|      2024-05-20| 3423|
|      2024-05-21| 2947|
|      2024-05-22| 2845|
|      2024-05-23| 2615|
|      2024-05-24| 2576|
|      2024-05-25| 1060|
|      2024-05-26| 1232|
|      2024-05-27| 1873|
|      2024-05-28| 3492|
|      2024-05-29| 2801|
|      2024-05-30| 2947|
|      2024-05-31| 2513|
+----------------+-----+



In [53]:
total_amount_by_date_df = (taxi_trip_group_by_date_df.sum()
                                                     .select('tpep_pickup_date', 'sum(total_amount)')
                                                     .withColumnRenamed('sum(total_amount)', 'total_amount_by_date')
                                                     .orderBy('tpep_pickup_date'))
total_amount_by_date_df.show(31)

+----------------+--------------------+
|tpep_pickup_date|total_amount_by_date|
+----------------+--------------------+
|      2024-05-01|   234102.3800000013|
|      2024-05-02|  253174.17000000217|
|      2024-05-03|  245122.76000000146|
|      2024-05-04|  108908.40000000017|
|      2024-05-05|  201508.48000000062|
|      2024-05-06|  292839.31000000227|
|      2024-05-07|  251233.74000000185|
|      2024-05-08|   265208.1300000018|
|      2024-05-09|  266810.56000000174|
|      2024-05-10|  242295.62000000122|
|      2024-05-11|   100074.6800000003|
|      2024-05-12|   191414.2600000006|
|      2024-05-13|  321041.43000000203|
|      2024-05-14|  290568.03000000113|
|      2024-05-15|   276198.3200000021|
|      2024-05-16|   293009.5300000017|
|      2024-05-17|  239929.89000000103|
|      2024-05-18|  108224.19000000009|
|      2024-05-19|  208423.63999999972|
|      2024-05-20|    281833.060000002|
|      2024-05-21|  246523.13000000146|
|      2024-05-22|  233623.27000000144|


# 결과 저장하기

In [55]:
count_by_date_df.write.csv('/Users/admin/Projects/HMG_Softeer_DE/missions/W5/NYC-taxi-count_by_date', header=True)
total_amount_by_date_df.write.csv('/Users/admin/Projects/HMG_Softeer_DE/missions/W5/NYC-taxi-total_amount_by_date', header=True)

In [56]:
spark.stop()