In [1]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
import numpy
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

In [2]:
data = sc.textFile("swift://Notebooks.spark/hydrogendata.csv")


In [3]:
print "The first line in the 2015.csv dataset:", data.take(2)

The first line in the 2015.csv dataset: [u'1.000057,646,0.124937,37.4,22.501,-15.7', u'2.000114,646,0.299909,37.4,22.5999,-15.7']


In [4]:
def config_spark_acct(name, auth_url, tenant_id, username, password):
   prefix = "fs.swift.service." + name 
   hconf = sc._jsc.hadoopConfiguration()
   hconf.set(prefix + ".auth.url", auth_url + "/" + tenant_id)
   hconf.set(prefix + ".username", username)
   hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
   hconf.setInt(prefix + ".http.port", 8080)
   hconf.set(prefix + ".apikey", password)
   hconf.setBoolean(prefix + ".public", True)
   hconf.set(prefix + ".use.get.auth", "basic64")
   hconf.setBoolean(prefix + ".location-aware", False)
   hconf.set(prefix + ".password", password)

In [5]:
config_spark_acct("myacct",'https://identity.open.softlayer.com','d58f4bebc9c24fbf813ec53473e2ec9c','user_ce1c2baedfc5e647982f4681b8f177350ea4cce8','H}.cv0^3FqvW=#Ke')

In [6]:
sc.textFile("swift://Notebooks.myacct/hydrogendata.csv")

MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:-2

In [7]:
DataParse = data.map(lambda line : line.split(","))

In [8]:
DataParse.take(1)

[[u'1.000057', u'646', u'0.124937', u'37.4', u'22.501', u'-15.7']]

In [9]:
DataParse.take(2)[0]

[u'1.000057', u'646', u'0.124937', u'37.4', u'22.501', u'-15.7']

In [10]:
from pyspark.sql import SQLContext, Row

In [11]:
sqlContext = SQLContext(sc)

In [12]:
# Separate the header line from the data, and drop the row index
hydrogendata = DataParse.map(lambda x: [float(x[0]), float(x[1]), float(x[2]), float(x[3]), float(x[4]), float(x[5])]).toDF(["Time","VehiclePressure","FlowRate","VehicleTemp","AmbientTemp","ChillerCoilTemp"])

In [13]:
hydrogendata.printSchema()

root
 |-- Time: double (nullable = true)
 |-- VehiclePressure: double (nullable = true)
 |-- FlowRate: double (nullable = true)
 |-- VehicleTemp: double (nullable = true)
 |-- AmbientTemp: double (nullable = true)
 |-- ChillerCoilTemp: double (nullable = true)



In [14]:
#Separate out the label column.
labelCol = "VehiclePressure"
# Get a list of feature column labels.
featureCols = filter(lambda c: c != labelCol, map(lambda c: c.name, hydrogendata.schema.fields))
numFeatures = len(featureCols)#
print 'Our %d features are: %s' % (len(featureCols), ", ".join(featureCols))

Our 5 features are: Time, FlowRate, VehicleTemp, AmbientTemp, ChillerCoilTemp


In [15]:
import numpy
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils

In [16]:
def indexColumn(col):
  """ Return dictionary for column: original value -> 0-based index """
  distinctValues = sorted(hydrogendata.select(col).distinct().collect())
  # Map Row(value) to value:
  distinctValues = map(lambda row: row[0], distinctValues)
  return dict([(distinctValues[i], i) for i in xrange(len(distinctValues))])


In [17]:
def reIndexRow(row):
  
  featureVector = numpy.zeros(len(row))
  for j in xrange(len(featureCols)):
   return Vectors.dense(featureVector)
# Create RDD of feature vectors using the function defined above.
indexedFeatures = hydrogendata.select(*featureCols).map(lambda row: reIndexRow(row))
# Create corresponding RDD of labels 
vp = hydrogendata.select(labelCol).map(lambda row: row[0])

In [18]:
# Create an RDD of LabeledPoints.
indexedvp = vp.zip(indexedFeatures).map(lambda l_p: LabeledPoint(l_p[0], l_p[1]))


In [19]:
(trainingData, testData) = indexedvp.randomSplit([0.75, 0.25])

In [20]:
trainingData.cache()
testData.cache()


PythonRDD[26] at RDD at PythonRDD.scala:43

In [21]:
model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                    numTrees=8, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=24, maxBins=28)

In [22]:
predictions = model.predict(testData.map(lambda x: x.features))


In [23]:
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

In [24]:
testMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() /\
    float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression forest model:')
print(model.toDebugString())


Test Mean Squared Error = 46612.3474187
Learned regression forest model:
TreeEnsembleModel regressor with 8 trees

  Tree 0:
    Predict: 441.63430866886216
  Tree 1:
    Predict: 443.65025477855977
  Tree 2:
    Predict: 440.7397622602904
  Tree 3:
    Predict: 443.08489351205026
  Tree 4:
    Predict: 444.1960029581033
  Tree 5:
    Predict: 442.6248831349886
  Tree 6:
    Predict: 443.77263613062075
  Tree 7:
    Predict: 443.40950869634804



In [25]:
print(model)

TreeEnsembleModel regressor with 8 trees

