In [1]:
import findspark
findspark.init()

import import_ipynb
from utils import rename

import pyspark.sql.types as tp
from pyspark import SparkContext, SparkFiles
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression

importing Jupyter notebook from utils.ipynb


### Classification

In [2]:
sc = SparkContext()

url = 'https://gist.githubusercontent.com/netj/8836201/raw/6f9306ad21398ea43cba4f7d537619d0e07d5ae3/iris.csv'
sc.addFile(url)
sqlContext = SQLContext(sc)

In [3]:
schema_setting = tp.StructType([
    tp.StructField(name='sepal.length', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='sepal.width', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='petal.length', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='petal.width', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='variety', dataType=tp.StringType(), nullable=False)
])

data = sqlContext.read.csv(
    SparkFiles.get("iris.csv"), 
    schema=schema_setting,
    header=True
)

In [4]:
data.show(3)

+------------+-----------+------------+-----------+-------+
|sepal.length|sepal.width|petal.length|petal.width|variety|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| Setosa|
|         4.9|        3.0|         1.4|        0.2| Setosa|
|         4.7|        3.2|         1.3|        0.2| Setosa|
+------------+-----------+------------+-----------+-------+
only showing top 3 rows



In [5]:
rename_data = rename(data, data.columns, ['x1', 'x2', 'x3', 'x4', 'y'])
rename_data.show(3)

+---+---+---+---+------+
| x1| x2| x3| x4|     y|
+---+---+---+---+------+
|5.1|3.5|1.4|0.2|Setosa|
|4.9|3.0|1.4|0.2|Setosa|
|4.7|3.2|1.3|0.2|Setosa|
+---+---+---+---+------+
only showing top 3 rows



In [6]:
split = rename_data.randomSplit([0.6, 0.4])

train_data = split[0]
test_data = split[1]

In [7]:
# string -> category
string_indexer = StringIndexer(inputCol='y', outputCol='label')

# each x value -> vector
vector_assembler = VectorAssembler(inputCols=['x1', 'x2', 'x3', 'x4'], outputCol='features')

# generage model
logistic_regression = LogisticRegression(featuresCol='features', labelCol='label')

# combine
pipeline = Pipeline(stages=[string_indexer, vector_assembler, logistic_regression])

# create model
model = pipeline.fit(train_data)

In [8]:
pred = model.transform(test_data)
pred.select('label', 'prediction').groupBy('label', 'prediction').count().sort('label').show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|   16|
|  0.0|       1.0|    1|
|  1.0|       1.0|   19|
|  1.0|       0.0|    1|
|  2.0|       2.0|   21|
+-----+----------+-----+



### Regression

In [9]:
sc = SparkContext()

url = 'https://raw.githubusercontent.com/selva86/datasets/master/BostonHousing.csv'
sc.addFile(url)
sqlContext = SQLContext(sc)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-2-a994ca768833>:1 

In [None]:
schema_setting = tp.StructType([
    tp.StructField(name='crim', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='zn', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='indus', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='chas', dataType=tp.IntegerType(), nullable=False),
    tp.StructField(name='nox', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='rm', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='age', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='dis', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='rad', dataType=tp.IntegerType(), nullable=False),
    tp.StructField(name='tax', dataType=tp.IntegerType(), nullable=False),
    tp.StructField(name='ptratio', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='b', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='lstat', dataType=tp.DoubleType(), nullable=False),
    tp.StructField(name='medv', dataType=tp.DoubleType(), nullable=False)
])

data = sqlContext.read.csv(
    SparkFiles.get("BostonHousing.csv"), 
    schema=schema_setting,
    header=True
)

In [None]:
data.show(5)

In [None]:
split = data.randomSplit([0.6, 0.4])

train_data = split[0]
test_data = split[1]

In [None]:
input_cols = train_data.columns[:-1]

train_vectors = VectorAssembler(inputCols=input_cols, outputCol='features')
train = train_vectors.transform(train_data)

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, featuresCol='features', labelCol='medv')

lrModel = lr.fit(train)


In [None]:
trainingSummary = lrModel.summary

print("RMSE: %f" % trainingSummary.rootMeanSquaredError)

In [None]:
test_vectors = VectorAssembler(inputCols=input_cols, outputCol='features')
test = train_vectors.transform(test_data)

pred = lrModel.transform(test)
pred.select('medv', 'prediction').show(3)