In [1]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionModel
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1481496952186_0006,pyspark,idle,Link,Link,✔


SparkContext available as 'sc'.
HiveContext available as 'sqlContext'.


In [2]:
def parsePoint(line):
    """Turn one comma-separated text line into a LabeledPoint.

    Only the first seven comma-separated fields are read; each is converted
    to float. The first field becomes the label and the remaining fields
    become the feature values.
    """
    fields = line.split(',')[:7]
    numbers = list(map(float, fields))
    target, predictors = numbers[0], numbers[1:]
    return LabeledPoint(target, predictors)
# Read the CSV as an RDD of text lines. The RDD is deliberately NOT named
# `input`: that would shadow Python's builtin input() for the rest of the
# notebook session.
raw_lines = sc.textFile("wasb:///autos.csv")
# Parse every line into a LabeledPoint and persist the result, since
# later cells derive several RDDs from it.
featurevector = raw_lines.map(parsePoint).persist()

In [3]:
# Pull labels and feature vectors apart so that only the features are rescaled.
labels = featurevector.map(lambda point: point.label)
features = featurevector.map(lambda point: point.features)

# Fit a StandardScaler (constructed with its default settings) on the
# feature vectors, then apply the fitted scaler to those same vectors.
scaler = StandardScaler().fit(features)
scaledData = scaler.transform(features)

# Pair each label back up with its scaled vector and rebuild LabeledPoints.
newfeatures = labels.zip(scaledData)
allset = newfeatures.map(lambda pair: LabeledPoint(*pair))

In [4]:
# Randomly split the scaled data with weights 0.7 (training) and 0.3 (test);
# the fixed seed keeps the split repeatable across runs.
training, test = allset.randomSplit([0.7, 0.3], seed=1)

In [5]:
# SGD hyperparameters passed to LinearRegressionWithSGD.train below:
# number of iterations and step size.
numIterations, stepSize = 100, 0.001

In [6]:
# Train a linear regression model with SGD on the training split, using the
# hyperparameters defined above and fitting an intercept term.
algorithm = LinearRegressionWithSGD.train(
    training,
    iterations=numIterations,
    step=stepSize,
    intercept=True
)

In [7]:
# For every held-out point, build a (true label, model prediction) pair.
valuesAndPreds = test.map(
    lambda point: (point.label, algorithm.predict(point.features))
)
# Show the first 20 pairs as the cell output.
valuesAndPreds.take(20)

[(18.0, 22.128269194125018), (18.0, 22.0312203809492), (15.0, 22.025508507968652), (14.0, 21.87321961819093), (14.0, 22.055806781122165), (15.0, 21.882771171766098), (10.0, 22.592864010531148), (28.0, 22.535309934809128), (25.0, 22.374082240371724), (16.0, 22.698173356590914), (17.0, 22.682959060741549), (19.0, 22.669276631439665), (14.0, 22.484461180400736), (12.0, 22.496700249536605), (13.0, 22.525151818370045), (18.0, 22.546630266507471), (30.0, 22.400811906735715), (24.0, 22.816716210033032), (14.0, 22.7762419285242), (15.0, 22.905988531326884)]

In [8]:
# Mean squared error over the (label, prediction) pairs: sum of squared
# differences divided by the number of pairs.
# The pair is indexed (vp[0], vp[1]) instead of being unpacked in the lambda
# signature: `lambda (v, p): ...` is Python 2-only syntax (removed by
# PEP 3113) and is a SyntaxError on Python 3. Indexing works on both.
MSE = (
    valuesAndPreds
    .map(lambda vp: (vp[0] - vp[1]) ** 2)
    .reduce(lambda x, y: x + y)
    / valuesAndPreds.count()
)
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 53.8697651551