# Spark In Memory

### Introduction

If we think about our normal understanding of databases, our databases have two components:

1. storage and
2. processing

That is, we both store data in our databases, and we can perform operations on that data.  

As we'll see, with Spark we're mainly concerned with *processing our data*, and not with long term storage.  Instead, our data is stored to disk outside of the database, on a service like S3.  This has vast implications for the way we treat and value data in an organization, as well as the operations we can perform on that data. 

### Diving In

Let's begin by just diving into Spark to see it in action.  From there we'll get a sense of how it operates differently than our previous databases.

> As we move along here, the goal is not to remember details of the steps.  We'll go through each of them in repeated detail later on.

So first we'll need to install some libraries for reading data from `s3`, and then we'll install Spark itself.

In [None]:
# !/Users/jeff/opt/miniconda3/envs/data_engineering/bin/python3.8 -m pip install --upgrade pip

In [1]:
!pip install fsspec --quiet
!pip install s3fs --quiet
!pip install pyspark --quiet

> Then we run the following to connect to our spark cluster.

In [4]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
 
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("films").setMaster("local[2]")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

Finally, we'll read in data from s3 using pandas.

In [6]:
import pandas as pd
# df = pd.read_csv("s3://jigsaw-labs-student/imdb_movies.csv")
df = pd.read_csv('movies.csv')
df

Unnamed: 0.1,Unnamed: 0,title,genre,budget,runtime,year,month,revenue
0,0,Avatar,Action,237000000,162.0,2009,12,2787965087
1,1,Pirates of the Caribbean: At World's End,Adventure,300000000,169.0,2007,5,961000000
2,2,Spectre,Action,245000000,148.0,2015,10,880674609
3,3,The Dark Knight Rises,Action,250000000,165.0,2012,7,1084939099
4,4,John Carter,Action,260000000,132.0,2012,3,284139100
...,...,...,...,...,...,...,...,...
1995,1995,Pitch Black,Thriller,23000000,108.0,2000,2,53187659
1996,1996,Someone Like You...,Comedy,23000000,97.0,2001,3,0
1997,1997,Her,Romance,23000000,126.0,2013,12,47351251
1998,1998,Joy Ride,,23000000,97.0,2001,10,36642838


And from there, we'll turn this dataframe into the equivalent of table in spark.

In [7]:
movies_df = spark.createDataFrame(df.astype(str))

In [9]:
movies_df.take(1)

Py4JJavaError: An error occurred while calling o87.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (infra04-wg011.eduroam.wireless.umn.edu executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 540, in main
    raise RuntimeError(
RuntimeError: Python in worker has different version 3.10 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3688)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 540, in main
    raise RuntimeError(
RuntimeError: Python in worker has different version 3.10 than that in driver 3.8, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


### Spark stores data in memory

Ok, so now let's think about what happened above.  We just connected to our database, created the equivalent of a table to store our movies into, loaded in our data and then selected our first record.  

This is a standard workflow in Spark.  We'll use an external resource like S3 for long term storage of our data.  And then we can quickly load in and query our data in Spark.  And unlike previous databases Spark, is primarily leaning on *memory* to process and store our data, as opposed to storing our data on *disk*.

We can visualize our workflow in Spark like so.

> <img src="https://github.com/jigsawlabs-student/pyspark-rdds/blob/main/s3_to_movies.jpg?raw=1" width="60%">

> One thing to clear up is that even though we'll use S3 to store our data, Spark will also read in and store that data as well.  But Spark will mainly store our data *in memory*.  And when we are done querying our data, we can shutdown the cluster, which deletes the data from Spark.  And when we perform another query, we'll just read in the data again.

Finally, we should know that Spark is organized as a cluster.  So when Spark reads our data into memory, it can partition it onto different machines in the cluster.

<img src="https://github.com/jigsawlabs-student/pyspark-rdds/blob/main/spark_cluster_partition.jpg?raw=1" width="80%">

### Contrasting with Postgres

So let's take a moment just to appreciate how, by (1) using an external resource for long term storage, and (2) storing data in memory within the cluster, Spark operates differently than other databases like postgres.

With postgres, we store our data within the database and associated records are stored in a file.  When we perform an operation, like selecting records from a table, we then load data into memory and perform any transformations and return the result of that query to the user.

<img src="https://github.com/jigsawlabs-student/pyspark-rdds/blob/main/db_both.jpg?raw=1" width="40%">

As we saw, by the time we load our data into Spark, these records, and our associated tables are primarily in memory.

### The Benefits of In Memory Storage 

So we just saw some of the architecture when working in Spark.  We use an external resource like S3 for long term storage, read our data into Spark which primarily stores our data in memory as opposed to storing the data on disk.  Now let's get into some of the benefits of this architecture.

1. Cheaper storage

The first benefit is simply that it costs less money if we store our data externally on something like S3 than storing our data inside of a database on disk.  When our data is on S3, our data is simply written to disk, and there is minimal software running along with it.  By contrast, when we store our data in a database like postgres or redshift, for each node we have running, we are not just paying for the data storage but also a database with the capability to process that data.

Because it's cheaper to store data in S3, we can store data that has relatively low value, or even unknown value.  And if a data scientist or data engineer can extract value from it later on, that data will be available.

2. Schema on Read

Not only is the data available, but with storage in memory, it's easier to explore and process this data.  For example, we saw above that it was relatively easy to load in and query our data in Spark.

In [None]:
import pandas as pd
df = pd.read_csv("s3://jigsaw-labs/imdb_movies.csv")
movies_df = spark.createDataFrame(df.astype(str))
movies_df.take(1)

[Row(title='Avatar', genre='Action', budget='237000000', runtime='162.0', year='2009', month='12', revenue='2787965087')]

Contrast this with loading external data into our standard databases that store data on disk (eg. Postgres, MySQL).  There, to load in data we (1) need to create our tables, (2) transform our data to and (3) insert records into those tables.  With Spark, we can let the software determine the schema that fits to the data.  This is called `schema on read`, as the schema is determined when we read in the data.

Also notice that we did not need to really transform the data before loading it into Spark.  For this reason, we do not need to follow the standard ETL pattern when loading in our data.  Instead we can first load our data, and then use Spark to transform it.  This is called Extract Load Transform (or ELT).

3. Memory intensive computations

The third benefit of memory storage really comes from the perspective of data science.  Performing certain computations in machine learning requires having a large amount of data accessible for computation, and having our entire dataset loaded in memory means that the data will be accessible.

### But there's still local some storage

Even though Spark performs much of it's computation in memory, and primarily uses in memory storage, Spark nodes still do use local disks for data that does not fit into RAM, and to store intermediate output in a complex operation.

> But this storage on disk is quite minor as compared to most databases.  

So let's update our diagram to more accurately reflect the hardware spark.

> <img src='https://github.com/jigsawlabs-student/pyspark-rdds/blob/main/spark_cluster_disk.jpg?raw=1' width="90%">

So even though there is technically in local storage to disk Spark, what distinguishes Spark is it's reliance on in memory storage and computation.

### Summary

In this lesson, we saw that with Spark we store our data externally in something like an S3 bucket, and then load this data into memory when used by Spark.  If we need additional memory, we add more nodes to our Spark cluster.

Because Spark is primarily uses memory storage, this means that we are able to store data with low value, or unknown value.  And reading our data into memory means that we do not need to first create our data, transform our data and then load our data into these tables (ETL) but rather can load and then transform ouor data.  Finally, we saw that for machine learning, Spark is useful for memory intensive calculations. 