## PySpark MLlib
### Linear Regression

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook under the app name
# 'Regress', then switch on the Arrow setting for PySpark.
spark = (
    SparkSession.builder
    .appName('Regress')
    .getOrCreate()
)
spark.conf.set(
    "spark.sql.execution.arrow.pyspark.enabled",
    "true",
)

In [None]:
# Load the employee CSV (cp949-encoded, first row is the header) and let
# Spark infer the column types; then print the resulting schema.
고용_df = spark.read.csv(
    "Employee.csv",
    header=True,
    inferSchema=True,
    encoding='cp949',
)
고용_df.printSchema()

### 자료의 척도 변환: log 변환

In [None]:
from pyspark.sql.functions import log, exp

# Add log-transformed copies of the current and starting salary columns.
고용_df = (
    고용_df
    .withColumn("salary_ln", log(고용_df["salary"]))
    .withColumn("salbegin_ln", log(고용_df["salbegin"]))
)

고용_df.show()

### Handling categorical features

In [None]:
from pyspark.ml.feature import StringIndexer

# Index the three categorical columns into new *_indexed columns.
요인처리 = StringIndexer(
    inputCols=['gender', 'jobcat', 'minority'],
    outputCols=['gender_indexed', 'jobcat_indexed', 'min_indexed'],
)
# Fit the indexer on the data, then apply the fitted model to the same data.
요인모형 = 요인처리.fit(고용_df)
고용_df = 요인모형.transform(고용_df)
고용_df.show()

### 다범주 변수 가변수 처리

In [None]:
from pyspark.ml.feature import OneHotEncoder

In [None]:
# Step 1: index 'jobcat' into 'jobcat_step1'.
직무색인기 = StringIndexer(inputCol='jobcat', outputCol='jobcat_step1')
중간과정 = 직무색인기.fit(고용_df).transform(고용_df)

# Step 2: one-hot encode the indexed column into the vector column 'jobcat_vec'.
가변수기 = OneHotEncoder(inputCol='jobcat_step1', outputCol="jobcat_vec")
고용_df = 가변수기.fit(중간과정).transform(중간과정)
고용_df.show()

### 상호작용(Interaction)

In [None]:
from pyspark.ml.feature import Interaction

In [None]:
# Build the gender x job-category interaction column 'gender_jobcat' from the
# indexed gender column and the one-hot job-category vector.
상호작용 = Interaction(
    inputCols=["gender_indexed", "jobcat_vec"],
    outputCol="gender_jobcat",
)
고용_df = 상호작용.transform(고용_df)
고용_df.show()

### 설명변수 설정

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Assemble the three predictors into a single feature vector column "설명변수".
설명벡터 = VectorAssembler(
    inputCols=['salbegin_ln', 'jobtime', 'educ'],
    outputCol="설명변수",
)
변수묶음 = 설명벡터.transform(고용_df)

# Preview the label next to the assembled features.
변수묶음.select("salary_ln", "설명변수").show()

In [None]:
# Keep only the columns used for modelling: the raw salary, the feature
# vector, and the log-salary label.
분석자료 = 변수묶음.select(["salary", "설명변수", "salary_ln"])
분석자료.show()

### 학습자료와 검증자료 분할

In [None]:
from pyspark.ml.regression import LinearRegression
# Split into training and validation sets with 0.75 / 0.25 weights and a fixed seed.
학습자료, 검증자료 = 분석자료.randomSplit(
    weights=[0.75, 0.25],
    seed=1,
)

### 회귀분석

In [None]:
# Linear regression of log-salary on the assembled features; predictions are
# written to a column named "예측값".
회귀분석 = LinearRegression(
    featuresCol="설명변수",
    labelCol="salary_ln",
    predictionCol="예측값",
)
# Fit on the training split only.
회귀적합 = 회귀분석.fit(학습자료)

In [None]:
# Display the coefficient vector of the fitted linear regression model.
회귀적합.coefficients

In [None]:
# Display the intercept of the fitted linear regression model.
회귀적합.intercept

In [None]:
# Report R-squared and mean squared error from the fitted model's summary.
학습요약 = 회귀적합.summary
print("R2:", 학습요약.r2)
print("MSE:", 학습요약.meanSquaredError)

In [None]:
import pandas as pd

# Label each coefficient with the feature it belongs to, in the same order as
# the VectorAssembler inputs that built "설명변수". Starting salary enters the
# model on the log scale, so its label is 'salbegin_ln' rather than 'salbegin'.
설명변수명 = ['salbegin_ln', 'jobtime', 'educ']

