# YellowTaxi ALS

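1. Import the required libraries
2. Prepare the data
    - 2.1 Load the data files
    - 2.2 Preprocess the data
    - 2.3 Rename columns (if needed)
3. Split the data
4. Create and configure the ALS model
5. Predict
6. Evaluate
7. Generate recommendations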

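# Define the goal

- What to prepare before applying ALS: which columns should be extracted, and what should be recommended?
- What would I want as a taxi driver, as the company, or as a rider?

1. Predict the expected fare for each pickup/drop-off zone pair and use it for recommendations
- User (user): PULocationID
- Item (item): DOLocationID
- Rating (rating): total_amount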
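
To make the mapping concrete, here is a minimal pure-Python sketch (not Spark code) of how trip rows become (user, item, rating) entries. The helper name `to_rating_table` is hypothetical, and averaging repeated trips between the same pair of zones is an assumption made for illustration; the notebook itself passes every trip to ALS as a separate rating.

```python
from collections import defaultdict

def to_rating_table(trips):
    """Map (PULocationID, DOLocationID, total_amount) rows to {(user, item): mean rating}.

    Averaging repeated pickup/drop-off pairs is an illustrative assumption,
    not something the notebook does.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for pickup_id, dropoff_id, total_amount in trips:
        key = (pickup_id, dropoff_id)  # user = pickup zone, item = drop-off zone
        sums[key] += total_amount      # rating = amount paid for the trip
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}

# First rows of the trips table shown in section 2.2
sample_trips = [
    (264, 264, 4.3),
    (152, 152, 3.8),
    (152, 152, 4.8),
    (138, 265, 70.07),
]
rating_table = to_rating_table(sample_trips)
```

With only 265 zone IDs, the user and item sets are the same small set of locations, so every "user" has many ratings for the same "item".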

# 1. Import the required libraries

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("241213_Yellowtaxi_MLlib").getOrCreate()
spark

24/12/16 11:18:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from pyspark.sql.functions import *
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import IntegerType

# 2. Prepare the data

## 2.1 Load the data files

In [3]:
import os

trip_files = '/trips/*'
zone_file = 'taxi+_zone_lookup.csv'
directory = os.path.join(os.getcwd(), 'data')

In [4]:
trips_df = spark.read.csv(f'file:///{directory}/{trip_files}', inferSchema=True, header=True)
zone_df = spark.read.csv(f'file:///{directory}/{zone_file}', inferSchema=True, header=True)

                                                                                

In [5]:
trips_df.printSchema()
zone_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [6]:
trips_df.show(3)
zone_df.show(3)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       2| 2021-03-01 00:22:02|  2021-03-01 00:23:22|              1|          0.0|         1|                 N|         264|         264|           2|        3.0|  0.5|    0.5|       0.0|         0.0|                  0.3

## 2.2 Preprocess the data

In [7]:
# Keep only the columns we need

trips_df = trips_df.select(["PULocationID", "DOLocationID", "total_amount"])
trips_df.show()

+------------+------------+------------+
|PULocationID|DOLocationID|total_amount|
+------------+------------+------------+
|         264|         264|         4.3|
|         152|         152|         3.8|
|         152|         152|         4.8|
|         138|         265|       70.07|
|          68|         264|       11.16|
|         239|         262|       18.59|
|         186|          91|        43.8|
|         132|         265|        32.3|
|         138|         141|       43.67|
|         138|          50|        46.1|
|         132|         123|        45.3|
|         140|           7|        19.3|
|         239|         238|        14.8|
|         116|          41|        12.8|
|          74|          41|         5.3|
|         239|         144|        17.3|
|         132|          91|       47.25|
|         239|          50|        12.8|
|         132|         230|       61.42|
|         229|          48|       14.16|
+------------+------------+------------+
only showing top 20 rows

In [8]:
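# Check for null / NaN values
# (sum here is pyspark.sql.functions.sum from the star import, not Python's built-in)
null_counts = trips_df.select(
    [
        sum(when(col(c).isNull() | isnan(c), 1).otherwise(0)).alias(c)
        for c in trips_df.columns
    ]
)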

null_counts.show()

#trips_df = trips_df.na.drop()



+------------+------------+------------+
|PULocationID|DOLocationID|total_amount|
+------------+------------+------------+
|           0|           0|           0|
+------------+------------+------------+



                                                                                

In [9]:
# Check whether total_amount has negative values: 68,820 rows
negative_amount = trips_df.filter(col("total_amount") < 0)
negative_amount.describe().show()



+-------+------------------+------------------+-------------------+
|summary|      PULocationID|      DOLocationID|       total_amount|
+-------+------------------+------------------+-------------------+
|  count|             68820|             68820|              68820|
|   mean|163.31117407730312|161.17672188317349|-14.490561464692082|
| stddev| 66.81615783677348| 70.65807406291414| 17.161514139828355|
|    min|                 1|                 1|             -647.8|
|    max|               265|               265|               -0.3|
+-------+------------------+------------------+-------------------+



                                                                                

In [10]:
# Replace negative amounts with 0
trips_df = trips_df.withColumn(
    "total_amount",
    when(col("total_amount") < 0, 0).otherwise(col("total_amount")),
)
trips_df.describe().show()



+-------+------------------+------------------+------------------+
|summary|      PULocationID|      DOLocationID|      total_amount|
+-------+------------------+------------------+------------------+
|  count|          15000700|          15000700|          15000700|
|   mean|165.42381935509675|162.36693754291466|18.821931650708898|
| stddev| 66.98886568099608| 71.19726238663675|145.72773384755425|
|    min|                 1|                 1|               0.0|
|    max|               265|               265|          398469.2|
+-------+------------------+------------------+------------------+



                                                                                

In [11]:
# # Handle outliers?
# from pyspark.sql.functions import col

# # Remove outliers with the IQR rule
# quantiles = trips_df.approxQuantile("total_amount", [0.25, 0.75], 0.05)
# Q1, Q3 = quantiles[0], quantiles[1]
# IQR = Q3 - Q1
# lower_bound = Q1 - 1.5 * IQR
# upper_bound = Q3 + 1.5 * IQR

# # Drop the outlier rows
# cleaned_trips_df = trips_df.filter((col("total_amount") >= lower_bound) & (col("total_amount") <= upper_bound))
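
The IQR rule in the commented-out cell above can be sketched in plain Python on a handful of values. This is only an illustration: `iqr_bounds` is a hypothetical helper, `statistics.quantiles` computes exact quartiles on a small list whereas `approxQuantile` returns approximations over the full DataFrame, and the sample values are copied from the outputs above.

```python
import statistics

def iqr_bounds(values, k=1.5):
    """Return (lower, upper) bounds of the IQR rule: Q1 - k*IQR and Q3 + k*IQR."""
    q1, _median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

# A few total_amount values from the table above, plus the reported maximum
amounts = [4.3, 3.8, 4.8, 70.07, 11.16, 18.59, 43.8, 32.3, 43.67, 46.1, 398469.2]
lower, upper = iqr_bounds(amounts)

# Keep only values inside the bounds; the extreme maximum falls outside
kept = [a for a in amounts if lower <= a <= upper]
```

On this small sample the rule drops only the 398469.2 value. On the full dataset the bounds would be different, so which fares count as outliers there remains to be checked.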


## 2.3 Rename columns

In [12]:
trips_df = trips_df.withColumnRenamed("PULocationID", "userId") \
                   .withColumnRenamed("DOLocationID", "itemId") \
                   .withColumnRenamed("total_amount", "rating")
trips_df.show(5)

+------+------+------+
|userId|itemId|rating|
+------+------+------+
|   264|   264|   4.3|
|   152|   152|   3.8|
|   152|   152|   4.8|
|   138|   265| 70.07|
|    68|   264| 11.16|
+------+------+------+
only showing top 5 rows



# 3. Split the data

In [13]:
train_ratio = 0.8
test_ratio = 0.2

train_df, test_df = trips_df.randomSplit([train_ratio, test_ratio], seed=42)
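
`randomSplit` assigns each row to a split independently at random, so the 0.8/0.2 weights are matched only approximately (the prediction summary in section 5 counts 3,000,671 rows out of 15,000,700, just over 20%). A minimal pure-Python sketch of that idea follows; `random_split` is a hypothetical helper and does not reproduce Spark's actual sampling implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by drawing a uniform number per row.

    Illustrative only: mirrors the idea of independent per-row draws against
    normalized weights, not Spark's implementation.
    """
    total = sum(weights)
    # Cumulative upper boundary of each split on [0, 1)
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            # The last split also absorbs any floating-point remainder
            if draw < bound or index == len(bounds) - 1:
                splits[index].append(row)
                break
    return splits

rows = list(range(10_000))
train_rows, test_rows = random_split(rows, [0.8, 0.2], seed=42)
```

Fixing the seed makes the split repeatable, which is why the notebook passes `seed=42`.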

# 4. Create and configure the ALS model

In [14]:
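# Create the ALS model
als = ALS(
    maxIter=5,
    regParam=0.1,
    userCol="userId",
    itemCol="itemId",
    ratingCol="rating",
    coldStartStrategy='drop'  # drop rows whose prediction would be NaN (unseen users/items)
)

# Fit on the training split only. Fitting on the full trips_df, as the run recorded
# below did, leaks the test rows into training and makes the reported RMSE optimistic.
als_model = als.fit(train_df)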

24/12/16 11:20:24 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
24/12/16 11:20:24 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
24/12/16 11:20:25 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
24/12/16 11:20:25 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
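
For intuition about what `als.fit` does, here is a minimal rank-1 alternating least squares sketch in pure Python. It is a teaching toy under stated assumptions, not Spark's implementation: the function names and toy ratings are hypothetical, each zone gets a single latent number instead of a vector (Spark's `rank` defaults to 10), and the regularization is a plain L2 penalty that may differ from how Spark applies `regParam`.

```python
def objective(ratings, u, v, reg):
    """Squared error on observed ratings plus an L2 penalty on all factors."""
    squared_error = sum((r - u[a] * v[b]) ** 2 for (a, b), r in ratings.items())
    penalty = sum(x * x for x in u.values()) + sum(x * x for x in v.values())
    return squared_error + reg * penalty

def als_rank1(ratings, n_iter=10, reg=0.1):
    """Alternate closed-form least-squares updates of user and item factors."""
    users = sorted({a for a, _ in ratings})
    items = sorted({b for _, b in ratings})
    u = {a: 1.0 for a in users}
    v = {b: 1.0 for b in items}
    for _ in range(n_iter):
        # Fix item factors; each user factor has a closed-form minimizer.
        for a in users:
            num = sum(r * v[b] for (aa, b), r in ratings.items() if aa == a)
            den = reg + sum(v[b] ** 2 for (aa, b) in ratings if aa == a)
            u[a] = num / den
        # Fix user factors; update each item factor the same way.
        for b in items:
            num = sum(r * u[a] for (a, bb), r in ratings.items() if bb == b)
            den = reg + sum(u[a] ** 2 for (a, bb) in ratings if bb == b)
            v[b] = num / den
    return u, v

# Hypothetical toy data: {(pickup zone, drop-off zone): fare}
toy_ratings = {(1, 1): 5.0, (1, 2): 20.0, (2, 1): 10.0, (2, 2): 40.0, (3, 1): 7.5}

initial_obj = objective(toy_ratings, {1: 1.0, 2: 1.0, 3: 1.0}, {1: 1.0, 2: 1.0}, reg=0.1)
u, v = als_rank1(toy_ratings, n_iter=10, reg=0.1)
final_obj = objective(toy_ratings, u, v, reg=0.1)
predicted_unseen = u[3] * v[2]  # fare estimate for a pair absent from the data
```

Each half-step minimizes the objective exactly over one block of factors, so the objective never increases from one iteration to the next.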
                                                                                

# 5. Predict

In [15]:
# Predict on the test split
predictions = als_model.transform(test_df)
predictions.show(5)



+------+------+------+----------+
|userId|itemId|rating|prediction|
+------+------+------+----------+
|   148|   148|   0.0| 12.024168|
|   148|   148|   0.0| 12.024168|
|   148|   148|   0.3| 12.024168|
|   148|   148|   5.8| 12.024168|
|   148|   148|   5.8| 12.024168|
+------+------+------+----------+
only showing top 5 rows



                                                                                

In [16]:
predictions.select("rating","prediction").describe().show()



+-------+------------------+------------------+
|summary|            rating|        prediction|
+-------+------------------+------------------+
|  count|           3000671|           3000671|
|   mean|18.775539527664378|18.797164819267316|
| stddev|14.806227867984497|13.485676661022245|
|    min|               0.0|        -0.5454178|
|    max|           7661.28|          1876.989|
+-------+------------------+------------------+





# 6. Evaluate

In [17]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

In [18]:
rmse = evaluator.evaluate(predictions)
rmse

                                                                                

10.088585455658048

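### Interpreting the metric:
- RMSE is the square root of the mean squared error, so predictions miss the actual total_amount by roughly 10.09 (in the same units as total_amount), with large misses weighted more heavily.
- Lower RMSE means better predictions. Against a rating mean of about 18.8 and a standard deviation of about 14.8, an error of 10.09 is a substantial gap.
- Takeaway: the model performs poorly, so the next steps are better preprocessing (outlier handling, normalization, etc.) or hyperparameter tuning.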
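
RMSE squares each error before averaging, so a few large misses raise it more than they raise the mean absolute error. The sketch below recomputes both metrics by hand on the five prediction rows displayed in section 5; the helper names are hypothetical, and the result describes only those five rows, not the full test set.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))

def mae(actual, predicted):
    """Mean absolute error."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

# rating and prediction columns of the five rows shown by predictions.show(5)
ratings = [0.0, 0.0, 0.3, 5.8, 5.8]
predictions_sample = [12.024168] * 5

sample_rmse = rmse(ratings, predictions_sample)
sample_mae = mae(ratings, predictions_sample)
```

RMSE is never smaller than MAE on the same data, and the gap between the two grows as the errors become more uneven.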

# 7. Generate recommendations

In [19]:
# 7. Generate recommendations (e.g., recommend drop-off zones for each pickup zone)
user_df = trips_df.select("userId").distinct()  # user data (pickup zones)
recommendations = als_model.recommendForUserSubset(user_df, 10)  # top 10 drop-off zones for each user

In [20]:
# Print the recommendations
recommendations.show(truncate=True)

                                                                                

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   148|[{1, 88.199486}, ...|
|   243|[{44, 145.48804},...|
|    31|[{84, 144.77588},...|
|    85|[{1, 88.69849}, {...|
|   137|[{84, 96.748985},...|
|   251|[{109, 288.4514},...|
|    65|[{99, 97.20358}, ...|
|    53|[{189, 219.95505}...|
|   255|[{99, 112.070755}...|
|   133|[{1, 96.08605}, {...|
|    78|[{84, 133.27359},...|
|   108|[{1, 95.205154}, ...|
|   155|[{1, 108.899796},...|
|    34|[{84, 95.48577}, ...|
|   193|[{99, 133.82979},...|
|   211|[{84, 89.8666}, {...|
|   101|[{44, 132.96501},...|
|   115|[{109, 114.85258}...|
|   126|[{84, 121.04209},...|
|    81|[{189, 281.12207}...|
+------+--------------------+
only showing top 20 rows
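
Because the rating here is `total_amount`, the top-N list for a pickup zone holds the drop-off zones with the highest predicted fare, not the most frequently visited ones. The ranking step can be sketched in plain Python as follows; `top_n_items` and the factor values are made up for illustration, while Spark ranks items by the same kind of score computed from the factors it learned.

```python
import heapq

def top_n_items(user_vector, item_vectors, n):
    """Rank items by the dot product of user and item factors, highest first."""
    def score(item_id):
        return sum(u * v for u, v in zip(user_vector, item_vectors[item_id]))

    best = heapq.nlargest(n, item_vectors, key=score)
    return [(item_id, score(item_id)) for item_id in best]

# Hypothetical 2-dimensional factors, not values learned by the notebook's model
pickup_zone_vector = [2.0, 1.0]
dropoff_zone_vectors = {
    1: [30.0, 10.0],
    84: [20.0, 5.0],
    148: [4.0, 4.0],
    264: [1.0, 2.0],
}
top_two = top_n_items(pickup_zone_vector, dropoff_zone_vectors, n=2)
```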



In [None]:
first_row = recommendations.first()
location_list = first_row.recommendations
location_list



In [None]:
# Keep the pickup zone (userId) next to each recommended drop-off zone
rec_df = spark.createDataFrame(location_list).withColumn("userId", lit(first_row.userId))
rec_df.show()

## Join with the zone lookup table

In [None]:
# Register as temporary views
rec_df.createOrReplaceTempView('recommend')
zone_df.createOrReplaceTempView('zone')

In [None]:
rec_df.show(3)
zone_df.show(3)

In [None]:
query = '''
SELECT 
    pz.Zone AS pickup_zone,
    dz.Zone AS dropoff_zone,
    t.rating

FROM recommend t

LEFT JOIN zone pz ON t.userId = pz.LocationID  -- join the pickup zone (PULocationID)
LEFT JOIN zone dz ON t.itemId = dz.LocationID  -- join the drop-off zone (DOLocationID)
'''
recommended = spark.sql(query)
recommended.show()

In [None]:
spark.stop()