In [2]:
import findspark
# findspark.init() is called before any pyspark import; presumably it makes the
# local Spark installation importable -- confirm against the findspark docs.
findspark.init()
import pyspark
from pyspark.sql import SparkSession
# Obtain the SparkSession used by every later cell (builder defaults, no extra config).
spark = SparkSession.builder.getOrCreate()

In [3]:
from pyspark.sql.types import StructType,StructField, StringType
import re

# NOTE(review): absolute, machine-specific path. Point CSV_PATH at a shared or
# relative data location before handing this notebook to someone else.
CSV_PATH = 'C:\\Users\\jihun\\Downloads\\stats (7).csv'

# Read the CSV, taking column names from the first row and letting Spark infer the types.
df = spark.read.csv(CSV_PATH, header=True, inferSchema=True)

# Remove every whitespace character from the column names.
# The previous pattern also swallowed the identifier text that followed a
# whitespace run, which would have truncated a name instead of just cleaning it.
for column_name in df.schema.names:
    df = df.withColumnRenamed(column_name, re.sub(r'\s+', '', column_name))

# Build a pandas DataFrame from the Spark DataFrame (collects all rows to the driver).
pdf = df.toPandas()

In [4]:
# Mark df for caching; as the cell's last expression, the returned DataFrame repr is displayed.
df.cache()

DataFrame[last_name: string, first_name: string, player_id: int, year: int, p_k_percent: double, p_bb_percent: double, xwoba: double, z_swing_percent: double, z_swing_miss_percent: double, oz_swing_percent: double, oz_swing_miss_percent: double, oz_contact_percent: double, out_zone_percent: double, iz_contact_percent: double, in_zone_percent: double, edge_percent: double, whiff_percent: double, fastball_avg_speed: double, fastball_avg_spin: int, fastball_avg_break: double, breaking_avg_speed: double, breaking_avg_spin: int, breaking_avg_break: double, offspeed_avg_speed: double, offspeed_avg_spin: int, offspeed_avg_break: double, _c26: string]

In [5]:
# NOTE(review): this import binds `max`, `sum` and `min` to the pyspark.sql.functions
# versions, shadowing the Python builtins of the same names in the notebook namespace.
from pyspark.sql.functions import count, isnan, isnull,when, col,max, avg, sum, min,lit,substring,concat,concat_ws

In [6]:
# Remove the '_c26' column, then discard every row that contains at least one null value.
df = df.drop('_c26').na.drop('any')

In [13]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# NOTE(review): `lr` is first assigned in a later cell (the LinearRegression estimator),
# so this cell raises NameError on a fresh top-to-bottom run. `paramGrid` is not read by
# any visible cell; the cross-validation cell builds its own `param_grid` with the same
# values. Consider deleting this cell.
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]) .addGrid(lr.fitIntercept, [False, True]).\
addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()

In [9]:
# Hold out roughly 20% of the rows for testing. The fixed seed keeps the split
# (and therefore every metric computed below) reproducible across runs.
train_sdf, test_sdf = df.randomSplit([0.8, 0.2], seed=42)
# The training split is reused by the count below and by the later vectorization/fit cells.
train_sdf.cache()
# Row counts: full data, training split, test split.
print(df.count(), train_sdf.count(), test_sdf.count())

3601 2859 742


In [8]:
# Input columns assembled into the feature vector; the label column 'xwoba' is not among them.
features = [
    # plate-discipline rates
    'p_k_percent',
    'p_bb_percent',
    'z_swing_percent',
    'z_swing_miss_percent',
    'oz_swing_percent',
    'oz_swing_miss_percent',
    'oz_contact_percent',
    'out_zone_percent',
    'iz_contact_percent',
    'in_zone_percent',
    'edge_percent',
    'whiff_percent',
    # per-pitch-group speed / spin / break
    'fastball_avg_speed',
    'fastball_avg_spin',
    'fastball_avg_break',
    'breaking_avg_speed',
    'breaking_avg_spin',
    'breaking_avg_break',
    'offspeed_avg_speed',
    'offspeed_avg_spin',
    'offspeed_avg_break',
]

