# logistic regression and linear svm do link prediction
## logistic regression
load data

In [2]:
df = sqlContext.read.format('csv').option("header", 'true').load("/FileStore/tables/pandas_train.csv")
# test = sqlContext.read.format('csv').option("header", 'true').load("/FileStore/tables/pandas_test.csv")
df.cache()

Above schema shows that values are string, we need to transform them into double

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql.functions import col  # for indicating a column using a string in the line below
df = df.select([col(c).cast("double").alias(c) for c in df.columns])
# test = test.select([col(c).cast("double").alias(c) for c in test.columns])
df.printSchema()

In [5]:
# pack features 
from pyspark.ml.feature import VectorAssembler, VectorIndexer
featuresCols = df.columns
featuresCols.remove('label')
# This concatenates all feature columns into a single feature vector in a new column "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
# This identifies categorical features and indexes them.
vectorIndexer = VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4)

In [6]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer])

In [7]:
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
display(df)
# pipelineModeltest = pipeline.fit(test)
# test = pipelineModeltest.transform(test)

label,RPR,simRank,katz_score,social,rawFeatures,features
0.0,0.5,1.0,0.0,0.0,"List(1, 4, List(), List(0.5, 1.0, 0.0, 0.0))","List(1, 4, List(), List(0.5, 1.0, 0.0, 0.0))"
0.0,0.0,0.0,0.0,0.0,"List(0, 4, List(), List())","List(0, 4, List(), List())"
0.0,0.0,0.0,0.0,0.0,"List(0, 4, List(), List())","List(0, 4, List(), List())"
0.0,0.0,0.0,0.0,0.0,"List(0, 4, List(), List())","List(0, 4, List(), List())"
0.0,0.0,0.0,0.0,0.0,"List(0, 4, List(), List())","List(0, 4, List(), List())"
0.0,0.0,0.0,0.0,0.0,"List(0, 4, List(), List())","List(0, 4, List(), List())"
0.0,0.0,0.0,0.0,0.0,"List(0, 4, List(), List())","List(0, 4, List(), List())"
0.0,0.0,0.0,0.0,0.0,"List(0, 4, List(), List())","List(0, 4, List(), List())"
0.0,0.0,0.0,0.0,0.0,"List(0, 4, List(), List())","List(0, 4, List(), List())"
0.0,0.0,0.0,0.0,0.0,"List(0, 4, List(), List())","List(0, 4, List(), List())"


In [8]:
display(df.select("label",*featuresCols))

label,RPR,simRank,katz_score,social
0.0,0.5,1.0,0.0,0.0
0.0,0.0,0.0,0.0,0.0
0.0,0.0,0.0,0.0,0.0
0.0,0.0,0.0,0.0,0.0
0.0,0.0,0.0,0.0,0.0
0.0,0.0,0.0,0.0,0.0
0.0,0.0,0.0,0.0,0.0
0.0,0.0,0.0,0.0,0.0
0.0,0.0,0.0,0.0,0.0
0.0,0.0,0.0,0.0,0.0


In [9]:
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=100)
print(trainingData.count())
print(testData.count())

In [10]:
from pyspark.ml.classification import LogisticRegression
# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
# Train model with Training Data
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print ("roca = ", evaluator.evaluate(predictions))
# the defualt matrix is areadUnderROC
print("defalut metric is",evaluator.getMetricName())
evaluator.setMetricName('areaUnderPR')
print("precision-recall area is",evaluator.evaluate(predictions))

In [12]:
# see which parameter we can explain 
print(lr.explainParams())

In [13]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)
# cvModel uses the best model found from the Cross Validation
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

Feature Weight
3.2504491066521664
-1.953569094311836
0.7127240770981456
1.731381859902649


In [14]:
evaluator.setMetricName('areaUnderROC')
print("area under roc=",evaluator.evaluate(predictions))
evaluator.setMetricName('areaUnderPR')
print("area under precison-recall=",evaluator.evaluate(predictions))

In [15]:
# View best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability")
display(selected)

