In [None]:
#L.Morris
#September 2019
#Python interface to Spark is pyspark
#JDK is a dependency
#pip install pyspark
#OR can install with homebrew, a linux package manager, $brew install apache-spark
#very important that Spark be packaged with your juptyer directory
#Setup instructions:
#https://opensource.com/article/18/11/pyspark-jupyter-notebook

In [None]:
#LM Notes
''' 
import findspark
findspark.init()
rdd1 = pyspark.sparkContext.parallelize([('a',7),('a',2),('b',2)])

import findspark
findspark.init()
# Creating Spark Context
from pyspark import SparkContext
sc = SparkContext("local", "first app")
# Calculating words count
text_file = sc.textFile("OneSentence.txt")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
# Printing each word with its respective count
output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))
# Stopping Spark Context
sc.stop()

#https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
sc= SparkContext()
sqlContext = SQLContext(sc)
house_df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('boston_housing.csv')
house_df.take(1)

house_df.cache()
house_df.printSchema()

house_df.describe().toPandas().transpose()

Data dictionary for boston housing data:
CRIM: per capita crime rate by town
ZN: proportion of residential land zoned for lots over 25,000 sq.ft.
INDUS: proportion of non-retail business acres per town.
CHAS: Charles River dummy variable (1 if tract bounds river; 0 otherwise)
NOX: nitric oxides concentration (parts per 10 million)
RM: average number of rooms per dwelling
AGE: proportion of owner-occupied units built prior to 1940
DIS: weighted distances to five Boston employment centres
RAD: index of accessibility to radial highways
TAX: full-value property-tax rate per $10,000
PTRATIO: pupil-teacher ratio by town
B: 1000(Bk — 0.63)² where Bk is the proportion of blacks by town
LSTAT: % lower status of the population
MEDV: Median value of owner-occupied homes in $1000's

The goal is to predict medv, which is the median value of the homes
'''

In [None]:
import pandas as pd
import numpy as np

In [None]:
#https://towardsdatascience.com/apache-spark-mllib-tutorial-ec6f1cb336a9

In [1]:
#Every spark application requries a session, so we create one
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
#load the data
data = spark.read.csv('./boston_housing.csv', header=True, inferSchema=True)

In [3]:
#display the data
data.show()

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio|     b|lstat|medv|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2| 395.6|12.43|22.9|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2| 396.9|19.15|27.1|
|0.21124|12.5| 7.87|   0|0.524|5.631|100.0|6.0821|  5|311|   15.2

In [12]:
type(data)

pyspark.sql.dataframe.DataFrame

In [4]:
#set up the features
feature_columns = data.columns[:-1] # omit the final column, which is the output, or dependent variable
from pyspark.ml.feature import VectorAssembler

#here, we pass the features to the vector assemler method
assembler = VectorAssembler(inputCols=feature_columns,outputCol="features")

In [5]:
data_2 = assembler.transform(data)

In [6]:
data_2.show()

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+--------------------+
|   crim|  zn|indus|chas|  nox|   rm|  age|   dis|rad|tax|ptratio|     b|lstat|medv|            features|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+------+-----+----+--------------------+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|[0.00632,18.0,2.3...|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|[0.02731,0.0,7.07...|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|[0.02729,0.0,7.07...|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7|394.63| 2.94|33.4|[0.03237,0.0,2.18...|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 396.9| 5.33|36.2|[0.06905,0.0,2.18...|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7|394.12| 5.21|28.7|[0.02985,0.0,2.18...|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5

In [7]:
#test/train split of the data to the 70/30 convention
train, test = data_2.randomSplit([0.7, 0.3])

In [9]:
#create a linear regression model
from pyspark.ml.regression import LinearRegression
algo = LinearRegression(featuresCol="features", labelCol="medv")

In [10]:
model = algo.fit(train) #action happens here

IllegalArgumentException: u'Unsupported class file major version 56'

In [None]:
#evaluate the model
evaluation_summary = model.evaluate(test)

In [None]:
#prediction
predictions = model.transform(test)