# ML package of PySpark
최상단 레벨에 3개의 추상 클래스를 가지고 있다. (트랜스포머, 에스티메이터, 파이프라인)

## 트랜스포머

트랜스포머 클래스는 데이터프레임에 새로운 컬럼을 추가하고 데이터를 변형.
spark.ml.feature에 많은 트랜스포머들이 있음.

ChiSqSelector, CountVectorizer, HashingTF, IDF, indexToString, MaxAbsScaler, MinMaxScaler  
import findspark,pyspark  
findspark.find()  

In [11]:
# RDD로 바뀐것 ->DF로 다시 바꿔줘야 MLlib사용가능
import findspark,pyspark
findspark.find()

'C:\\Bigdata\\spark-2.4.5-bin-hadoop2.7'

In [12]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('appName').setMaster('local[*]')
sc=pyspark.SparkContext(conf=conf)
spark=SparkSession(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=appName, master=local[*]) created by __init__ at <ipython-input-3-31414dea88fd>:5 

In [14]:
df = spark.createDataFrame(
    [(12,10,3), (1,4,2)],
    ['a','b','c']
)

In [15]:
df.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
| 12| 10|  3|
|  1|  4|  2|
+---+---+---+



In [16]:
import pyspark.ml.feature as ft

In [17]:
ft.VectorAssembler()

VectorAssembler_77290a494e36

In [18]:
ft.VectorAssembler(inputCols=['a','b','c'], outputCol='features') \
                .transform(df) \
                .select('features') \
                .collect()

[Row(features=DenseVector([12.0, 10.0, 3.0])),
 Row(features=DenseVector([1.0, 4.0, 2.0]))]

In [19]:
import pyspark.sql.types as typ
labels = [
      ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
      ('BIRTH_PLACE', typ.StringType()),
      ('MOTHER_AGE_YEARS', typ.IntegerType()),
      ('FATHER_COMBINED_AGE', typ.IntegerType()),
      ('CIG_BEFORE', typ.IntegerType()),
      ('CIG_1_TRI', typ.IntegerType()),
      ('CIG_2_TRI', typ.IntegerType()),
      ('CIG_3_TRI', typ.IntegerType()),
      ('MOTHER_HEIGHT_IN', typ.IntegerType()),
      ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
      ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
      ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
      ('DIABETES_PRE', typ.IntegerType()),
      ('DIABETES_GEST', typ.IntegerType()),
      ('HYP_TENS_PRE', typ.IntegerType()),
      ('HYP_TENS_GEST', typ.IntegerType()),
      ('PREV_BIRTH_PRETERM', typ.IntegerType())
]
schema = typ.StructType([typ.StructField(e[0],e[1],False) for e in labels])

In [21]:
base_path = "../../data/RDD_example/"
births = spark.read.csv(base_path+'births_transformed.csv',header=True,schema=schema)

BIRTH_PLACE 칼럼 인코딩 작업

In [22]:
births= births.withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE']\
                .cast(typ.IntegerType()))
# BIRTH_PLACE_INT 라는 컬럼을 만든뒤, births['BIRTH_PLACE']를 가져와 cast한다

In [23]:
encoder=ft.OneHotEncoder(inputCol = 'BIRTH_PLACE_INT',
                        outputCol = 'BIRTH_PLACE_VEC')

# input, output을 지정하주고 onehotencode한다

In [24]:
featuresCreator=ft.VectorAssembler(
                    inputCols=[col[0] for col in labels[2:]]+\
                    [encoder.getOutputCol()],
                    outputCol='features'
)

create an estimator  
에스티메이터  
에스티메이터는 관찰된 데이터들에 대해 예측이나 분류를 수행하는데 필요한 통계모델  

- 분류모델 : LogisticRegression,  DecisionTreeClassifier, RandomForestClassifier...
- 회귀모델 : LinearRegression, RandomForestRegression...
- 군집모델 : K-Means, GaussianMixture ...

In [25]:
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(maxIter = 10, regParam=0.01,
                                labelCol='INFANT_ALIVE_AT_REPORT')

Create a pipeline

파이프라인  
ML에서의 파이프라인은 엔드 투 엔드 변환 - 추정 과정에 대한 처리.  
원본 데이터를 받아서(데이터프레임) 필요한 데이터 변형을 수행 한 수(트랜스포메이션), 최종적으로  
통계모델(에스티메이터)을 생성한다.

In [26]:
from pyspark.ml import Pipeline
pipline=Pipeline(stages=[encoder, featuresCreator, logistic])

Fit the model

In [27]:
births_train, births_test = births.randomSplit([0.7, 0.3], seed = 555)

In [28]:
print(births_train)

DataFrame[INFANT_ALIVE_AT_REPORT: int, BIRTH_PLACE: string, MOTHER_AGE_YEARS: int, FATHER_COMBINED_AGE: int, CIG_BEFORE: int, CIG_1_TRI: int, CIG_2_TRI: int, CIG_3_TRI: int, MOTHER_HEIGHT_IN: int, MOTHER_PRE_WEIGHT: int, MOTHER_DELIVERY_WEIGHT: int, MOTHER_WEIGHT_GAIN: int, DIABETES_PRE: int, DIABETES_GEST: int, HYP_TENS_PRE: int, HYP_TENS_GEST: int, PREV_BIRTH_PRETERM: int, BIRTH_PLACE_INT: int]