label,prediction,probability
0.0,0.0,"List(1, 2, List(), List(0.9783886569295844, 0.021611343070415553))"
0.0,0.0,"List(1, 2, List(), List(0.9783886569295844, 0.021611343070415553))"
0.0,0.0,"List(1, 2, List(), List(0.9783886569295844, 0.021611343070415553))"
0.0,0.0,"List(1, 2, List(), List(0.9783886569295844, 0.021611343070415553))"
0.0,0.0,"List(1, 2, List(), List(0.9783886569295844, 0.021611343070415553))"
0.0,0.0,"List(1, 2, List(), List(0.9783886569295844, 0.021611343070415553))"
0.0,0.0,"List(1, 2, List(), List(0.9783886569295844, 0.021611343070415553))"
0.0,0.0,"List(1, 2, List(), List(0.9783886569295844, 0.021611343070415553))"
0.0,0.0,"List(1, 2, List(), List(0.9783886569295844, 0.021611343070415564))"
0.0,0.0,"List(1, 2, List(), List(0.9783886569295844, 0.021611343070415564))"


#linear SVM

In [17]:
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(labelCol="label", featuresCol="features",maxIter=10, regParam=0.1)
lsvcModel = lsvc.fit(trainingData)
predictions = lsvcModel.transform(testData)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("roc area = ",evaluator.evaluate(predictions))
evaluator.setMetricName('areaUnderPR')
print("precision-recall area",evaluator.evaluate(predictions))

In [18]:
print(lsvc.explainParams())

In [19]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.1,0.5,1])
             .addGrid(lsvc.maxIter, [100,150,200])
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lsvc, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)
# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
display(weightsDF)

Feature Weight
0.1361175748851809
-0.1867408140381855
0.1377249623025839
-0.4820755381717728


In [20]:
evaluator.setMetricName('areaUnderROC')
print("area under roc=",evaluator.evaluate(predictions))
evaluator.setMetricName('areaUnderPR')
print("area under precison-recall=",evaluator.evaluate(predictions))
print('Model Intercept: ', cvModel.bestModel.intercept)

#decision tree

In [22]:
from pyspark.ml.classification import DecisionTreeClassifier
# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)
# Train model with Training Data
dtModel = dt.fit(trainingData)
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)
# Make predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)
predictions.printSchema()

In [23]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability")
display(selected)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

label,prediction,probability
0.0,0.0,"List(1, 2, List(), List(0.9754724567752312, 0.024527543224768796))"
0.0,0.0,"List(1, 2, List(), List(0.9754724567752312, 0.024527543224768796))"
0.0,0.0,"List(1, 2, List(), List(0.9754724567752312, 0.024527543224768796))"
0.0,0.0,"List(1, 2, List(), List(0.9754724567752312, 0.024527543224768796))"
0.0,0.0,"List(1, 2, List(), List(0.9754724567752312, 0.024527543224768796))"
0.0,0.0,"List(1, 2, List(), List(0.9754724567752312, 0.024527543224768796))"
0.0,0.0,"List(1, 2, List(), List(0.9754724567752312, 0.024527543224768796))"
0.0,0.0,"List(1, 2, List(), List(0.9754724567752312, 0.024527543224768796))"
0.0,0.0,"List(1, 2, List(), List(0.9754724567752312, 0.024527543224768796))"
0.0,0.0,"List(1, 2, List(), List(0.9754724567752312, 0.024527543224768796))"


In [24]:
# Entropy and the Gini coefficient are the supported measures of impurity for Decision Trees. This is Gini by default. Changing this value is simple, model.setImpurity("Entropy").
dt.getImpurity()

In [25]:
# Now we will try tuning the model with the ParamGridBuilder and the CrossValidator.

# As we indicate 3 values for maxDepth and 3 values for maxBin, this grid will have 3 x 3 = 9 parameter settings for CrossValidator to choose from. We will create a 5-fold CrossValidator.
# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6, 10])
             .addGrid(dt.maxBins, [20, 40, 80])
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# Takes ~5 minutes
print("numNodes = ", cvModel.bestModel.numNodes)
print("depth = ", cvModel.bestModel.depth)
# Use test set to measure the accuracy of our model on new data
predictions = cvModel.transform(testData)
# cvModel uses the best model found from the Cross Validation
# View Best model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability")
display(selected)

label,prediction,probability
0.0,0.0,"List(1, 2, List(), List(0.9968652037617555, 0.003134796238244514))"
0.0,0.0,"List(1, 2, List(), List(1.0, 0.0))"
0.0,0.0,"List(1, 2, List(), List(0.9986263736263736, 0.0013736263736263737))"
0.0,0.0,"List(1, 2, List(), List(0.9968652037617555, 0.003134796238244514))"
0.0,0.0,"List(1, 2, List(), List(0.9986263736263736, 0.0013736263736263737))"
0.0,0.0,"List(1, 2, List(), List(0.9491525423728814, 0.05084745762711865))"
0.0,0.0,"List(1, 2, List(), List(0.9491525423728814, 0.05084745762711865))"
0.0,0.0,"List(1, 2, List(), List(0.9968652037617555, 0.003134796238244514))"
0.0,0.0,"List(1, 2, List(), List(0.9968652037617555, 0.003134796238244514))"
0.0,0.0,"List(1, 2, List(), List(0.9968652037617555, 0.003134796238244514))"


