In [46]:
import os

from pyspark import StorageLevel
from pyspark.conf import SparkConf
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType


In [2]:
# Build (or reuse) a Spark session running with the "local" master.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Pyspark")
    .getOrCreate()
)

# Keep a handle on the session's SparkContext and set its log level.
sc = spark.sparkContext
sc.setLogLevel("INFO")

In [3]:
# Bare expression so the notebook displays the SparkContext obtained from the session.
sc

## Read CSV

In [54]:
# Location of the Titanic training CSV. Set the TITANIC_CSV environment variable
# to point at your own copy; the default is the original author's local path.
TITANIC_CSV = os.environ.get(
    "TITANIC_CSV",
    "/Users/byeon/Dropbox/workspace/til/data/titanic-train.csv",
)

# Read the CSV with a header row and let Spark infer the column types.
df = spark.read.csv(TITANIC_CSV, header=True, inferSchema=True)

In [7]:
# Request caching of df; the call returns the DataFrame, which the notebook echoes.
df.cache()

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [17]:
# Show the flag that reports whether df has been cached.
df.is_cached

True

In [9]:
# Register df as the temporary view "train" so it can be referenced by name in spark.sql queries.
df.createOrReplaceTempView("train")

In [12]:
# Print each column's name, type and nullability.
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [14]:
# Summary statistics (count, mean, stddev, min, max) for the age column.
df.describe("age").show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|               714|
|   mean| 29.69911764705882|
| stddev|14.526497332334035|
|    min|              0.42|
|    max|              80.0|
+-------+------------------+



In [15]:
# Summary statistics for age and Sex side by side; for the string column Sex
# the mean and stddev rows come out as null in the displayed table.
df.describe("age", "Sex").show()

+-------+------------------+------+
|summary|               age|   Sex|
+-------+------------------+------+
|  count|               714|   891|
|   mean| 29.69911764705882|  null|
| stddev|14.526497332334035|  null|
|    min|              0.42|female|
|    max|              80.0|  male|
+-------+------------------+------+



In [28]:
# Number of passengers for each value of Survived.
df.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [29]:
# Passenger counts per (Pclass, Survived) pair, ordered by both keys for display.
df.groupBy("Pclass", "Survived").count().orderBy("Pclass", "Survived").show()

+------+--------+-----+
|Pclass|Survived|count|
+------+--------+-----+
|     1|       0|   80|
|     1|       1|  136|
|     2|       0|   97|
|     2|       1|   87|
|     3|       0|  372|
|     3|       1|  119|
+------+--------+-----+



### Column별 Null Check

In [31]:
# Count NULLs per column: each isNull() flag is cast to 0/1 and summed.
# `sum` here is the Spark SQL aggregate brought in by the
# `pyspark.sql.functions` star import, not the Python builtin.
null_counts = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
df.select(*null_counts).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



### SQL

In [32]:
# SQL over the "train" view: count survivors (Survived = 1) per embarkation port.
query = """
SELECT Embarked, count(PassengerId) as count
FROM train
WHERE Survived = 1
GROUP BY Embarked
"""

# Run the query through the Spark session and print the resulting table.
spark.sql(query).show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   30|
|    null|    2|
|       C|   93|
|       S|  217|
+--------+-----+



### Dataframe
- filter, groupby, count

In [44]:
# DataFrame-API version of the survivor-per-port count: keep survivors, group by Embarked, count.
df.filter("Survived == 1").groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   30|
|    null|    2|
|       C|   93|
|       S|  217|
+--------+-----+



In [45]:
# Survivor counts per passenger class. The column is spelled "PClass" here while
# the schema lists "Pclass"; the output header carries the spelling used in this call.
df.filter("Survived == 1").groupby("PClass").count().show()

+------+-----+
|PClass|count|
+------+-----+
|     1|  136|
|     3|  119|
|     2|   87|
+------+-----+



### Missing Value
- [pyspark.sql.DataFrameNaFunctions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameNaFunctions)
- Spark ML의 [Imputer](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=imputer#pyspark.ml.feature.Imputer)도 사용 가능

### Feature Engineering
- 1) UDF(User Defined Function)를 만들어 원하는 형태로 전처리
- 2) correlation, covariance, stratified sampling 등은 이미 구현되어 있음
    - [링크](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameStatFunctions) 참고

In [55]:
# Drop NULL values from a column.
# First remove the 'cabin' column from the working frame.
df = df.drop('cabin')

# Count the rows whose age is NULL before dropping anything.
null_age_rows = df.select('age').where('age is null')
before = null_age_rows.count()
print("Before: {}".format(before))