### Feature Vectorization 변환과 Estimator 객체 생성.

In [17]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


# Assembler that packs the columns listed in `features` into one 'features' vector column.
vec_assembler = VectorAssembler(
    inputCols=features,
    outputCol='features',
)

# Linear regression estimator: reads the assembled vector, predicts the 'xwoba' label.
lr = LinearRegression(
    featuresCol='features',
    labelCol='xwoba',
)

# Vectorized training split that is later passed to the cross-validator's fit().
train_vec = vec_assembler.transform(train_sdf)

### CrossValidator로 교차 검증과 하이퍼파라미터 튜닝 수행. 
* ParamGridBuilder로 하이퍼파라미터 튜닝을 위한 그리드 서치(Grid Search)용 Param Grid를 생성.
* 교차 검증 시 성능 평가할 Evaluator 생성. 
* CrossValidator 객체 생성시 Estimator객체, 그리드 서치용 Param Grid, 성능 평가용 Evaluator 객체, Fold 수를 인자로 입력함. 
* CrossValidator 객체의 fit(입력DataFrame) 메소드를 호출하여 교차 검증과 하이퍼파라미터 튜닝 수행. fit() 수행 후 CrossValidatorModel 반환. 
* 반환된 CrossValidatorModel의 avgMetrics 속성에 교차 검증별 평균 성능 평가점수, bestModel에 최적 하이퍼파라미터로 refit된 EstimatorModel 객체를 가짐.

In [31]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Grid-search candidates for the CrossValidator:
# 2 regParam x 2 fitIntercept x 3 elasticNetParam = 12 combinations.
param_grid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.fitIntercept, [False, True])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# Evaluator the CrossValidator uses to score each combination (RMSE against 'xwoba').
evaluator_rmse = RegressionEvaluator(labelCol='xwoba', predictionCol='prediction', metricName='rmse')

# CrossValidator takes the estimator, the param grid, the evaluator and the fold count.
# The seed fixes the fold assignment so avgMetrics are reproducible between runs.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=param_grid,
    evaluator=evaluator_rmse,
    numFolds=5,
    seed=42,
)

# Run cross-validation and hyperparameter tuning on the vectorized training split.
cv_model = cv.fit(train_vec)

In [32]:
# Show the type of the object returned by cv.fit().
print(type(cv_model))

<class 'pyspark.ml.tuning.CrossValidatorModel'>


In [33]:
# Pair each entry of avgMetrics with the param map at the same position in the grid.
list(zip(cv_model.avgMetrics, cv_model.getEstimatorParamMaps()))

