In [2]:
spark.stop()

NameError: name 'spark' is not defined

In [1]:
from pyspark.sql import SparkSession  #객체 생성
MAX_MEMORY= '8g'
spark= SparkSession.builder.appName("taxi-fare-prediction_2nd")\
            .config('spark.driver.memory', MAX_MEMORY)\
            .config('spark.executor.memory', MAX_MEMORY)\
            .getOrCreate()

In [2]:
import os
cwd=os.getcwd()
trip_data_path=os.path.join(cwd, 'learning_spark_data', 'trips', '*.csv') #learning_spark_data/trips 폴더 아래의 모든 .csv 파일 경로를 만듦
trip_data_path

'/home/jovyan/work/learning_spark_data/trips/*.csv'

In [3]:
file_path=f"file:///{trip_data_path.replace(os.sep,'/')}"
file_path

'file:////home/jovyan/work/learning_spark_data/trips/*.csv'

In [4]:
trip_df=spark.read.csv(file_path,inferSchema=True, header=True) 
trip_df.printSchema()

#inferSchema=True: 데이터 타입을 자동으로 추론
#header=True: 첫 번째 줄을 컬럼명으로 인식
#printSchema(): 읽어온 DataFrame의 컬럼명, 타입 구조를 출력


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [5]:
trip_df.createOrReplaceTempView('trips')
#임시 테이블 trips에서 필요한 컬럼만 추출 후 필터링
query = """
SELECT 
    passenger_count,
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_distance,
    HOUR(tpep_pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(tpep_pickup_datetime), 'EEEE') AS day_of_week,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
"""


In [6]:
data_df = spark.sql(query)
data_df.show(5)

+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|passenger_count|pickup_location_id|dropoff_location_id|trip_distance|pickup_time|day_of_week|total_amount|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
|              0|               138|                265|         16.5|          0|     Monday|       70.07|
|              1|                68|                264|         1.13|          0|     Monday|       11.16|
|              1|               239|                262|         2.68|          0|     Monday|       18.59|
|              1|               186|                 91|         12.4|          0|     Monday|        43.8|
|              2|               132|                265|          9.7|          0|     Monday|        32.3|
+---------------+------------------+-------------------+-------------+-----------+-----------+------------+
only showing top 5 rows



In [7]:
#train, test split 8:2, seed=1 
train_df,test_df=data_df.randomSplit([0.8,0.2],seed=1)

## 파이프라인 생성
- 전처리 과정을 각 스테이지로 정의해서 쌓는다
- 범주형: StringIndexer+원핫인코딩 'pickup_location','dropoff_location_id','day_of_week'
- 수치형: StandardScaler : 'passenger_count', 'trip_distance', 'pickup_time'

In [8]:
stages=[] #파이프라인용 stages 리스트

In [9]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

cat_features = ['pickup_location_id', 'dropoff_location_id', 'day_of_week'] #모델에 바로 못넣는 컬럼
for cat in cat_features:
    cat_index = StringIndexer(inputCol=cat, outputCol=cat+'_idx').setHandleInvalid('keep') #stringindexer로 숫자 인코딩
    onehot_encode = OneHotEncoder(inputCols= [cat_index.getOutputCol()], #_idx col    원한인코더로 벡터화(ML용)
                                  outputCols=[cat+'_onehot'] #postfix
                                 )
    stages += [cat_index, onehot_encode ] #collist
stages

#setHandleInvalid('keep')
#  →만약 데이터에 train/test split 등으로 못 본 카테고리가 들어와도
#에러 없이 처리(새 인덱스 할당)

#처음에 stringindexer로 문자에서 숫자로 변환해주고
# 변환된 숫자만 있으면 0,1,2등으로만 보면 순서(서열)가 헷갈려 각 카테고리별로 '독립된 열'을 만들어 희소벡터로 변환함.

[StringIndexer_98f964e8cd58,
 OneHotEncoder_df638b420824,
 StringIndexer_a1196aee1671,
 OneHotEncoder_aab865d287b6,
 StringIndexer_97db36c96319,
 OneHotEncoder_cd309e3acd7f]

In [12]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
num_features= ['passenger_count','trip_distance', 'pickup_time'] #정규화(스케일링)할 수치형 변수 리스트

for num in num_features:
    num_assembler=VectorAssembler(inputCols=[num],outputCol=num+'_vector') #1차원 벡터컬럼
    num_scaler=StandardScaler(inputCol=num_assembler.getOutputCol(),outputCol=num+'_scaled') #위 벡터 데이터 정규화
    stages+=[num_assembler, num_scaler] #파이프라인에 추가

stages

#pyspark의 standardScaler는 반드시 벡터컬럼으로 입력받아야함
#예: 3->[3.0]
#전처리된 모든 입력을 feature 벡터로 통합

'''
<통합 전 데이터 프레임>
trip_distance	passenger_count	pickup_location_id_onehot	day_of_week_onehot
3.0	2	[0, 0, 1, 0, ...]	[1, 0, 0, 0, 0, 0, 0]
5.2	1	[0, 1, 0, 0, ...]	[0, 1, 0, 0, 0, 0, 0]
'''
'''
<통합 후 프레임>
features
[3.0, 2, 0, 0, 1, 0, ..., 1, 0, 0, 0, 0, 0, 0]
[5.2, 1, 0, 1, 0, 0, ..., 0, 1, 0, 0, 0, 0, 0]
'''