# Drop rows with a NULL age, then recount to confirm none remain.
test = df.na.drop(subset="age")
remaining_null_age_rows = test.select('age').where('age is null')
after = remaining_null_age_rows.count()
print("After {}".format(after))

Before: 177
After 0


In [92]:
# Fill NULL values in a column with the column mean.
# The mean is computed over the rows where age is present.
rows_with_age = df.where('age is not null')
avg_age = rows_with_age.agg({'age': 'avg'}).first()[0]

# Replace every NULL age with that mean and preview the column.
df = df.na.fill({'age': avg_age})
df.select('age').show(5)

+----+
| age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
+----+
only showing top 5 rows



- ```df.where```는 ```df.filter```의 alias

In [93]:
# Stratified sampling keyed on the label column.
# Keep 10% of label 0 and 50% of label 1, with a fixed seed.
fractions = {0: 0.1, 1: 0.5}
sample_df = df.sampleBy('survived', fractions=fractions, seed=0)

# Print the per-label counts for the full frame and for the sample.
for title, frame in (("Before:", df), ("After:", sample_df)):
    print(title)
    frame.groupBy('survived').count().show()

Before:
+--------+-----+
|survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+

After:
+--------+-----+
|survived|count|
+--------+-----+
|       1|  168|
|       0|   57|
+--------+-----+



In [95]:
# Example: add the length of the passenger name as a new feature via a Python UDF.
# The lambda is NULL-safe: a missing name yields NULL in the new column instead
# of raising TypeError from len(None) while the UDF runs.
str_length = udf(lambda x: len(x) if x is not None else None, IntegerType())

# Append the length as 'len_name' and preview it next to the name.
df = df.withColumn('len_name', str_length(df['name']))
df.select('name', 'len_name').show(5)

+--------------------+--------+
|                name|len_name|
+--------------------+--------+
|Braund, Mr. Owen ...|      23|
|Cumings, Mrs. Joh...|      51|
|Heikkinen, Miss. ...|      22|
|Futrelle, Mrs. Ja...|      44|
|Allen, Mr. Willia...|      24|
+--------------------+--------+
only showing top 5 rows



In [96]:
# Example: preprocess a categorical feature with a UDF.
# Spark ML's StringIndexer can also encode this column, but it assigns
# different index values (this notebook's StringIndexer output shows S -> 0.0).

def embarked_to_int(embarked):
    """Map an embarkation code to an integer: 'C' -> 1, 'Q' -> 2, 'S' -> 3, anything else -> 0."""
    for code, index in (('C', 1), ('Q', 2), ('S', 3)):
        if embarked == code:
            return index
    # Unknown codes (including None) fall through to 0.
    return 0

# Wrap the Python function as a Spark UDF returning IntegerType. This rebinds
# the name embarked_to_int from the plain function to the UDF.
embarked_to_int = udf(embarked_to_int, IntegerType())
# Add the encoded port as a new column 'embarked_ix' and preview it.
df = df.withColumn('embarked_ix', embarked_to_int(df['embarked']))
df.select('embarked', 'embarked_ix').show(5)

+--------+-----------+
|embarked|embarked_ix|
+--------+-----------+
|       S|          3|
|       C|          1|
|       S|          3|
|       S|          3|
|       S|          3|
+--------+-----------+
only showing top 5 rows



In [97]:
# StringIndexer 사용
from pyspark.ml.feature import StringIndexer

In [98]:
# Configure a StringIndexer that reads "Embarked" and writes its index to "embarked_ix2".
indexer = StringIndexer(inputCol="Embarked", outputCol="embarked_ix2")

In [99]:
# Fit the indexer on df and apply it to the same frame; the result is kept in
# `indexed`, so df itself is not rebound here.
# NOTE(review): Embarked has 2 NULLs per the null-check table — confirm how the
# indexer should treat them.
indexed = indexer.fit(df).transform(df)

In [100]:
# Preview the original port codes next to their StringIndexer output.
indexed.select('Embarked','embarked_ix2').show(5)

+--------+------------+
|Embarked|embarked_ix2|
+--------+------------+
|       S|         0.0|
|       C|         1.0|
|       S|         0.0|
|       S|         0.0|
|       S|         0.0|
+--------+------------+
only showing top 5 rows



In [101]:
# Using the when/otherwise clause from the Spark SQL functions.
# Example of preprocessing a categorical feature: 'male' -> 0, everything else -> 1.
sex_ix = when(df['sex'] == 'male', 0).otherwise(1).alias('sex_ix')
df.select('sex', sex_ix).show(5)

