# Linear Regression - Apache Spark

This example demonstrates Spark's Linear Regression algorithm working alongside the Vertica database.

Old Faithful is a geyser in Yellowstone National Park. Using Linear Regression, we want to train a model that predicts how long an eruption will last based on the waiting time between eruptions.

## Spark Setup

First, we set up Spark to work with Vertica. To do this, we create a Spark session (and its Spark Context) with the Spark Connector JAR passed in as a configuration option.

In [1]:
# Get Connector JAR name
import glob
import os

files = glob.glob("/project/notebooks/spark/spark-vertica-connector-assembly-*")
os.environ["CONNECTOR_JAR"] = files[0]
print(os.environ["CONNECTOR_JAR"])

/project/workspace/spark-vertica-connector-assembly-3.3.5.jar


In [2]:
# Create the Spark session and context
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.master", "spark://spark:7077")
    .config("spark.driver.memory", "2G")
    .config("spark.executor.memory", "1G")
    .config("spark.jars", os.environ["CONNECTOR_JAR"])
    .getOrCreate())
sc = spark.sparkContext

23/01/31 21:53:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Import Data

Our Faithful dataset has been randomly split into two sets: one for training the model and one for testing it. Both sets are stored in local .csv files, so we load each one and write it to its respective Vertica table, "faithful_training" or "faithful_testing".\
Normally, when training and testing in ML, we start with one full dataset and use a function that randomly splits it. In our case, however, we want the same split across the Vertica examples so that training and results stay consistent.
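
As a rough illustration of the random split mentioned above, here is a minimal pure-Python sketch. The helper `random_split`, the 70/30 fraction, and the seed are assumptions made for this example and are not part of the notebook; the ten rows are recombined from the training and testing tables shown later. In PySpark itself, `DataFrame.randomSplit` serves the same purpose.

```python
import random

def random_split(rows, train_fraction=0.7, seed=42):
    """Shuffle a copy of rows and cut it into (training, testing) lists."""
    shuffled = list(rows)
    # A fixed seed makes the split reproducible from run to run
    random.Random(seed).shuffle(shuffled)
    cut = int(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]

# (id, eruptions, waiting) rows for ids 1 to 10
rows = [(1, 3.6, 79), (2, 1.8, 54), (3, 3.333, 74), (4, 2.283, 62), (5, 4.533, 85),
        (6, 2.883, 55), (7, 4.7, 88), (8, 3.6, 85), (9, 1.95, 51), (10, 4.35, 85)]

training, testing = random_split(rows)
print("training rows:", len(training), "testing rows:", len(testing))
```

Fixing the seed is one way to keep a split stable; storing the two halves as separate files, as this example does, is another.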

In [4]:
# Load the training set from a CSV file into a dataframe and show some of its contents
df = spark.read.options(header="true", inferschema="true").csv("/project/data/old_faithful/faithful_training.csv")
df.printSchema()
df.show()  # So that we can see a bit of what our training dataset looks like and what the schema is

# Write the training data into a table in Vertica
df.write.mode("overwrite").format("com.vertica.spark.datasource.VerticaSource").options(
    host="vertica-demo",
    user="dbadmin",
    password="",
    db="demo",
    table="faithful_training",
    staging_fs_url="webhdfs://hdfs:50070/linearregression").save()

# Do the same write for the testing set
df = spark.read.options(header="true", inferschema="true").csv("/project/data/old_faithful/faithful_testing.csv")
df.write.mode("overwrite").format("com.vertica.spark.datasource.VerticaSource").options(
    host="vertica-demo",
    user="dbadmin",
    password="",
    db="demo",
    table="faithful_testing",
    staging_fs_url="webhdfs://hdfs:50070/linearregression").save()

print("Data of the Old Faithful geyser in Yellowstone National Park.")
print("eruptions = duration of eruption \nwaiting = time between eruptions")

Py4JJavaError: An error occurred while calling o41.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7) (172.30.0.3 executor 0): java.io.FileNotFoundException: 
File file:/project/workspace/faithful_training.csv does not exist

It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:212)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:112)
	at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:65)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:62)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:210)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:411)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:537)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


## Read Data

Now that our data is saved in Vertica, we can read both tables back into Spark DataFrames for processing with PySpark's ML toolkit.

In [4]:
# Read our training data from Vertica into a Spark dataframe
df_training = spark.read.load(format="com.vertica.spark.datasource.VerticaSource",
    host="vertica-demo",
    user="dbadmin",
    password="",
    db="demo",
    table="faithful_training",
    staging_fs_url="webhdfs://hdfs:50070/linearregression")

# Read our testing data from Vertica into a Spark dataframe
df_testing = spark.read.load(format="com.vertica.spark.datasource.VerticaSource",
    host="vertica-demo",
    user="dbadmin",
    password="",
    db="demo",
    table="faithful_testing",
    staging_fs_url="webhdfs://hdfs:50070/linearregression")

## Select Features

Linear Regression models the relationship between independent and dependent variables using a line of best fit. The dependent variable (eruptions) is what we are trying to predict, whereas the independent variables are the features we use to build our model. In this case we have just one feature, "waiting", and it alone makes up our features array.
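
To make "line of best fit" concrete, here is a minimal sketch of ordinary least squares for a single feature, written in plain Python. The helper `fit_line` is hypothetical, and the five points are the first five rows of the training table shown below. This is plain least squares with no regularization, so it will not reproduce the coefficients Spark computes in this notebook.

```python
def fit_line(xs, ys):
    """Return (slope, intercept) of the least-squares line through the points."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Slope is the covariance of x and y divided by the variance of x
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    # The fitted line always passes through the point of means
    intercept = mean_y - slope * mean_x
    return slope, intercept

waiting = [79, 54, 74, 55, 88]            # independent variable (feature)
eruptions = [3.6, 1.8, 3.333, 2.883, 4.7]  # dependent variable (target)

slope, intercept = fit_line(waiting, eruptions)
print("eruptions ~ %g + %g * waiting" % (intercept, slope))
```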

In [5]:
# Import Spark's ML Regression tool
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Spark's Linear Regression tool expects the features packed into a single vector column. Since we only have one feature in this case, we add "waiting"
featureassembler = VectorAssembler(inputCols = ["waiting"], outputCol = "features")

# Show our new table with a features column added. We are also going to do the same with the testing table so we can compare our results later.
df_testing = featureassembler.transform(df_testing)
df_training = featureassembler.transform(df_training)
df_training.show()

+---+---------+-------+--------+
| id|eruptions|waiting|features|
+---+---------+-------+--------+
|  1|      3.6|     79|  [79.0]|
|  2|      1.8|     54|  [54.0]|
|  3|    3.333|     74|  [74.0]|
|  6|    2.883|     55|  [55.0]|
|  7|      4.7|     88|  [88.0]|
| 10|     4.35|     85|  [85.0]|
| 13|      4.2|     78|  [78.0]|
| 15|      4.7|     83|  [83.0]|
| 16|    2.167|     52|  [52.0]|
| 17|     1.75|     62|  [62.0]|
| 18|      4.8|     84|  [84.0]|
| 19|      1.6|     52|  [52.0]|
| 21|      1.8|     51|  [51.0]|
| 25|    4.533|     74|  [74.0]|
| 27|    1.967|     55|  [55.0]|
| 28|    4.083|     76|  [76.0]|
| 29|     3.85|     78|  [78.0]|
| 32|    4.467|     77|  [77.0]|
| 33|    3.367|     66|  [66.0]|
| 34|    4.033|     80|  [80.0]|
+---+---------+-------+--------+
only showing top 20 rows



## Train Model

We can now train our model against our training set. We specify our new features column as well as our target "eruptions."

In [6]:
# Create our model using the features to predict eruption duration and fit it against our training set
lr = LinearRegression(maxIter=100, regParam=0.01, elasticNetParam=1, solver="l-bfgs", featuresCol="features", labelCol="eruptions")
lr = lr.fit(df_training)
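
The `regParam` and `elasticNetParam` arguments control regularization. As Spark's MLlib documentation describes it, `elasticNetParam` mixes an L1 and an L2 penalty (1 is pure L1, 0 is pure L2) and `regParam` scales the result. The sketch below computes that penalty term in plain Python; the function name and the weight value are hypothetical and chosen only for illustration.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty added to the loss: reg_param * (a * L1 + (1 - a) / 2 * L2)."""
    l1 = sum(abs(w) for w in weights)
    l2 = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2)

weights = [0.08]  # hypothetical slope for the single "waiting" feature

# Compare pure L2, an even mix, and pure L1 (the setting used in this notebook)
for alpha in (0.0, 0.5, 1.0):
    print("elasticNetParam=%g penalty=%g"
          % (alpha, elastic_net_penalty(weights, 0.01, alpha)))
```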

## Test Model & Results

Our test data consists of the Old Faithful observations that were held out of "faithful_training." We now run this dataset through our model to see how the predictions stack up against the actual eruption durations. From there, we evaluate the model and look at a summary statistic to see how well our algorithm holds up.

In [7]:
testing_predictions = lr.transform(df_testing)
testing_predictions.select("id", "eruptions","prediction","features").show(20)

test_result = lr.evaluate(df_testing)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

+---+---------+------------------+--------+
| id|eruptions|        prediction|features|
+---+---------+------------------+--------+
|  4|    2.283| 2.821895815690561|  [62.0]|
|  5|    4.533| 4.615893870217556|  [85.0]|
|  8|      3.6| 4.615893870217556|  [85.0]|
|  9|     1.95|1.9638967461341719|  [51.0]|
| 11|    1.833| 2.197896492376823|  [54.0]|
| 12|    3.917| 4.537893954803338|  [84.0]|
| 14|     1.75|1.6518970844773033|  [47.0]|
| 20|     4.25| 4.147894377732253|  [79.0]|
| 22|     1.75|1.6518970844773033|  [47.0]|
| 23|     3.45| 4.069894462318035|  [78.0]|
| 24|    3.067|3.3678952235900805|  [69.0]|
| 26|      3.6| 4.459894039389121|  [83.0]|
| 30|    4.433| 4.147894377732253|  [79.0]|
| 31|      4.3|3.6798948852469495|  [73.0]|
| 35|    3.833| 3.757894800661167|  [74.0]|
| 38|    4.833| 4.225894293146469|  [80.0]|
| 42|    1.883| 2.509896154033692|  [58.0]|
| 44|     1.75| 2.509896154033692|  [58.0]|
| 47|    3.833| 2.977895646518995|  [64.0]|
| 49|    4.633| 4.38189412397490
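
Because the model has a single feature, each prediction is simply `intercept + slope * waiting`. As a sanity check, the sketch below recovers the slope and intercept from two rows of the table above and reproduces the prediction for other rows. The variable names are illustrative; in PySpark the fitted model also exposes these values as its `coefficients` and `intercept` attributes.

```python
# (waiting, prediction) pairs copied from the rows with id 4 and id 5
p1 = (62.0, 2.821895815690561)
p2 = (85.0, 4.615893870217556)

# Two points determine the line the model uses
slope = (p2[1] - p1[1]) / (p2[0] - p1[0])
intercept = p1[1] - slope * p1[0]

def predict(waiting):
    """Predicted eruption duration for a given waiting time."""
    return intercept + slope * waiting

print("slope = %g, intercept = %g" % (slope, intercept))
# Compare against the table rows with waiting = 51 (id 9) and waiting = 79 (id 20)
print(predict(51.0), predict(79.0))
```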

**RMSE** (root mean squared error) is the square root of the average squared difference between the actual values of the dependent variable and the values predicted by the regression line. \
As such, a value closer to 0 means there is less deviation and therefore less error. Since our unit is minutes, an RMSE under 0.5 suggests the model predicts eruption durations reasonably accurately.
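
To show how the statistic is computed, here is a minimal sketch that evaluates RMSE by hand on the first five rows of the prediction table above, with predictions rounded to four decimals. The helper `rmse` is hypothetical, and the result covers only these five rows, so it will differ from the value Spark reports for the full test set.

```python
import math

def rmse(actual, predicted):
    """Square root of the mean squared difference between actual and predicted."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Rows with id 4, 5, 8, 9 and 11 from the test output
eruptions = [2.283, 4.533, 3.6, 1.95, 1.833]
predictions = [2.8219, 4.6159, 4.6159, 1.9639, 2.1979]

print("RMSE on these five rows = %g" % rmse(eruptions, predictions))
```

Squaring the errors before averaging means a single large miss, such as the row with id 8, weighs more heavily than several small ones.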