In [1]:
from pyspark.sql.types import *

# Explicit schema for the CSV: Date is kept as a string and the five numeric columns
# are read as doubles. Because a schema is supplied, Spark does not infer types, so the
# contradictory `inferSchema` option is not passed.
customSchema = StructType([
    StructField("Date", StringType(), True),
    StructField("dji", DoubleType(), True),
    StructField("hsi", DoubleType(), True),
    StructField("n225", DoubleType(), True),
    StructField("aord", DoubleType(), True),
    StructField("sp500", DoubleType(), True),
])
FinData = spark.read.csv('/FileStore/tables/COMP4651_Data.csv', header=True, schema=customSchema)

In [2]:
from pyspark.sql.types import *

# Expose FinData to SQL (the %sql cell and sqlContext.sql/table calls) as the temporary
# view "Fin_data". DataFrame.createOrReplaceTempView is the non-deprecated equivalent of
# SQLContext.registerDataFrameAsTable, which simply delegates to it.
FinData.createOrReplaceTempView("Fin_data")

In [3]:
# Resolve the registered view back into a DataFrame and render its summary
# statistics (count, mean, stddev, min, max) as a table.
df = sqlContext.table(tableName="Fin_data")
display(
    df.describe()
)

summary,Date,dji,hsi,n225,aord,sp500
count,660,660.0,660.0,660.0,660.0,660.0
mean,,0.0002409798868014485,0.00010510285297831629,0.00016048888547283584,0.0001604286571749024,0.00022749265187408825
stddev,,0.0036357862767214,0.0045344787575142,0.0043289037690509,0.0029367049584505,0.0035516040376773
min,2016-12-06T00:00:00.000+0000,-0.0204738647505688,-0.0228089355566476,-0.0210232845440101,-0.0142777421916431,-0.0181719847344359
max,2019-11-29T00:00:00.000+0000,0.0132144612449955,0.0179149054433924,0.0139183306911983,0.0124347107877396,0.0127868602460146


In [4]:
%sql
-- Drop days where any of dji/hsi/n225/aord is exactly zero, and turn sp500 into a 0/1 label (1 when positive).
SELECT dji,
       n225,
       aord,
       hsi,
       CASE WHEN sp500 > 0 THEN 1 ELSE 0 END AS sp500
FROM Fin_data
WHERE dji != 0
  AND hsi != 0
  AND n225 != 0
  AND aord != 0

dji,n225,aord,hsi,sp500
0.0008024547939964322,0.0020282743343251,0.0022697755797,0.0032605302181156,1
0.0066674267692663,0.0032085772273546,0.0038456948067122,0.0024021931617497,1
0.0014458167407331,0.0062654573871796,0.0049614657746932,0.0011588087007963,1
0.0031335761925825,0.0053111377880954,0.0013011499434512,-0.0019202119057624,1
0.0008691777746925311,0.0036122202753956,0.0002551511971029541,-0.0063032316631632,0
0.0025108015263546,0.002159634137544,-0.0014244430908787,0.0002647526928427624,1
-0.0025963768148935,6.970173018583381e-05,0.0030136920702648,0.00019188637334366376,0
0.0013082255421865,0.000454945223486547,-0.0034559267505551,-0.0077506700611635,1
-0.0001932128168071756,0.0028603805300386,-0.000411575848936252,-0.0007615970312029319,0
0.0008669258270321833,-0.0002138467628247653,0.0017910372462281,-0.0037250632410321,1


In [5]:
from pyspark.ml.feature import VectorAssembler

# Modelling dataset: same filter and 0/1 sp500 label as the %sql cell above.
datasetDF = sqlContext.sql("select dji, n225, aord, hsi, if (sp500 > 0,1, 0 ) AS sp500 from Fin_data WHERE dji != 0 and hsi != 0 and n225 != 0 and aord != 0")

# Pack the four feature columns, in this order, into the single vector column
# "features" that Spark ML estimators consume.
vectorizer = VectorAssembler(inputCols=["dji", "n225", "aord", "hsi"], outputCol="features")

In [6]:
# Sanity check on the modelling dataset: the feature columns the assembler reads,
# in order, followed by the sp500 label.
assert datasetDF.columns == ["dji", "n225", "aord", "hsi", "sp500"], datasetDF.columns
datasetDF.printSchema()