[(0.03421144528122676,
  {Param(parent='LinearRegression_96aa715c3aea', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
   Param(parent='LinearRegression_96aa715c3aea', name='fitIntercept', doc='whether to fit an intercept term.'): False,
   Param(parent='LinearRegression_96aa715c3aea', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0}),
 (0.04267140670406968,
  {Param(parent='LinearRegression_96aa715c3aea', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
   Param(parent='LinearRegression_96aa715c3aea', name='fitIntercept', doc='whether to fit an intercept term.'): False,
   Param(parent='LinearRegression_96aa715c3aea', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.5}),
 (0.045228911921964786,
  {Param(parent='LinearRegress

In [35]:
# Reduce every param map to a plain {param name: value} dict, then print the list.
params = [
    {param.name: value for param, value in param_map.items()}
    for param_map in cv_model.getEstimatorParamMaps()
]
print(params)

[{'regParam': 0.1, 'fitIntercept': False, 'elasticNetParam': 0.0}, {'regParam': 0.1, 'fitIntercept': False, 'elasticNetParam': 0.5}, {'regParam': 0.1, 'fitIntercept': False, 'elasticNetParam': 1.0}, {'regParam': 0.1, 'fitIntercept': True, 'elasticNetParam': 0.0}, {'regParam': 0.1, 'fitIntercept': True, 'elasticNetParam': 0.5}, {'regParam': 0.1, 'fitIntercept': True, 'elasticNetParam': 1.0}, {'regParam': 0.01, 'fitIntercept': False, 'elasticNetParam': 0.0}, {'regParam': 0.01, 'fitIntercept': False, 'elasticNetParam': 0.5}, {'regParam': 0.01, 'fitIntercept': False, 'elasticNetParam': 1.0}, {'regParam': 0.01, 'fitIntercept': True, 'elasticNetParam': 0.0}, {'regParam': 0.01, 'fitIntercept': True, 'elasticNetParam': 0.5}, {'regParam': 0.01, 'fitIntercept': True, 'elasticNetParam': 1.0}]


In [36]:
import pandas as pd

# Print each {param name: value} dict next to its average cross-validation metric.
print(list(zip(params, cv_model.avgMetrics)))

# Collect the same pairs in a pandas DataFrame, one row per param combination.
cv_result = pd.DataFrame(
    {
        'params': params,
        'evaluation_result': cv_model.avgMetrics,
    }
)

display(cv_result)

[({'regParam': 0.1, 'fitIntercept': False, 'elasticNetParam': 0.0}, 0.03421144528122676), ({'regParam': 0.1, 'fitIntercept': False, 'elasticNetParam': 0.5}, 0.04267140670406968), ({'regParam': 0.1, 'fitIntercept': False, 'elasticNetParam': 1.0}, 0.045228911921964786), ({'regParam': 0.1, 'fitIntercept': True, 'elasticNetParam': 0.0}, 0.03343771350524049), ({'regParam': 0.1, 'fitIntercept': True, 'elasticNetParam': 0.5}, 0.042136624358354594), ({'regParam': 0.1, 'fitIntercept': True, 'elasticNetParam': 1.0}, 0.042136624358354594), ({'regParam': 0.01, 'fitIntercept': False, 'elasticNetParam': 0.0}, 0.03038723590256326), ({'regParam': 0.01, 'fitIntercept': False, 'elasticNetParam': 0.5}, 0.03253436557226967), ({'regParam': 0.01, 'fitIntercept': False, 'elasticNetParam': 1.0}, 0.03481422419855752), ({'regParam': 0.01, 'fitIntercept': True, 'elasticNetParam': 0.0}, 0.03016438868856356), ({'regParam': 0.01, 'fitIntercept': True, 'elasticNetParam': 0.5}, 0.03162773062927452), ({'regParam': 0.0

Unnamed: 0,params,evaluation_result
0,"{'regParam': 0.1, 'fitIntercept': False, 'elas...",0.034211
1,"{'regParam': 0.1, 'fitIntercept': False, 'elas...",0.042671
2,"{'regParam': 0.1, 'fitIntercept': False, 'elas...",0.045229
3,"{'regParam': 0.1, 'fitIntercept': True, 'elast...",0.033438
4,"{'regParam': 0.1, 'fitIntercept': True, 'elast...",0.042137
5,"{'regParam': 0.1, 'fitIntercept': True, 'elast...",0.042137
6,"{'regParam': 0.01, 'fitIntercept': False, 'ela...",0.030387
7,"{'regParam': 0.01, 'fitIntercept': False, 'ela...",0.032534
8,"{'regParam': 0.01, 'fitIntercept': False, 'ela...",0.034814
9,"{'regParam': 0.01, 'fitIntercept': True, 'elas...",0.030164


In [37]:
import pandas as pd

# 교차 검증 결과를 pandas DataFrame으로 반환하는 함수 생성. 
def get_cv_result_pdf(cv_model):
    """Summarize a fitted cross-validator as a pandas DataFrame.

    Args:
        cv_model: object exposing ``getEstimatorParamMaps()`` (an iterable of
            mappings from param objects with a ``name`` attribute to values)
            and ``avgMetrics`` (one metric per param map).

    Returns:
        pandas.DataFrame with a 'params' column holding ``{param name: value}``
        dicts and an 'evaluation_result' column holding the matching metric.
    """
    named_param_maps = []
    for param_map in cv_model.getEstimatorParamMaps():
        # Key by the param's short name instead of the Param object itself.
        named_param_maps.append({param.name: value for param, value in param_map.items()})

    return pd.DataFrame(
        {
            'params': named_param_maps,
            'evaluation_result': cv_model.avgMetrics,
        }
    )

In [38]:
# Display the per-combination cross-validation results for the fitted regression model.
get_cv_result_pdf(cv_model)

Unnamed: 0,params,evaluation_result
0,"{'regParam': 0.1, 'fitIntercept': False, 'elas...",0.034211
1,"{'regParam': 0.1, 'fitIntercept': False, 'elas...",0.042671
2,"{'regParam': 0.1, 'fitIntercept': False, 'elas...",0.045229
3,"{'regParam': 0.1, 'fitIntercept': True, 'elast...",0.033438
4,"{'regParam': 0.1, 'fitIntercept': True, 'elast...",0.042137
5,"{'regParam': 0.1, 'fitIntercept': True, 'elast...",0.042137
6,"{'regParam': 0.01, 'fitIntercept': False, 'ela...",0.030387
7,"{'regParam': 0.01, 'fitIntercept': False, 'ela...",0.032534
8,"{'regParam': 0.01, 'fitIntercept': False, 'ela...",0.034814
9,"{'regParam': 0.01, 'fitIntercept': True, 'elas...",0.030164


### CrossValidatorModel을 이용하여 예측 수행. 
* CrossValidatorModel 객체의 transform(테스트 DataFrame)을 호출하여 예측. CrossValidatorModel객체는 최적의 하이퍼 파라미터로 학습되었음. 
* CrossValidatorModel 객체의 bestModel은 최적 하이퍼파라미터로 EstimatorModel이 재학습된 것임.

In [39]:
# Vectorize the held-out split with the same assembler, then predict with the tuned model.
test_vec = vec_assembler.transform(test_sdf)
predictions = cv_model.transform(test_vec)

# Score the test predictions with RMSE against the 'xwoba' label.
evaluator_rmse = RegressionEvaluator(
    labelCol='xwoba',
    predictionCol='prediction',
    metricName='rmse',
)
print('rmse : ', evaluator_rmse.evaluate(predictions))

rmse :  0.02950179582961995


In [40]:
# Keep a handle on the model stored in the CrossValidatorModel's bestModel attribute and print it.
best_lr_model = cv_model.bestModel
print(best_lr_model)

LinearRegressionModel: uid=LinearRegression_96aa715c3aea, numFeatures=21


In [48]:
 # Map each param's short name to its value on the best model, walking the param map once.
 {param.name: value for param, value in cv_model.bestModel.extractParamMap().items()}

{'aggregationDepth': 2,
 'elasticNetParam': 0.0,
 'epsilon': 1.35,
 'featuresCol': 'features',
 'fitIntercept': True,
 'labelCol': 'xwoba',
 'loss': 'squaredError',
 'maxBlockSizeInMB': 0.0,
 'maxIter': 100,
 'predictionCol': 'prediction',
 'regParam': 0.01,
 'solver': 'auto',
 'standardization': True,
 'tol': 1e-06}

In [50]:
# Display the best model's full param map, keyed by Param objects (includes each param's doc text).
best_lr_model.extractParamMap()

{Param(parent='LinearRegression_96aa715c3aea', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LinearRegression_96aa715c3aea', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LinearRegression_96aa715c3aea', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35,
 Param(parent='LinearRegression_96aa715c3aea', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LinearRegression_96aa715c3aea', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_96aa715c3aea', name='labelCol', doc='label column name.'): 'xwoba',
 Param(parent='LinearRegression_96aa715c3aea', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'): 'squaredError

In [51]:
# Predict on the vectorized test split directly with the best model.
best_model_predictions = best_lr_model.transform(test_vec)

print("\n##### Best Model로 테스트 데이터 예측 결과 ######")
# evaluator_rmse computes RMSE, so label the value as such (it was previously printed as accuracy).
print('rmse : ', evaluator_rmse.evaluate(best_model_predictions))

display(best_model_predictions)


##### Best Model로 테스트 데이터 예측 결과 ######
정확도: 0.02950179582961995


DataFrame[last_name: string, first_name: string, player_id: int, year: int, p_k_percent: double, p_bb_percent: double, xwoba: double, z_swing_percent: double, z_swing_miss_percent: double, oz_swing_percent: double, oz_swing_miss_percent: double, oz_contact_percent: double, out_zone_percent: double, iz_contact_percent: double, in_zone_percent: double, edge_percent: double, whiff_percent: double, fastball_avg_speed: double, fastball_avg_spin: int, fastball_avg_break: double, breaking_avg_speed: double, breaking_avg_spin: int, breaking_avg_break: double, offspeed_avg_speed: double, offspeed_avg_spin: int, offspeed_avg_break: double, features: vector, prediction: double]

### Pipeline과 결합하여 사용하기
* CrossValidator에서 Estimator 대신 Pipeline을 적용할 수 있음. 
* Pipeline의 stage에 CrossValidator를 등록 시킬 수 있음.

#### CrossValidator에서 Estimator 대신 Pipeline을 적용

In [0]:
from pyspark.ml import Pipeline

# NOTE(review): `iris_sdf` is not defined in any visible cell of this notebook -- TODO: load the
# iris DataFrame (the code below expects the four measurement columns plus 'label') before running.
# This also rebinds train_sdf / test_sdf, replacing the regression split created earlier.
train_sdf, test_sdf = iris_sdf.randomSplit([0.7, 0.3], seed=0)

iris_columns = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
stage_1 = VectorAssembler(inputCols=iris_columns, outputCol='features')
stage_2 = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=10)

# Pipeline made of the feature-vectorization stage followed by the estimator stage.
pipeline_01 = Pipeline(stages=[stage_1, stage_2])

# Grid-search candidates. The params must be taken from the stage object itself (stage_2.maxDepth).
param_grid_01 = (
    ParamGridBuilder()
    .addGrid(stage_2.maxDepth, [5, 7, 8, 10])
    .addGrid(stage_2.minInstancesPerNode, [3, 5, 6])
    .build()
)

evaluator_accuracy_01 = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

# The estimator handed to CrossValidator is the whole Pipeline, not a bare estimator.
# The seed fixes the fold assignment, matching the seeded train/test split above.
cv = CrossValidator(
    estimator=pipeline_01,
    estimatorParamMaps=param_grid_01,
    evaluator=evaluator_accuracy_01,
    numFolds=3,
    seed=0,
)

# Run cross-validation and grid-search tuning.
# The DataFrame passed to fit() must NOT be vectorized yet: the Pipeline's first stage does that.
cv_model_01 = cv.fit(train_sdf)

cv_result_pdf = get_cv_result_pdf(cv_model_01)
display(cv_result_pdf)

In [0]:
# test_sdf must not be vectorized before calling transform(): the Pipeline inside
# cv_model_01 performs the feature vectorization itself.
predictions = cv_model_01.transform(test_sdf)
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)
print(evaluator_accuracy.evaluate(predictions))

#### Pipeline의 stage에 CrossValidator를 등록

In [0]:
# Pipeline built from a feature-vectorization stage followed by a cross-validation stage.
stage_vectorized = VectorAssembler(inputCols=iris_columns, outputCol='features')
# Estimator tuned by the cross-validation stage.
dt_estimator = DecisionTreeClassifier(featuresCol='features', labelCol='label')

# Grid-search candidates for the cross-validation stage.
param_grid_02 = (
    ParamGridBuilder()
    .addGrid(dt_estimator.maxDepth, [5, 7, 8, 10])
    .addGrid(dt_estimator.minInstancesPerNode, [3, 5, 6])
    .build()
)

evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

# CrossValidator used as a pipeline stage. The seed fixes the fold assignment so the
# tuning result is reproducible between runs.
stage_cv = CrossValidator(
    estimator=dt_estimator,
    estimatorParamMaps=param_grid_02,
    evaluator=evaluator_accuracy,
    numFolds=3,
    seed=0,
)

# Register both stages: vectorization first, cross-validation second.
pipeline_02 = Pipeline(stages=[stage_vectorized, stage_cv])

# fit() runs feature vectorization and then cross-validation on the training split.
pipeline_model_02 = pipeline_02.fit(train_sdf)


In [0]:
# List the fitted pipeline's stages, then show the type of the last one (the cross-validation stage).
print(pipeline_model_02.stages)
print(type(pipeline_model_02.stages[-1]))

In [0]:
# Take the last fitted stage of the pipeline and tabulate its cross-validation results.
cv_model_02 = pipeline_model_02.stages[-1]
cv_result_pdf = get_cv_result_pdf(cv_model_02)
display(cv_result_pdf)

In [0]:
# Predict on the raw test split: the fitted pipeline applies its vectorization stage
# first and then the CrossValidatorModel's transform().
predictions = pipeline_model_02.transform(test_sdf)
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)
print(evaluator_accuracy.evaluate(predictions))

### TrainValidationSplit을 이용하여 하이퍼파라미터 튜닝

In [0]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

# Vectorize the training split up front; TrainValidationSplit below receives a bare estimator.
vector_assembler = VectorAssembler(inputCols=iris_columns, outputCol='features')
train_sdf_vectorized = vector_assembler.transform(train_sdf)

dt_estimator = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=10)

# Candidate hyperparameters: 4 maxDepth values x 3 minInstancesPerNode values.
tvs_param_grid = (
    ParamGridBuilder()
    .addGrid(dt_estimator.maxDepth, [5, 7, 8, 10])
    .addGrid(dt_estimator.minInstancesPerNode, [3, 5, 6])
    .build()
)

evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol='label',
    predictionCol='prediction',
    metricName='accuracy',
)

# TrainValidationSplit configured with a 0.75 train ratio and a fixed seed.
tvs = TrainValidationSplit(
    estimator=dt_estimator,
    estimatorParamMaps=tvs_param_grid,
    evaluator=evaluator_accuracy,
    trainRatio=0.75,
    seed=0,
)
# Split into train/validation sets and run the hyperparameter tuning.
tvs_model = tvs.fit(train_sdf_vectorized)

In [0]:
# Print the validation metrics, then the param combinations as plain {param name: value} dicts.
print('validation metrics:', tvs_model.validationMetrics)
params = [
    {param.name: value for param, value in param_map.items()}
    for param_map in tvs_model.getEstimatorParamMaps()
]
print(params)

In [0]:
import pandas as pd

# 검증 결과를 pandas DataFrame으로 반환하는 함수 생성. 
def get_tvs_result_pdf(tvs_model):
    """Summarize a fitted train/validation split as a pandas DataFrame.

    Args:
        tvs_model: object exposing ``getEstimatorParamMaps()`` (an iterable of
            mappings from param objects with a ``name`` attribute to values)
            and ``validationMetrics`` (one metric per param map).

    Returns:
        pandas.DataFrame with a 'params' column holding ``{param name: value}``
        dicts and an 'evaluation_result' column holding the matching metric.
    """
    named_param_maps = []
    for param_map in tvs_model.getEstimatorParamMaps():
        # Key by the param's short name instead of the Param object itself.
        named_param_maps.append({param.name: value for param, value in param_map.items()})

    return pd.DataFrame(
        {
            'params': named_param_maps,
            'evaluation_result': tvs_model.validationMetrics,
        }
    )

# Tabulate the validation results and show at most the first 10 rows.
tvs_result_pdf = get_tvs_result_pdf(tvs_model)
tvs_result_pdf.head(10)

In [0]:
#tvs_model은 fit()수행시 best model이 가지는 hyperparameter로 refit되고, 해당 EstimatorModel이 bestModel속성으로 저장됨. 
best_dt_model = tvs_model.bestModel

best_model_predictions = best_dt_model.transform(test_sdf_vectorized)

print("\n##### Best Model로 테스트 데이터 예측 결과 ######")
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
print('정확도:', evaluator_accuracy.evaluate(best_model_predictions))

display(best_model_predictions)