# Creating Your First Spark Program
Import the `SparkSession` class from `pyspark.sql` and build a session through its `builder` attribute. 
On the builder you can set the master URL to connect to, the application name, and additional configuration 
such as the executor memory. Finally, call `getOrCreate()` to either get the current Spark session or create 
one if none is running.

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

# Import `StandardScaler` 
from pyspark.ml.feature import StandardScaler


In [3]:
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()

In [4]:
sc = spark.sparkContext

This tutorial makes use of the California Housing data set. It appeared in a 1997 paper titled Sparse Spatial 
Autoregressions, written by R. Kelley Pace and Ronald Barry and published in the Statistics and Probability 
Letters journal. The researchers built this data set by using the 1990 California census data.

The data contains one row per census block group. A block group is the smallest geographical unit for which 
the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people). 
In this sample a block group on average includes 1425.5 individuals living in a geographically compact area. 
You can find this information on this [web page](http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html) 
or in the paper mentioned above, which is available [here](http://www.spatial-statistics.com/pace_manuscripts/spletters_ms_dir/statistics_prob_lets/html/ms_sp_lets1.html).

These spatial data contain 20,640 observations on housing prices with 9 economic variables:

* Longitude refers to the angular distance of a geographic place east or west of the prime meridian for each block group;

* Latitude refers to the angular distance of a geographic place north or south of the earth’s equator for each block group;

* Housing median age is the median age of the houses in a block group. Note that the median is the value that lies at the midpoint of a frequency distribution of observed values;

* Total rooms is the total number of rooms in the houses per block group;

* Total bedrooms is the total number of bedrooms in the houses per block group;

* Population is the number of inhabitants of a block group;

* Households refers to units of houses and their occupants per block group;

* Median income is the median income of the people that belong to a block group; and

* Median house value is the dependent variable and refers to the median house value per block group.

What’s more, you also learn that all block groups with zero entries for the independent and dependent variables have been excluded from the data.

The median house value will be assigned the role of the target variable in your ML model.


Next, you’ll use the ```textFile()``` method to read the data from the folder that you downloaded it to into RDDs. 
This method takes a URI for the file, which in this case is a local path on your machine, and reads 
it as a collection of lines. For convenience, you’ll read in not only the .data file but also 
the .domain file that contains the header. This allows you to double-check the order of the variables.

In [5]:
rdd = sc.textFile('cal_housing.data')
#rdd = sc.textFile('CaliforniaHousing/cal_housing.data')

header = sc.textFile('cal_housing.domain')
#header = sc.textFile('file:///C:/Users/krism/GDrive/UWI/2021-2022 SEM2/Big Data/Labs/Shelly/COMP3610-2010/Lab4/cal_housing.domain')

It is important to understand that, because Spark evaluates lazily, nothing has been executed yet. Your data hasn’t actually been read in. The rdd and header variables only describe what Spark should do once a result is requested. You have to push Spark to work for you, so let’s use the ```collect()``` method to look at the header:

In [6]:
header.collect()

The ```collect()``` method brings the entire RDD to a single machine (the driver), and you’ll get to see the following result:

['longitude: continuous.',
 'latitude: continuous.',
 'housingMedianAge: continuous. ',
 'totalRooms: continuous. ',
 'totalBedrooms: continuous. ',
 'population: continuous. ',
 'households: continuous. ',
 'medianIncome: continuous. ',
 'medianHouseValue: continuous. ']
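
The lazy behaviour described above can be illustrated without Spark. The sketch below is a plain-Python analogy, not Spark code: the built-in `map()` returns a lazy iterator, much like a transformation that only describes work, and nothing runs until a consumer such as `list()` asks for results, which plays the role of an action like ```collect()```. The `parse` helper and the sample lines are made up for illustration.

```python
# Plain-Python analogy for lazy evaluation (this is not Spark code).
calls = []  # records every time the parse function actually runs


def parse(line):
    """Hypothetical parsing step; logs each call so we can see when work happens."""
    calls.append(line)
    return line.split(",")


lines = ["-122.23,37.88", "-122.22,37.86"]  # made-up sample lines

# Comparable to a transformation: this only describes the work.
lazy_result = map(parse, lines)
calls_before = len(calls)

# Comparable to an action: consuming the iterator triggers the work.
rows = list(lazy_result)
calls_after = len(calls)

print(calls_before, calls_after)  # 0 2
```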

Tip: be careful when using ```collect()```! Because it brings every element to the driver, it can cause the driver to run out of memory. 
The ```take()``` method is a safer approach if you just want to print a few
elements of the RDD. In general, it’s a good principle to limit your result set whenever possible, 
just like when you’re using SQL.
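
To see why limiting the result set matters, here is a small plain-Python analogy (again, not Spark code). `itertools.islice` stands in for ```take()```: it stops pulling elements once it has enough, whereas materializing the whole source would touch every element. The `line_source` generator is a hypothetical stand-in for a large data set, and Spark's own ```take()``` is implemented differently under the hood.

```python
from itertools import islice

produced = []  # records every line the source actually generates


def line_source(total):
    """Hypothetical stand-in for a large data set that yields lines one by one."""
    for i in range(total):
        produced.append(i)
        yield f"line {i}"


# take-like: stop after the first two elements instead of reading a million
first_two = list(islice(line_source(1_000_000), 2))
produced_by_take = len(produced)

print(first_two)         # ['line 0', 'line 1']
print(produced_by_take)  # 2
```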

You learn that the order of the variables is the same as the one that you saw above in the presentation of the data set, and you also learn that all columns should have continuous values. Let’s force Spark to do some more work and take a look at the California housing data to confirm this.
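
If you want to reuse the header later, the lines shown in the ```collect()``` output can be turned into a list of column names with ordinary string handling. This is a minimal sketch in plain Python that assumes every header line has the form `name: type.`, as in the output above; the variable names are illustrative only.

```python
# Header lines copied from the collect() output shown earlier.
header_lines = [
    "longitude: continuous.",
    "latitude: continuous.",
    "housingMedianAge: continuous. ",
    "totalRooms: continuous. ",
    "totalBedrooms: continuous. ",
    "population: continuous. ",
    "households: continuous. ",
    "medianIncome: continuous. ",
    "medianHouseValue: continuous. ",
]

# Everything before the colon is the column name.
column_names = [line.split(":")[0].strip() for line in header_lines]

# Everything after the colon (minus the trailing period) is the declared kind.
kinds = {line.split(":")[1].strip().rstrip(".") for line in header_lines}

print(column_names)
print(kinds)  # {'continuous'}
```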

Call the ```take()``` method on your RDD to take its first 2 elements. The result is as you would expect: because you read in the file with the ```textFile()``` function, each line is returned as one single string in which the entries are separated by commas:

In [8]:
rdd.take(2)

['-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000',
 '-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']

You definitely need to solve this. The rows are already separate elements of the RDD, but the entries within each row still need to be split into separate values. To do so, you’ll use the ```map()``` function, to which you pass a lambda function that splits each line at the commas. Then, check your result with the ```take()``` method, just like you did before:

Remember that `lambda` functions are anonymous functions which are created at runtime.

In [9]:
rdd = rdd.map(lambda line: line.split(","))

In [10]:
rdd.take(2)

[['-122.230000',
  '37.880000',
  '41.000000',
  '880.000000',
  '129.000000',
  '322.000000',
  '126.000000',
  '8.325200',
  '452600.000000'],
 ['-122.220000',
  '37.860000',
  '21.000000',
  '7099.000000',
  '1106.000000',
  '2401.000000',
  '1138.000000',
  '8.301400',
  '358500.000000']]
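
The lambda passed to ```map()``` is only a shorthand for a small named function. The plain-Python sketch below (no Spark involved) applies both forms to the two lines shown earlier and checks that they give the same result; `split_line` is a name chosen here for illustration.

```python
# The two raw lines returned by rdd.take(2) before splitting.
lines = [
    "-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000",
    "-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000",
]


def split_line(line):
    """Named equivalent of `lambda line: line.split(",")`."""
    return line.split(",")


with_lambda = list(map(lambda line: line.split(","), lines))
with_def = list(map(split_line, lines))

print(with_lambda == with_def)  # True
print(len(with_lambda[0]))      # 9
```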

If you’re used to working with Pandas or data frames in R, you probably expected to see a header, but there is none. To make your life easier, you will move on from the RDD and convert it to a DataFrame. DataFrames are preferred over RDDs whenever you can use them. Especially when you’re working with Python, the performance of DataFrames is better than that of RDDs.

But what is the difference between the two?

You can use RDDs when you want to perform low-level transformations and actions on your unstructured data. This means that you don’t care about imposing a schema while processing or accessing the attributes by name or column. Tying in to what was said before about performance, by using RDDs, you don’t necessarily want the performance benefits that DataFrames can offer for (semi-)structured data. Use RDDs when you want to manipulate the data with functional programming constructs rather than domain specific expressions.

To recapitulate, you’ll switch to DataFrames now to use high-level expressions, to perform SQL queries to explore your data further and to gain columnar access.

So let’s do this.

The first step is to make an RDD of Row objects with named fields (older Spark versions called an RDD with a schema a SchemaRDD). This is normal, because just like with a DataFrame, you eventually want to come to a situation where you have rows and columns. Each entry is linked to a row and a certain column, and columns have data types.

You’ll use the `map()` function again with another lambda function, in which you’ll map each entry to a field in a Row. To see how this works, consider the first line: 

`[u'-122.230000', u'37.880000', u'41.000000', u'880.000000', u'129.000000', u'322.000000', u'126.000000', u'8.325200', u'452600.000000']`

The lambda function says that you’re going to construct a Row in which the element at `index 0` will have the name “longitude”, and so on.

With this RDD of Row objects in place, you can easily convert it to a DataFrame with the `toDF()` method.


In [11]:
df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()
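
A `Row` behaves much like a named tuple: each value can be reached by field name as well as by position. The sketch below mimics the mapping above with `collections.namedtuple` in plain Python; `HousingRow` is a hypothetical name, and the example only illustrates the idea rather than reproducing Spark's `Row` class.

```python
from collections import namedtuple

# Field names in the same order as the keyword arguments used for Row above.
HousingRow = namedtuple("HousingRow", [
    "longitude", "latitude", "housingMedianAge", "totalRooms", "totalBedRooms",
    "population", "households", "medianIncome", "medianHouseValue",
])

# First line of the data set, already split at the commas.
fields = ("-122.230000,37.880000,41.000000,880.000000,129.000000,"
          "322.000000,126.000000,8.325200,452600.000000").split(",")

row = HousingRow(*fields)

print(row.longitude, row[0])                 # same value, by name and by index
print(type(row.medianHouseValue).__name__)   # str: nothing has been cast yet
```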

Now that you have your DataFrame df, you can inspect it with the methods that you have also used before, namely `first()` and `take()`, but also with `head()` and `show()`:

In [12]:
df.show()

+-----------+---------+----------------+-----------+-------------+-----------+-----------+------------+----------------+
|  longitude| latitude|housingMedianAge| totalRooms|totalBedRooms| population| households|medianIncome|medianHouseValue|
+-----------+---------+----------------+-----------+-------------+-----------+-----------+------------+----------------+
|-122.230000|37.880000|       41.000000| 880.000000|   129.000000| 322.000000| 126.000000|    8.325200|   452600.000000|
|-122.220000|37.860000|       21.000000|7099.000000|  1106.000000|2401.000000|1138.000000|    8.301400|   358500.000000|
|-122.240000|37.850000|       52.000000|1467.000000|   190.000000| 496.000000| 177.000000|    7.257400|   352100.000000|
|-122.250000|37.850000|       52.000000|1274.000000|   235.000000| 558.000000| 219.000000|    5.643100|   341300.000000|
|-122.250000|37.850000|       52.000000|1627.000000|   280.000000| 565.000000| 259.000000|    3.846200|   342200.000000|
|-122.250000|37.850000|       52

The data seems all nicely ordered into columns, but what about the data types? By reading in your data, Spark will try to infer a schema, but has this been successful here? Use either `df.dtypes` or `df.printSchema()` to get to know more about the data types that are contained within your DataFrame.

In [13]:
df.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housingMedianAge: string (nullable = true)
 |-- totalRooms: string (nullable = true)
 |-- totalBedRooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- medianIncome: string (nullable = true)
 |-- medianHouseValue: string (nullable = true)



All columns are still of data type string… 

If you want to continue with this DataFrame, you’ll need to rectify this situation and assign “better” or more accurate data types to all columns. Your performance will also benefit from this. Intuitively, you could go for a solution like the following, where you declare that each column of the DataFrame df should be cast to a `FloatType()`:

In [14]:
def convertColumn(df, names, newType):
    # Cast every column listed in `names` to `newType`
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df

In [15]:
# Assign all column names to `columns`
columns = df.columns

# Convert the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

Now that you’ve got that all sorted out, it’s time to really get started on the data exploration. You have seen that columnar access and SQL queries were two advantages of using DataFrames. Well, now it’s time to dig a little bit further into that. Let’s start small and just select two columns from df of which you only want to see 10 rows:

In [16]:
df.select('population','totalBedRooms').show(10)

+----------+-------------+
|population|totalBedRooms|
+----------+-------------+
|     322.0|        129.0|
|    2401.0|       1106.0|
|     496.0|        190.0|
|     558.0|        235.0|
|     565.0|        280.0|
|     413.0|        213.0|
|    1094.0|        489.0|
|    1157.0|        687.0|
|    1206.0|        665.0|
|    1551.0|        707.0|
+----------+-------------+
only showing top 10 rows



In [17]:
df.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housingMedianAge: float (nullable = true)
 |-- totalRooms: float (nullable = true)
 |-- totalBedRooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)



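A more complex example groups the block groups by housing median age, counts the rows in each group, and sorts the result in descending order of age: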

In [18]:
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()

+----------------+-----+
|housingMedianAge|count|
+----------------+-----+
|            52.0| 1273|
|            51.0|   48|
|            50.0|  136|
|            49.0|  134|
|            48.0|  177|
|            47.0|  198|
|            46.0|  245|
|            45.0|  294|
|            44.0|  356|
|            43.0|  353|
|            42.0|  368|
|            41.0|  296|
|            40.0|  304|
|            39.0|  369|
|            38.0|  394|
|            37.0|  537|
|            36.0|  862|
|            35.0|  824|
|            34.0|  689|
|            33.0|  615|
+----------------+-----+
only showing top 20 rows
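
The query above boils down to three steps: group by a value, count the rows in each group, and sort by the grouping key in descending order. The following plain-Python sketch performs the same steps with `collections.Counter` on a handful of made-up ages; it is an analogy for the logic, not a replacement for the DataFrame call.

```python
from collections import Counter

# Made-up housing median ages for a few block groups.
ages = [52.0, 41.0, 52.0, 21.0, 41.0, 52.0]

# Group and count: one entry per distinct age.
counts = Counter(ages)

# Sort by age, highest first (like sort(..., ascending=False)).
result = sorted(counts.items(), key=lambda pair: pair[0], reverse=True)

print(result)  # [(52.0, 3), (41.0, 2), (21.0, 1)]
```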



Besides querying, you can also choose to describe your data and get some summary statistics. This will help you later on when you preprocess the data.

In [19]:
df['households', 'housingMedianAge', 'medianHouseValue', 'medianIncome', 'population'].describe().show()

+-------+-----------------+------------------+------------------+------------------+------------------+
|summary|       households|  housingMedianAge|  medianHouseValue|      medianIncome|        population|
+-------+-----------------+------------------+------------------+------------------+------------------+
|  count|            20640|             20640|             20640|             20640|             20640|
|   mean|499.5396802325581|28.639486434108527|206855.81690891474|3.8706710030346416|1425.4767441860465|
| stddev|382.3297528316098| 12.58555761211163|115395.61587441359|1.8998217183639696|  1132.46212176534|
|    min|              1.0|               1.0|           14999.0|            0.4999|               3.0|
|    max|           6082.0|              52.0|          500001.0|           15.0001|           35682.0|
+-------+-----------------+------------------+------------------+------------------+------------------+
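
To make the rows of this summary concrete, the sketch below computes the same five statistics in plain Python with the `statistics` module. It uses only the first five population values shown earlier, so the numbers differ from the full-data summary; `statistics.stdev` is the sample standard deviation, which is assumed here to correspond to the `stddev` row.

```python
import statistics

# First five `population` values from the earlier select(...).show(10) output.
population = [322.0, 2401.0, 496.0, 558.0, 565.0]

summary = {
    "count": len(population),
    "mean": statistics.mean(population),
    "stddev": statistics.stdev(population),  # sample standard deviation (n - 1)
    "min": min(population),
    "max": max(population),
}

for name, value in summary.items():
    print(f"{name:>6}: {value}")
```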



# Data Preprocessing

You don’t need to worry about missing values; all zero values have been excluded from the data set.

You should probably standardize your data, as you have seen that the range of minimum and maximum values is quite big.

There are possibly some additional attributes that you could add, such as a feature that registers the number of bedrooms per room or the rooms per household.

Your dependent variable is also quite big; to make your life easier, you’ll have to adjust the values slightly.

## Preprocessing The Target Values

First, let’s start with the medianHouseValue, your dependent variable. To facilitate your working with the target values, you will express the house values in units of 100,000. That means that a target such as 452600.000000 should become 4.526:

In [20]:
# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
df.show(2)

+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
|longitude|latitude|housingMedianAge|totalRooms|totalBedRooms|population|households|medianIncome|medianHouseValue|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
|  -122.23|   37.88|            41.0|     880.0|        129.0|     322.0|     126.0|      8.3252|           4.526|
|  -122.22|   37.86|            21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|           3.585|
+---------+--------+----------------+----------+-------------+----------+----------+------------+----------------+
only showing top 2 rows
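
The rescaling is plain division, so it is easy to check by hand and to undo later. The sketch below, in plain Python, rescales the two target values shown above and converts them back; the constant name `UNIT` is chosen here for illustration. Keep in mind that any prediction made on the rescaled target would need the same multiplication to be read in the original units.

```python
UNIT = 100_000  # house values are expressed in units of 100,000

raw_values = [452600.0, 358500.0]  # first two medianHouseValue entries

# Forward step: the same division as in the withColumn() call above.
scaled = [value / UNIT for value in raw_values]

# Reverse step: multiply to get back to the original units.
restored = [value * UNIT for value in scaled]

print(scaled)
print([round(value, 2) for value in restored])
```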



# Feature Engineering

Now that you have adjusted the values in medianHouseValue, you can also add the additional variables that you read about above. You’re going to add the following columns to the data set:

* Rooms per household, which refers to the number of rooms in households per block group;

* Population per household, which gives you an indication of how many people live in households per block group; and

* Bedrooms per room, which gives you an idea of how many rooms are bedrooms per block group.
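
Each of these features is a simple ratio of two existing columns. As a worked example in plain Python, here are the three ratios for the first block group of the data set (880 rooms, 129 bedrooms, 322 inhabitants, 126 households); the variable names are illustrative.

```python
# Values of the first block group in the data set.
total_rooms = 880.0
total_bedrooms = 129.0
population = 322.0
households = 126.0

rooms_per_household = total_rooms / households
population_per_household = population / households
bedrooms_per_room = total_bedrooms / total_rooms

print(round(rooms_per_household, 4))       # 6.9841
print(round(population_per_household, 4))  # 2.5556
print(round(bedrooms_per_room, 4))         # 0.1466
```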

As you’re working with DataFrames, you can use the `select()` method to compute these ratios from the columns totalRooms, totalBedRooms, households, and population, and the `withColumn()` method to add them to df. Additionally, you have to indicate that you’re working with columns by wrapping the names in the `col()` function. Otherwise, you won’t be able to do element-wise operations like the division that you have in mind for these variables:

In [21]:
# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(longitude=-122.2300033569336, latitude=37.880001068115234, housingMedianAge=41.0, totalRooms=880.0, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, medianHouseValue=4.526, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)
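
The long decimals in this Row, such as a longitude of -122.2300033569336 instead of -122.23, are a side effect of the earlier cast: `FloatType()` is a 32-bit single-precision type, so most decimal values are stored as the nearest representable 32-bit number. The plain-Python sketch below reproduces the effect with the `struct` module; `to_float32` is a hypothetical helper name. If this rounding matters for your analysis, casting to `DoubleType()` instead is one option to consider.

```python
import struct


def to_float32(value):
    """Round a 64-bit Python float to the nearest 32-bit float and back."""
    return struct.unpack("f", struct.pack("f", value))[0]


longitude = to_float32(-122.23)
median_age = to_float32(41.0)

print(longitude)               # close to, but not exactly, -122.23
print(longitude == -122.23)    # False
print(median_age == 41.0)      # True: small whole numbers are stored exactly
```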

Next, and this already foresees an issue that you might have when you standardize the values in your data set, you’ll also re-order the columns. Since you don’t necessarily want to standardize your target values, you’ll want to isolate those in your data set.

You do this by using the `select()` method and passing the column names in the more appropriate order. Here, the target variable medianHouseValue is put first, so that it won’t be affected by the standardization.

Note also that this is the time to leave out variables that you might not want to consider in your analysis. In this case, let’s leave out variables such as longitude, latitude, housingMedianAge and totalRooms.

In [22]:
# Re-order and select columns
df = df.select("medianHouseValue", 
              "totalBedRooms", 
              "population", 
              "households", 
              "medianIncome", 
              "roomsPerHousehold", 
              "populationPerHousehold", 
              "bedroomsPerRoom")

# Standardization

Now that you have re-ordered the data, you’re ready to standardize it. Or almost, at least. There is just one more step that you need to go through: separating the features from the target variable. In essence, this boils down to isolating the first column in your DataFrame from the rest of the columns.

In this case, you’ll use the `map()` function that you use with RDDs to perform this action. You also see that you make use of the `DenseVector()` function. A dense vector is a local vector that is backed by a double array that represents its entry values. In other words, it's used to store arrays of values for use in PySpark.

Next, you go back to making a DataFrame out of the input_data and you re-label the columns by passing a list as a second argument. This list consists of the column names "label" and "features":

In [23]:
# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])
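
The lambda above relies on ordinary sequence slicing: `x[0]` is the first value of the row (the target) and `x[1:]` is everything after it (the features). The plain-Python sketch below shows the same split on the first row of the re-ordered data, with a plain list standing in for `DenseVector`.

```python
# First row after re-ordering: the target comes first, then seven features.
row = (4.526, 129.0, 322.0, 126.0, 8.3252,
       6.984126984126984, 2.5555555555555554, 0.14659090909090908)

label = row[0]            # medianHouseValue
features = list(row[1:])  # a plain list stands in for DenseVector here

print(label)          # 4.526
print(len(features))  # 7
```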

Next, you can finally scale the data. You can use Spark ML to do this: the library aims to make machine learning on big data scalable and easy, and it offers tools such as ML algorithms and everything you need to build practical ML pipelines. In this case, you don’t need to do much preprocessing, so a pipeline may be overkill, but if you want to look into it, consider visiting this [page](https://spark.apache.org/docs/latest/ml-pipeline.html).

The input column is "features", and the output column with the rescaled features, which will be included in `scaled_df`, will be named "features_scaled":

In [24]:
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the scaler to the DataFrame
scaler = standardScaler.fit(df)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)

# Inspect the result
scaled_df.show(2)

+-----+--------------------+--------------------+
|label|            features|     features_scaled|
+-----+--------------------+--------------------+
|4.526|[129.0,322.0,126....|[0.30623297630686...|
|3.585|[1106.0,2401.0,11...|[2.62553233949916...|
+-----+--------------------+--------------------+
only showing top 2 rows
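
Note that the scaled values above are all positive. With its default settings (`withStd=True`, `withMean=False`), `StandardScaler` divides each feature by its sample standard deviation without subtracting the mean first. The plain-Python sketch below applies that arithmetic to two features of the first three block groups; because it uses only three rows, its numbers will not match the output above, which was fitted on the full data set.

```python
import statistics

# totalBedRooms and population for the first three block groups.
features = [
    [129.0, 322.0],
    [1106.0, 2401.0],
    [190.0, 496.0],
]

# Sample standard deviation of each feature (column-wise).
columns = list(zip(*features))
stds = [statistics.stdev(column) for column in columns]

# Scale to unit standard deviation; no centering, mirroring withMean=False.
scaled = [[value / std for value, std in zip(row, stds)] for row in features]

for row in scaled:
    print([round(value, 4) for value in row])
```

After scaling, every column has a standard deviation of 1, but its mean is not zero. Setting `withMean=True` on the scaler would also center the features.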

