In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local[4]") \
.appName("RegressionTuningWithAdvertisementData") \
.config("spark.driver.memory","2g") \
.config("spark.executer.memory","4g") \
.getOrCreate()

# Veri Okuma

In [3]:
df = spark.read.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.option("sep",",") \
.load("D:\\Datasets\LifeExpectancyData.csv")

In [4]:
df.toPandas().head()

Unnamed: 0,Country,Year,Status,Life expectancy,Adult Mortality,infant deaths,Alcohol,percentage expenditure,Hepatitis B,Measles,...,Polio,Total expenditure,Diphtheria,HIV/AIDS,GDP,Population,thinness 1-19 years,thinness 5-9 years,Income composition of resources,Schooling
0,Afghanistan,2015,Developing,65.0,263.0,62,0.01,71.279624,65.0,1154,...,6.0,8.16,65.0,0.1,584.25921,33736494.0,17.2,17.3,0.479,10.1
1,Afghanistan,2014,Developing,59.9,271.0,64,0.01,73.523582,62.0,492,...,58.0,8.18,62.0,0.1,612.696514,327582.0,17.5,17.5,0.476,10.0
2,Afghanistan,2013,Developing,59.9,268.0,66,0.01,73.219243,64.0,430,...,62.0,8.13,64.0,0.1,631.744976,31731688.0,17.7,17.7,0.47,9.9
3,Afghanistan,2012,Developing,59.5,272.0,69,0.01,78.184215,67.0,2787,...,67.0,8.52,67.0,0.1,669.959,3696958.0,17.9,18.0,0.463,9.8
4,Afghanistan,2011,Developing,59.2,275.0,71,0.01,7.097109,68.0,3013,...,68.0,7.87,68.0,0.1,63.537231,2978599.0,18.2,18.2,0.454,9.5


# Nitelik isimlerini Değiştir

In [5]:
new_cols = ["Country", "Year", "Status", "label", "AdultMortality",
      "InfantDeaths", "Alcohol", "PercentageExpenditure", "HepatitisB", "Measles", "BMI", "UnderFiveDeaths",
      "Polio", "TotalExpenditure", "Diphtheria", "HIV_AIDS", "GDP", "Population", "Thinness119", "Thinness59",
      "IncomeCompositionOfResources", "Schooling"]

In [6]:
df2 = df.toDF(*new_cols)

In [7]:
df2.toPandas().head()

Unnamed: 0,Country,Year,Status,label,AdultMortality,InfantDeaths,Alcohol,PercentageExpenditure,HepatitisB,Measles,...,Polio,TotalExpenditure,Diphtheria,HIV_AIDS,GDP,Population,Thinness119,Thinness59,IncomeCompositionOfResources,Schooling
0,Afghanistan,2015,Developing,65.0,263.0,62,0.01,71.279624,65.0,1154,...,6.0,8.16,65.0,0.1,584.25921,33736494.0,17.2,17.3,0.479,10.1
1,Afghanistan,2014,Developing,59.9,271.0,64,0.01,73.523582,62.0,492,...,58.0,8.18,62.0,0.1,612.696514,327582.0,17.5,17.5,0.476,10.0
2,Afghanistan,2013,Developing,59.9,268.0,66,0.01,73.219243,64.0,430,...,62.0,8.13,64.0,0.1,631.744976,31731688.0,17.7,17.7,0.47,9.9
3,Afghanistan,2012,Developing,59.5,272.0,69,0.01,78.184215,67.0,2787,...,67.0,8.52,67.0,0.1,669.959,3696958.0,17.9,18.0,0.463,9.8
4,Afghanistan,2011,Developing,59.2,275.0,71,0.01,7.097109,68.0,3013,...,68.0,7.87,68.0,0.1,63.537231,2978599.0,18.2,18.2,0.454,9.5


In [8]:
# Model oluşturulduğu için tekrar nitelik seçmeye gerek yok. 
# Aşağıda sadece regresyona girecek nitelikler var.

In [9]:
categorical_cols = ["Status"]
numerical_cols = ["Year", "AdultMortality",
      "InfantDeaths", "Alcohol",   "BMI", "UnderFiveDeaths",
      "TotalExpenditure", "Diphtheria", "HIV_AIDS", "GDP",  
      "IncomeCompositionOfResources", "Schooling"]
