# Linear Regression

This example demonstrates Spark's Linear Regression algorithm working together with the Vertica database.

Old Faithful is a geyser in Yellowstone National Park. Using Linear Regression, we want to train a model that can predict how long an eruption will last based on a single feature: the waiting time between eruptions.

## Spark Setup

First we start with the basics of setting up Spark to work with Vertica. To do this, we create a Spark session (and its Spark Context) that has the Vertica Spark Connector JAR passed to it as a configuration option.

In [1]:
# Get Connector JAR name
import glob
import os

files = glob.glob("/spark-connector/connector/target/scala-2.12/spark-vertica-connector-assembly-*")
os.environ["CONNECTOR_JAR"] = files[0]
print(os.environ["CONNECTOR_JAR"])

/spark-connector/connector/target/scala-2.12/spark-vertica-connector-assembly-3.3.5.jar
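
The lookup in cell 1 takes `files[0]`, which raises an `IndexError` if the connector has not been built yet. As a minimal sketch, a small helper could fail with a clearer message instead. The name `find_connector_jar` is hypothetical and not part of the connector; only the standard-library `glob` module is used.

```python
import glob


def find_connector_jar(pattern):
    """Return one path matching `pattern`, or raise if nothing matches.

    Hypothetical helper for this notebook; not part of the Vertica connector.
    """
    # Sort so the choice is deterministic when several JARs match.
    matches = sorted(glob.glob(pattern))
    if not matches:
        raise FileNotFoundError(
            f"No connector JAR matches {pattern!r}; has the connector been built?"
        )
    return matches[0]


# In the notebook this might replace the files[0] lookup, for example:
# os.environ["CONNECTOR_JAR"] = find_connector_jar(
#     "/spark-connector/connector/target/scala-2.12/spark-vertica-connector-assembly-*")
```

Sorting is lexicographic, so if several versions are present it will not necessarily pick the newest one.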


In [2]:
# Create the Spark session and context
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.master", "spark://spark:7077")
    .config("spark.driver.memory", "2G")
    .config("spark.executor.memory", "1G")
    .config("spark.jars", os.environ["CONNECTOR_JAR"])
    .getOrCreate())
sc = spark.sparkContext

## Import Data

Our Old Faithful dataset has been randomly split into two parts: one for training the model and one for testing it. Let's use Spark's native CSV reader to load each part into a DataFrame. Then we can write each one to Vertica, into its respective table: "faithful_training" or "faithful_testing".

In [3]:
# Load the training set from a CSV file into a dataframe and show some of its contents
df = spark.read.options(header="true", inferschema="true").csv("/spark-connector/examples/jupyter/data/faithful_training.csv")
df.printSchema()
df.show() # So that we can see a bit of what our training dataset looks like and what the schema is

# Write the data into a table in Vertica
df.write.mode("overwrite").format("com.vertica.spark.datasource.VerticaSource").options(
    host="vertica",
    user="dbadmin",
    password="",
    db="docker",
    table="faithful_training",
    staging_fs_url="webhdfs://hdfs:50070/linearregression").save()

# Do the same for the testing set
df = spark.read.options(header="true", inferschema="true").csv("/spark-connector/examples/jupyter/data/faithful_testing.csv")
df.write.mode("overwrite").format("com.vertica.spark.datasource.VerticaSource").options(
    host="vertica",
    user="dbadmin",
    password="",
    db="docker",
    table="faithful_testing",
    staging_fs_url="webhdfs://hdfs:50070/linearregression").save()

print("Data of the Old Faithful geyser in Yellowstone National Park.")
print("eruptions = duration of eruption \nwaiting = time between eruptions")

root
 |-- id: integer (nullable = true)
 |-- eruptions: double (nullable = true)
 |-- waiting: integer (nullable = true)

+---+---------+-------+
| id|eruptions|waiting|
+---+---------+-------+
|  1|      3.6|     79|
|  2|      1.8|     54|
|  3|    3.333|     74|
|  6|    2.883|     55|
|  7|      4.7|     88|
| 10|     4.35|     85|
| 13|      4.2|     78|
| 15|      4.7|     83|
| 16|    2.167|     52|
| 17|     1.75|     62|
| 18|      4.8|     84|
| 19|      1.6|     52|
| 21|      1.8|     51|
| 25|    4.533|     74|
| 27|    1.967|     55|
| 28|    4.083|     76|
| 29|     3.85|     78|
| 32|    4.467|     77|
| 33|    3.367|     66|
| 34|    4.033|     80|
+---+---------+-------+
only showing top 20 rows

Data of the Old Faithful geyser in Yellowstone National Park.
eruptions = duration of eruption 
waiting = time between eruptions


## Read Data

Now that our data is saved in Vertica, we can read from both tables and store them once again in Spark DataFrames for processing with PySpark's ML toolkit.

In [4]:
# Read our data from Vertica into a Spark dataframe
df_training = spark.read.load(format="com.vertica.spark.datasource.VerticaSource",
    host="vertica",
    user="dbadmin",
    password="",
    db="docker",
    table="faithful_training",
    staging_fs_url="webhdfs://hdfs:50070/linearregression")

df_testing = spark.read.load(format="com.vertica.spark.datasource.VerticaSource",
    host="vertica",
    user="dbadmin",
    password="",
    db="docker",
    table="faithful_testing",
    staging_fs_url="webhdfs://hdfs:50070/linearregression")
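
The same connection options are repeated in every read and write in this notebook. One possible way to cut the repetition, sketched here under the assumption that only the table name changes between calls, is to build the options once in a small function. `vertica_options` and `VERTICA_FORMAT` are hypothetical names introduced for illustration; the option values are copied from the cells above.

```python
# Data source name used throughout this notebook.
VERTICA_FORMAT = "com.vertica.spark.datasource.VerticaSource"


def vertica_options(table, staging_dir="linearregression"):
    """Return this notebook's connector options for one table.

    Hypothetical helper; the values mirror the ones used in the cells above.
    """
    return {
        "host": "vertica",
        "user": "dbadmin",
        "password": "",
        "db": "docker",
        "table": table,
        "staging_fs_url": f"webhdfs://hdfs:50070/{staging_dir}",
    }


training_opts = vertica_options("faithful_training")
testing_opts = vertica_options("faithful_testing")

# With the `spark` session from this notebook, the reads could then be written as:
# df_training = spark.read.format(VERTICA_FORMAT).options(**training_opts).load()
# df_testing = spark.read.format(VERTICA_FORMAT).options(**testing_opts).load()
```

Keeping the options in one place means a change of host or staging location only has to be made once.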

## Select Features

Linear Regression models the relationship between a dependent variable and one or more independent variables using a line of best fit. The dependent variable (eruptions) is what we are trying to predict, whereas the independent variables are the features we use to build our model. In this case we have just one feature, "waiting", and it alone will make up our features vector.
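
To make the idea of a line of best fit concrete, here is a minimal sketch in plain Python that fits an ordinary least-squares line to the first ten training rows shown earlier. It is only an illustration: it uses the closed-form formulas for a single feature and no regularization, so it is not how Spark fits the model later in this notebook. The names `rows` and `fit_line` are introduced here for the example.

```python
# (waiting, eruptions) pairs copied from the first ten training rows shown above.
rows = [(79, 3.6), (54, 1.8), (74, 3.333), (55, 2.883), (88, 4.7),
        (85, 4.35), (78, 4.2), (83, 4.7), (52, 2.167), (62, 1.75)]


def fit_line(points):
    """Ordinary least squares for one feature: returns (slope, intercept)."""
    n = len(points)
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    # Slope is the covariance of x and y divided by the variance of x.
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in points)
    sxx = sum((x - mean_x) ** 2 for x, _ in points)
    slope = sxy / sxx
    # The fitted line always passes through the point of means.
    intercept = mean_y - slope * mean_x
    return slope, intercept


slope, intercept = fit_line(rows)
print(f"eruptions ~ {slope:.4f} * waiting + {intercept:.4f}")
```

A positive slope here reflects what the table already suggests: longer waits tend to be followed by longer eruptions.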

In [5]:
# Import Spark's ML Regression tool
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Spark's Linear Regression tool expects the features packed into a single vector column. Since we only have one feature in this case, we pass just "waiting"
featureassembler = VectorAssembler(inputCols = ["waiting"], outputCol = "features")

# Show our new table with a features column added. We are also going to do the same with the testing table so we can compare our results later.
df_testing = featureassembler.transform(df_testing)
df_training = featureassembler.transform(df_training)
df_training.show()

+---+---------+-------+--------+
| id|eruptions|waiting|features|
+---+---------+-------+--------+
|  1|      3.6|     79|  [79.0]|
|  2|      1.8|     54|  [54.0]|
|  3|    3.333|     74|  [74.0]|
|  6|    2.883|     55|  [55.0]|
|  7|      4.7|     88|  [88.0]|
| 10|     4.35|     85|  [85.0]|
| 13|      4.2|     78|  [78.0]|
| 15|      4.7|     83|  [83.0]|
| 16|    2.167|     52|  [52.0]|
| 17|     1.75|     62|  [62.0]|
| 18|      4.8|     84|  [84.0]|
| 19|      1.6|     52|  [52.0]|
| 21|      1.8|     51|  [51.0]|
| 25|    4.533|     74|  [74.0]|
| 27|    1.967|     55|  [55.0]|
| 28|    4.083|     76|  [76.0]|
| 29|     3.85|     78|  [78.0]|
| 32|    4.467|     77|  [77.0]|
| 33|    3.367|     66|  [66.0]|
| 34|    4.033|     80|  [80.0]|
+---+---------+-------+--------+
only showing top 20 rows



## Train Model

We can now train our model against our training set. We specify our new features column as well as our target "eruptions."
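
The training cell also sets `regParam` and `elasticNetParam`, which control regularization. As a rough guide based on Spark's documentation of its linear methods, the estimator minimizes an elastic-net objective of approximately the following form, where $\lambda$ corresponds to `regParam` and $\alpha$ to `elasticNetParam`. The symbols $w$ (coefficients), $b$ (intercept), $x_i$ (feature vector), $y_i$ (label) and $n$ (number of training rows) are notation introduced here.

$$
\min_{w,\,b}\ \frac{1}{2n}\sum_{i=1}^{n}\left(w^\top x_i + b - y_i\right)^2 + \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$

With `elasticNetParam=1` the $L_2$ term vanishes, leaving a pure $L_1$ (lasso) penalty scaled by `regParam=0.01`. Spark also standardizes features by default, so the effective penalty on the raw "waiting" values may differ from a literal reading of this formula.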

In [6]:
# Create our model using the features to predict eruption duration and fit it against our training set
lr = LinearRegression(maxIter=10, regParam=0.01, elasticNetParam=1, featuresCol="features", labelCol="eruptions")
lr = lr.fit(df_training)


training_predictions = lr.evaluate(df_training)
training_predictions.predictions.show() # Show our new table with the predictions

+---+---------+-------+--------+------------------+
| id|eruptions|waiting|features|        prediction|
+---+---------+-------+--------+------------------+
|  1|      3.6|     79|  [79.0]| 4.147894377732253|
|  2|      1.8|     54|  [54.0]| 2.197896492376823|
|  3|    3.333|     74|  [74.0]| 3.757894800661167|
|  6|    2.883|     55|  [55.0]|2.2758964077910404|
|  7|      4.7|     88|  [88.0]| 4.849893616460207|
| 10|     4.35|     85|  [85.0]| 4.615893870217556|
| 13|      4.2|     78|  [78.0]| 4.069894462318035|
| 15|      4.7|     83|  [83.0]| 4.459894039389121|
| 16|    2.167|     52|  [52.0]| 2.041896661548389|
| 17|     1.75|     62|  [62.0]| 2.821895815690561|
| 18|      4.8|     84|  [84.0]| 4.537893954803338|
| 19|      1.6|     52|  [52.0]| 2.041896661548389|
| 21|      1.8|     51|  [51.0]|1.9638967461341719|
| 25|    4.533|     74|  [74.0]| 3.757894800661167|
| 27|    1.967|     55|  [55.0]|2.2758964077910404|
| 28|    4.083|     76|  [76.0]| 3.913894631489601|
| 29|     3.

## Test Model & Results

Our test data comprises the eruption records that were held out of "faithful_training" (note the gaps in its id column). We are now going to run this dataset through our model to see how the predictions stack up against the observed durations. From there we also want to evaluate the model and look at some statistics that show how well it holds up.

In [7]:
testing_predictions = lr.transform(df_testing)
testing_predictions.select("id", "eruptions","prediction","features").show(20)

test_result = lr.evaluate(df_testing)
print("R Squared (R2) on test data = %g" % test_result.r2)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

+---+---------+------------------+--------+
| id|eruptions|        prediction|features|
+---+---------+------------------+--------+
|  4|    2.283| 2.821895815690561|  [62.0]|
|  5|    4.533| 4.615893870217556|  [85.0]|
|  8|      3.6| 4.615893870217556|  [85.0]|
|  9|     1.95|1.9638967461341719|  [51.0]|
| 11|    1.833| 2.197896492376823|  [54.0]|
| 12|    3.917| 4.537893954803338|  [84.0]|
| 14|     1.75|1.6518970844773033|  [47.0]|
| 20|     4.25| 4.147894377732253|  [79.0]|
| 22|     1.75|1.6518970844773033|  [47.0]|
| 23|     3.45| 4.069894462318035|  [78.0]|
| 24|    3.067|3.3678952235900805|  [69.0]|
| 26|      3.6| 4.459894039389121|  [83.0]|
| 30|    4.433| 4.147894377732253|  [79.0]|
| 31|      4.3|3.6798948852469495|  [73.0]|
| 35|    3.833| 3.757894800661167|  [74.0]|
| 38|    4.833| 4.225894293146469|  [80.0]|
| 42|    1.883| 2.509896154033692|  [58.0]|
| 44|     1.75| 2.509896154033692|  [58.0]|
| 47|    3.833| 2.977895646518995|  [64.0]|
| 49|    4.633| 4.38189412397490

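A residual is the difference between an observed value and the value predicted by the regression line, so residuals measure how close our data points are to that line.\
R Squared is a statistical measure, computed from the residuals, of the proportion of variance in the dependent variable that the model explains.\
RMSE is the square root of the mean of the squared residuals, expressed in the same units as "eruptions". For this dataset, an RMSE between 0.2 and 0.5 suggests the model predicts eruption durations reasonably accurately, though what counts as a good RMSE always depends on the scale of the target.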
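
To show how these statistics relate to the residuals, the following sketch recomputes them in plain Python from the first ten rows of the test output above, with predictions rounded to four decimal places. Because it uses only a subset of rows and rounded values, its results will not match what `lr.evaluate` reports on the full test set. The names `pairs` and `regression_metrics` are introduced here for illustration.

```python
import math

# (eruptions, prediction) pairs from the first ten test rows shown above,
# with predictions rounded to four decimal places.
pairs = [(2.283, 2.8219), (4.533, 4.6159), (3.6, 4.6159), (1.95, 1.9639),
         (1.833, 2.1979), (3.917, 4.5379), (1.75, 1.6519), (4.25, 4.1479),
         (1.75, 1.6519), (3.45, 4.0699)]


def regression_metrics(pairs):
    """Return (r2, rmse) for a list of (actual, predicted) pairs."""
    n = len(pairs)
    # Residual: observed value minus predicted value.
    residuals = [actual - predicted for actual, predicted in pairs]
    ss_res = sum(r * r for r in residuals)
    mean_actual = sum(actual for actual, _ in pairs) / n
    # Total variation of the observed values around their mean.
    ss_tot = sum((actual - mean_actual) ** 2 for actual, _ in pairs)
    rmse = math.sqrt(ss_res / n)
    r2 = 1 - ss_res / ss_tot
    return r2, rmse


r2, rmse = regression_metrics(pairs)
print(f"R2 = {r2:.3f}, RMSE = {rmse:.3f}")
```

R Squared compares the squared residuals with the spread of the observed values, while RMSE reports the typical size of a residual in the units of "eruptions".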