# Convert the Spark vector to a NumPy array so pandas receives one plain
# number per row instead of a Spark-specific vector object.
pd.DataFrame({"coefficients": 회귀적합.coefficients.toArray()}, index=설명변수명)

In [None]:
# Evaluate the fitted linear regression on the validation split and show the
# resulting predictions.
적합검증 = 회귀적합.evaluate(검증자료)
적합검증.predictions.show()

In [None]:
# The model predicts log-salary; exponentiate the "예측값" column to add a
# prediction on the original salary scale.
예측자료 = 적합검증.predictions
예측자료 = 예측자료.withColumn(
    "salary_예측값",
    exp(예측자료["예측값"]),
)
예측자료.show()

In [None]:
### 모든 변수 적용

In [None]:
# Assemble every prepared predictor (numeric, indexed, one-hot and
# interaction columns) into the single feature vector column "설명변수".
설명벡터 = VectorAssembler(
    inputCols=[
        'salbegin_ln',
        'jobtime',
        'educ',
        'gender_indexed',
        'jobcat_vec',
        'min_indexed',
        'gender_jobcat',
    ],
    outputCol="설명변수",
)
변수묶음 = 설명벡터.transform(고용_df)
변수묶음.show()

In [None]:
# Keep the feature vector plus both versions of the target (raw and log salary).
최종자료 = 변수묶음.select("설명변수", "salary", "salary_ln")
최종자료.show()

# Split into training and validation sets with 0.75 / 0.25 weights. A fixed
# seed (the same value as the earlier split in this notebook) makes the
# partition, and every model result that follows, reproducible across runs.
학습자료, 검증자료 = 최종자료.randomSplit([0.75, 0.25], seed=1)

In [None]:
# Linear regression of log-salary on the full feature vector; no prediction
# column name is given here, so the estimator's default is used.
회귀분석 = LinearRegression(
    featuresCol="설명변수",
    labelCol="salary_ln",
)
# Fit on the training split only.
회귀적합 = 회귀분석.fit(학습자료)


In [None]:
# Display the coefficient vector of the model fitted on the full feature set.
회귀적합.coefficients

In [None]:
# Show the predictions held in the fitted model's summary.
회귀적합.summary.predictions.show()

In [None]:
# Show the residuals held in the fitted model's summary.
회귀적합.summary.residuals.show()

In [None]:
# Evaluate the full-feature linear regression on the validation split and
# show the resulting predictions.
적합검증 = 회귀적합.evaluate(검증자료)
적합검증.predictions.show()

In [None]:
# The model predicts log-salary; exponentiate the "prediction" column to add
# a prediction on the original salary scale.
예측자료 = 적합검증.predictions
예측자료 = 예측자료.withColumn(
    "salary_예측값",
    exp(예측자료["prediction"]),
)
예측자료.show()

## 의사결정나무

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

# Regression tree on the raw (untransformed) salary, using the same feature vector.
회귀나무 = DecisionTreeRegressor(
    featuresCol="설명변수",
    labelCol="salary",
)
# Fit on the training split only.
회귀나무적합 = 회귀나무.fit(학습자료)

In [None]:
# Display the feature importances of the fitted regression tree.
회귀나무적합.featureImportances

In [None]:
# Apply the fitted regression tree to the validation split and show the output.
회귀나무검증 = 회귀나무적합.transform(검증자료)
회귀나무검증.show()

In [None]:
# Refit the regression tree with log-salary as the label.
회귀나무 = DecisionTreeRegressor(featuresCol="설명변수", labelCol="salary_ln")
회귀나무적합 = 회귀나무.fit(학습자료)

# Score the validation split, then exponentiate the log-scale prediction to
# add a prediction on the original salary scale.
회귀나무검증 = 회귀나무적합.transform(검증자료)
회귀나무검증 = 회귀나무검증.withColumn("salary_예측값", exp(회귀나무검증.prediction))

# Show the tree's own predictions. Displaying 예측자료 here would print the
# linear-regression results from the earlier cell instead.
회귀나무검증.show()

## Gradient-boosted tree regression

In [None]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted trees on log-salary, limited to 10 boosting iterations.
gbt = GBTRegressor(
    featuresCol="설명변수",
    labelCol="salary_ln",
    maxIter=10,
)
# Fit on the training split, then score the validation split.
gbt적합 = gbt.fit(학습자료)
gbt검증 = gbt적합.transform(검증자료)

In [None]:
# Exponentiate the log-scale prediction to show it on the original salary
# scale; the extra column is only displayed, not stored back into gbt검증.
gbt검증.withColumn(
    "salary_예측값",
    exp(gbt검증["prediction"]),
).show()
