<a href="https://colab.research.google.com/github/jolmcla/Machine-Learning-Operationalization/blob/master/regression_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [75]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [76]:
!wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz

In [77]:
import os

# Confirm the working directory: the Spark tarball is downloaded into and unpacked here.
working_dir = os.getcwd()
print(working_dir)

/content


In [78]:
!tar xf /content/spark-3.2.0-bin-hadoop3.2.tgz

In [79]:
!pip install -q findspark

In [80]:
import os

# Point the tooling at the JDK installed by apt-get and at the unpacked Spark distribution.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.2.0-bin-hadoop3.2",
)

In [81]:
# Make the Spark installation (presumably located via the SPARK_HOME set above) importable
# as `pyspark`; must run before any pyspark import.
import findspark
findspark.init()


In [82]:
# Sanity check: show which Spark installation findspark resolved.
findspark.find()

'/content/spark-3.2.0-bin-hadoop3.2'

In [83]:
from pyspark.sql import SparkSession

# Start (or reuse, if one is already running) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("Our First Spark example")
    .getOrCreate()
)

In [92]:
import pandas as pd
from google.colab import drive

# Mount Google Drive so the housing CSV stored there is readable from the Colab VM.
drive.mount('/content/drive')

# Load the housing data: the first row supplies column names and Spark infers column types.
housing = spark.read.csv(
    "/content/drive/MyDrive/housing2.csv",
    header=True,
    inferSchema=True,
)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [93]:
# Inspect the column names and the types Spark inferred from the CSV.
housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [94]:
# Column names of the housing DataFrame (displayed as a Python list).
housing.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'ocean_proximity']

In [95]:
# Get the active SparkSession. A session was already started earlier in the notebook, and
# getOrCreate() returns that existing session instead of creating a new one, so an appName
# passed here would not rename the running application; it is therefore omitted.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [96]:
#import Linear Regression from spark's MLlib
from pyspark.ml.regression import LinearRegression

In [97]:
# Dataset dimensions as a (rows, columns) tuple, analogous to pandas' DataFrame.shape.
row_count = housing.count()
column_count = len(housing.columns)
print((row_count, column_count))

(20640, 10)


In [98]:
# Print the schema of the housing DataFrame.
# NOTE(review): this repeats an earlier printSchema() cell; consider removing one of them.
housing.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [99]:
# Summary statistics (count, mean, stddev, min, max) for every column, shown untruncated.
housing.describe().show(n=5, truncate=False)

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|longitude          |latitude         |housing_median_age|total_rooms       |total_bedrooms    |population        |households       |median_income     |median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|count  |20640              |20640            |20640             |20640             |20433             |20640             |20640            |20640             |20640             |20640          |
|mean   |-119.56970445736148|35.6318614341087 |28.639486434108527|2635.7630813953488|537.8705525375618 |1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|null           |
|stddev |2.003531723

# Optional alternative data source: a raw GitHub URL for the same CSV.
# The previous value was a placeholder string ('copied_raw_GH_link'), which pd.read_csv
# cannot open, so the cell always raised. Set `url` to a real raw-file URL to enable it.
url = None  # e.g. 'https://raw.githubusercontent.com/<user>/<repo>/<branch>/housing2.csv'
if url:
    df1 = pd.read_csv(url)

In [100]:
# Peek at the first three rows (DataFrame.head(n) delegates to take(n)).
housing.take(3)

[Row(longitude=-122.23, latitude=37.88, housing_median_age=41.0, total_rooms=880.0, total_bedrooms=129.0, population=322.0, households=126.0, median_income=8.3252, median_house_value=452600.0, ocean_proximity='NEAR BAY'),
 Row(longitude=-122.22, latitude=37.86, housing_median_age=21.0, total_rooms=7099.0, total_bedrooms=1106.0, population=2401.0, households=1138.0, median_income=8.3014, median_house_value=358500.0, ocean_proximity='NEAR BAY'),
 Row(longitude=-122.24, latitude=37.85, housing_median_age=52.0, total_rooms=1467.0, total_bedrooms=190.0, population=496.0, households=177.0, median_income=7.2574, median_house_value=352100.0, ocean_proximity='NEAR BAY')]

In [102]:
from pyspark.sql.functions import corr

# Pearson correlation between total_rooms and the regression target, as a global aggregate.
housing.agg(corr('total_rooms', 'median_house_value')).show()

+-------------------------------------+
|corr(total_rooms, median_house_value)|
+-------------------------------------+
|                  0.13415311380656275|
+-------------------------------------+



In [106]:
#import vectorassembler to create dense vectors
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

In [116]:
# Assemble the numeric predictor columns into a single vector column named 'features'.
# total_bedrooms contains nulls (describe() reports 20433 non-null values out of 20640 rows),
# and VectorAssembler's default handleInvalid='error' makes any action over those rows fail
# (e.g. fitting a model). handleInvalid='skip' drops rows whose inputs are null/NaN instead.
# The variable keeps its original (misspelled) name because later cells refer to it.
vec_assmebler = VectorAssembler(
    inputCols=['longitude', 'latitude', 'housing_median_age',
               'total_rooms', 'households', 'total_bedrooms'],
    outputCol='features',
    handleInvalid='skip',
)

In [117]:
# Append the assembled 'features' vector column to the housing data.
features_df = vec_assmebler.transform(dataset=housing)

IllegalArgumentException: ignored

In [114]:
# Confirm the transformed DataFrame now carries the 'features' vector column.
features_df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- features: vector (nullable = true)



In [115]:
# Display the first five assembled feature vectors without truncating them.
features_df.select('features').show(n=5, truncate=False)

+-----------------------------------------+
|features                                 |
+-----------------------------------------+
|[-122.23,37.88,41.0,880.0,126.0,129.0]   |
|[-122.22,37.86,21.0,7099.0,1138.0,1106.0]|
|[-122.24,37.85,52.0,1467.0,177.0,190.0]  |
|[-122.25,37.85,52.0,1274.0,219.0,235.0]  |
|[-122.25,37.85,52.0,1627.0,259.0,280.0]  |
+-----------------------------------------+
only showing top 5 rows



In [118]:
# Keep only the model inputs: the feature vector and the regression target.
model_df = features_df.select(['features', 'median_house_value'])