[StringIndexer_98f964e8cd58,
 OneHotEncoder_df638b420824,
 StringIndexer_a1196aee1671,
 OneHotEncoder_aab865d287b6,
 StringIndexer_97db36c96319,
 OneHotEncoder_cd309e3acd7f,
 VectorAssembler_4024ce8af10d,
 StandardScaler_7a67f0eb5a13,
 VectorAssembler_88998bfdd304,
 StandardScaler_ab264c9e7bd2,
 VectorAssembler_a5f9de43c1f0,
 StandardScaler_6f281e523d36]

In [13]:
assembler_input = [cat+'_onehot' for cat in cat_features] + [num+'_scaled' for num in num_features]
assembler_input

#cat_features 원핫인코딩 결과 컬럼과 num_features 스케일링 결과 컬럼을 한번에 묶어서 합친 컬럼명 준비

['pickup_location_id_onehot',
 'dropoff_location_id_onehot',
 'day_of_week_onehot',
 'passenger_count_scaled',
 'trip_distance_scaled',
 'pickup_time_scaled']

In [14]:
assembler=VectorAssembler(inputCols=assembler_input, outputCol='feature_vector')
stages+=[assembler]
stages

#위에서 만든 모든 피처(벡터,스케일링,원핫인코딩)들을 하나의 벡터 컬럼으로 합쳐서 stages 리스트에 추가

[StringIndexer_98f964e8cd58,
 OneHotEncoder_df638b420824,
 StringIndexer_a1196aee1671,
 OneHotEncoder_aab865d287b6,
 StringIndexer_97db36c96319,
 OneHotEncoder_cd309e3acd7f,
 VectorAssembler_4024ce8af10d,
 StandardScaler_7a67f0eb5a13,
 VectorAssembler_88998bfdd304,
 StandardScaler_ab264c9e7bd2,
 VectorAssembler_a5f9de43c1f0,
 StandardScaler_6f281e523d36,
 VectorAssembler_e9b042512570]

In [15]:
from pyspark.ml import Pipeline

pipeline=Pipeline(stages=stages) #파이프라인 객체에 지금까지 쌓은 모든 전처리 단계를 넣기
fitted_transform=pipeline.fit(train_df) #한번에 학습, 학습된(인코딩 등) 파이프라인 모델
vtrain_df=fitted_transform.transform(train_df) #학습된 모든 전처리 적용
vtrain_df.printSchema() #모든 파생 피처 컬럼과 최종feature_vector가 포함된 완전 전처리 데이터프레임

#vtrain_df는 전처리 및 피처 엔지니어링이 끝난 학습 데이터

root
 |-- passenger_count: integer (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_time: integer (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- pickup_location_id_idx: double (nullable = false)
 |-- pickup_location_id_onehot: vector (nullable = true)
 |-- dropoff_location_id_idx: double (nullable = false)
 |-- dropoff_location_id_onehot: vector (nullable = true)
 |-- day_of_week_idx: double (nullable = false)
 |-- day_of_week_onehot: vector (nullable = true)
 |-- passenger_count_vector: vector (nullable = true)
 |-- passenger_count_scaled: vector (nullable = true)
 |-- trip_distance_vector: vector (nullable = true)
 |-- trip_distance_scaled: vector (nullable = true)
 |-- pickup_time_vector: vector (nullable = true)
 |-- pickup_time_scaled: vector (nullable = true)
 |-- feature_vector: vector (nul

In [16]:
vtrain_df.select('feature_vector').show(2)

+--------------------+
|      feature_vector|
+--------------------+
|(533,[62,311,527,...|
|(533,[62,280,526,...|
+--------------------+
only showing top 2 rows



In [17]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=50, solver='normal',  
                 labelCol='total_amount', featuresCol='feature_vector')

#labelCol='total_amount': 예측하고자 하는 값(요금 컬럼) 지정
#featuresCol='feature_vector': 입력값으로 쓸 벡터 컬럼명 지정 (여기서는 feature_vector)

In [18]:
model = lr.fit(vtrain_df)

#학습 데이터(vtrain_df)에서 feature_vector → total_amount를 예측하도록 모델 학습 수행

In [19]:
#테스트데이터(test_df)에도 전처리 변환
vtest_df = fitted_transform.transform(test_df)
#테스트데이터로 total_amount(요금) 예측
pred = model.transform(vtest_df)
#pred DataFrame에는 원본 데이터 + 예측값(prediction) 컬럼이 추가됨

In [20]:
pred.select('total_amount', 'prediction').show(3)

+------------+------------------+
|total_amount|        prediction|
+------------+------------------+
|       10.55|12.695522792729275|
|        13.3|14.450558014776915|
|        21.3|21.108271361254218|
+------------+------------------+
only showing top 3 rows



In [21]:
model.summary.r2, model.summary.rootMeanSquaredError

(0.80849012500813, 5.6485201652667625)

In [None]:
spark.stop()