label = ["label"]

# Veri Temizliği 

In [10]:
df3 = df2.na.drop()

In [11]:
df3.count()

1649

# Veri Ön Hazırlığı

In [12]:
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression, LinearRegressionModel

# StringIndexer

In [13]:
# Country'de kategori sayısı çok fazla olduğundan analize dahil etmedik
status_string_indexer = StringIndexer().setInputCol("Status").setOutputCol("StatusIndexed")

# OneHotEncoder

In [14]:
encoder = OneHotEncoderEstimator().setInputCols(["StatusIndexed"]).setOutputCols(["StatusEncoded"])

# VectorAssembler

In [15]:
vector_assembler = VectorAssembler().setInputCols(numerical_cols + encoder.getOutputCols()).setOutputCol("features")

# LinearModel

In [16]:
lr_obj = LinearRegression().setFeaturesCol("features").setLabelCol("label")

# Pipeline

In [19]:
pipeline_object = Pipeline().setStages([status_string_indexer, encoder, vector_assembler, lr_obj])

# MODEL TUNING

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


# paramGrid

In [25]:
param_grid = ParamGridBuilder() \
.addGrid(lr_obj.aggregationDepth, [2,5]) \
.addGrid(lr_obj.elasticNetParam, [0.0, 0.2, 0.7]) \
.addGrid(lr_obj.epsilon, [1.35, 1.55]) \
.addGrid(lr_obj.maxIter, [10, 20]) \
.addGrid(lr_obj.regParam, [0.00, 0.01, 0.05]) \
.addGrid(lr_obj.solver, ["auto", "normal", "l-bfgs"]) \
.addGrid(lr_obj.tol, [1.0E-6, 1.0E-4]) \
.build()

# Cross-Validation

In [29]:
cv = CrossValidator() \
.setEstimator(pipeline_object) \
.setEvaluator(RegressionEvaluator()) \
.setEstimatorParamMaps(param_grid) \
.setNumFolds(5) \
.setParallelism(2)

In [53]:
df_train, df_test = df3.randomSplit([0.9, 0.2], seed=142)

In [54]:
df_train.cache()
df_test.cache()

DataFrame[Country: string, Year: int, Status: string, label: double, AdultMortality: int, InfantDeaths: int, Alcohol: double, PercentageExpenditure: double, HepatitisB: int, Measles: int, BMI: double, UnderFiveDeaths: int, Polio: int, TotalExpenditure: double, Diphtheria: int, HIV_AIDS: double, GDP: double, Population: double, Thinness119: double, Thinness59: double, IncomeCompositionOfResources: double, Schooling: double]

In [55]:
cv_model = cv.fit(df_train)

In [56]:
cv_model.transform(df3).select("label","prediction").toPandas().head()

Unnamed: 0,label,prediction
0,65.0,63.553597
1,59.9,63.351518
2,59.9,63.370156
3,59.5,63.344286
4,59.2,62.869877


# En İyi Model

In [57]:
# En iyi pipeline model
best_model = cv_model.bestModel

In [58]:
# En iyi pipeline model içinden lr modeli almak
lr_model = best_model.stages[-1]

In [59]:
lr_model.coefficients

DenseVector([-0.1339, -0.0166, 0.0813, -0.1013, 0.032, -0.0619, 0.1044, 0.0151, -0.4454, 0.0001, 9.8021, 0.9331, -0.8385])

In [60]:
lr_model.intercept

322.18818722247215

In [61]:
lr_model.summary.r2

0.8392000429810356

In [62]:
lr_model.explainParams().split("\n")

['aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2, current: 2)',
 'elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0, current: 0.0)',
 'epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. (default: 1.35, current: 1.35)',
 'featuresCol: features column name (default: features, current: features)',
 'fitIntercept: whether to fit an intercept term (default: True)',
 'labelCol: label column name (default: label, current: label)',
 'loss: The loss function to be optimized. Supported options: squaredError, huber. (Default squaredError) (default: squaredError)',
 'maxIter: maximum number of iterations (>= 0) (default: 100, current: 10)',
 'predictionCol: prediction column name (default: prediction)',
 'regParam: regularization parameter (>= 0) (default: 0.0, current: 0.0)',
 'solver: The solver algorithm for optimization. Supported 