In [7]:
# Hold out 25% of the data for testing and train on the remaining 75%.
# The fixed seed makes the split reproducible for the same input data and partitioning.
(split25DF, split75DF) = datasetDF.randomSplit([0.25, 0.75], seed=1900009193)

# Cache both splits: they are reused by the fitting, cross-validation and scoring cells.
testSetDF = split25DF.cache()
trainingSetDF = split75DF.cache()

In [8]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel

# Logistic-regression classifier for the 0/1 sp500 label; its columns and
# hyper-parameters are configured in the next cell.
lr = LogisticRegression()

# To list every parameter the estimator accepts (with defaults), run:
#   print(lr.explainParams())

In [9]:
from pyspark.ml import Pipeline
# Configure the classifier in place: label column sp500, predictions written to
# Prediction_sp500, at most 100 iterations, regularization strength 0.15.
(lr.setPredictionCol("Prediction_sp500")
   .setLabelCol("sp500")
   .setMaxIter(100)
   .setRegParam(0.15))

# Chain feature assembly and the classifier into one spark.ml Pipeline
# (analogous to a scikit-learn Pipeline).
lrPipeline = Pipeline(stages=[vectorizer, lr])

# Fit on the training split only; the held-out test split is scored later.
lrModel = lrPipeline.fit(trainingSetDF)

In [10]:
# The last stage of the fitted two-stage pipeline is the trained classifier; read its
# intercept and coefficients (one per feature, in assembler order: dji, n225, aord, hsi).
lrStage = lrModel.stages[-1]
intercept, weights = lrStage.intercept, lrStage.coefficients
print("intercept is: ", intercept)
print("weights are:", weights)

In [11]:
# Score the held-out test split with the baseline pipeline, keeping only the
# features, the true label and the predicted class for inspection.
resultsDF = (
    lrModel
    .transform(testSetDF)
    .select("dji", "n225", "aord", "hsi", "sp500", "Prediction_sp500")
)

display(resultsDF)

dji,n225,aord,hsi,sp500,Prediction_sp500
-0.0204738647505688,-0.0111980272264053,-0.0071269938452021,-0.0047759492473549,0,0.0
-0.0184045093605211,0.0048978144228728,0.0009935828045808393,0.001830384148902,0,0.0
-0.0127673320642074,-0.0076222866579929,-0.0086818858851311,-0.0125585274863651,0,0.0
-0.0101801384312638,0.0003829986177841249,0.0011688889414487,0.0005299470740682821,0,0.0
-0.0093618005327122,-0.0172467013833568,-0.0121680150940086,-0.0156435718851621,0,0.0
-0.0087280854233924,-0.0124972221290198,-0.005967205301451,-0.0040800938481559,0,0.0
-0.0079418670711577,-0.0048483334898694,-0.0030970407423422,0.0021959615249285,0,0.0
-0.0077870194641427,-0.0022939511521045,-0.0045272659938464,-0.000725836426225257,0,0.0
-0.0076078993804742,0.0037213604888499,0.0024205167624069,0.0054469162703929,0,0.0
-0.007353333476864,-0.0068184989652309,-0.002963430028946,0.0028003300722057,0,0.0


In [12]:
# Now let's compute an evaluation metric for our test dataset
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# R^2 evaluator over the label and predicted-class columns. It is kept because the
# cross-validation cell reuses regEval. NOTE: R^2 is a regression metric and is not a
# natural score for a 0/1 classifier, so classification accuracy is reported as well.
regEval = RegressionEvaluator(predictionCol="Prediction_sp500", labelCol="sp500", metricName="r2")

# Run the evaluators on the scored test DataFrame
r2 = regEval.evaluate(resultsDF)
print("r2: %.2f" % r2)

accEval = MulticlassClassificationEvaluator(predictionCol="Prediction_sp500", labelCol="sp500", metricName="accuracy")
print("accuracy: %.2f" % accEval.evaluate(resultsDF))

In [13]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Cross-validate the whole pipeline (assembler + classifier) with 3 folds, reusing regEval:
# the candidate with the highest R^2 wins. A fixed seed makes the fold assignment reproducible.
crossval = CrossValidator(estimator=lrPipeline, evaluator=regEval, numFolds=3, seed=1900009193)

