###### 2020-11-11 Wednesday

# 03_Machine Learning with Spark

## Contents

#### 1. Loading the Titanic Data

#### 2. EDA with Spark

#### 3. Handling Missing Values

#### 4. Creating Derived Variables

#### 5. Machine Learning with Spark

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


In [3]:
conf = SparkConf().setMaster('local').setAppName('spark_ml')
spark = SparkContext(conf=conf)
spark

In [4]:
sqlCtx = SQLContext(spark)
sqlCtx

<pyspark.sql.context.SQLContext at 0x1e54073d488>

## 1. Titanic Data불러오기

In [5]:
titanic = sqlCtx.read.csv('./실습데이터/spark_titanic_train.csv',
                          header      = True,
                          inferSchema = True) # Needed so each column is loaded with its original type; otherwise every column is read as string.
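With `inferSchema=False` (the default), every CSV column comes back as `string`. The sketch below is a simplified, hypothetical illustration of the idea behind `inferSchema`: try integer, then double, otherwise string, treating empty cells as nulls. Spark's real inference covers many more types and edge cases, and the helper name `infer_column_type` is made up for this example.

```python
def infer_column_type(values):
    """Guess 'integer', 'double' or 'string' for one column of raw CSV cells.

    Simplified sketch: empty cells count as nulls and are skipped; a column
    with no non-empty cells falls back to 'string'.
    """
    non_null = [v for v in values if v != ""]
    if not non_null:
        return "string"

    def all_parse(parser):
        # True if every non-null cell can be converted with `parser`
        try:
            for v in non_null:
                parser(v)
        except ValueError:
            return False
        return True

    if all_parse(int):
        return "integer"
    if all_parse(float):
        return "double"
    return "string"


# Raw cells as they appear in the CSV (PassengerId, Fare and Cabin columns)
print(infer_column_type(["1", "2", "3"]))                # integer
print(infer_column_type(["7.25", "71.2833", "7.925"]))   # double
print(infer_column_type(["", "C85", "", "C123"]))        # string
```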

In [6]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [7]:
titanic.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

In [8]:
titanic.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [9]:
titanic.select(['Survived', 'Pclass', 'Embarked']).show()

+--------+------+--------+
|Survived|Pclass|Embarked|
+--------+------+--------+
|       0|     3|       S|
|       1|     1|       C|
|       1|     3|       S|
|       1|     1|       S|
|       0|     3|       S|
|       0|     3|       Q|
|       0|     1|       S|
|       0|     3|       S|
|       1|     3|       S|
|       1|     2|       C|
|       1|     3|       S|
|       1|     1|       S|
|       0|     3|       S|
|       0|     3|       S|
|       0|     3|       S|
|       1|     2|       S|
|       0|     3|       Q|
|       1|     2|       S|
|       0|     3|       S|
|       1|     3|       C|
+--------+------+--------+
only showing top 20 rows



## 2. Spark를 이용한 EDA

In [10]:
titanic.groupby('Sex', 'Survived').count().show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  468|
|female|       1|  233|
|female|       0|   81|
|  male|       1|  109|
+------+--------+-----+
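These counts turn into survival rates by sex with one ratio per group. The plain-Python arithmetic below uses only the numbers in the table above. In Spark itself, `titanic.groupby('Sex').avg('Survived')` would give the same ratios, because `Survived` is 0/1 and its mean is the survival rate.

```python
# Counts copied from the groupby('Sex', 'Survived').count() output above
counts = {
    ("male", 0): 468,
    ("female", 1): 233,
    ("female", 0): 81,
    ("male", 1): 109,
}

def survival_rate(counts, sex):
    """Fraction of passengers of the given sex with Survived == 1."""
    survived = counts[(sex, 1)]
    total = counts[(sex, 0)] + counts[(sex, 1)]
    return survived / total

for sex in ("female", "male"):
    print(sex, round(survival_rate(counts, sex), 3))
# female 0.742
# male 0.189
```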



In [11]:
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit

##### The `mean` function
##### How do we compute the mean age?

In [12]:
titanic[['Age']].agg({'Age' : 'mean'}).show()

+-----------------+
|         avg(Age)|
+-----------------+
|29.69911764705882|
+-----------------+



In [13]:
titanic.select(mean('Age')).show()

+-----------------+
|         avg(Age)|
+-----------------+
|29.69911764705882|
+-----------------+



##### The `regexp_extract()` function

In [14]:
titanic.select('name').show()

+--------------------+
|                name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



In [15]:
# Note: this pattern captures the first word of Name, i.e. the surname, not the title
titanic = titanic.withColumn('initial', regexp_extract(col('Name'), '([A-Za-z]+)', 1))

In [16]:
titanic.select('initial').distinct().show()

+-------------+
|      initial|
+-------------+
|     Pavlovic|
|         Saad|
|      Palsson|
| Thorneycroft|
|   Johannesen|
|     Meanwell|
|      Markoff|
|       Porter|
|     Harrison|
|     Bissette|
|        Hampe|
|      Hegarty|
|Lemberopolous|
|      Ekstrom|
|       Boulos|
|       Bourke|
|     McGovern|
|     Kvillner|
|   Goldenberg|
|        Keefe|
+-------------+
only showing top 20 rows
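The output above lists surnames rather than titles: `regexp_extract` returns the first match of the pattern, and `([A-Za-z]+)` first matches the leading word of `Name`. Adding a literal period after the group, `r'([A-Za-z]+)\.'`, makes the match land on the word right before a period, which in this dataset's `Surname, Title. Given names` format is the title. That pattern is a suggested fix, not the notebook's original code. Python's `re.search` also returns the first match, so it can illustrate the difference on two names that are shown in full in the `Name` output earlier.

```python
import re

names = ["Moran, Mr. James", "Rice, Master. Eugene"]

first_word = re.compile(r"([A-Za-z]+)")   # pattern used in the notebook
title = re.compile(r"([A-Za-z]+)\.")       # word immediately followed by a period

for name in names:
    print(first_word.search(name).group(1), title.search(name).group(1))
# Moran Mr
# Rice Master
```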



In [17]:
titanic = titanic.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
               ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])
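`DataFrame.replace(to_replace, value)` pairs the two lists position by position (`'Mlle'` → `'Miss'`, `'Dr'` → `'Mr'`, ...) and replaces whole cell values, not substrings; values missing from the first list are left alone. Because no `subset` is passed, the replacement applies to every string column, not only `initial`; `subset=['initial']` would restrict it. The plain-Python sketch below mimics the position-wise mapping on a few sample titles; it is not Spark code.

```python
old = ['Mlle', 'Mme', 'Ms', 'Dr', 'Major', 'Lady', 'Countess', 'Jonkheer',
       'Col', 'Rev', 'Capt', 'Sir', 'Don']
new = ['Miss', 'Miss', 'Miss', 'Mr', 'Mr', 'Mrs', 'Mrs', 'Other',
       'Other', 'Other', 'Mr', 'Mr', 'Mr']

# Position-wise pairing, like replace(old, new)
mapping = dict(zip(old, new))

def normalize(value):
    # Whole-value match: unmapped values such as 'Mr' or 'Master' pass through
    return mapping.get(value, value)

print([normalize(t) for t in ['Mr', 'Mlle', 'Dr', 'Rev', 'Master']])
# ['Mr', 'Miss', 'Mr', 'Other', 'Master']
```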


In [18]:
titanic.groupby('initial').avg('Age').show()

+-------------+--------+
|      initial|avg(Age)|
+-------------+--------+
|     Pavlovic|    32.0|
|         Saad|    25.0|
|      Palsson|    10.5|
| Thorneycroft|    null|
|   Johannesen|    null|
|     Meanwell|    null|
|      Markoff|    35.0|
|       Porter|    47.0|
|     Harrison|    40.0|
|     Bissette|    35.0|
|        Hampe|    20.0|
|      Hegarty|    18.0|
|Lemberopolous|    34.5|
|      Ekstrom|    45.0|
|       Boulos|     9.0|
|       Bourke|    36.0|
|     McGovern|    null|
|     Kvillner|    31.0|
|   Goldenberg|    49.0|
|        Keefe|    null|
+-------------+--------+
only showing top 20 rows
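`avg` skips nulls: each group's average is taken over its non-null `Age` values only, and a group whose ages are all null (for example `Thorneycroft` above) gets `null` instead of 0. A plain-Python sketch of that rule on made-up rows (hypothetical data, and a hypothetical helper `group_avg`):

```python
from collections import defaultdict

def group_avg(rows, key, value):
    """Average of `value` per `key`, ignoring None; None if a group has no values."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[value])
    result = {}
    for k, vals in groups.items():
        present = [v for v in vals if v is not None]
        result[k] = sum(present) / len(present) if present else None
    return result

# Hypothetical rows: group 'A' has one missing age, group 'B' has only missing ages
rows = [
    {"initial": "A", "Age": 10.0},
    {"initial": "A", "Age": None},
    {"initial": "A", "Age": 20.0},
    {"initial": "B", "Age": None},
]
print(group_avg(rows, "initial", "Age"))
# {'A': 15.0, 'B': None}
```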



##### The `filter()` function

In [19]:
titanic.filter(titanic['Age'] == 48).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------+-------+-----+--------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|    Ticket|   Fare|Cabin|Embarked| initial|
+-----------+--------+------+--------------------+------+----+-----+-----+----------+-------+-----+--------+--------+
|        461|       1|     1| Anderson, Mr. Harry|  male|48.0|    0|    0|     19952|  26.55|  E12|       S|Anderson|
|        464|       0|     2|Milling, Mr. Jaco...|  male|48.0|    0|    0|    234360|   13.0| null|       S| Milling|
|        557|       1|     1|"Duff Gordon, Lad...|female|48.0|    1|    0|     11755|   39.6|  A16|       C|    Duff|
|        646|       1|     1|Harper, Mr. Henry...|  male|48.0|    1|    0|  PC 17572|76.7292|  D33|       C|  Harper|
|        713|       1|     1|Taylor, Mr. Elmer...|  male|48.0|    1|    0|     19996|   52.0| C126|       S|  Taylor|
|        737|       0|     3|Ford, Mrs. Edward...|female

## 3. Handling Missing Values

In [20]:
# Counts the missing (null) values in each column and returns
# (column, count) pairs for the columns that have at least one null
def null_value_count(df):
    null_columns_counts = []
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()  # one Spark job per column
        if nullRows > 0:
            null_columns_counts.append((k, nullRows))
    return null_columns_counts


In [21]:
null_list = null_value_count(titanic)

In [22]:
null_list

[('Age', 177), ('Cabin', 687), ('Embarked', 2)]

In [23]:
sqlCtx.createDataFrame(null_list, ['column', 'cnt']).show()

+--------+---+
|  column|cnt|
+--------+---+
|     Age|177|
|   Cabin|687|
|Embarked|  2|
+--------+---+
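`null_value_count` launches a separate Spark job for every column (one `where(...).count()` each). The same result can be computed in a single pass over the rows. The plain-Python sketch below shows that single-pass idea on hypothetical rows and returns `(column, count)` pairs in the same shape as `null_list`; the helper name is made up, and this is not Spark code.

```python
def null_counts_single_pass(rows, columns):
    """Count None (or missing) values per column in one pass; keep columns with nulls."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if row.get(c) is None:
                counts[c] += 1
    return [(c, n) for c, n in counts.items() if n > 0]

# Hypothetical rows shaped like the Titanic data
rows = [
    {"Age": 22.0, "Cabin": None, "Embarked": "S", "Fare": 7.25},
    {"Age": None, "Cabin": "C85", "Embarked": "C", "Fare": 71.2833},
    {"Age": None, "Cabin": None, "Embarked": None, "Fare": 7.925},
]
print(null_counts_single_pass(rows, ["Age", "Cabin", "Embarked", "Fare"]))
# [('Age', 2), ('Cabin', 2), ('Embarked', 1)]
```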



##### Fill the missing values in the `Embarked` column with `S`, its most frequent value

In [24]:
titanic.groupby('Embarked').count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [25]:
titanic = titanic.na.fill({'Embarked' : 'S'})

In [26]:
titanic.groupby('Embarked').count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|       C|  168|
|       S|  646|
+--------+-----+
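Filling the two nulls with `S`, the most frequent port in the counts before filling, is a mode imputation. The arithmetic below uses only those counts and shows why `S` goes from 644 to 646 while `Q` and `C` stay unchanged.

```python
# Non-null Embarked counts from the groupby output before filling
counts = {"Q": 77, "C": 168, "S": 644}
n_missing = 2

mode = max(counts, key=counts.get)   # most frequent value
filled = dict(counts)
filled[mode] += n_missing            # every null becomes the mode

print(mode, filled)
# S {'Q': 77, 'C': 168, 'S': 646}
```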



##### Fill the missing values in the `Age` column with 20

In [27]:
titanic = titanic.na.fill({'Age' : 20})
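Filling with a constant shifts the column's distribution. Using numbers already in this notebook (891 rows from the `Sex`/`Survived` counts, 177 missing ages, and a non-null mean age of about 29.70), the arithmetic below estimates the mean age after filling with 20. The result is derived here, not printed by the notebook.

```python
total_rows = 468 + 233 + 81 + 109   # from groupby('Sex', 'Survived').count()
missing_age = 177                   # from null_value_count
mean_non_null = 29.69911764705882   # avg(Age) over the non-null values
fill_value = 20

non_null = total_rows - missing_age  # passengers with a recorded age
age_sum = mean_non_null * non_null   # sum of the recorded ages
mean_after_fill = (age_sum + fill_value * missing_age) / total_rows

print(non_null, round(mean_after_fill, 2))
# 714 27.77
```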

## 4. Creating Derived Variables

##### Create a new variable `Family_Size` as the sum of `SibSp` and `Parch`, and add it to the existing DataFrame

In [28]:
titanic = titanic.withColumn('Family_Size', col('SibSp') + col('Parch'))

In [29]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- initial: string (nullable = true)
 |-- Family_Size: integer (nullable = true)



In [30]:
titanic.select('Family_Size').show()

+-----------+
|Family_Size|
+-----------+
|          1|
|          1|
|          0|
|          1|
|          0|
|          0|
|          0|
|          4|
|          2|
|          1|
|          2|
|          0|
|          0|
|          6|
|          0|
|          0|
|          5|
|          0|
|          1|
|          0|
+-----------+
only showing top 20 rows



In [31]:
titanic.groupby('Family_Size').count().show()

+-----------+-----+
|Family_Size|count|
+-----------+-----+
|          1|  161|
|          6|   12|
|          3|   29|
|          5|   22|
|          4|   15|
|          7|    6|
|         10|    7|
|          2|  102|
|          0|  537|
+-----------+-----+



##### Create an `Alone` column: 1 for passengers traveling alone, 0 for those with family aboard

In [32]:
titanic = titanic.withColumn('Alone', lit(0))
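## What is lit()? It wraps a literal Python value (here 0) in a Column expression, so withColumn puts that constant in every row. It is not a type conversion like Python's int().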

In [33]:
titanic.select('Alone').show()

+-----+
|Alone|
+-----+
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
|    0|
+-----+
only showing top 20 rows



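##### The `when().otherwise()` function
   - Works like if ~ else: rows that satisfy the `when` condition get the first value, all other rows get the `otherwise` value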

In [34]:
titanic = titanic.withColumn('Alone', when(titanic['Family_Size'] == 0, 1).otherwise(0))

In [35]:
titanic.select(['Family_Size','Alone']).show()

+-----------+-----+
|Family_Size|Alone|
+-----------+-----+
|          1|    0|
|          1|    0|
|          0|    1|
|          1|    0|
|          0|    1|
|          0|    1|
|          0|    1|
|          4|    0|
|          2|    0|
|          1|    0|
|          2|    0|
|          0|    1|
|          0|    1|
|          6|    0|
|          0|    1|
|          0|    1|
|          5|    0|
|          0|    1|
|          1|    0|
|          0|    1|
+-----------+-----+
only showing top 20 rows
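Row by row, the two derived columns are plain arithmetic plus a conditional. The sketch below reproduces the first five rows of the `Family_Size`/`Alone` output from the `SibSp` and `Parch` values shown in `titanic.show(5)` at the top, with a Python conditional expression standing in for `when(...).otherwise(...)`.

```python
# (SibSp, Parch) for the first five passengers, from titanic.show(5)
sibsp_parch = [(1, 0), (1, 0), (0, 0), (1, 0), (0, 0)]

family_size = [sibsp + parch for sibsp, parch in sibsp_parch]
# Equivalent of when(Family_Size == 0, 1).otherwise(0)
alone = [1 if size == 0 else 0 for size in family_size]

print(family_size)  # [1, 1, 0, 1, 0]
print(alone)        # [0, 0, 1, 0, 1]
```

Over the whole dataset, `Alone` is therefore 1 for the 537 passengers whose `Family_Size` is 0 in the earlier count.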



## 5. Machine Learning with Spark

In [36]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


In [37]:
# The Pipeline fits each StringIndexer itself, so the stages can be passed unfitted
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in ["Sex","Embarked"]]
pipeline = Pipeline(stages=indexers)
titanic = pipeline.fit(titanic).transform(titanic)


In [38]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- initial: string (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
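By default (`stringOrderType='frequencyDesc'`), `StringIndexer` gives index 0.0 to the most frequent label, 1.0 to the next, and so on. With the counts seen earlier (577 male vs 314 female; after filling, S 646, C 168, Q 77), that yields male → 0.0, female → 1.0, S → 0.0, C → 1.0, Q → 2.0, which matches the `Sex_index` and `Embarked_index` values in the `titanic.show()` output. A plain-Python sketch of that ordering; the alphabetical tie-break is an assumption of this sketch, and the helper name is hypothetical.

```python
def frequency_desc_index(counts):
    """Map each label to a float index: most frequent label -> 0.0.

    Ties are broken alphabetically (an assumption for this sketch).
    """
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

sex_index = frequency_desc_index({"male": 577, "female": 314})
embarked_index = frequency_desc_index({"S": 646, "C": 168, "Q": 77})

print(sex_index)       # {'male': 0.0, 'female': 1.0}
print(embarked_index)  # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
```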



##### Dropping unused columns

In [39]:
titanic = titanic.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","initial")

In [40]:
titanic.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)



In [41]:
titanic.show()

+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Family_Size|Alone|Sex_index|Embarked_index|
+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+
|       0|     3|22.0|    1|    0|   7.25|          1|    0|      0.0|           0.0|
|       1|     1|38.0|    1|    0|71.2833|          1|    0|      1.0|           1.0|
|       1|     3|26.0|    0|    0|  7.925|          0|    1|      1.0|           0.0|
|       1|     1|35.0|    1|    0|   53.1|          1|    0|      1.0|           0.0|
|       0|     3|35.0|    0|    0|   8.05|          0|    1|      0.0|           0.0|
|       0|     3|20.0|    0|    0| 8.4583|          0|    1|      0.0|           2.0|
|       0|     1|54.0|    0|    0|51.8625|          0|    1|      0.0|           0.0|
|       0|     3| 2.0|    3|    1| 21.075|          4|    0|      0.0|           0.0|
|       1|     3|27.0|    0|    2|11.1333|          2|

In [42]:
from pyspark.ml.feature import VectorAssembler

In [43]:
# Every column except the first one (Survived, the label) becomes a feature
feature = VectorAssembler(inputCols=titanic.columns[1:], outputCol='features')
feature

VectorAssembler_51efdf31879e

In [44]:
feature_vector = feature.transform(titanic)

In [45]:
feature_vector.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- features: vector (nullable = true)



##### Splitting into a train set and a test set

In [46]:
trainData, testData = feature_vector.randomSplit([0.8, 0.2], seed=100)
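`randomSplit([0.8, 0.2], seed=100)` assigns each row at random, so the two parts are only approximately 80% / 20% of the 891 rows, and their exact sizes depend on the seed (and, in Spark, on the partitioning). The simplified sketch below shows that per-row assignment with normalized weights; it uses Python's `random` module and a hypothetical `random_split` helper, so it will not reproduce Spark's actual split.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by drawing one uniform number per row."""
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total              # weights are normalized to sum to 1
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches floating-point rounding at the top end
            if u < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(891)), [0.8, 0.2], seed=100)
# Sizes are close to, but generally not exactly, 80% / 20%
print(len(train), len(test))
```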

In [47]:
type(trainData)

pyspark.sql.dataframe.DataFrame

In [52]:
trainData.show()

+--------+------+----+-----+-----+--------+-----------+-----+---------+--------------+--------------------+
|Survived|Pclass| Age|SibSp|Parch|    Fare|Family_Size|Alone|Sex_index|Embarked_index|            features|
+--------+------+----+-----+-----+--------+-----------+-----+---------+--------------+--------------------+
|       0|     1| 2.0|    1|    2|  151.55|          3|    0|      1.0|           0.0|[1.0,2.0,1.0,2.0,...|
|       0|     1|18.0|    1|    0|   108.9|          1|    0|      0.0|           1.0|[1.0,18.0,1.0,0.0...|
|       0|     1|19.0|    1|    0|    53.1|          1|    0|      0.0|           0.0|[1.0,19.0,1.0,0.0...|
|       0|     1|20.0|    0|    0|     0.0|          0|    1|      0.0|           0.0|(9,[0,1,6],[1.0,2...|
|       0|     1|20.0|    0|    0|     0.0|          0|    1|      0.0|           0.0|(9,[0,1,6],[1.0,2...|
|       0|     1|20.0|    0|    0|  25.925|          0|    1|      0.0|           0.0|(9,[0,1,4,6],[1.0...|
|       0|     1|20.0|    0|
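The `features` column above mixes two display formats. `[1.0,2.0,...]` is a dense vector; `(9,[0,1,6],[1.0,2...` is a sparse vector written as `(size, [indices of non-zero entries], [their values])`. `VectorAssembler` stores each row in whichever form is more compact; as far as I understand Spark's `Vector.compressed`, it picks sparse when `1.5 * (nnz + 1) < size`, which fits the rows shown (few non-zeros → sparse). Treat that threshold as an assumption; the sketch below applies it to the first and fourth training rows, with `compress` as a hypothetical helper.

```python
def compress(values):
    """Return ('dense', values) or ('sparse', size, indices, values).

    Assumed rule, mirroring Spark's Vector.compressed as we understand it:
    use the sparse form when 1.5 * (nnz + 1) < size.
    """
    nonzero = [(i, v) for i, v in enumerate(values) if v != 0.0]
    if 1.5 * (len(nonzero) + 1) < len(values):
        return ("sparse", len(values),
                [i for i, _ in nonzero], [v for _, v in nonzero])
    return ("dense", list(values))

# Pclass, Age, SibSp, Parch, Fare, Family_Size, Alone, Sex_index, Embarked_index
row_1 = [1.0, 2.0, 1.0, 2.0, 151.55, 3.0, 0.0, 1.0, 0.0]
row_4 = [1.0, 20.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0]

print(compress(row_1)[0])  # dense
print(compress(row_4))     # ('sparse', 9, [0, 1, 6], [1.0, 20.0, 1.0])
```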

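 - Modeling
 - Spark ML classifiers: DTC (DecisionTreeClassifier), LR (LogisticRegression), RF (RandomForestClassifier), GBT (GBTClassifier, gradient-boosted trees), NB (NaiveBayes), SVM (LinearSVC)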

##### Training a model with logistic regression and making predictions

In [48]:
# Logistic Regression
# Binary classification: the label (Survived) is either 0 or 1

from pyspark.ml.classification import LogisticRegression

In [49]:
# Create the model
lr = LogisticRegression(labelCol='Survived',   # label column
                        featuresCol='features') # feature vector column

In [50]:
# Train the model on the training set
lr_model = lr.fit(trainData)

# Predict on the test set
lr_pred = lr_model.transform(testData)


In [56]:
lr_pred.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



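 - Unlike `sklearn`, whose `predict` returns the predictions as a separate array, `transform()` returns the test set with the prediction results appended as new columns: `rawPrediction` (raw scores), `probability` (class probabilities) and `prediction` (predicted label)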
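For binary logistic regression these three columns are tied together: with margin `m = w·x + b`, `rawPrediction` is `[-m, m]`, `probability` is `[1 - sigmoid(m), sigmoid(m)]` with `sigmoid(m) = 1 / (1 + exp(-m))`, and `prediction` is 1.0 when `sigmoid(m)` exceeds the threshold (0.5 by default). The sketch below computes them for one feature vector; the weights and intercept are made-up values, not the fitted `lr_model` coefficients, and `lr_outputs` is a hypothetical helper.

```python
import math

def lr_outputs(weights, intercept, features, threshold=0.5):
    """rawPrediction, probability and prediction for binary logistic regression."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    p1 = 1.0 / (1.0 + math.exp(-margin))   # sigmoid: P(Survived = 1)
    raw_prediction = [-margin, margin]
    probability = [1.0 - p1, p1]
    prediction = 1.0 if p1 > threshold else 0.0
    return raw_prediction, probability, prediction

# Hypothetical 2-feature model, not the trained lr_model
raw, prob, pred = lr_outputs(weights=[0.5, -1.0], intercept=1.0, features=[4.0, 1.0])
print(raw, [round(p, 4) for p in prob], pred)
# [-2.0, 2.0] [0.1192, 0.8808] 1.0
```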

##### Evaluating the model

In [57]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [58]:
# Set the evaluation metric

evaluator = MulticlassClassificationEvaluator(labelCol      = 'Survived',
                                              predictionCol = 'prediction',
                                              metricName    = 'accuracy')  # report accuracy

In [60]:
accuracy = evaluator.evaluate(lr_pred)

In [61]:
print('accuracy :', accuracy)
print('error    :', 1 - accuracy)

accuracy : 0.788235294117647
error    : 0.21176470588235297
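With `metricName='accuracy'`, the evaluator reports the fraction of test rows whose `prediction` equals `Survived`, and the printed error is simply `1 - accuracy`. A plain-Python sketch on hypothetical label/prediction pairs (the `accuracy` helper is made up for illustration):

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Hypothetical Survived labels and model predictions (predictions are doubles, as in Spark)
labels      = [0, 1, 1, 0, 1]
predictions = [0.0, 1.0, 0.0, 0.0, 1.0]

acc = accuracy(labels, predictions)
print('accuracy :', acc)   # accuracy : 0.8
print('error    :', 1 - acc)
```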
