<a href="https://colab.research.google.com/github/pragmatizt/coffee_challenges/blob/master/pyspark_intro.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Walkthrough from: https://towardsdatascience.com/pyspark-in-google-colab-6821c2faf41c

In [0]:
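# Install the dependencies: Java 8, Spark 2.4.4 (prebuilt for Hadoop 2.7), and findspark
# If the URL is not found, the mirror may have dropped this release or a newer version of Spark may be out. Check these websites for updates:
# 1. https://www-us.apache.org/dist/spark/ or 2. http://apache.osuosl.org/spark/
# Older releases are archived at https://archive.apache.org/dist/spark/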

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
# Set the environment variables that let PySpark find Java and Spark in our Colab environment

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
# We can run a local spark session to test our installation:

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [21]:
!ls

BostonHousing.txt		     spark-2.4.4-bin-hadoop2.7.tgz
NOAA_DHW_monthly_798a_0f03_e546.csv  spark-2.4.4-bin-hadoop2.7.tgz.1
sample_data			     spark-2.4.4-bin-hadoop2.7.tgz.2
spark-2.4.4-bin-hadoop2.7


In [0]:
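# Import the VectorAssembler and LinearRegression modules

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Load the Boston housing data; Boston.csv must be uploaded to the working directory first
# (it does not appear in the !ls listing above)
dataset = spark.read.csv('Boston.csv', inferSchema=True, header=True)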

In [27]:
dataset.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- black: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [29]:
"""Next, we combine all the feature columns into a single vector column,
named 'Attributes' via outputCol."""

# Pack all the features into one vector column.
# Note: '_c0' is the CSV's unnamed row-index column; it carries no information
# about price and is arguably better left out of the features.
assembler = VectorAssembler(
    inputCols=['_c0', 'crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age',
               'dis', 'rad', 'tax', 'ptratio', 'black', 'lstat'],
    outputCol='Attributes')

output = assembler.transform(dataset)

# Keep only the input features and the output label (medv)
finalized_data = output.select("Attributes", "medv")

finalized_data.show()

+--------------------+----+
|          Attributes|medv|
+--------------------+----+
|[1.0,0.00632,18.0...|24.0|
|[2.0,0.02731,0.0,...|21.6|
|[3.0,0.02729,0.0,...|34.7|
|[4.0,0.03237,0.0,...|33.4|
|[5.0,0.06905,0.0,...|36.2|
|[6.0,0.02985,0.0,...|28.7|
|[7.0,0.08829,12.5...|22.9|
|[8.0,0.14455,12.5...|27.1|
|[9.0,0.21124,12.5...|16.5|
|[10.0,0.17004,12....|18.9|
|[11.0,0.22489,12....|15.0|
|[12.0,0.11747,12....|18.9|
|[13.0,0.09378,12....|21.7|
|[14.0,0.62976,0.0...|20.4|
|[15.0,0.63796,0.0...|18.2|
|[16.0,0.62739,0.0...|19.9|
|[17.0,1.05393,0.0...|23.1|
|[18.0,0.7842,0.0,...|17.5|
|[19.0,0.80271,0.0...|20.2|
|[20.0,0.7258,0.0,...|18.2|
+--------------------+----+
only showing top 20 rows
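To make the VectorAssembler step concrete, here is a plain-Python sketch of what it does conceptually: for each row, it reads the listed input columns in order and packs their values into one new column, leaving the original columns in place. The rows are toy data chosen for illustration and the helper `assemble` is hypothetical; Spark itself stores the result as an ML vector (dense or sparse), not as a Python list.

```python
# Toy rows standing in for DataFrame rows (illustrative values, not the full data set)
rows = [
    {"crim": 0.00632, "zn": 18.0, "rm": 6.575, "medv": 24.0},
    {"crim": 0.02731, "zn": 0.0, "rm": 6.421, "medv": 21.6},
]


def assemble(rows, input_cols, output_col):
    """Hypothetical helper: return copies of the rows with the input columns
    packed, in the given order, into a single list-valued column."""
    out = []
    for row in rows:
        new_row = dict(row)  # keep the original columns, as transform() does
        new_row[output_col] = [float(row[c]) for c in input_cols]
        out.append(new_row)
    return out


assembled = assemble(rows, ["crim", "zn", "rm"], "Attributes")

# Equivalent of select("Attributes", "medv"): features (input) and label (output)
finalized = [(r["Attributes"], r["medv"]) for r in assembled]
print(finalized[0])  # ([0.00632, 18.0, 6.575], 24.0)
```

The order of `inputCols` matters: it fixes which position in the vector each coefficient of the fitted model later refers to.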



In [31]:
# Randomly split the data into roughly 80% training and 20% test rows
# (pass seed=... to randomSplit for a reproducible split)
train_data, test_data = finalized_data.randomSplit([0.8, 0.2])

regressor = LinearRegression(featuresCol='Attributes', labelCol='medv')

# Fit the model on the training set (this rebinds `regressor` to the fitted model)
regressor = regressor.fit(train_data)

# Evaluate the fitted model on the test set
pred = regressor.evaluate(test_data)

# Show the predicted prices next to the actual medv values
pred.predictions.show()

+--------------------+----+------------------+
|          Attributes|medv|        prediction|
+--------------------+----+------------------+
|[2.0,0.02731,0.0,...|21.6| 25.29340301892609|
|[4.0,0.03237,0.0,...|33.4| 29.07886236456516|
|[5.0,0.06905,0.0,...|36.2|28.453741226694568|
|[10.0,0.17004,12....|18.9|19.530596067147894|
|[16.0,0.62739,0.0...|19.9|19.711170071069503|
|[27.0,0.67191,0.0...|16.6|15.859601134608646|
|[30.0,1.00245,0.0...|21.0|21.310434744518787|
|[38.0,0.08014,0.0...|21.0| 23.67976435438619|
|[41.0,0.03359,75....|34.9| 35.55073385338167|
|[45.0,0.12269,0.0...|21.2|23.315585277205805|
|[46.0,0.17142,0.0...|19.3|22.485233017671664|
|[50.0,0.21977,0.0...|19.4|17.551525295732862|
|[51.0,0.08873,21....|19.7|21.964960377318064|
|[52.0,0.04337,21....|20.5|24.453709008906475|
|[57.0,0.02055,85....|24.7|25.811311208576555|
|[59.0,0.15445,25....|23.3|  22.7070051225814|
|[62.0,0.17171,25....|16.0|19.339275287767002|
|[67.0,0.04379,80....|19.4|26.661064702485323|
|[79.0,0.0564
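The `randomSplit([0.8, 0.2])` call in the cell above does not cut the data at exactly 80%. As we understand it, Spark normalizes the weights and assigns each row independently according to a uniform random draw, so the split sizes only approximate the weights and change between runs unless `randomSplit` is given a `seed`. The stdlib sketch below, built around a hypothetical `random_split` helper, illustrates that idea on 506 row indices (the size of the Boston housing data); it is not Spark's actual implementation.

```python
import random


def random_split(rows, weights, seed=None):
    """Hypothetical helper: assign each row independently to one split,
    with probability proportional to that split's weight."""
    total = float(sum(weights))
    # Cumulative boundaries of the normalized weights, e.g. [0.8, 0.2] -> [0.8, 1.0]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform draw in [0, 1)
        for i, b in enumerate(bounds):
            if x < b:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point round-off in the last boundary
            splits[-1].append(row)
    return splits


train, test = random_split(list(range(506)), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to 405 / 101, but usually not exactly
```

Reusing the same seed reproduces the same assignment in this sketch, which is the property a fixed `seed` is meant to give in Spark.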

Next, we print the coefficients and the intercept of the regression model.

In [32]:
# Coefficients of the regression model (one per feature, in inputCols order)
coeff = regressor.coefficients

# Intercept (the y-intercept: the predicted medv when every feature is 0)
intr = regressor.intercept

print("The coefficient of the model is : %a" %coeff)
print("The intercept of the model is : %f" %intr)

The coefficient of the model is : DenseVector([-0.0034, -0.1396, 0.0585, 0.0103, 2.3055, -17.0501, 3.9441, -0.005, -1.5186, 0.3939, -0.014, -0.9339, 0.0079, -0.4938])
The intercept of the model is : 36.471211
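The coefficients and intercept fully determine the fitted model: a prediction is the intercept plus the dot product of the coefficient vector with a row's `Attributes` vector, taken in the same order as `inputCols`. Since the feature vectors printed earlier are truncated, the sketch below uses a hypothetical `predict` helper with made-up two-feature numbers rather than the fitted Boston model.

```python
def predict(features, coefficients, intercept):
    """Hypothetical helper: linear-model prediction,
    intercept + sum_j coefficients[j] * features[j]."""
    if len(features) != len(coefficients):
        raise ValueError("features and coefficients must have the same length")
    return intercept + sum(w * x for w, x in zip(coefficients, features))


# Toy two-feature model (made-up numbers, not the Boston coefficients)
coeffs = [2.0, -0.5]
intercept = 1.0
print(predict([3.0, 4.0], coeffs, intercept))  # 1.0 + 2.0*3.0 - 0.5*4.0 = 5.0
```

With the real model, the 14 entries of the `DenseVector` above would pair with the 14 entries of each `Attributes` vector in exactly this way.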


We can go further and evaluate the model statistically with PySpark's RegressionEvaluator module.

In [34]:
from pyspark.ml.evaluation import RegressionEvaluator

# Named `evaluator` rather than `eval` to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(labelCol="medv", predictionCol="prediction", metricName="rmse")

# Root Mean Square Error
rmse = evaluator.evaluate(pred.predictions)
print("RMSE: %.3f" % rmse)

# Mean Square Error (the param map overrides metricName for this call only)
mse = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mse"})
print("MSE: %.3f" % mse)

# Mean Absolute Error
mae = evaluator.evaluate(pred.predictions, {evaluator.metricName: "mae"})
print("MAE: %.3f" % mae)

# r2 - coefficient of determination
r2 = evaluator.evaluate(pred.predictions, {evaluator.metricName: "r2"})
print("r2: %.3f" % r2)

RMSE: 4.915
MSE: 24.162
MAE: 3.434
r2: 0.689
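For reference, the four metrics can be written out directly. The sketch below uses the standard definitions (mean squared error over n rows, RMSE as its square root, mean absolute error, and R² = 1 - SS_res / SS_tot), which we believe match what `RegressionEvaluator` computes; the helper name `regression_metrics` and the toy values are hypothetical. It also gives a quick consistency check on the output above: RMSE is the square root of MSE, and 4.915² ≈ 24.16.

```python
import math


def regression_metrics(y_true, y_pred):
    """Hypothetical helper: RMSE, MSE, MAE and R^2 from paired label/prediction lists."""
    n = len(y_true)
    errors = [p - t for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)           # residual sum of squares
    mse = ss_res / n
    mae = sum(abs(e) for e in errors) / n
    mean_y = sum(y_true) / n
    ss_tot = sum((t - mean_y) ** 2 for t in y_true)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot
    return {"rmse": math.sqrt(mse), "mse": mse, "mae": mae, "r2": r2}


# Toy labels and predictions (made-up numbers)
metrics = regression_metrics([3.0, 5.0, 7.0], [2.0, 5.0, 8.0])
print(metrics["r2"])  # 1 - 2/8 = 0.75
```

R² compares the model's squared error with that of always predicting the mean label, so the 0.689 above means the model removes roughly 69% of the test set's variance in medv.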