In [26]:
# Evaluate best model
evaluator.setMetricName('areaUnderROC')
print("area under roc=",evaluator.evaluate(predictions))
evaluator.setMetricName('areaUnderPR')
print("area under precison-recall=",evaluator.evaluate(predictions))

#random forest

In [28]:
# Random Forests uses an ensemble of trees to improve model accuracy. You can read more about Random Forest from the classification and regression section of MLlib Programming Guide.
from pyspark.ml.classification import RandomForestClassifier
# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
# Train model with Training Data
rfModel = rf.fit(trainingData)
# Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(testData)
predictions.printSchema()

In [29]:
# View model's predictions and probabilities of each prediction class
selected = predictions.select("label", "prediction", "probability")
display(selected)

label,prediction,probability
0.0,0.0,"List(1, 2, List(), List(0.9667447781905227, 0.03325522180947726))"
0.0,0.0,"List(1, 2, List(), List(0.9393369064262143, 0.06066309357378583))"
0.0,0.0,"List(1, 2, List(), List(0.9813024172915978, 0.018697582708402185))"
0.0,0.0,"List(1, 2, List(), List(0.9667447781905227, 0.03325522180947726))"
0.0,0.0,"List(1, 2, List(), List(0.9813024172915978, 0.018697582708402185))"
0.0,0.0,"List(1, 2, List(), List(0.9813024172915978, 0.018697582708402185))"
0.0,0.0,"List(1, 2, List(), List(0.9813024172915978, 0.018697582708402185))"
0.0,0.0,"List(1, 2, List(), List(0.9667447781905227, 0.03325522180947726))"
0.0,0.0,"List(1, 2, List(), List(0.9667447781905227, 0.03325522180947726))"
0.0,0.0,"List(1, 2, List(), List(0.9543165086887774, 0.04568349131122257))"


In [30]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)
# Now we will try tuning the model with the ParamGridBuilder and the CrossValidator.

# As we indicate 3 values for maxDepth, 2 values for maxBin, and 2 values for numTrees, this grid will have 3 x 2 x 2 = 12 parameter settings for CrossValidator to choose from. We will create a 5-fold CrossValidator.
# = 12 parameter settings for CrossValidator to choose from. We will create a 5-fold CrossValidator.

# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.maxDepth, [2, 4, 6])
             .addGrid(rf.maxBins, [20, 60])
             .addGrid(rf.numTrees, [5, 20])
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations.  This can take about 6 minutes since it is training over 20 trees!
cvModel = cv.fit(trainingData)
# Use test set here so we can measure the accuracy of our model on new data
predictions = cvModel.transform(testData)
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.setMetricName('areaUnderROC')
print("area under roc=",evaluator.evaluate(predictions))
evaluator.setMetricName('areaUnderPR')
print("area under precison-recall=",evaluator.evaluate(predictions))

bestModel = cvModel.bestModel
# Generate predictions for entire dataset
finalPredictions = bestModel.transform(df)
# Evaluate best model
evaluator.setMetricName('areaUnderROC')
print("area under roc(entire dataset)=",evaluator.evaluate(finalPredictions))
evaluator.setMetricName('areaUnderPR')
print("area under precison-recall(entire dataset)=",evaluator.evaluate(finalPredictions))

finalPredictions.createOrReplaceTempView("finalPredictions")

In [31]:
# cvModel uses the best model found from the Cross Validation
# weights = cvModel.bestModel.coefficients
# weights = [(float(w),) for w in weights]  # convert numpy type to float, and to tuple
# weightsDF = sqlContext.createDataFrame(weights, ["Feature Weight"])
# display(weightsDF)

In [32]:
df = sqlContext.read.format('csv').option("header", 'true').load("/FileStore/tables/weight.csv")
display(df)

feature,weight
RPR,4.948308305364363
simRank,-1.8205038643386973
katz_score,1.7137269737672634
aa,-0.2584003735587221
social,-22.66383817319179
CT,-0.0014594653715631
CST,-1.2092076922359584e-14
CN,-0.1259621945801202
SC,-0.3523584928616498
HP,-0.2631777344963998
