In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
import pyspark.sql.types as tp
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [2]:
# Create the Spark entry points used by the rest of the notebook.
# SparkContext() raises "Cannot run multiple SparkContexts at once" when this
# cell is executed a second time in the same kernel; getOrCreate() returns the
# already-running context instead, so the cell can be re-run safely.
sc = SparkContext.getOrCreate()
# Kept so that code referring to `sqlContext` keeps working.
sqlContext = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()

### 스키마 설정
스키마를 지정하지 않고 데이터를 불러오면 모든 컬럼의 data type이 String으로 설정되어 읽힌다.

이런 불상사를 방지하기 위해 스키마를 설정해놓고 데이터를 불러와보자

In [3]:
# Read the CSV without supplying a schema (the first line is used as the
# header), then print the column types Spark assigned.
headered_reader = spark.read.option('header', True)
non_schema_data = headered_reader.csv('../data/ind-ban-comment.csv')
non_schema_data.printSchema()

root
 |-- Batsman: string (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: string (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- Isball: string (nullable = true)
 |-- Isboundary: string (nullable = true)
 |-- Iswicket: string (nullable = true)
 |-- Over: string (nullable = true)
 |-- Runs: string (nullable = true)
 |-- Timestamp: string (nullable = true)



In [4]:
# Explicit (column name, Spark type) pairs, in the order the fields are declared.
# NOTE(review): the previews printed later in this notebook show Timestamp and
# Isboundary as null for the displayed rows -- confirm that the declared types
# match the raw values in the file.
column_types = [
    ('Batsman', tp.IntegerType()),
    ('Batsman_Name', tp.StringType()),
    ('Bowler', tp.IntegerType()),
    ('Bowler_Name', tp.StringType()),
    ('Commentary', tp.StringType()),
    ('Detail', tp.StringType()),
    ('Dismissed', tp.IntegerType()),
    ('Id', tp.IntegerType()),
    ('Isball', tp.BooleanType()),
    ('Isboundary', tp.IntegerType()),
    ('Iswicket', tp.IntegerType()),
    ('Over', tp.DoubleType()),
    ('Runs', tp.IntegerType()),
    ('Timestamp', tp.TimestampType()),
]

# Every field is declared nullable.
schema_setting = tp.StructType([
    tp.StructField(name=column_name, dataType=column_type, nullable=True)
    for column_name, column_type in column_types
])

# Read the same file again, this time with the explicit schema applied.
data = spark.read.csv('../data/ind-ban-comment.csv', schema=schema_setting, header=True)

# Print the resulting schema to confirm the declared types were applied.
data.printSchema()

root
 |-- Batsman: integer (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: integer (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Isball: boolean (nullable = true)
 |-- Isboundary: integer (nullable = true)
 |-- Iswicket: integer (nullable = true)
 |-- Over: double (nullable = true)
 |-- Runs: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



### 필요없는 컬럼 제거 

In [5]:
# Remove the three id-style columns and preview the first 10 remaining rows.
columns_to_drop = ['Batsman', 'Bowler', 'Id']
drop_column_data = data.drop(*columns_to_drop)
drop_column_data.show(10)

+-----------------+------------------+--------------------+------+---------+------+----------+--------+----+----+---------+
|     Batsman_Name|       Bowler_Name|          Commentary|Detail|Dismissed|Isball|Isboundary|Iswicket|Over|Runs|Timestamp|
+-----------------+------------------+--------------------+------+---------+------+----------+--------+----+----+---------+
|   Mohammed Shami| Mustafizur Rahman|OUT! Bowled! 5-fe...|     W|    28994|  true|      null|       1|49.6|   0|     null|
|Bhuvneshwar Kumar| Mustafizur Rahman|WIDE AND RUN OUT!...|  W+wd|     5132|  true|      null|       1|49.6|   1|     null|
|   Mohammed Shami| Mustafizur Rahman|Back of a length ...|  null|     null|  true|      null|    null|49.5|   1|     null|
|Bhuvneshwar Kumar| Mustafizur Rahman|Just 1 run off th...|  null|     null|  true|      null|    null|49.4|   1|     null|
|         MS Dhoni| Mustafizur Rahman|OUT! No Dhoni mag...|     W|     3676|  true|      null|       1|49.3|   0|     null|
|       

### 데이터의 행과 열의 수를 알아보자

In [6]:
# Display (number of rows, number of columns) for the trimmed frame.
row_count = drop_column_data.count()
column_count = len(drop_column_data.columns)
(row_count, column_count)

(605, 11)

### StringIndexer
StringIndexer는 문자를 카테고리(숫자)로 바꿔주는 함수다.

예를 들어 Batsman_Name은 20개의 string으로 입력되어 있다.

In [7]:
# Count how many different values appear in Batsman_Name.
unique_batsman_names = drop_column_data.select('Batsman_Name').distinct()
unique_batsman_names.count()

20

In [8]:
# List the different values that appear in Batsman_Name.
batsman_name_column = drop_column_data.select('Batsman_Name')
batsman_name_column.distinct().show()

+------------------+
|      Batsman_Name|
+------------------+
|    Mohammed Shami|
| Bhuvneshwar Kumar|
|          MS Dhoni|
|    Dinesh Karthik|
|      Rishabh Pant|
|     Hardik Pandya|
|       Virat Kohli|
|          KL Rahul|
|      Rohit Sharma|
| Mustafizur Rahman|
|     Rubel Hossain|
|Mohammad Saifuddin|
|  Mashrafe Mortaza|
|     Sabbir Rahman|
|   Shakib Al Hasan|
|  Mosaddek Hossain|
|         Liton Das|
|   Mushfiqur Rahim|
|     Soumya Sarkar|
|       Tamim Iqbal|
+------------------+



이런식으로 범주화한 컬럼이 만들어진다
> 🤔 근데 무슨 기준으로 index가 매겨지는 거지? ✔ Count 순이다! 개수가 가장 많은 범주가 0이다.

In [30]:
# Index the Batsman_Name strings; the numeric result goes into Batsman_Index.
SI_batsman = StringIndexer(inputCol='Batsman_Name', outputCol='Batsman_Index')
batsman_indexer_model = SI_batsman.fit(drop_column_data)
transform_data = batsman_indexer_model.transform(drop_column_data)

# One row per (index, name) pair with its row count, ordered by index.
index_and_name = ['Batsman_Index', 'Batsman_Name']
pair_counts = transform_data.select(*index_and_name).groupBy(*index_and_name).count()
pair_counts.sort('Batsman_Index').show()

+-------------+------------------+-----+
|Batsman_Index|      Batsman_Name|count|
+-------------+------------------+-----+
|          0.0|      Rohit Sharma|   94|
|          1.0|          KL Rahul|   93|
|          2.0|   Shakib Al Hasan|   75|
|          3.0|      Rishabh Pant|   43|
|          4.0|Mohammad Saifuddin|   42|
|          5.0|     Sabbir Rahman|   40|
|          6.0|     Soumya Sarkar|   39|
|          7.0|          MS Dhoni|   33|
|          8.0|       Tamim Iqbal|   31|
|          9.0|       Virat Kohli|   27|
|         10.0|         Liton Das|   24|
|         11.0|   Mushfiqur Rahim|   23|
|         12.0|     Rubel Hossain|   11|
|         13.0|    Dinesh Karthik|    9|
|         14.0|  Mosaddek Hossain|    7|
|         15.0|  Mashrafe Mortaza|    5|
|         16.0| Bhuvneshwar Kumar|    4|
|         17.0|     Hardik Pandya|    2|
|         18.0|    Mohammed Shami|    2|
|         19.0| Mustafizur Rahman|    1|
+-------------+------------------+-----+



In [33]:
# Index the Bowler_Name strings; the numeric result goes into Bowler_Index.
SI_bowler = StringIndexer(inputCol='Bowler_Name', outputCol='Bowler_Index')

# `transform_data` is overwritten below, so when this cell is executed a second
# time it already contains Bowler_Index and the indexer rejects an output
# column that already exists. Dropping the column first (a no-op when it is
# absent) makes the cell safe to re-run and leaves the first run unchanged.
bowler_input = transform_data.drop('Bowler_Index')
transform_data = SI_bowler.fit(bowler_input).transform(bowler_input)

# Show each name column next to its generated index column.
transform_data.select('Batsman_Name', 'Batsman_Index', 'Bowler_Name', 'Bowler_Index').show()

+-----------------+-------------+------------------+------------+
|     Batsman_Name|Batsman_Index|       Bowler_Name|Bowler_Index|
+-----------------+-------------+------------------+------------+
|   Mohammed Shami|         18.0| Mustafizur Rahman|         0.0|
|Bhuvneshwar Kumar|         16.0| Mustafizur Rahman|         0.0|
|   Mohammed Shami|         18.0| Mustafizur Rahman|         0.0|
|Bhuvneshwar Kumar|         16.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         

### OneHotEncoder
OneHotEncoder는 범주화된 인덱스를 희소 벡터 (벡터 크기, [값이 있는 위치], [값]) 형태로 표현한다.

예를 들어 Batsman_Name은 20개의 string으로 입력되어 있지만, 기본 설정(dropLast=True)에서는 마지막 범주를 제외하므로 벡터 크기는 19가 된다.

In [38]:
# One-hot encode both index columns with a single encoder.
index_columns = ['Batsman_Index', 'Bowler_Index']
encoded_columns = ['Batsman_OHE', 'Bowler_OHE']
OHE = OneHotEncoder(inputCols=index_columns, outputCols=encoded_columns)

# Fit the encoder on the indexed frame and append the encoded columns.
ohe_model = OHE.fit(transform_data)
OHE_data = ohe_model.transform(transform_data)

# One row per (name, index, encoded vector) combination with its row count,
# ordered by index.
batsman_view = ['Batsman_Name', 'Batsman_Index', 'Batsman_OHE']
batsman_counts = OHE_data.select(*batsman_view).groupBy(*batsman_view).count()
batsman_counts.sort('Batsman_Index').show()


+------------------+-------------+---------------+-----+
|      Batsman_Name|Batsman_Index|    Batsman_OHE|count|
+------------------+-------------+---------------+-----+
|      Rohit Sharma|          0.0| (19,[0],[1.0])|   94|
|          KL Rahul|          1.0| (19,[1],[1.0])|   93|
|   Shakib Al Hasan|          2.0| (19,[2],[1.0])|   75|
|      Rishabh Pant|          3.0| (19,[3],[1.0])|   43|
|Mohammad Saifuddin|          4.0| (19,[4],[1.0])|   42|
|     Sabbir Rahman|          5.0| (19,[5],[1.0])|   40|
|     Soumya Sarkar|          6.0| (19,[6],[1.0])|   39|
|          MS Dhoni|          7.0| (19,[7],[1.0])|   33|
|       Tamim Iqbal|          8.0| (19,[8],[1.0])|   31|
|       Virat Kohli|          9.0| (19,[9],[1.0])|   27|
|         Liton Das|         10.0|(19,[10],[1.0])|   24|
|   Mushfiqur Rahim|         11.0|(19,[11],[1.0])|   23|
|     Rubel Hossain|         12.0|(19,[12],[1.0])|   11|
|    Dinesh Karthik|         13.0|(19,[13],[1.0])|    9|
|  Mosaddek Hossain|         14

In [41]:
from pyspark.ml.feature import VectorAssembler

# Columns that are concatenated, in this order, into the single 'vector' column.
assembler_inputs = [
    'Isboundary',
    'Iswicket',
    'Over',
    'Runs',
    'Batsman_Index',
    'Bowler_Index',
    'Batsman_OHE',
    'Bowler_OHE',
]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='vector')

# Replace nulls with 0 before assembling the feature vector.
fill_null_data = OHE_data.fillna(0)

# Append the assembled 'vector' column and preview the result.
final_data = assembler.transform(fill_null_data)
final_data.show()

+-----------------+------------------+--------------------+------+---------+------+----------+--------+----+----+---------+-------------+------------+---------------+--------------+--------------------+
|     Batsman_Name|       Bowler_Name|          Commentary|Detail|Dismissed|Isball|Isboundary|Iswicket|Over|Runs|Timestamp|Batsman_Index|Bowler_Index|    Batsman_OHE|    Bowler_OHE|              vector|
+-----------------+------------------+--------------------+------+---------+------+----------+--------+----+----+---------+-------------+------------+---------------+--------------+--------------------+
|   Mohammed Shami| Mustafizur Rahman|OUT! Bowled! 5-fe...|     W|    28994|  true|         0|       1|49.6|   0|     null|         18.0|         0.0|(19,[18],[1.0])|(11,[0],[1.0])|(36,[1,2,4,24,25]...|
|Bhuvneshwar Kumar| Mustafizur Rahman|WIDE AND RUN OUT!...|  W+wd|     5132|  true|         0|       1|49.6|   1|     null|         16.0|         0.0|(19,[16],[1.0])|(11,[0],[1.0])|(36,[1,

In [42]:
from pyspark.ml import Pipeline

# Small hand-written frame used to demonstrate a Pipeline.
sample_rows = [
    (1, 'L101', 'R'),
    (2, 'L201', 'C'),
    (3, 'D111', 'R'),
    (4, 'F210', 'R'),
    (5, 'D110', 'C'),
]
sample_columns = ['id', 'category_1', 'category_2']
sample_df = spark.createDataFrame(sample_rows, sample_columns)

sample_df.show()

+---+----------+----------+
| id|category_1|category_2|
+---+----------+----------+
|  1|      L101|         R|
|  2|      L201|         C|
|  3|      D111|         R|
|  4|      F210|         R|
|  5|      D110|         C|
+---+----------+----------+



In [41]:
# Stage 1: index the strings of category_1.
stage_1 = StringIndexer(inputCol='category_1', outputCol='category_1_index')
# Stage 2: index the strings of category_2.
stage_2 = StringIndexer(inputCol='category_2', outputCol='category_2_index')
# Stage 3: one-hot encode the index produced by stage 2.
stage_3 = OneHotEncoder(
    inputCols=['category_2_index'],
    outputCols=['category_2_OHE'],
)

# Chain the three stages so they run in order.
ordered_stages = [stage_1, stage_2, stage_3]
pipeline = Pipeline(stages=ordered_stages)

# Fit every stage on the sample frame, then apply the fitted pipeline to it.
pipeline_model = pipeline.fit(sample_df)
sample_df_updated = pipeline_model.transform(sample_df)

# Show the original columns together with the generated ones.
sample_df_updated.show()

+---+----------+----------+----------------+----------------+--------------+
| id|category_1|category_2|category_1_index|category_2_index|category_2_OHE|
+---+----------+----------+----------------+----------------+--------------+
|  1|      L101|         R|             3.0|             0.0| (1,[0],[1.0])|
|  2|      L201|         C|             4.0|             1.0|     (1,[],[])|
|  3|      D111|         R|             1.0|             0.0| (1,[0],[1.0])|
|  4|      F210|         R|             2.0|             0.0| (1,[0],[1.0])|
|  5|      D110|         C|             0.0|             1.0|     (1,[],[])|
+---+----------+----------+----------------+----------------+--------------+



### Supervised Learning Algorithm
- Classification
  - Logistic regression
  - Decision tree
  - Random forest
  - Gradient-boosted tree
  - ...

- Regression
  - Linear regression 
  - Decision tree regression
  - Random forest regression
  - Gradient-boosted regression
  - ... 
  
[참고](https://spark.apache.org/docs/latest/ml-classification-regression.html#classification)


In [43]:
from pyspark.ml.classification import LogisticRegression

# Hand-written training frame: four feature columns followed by the label.
train_rows = [
    (2.0, 'A', 'S10', 40, 1.0),
    (1.0, 'X', 'E10', 25, 1.0),
    (4.0, 'X', 'S20', 10, 0.0),
    (3.0, 'Z', 'S10', 20, 0.0),
    (4.0, 'A', 'E10', 30, 1.0),
    (2.0, 'Z', 'S10', 40, 0.0),
    (5.0, 'X', 'D10', 10, 1.0),
]
train_columns = ['feature_1', 'feature_2', 'feature_3', 'feature_4', 'label']
sample_data_train = spark.createDataFrame(train_rows, train_columns)

# Preview the training rows.
sample_data_train.show()

+---------+---------+---------+---------+-----+
|feature_1|feature_2|feature_3|feature_4|label|
+---------+---------+---------+---------+-----+
|      2.0|        A|      S10|       40|  1.0|
|      1.0|        X|      E10|       25|  1.0|
|      4.0|        X|      S20|       10|  0.0|
|      3.0|        Z|      S10|       20|  0.0|
|      4.0|        A|      E10|       30|  1.0|
|      2.0|        Z|      S10|       40|  0.0|
|      5.0|        X|      D10|       10|  1.0|
+---------+---------+---------+---------+-----+



In [44]:
# `sample_data_train` is overwritten at the end of this cell with the
# transformed frame. Selecting only the raw columns here strips the
# pipeline-generated columns again, so a second execution of the cell does not
# fail on output columns that already exist. On the first run the selection
# returns the frame unchanged.
raw_train_columns = ['feature_1', 'feature_2', 'feature_3', 'feature_4', 'label']
train_input = sample_data_train.select(*raw_train_columns)

# define stage 1: transform the column feature_2 to numeric
stage_1 = StringIndexer(inputCol='feature_2', outputCol='feature_2_index')

# define stage 2: transform the column feature_3 to numeric
stage_2 = StringIndexer(inputCol='feature_3', outputCol='feature_3_index')

# define stage 3: one hot encode the numeric versions of feature 2 and 3 generated from stage 1 and stage 2
stage_3 = OneHotEncoder(inputCols=[stage_1.getOutputCol(), stage_2.getOutputCol()],
                        outputCols=['feature_2_encoded', 'feature_3_encoded'])

# define stage 4: create a vector of all the features required to train the logistic regression model
stage_4 = VectorAssembler(inputCols=['feature_1', 'feature_2_encoded', 'feature_3_encoded', 'feature_4'],
                          outputCol='features')

# define stage 5: logistic regression model
stage_5 = LogisticRegression(featuresCol='features', labelCol='label')

# setup the pipeline
regression_pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4, stage_5])

# fit the pipeline for the training data
model = regression_pipeline.fit(train_input)
# transform the data; the name is kept so later references to
# `sample_data_train` still see the frame with the prediction columns
sample_data_train = model.transform(train_input)

# view some of the columns generated
sample_data_train.select('features', 'label', 'rawPrediction', 'probability', 'prediction').show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[2.0,0.0,1.0,1.0,...|  1.0|[-18.956871848873...|[5.84972108437629...|       1.0|
|[1.0,1.0,0.0,0.0,...|  1.0|[-20.158269476976...|[1.75944137519440...|       1.0|
|(7,[0,1,6],[4.0,1...|  0.0|[18.0148602858563...|[0.99999998499466...|       0.0|
|(7,[0,3,6],[3.0,1...|  0.0|[24.5051237560211...|[0.99999999997721...|       0.0|
|[4.0,0.0,1.0,0.0,...|  1.0|[-50.288624611182...|[1.44519958724341...|       1.0|
|(7,[0,3,6],[2.0,1...|  0.0|[18.3280841853910...|[0.99999998902980...|       0.0|
|[5.0,1.0,0.0,0.0,...|  1.0|[-17.986823547341...|[1.54319845459293...|       1.0|
+--------------------+-----+--------------------+--------------------+----------+



In [45]:
# Hand-written frame with the four feature columns and no label column.
test_rows = [
    (3.0, 'Z', 'S10', 40),
    (1.0, 'X', 'E10', 20),
    (4.0, 'A', 'S20', 10),
    (3.0, 'A', 'S10', 20),
    (4.0, 'X', 'D10', 30),
    (1.0, 'Z', 'E10', 20),
    (4.0, 'A', 'S10', 30),
]
test_columns = ['feature_1', 'feature_2', 'feature_3', 'feature_4']
unlabelled_test = spark.createDataFrame(test_rows, test_columns)

# Run the fitted pipeline model over the unlabelled rows.
sample_data_test = model.transform(unlabelled_test)

# Show the assembled features next to the model's outputs.
prediction_columns = ['features', 'rawPrediction', 'probability', 'prediction']
sample_data_test.select(*prediction_columns).show()

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(7,[0,3,6],[3.0,1...|[21.9361235191363...|[0.99999999970265...|       0.0|
|[1.0,1.0,0.0,0.0,...|[-19.516019417755...|[3.34426325212871...|       1.0|
|(7,[0,2,6],[4.0,1...|[-22.297362790363...|[2.07194574533260...|       1.0|
|[3.0,0.0,1.0,1.0,...|[-12.779832278243...|[2.81700837724637...|       1.0|
|[4.0,1.0,0.0,0.0,...|[-24.163863117971...|[3.20455394170236...|       1.0|
|(7,[0,4,6],[1.0,1...|[-22.543286459710...|[1.62022409523199...|       1.0|
|[4.0,0.0,1.0,1.0,...|[-10.456293062940...|[2.87658445082044...|       1.0|
+--------------------+--------------------+--------------------+----------+