+------+------+
|   sex|sex_ix|
+------+------+
|  male|     0|
|female|     1|
|female|     1|
|female|     1|
|  male|     0|
+------+------+
only showing top 5 rows



- ```when```으로 조건주기

### Extraction
- raw 데이터에서 feature 추출하는 패키지
- [공식 문서](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature)
- TF-IDF, Word2Vec, CountVectorizer, FeatureHasher

### Transformation
- feature를 변형시키는 패키지
- scaling, converting
- Tokenizer, StopWordsRemover, n-gram, PCA, StringIndexer, OneHotEncoder
- StandardScaler, MinMaxScaler

### Selection
- feature selection을 지원하는 패키지
- feature가 많을 경우 유용
- VectorSlicer, RFormula, ChiSqSelector

In [102]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [104]:
# Preprocess a categorical feature with StringIndexer.
# Configure, fit and apply the indexer in separate steps; the transformed
# frame (with the extra 'sex_ix' column) replaces df.
sex_indexer = StringIndexer(inputCol='Sex', outputCol='sex_ix')
sex_indexer_model = sex_indexer.fit(df)
df = sex_indexer_model.transform(df)

df.select('Sex', 'sex_ix').show(5)

+------+------+
|   Sex|sex_ix|
+------+------+
|  male|   0.0|
|female|   1.0|
|female|   1.0|
|female|   1.0|
|  male|   0.0|
+------+------+
only showing top 5 rows



In [105]:
# Use VectorAssembler to combine the feature columns into a single vector column.
inputCols = [
    'Pclass',
    'Age',
    'SibSp',
    'Parch',
    'Fare',
    'embarked_ix',
    'sex_ix',
    'len_name',
]
assembler = VectorAssembler(outputCol='features', inputCols=inputCols)

In [106]:
# Assemble the feature vector, then keep only the id, the label
# (Survived renamed to 'label') and the assembled 'features' column.
assembled = assembler.transform(df)
label_column = col('Survived').alias('label')
train = assembled.select('PassengerId', label_column, 'features')
train.show(5)

