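First of all, import the findspark library, call its init() function, and pass it the path where you've installed Spark.
- If you don't know where Spark is installed, call `findspark.find()`, which returns the installation path it detects; calling `findspark.init()` with no argument also searches for it automatically.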

In [1]:
import findspark
findspark.init('/usr/local/spark')

Import the `SparkSession` class from `pyspark.sql`.

In [2]:
# Import SparkSession
from pyspark.sql import SparkSession

Build a SparkSession with the `SparkSession.builder` attribute:
- set the master URL to connect to (`"local"` runs Spark locally with a single worker thread),
- set the app name,
- add any additional configuration, such as the executor memory,
- finally, call getOrCreate() to return the running Spark session, or to create one if none exists.

In [3]:
# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

## Load Data

Download the data [from here](http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html). Find the cal_housing.zip archive, download it, and extract it so that you can access the data files.

Use the `textFile()` method to read the data into RDDs, with one element per line of text.

In [4]:
# Load in the data
rdd = sc.textFile('CaliforniaHousing/cal_housing.data')

# Load in the header
header = sc.textFile('CaliforniaHousing/cal_housing.domain')

Note that Spark's execution is “lazy”: nothing has been executed yet, and no data has actually been read.
- The `rdd` and `header` variables only describe a computation; they do not hold any data yet.
- To make Spark do the work, call an action such as collect() to look at the header.

The collect() method brings the entire RDD back to the driver, a single machine.

## EDA

In [5]:
header.collect()

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']
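Each `.domain` line holds a column name before the colon, so the header can supply the column names instead of typing them by hand. A plain-Python sketch using the lines printed above (in the notebook you could pass in `header.collect()` directly): it extracts the names and pairs them with the fields of the first data line. Note that the file spells `totalBedrooms`, whereas the code later in this notebook uses `totalBedRooms`.

```python
# Lines of cal_housing.domain, as returned by header.collect() above
header_lines = [
    'longitude: continuous.',
    'latitude: continuous.',
    'housingMedianAge: continuous. ',
    'totalRooms: continuous. ',
    'totalBedrooms: continuous. ',
    'population: continuous. ',
    'households: continuous. ',
    'medianIncome: continuous. ',
    'medianHouseValue: continuous. ',
]

# Keep the text before the first colon and drop surrounding whitespace
column_names = [line.split(':')[0].strip() for line in header_lines]

# Pair the names with the fields of the first line of cal_housing.data
first_line = ('-122.230000,37.880000,41.000000,880.000000,129.000000,'
              '322.000000,126.000000,8.325200,452600.000000')
record = dict(zip(column_names, first_line.split(',')))

print(column_names[:3])        # ['longitude', 'latitude', 'housingMedianAge']
print(record['medianIncome'])  # 8.325200
```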

Collecting an entire RDD onto one machine is unsafe: on large data it can make the driver run out of memory. Practice with the `take()` method instead, which safely returns only the first few elements of the RDD.
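The difference between transformations and actions can be sketched in plain Python. The `LazyDataset` class below is a hypothetical toy, not part of PySpark: `map()` only records a function, `take(n)` runs it on just the first n elements, and `collect()` runs it on everything and keeps all results in memory. Spark's real `take()` similarly scans only as many partitions as it needs to return n elements.

```python
import itertools


class LazyDataset:
    """Toy, single-machine stand-in for an RDD (hypothetical, not a PySpark API)."""

    def __init__(self, source, ops=()):
        self._source = source   # zero-argument callable that yields the raw elements
        self._ops = tuple(ops)  # recorded transformations, not yet executed

    def map(self, fn):
        # Transformation: remember fn and return a new dataset; nothing runs yet
        return LazyDataset(self._source, self._ops + (fn,))

    def _iterate(self):
        # Push each raw element through every recorded transformation, one at a time
        for item in self._source():
            for fn in self._ops:
                item = fn(item)
            yield item

    def take(self, n):
        # Action: compute only the first n results
        return list(itertools.islice(self._iterate(), n))

    def collect(self):
        # Action: compute every result and hold them all in local memory
        return list(self._iterate())


processed = []  # records which lines the transformation actually touched


def parse(line):
    processed.append(line)
    return line.split(',')


lines = ['a,1', 'b,2', 'c,3']
ds = LazyDataset(lambda: iter(lines)).map(parse)
print(len(processed))  # 0: map() only recorded parse
first = ds.take(1)
print(len(processed))  # 1: take(1) parsed a single line
```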

In [6]:
#read first 3 elements of RDD
rdd.take(3)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000',
 '-122.240000,37.850000,52.000000,1467.000000,190.000000,496.000000,177.000000,7.257400,352100.000000']

This is not what we want: each element is a single string, with the fields separated by commas. Let's use map() with a lambda function to split each line at the commas.

In [7]:
rdd = rdd.map(lambda x:x.split(','))
# Inspect the first 2 lines 
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

Alternatively, we can inspect the data with the first() or top() actions; note that top(n) returns the n largest elements, not the first n.

In [8]:
# Inspect the first line 
rdd.first()

['-122.230000',
 '37.880000',
 '41.000000',
 '880.000000',
 '129.000000',
 '322.000000',
 '126.000000',
 '8.325200',
 '452600.000000']

In [9]:
# Take top elements
rdd.top(2)

[['-124.350000',
  '40.540000',
  '52.000000',
  '1820.000000',
  '300.000000',
  '806.000000',
  '270.000000',
  '3.014700',
  '94600.000000'],
 ['-124.300000',
  '41.840000',
  '17.000000',
  '2677.000000',
  '531.000000',
  '1244.000000',
  '456.000000',
  '3.031300',
  '103600.000000']]
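Here "largest" means lexicographically largest, because each element is a list of strings: that is why rows with a longitude of -124.35 come out on top even though -122.23 is the larger number. The plain-Python sketch below reproduces the ordering on three rows copied from the outputs above; PySpark's `top()` compares elements with Python's normal ordering, so it should rank them the same way.

```python
# Three parsed lines from the outputs above (first two fields only)
rows = [
    ['-122.230000', '37.880000'],
    ['-124.350000', '40.540000'],
    ['-124.300000', '41.840000'],
]

# Lists compare element by element and strings character by character,
# so '-124...' ranks above '-122...' because '4' > '2'.
top2 = sorted(rows, reverse=True)[:2]
print([r[0] for r in top2])  # ['-124.350000', '-124.300000']

# Comparing the longitudes as numbers gives the opposite answer.
numeric_max = max(rows, key=lambda r: float(r[0]))
print(numeric_max[0])  # -122.230000
```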

Still, an RDD of lists of strings is not the tabular form we are used to from pandas. To make life simpler, we can convert the RDD to a DataFrame. In Python, DataFrames are generally preferred over RDDs because their operations are planned by Spark's query optimizer and executed inside the JVM, instead of passing every record through Python functions.

Differences between RDDs and DataFrames:
- RDDs are used for low-level transformations and actions on unstructured data, when we don't need to impose a schema or access attributes by name or column.
- RDDs are preferred when we want to manipulate data with functional programming constructs rather than domain-specific expressions.

So, let's **switch to DataFrames to use high-level expressions, run SQL queries to explore our data, and gain columnar access**.
- Make an RDD of Row objects with a schema: as in pandas, we eventually want rows and columns, where each entry belongs to a row and to a named column with a data type.
- Use map() to map each entry of a line to a field in a Row.
- The lambda function constructs a Row in which the element at index 0 gets the name 'longitude', and so on.
- Once this RDD of Rows is ready, convert it to a DataFrame with the toDF() method.

In [10]:
# Import the necessary modules 
from pyspark.sql import Row

# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()

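The DataFrame is ready; we can inspect it with first(), take(), head(), show(), and so on. The cells below compare each call with its RDD counterpart: RDDs have no head() or show() method. Also note that the columns come out in alphabetical order: in Spark versions before 3.0, a Row built from keyword arguments sorts its fields by name.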

In [11]:
df.first()

Row(households='126.000000', housingMedianAge='41.000000', latitude='37.880000', longitude='-122.230000', medianHouseValue='452600.000000', medianIncome='8.325200', population='322.000000', totalBedRooms='129.000000', totalRooms='880.000000')

In [12]:
rdd.first()

['-122.230000',
 '37.880000',
 '41.000000',
 '880.000000',
 '129.000000',
 '322.000000',
 '126.000000',
 '8.325200',
 '452600.000000']

In [13]:
df.head()

Row(households='126.000000', housingMedianAge='41.000000', latitude='37.880000', longitude='-122.230000', medianHouseValue='452600.000000', medianIncome='8.325200', population='322.000000', totalBedRooms='129.000000', totalRooms='880.000000')

In [14]:
rdd.head()

AttributeError: 'PipelinedRDD' object has no attribute 'head'

In [16]:
df.take(2)

[Row(households='126.000000', housingMedianAge='41.000000', latitude='37.880000', longitude='-122.230000', medianHouseValue='452600.000000', medianIncome='8.325200', population='322.000000', totalBedRooms='129.000000', totalRooms='880.000000'),
 Row(households='1138.000000', housingMedianAge='21.000000', latitude='37.860000', longitude='-122.220000', medianHouseValue='358500.000000', medianIncome='8.301400', population='2401.000000', totalBedRooms='1106.000000', totalRooms='7099.000000')]

In [17]:
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]

In [18]:
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge| latitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [19]:
rdd.show()

AttributeError: 'PipelinedRDD' object has no attribute 'show'

In [20]:
df.columns

['households',
 'housingMedianAge',
 'latitude',
 'longitude',
 'medianHouseValue',
 'medianIncome',
 'population',
 'totalBedRooms',
 'totalRooms']

The DataFrame is ready, which is good news for pandas users. But what about the data types of the columns? Let's check.

In [21]:
df.dtypes

[('households', 'string'),
 ('housingMedianAge', 'string'),
 ('latitude', 'string'),
 ('longitude', 'string'),
 ('medianHouseValue', 'string'),
 ('medianIncome', 'string'),
 ('population', 'string'),
 ('totalBedRooms', 'string'),
 ('totalRooms', 'string')]

In [22]:
df.printSchema()

root
 |-- households: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- population: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- totalRooms: string (nullable = true)



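All the data types are strings :(

To use these columns for numerical work, we need to change their data type, so we will cast every column of `df` to `FloatType()`. FloatType holds 32-bit floats, which is why a value such as 37.88 will later print as 37.880001068115234; `DoubleType()` would keep more precision.

Let's write a function to do this for us. It takes a DataFrame, a list of column names, and the new data type as input arguments, and returns the converted DataFrame.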

In [24]:
from pyspark.sql.types import FloatType

# Custom function to convert the data type of several DataFrame columns
def castColumnNewType(df, names, newType):
    for name in names:
        # withColumn() replaces the existing column with its cast version
        df = df.withColumn(name, df[name].cast(newType))
    return df

# Assign all column names to `columns`
columns = df.columns

# Convert the `df` columns to `FloatType()`
df = castColumnNewType(df, columns, FloatType())
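Two details of `cast()` matter here. With ANSI mode off (`spark.sql.ansi.enabled=false`, the default in Spark 3.x), a string that cannot be parsed becomes null instead of raising an error, and FloatType stores 32-bit values. The `cast_to_float_type` helper below is a hypothetical plain-Python sketch of that behaviour, not Spark's code; it only approximates Spark's parser (for example, Python's `float()` also accepts forms such as `'1_000'`).

```python
import math
import struct
from typing import Optional


def cast_to_float_type(value: str) -> Optional[float]:
    """Rough local model of a non-ANSI string-to-FloatType cast (assumption):
    unparseable text becomes None (Spark's null); parsed values are
    rounded to 32-bit precision."""
    try:
        parsed = float(value)
    except ValueError:
        return None
    try:
        # Round-trip through a 4-byte IEEE 754 float, the storage FloatType uses
        return struct.unpack('f', struct.pack('f', parsed))[0]
    except OverflowError:
        # Too large for 32 bits: saturate to infinity with the same sign
        return math.copysign(math.inf, parsed)


print(cast_to_float_type('37.880000'))  # 37.880001068115234
print(cast_to_float_type('n/a'))        # None
```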

In [25]:
df.printSchema()

root
 |-- households: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- population: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- totalRooms: float (nullable = true)



Wonderful, we now have a DataFrame with appropriate data types and structure.

We can now run SQL-style queries and explore the data.

In [26]:
# select two columns and show 10 rows.
df.select('population', 'totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [27]:
df.groupBy("housingMedianAge") \
  .count() \
  .sort("housingMedianAge", ascending=False) \
  .show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows
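On a single machine, `groupBy("housingMedianAge").count()` followed by a descending sort corresponds to counting occurrences per key and then ordering the keys. A plain-Python sketch on the first six `housingMedianAge` values from the `df.show()` output above; Spark performs the same counting in parallel, combining partial counts from each partition.

```python
from collections import Counter

# housingMedianAge of the first six rows shown by df.show() above
ages = [41.0, 21.0, 52.0, 52.0, 52.0, 52.0]

# groupBy(...).count(): number of rows per distinct key
counts = Counter(ages)

# sort(..., ascending=False): order the (key, count) pairs by key, largest first
age_counts = sorted(counts.items(), key=lambda kv: kv[0], reverse=True)
print(age_counts)  # [(52.0, 4), (41.0, 1), (21.0, 1)]
```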



Let's get summary statistics:

In [29]:
df.describe().show()

+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       households|  housingMedianAge|         latitude|          longitude|  medianHouseValue|      medianIncome|        population|    totalBedRooms|        totalRooms|
+-------+-----------------+------------------+-----------------+-------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|            20640|             20640|            20640|              20640|             20640|             20640|             20640|            20640|             20640|
|   mean|499.5396802325581|28.639486434108527|35.63186143109965|-119.56970444871473|206855.81690891474|3.8706710030346416|1425.4767441860465|537.8980135658915|2635.7630813953488|
| stddev|382.3297528316098| 12.58555761211163|2.135952380602968|  2.003531742932898|115395.61587441359|1.

The attributes span very different ranges of values (compare, for example, the means of medianIncome and population), so we will standardize the features before modeling.

## Preprocessing

The target variable `medianHouseValue` has quite large values, so let's rescale it:
- express house values in units of 100,000.

In [33]:
# Import only `col`; `from pyspark.sql.functions import *` would shadow
# Python built-ins such as sum() and max()
from pyspark.sql.functions import col

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.take(2)

[Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0),
 Row(households=1138.0, housingMedianAge=21.0, latitude=37.86000061035156, longitude=-122.22000122070312, medianHouseValue=3.585, medianIncome=8.301400184631348, population=2401.0, totalBedRooms=1106.0, totalRooms=7099.0)]

## Feature Engineering

Add the following columns to the data set:

Rooms per household: the number of rooms per household in each block group

Population per household: the number of people per household in each block group

Bedrooms per room: the fraction of rooms that are bedrooms in each block group

- use the select() method to access the relevant columns, namely totalRooms, households, population and totalBedRooms
- refer to them as Column objects, for example with the col() function; dividing plain column-name strings would fail, so col() is what makes element-wise operations such as these three divisions possible
- use withColumn() to attach each new ratio to `df`
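The three new columns are plain element-wise divisions. A plain-Python sketch for the first block group, whose values appear in the outputs above; the results should match the `df.first()` output of the next cell. The `ratio` helper is hypothetical; it mimics the fact that, with ANSI mode off (`spark.sql.ansi.enabled=false`, the default in Spark 3.x), dividing a column by zero yields null rather than an error.

```python
from typing import Optional


def ratio(numerator: float, denominator: float) -> Optional[float]:
    """Hypothetical helper mirroring non-ANSI column division in Spark:
    dividing by zero gives None (null) instead of raising ZeroDivisionError."""
    if denominator == 0:
        return None
    return numerator / denominator


# First block group of the data set (values from the outputs above)
row = {'totalRooms': 880.0, 'households': 126.0,
       'population': 322.0, 'totalBedRooms': 129.0}

new_columns = {
    'roomsPerHousehold': ratio(row['totalRooms'], row['households']),
    'populationPerHousehold': ratio(row['population'], row['households']),
    'bedroomsPerRoom': ratio(row['totalBedRooms'], row['totalRooms']),
}
print(new_columns)
```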

In [34]:
# Import `col` from `sql.functions` if you haven't yet
from pyspark.sql.functions import col

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(households=126.0, housingMedianAge=41.0, latitude=37.880001068115234, longitude=-122.2300033569336, medianHouseValue=4.526, medianIncome=8.325200080871582, population=322.0, totalBedRooms=129.0, totalRooms=880.0, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

## Reorder the data

Why do we need this reordering?
- We don't necessarily want to standardize the target values, so we isolate them from the features.
- We do this with the select() method, passing the column names in a more appropriate order. Here the target variable medianHouseValue comes first, so that it won't be affected by the standardization.
- This is also the time to leave out variables you don't want to consider in your analysis. In this case, let's leave out longitude, latitude, housingMedianAge and totalRooms.

In [35]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

In [37]:
df.printSchema()

root
 |-- medianHouseValue: double (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- roomsPerHousehold: double (nullable = true)
 |-- populationPerHousehold: double (nullable = true)
 |-- bedroomsPerRoom: double (nullable = true)



## Standardization

- First, separate the features from the target variable, i.e. isolate the first column of the DataFrame from the rest of the columns.
- Use the RDD map() function to perform this split, wrapping the features with DenseVector(). A dense vector is a local vector backed by a double array that holds its entry values; in other words, it stores an array of values for use in Spark ML.
- Then build a DataFrame from `input_data` again and relabel the columns by passing a list as the second argument. This list consists of the column names "label" and "features".

In [38]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

Now scale the data.

We use Spark ML for this; the library makes machine learning on big data scalable and easy.

We could use an ML Pipeline, but since we are not doing much preprocessing here, let's use a plain StandardScaler.

The input column is `features`, and the rescaled vectors are written to a new output column of `scaled_df` named "features_scaled". By default, StandardScaler only divides each feature by its standard deviation (withStd=True, withMean=False); it does not subtract the mean.
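Numerically, the default StandardScaler divides each feature by its standard deviation; Spark's documentation states that the corrected sample standard deviation is used and that a feature with zero standard deviation is mapped to 0.0. A plain-Python sketch of that computation on toy values, organized column by column for readability (Spark stores each row's features as one vector):

```python
import statistics


def scale_to_unit_std(columns):
    """Divide every value by its column's sample standard deviation (n - 1 in
    the denominator) without subtracting the mean, i.e. withStd=True and
    withMean=False. A column with zero spread maps to 0.0."""
    scaled = []
    for values in columns:
        std = statistics.stdev(values)
        scaled.append([v / std if std != 0 else 0.0 for v in values])
    return scaled


# Two toy feature columns (not from the data set)
features_by_column = [[2.0, 4.0, 6.0], [1.0, 1.0, 1.0]]
print(scale_to_unit_std(features_by_column))  # [[1.0, 2.0, 3.0], [0.0, 0.0, 0.0]]
```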

In [39]:
# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler

# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.take(2)

[Row(label=4.526, features=DenseVector([129.0, 322.0, 126.0, 8.3252, 6.9841, 2.5556, 0.1466]), features_scaled=DenseVector([0.3062, 0.2843, 0.3296, 4.3821, 2.8228, 0.2461, 2.5264])),
 Row(label=3.585, features=DenseVector([1106.0, 2401.0, 1138.0, 8.3014, 6.2381, 2.1098, 0.1558]), features_scaled=DenseVector([2.6255, 2.1202, 2.9765, 4.3696, 2.5213, 0.2031, 2.6851]))]

In [41]:
scaled_df.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- features_scaled: vector (nullable = true)



Note that a third column, features_scaled, was added to the DataFrame, which you can compare with features.

## Build the ML Model with Spark ML

It's finally time to start building the Linear Regression model! As always, first split the data into training and test sets, which is easy with the randomSplit() method:

In [42]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)

You pass in a list with two numbers that represent the size that you want your training and test sets to have and a seed, which is needed for reproducibility reasons. 
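Spark normalizes the weights if they do not sum to 1 and assigns rows to splits randomly, so the resulting sizes are only approximately 80% and 20%. The `random_split` function below is a hypothetical, conceptual sketch of weighted random splitting with a fixed seed, not Spark's actual sampler: each row is independently sent to one split by a uniform draw.

```python
import random


def random_split(rows, weights, seed):
    """Conceptual sketch (not Spark's exact sampler): normalize the weights,
    then send each row independently to the split whose cumulative range
    contains a uniform random draw."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # fixed seed -> reproducible assignment
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall below 1.0
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(1000), [0.8, 0.2], seed=1234)
print(len(train), len(test))  # close to 800 and 200, but not exactly
```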

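Note that elasticNetParam corresponds to α, the ElasticNet mixing parameter (α = 0 gives a pure L2/ridge penalty, α = 1 a pure L1/lasso penalty), and that regParam, the regularization parameter, corresponds to λ, the overall strength of the penalty. Also note that LinearRegression reads the `features` column by default (featuresCol="features"), so the model below is trained on the unscaled vectors; pass featuresCol="features_scaled" to use the standardized ones. Because LinearRegression also standardizes features internally by default (standardization=True), the predictions should be essentially the same either way; only the reported coefficients would be on a different scale.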
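For reference, Spark's documentation describes the elastic-net linear regression objective roughly as follows, with λ = regParam and α = elasticNetParam over n training rows; the intercept b is not penalized. With regParam=0.3 and elasticNetParam=0.8, as in the cell below, the L1 term dominates the penalty.

$$
\min_{w,\,b}\;\frac{1}{2n}\sum_{i=1}^{n}\bigl(w^\top x_i + b - y_i\bigr)^2
\;+\;\lambda\Bigl(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\Bigr)
$$

Setting α = 1 recovers the lasso and α = 0 recovers ridge regression.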

In [43]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the data to the model
linearModel = lr.fit(train_data)

With the model in place, you can generate predictions for the test data: the transform() method adds a `prediction` column to test_data. Then you can use RDD operations to extract the predictions and the true labels from the DataFrame and zip them together into a list called predictionAndLabel.

Finally, inspect the predicted and real values by slicing that list with square brackets []:

In [45]:
# Generate predictions
predicted = linearModel.transform(test_data)
predicted.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- features_scaled: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [46]:
# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(1.1340115638008952, 0.14999),
 (1.4485018834650096, 0.14999),
 (1.5713396046425587, 0.14999),
 (1.7496542762527307, 0.283),
 (1.2438468929500472, 0.366)]

## Model Evaluation

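Let's print the coefficients and intercept of the model. Because elasticNetParam=0.8 makes the penalty mostly L1, weak coefficients can be driven exactly to zero; here only the fourth feature, medianIncome, keeps a nonzero weight: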

In [48]:
# Coefficients for the model
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.2796, 0.0, 0.0, 0.0])

In [49]:
# Intercept for the model
linearModel.intercept

0.9841344205626824

Next, use the summary attribute to pull up the rootMeanSquaredError and the r2. Note that linearModel.summary describes the fit on the training data; to compute the same metrics on the test set, call linearModel.evaluate(test_data), which returns a summary with the same attributes.

The RMSE measures the typical size of the differences between predicted and observed (known) values, in the units of the label (here, hundreds of thousands of dollars). The smaller the RMSE, the closer the predicted and observed values are.

The R² ("R squared"), or coefficient of determination, measures how much of the variability of the response around its mean the model explains. For a linear model with an intercept, evaluated on its training data, it lies between 0 and 1 (0% to 100%): 0 means the model explains none of the variability, and 1 means it explains all of it. On new data it can even be negative. In general, the higher the R², the better the model fits your data.
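Both metrics are short formulas. A plain-Python sketch on toy numbers (not from the housing data); passing the two halves of `predictionAndLabel` from above, for example `[p for p, _ in predictionAndLabel]` and `[y for _, y in predictionAndLabel]`, would give the metrics on the test set.

```python
import math


def rmse(predictions, labels):
    """Root mean squared error: square root of the average squared residual."""
    n = len(labels)
    return math.sqrt(sum((p - y) ** 2 for p, y in zip(predictions, labels)) / n)


def r2(predictions, labels):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for p, y in zip(predictions, labels))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


preds = [2.5, 0.0, 2.0, 8.0]   # toy predictions
truth = [3.0, -0.5, 2.0, 7.0]  # toy observed values
print(rmse(preds, truth))  # sqrt(1.5 / 4), about 0.612
print(r2(preds, truth))    # 1 - 1.5 / 29.1875, about 0.949
```

Predicting the mean of the labels everywhere gives an R² of exactly 0, and a model worse than that gives a negative value.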

In [50]:
# Get the RMSE
linearModel.summary.rootMeanSquaredError

0.8765335684459216

In [51]:
# Get the R2
linearModel.summary.r2

0.42282227755911483

Clearly, an R² of about 0.42 is fairly low, indicating that the model needs improvement.

Before you go, make sure to stop the SparkSession with the following line of code:

In [52]:
spark.stop()