# Candidate regularization strengths: 0.005, 0.010, ..., 0.050 (x / 200 for x = 1..10).
# NOTE(review): the baseline model used regParam=0.15, which lies outside this grid —
# confirm whether the intended range was 0.05-0.50 (x / 20.0).
regParam = [x / 200.0 for x in range(1, 11)]

# Build the parameter grid with ParamGridBuilder and attach it to the CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, regParam)
             .build())
crossval.setEstimatorParamMaps(paramGrid)

# Evaluate every (fold, regParam) combination on the training split and keep the best model
cvModel = crossval.fit(trainingSetDF).bestModel

In [14]:
# Score the held-out test split with the cross-validated best model.
resultsDF = cvModel.transform(testSetDF)

# Compute R^2 for the tuned model and report that value (previously the stale
# baseline `r2` was printed instead of `r2New`).
r2New = regEval.evaluate(resultsDF, {regEval.metricName: 'r2'})
print("r2: %.2f" % r2New)

In [15]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

def evaluate(predictionAndLabels, scoreCol='probability'):
  """Print and return classification metrics for the sp500 predictions.

  Args:
    predictionAndLabels: DataFrame with an `sp500` label column (0/1) and a
      `Prediction_sp500` predicted-class column, e.g. the output of a fitted
      pipeline's `transform`.
    scoreCol: column holding the classifier's class-probability vector. When it is
      present, P(class 1) is used as the score for the ROC/PR curves. When it is absent
      (or scoreCol is None), the hard 0/1 predictions are used, as before.

  Returns:
    dict with overall 'accuracy', 'precision', 'recall' and 'F1 Measure'. Under the keys
    0.0 and 1.0 it holds per-class 'precision', 'recall' and 'F1 Measure'. All values
    are strings.
  """
  log = {}

  # (prediction, label) pairs, as MulticlassMetrics expects.
  predAndLabel = (predictionAndLabels.select('Prediction_sp500', 'sp500').rdd
                  .map(lambda row: (float(row[0]), float(row[1]))))

  # (score, label) pairs for BinaryClassificationMetrics. Hard 0/1 predictions give a
  # single-threshold ROC curve, so prefer the positive-class probability when available.
  if scoreCol and scoreCol in predictionAndLabels.columns:
    scoreAndLabel = (predictionAndLabels.select(scoreCol, 'sp500').rdd
                     .map(lambda row: (float(row[0][1]), float(row[1]))))
  else:
    scoreAndLabel = predAndLabel

  binaryMetrics = BinaryClassificationMetrics(scoreAndLabel)
  # Area under precision-recall curve
  print("Area under PR = %s" % binaryMetrics.areaUnderPR)
  # Area under ROC curve
  print("Area under ROC = %s" % binaryMetrics.areaUnderROC)

  metrics = MulticlassMetrics(predAndLabel)

  # Confusion Matrix
  print(metrics.confusionMatrix().toArray())

  # Overall statistics. Micro-averaged precision, recall and F1 all equal accuracy for
  # single-label data. The no-argument precision()/recall()/fMeasure() were deprecated
  # in Spark 2.0 and removed in 3.0, so read `accuracy` and report it under the old keys.
  accuracy = "%s" % metrics.accuracy
  log['accuracy'] = accuracy
  log['precision'] = log['recall'] = log['F1 Measure'] = accuracy
  print("[Overall]\tprecision = %s | recall = %s | F1 Measure = %s"
        % (log['precision'], log['recall'], log['F1 Measure']))

  # Statistics by class
  for label in (0.0, 1.0):
    log[label] = {
      'precision': "%s" % metrics.precision(label),
      'recall': "%s" % metrics.recall(label),
      'F1 Measure': "%s" % metrics.fMeasure(label, beta=1.0),
    }
    print("[Class %s]\tprecision = %s | recall = %s | F1 Measure = %s"
          % (label, log[label]['precision'], log[label]['recall'], log[label]['F1 Measure']))

  return log

# Keep the returned metrics for later use without echoing the dict as cell output.
sp500_metrics = evaluate(resultsDF)