+-----------+-----+--------------------+
|PassengerId|label|            features|
+-----------+-----+--------------------+
|          1|    0|[3.0,22.0,1.0,0.0...|
|          2|    1|[1.0,38.0,1.0,0.0...|
|          3|    1|[3.0,26.0,0.0,0.0...|
|          4|    1|[1.0,35.0,1.0,0.0...|
|          5|    0|[3.0,35.0,0.0,0.0...|
+-----------+-----+--------------------+
only showing top 5 rows



### Modeling
- [classification 문서](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.classification)
- 대부분 Data parallelism을 통해 분산 학습
- Spark 2.3부터 model parallelism 지원(모델 병렬화)

### Classification, Regression
- 트리 모델: DecisionTree, RandomForest, GBTClassifier
- SVM 모델: LinearSVC, OneVsRest
- MultilayerPerceptronClassifier: 은닉층에는 sigmoid, 출력층에는 softmax를 사용하는 feedforward 신경망 모델
- LinearRegression, SurvivalRegression, NaiveBayes

### Clustering
- 다양한 클러스터링 알고리즘을 지원
- KMeans, LDA, GMM
- 이전에는 computeCost 함수를 통해 SSE로 모델을 평가
- 2.3 버전부터 ClusteringEvaluator 사용 가능

### Recommendation
- CF 방식의 Alternating Least Squares(ALS) 추천 알고리즘을 지원
- "Large-Scale Parallel Collaborative Filtering for the Netflix Prize" 논문을 참고
- Production에 쉽게 연동할 수 있게 만든 Apache PredictionIO도 참고 (MLlib)

In [108]:
from pyspark.ml.classification import RandomForestClassifier

In [109]:
# RandomForestClassifier example.
# Partition the training set row-wise, 80/20, into training and test sets.
# A fixed seed (the same value the sampleBy cell uses) keeps the split
# reproducible across runs.
# NOTE: this rebinds `train` to the 80% slice, so re-running this cell without
# first re-running the cell that builds `train` would split the slice again.
splits = train.randomSplit([0.8, 0.2], seed=0)
train = splits[0].cache()
test = splits[1].cache()

# cacheNodeIds: caches the node id for each instance; a performance tip when trees get deep.
# seed: fixes the forest's own randomness so the metrics are repeatable.
model = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    cacheNodeIds=True,
    seed=0)

# Fit on the training slice, score the held-out slice and preview the predictions.
predict = model.fit(train).transform(test)
predict.show(5)

+-----------+-----+--------------------+--------------------+--------------------+----------+
|PassengerId|label|            features|       rawPrediction|         probability|prediction|
+-----------+-----+--------------------+--------------------+--------------------+----------+
|         22|    1|[2.0,34.0,0.0,0.0...|[17.3006752634910...|[0.86503376317455...|       0.0|
|         25|    0|[3.0,8.0,3.0,1.0,...|[15.4203833201276...|[0.77101916600638...|       0.0|
|         28|    0|[1.0,19.0,3.0,2.0...|[8.33806528847363...|[0.41690326442368...|       1.0|
|         39|    0|[3.0,18.0,2.0,0.0...|[10.0694063466503...|[0.50347031733251...|       0.0|
|         44|    1|[2.0,3.0,1.0,2.0,...|[3.43605630898893...|[0.17180281544944...|       1.0|
+-----------+-----+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



### Evaluation
- [문서](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.evaluation)
- 모델을 평가하기 위한 패키지, 사용할 수 있는 metric을 확인할 필요가 있음
- BinaryClassificationEvaluator: areaUnderROC(기본값), areaUnderPR 사용 가능
- MulticlassClassificationEvaluator: f1, weightedPrecision, weightedRecall, accuracy
- RegressionEvaluator: rmse, mse, mae
- ClusteringEvaluator: 2.3 버전에 새롭게 추가, metric으로 silhouette 사용 가능
- confusionMatrix() 등 몇 가지는 아직 Spark MLlib에만 존재함

In [110]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluator that compares the 'prediction' column against 'label' using accuracy.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

# Bare expression so the notebook displays the accuracy on the scored test set.
evaluator.evaluate(predict)

0.8165680473372781

### Tuning
- model selection and hyperparameter tuning
- [문서](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.tuning)
- 지정한 parameter의 조합에 대하여 반복 학습하는 형태
- 원래 data parallelism 만 지원했지만, 2.3버전부터 model parallelism도 지원하기 시작
- CrossValidator와 TrainValidationSplit에 parallelism 파라메터 지정

### ParamGridBuilder
- 파라메터를 자동으로 튜닝하기 위한 빌더 패키지 (Grid Search)
- 각 모델에 대한 파라메터는 spark.ml.param module

### CrossValidator
- K-Fold CrossValidation 그 자체 (위키 참고)
- 지정한 Fold 만큼 반복 학습

### TrainValidationSplit (Experimental)
- 지정한 비율에 따라 훈련/검증 셋을 나누어 학습에 반영
- CrossValidator에 비해 금방 끝나겠지만, 주어진 학습 데이터가 적다면 결과가 부정확할 수 있음

In [111]:
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [112]:
# Modeling: the estimator whose hyperparameters are tuned below.
# seed fixes the forest's own randomness so repeated runs are comparable.
model = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    cacheNodeIds=True,
    seed=0)

# Parameter tuning: a grid over numTrees x maxDepth (4 combinations); the
# remaining parameters are pinned to a single value each.
paramGrid = (
    ParamGridBuilder()
    .addGrid(model.numTrees, [500, 700])
    .addGrid(model.maxDepth, [5, 7])
    .addGrid(model.impurity, ["gini"])
    .addGrid(model.maxBins, [31])
    .addGrid(model.subsamplingRate, [0.7])
    .build()
)

# Evaluator: accuracy
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="accuracy")

# train:validation = 7:3. The seed makes the internal train/validation split
# reproducible, so the selected parameter combination does not change between runs.
tvs = TrainValidationSplit(
    estimator=model,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.7,
    seed=0)

# Fit every grid point, score the held-out test set with the fitted
# TrainValidationSplit model, and display the resulting accuracy.
tvsModel = tvs.fit(train)
predict = tvsModel.transform(test)
evaluator.evaluate(predict)

0.8224852071005917

In [113]:
# Release the cached train/test splits.
train.unpersist()
test.unpersist()

# `df` has been rebound several times (drop, fill, withColumn, transform) since
# `df.cache()` was called, so `df.unpersist()` alone targets the derived frame
# rather than the one that was cached. Clear the session's cache so the
# original cached data is released as well.
spark.catalog.clearCache()

# Kept as the final expression so the cell still echoes the DataFrame.
df.unpersist()

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Embarked: string, len_name: int, embarked_ix: int, sex_ix: double]

In [114]:
# Stop the Spark session created at the start of the notebook.
spark.stop()

## Reference
- 박준영님 [Repo](https://github.com/Swalloow/pyspark-ml-examples/blob/master/notebooks/spark-ml-starter.ipynb)