In [29]:
model = pipline.fit(births_train)

In [30]:
test_model=model.transform(births_test)

In [31]:
test_model.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=12, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=60, MOTHER_PRE_WEIGHT=154, MOTHER_DELIVERY_WEIGHT=154, MOTHER_WEIGHT_GAIN=0, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 12.0, 1: 99.0, 6: 60.0, 7: 154.0, 8: 154.0, 16: 1.0}), rawPrediction=DenseVector([1.1749, -1.1749]), probability=DenseVector([0.764, 0.236]), prediction=0.0)]

rawPrediction : 회귀계수의 선형 결합값.  
probability : 확률  
prediction : 예측결과  

# Model performace

In [32]:
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
            rawPredictionCol='probability',
            labelCol='INFANT_ALIVE_AT_REPORT'
)

In [33]:
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName:'areaUnderPR'}))

0.7401520968450921
0.713487623217287


Saving the model

In [34]:
pipelinePath = './data/infant_oneHotEncoder_logistic_pipline'
pipline.write().overwrite().save(pipelinePath)
#아까 만들었던 모델 저장

In [35]:
loadedPipeline = Pipeline.load(pipelinePath)

In [36]:
loadedPipeline.fit(births_train).transform(births_test).take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=12, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=60, MOTHER_PRE_WEIGHT=154, MOTHER_DELIVERY_WEIGHT=154, MOTHER_WEIGHT_GAIN=0, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 12.0, 1: 99.0, 6: 60.0, 7: 154.0, 8: 154.0, 16: 1.0}), rawPrediction=DenseVector([1.1749, -1.1749]), probability=DenseVector([0.764, 0.236]), prediction=0.0)]

In [37]:
from pyspark.ml import PipelineModel
modelPath = './data/infant_oneHotEncoder_logistic_pipelineModel'
model.write().overwrite().save(modelPath)

In [38]:
loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(births_test)

In [39]:
print(evaluator.evaluate(test_loadedModel, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_loadedModel, {evaluator.metricName:'areaUnderPR'}))
# 확인

0.7401520968450921
0.713487623217287


### Parmeter hyper-tuning

Grid search


In [40]:
import pyspark.ml.tuning as tune

In [41]:
logistic = cl.LogisticRegression(labelCol = 'INFANT_ALIVE_AT_REPORT')

In [42]:
grid = tune.ParamGridBuilder().addGrid(logistic.maxIter, [2, 10, 50])\
                                .addGrid(logistic.regParam, [0.01, 0.05, 0.3])\
                                .build()

In [43]:
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol='probability',
labelCol = 'INFANT_ALIVE_AT_REPORT')

In [46]:
cv = tune.CrossValidator(
estimator = logistic, 
estimatorParamMaps = grid,
evaluator = evaluator)

In [47]:
pipeline = Pipeline(stages = [encoder, featuresCreator])
data_transformer = pipeline.fit(births_train)

In [48]:
cvModel = cv.fit(data_transformer.transform(births_train))

In [53]:
data_train = data_transformer.transform(births_test)
results= cvModel.transform(data_train)

In [54]:
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))# test_loadedModel 0.7401520968450921
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'})) # test_loadedModel 0.713487623217287

0.7410337059879951
0.7147457221631203


In [58]:
results = [
    (
        [
            {key.name: paramValue}
            for key, paramValue in zip(params.keys(), params.values())
        ], metric
    )
    for params, metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)
]

In [65]:
sorted(results, key = lambda el: el[1], reverse=True)[0]

([{'maxIter': 50}, {'regParam': 0.01}], 0.7384482711319624)

In [66]:
selector = ft.ChiSqSelector(numTopFeatures=5,
                           featuresCol = featuresCreator.getOutputCol(),
                           outputCol = 'selectedFeatures',
                           labelCol = 'INFANT_ALIVE_AT_REPORT')
logistic = cl.LogisticRegression(
labelCol = 'INFANT_ALIVE_AT_REPORT',
featuresCol = 'selectedFeatures')
pipeline = Pipeline(stages = [encoder, featuresCreator, selector])
data_transformer = pipeline.fit(births_train)

In [70]:
tvs = tune.TrainValidationSplit(
estimator = logistic, 
estimatorParamMaps = grid,
evaluator = evaluator)

In [71]:
tvsModel = tvs.fit(data_transformer.transform(births_train))

In [73]:
data_train = data_transformer.transform(births_test)
results = tvsModel.transform(data_train)

In [74]:
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderROC'}))# results기존값 0.7410337059879951
print(evaluator.evaluate(results, {evaluator.metricName:'areaUnderPR'})) # results기존갑 0.7147457221631203

0.62100281440712
0.5898593074787732


# standardizing continuous variables
## 표준화하기
- 연속변수 표준화는 핓처간의 관계를 더욱 잘 이해하게 할 뿐만아니라 컴퓨팅 계산량을 효율적으로 하기도 하면 수치적 에러가 일어나지 않게 하기도 한다

