In [1]:
import pyspark
import numpy as np

from pyspark import SparkContext
from pyspark import SQLContext

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
sc = SparkContext()
sqlContext = SQLContext(sc)


In [4]:
#read data
df = sqlContext.read.load('abalone.data.txt', 
                          format='com.databricks.spark.csv', 
                          header='false', 
                          inferSchema='true')

In [5]:
#set headers
oldColumns = df.schema.names
newColumns = ['Sex','Length', 'Diameter','Height','Whole weight','Shucked weight','Viscera weight','Shell weight','Rings']

df = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), df)
df.printSchema()
df.show()


root
 |-- Sex: string (nullable = true)
 |-- Length: double (nullable = true)
 |-- Diameter: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Whole weight: double (nullable = true)
 |-- Shucked weight: double (nullable = true)
 |-- Viscera weight: double (nullable = true)
 |-- Shell weight: double (nullable = true)
 |-- Rings: integer (nullable = true)

+---+------+--------+------+------------+--------------+--------------+------------+-----+
|Sex|Length|Diameter|Height|Whole weight|Shucked weight|Viscera weight|Shell weight|Rings|
+---+------+--------+------+------------+--------------+--------------+------------+-----+
|  M| 0.455|   0.365| 0.095|       0.514|        0.2245|         0.101|        0.15|   15|
|  M|  0.35|   0.265|  0.09|      0.2255|        0.0995|        0.0485|        0.07|    7|
|  F|  0.53|    0.42| 0.135|       0.677|        0.2565|        0.1415|        0.21|    9|
|  M|  0.44|   0.365| 0.125|       0.516|        0.2155|         0.114|       0

In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

In [7]:
# mapping string labels to numbers
sex = df.select("Sex").rdd.flatMap(lambda x: x).collect()
mappingSex = {j:i for i,j in enumerate(np.unique(sex))}
mapping = udf(lambda Sex: mappingSex.get(Sex), IntegerType())
df = df.withColumn("Sex", mapping(df.Sex))

In [8]:
df.printSchema()
df.show(10)


root
 |-- Sex: integer (nullable = true)
 |-- Length: double (nullable = true)
 |-- Diameter: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Whole weight: double (nullable = true)
 |-- Shucked weight: double (nullable = true)
 |-- Viscera weight: double (nullable = true)
 |-- Shell weight: double (nullable = true)
 |-- Rings: integer (nullable = true)

+---+------+--------+------+------------+--------------+--------------+------------+-----+
|Sex|Length|Diameter|Height|Whole weight|Shucked weight|Viscera weight|Shell weight|Rings|
+---+------+--------+------+------------+--------------+--------------+------------+-----+
|  2| 0.455|   0.365| 0.095|       0.514|        0.2245|         0.101|        0.15|   15|
|  2|  0.35|   0.265|  0.09|      0.2255|        0.0995|        0.0485|        0.07|    7|
|  0|  0.53|    0.42| 0.135|       0.677|        0.2565|        0.1415|        0.21|    9|
|  2|  0.44|   0.365| 0.125|       0.516|        0.2155|         0.114|       

In [9]:
from pyspark.ml.feature import VectorAssembler

In [10]:
#construct a feature vector as assembler
assembler = VectorAssembler(
    inputCols=[x for x in df.columns if x not in ['Rings']],
    outputCol='features')

assembler.transform(df)

DataFrame[Sex: int, Length: double, Diameter: double, Height: double, Whole weight: double, Shucked weight: double, Viscera weight: double, Shell weight: double, Rings: int, features: vector]

In [11]:
df

DataFrame[Sex: int, Length: double, Diameter: double, Height: double, Whole weight: double, Shucked weight: double, Viscera weight: double, Shell weight: double, Rings: int]

In [12]:
# Split the data into training and test sets (20% held out for testing)
(trainingData, testData) = df.randomSplit([0.8, 0.2])

In [13]:
# Train a RandomForest model.
rf1 = RandomForestRegressor(featuresCol="features", labelCol="Rings")


In [14]:
# Chain indexer and forest in a Pipeline
pipeline1 = Pipeline(stages=[assembler, rf1])

In [15]:
# Train model.  This also runs the indexer.
model1 = pipeline1.fit(trainingData)

In [16]:
# Make predictions.
predictions1 = model1.transform(testData)

In [17]:
# Select example rows to display.
predictions1.select("prediction", "Rings", "features").show(10)

