In [1]:
# Read the CSV as an RDD of raw text lines. `sc` is not defined in this
# notebook -- presumably the SparkContext supplied by the session; confirm.
raw_data = sc.textFile("Apple_Dataset.csv")

# Preview the first two lines: the header row and the first record.
raw_data.take(2)

['Adj_Open,Adj_High,Adj_Low,Adj_Close',
 '0.414962396,0.416694413,0.414962396,0.414962396']

In [2]:
# The first line of the file holds the column names.
header = raw_data.first()

# Drop the header row and blank lines. The header is matched with `!=`
# rather than `not in`: on strings, `ln not in header` is a substring test,
# so it would also discard any record that merely happens to be a substring
# of the header text instead of only the header line itself.
dataLines = raw_data.filter(lambda ln: ln != header and ln.strip() != "")

# Preview the first two data records.
dataLines.take(2)

['0.414962396,0.416694413,0.414962396,0.414962396',
 '0.395188536,0.395188536,0.393312184,0.393312184']

In [3]:
# Sanity check: record count after the header filter next to the raw line count.
dataLines.count(), raw_data.count()

(9575, 9576)

In [4]:
# Break every text line into its comma-separated string fields.
csvData = dataLines.map(lambda row: row.split(","))

In [5]:
# Preview the first two parsed rows (each a list of string fields).
csvData.take(2)

[['0.414962396', '0.416694413', '0.414962396', '0.414962396'],
 ['0.395188536', '0.395188536', '0.393312184', '0.393312184']]

In [6]:
import math
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from numpy import array
from pyspark.sql import Row

In [7]:
# Build a dense vector that MLlib can use. All data must be numeric.
def vector_data(fields):
    """Convert the first four fields of a parsed row into a dense vector.

    Args:
        fields: Indexable sequence whose first four entries are accepted by
            ``float()``; judging by the CSV header they are Adj_Open,
            Adj_High, Adj_Low and Adj_Close, in that order.

    Returns:
        The result of ``Vectors.dense`` applied to the four float values.

    Raises:
        IndexError: If ``fields`` has fewer than four entries.
        ValueError: If one of the four entries cannot be parsed as a float.
    """
    # Index explicitly (rather than slicing) so a short row still fails loudly.
    numeric_values = [float(fields[position]) for position in range(4)]
    return Vectors.dense(numeric_values)

In [8]:
# Convert every parsed row of strings into a numeric dense vector.
autoVectors = csvData.map(vector_data)
# Preview the first two vectors.
autoVectors.take(2)

[DenseVector([0.415, 0.4167, 0.415, 0.415]),
 DenseVector([0.3952, 0.3952, 0.3933, 0.3933])]

In [9]:
def transformToLabelPoint(instr):
    """Split a four-element row into a ``(label, features)`` pair.

    Args:
        instr: Indexable sequence with at least four entries. Entry 3 becomes
            the label; entries 0, 1 and 2 become the features.

    Returns:
        A plain tuple ``(float(instr[3]), Vectors.dense([...]))``. Note that
        this is a tuple, not a ``LabeledPoint`` object.
    """
    label = float(instr[3])
    features = Vectors.dense([instr[0], instr[1], instr[2]])
    return (label, features)

In [10]:
from pyspark.sql import SQLContext
# Wrap the existing `sc` in a SQLContext; it is used below to build DataFrames.
sqlcontext = SQLContext(sc)

In [11]:
# Turn every vector into a (label, features) pair, then wrap the pairs in a
# two-column DataFrame and display its first two rows.
autoLp = autoVectors.map(transformToLabelPoint)
autoDF = sqlcontext.createDataFrame(autoLp, schema=["label", "features"])
autoDF.select("label", "features").show(2)

+-----------+--------------------+
|      label|            features|
+-----------+--------------------+
|0.414962396|[0.414962396,0.41...|
|0.393312184|[0.395188536,0.39...|
+-----------+--------------------+
only showing top 2 rows



In [12]:
# Split into training and testing data using 0.9 / 0.1 weights and a fixed
# seed, then show the raw line count next to the size of each split.
trainingData, testData = autoDF.randomSplit([0.9, 0.1], seed=0)
raw_data.count(), trainingData.count(), testData.count()

(9576, 8608, 967)

In [13]:
# Preview two rows of the held-out test split.
testData.take(2)

[Row(label=0.173201696, features=DenseVector([0.1751, 0.1751, 0.1732])),
 Row(label=0.189511522, features=DenseVector([0.1895, 0.1912, 0.1895]))]

In [14]:
# Build the model on the training data only; the test split is kept aside
# for the prediction and evaluation cells below.
from pyspark.ml.regression import LinearRegression
# maxIter=10 caps the number of solver iterations at 10.
lr = LinearRegression(maxIter=10)
lrModel = lr.fit(trainingData)

In [15]:
# One fitted coefficient per feature, in feature order.
print("Coefficients:{}".format(lrModel.coefficients))

Coefficients:[-0.555261962948,0.82677350476,0.727976861395]


In [16]:
# Constant term of the fitted model.
print("Intercept:{}".format(lrModel.intercept))

Intercept:-0.0012528138087026146


In [17]:
# Predict on the test data: transform() yields a DataFrame that carries a
# "prediction" column next to "features" and "label" (see the select below).
predictions = lrModel.transform(testData)
# Show features, true label and model prediction side by side.
predictions.select("features","label","prediction").show()



+--------------------+-----------+-------------------+
|            features|      label|         prediction|
+--------------------+-----------+-------------------+
|[0.175078048,0.17...|0.173201696|0.17236972398372213|
|[0.189511522,0.19...|0.189511522|0.18959374046995067|
|[0.191243539,0.19...|0.189511522|0.18863201731067164|
|[0.191243539,0.19...|0.189511522|0.18863201731067164|
|[0.193119891,0.19...|0.193119891|0.19320026343994517|
|[0.193119891,0.19...|0.193119891| 0.1946322492053396|
|[0.194851908,0.19...|0.194851908|0.19493139434560391|
|[0.203800662,0.20...|0.202068645|0.20118271612682553|
|[0.207553365,0.20...|0.207553365|0.20762635332094642|
|[0.220110488,0.22...|0.220110488|0.21874506637170585|
|[0.220110488,0.22...|0.220110488|0.22029638449090985|
|[0.227327226,0.22...|0.220110488|0.22070449403294243|
|[0.225595209,0.22...|0.223718857|0.22286104056620326|
|[0.229203577,0.23...|0.227327226|0.22789954903008067|
|[0.232667611,0.23...|0.227327226|0.22740808847691701|
|[0.234543

In [18]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the test-set predictions with the "r2" metric.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="r2",
)

evaluator.evaluate(predictions)

0.9999620802742194

In [21]:
# Hand-check of the fitted model: rebuild the prediction for the first data
# record from the intercept and coefficients printed in the cells above.
# The numbers are copied from those outputs, so they go stale if the model
# is refitted on different data.
y = (
    -0.0012528138087026146             # intercept
    + -0.555261962948 * 0.414962396    # coefficient 0 * Adj_Open
    + 0.82677350476 * 0.416694413      # coefficient 1 * Adj_High
    + 0.727976861395 * 0.414962396     # coefficient 2 * Adj_Low
)
y

0.41492927452568207