In [1]:
# Import Spark SQL and Spark ML libraries
from pyspark.sql.types import *
from pyspark.sql.functions import *

from sklearn import cross_validation
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, accuracy_score
from sklearn.ensemble import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression , DecisionTreeRegressor
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# Load the modelling columns from the `loan` table as doubles; `installment`
# is exposed as `label` because it is the regression target used below.
#
# int_rate is parsed by stripping the percent sign instead of taking a fixed
# 4-character prefix: Substring(int_rate,1,4) truncates "10.65%" to 10.6, and
# any value whose first four characters are not a valid number would fail the
# cast and be silently removed by dropna().
# NOTE(review): assumes int_rate is text such as "10.65%" - confirm against the table.
data = sqlContext.sql(
    "SELECT Cast(loan_amnt as Double), "
    "Cast(regexp_replace(trim(int_rate), '%', '') as Double) as int_rate, "
    "Cast(total_pymnt as Double), "
    "Cast(installment as Double) as label "
    "from loan"
).dropna()  # discard rows where any selected column is NULL

# Quick sanity check of the loaded values and their types.
data.show(30)
data.printSchema()

In [3]:
# Feature selection: keep the two predictors and the regression target.
data_fea = data.select("int_rate","label","loan_amnt")

# Split into 70% training / 30% test rows. A fixed seed is supplied so the
# split (and therefore every metric reported below) is repeatable for the
# same input data instead of changing on each run.
splits = data_fea.randomSplit([0.7, 0.3], seed=42)
train = splits[0]
test = splits[1]

# Single-argument print(...) produces the same output on Python 2 and also
# runs on Python 3, where the print statement is a SyntaxError.
print("We have %d training examples and %d test examples." % (train.count(), test.count()))

In [4]:
# Model 1: linear regression.
# Combine the two predictor columns into the single "features" vector column
# that the regressor reads.
vectorAssembler = VectorAssembler(
    outputCol="features",
    inputCols=["int_rate", "loan_amnt"],
)

# Initial hyperparameters; regParam and maxIter are both searched over in the
# parameter grid built for this estimator.
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    regParam=0.3,
    maxIter=10,
)

# Assemble features, then fit the regressor, as one estimator.
pipeline = Pipeline(stages=[vectorAssembler, lr])

In [5]:
# Tune the linear-regression pipeline with TrainValidationSplit.
# trainRatio=0.8: 80% of the rows passed to fit() are used to train each
# candidate model and the remaining 20% are used to validate it.
paramGrid1 = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.3, 0.01])
    .addGrid(lr.maxIter, [10, 5])
    .build()
)

tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid1,
    evaluator=RegressionEvaluator(),
    trainRatio=0.8,
)

# Fit on the training split only; the test split is kept for final scoring.
model1 = tvs.fit(train)

In [6]:
# Linear regression: score the held-out test rows with the tuned model.
prediction1 = model1.transform(test)

# Keep every column of the scored frame and render it.
predicted1 = prediction1.select("*")
display(predicted1)

In [7]:
# Linear regression: test-set RMSE, comparing the "label" column with the
# model's "prediction" column.
evaluator1 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse1 = evaluator1.evaluate(prediction1)

# Formatted single-argument print(...) gives the same text as the original
# `print "...", rmse1` on Python 2 and also runs on Python 3.
print("Root Mean Square Error (RMSE): %s" % rmse1)

In [8]:
# Model 2: decision-tree regressor (a single tree, not a GBT ensemble).
# Same two predictors as model 1, packed into a "features" vector column.
vectorAssembler2 = VectorAssembler(
    outputCol="features",
    inputCols=["int_rate", "loan_amnt"],
)

dt = DecisionTreeRegressor(featuresCol="features", labelCol="label")

# Assemble features, then fit the tree, as one estimator.
pipeline2 = Pipeline(stages=[vectorAssembler2, dt])

In [9]:
# Hyperparameter grid for the decision tree. Only maxDepth (maximum depth of
# the tree) is searched, over the values 2 and 10.
paramGrid2 = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [2, 10])
    .build()
)

# Tune with a single train/validation split (80% train, 20% validation) and a
# default-configured RegressionEvaluator, mirroring the setup used for model 1.
tvs2 = TrainValidationSplit(
    estimator=pipeline2,
    estimatorParamMaps=paramGrid2,
    evaluator=RegressionEvaluator(),
    trainRatio=0.8,
)

# Fit on the training split only; the test split is kept for final scoring.
model2 = tvs2.fit(train)

In [10]:
# Decision tree: score the held-out test rows with the tuned model.
prediction2 = model2.transform(test)

# Keep every column of the scored frame and render it.
predicted2 = prediction2.select("*")
display(predicted2)


In [11]:
# Decision tree: test-set RMSE, computed with the same evaluator (evaluator1)
# used for the linear-regression model so the two scores are comparable.
rmse = evaluator1.evaluate(prediction2)

# Single-argument print(...) produces the same output on Python 2 and also
# runs on Python 3, where the print statement is a SyntaxError.
print("RMSE on our test set: %g" % rmse)