In [80]:
import numpy as np
x = np.arange(0, 100)
x = x/100.0 * np.pi * 4
y = x * np.sin(x/1.764) + 20.1234

In [82]:
schema = typ.StructType([
    typ.StructField('continuous_var', typ.DoubleType(), False)
])

In [84]:
data = spark.createDataFrame([[float(e), ] for e in y], schema=schema)
data.printSchema()

In [94]:
data.take(3)

[Row(continuous_var=20.1234),
 Row(continuous_var=20.132344452369832),
 Row(continuous_var=20.159087064491775)]

In [95]:
data.show(3)

+------------------+
|    continuous_var|
+------------------+
|           20.1234|
|20.132344452369832|
|20.159087064491775|
+------------------+
only showing top 3 rows



In [96]:
vectorizer = ft.VectorAssembler(
inputCols = ['continuous_var'],
outputCol = 'continuous_vec')

In [101]:
# StandardScaler
# MinMaxScaler
# MaxabsScaler
normalizer = ft.StandardScaler(
                inputCol = vectorizer.getOutputCol(),
                outputCol = 'normalized',
                withMean = True,
                withStd = True)

In [100]:
pipeline = Pipeline(stages = [vectorizer, normalizer])
data_standardized = pipeline.fit(data).transform(data)
data_standardized.show()

+------------------+--------------------+--------------------+
|    continuous_var|      continuous_vec|          normalized|
+------------------+--------------------+--------------------+
|           20.1234|           [20.1234]|[0.23429139554502...|
|20.132344452369832|[20.132344452369832]|[0.23630959828688...|
|20.159087064491775|[20.159087064491775]|[0.24234373105179...|
|20.203356291885854|[20.203356291885854]|[0.2523325232564452]|
| 20.26470185735763| [20.26470185735763]|[0.2661743755372584]|
|20.342498180090526|[20.342498180090526]|[0.2837281334817457]|
|  20.4359491438498|  [20.4359491438498]|[0.30481416351354...|
|20.544094172020312|[20.544094172020312]|[0.32921572364798...|
|20.665815568330437|[20.665815568330437]|[0.35668061983374...|
|20.799847073505322|[20.799847073505322]|[0.38692313665363...|
|  20.9447835797997|  [20.9447835797997]|[0.4196262292862522]|
| 21.09909193743627| [21.09909193743627]|[0.4544439618423734]|
|21.261122779470593|[21.261122779470593]|[0.49100417549

# Classification

In [139]:
# RandomForestClassifier는 더블타임 데이터를 다룸
import pyspark.sql.functions as func
births = births.withColumn( 'INFANT_ALIVE_AT_REPORT', func.col('INFANT_ALIVE_AT_REPORT')\
                          .cast(typ.DoubleType()) )

In [140]:
births_train, births_test = births.randomSplit( [0.7, 0.3], seed=333 )

## RandomForest

In [141]:
classifier = cl.RandomForestClassifier( numTrees=5, maxDepth=5, 
                                        labelCol='INFANT_ALIVE_AT_REPORT' )
pipeline = Pipeline(stages=[encoder, featuresCreator, classifier])

In [142]:
model = pipeline.fit(births_train)

In [143]:
test = model.transform(births_test)

In [144]:
evaluator = ev.BinaryClassificationEvaluator(labelCol = 'INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test, {evaluator.metricName:'areaUnderPR'}))
# results기존값 0.7410337059879951
# results기존갑 0.7147457221631203

0.7512530645009318
0.7045194467626751


## DecisionTreeClassifier

In [145]:
classifier = cl.DecisionTreeClassifier(maxDepth=5, 
                                       labelCol='INFANT_ALIVE_AT_REPORT')
pipeline = Pipeline(stages=[encoder, featuresCreator, classifier])

In [146]:
model = pipeline.fit(births_train)

In [147]:
test = model.transform(births_test)

In [148]:
print(evaluator.evaluate(test, {evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test, {evaluator.metricName:'areaUnderPR'}))

0.7030789615584603
0.7083770130284276


# Clustering

In [149]:
import pyspark.ml.clustering as clus

kmeans = clus.KMeans(k=5, featuresCol = 'features')
pipeline = Pipeline(stages = [encoder, featuresCreator, kmeans])
model = pipeline.fit(births_train)
test = model.transform(births_test)

In [150]:
test.groupBy('prediction').agg({'*': 'count', 'MOTHER_HEIGHT_IN': 'avg'}).collect()

[Row(prediction=1, avg(MOTHER_HEIGHT_IN)=67.87682672233821, count(1)=479),
 Row(prediction=3, avg(MOTHER_HEIGHT_IN)=63.94812680115274, count(1)=9022),
 Row(prediction=4, avg(MOTHER_HEIGHT_IN)=66.14646464646465, count(1)=198),
 Row(prediction=2, avg(MOTHER_HEIGHT_IN)=85.08866995073892, count(1)=406),
 Row(prediction=0, avg(MOTHER_HEIGHT_IN)=65.39464033850494, count(1)=3545)]