<a href="https://colab.research.google.com/github/mnshcodie/IIScEx_2021/blob/main/M4_AST_30_Big_Data_%26_PySpark_ML_B.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Advanced Programme in Deep Learning (Foundations and Applications)
## A Program by IISc and TalentSprint
### Assignment 30: Big Data, Cassandra and PySpark ML


## Learning Objectives

At the end of the experiment, you will be able to

* understand the concept of NoSQL databases
* understand the types of NoSQL databases available, such as Cassandra
* implement Machine Learning using PySpark

### Introduction

NoSQL ('Not only SQL') refers to non-relational, typically distributed, database systems. A NoSQL database has a dynamic schema, is well suited to hierarchical data storage, and is horizontally scalable.

**SQL databases** use Structured Query Language (SQL) to query and manipulate data. SQL is powerful and versatile, but it is also restrictive: it requires a predefined schema that determines the structure of the data before you work with it, and all of your data must follow that structure. This can require significant up-front preparation.

A **NoSQL database** has a dynamic schema for unstructured data. It can store data in several ways: document-oriented, column-oriented, graph-based, or as key-value pairs. This flexibility means that documents can be created without a predefined structure, and each document can have its own unique structure. The syntax varies from database to database, and you can add fields as you go.

Here are some key comparisons between SQL and NoSQL:

- **Scalability**: SQL databases are vertically scalable: you handle more load on a single server by adding RAM, CPU, or SSD capacity. NoSQL databases are horizontally scalable: they handle more traffic through sharding, that is, by adding more servers to the database.
- **Structure**: SQL databases are table-based. NoSQL databases are either key-value pairs, document-based, graph databases, or wide-column stores. Relational SQL databases are a better option for applications that require multi-row transactions such as an accounting system.
- **Property**: SQL databases follow the ACID properties (Atomicity, Consistency, Isolation and Durability), whereas NoSQL databases are usually described in terms of Brewer's CAP theorem (Consistency, Availability and Partition tolerance).
- **Examples**: Examples of SQL databases include PostgreSQL, MySQL, Oracle and Microsoft SQL Server. NoSQL database examples include Cassandra, MongoDB, BigTable, HBase, Neo4j and CouchDB.
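
The horizontal scaling mentioned under Scalability can be illustrated with a minimal sketch of hash-based sharding. The server names and the `shard_for` helper are hypothetical and exist only for this illustration; real databases use more elaborate schemes.

```python
import hashlib

def shard_for(key, servers):
    """Pick the server responsible for `key` using a stable hash of the key."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return servers[int(digest, 16) % len(servers)]

servers = ["node-a", "node-b", "node-c"]  # hypothetical server names
keys = ["user:1", "user:2", "user:3", "user:4"]

# Each key is routed to exactly one server, so the load is spread across machines.
placement = {key: shard_for(key, servers) for key in keys}
print(placement)
```

Simple modulo hashing like this moves most keys whenever a server is added or removed, which is why systems such as Cassandra rely on consistent hashing instead.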

**Why do we need NoSQL databases?**

We see two primary reasons why people consider using a NoSQL database.
* Productivity in Application development: A lot of application development effort is spent on mapping data between in-memory data structures and a relational database. A NoSQL database may provide a data model that better fits the application’s needs, thus simplifying that interaction and resulting in less code to write, debug, and evolve.
* Large-scale data: Organizations are finding it valuable to capture more data and process it more quickly. They are finding it expensive, if even possible, to do so with relational databases. The primary reason is that a relational database is designed to run on a single machine, but it is usually more economical to run large data and computing loads on clusters of many smaller and cheaper machines. Many NoSQL databases are designed explicitly to run on clusters, so they make a better fit for big data scenarios.

**Terminology**

The basic terms related to NoSQL databases are as follows:

* **Big data:** a collection of data that is huge in volume, yet growing exponentially with time.
* **Polyglot persistence:** a term that refers to using different data stores in different circumstances.
* **Database cluster:** a collection of databases that is managed by a single instance of a running database server. 

### Types of NoSQL Databases

The following are the different types of NoSQL databases:

* **Document databases** pair each key with a complex data structure known as a document. A document is a set of key-value pairs. MongoDB is an example of a document store database. A group of MongoDB documents is known as a collection. This is the equivalent of an RDBMS table.

* **Graph stores** are used to store information about networks of data, for instance, social connections. Graph stores include Neo4j and Giraph.

* **Key-value stores** store every single item in the database as a key together with its value. Examples of key-value stores are Riak and Berkeley DB. Some key-value stores, such as Redis, allow each value to have a type, such as an integer, which adds functionality.

* **Wide-column** stores such as Cassandra and HBase are optimized for queries over large datasets, and store columns of data together, instead of rows.

<figure>
<img src='https://cdn.iisc.talentsprint.com/CDS/Images/Nosql_databases.png' />
</figure>

### Setup Steps:

In [None]:
#@title Please enter your registration id to start: { run: "auto", display-mode: "form" }
Id = "" #@param {type:"string"}

In [None]:
#@title Please enter your password (normally your phone number) to continue: { run: "auto", display-mode: "form" }
password = "" #@param {type:"string"}

In [None]:
#@title Run this cell to complete the setup for this Notebook
from IPython import get_ipython

ipython = get_ipython()
  
notebook= "M4_AST_30_Big_Data_&_PySpark_ML_B" #name of the notebook

def setup():
#  ipython.magic("sx pip3 install torch") 
    ipython.magic("sx wget https://cdn.iisc.talentsprint.com/DLFA/Experiment_related_data/cal_housing.data")
    ipython.magic("sx wget https://cdn.iisc.talentsprint.com/CDS/Datasets/students.csv")
    ipython.magic("sx wget https://cdn.iisc.talentsprint.com/CDS/MiniProjects/secure-connect-cds.zip")


    from IPython.display import HTML, display
    display(HTML('<script src="https://dashboard.talentsprint.com/aiml/record_ip.html?traineeId={0}&recordId={1}"></script>'.format(getId(),submission_id)))
    print("Setup completed successfully")
    return

def submit_notebook():
    ipython.magic("notebook -e "+ notebook + ".ipynb")
    
    import requests, json, base64, datetime

    url = "https://dashboard.talentsprint.com/xp/app/save_notebook_attempts"
    if not submission_id:
      data = {"id" : getId(), "notebook" : notebook, "mobile" : getPassword()}
      r = requests.post(url, data = data)
      r = json.loads(r.text)

      if r["status"] == "Success":
          return r["record_id"]
      elif "err" in r:        
        print(r["err"])
        return None        
      else:
        print ("Something is wrong, the notebook will not be submitted for grading")
        return None
    
    elif getAnswer1() and getAnswer2() and getComplexity() and getAdditional() and getConcepts() and getComments() and getMentorSupport():
      f = open(notebook + ".ipynb", "rb")
      file_hash = base64.b64encode(f.read())

      data = {"complexity" : Complexity, "additional" :Additional, 
              "concepts" : Concepts, "record_id" : submission_id, 
              "answer1" : Answer1, "answer2" : Answer2, "id" : Id, "file_hash" : file_hash,
              "notebook" : notebook,
              "feedback_experiments_input" : Comments,
              "feedback_mentor_support": Mentor_support}
      r = requests.post(url, data = data)
      r = json.loads(r.text)
      if "err" in r:        
        print(r["err"])
        return None   
      else:
        print("Your submission is successful.")
        print("Ref Id:", submission_id)
        print("Date of submission: ", r["date"])
        print("Time of submission: ", r["time"])
        print("View your submissions: https://dlfa.iisc.talentsprint.com/notebook_submissions")
        #print("For any queries/discrepancies, please connect with mentors through the chat icon in LMS dashboard.")
        return submission_id
    else: submission_id
    

def getAdditional():
  try:
    if not Additional: 
      raise NameError
    else:
      return Additional  
  except NameError:
    print ("Please answer Additional Question")
    return None

def getComplexity():
  try:
    if not Complexity:
      raise NameError
    else:
      return Complexity
  except NameError:
    print ("Please answer Complexity Question")
    return None
  
def getConcepts():
  try:
    if not Concepts:
      raise NameError
    else:
      return Concepts
  except NameError:
    print ("Please answer Concepts Question")
    return None
  
  
# def getWalkthrough():
#   try:
#     if not Walkthrough:
#       raise NameError
#     else:
#       return Walkthrough
#   except NameError:
#     print ("Please answer Walkthrough Question")
#     return None
  
def getComments():
  try:
    if not Comments:
      raise NameError
    else:
      return Comments
  except NameError:
    print ("Please answer Comments Question")
    return None
  

def getMentorSupport():
  try:
    if not Mentor_support:
      raise NameError
    else:
      return Mentor_support
  except NameError:
    print ("Please answer Mentor support Question")
    return None

def getAnswer1():
  try:
    if not Answer1:
      raise NameError 
    else: 
      return Answer1
  except NameError:
    print ("Please answer Question 1")
    return None

def getAnswer2():
  try:
    if not Answer2:
      raise NameError 
    else: 
      return Answer2
  except NameError:
    print ("Please answer Question 2")
    return None
  

def getId():
  try: 
    return Id if Id else None
  except NameError:
    return None

def getPassword():
  try:
    return password if password else None
  except NameError:
    return None

submission_id = None
### Setup 
if getPassword() and getId():
  submission_id = submit_notebook()
  if submission_id:
    setup() 
else:
  print ("Please complete Id and Password cells before running setup")



### Importing required packages

In [None]:
import pandas as pd
from pprint import pprint

### Loading the data

In [None]:
students = pd.read_csv('students.csv')

### Cassandra

Apache Cassandra is a free, open-source, distributed, wide-column NoSQL database management system designed to handle large amounts of data across many commodity servers. It provides high availability with no single point of failure. Originally developed at Facebook and now an Apache Software Foundation project, it lets you run queries efficiently on large amounts of structured and semi-structured data.

Cassandra has three containers, one within another. The outermost container is Keyspace. You can think of Keyspace as a database in the RDBMS land. Next, you will see the column family, which is like a table. Within a column family are columns, and columns are placed under rows. Each row is identified by a unique row key, similar to the primary key in RDBMS.

<figure>
<img src='https://cdn.iisc.talentsprint.com/CDS/Datasets/cassandra-data-model.ppm' />
</figure>

The difference from RDBMS is in the way Cassandra treats the data. Column families, unlike tables, can be schema free (schema optional). This means we can have different column names for different rows within the same column family. We can store about two billion columns per row. This means it can be very handy to store time-series data, such as tweets or comments on a blog post.
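
The nesting described above (keyspace, column family, row key, columns) can be pictured with plain Python dictionaries. This is only a sketch of the logical model; the column family name, row keys and values are hypothetical.

```python
# Keyspace -> column family (table) -> row key -> {column name: value}
keyspace = {
    "blog_comments": {  # hypothetical column family
        "post-1": {"author": "asha", "text": "Nice post"},
        # A second row in the same column family carrying an extra column
        "post-2": {"author": "ravi", "text": "Thanks", "edited_at": "2021-07-01"},
    }
}

def columns_of(keyspace, column_family, row_key):
    """Return the sorted column names stored for one row."""
    return sorted(keyspace[column_family][row_key])

print(columns_of(keyspace, "blog_comments", "post-1"))
print(columns_of(keyspace, "blog_comments", "post-2"))
```

In current CQL, tables declare their columns up front, so treat this as a picture of the storage model rather than of CQL syntax.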

Besides Cassandra, HBase is also a wide-column store. For the similarities and differences between them, see [here](https://www.scnsoft.com/blog/cassandra-vs-hbase).

#### Components of Cassandra

Cassandra consists of the following components:

<figure>
<img src='https://cdn.iisc.talentsprint.com/CDS/Images/cassandra_cluster.jpg' width="600" />
</figure>

**Node** is the place where data is stored. It is the basic component of Cassandra.

**Data Center** A collection of related nodes.

**Cluster** A collection of one or more data centers.

**Commit Log** Every write operation is first written to the commit log, which is used for crash recovery.

**Mem-table** After the data is written to the commit log, it is written to the mem-table, an in-memory structure that holds it temporarily.

**SSTable** When the mem-table reaches a certain threshold, its data is flushed to an SSTable file on disk.
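
The write path through the commit log, mem-table and SSTable can be sketched with a toy class. `ToyNode` and its `flush_threshold` parameter are hypothetical; the sketch leaves out commit log cleanup, compaction and everything else a real node does.

```python
class ToyNode:
    """Toy model of a node's write path: commit log -> mem-table -> SSTable."""

    def __init__(self, flush_threshold=3):
        self.flush_threshold = flush_threshold  # assumed: flush after N buffered rows
        self.commit_log = []  # append-only record of every write, for crash recovery
        self.memtable = {}    # in-memory buffer of recent writes
        self.sstables = []    # sorted snapshots standing in for files on disk

    def write(self, key, value):
        self.commit_log.append((key, value))  # 1. log the write
        self.memtable[key] = value            # 2. buffer it in memory
        if len(self.memtable) >= self.flush_threshold:
            self.flush()                      # 3. flush when the threshold is reached

    def flush(self):
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable = {}

    def read(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for table in reversed(self.sstables):  # newest snapshot first
            for k, v in table:
                if k == key:
                    return v
        return None

node = ToyNode(flush_threshold=2)
node.write("s1", 91)
node.write("s2", 78)  # the second write reaches the threshold and triggers a flush
node.write("s3", 85)
print(len(node.commit_log), node.memtable, node.sstables)
```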

#### Data Replication

Hardware can fail and network links can go down at any time, so a backup is needed when a problem occurs. Cassandra therefore replicates data so that there is no single point of failure.

Cassandra places replicas of data on different nodes based on two factors:
* The Replication Strategy determines where the next replica is placed.
* The Replication Factor determines the total number of replicas placed on different nodes.

A replication factor of one means that there is only a single copy of the data, while a replication factor of three means that there are three copies of the data on three different nodes.

To know more about data replication click [here](https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/architecture/archDataDistributeReplication.html#:~:text=and%20fault%20tolerance.-,A%20replication%20strategy%2A).
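
As a rough illustration of how a replication strategy and a replication factor interact, the sketch below places the first replica on the node chosen by hashing the key and the remaining replicas on the next nodes around the ring, in the spirit of Cassandra's SimpleStrategy. The node names and the `replicas_for` helper are hypothetical, and refusing a replication factor larger than the ring is a restriction of this sketch only.

```python
import hashlib

def replicas_for(key, ring, replication_factor):
    """Return the nodes holding `key`: the owner, then the next nodes clockwise."""
    if replication_factor > len(ring):
        raise ValueError("this sketch needs at least as many nodes as replicas")
    start = int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16) % len(ring)
    return [ring[(start + i) % len(ring)] for i in range(replication_factor)]

ring = ["node-1", "node-2", "node-3", "node-4"]  # hypothetical node names
single = replicas_for("student:2", ring, 1)  # replication factor 1: one copy
triple = replicas_for("student:2", ring, 3)  # replication factor 3: three copies
print(single)
print(triple)
```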

#### Installation

In [None]:
!pip install cassandra-driver

In [None]:
import cassandra
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

In [None]:
print(cassandra.__version__)

#### Connecting the database

**DataStax**

DataStax, Inc. is a data management company based in Santa Clara, California.
Its product provides commercial support, software, and cloud database-as-a-service based on open source NoSQL database Apache Cassandra.

We will be using its free tier version here.

**Important:** Datastax account and keyspace creation steps provided below are encouraged but not mandatory. It will allow you to create your own cluster, perform data insertion and code execution steps end-to-end using your own credentials. Note that we have already inserted the data and provided the cluster connection through the CDS account in Datastax for the purpose of running this notebook.

**For detailed instructions on account creation and keyspace creation**, please refer to this [document](https://cdn.iisc.talentsprint.com/CDS/DB_Connect_Docs/Instruction_for_Astra_Datastax_Database_Creation.pdf)

**Astra Datastax login:** Login to [Datastax](https://www.datastax.com/) and create a database


**Connect the database and create keyspace:**

* Download Secure Connect Bundle zip file from Datastax [connect](https://docs.datastax.com/en/astra/docs/obtaining-database-credentials.html) section. Follow the instructions on the page
* Upload the `Secure-connect-XXXX.zip` file, which is downloaded from Datastax.
* Generate the token and save the credentials (.csv) from settings section.
    * Hint: Select role as admin-user and generate token
* Using the credentials generated in settings, assign the `Client ID` and `Client Secret` values to the `Client_ID` and `Client_Secret` variables below.

Set `zip_path` to the path of the Secure Connect Bundle zip file, and set `Client_ID` and `Client_Secret`.

In [None]:
zip_path = '/content/secure-connect-cds.zip'
Client_ID = 'SzdMZDsXLvXUQiHRsEogQgtR'
Client_Secret = 'SaYcoWaejFAx4CxXzuf1spOMRa+t1oyTd8Z+Medbuba1q0Ww5AY,1MOPNvrr9wWSnR82,IiQa4muFoF8OfOhxdndNXtZbuZsv.dSwKKccUaHr96B8-88gyAWGURFO2Wa'

#### Create a Cluster instance to connect to your Astra database

You will typically have one instance of Cluster for each Cassandra cluster you want to interact with. Create a session object using the cluster.

In [None]:
cloud_config= {
        'secure_connect_bundle': zip_path
}
auth_provider = PlainTextAuthProvider(Client_ID,  Client_Secret)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)

In [None]:
session = cluster.connect()

#### Verifying the database connection

In [None]:
# YOUR CODE HERE

#### Setting the Key Space in database

A keyspace is the top-level database object that controls the replication for the object it contains at each datacenter in the cluster. Keyspaces contain tables, materialized views and user-defined types, functions and aggregates. Typically, a cluster has one keyspace per application. Since replication is controlled on a per-keyspace basis, store data with different replication requirements (at the same datacenter) in different keyspaces.

Before creating tables and inserting data, let us create and set the keyspace:
* We can create the keyspace either manually on the Datastax dashboard or with a CQL command.
[Hint](https://docs.datastax.com/en/cql-oss/3.x/cql/cql_reference/cqlCreateKeyspace.html)
* Once the keyspace has been created, select it with the session's `set_keyspace()` method.
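
For reference, a CQL `CREATE KEYSPACE` statement could be assembled as in the sketch below. The `create_keyspace_cql` helper is hypothetical, and the choice of `SimpleStrategy` with a replication factor of 1 is an assumption suited to a single test cluster. A managed service may only allow keyspaces to be created from its dashboard, in which case the statement is useful mainly as documentation.

```python
def create_keyspace_cql(name, replication_factor=1):
    """Build a CREATE KEYSPACE statement that uses SimpleStrategy."""
    if not name.isidentifier():
        raise ValueError("keyspace name must be a simple identifier")
    return (
        "CREATE KEYSPACE IF NOT EXISTS {} "
        "WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': {}}};"
    ).format(name, int(replication_factor))

statement = create_keyspace_cql("ast_student")
print(statement)
# session.execute(statement)  # run only against a cluster you own
```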

In [None]:
try:
    session.set_keyspace('ast_student')
except Exception as e:
    print(e)

#### Insert the data into Database

In [None]:
# Display few rows of students dataframe
students.head()

For the following data insertion step we have already uploaded the data on Datastax using the CDS account, so you are not required to insert the data again. Therefore we have commented out the below lines of code.

However, if you would like to perform the data insertion step then please **create your own account** on Datastax as given in the reference [here](https://cdn.iisc.talentsprint.com/CDS/DB_Connect_Docs/Assignment_Datastax_Connect.pdf) and change the credentials and run the below code by uncommenting it.

**Create a column family in keyspace and insert the data using CQL command**

In [None]:
# Creating the students table
# query = """CREATE TABLE  IF NOT EXISTS students (studentID INT,
#                                                 name TEXT,
#                                                 age INT,
#                                                 marks INT,
#                                                 PRIMARY KEY (studentID)); """
# try:
#     session.execute(query)
# except Exception as e:
#     print(e)

In [None]:
#students_cols = ','.join(students.columns.values)
#for (i,row) in students.iterrows():
#    query = 'INSERT INTO ast_student.students ({}) VALUES (%s, %s, %s, %s)'.format(students_cols)
#    session.execute(query, tuple(row))

#### Querying the database

Select first 5 rows of the students table

In [None]:
query = 'SELECT * FROM ast_student.students LIMIT 5;'
rows=session.execute(query)
for row in rows:
    print(row)

Select the count of records where marks are above 85

In [None]:
# YOUR CODE HERE

#### Updating the database

**Uncomment and run the code below only if you are using your own credentials, so that the original database in the DLFA account is not affected.**

Update the value of `marks` to 98 in the row where `studentID` is 2.

In [None]:
# Query = 'UPDATE ast_student.students SET marks = 98 WHERE studentID = 2;'
# session.execute(Query)

# query = 'SELECT * FROM ast_student.students LIMIT 5;'
# rows = session.execute(query)
# for row in rows:
#     print(row)

The CQL query below lists the tables in the keyspace.

In [None]:
query = "SELECT * FROM system_schema.tables WHERE keyspace_name = 'ast_student';"
rows = session.execute(query)
for row in rows:
    print(row[1])

#### Drop table

Deleting a table is generally not advisable, but we might sometimes need to drop tables from the Datastax database to free up space.

**Uncomment and run the code below only if you are using your own credentials, so that the original database in the DLFA account is not affected.**

In [None]:
# query = "DROP TABLE IF EXISTS ast_student.students;"
# session.execute(query)

#### Close the session and cluster connection

In [None]:
# YOUR CODE HERE

### Machine Learning using PySpark
Predicting House Prices using California Housing Dataset

In this section, we'll make use of the California Housing data set. This data set is actually small, but the purpose of this notebook is to give you an idea of how we can use PySpark to build a machine learning model.

**Dataset Description**: The California Housing data set appeared in a 1997 paper titled *Sparse Spatial Autoregressions*, written by R. Kelley Pace and Ronald Barry and published in the journal Statistics and Probability Letters. The researchers built this data set by using the 1990 California census data.

The data contains one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people). In this sample a block group on average includes 1425.5 individuals living in a geographically compact area.

These spatial data contain 20,640 observations on housing prices with 9 economic variables:

`Longitude`: refers to the angular distance of a geographic place east or west of the prime meridian for each block group

`Latitude`: refers to the angular distance of a geographic place north or south of the earth’s equator for each block group

`Housing Median Age`: is the median age of the houses in a block group. Note that the median is the value that lies at the midpoint of a frequency distribution of observed values

`Total Rooms`: is the total number of rooms in the houses per block group

`Total Bedrooms`: is the total number of bedrooms in the houses per block group

`Population`: is the number of inhabitants of a block group

`Households`: refers to units of houses and their occupants per block group

`Median Income`: is used to register the median income of people that belong to a block group

`Median House Value`: is the dependent variable and refers to the median house value per block group


The Median house value is the dependent variable and will be assigned the role of the target variable in our ML model.

### Importing the required libraries and packages

In [None]:
!pip install pyspark

In [None]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import FloatType

import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Visualization
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)

from matplotlib import rcParams
sns.set(context='notebook', style='whitegrid', rc={'figure.figsize': (18,4)})
rcParams['figure.figsize'] = 18,4

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

In [None]:
# Setting random seed
rnd_seed=23
np.random.seed(rnd_seed)  # call the seeding function rather than overwriting it

#### Creating the Spark Session

In [None]:
# YOUR CODE HERE

Creating Spark Context

In [None]:
sc = spark.sparkContext
sc

Creating SQL Context

In [None]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext

#### Load The Data From the File

In [None]:
HOUSING_DATA = '/content/cal_housing.data'

Specifying the schema when loading data into a DataFrame will give better performance than schema inference.

In [None]:
# Define the schema, corresponding to a line in the csv data file.
schema = StructType([
    StructField("long", FloatType(), nullable=True),
    StructField("lat", FloatType(), nullable=True),
    StructField("medage", FloatType(), nullable=True),
    StructField("totrooms", FloatType(), nullable=True),
    StructField("totbdrms", FloatType(), nullable=True),
    StructField("pop", FloatType(), nullable=True),
    StructField("houshlds", FloatType(), nullable=True),
    StructField("medinc", FloatType(), nullable=True),
    StructField("medhv", FloatType(), nullable=True)]
)

In [None]:
# Load housing data
# YOUR CODE HERE

In [None]:
# Inspect first five rows
housing_df.take(5)

In [None]:
# Display first five rows
housing_df.show(5)

In [None]:
# Show the dataframe columns
housing_df.columns

In [None]:
# Show the schema of the dataframe
housing_df.printSchema()

### Data Exploration

In [None]:
# Run a sample selection
# YOUR CODE HERE

#### Distribution of the housing median age

In [None]:
# Group by housing median age and see the distribution
# YOUR CODE HERE

In [None]:
result_df.show(10)

In [None]:
result_df.toPandas().plot.bar(x='medage',figsize=(14, 6))

Note that `medage` is the median age of the houses in a block group, not of its residents, so the chart shows how old the housing stock is. Block groups with a median age below 10 are areas with mostly recent construction rather than out-of-place data.

#### Summary Statistics
Spark DataFrames include some built-in functions for statistical processing. The describe() function performs summary statistics calculations on all numeric columns and returns them as a DataFrame.
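
For a single numeric column, the statistics that `describe()` reports (count, mean, stddev, min and max) can be reproduced in plain Python. The sketch below assumes the standard deviation is the sample standard deviation; the `describe` helper and the input values are illustrative and not taken from the data set.

```python
import statistics

def describe(values):
    """Return the five summary statistics for one numeric column."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation
        "min": min(values),
        "max": max(values),
    }

medinc = [8.3, 7.2, 5.6, 3.8, 4.1]  # illustrative values only
summary = describe(medinc)
print(summary)
```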

In [None]:
(housing_df.describe().select(
                    "summary",
                    F.round("medage", 4).alias("medage"),
                    F.round("totrooms", 4).alias("totrooms"),
                    F.round("totbdrms", 4).alias("totbdrms"),
                    F.round("pop", 4).alias("pop"),
                    F.round("houshlds", 4).alias("houshlds"),
                    F.round("medinc", 4).alias("medinc"),
                    F.round("medhv", 4).alias("medhv"))
                    .show())

Look at the minimum and maximum values of all the (numerical) attributes. We see that multiple attributes have a wide range of values: we will need to normalize the dataset.

### Data Preprocessing

With all this information that we gathered from our exploratory data analysis, we know enough to preprocess our data to feed it to the model.


* We should probably standardize our data, as we have seen that the range of minimum and maximum values is quite big.

* There are possibly some additional attributes that we could add, such as a feature that registers the number of bedrooms per room or the rooms per household.

* Our dependent variable is also large in value; to make it easier to work with, we'll rescale its values.

#### Preprocessing The Target Values

First, let's start with `medhv` (the median house value), our dependent variable. To make the target values easier to work with, we will express the house values in units of 100,000. That means that a target such as 452600.000000 should become 4.526.

In [None]:
# Adjust the values of `medhv`
housing_df = housing_df.withColumn("medhv", col("medhv")/100000)

In [None]:
# Show the first 2 rows of `housing_df`
housing_df.show(2)

We can clearly see that the values have been adjusted correctly when we look at the result of the show() method.

### Feature Engineering

Now that we have adjusted the values in `medhv`, we will add the following columns to the data set:

*   Rooms per household, which is the number of rooms per household in a block group;
*   Population per household, which indicates how many people live in each household in a block group; and
*   Bedrooms per room, which indicates what share of the rooms in a block group are bedrooms.

As we're working with DataFrames, we can use the select() method to select the columns that we're going to be working with, namely `totrooms`, `totbdrms`, `houshlds` and `pop`. Additionally, we have to indicate that we're working with columns by wrapping the names in the col() function. Otherwise, we won't be able to do element-wise operations like the divisions that we have in mind for these variables.
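
The arithmetic behind the three new columns can be checked on a single row in plain Python. The `add_ratio_features` helper and the row values are illustrative; the new column names follow those used later in this notebook.

```python
def add_ratio_features(row):
    """Return a copy of `row` with the three ratio features added."""
    out = dict(row)
    out["rms_per_hh"] = row["totrooms"] / row["houshlds"]    # rooms per household
    out["pop_per_hh"] = row["pop"] / row["houshlds"]         # people per household
    out["bdrms_per_rm"] = row["totbdrms"] / row["totrooms"]  # bedrooms per room
    return out

# Illustrative block group (values chosen for the example)
row = {"totrooms": 880.0, "totbdrms": 129.0, "pop": 322.0, "houshlds": 126.0}
features = add_ratio_features(row)
print({name: round(features[name], 2) for name in ("rms_per_hh", "pop_per_hh", "bdrms_per_rm")})
```

On a Spark DataFrame the same divisions are written with column expressions, for example `col("totrooms") / col("houshlds")`.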




In [None]:
housing_df.columns

In [None]:
# Add the new columns to `housing_df`
# YOUR CODE HERE

In [None]:
# Inspect the result
housing_df.show(5)

We can see that, for the first row, there are about 6.98 rooms per household, the households in the block group consist of about 2.5 people, and the ratio of bedrooms to rooms is quite low at 0.14.

Since we don't necessarily want to standardize our target values, we'll make sure to isolate those in our data set. This is also the time to leave out variables that we might not want to consider in our analysis. In this case, let's leave out `long`, `lat`, `medage` and `totrooms`.

We will use the select() method and pass the column names in a more convenient order. The target variable `medhv` is put first, so that it is kept apart from the features that will be standardized.

In [None]:
# Re-order and select columns
housing_df = housing_df.select("medhv", 
                              "totbdrms", 
                              "pop", 
                              "houshlds", 
                              "medinc", 
                              "rms_per_hh", 
                              "pop_per_hh", 
                              "bdrms_per_rm")

#### Feature Extraction

Now that we have re-ordered the data, we're ready to normalize the data. We will choose the features to be normalized.

In [None]:
featureCols = ["totbdrms", "pop", "houshlds", "medinc", "rms_per_hh", "pop_per_hh", "bdrms_per_rm"]

**Use a VectorAssembler to put features into a feature vector column**

In [None]:
# Put features into a feature vector column
# YOUR CODE HERE

In [None]:
assembled_df = assembler.transform(housing_df)

In [None]:
assembled_df.show(10, truncate=False)

All the features have been combined into a single dense vector column.



#### Standardization

Next, we can finally scale the data using StandardScaler. The input columns are the features, and the output column with the rescaled values that will be included in the scaled_df will be named "features_scaled".
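
With its default settings (`withStd=True`, `withMean=False`), StandardScaler divides each feature by its sample standard deviation without centering it. A plain Python sketch of that operation on one column follows; the `scale_to_unit_std` helper and the values are illustrative.

```python
import statistics

def scale_to_unit_std(column):
    """Divide each value by the column's sample standard deviation (no centering)."""
    std = statistics.stdev(column)
    return [value / std for value in column]

pop = [322.0, 2401.0, 496.0, 558.0, 565.0]  # illustrative values only
scaled = scale_to_unit_std(pop)

# After scaling, the column has unit sample standard deviation.
print(statistics.stdev(scaled))
```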

In [None]:
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

In [None]:
# Fit the scaler to the DataFrame, then transform it
scaled_df = standardScaler.fit(assembled_df).transform(assembled_df)

In [None]:
# Inspect the result
scaled_df.select("features", "features_scaled").show(10, truncate=False)

#### Building A Machine Learning Model With Spark ML

With all the preprocessing done, it's finally time to start building our Linear Regression model! Just like always, we first need to split the data into training and test sets. Luckily, this is no issue with the randomSplit() method:

In [None]:
# Split the data into train and test sets
# YOUR CODE HERE

We pass `randomSplit()` a list of two weights that set the relative sizes of the training and test sets, together with a seed for reproducibility.
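
The idea behind a weighted random split can be sketched in plain Python: every row draws a uniform random number and lands in the split whose cumulative weight range contains it. The `random_split` helper below is a hypothetical stand-in and not Spark's implementation, so the rows it selects will differ from those `randomSplit()` selects.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by comparing a uniform draw with cumulative weights."""
    total = sum(weights)
    bounds, running = [], 0.0
    for weight in weights:
        running += weight / total  # weights are normalized to sum to 1
        bounds.append(running)
    rng = random.Random(seed)      # seeded generator for reproducibility
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches draws affected by rounding in the bounds
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(list(range(1000)), [0.8, 0.2], seed=23)
print(len(train), len(test))
```

The resulting sizes are only approximately 80% and 20%, because each row is assigned independently.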

Note that the argument `elasticNetParam` corresponds to α, the mixing parameter between the L1 and L2 penalties (α = 0 gives a pure L2 penalty and α = 1 a pure L1 penalty), and that `regParam`, the regularization parameter, corresponds to λ.

In [None]:
train_data.columns

**Create an ElasticNet model**

ElasticNet is a linear regression model trained with L1 and L2 prior as regularizer. Elastic-net is useful when there are multiple features which are correlated with one another. Lasso is likely to pick one of these at random, while elastic-net is likely to pick both.
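
To see how the two penalties combine, the sketch below computes the elastic-net penalty term λ(α‖w‖₁ + (1 − α)/2 · ‖w‖₂²) for a given coefficient vector, where λ plays the role of `regParam` and α the role of `elasticNetParam`. The `elastic_net_penalty` helper and the coefficients are illustrative.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty = lambda * (alpha * L1 + (1 - alpha) / 2 * L2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared)

weights = [0.5, -1.0, 0.0]  # illustrative coefficients

ridge_like = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.0)  # pure L2
lasso_like = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=1.0)  # pure L1
mixed = elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.5)       # blend of both
print(ridge_like, lasso_like, mixed)
```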

In [None]:
# Initialize `lr`
# YOUR CODE HERE

In [None]:
# Fit the model to the training data
# YOUR CODE HERE

### Evaluating the Model

With our model in place, we can generate predictions for our test data: use the transform() method to predict the labels for our test_data. Then, we can use RDD operations to extract the predictions as well as the true labels from the DataFrame.

#### Inspect the Model Coefficients

In [None]:
# Coefficients for the model
linearModel.coefficients

In [None]:
featureCols

In [None]:
# Intercept for the model
linearModel.intercept

In [None]:
coeff_df = pd.DataFrame({"Feature": ["Intercept"] + featureCols, "Co-efficients": np.insert(linearModel.coefficients.toArray(), 0, linearModel.intercept)})
coeff_df = coeff_df[["Feature", "Co-efficients"]]

In [None]:
coeff_df

#### Generating Predictions

In [None]:
# Generate predictions
# YOUR CODE HERE

In [None]:
# Extract the predictions and the "known" correct labels
# YOUR CODE HERE

#### Inspect the Metrics

Looking at predicted values is one thing, but metrics give a much better idea of how good our model actually is.

Here, we will use the LinearRegressionModel.summary attribute, which reports metrics computed on the training data.

From the summary we can pull up the rootMeanSquaredError, the meanAbsoluteError and the r2.
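To make these metrics concrete, here is a small plain-Python sketch of how they are conventionally defined. The helper `regression_metrics` and the sample numbers are hypothetical and independent of Spark; the sketch only illustrates the formulas.

```python
import math

def regression_metrics(y_true, y_pred):
    """Return (rmse, mae, r2) for two equal-length sequences of numbers."""
    n = len(y_true)
    errors = [p - t for t, p in zip(y_true, y_pred)]
    ss_res = sum(e * e for e in errors)            # residual sum of squares
    mae = sum(abs(e) for e in errors) / n          # mean absolute error
    mean_true = sum(y_true) / n
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot
    return math.sqrt(ss_res / n), mae, r2

rmse, mae, r2 = regression_metrics([1.0, 2.0, 3.0, 4.0], [1.5, 2.0, 2.5, 4.0])
print(rmse, mae, r2)  # approximately 0.354, 0.25, 0.9
```

RMSE penalizes large errors more heavily than MAE, while R2 compares the model against always predicting the mean of the labels.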

In [None]:
# Get the RMSE
print("RMSE: {0}".format(linearModel.summary.rootMeanSquaredError))

In [None]:
print("MAE: {0}".format(linearModel.summary.meanAbsoluteError))

In [None]:
# Get the R2
print("R2: {0}".format(linearModel.summary.r2))

**Using the RegressionEvaluator from pyspark.ml package**

In [None]:
# Get the RMSE
# YOUR CODE HERE

In [None]:
# Get the MAE
# YOUR CODE HERE

In [None]:
# Get the R2
# YOUR CODE HERE

**Using the RegressionMetrics from pyspark.mllib package**



In [None]:
# mllib is the older RDD-based API, so RegressionMetrics takes an RDD of (prediction, label) pairs
metrics = RegressionMetrics(predandlabels.rdd)

In [None]:
print("RMSE: {0}".format(metrics.rootMeanSquaredError))

In [None]:
print("MAE: {0}".format(metrics.meanAbsoluteError))

In [None]:
print("R2: {0}".format(metrics.r2))

Our model definitely needs some improvement! If we want to continue with this model, we can experiment with the parameters that we passed to it and with the variables that we included in the original DataFrame.

In [None]:
spark.stop()

### Please answer the questions below to complete the experiment:




What is a NoSQL Database?

A. NoSQL is also referred to as "Not only SQL" to emphasize that such databases may support SQL-like query languages of the kind used in relational databases.

B. A NoSQL database provides a mechanism to store and retrieve data that is modeled by means other than the tabular relations used in relational databases.

C. There are four major types of NoSQL databases:


*   Key Value Store
*   Document Store
*   Column Store
*   Graph Databases

In [None]:
#@title Q.1. Which is TRUE for a NoSQL?
Answer1 = "" #@param ["","Only A and B","Only B and C","Only A and C", "A, B, C"]


Consider the statements given below:

P: The system allows operations all the time, and operations return quickly

Q: All nodes see the same data at any time, or read operations return the latest value written by any client

R: The system continues to work in spite of network partitions

In [None]:
#@title Q.2. Identify the correct choices for the above given statements:
Answer2 = "" #@param ["","P: Consistency, Q: Availability, R: Partition tolerance","P: Availability, Q: Consistency, R: Partition tolerance","P: Partition tolerance, Q: Consistency, R: Availability","P: Consistency, Q: Partition tolerance, R: Availability"]


In [None]:
#@title How was the experiment? { run: "auto", form-width: "500px", display-mode: "form" }
Complexity = "" #@param ["","Too Simple, I am wasting time", "Good, But Not Challenging for me", "Good and Challenging for me", "Was Tough, but I did it", "Too Difficult for me"]


In [None]:
#@title If it was too easy, what more would you have liked to be added? If it was very difficult, what would you have liked to have been removed? { run: "auto", display-mode: "form" }
Additional = "" #@param {type:"string"}


In [None]:
#@title Can you identify the concepts from the lecture which this experiment covered? { run: "auto", vertical-output: true, display-mode: "form" }
Concepts = "" #@param ["","Yes", "No"]


In [None]:
#@title  Text and image description/explanation and code comments within the experiment: { run: "auto", vertical-output: true, display-mode: "form" }
Comments = "" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [None]:
#@title Mentor Support: { run: "auto", vertical-output: true, display-mode: "form" }
Mentor_support = "" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [None]:
#@title Run this cell to submit your notebook for grading { vertical-output: true }
try:
  if submission_id:
      return_id = submit_notebook()
      if return_id : submission_id = return_id
  else:
      print("Please complete the setup first.")
except NameError:
  print ("Please complete the setup first.")