# Install Spark

In [2]:
!pip3 install pyspark

Collecting pyspark
  Using cached pyspark-3.4.1.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=f761ff96307a23501d3676ed766a7c9e3ac7a2564c113382695054e4fdca5062
  Stored in directory: /Users/macos/Library/Caches/pip/wheels/53/fe/23/517784b9d9dadfb82c5676e76483422096aa5dc20d4d602213
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.1


# Read Data

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate() # Read data from google drive
def read_file_from_drive(filepath):
    """
    filepath: path to the dataset
    """
    data = spark.read.csv(filepath, inferSchema = True, header = True)
    return data

23/09/18 00:08:51 WARN Utils: Your hostname, MacBook-Pro-8.local resolves to a loopback address: 127.0.0.1; using 172.16.16.252 instead (on interface en0)
23/09/18 00:08:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/18 00:08:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


23/09/18 00:09:10 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [4]:
df = read_file_from_drive("BostonHousing.csv")
print(df.count())

506


In [5]:
df.printSchema()

root
 |-- crim: double (nullable = true)
 |-- zn: double (nullable = true)
 |-- indus: double (nullable = true)
 |-- chas: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- rm: double (nullable = true)
 |-- age: double (nullable = true)
 |-- dis: double (nullable = true)
 |-- rad: integer (nullable = true)
 |-- tax: integer (nullable = true)
 |-- ptratio: double (nullable = true)
 |-- b: double (nullable = true)
 |-- lstat: double (nullable = true)
 |-- medv: double (nullable = true)



In [6]:
df.describe().toPandas().transpose()

23/09/18 00:11:51 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
crim,506,3.6135235573122535,8.601545105332491,0.00632,88.9762
zn,506,11.363636363636363,23.32245299451514,0.0,100.0
indus,506,11.136778656126504,6.860352940897589,0.46,27.74
chas,506,0.0691699604743083,0.2539940413404101,0,1
nox,506,0.5546950592885372,0.11587767566755584,0.385,0.871
rm,506,6.284634387351787,0.7026171434153232,3.561,8.78
age,506,68.57490118577078,28.148861406903595,2.9,100.0
dis,506,3.795042687747034,2.10571012662761,1.1296,12.1265
rad,506,9.549407114624506,8.707259384239366,1,24


# Data Cleaning

In [10]:
from pyspark.sql.functions import when,lit,count,isnan,col

def replace(column, value):
    return when(column!=value,column).otherwise(lit(None))


df.select([count(when(isnan(c)|col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+
|crim| zn|indus|chas|nox| rm|age|dis|rad|tax|ptratio|  b|lstat|medv|
+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+
|   0|  0|    0|   0|  0|  0|  0|  0|  0|  0|      0|  0|    0|   0|
+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+



There are no missing values in the data

# Data preparation

In [9]:
result = df.select("crim","zn","indus","medv").orderBy("medv").toPandas() 
print(round(result.iloc[0,-1],2))

5.0


## Create input features for the model

In [11]:
from pyspark.ml.regression import LinearRegression 
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

In [13]:
def prepare_data_using_pyspark(df):
    assembler = VectorAssembler(
        inputCols=["crim", "zn", "indus", "chas", "nox", "rm", "age", "dis", "rad", "tax", "ptratio", "b", "lstat"], 
        outputCol="features")
    data = assembler.transform(df) 
    final_data = []
    final_data = data.select("features", "medv")
    return final_data

In [14]:
data = prepare_data_using_pyspark(df)
print(data)

DataFrame[features: vector, medv: double]


## Create train and test set

In [19]:
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42) 
def train(train_data):
    lr = LinearRegression(featuresCol="features", labelCol="medv", predictionCol="predicted_medv")
    lr_model = lr.fit(train_data)
    return lr_model

# Model training and evaluation

In [20]:
lr_model = train(train_data)
coefficients = lr_model.coefficients
intercept = lr_model.intercept
print("Coefficients: ", coefficients) 
print("Intercept: {:.3f}".format(intercept))

23/09/15 01:57:55 WARN Instrumentation: [8e103150] regParam is zero, which might cause numerical instability and overfitting.


Coefficients:  [-0.11362203729408954,0.048909186934053925,0.02379542898673389,2.801771998735119,-18.4154245411894,3.5158797633120065,0.0052116821614709204,-1.4163830723539739,0.3317669315937035,-0.013607893704163878,-0.9534143338408072,0.008602677392853256,-0.519503531247664]
Intercept: 38.617


In [21]:
from pyspark.ml.evaluation import RegressionEvaluator 

predictions = lr_model.transform(test_data)
evaluator = RegressionEvaluator(labelCol="medv", predictionCol="predicted_medv", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))
evaluator_r2 = RegressionEvaluator(labelCol="medv", predictionCol="predicted_medv", metricName="r2")
r2 = evaluator_r2.evaluate(predictions) 
print("R-squared (R2) on test data: {:.3f}".format(r2))

Root Mean Squared Error (RMSE) on test data: 4.672
R-squared (R2) on test data: 0.793


In [22]:
assembler = VectorAssembler(
    inputCols=["crim", "zn", "indus", "chas", "nox", "rm", "age", "dis", "rad", "tax", "ptratio", "b", "lstat"], 
    outputCol="features")
data = assembler.transform(df)
feature_importance = sorted(list(zip(data.columns[:-1], map(abs, coefficients))), key= lambda x: x[1], reverse=True)
print("The most important feature:", feature_importance[0][0])

The most important feature: nox