+-----------------+-----+--------------------+
|       prediction|Rings|            features|
+-----------------+-----+--------------------+
|5.205824341982557|    5|[0.0,0.275,0.195,...|
|6.558412515328068|    6|[0.0,0.335,0.22,0...|
|7.200615279765475|    9|[0.0,0.345,0.255,...|
|7.497625729847648|   10|[0.0,0.35,0.275,0...|
| 7.31755916909393|    8|[0.0,0.36,0.265,0...|
|7.567988534610538|   10|[0.0,0.36,0.265,0...|
| 8.42847347324996|    7|[0.0,0.375,0.295,...|
| 7.84430938483155|    8|[0.0,0.38,0.305,0...|
|8.645993349120971|   10|[0.0,0.38,0.325,0...|
| 8.55664838727922|    7|[0.0,0.385,0.305,...|
+-----------------+-----+--------------------+
only showing top 10 rows



In [18]:
# Select (prediction, true label) and compute test error
evaluator1 = RegressionEvaluator(
    labelCol="Rings", predictionCol="prediction", metricName="rmse")
rmse = evaluator1.evaluate(predictions1)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 2.18156


In [19]:
evaluator2 = RegressionEvaluator(
    labelCol="Rings", predictionCol="prediction", metricName="mse")
mse = evaluator2.evaluate(predictions1)
print("Mean Squared Error (MSE) on test data = %g" % mse)

Mean Squared Error (MSE) on test data = 4.75919


In [20]:
evaluator3 = RegressionEvaluator(
    labelCol="Rings", predictionCol="prediction", metricName="r2")
r2 = evaluator3.evaluate(predictions1)
print("R^2 on test data = %g" % r2)

R^2 on test data = 0.537818


In [21]:
evaluator4 = RegressionEvaluator(
    labelCol="Rings", predictionCol="prediction", metricName="mae")
mae = evaluator4.evaluate(predictions1)
print("Mean Absolute Error (MAE) on test data = %g" % mae)

Mean Absolute Error (MAE) on test data = 1.54974


In [22]:
rfModel1 = model1.stages[1]
print(rfModel1)

RandomForestRegressionModel (uid=rfr_f0051394a247) with 20 trees


In [23]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [24]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer= StringIndexer(inputCol="Rings", outputCol="label").fit(df)

In [25]:
labelIndexer

StringIndexer_4ec2a2b2d6ec4de669ee

In [26]:
# Train a RandomForest model.
rf2 = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=20)

In [27]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)


In [28]:
# Chain indexers and forest in a Pipeline
pipeline2 = Pipeline(stages=[labelIndexer, assembler, rf2, labelConverter])

In [29]:
# Train model.  This also runs the indexers.
model2 = pipeline2.fit(trainingData)

In [30]:
# Make predictions.
predictions2 = model2.transform(testData)

In [31]:
# Select example rows to display.
predictions2.select("predictedLabel", "Rings", "features").show(10)

+--------------+-----+--------------------+
|predictedLabel|Rings|            features|
+--------------+-----+--------------------+
|             5|    5|[0.0,0.275,0.195,...|
|             6|    6|[0.0,0.335,0.22,0...|
|             7|    9|[0.0,0.345,0.255,...|
|             6|   10|[0.0,0.35,0.275,0...|
|             6|    8|[0.0,0.36,0.265,0...|
|             7|   10|[0.0,0.36,0.265,0...|
|             7|    7|[0.0,0.375,0.295,...|
|             7|    8|[0.0,0.38,0.305,0...|
|             7|   10|[0.0,0.38,0.325,0...|
|             7|    7|[0.0,0.385,0.305,...|
+--------------+-----+--------------------+
only showing top 10 rows



In [32]:
# Select (prediction, true label) and compute accuracy
evaluator_cl1 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_cl1.evaluate(predictions2)
print("Accuracy = %g" % (accuracy))

Accuracy = 0.265455


In [33]:
# Select (prediction, true label) and compute weighted precision
evaluator_cl2 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
wp = evaluator_cl2.evaluate(predictions2)
print("Weighted Precision = %g" % (wp))

Weighted Precision = 0.233744


In [34]:
# Select (prediction, true label) and compute weighted recall
evaluator_cl3 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="weightedRecall")
wc = evaluator_cl3.evaluate(predictions2)
print("Weighted Recall = %g" % (wc))

Weighted Recall = 0.265455


In [35]:
# Select (prediction, true label) and compute f1 score
evaluator_cl4 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1")
f1= evaluator_cl3.evaluate(predictions2)
print("f1 = %g" % (f1))

f1 = 0.265455
