# Spark Architecture and Applications 

![](https://miro.medium.com/max/1400/1*arBqq7O7umskV4O7JjhdrA.jpeg)

# /etc/rc.d/rc.sysinit

# Install pyspark locally

### Download Spark 
```bash
wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
tar xzf spark-3.5.1-bin-hadoop3.tgz
ln -s spark-3.5.1-bin-hadoop3 spark
```


### Install pyspark/findspark 
```bash
pip install pyspark findspark
```


### Test on bash
```bash
export SPARK_HOME=/home/tap/spark
$SPARK_HOME/bin/run-example SparkPi
```


### Test on jupyter
```bash
export SPARK_HOME=/home/tap/spark
# Then start Jupyter from this same shell (for example: jupyter notebook)
```


In [1]:
import findspark
findspark.init()  # locate the Spark installation via SPARK_HOME
import pyspark
conf = pyspark.SparkConf().setAppName('Tap').setMaster('local[8]')
sc = pyspark.SparkContext(conf=conf)
sc

24/05/21 12:22:28 WARN Utils: Your hostname, pappanics resolves to a loopback address: 127.0.1.1; using 192.168.45.65 instead (on interface ens160)
24/05/21 12:22:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/21 12:22:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Spark Variables

![](https://programmerhumor.io/wp-content/uploads/2023/02/programmerhumor-io-programming-memes-20822be2f46de63-608x776.png)

## Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. 

These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. 

Supporting general, read-write shared variables across tasks would be inefficient.

However, Spark does provide two limited types of shared variables for two common usage patterns: **broadcast variables** and **accumulators**.
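
The copy semantics described at the start of this section can be modelled without a cluster. The sketch below is plain Python, not Spark: `run_remote_task` is a hypothetical helper that uses `pickle` as a stand-in for shipping a task's variables to an executor.

```python
import pickle


def run_remote_task(func, variables):
    """Hypothetical stand-in for an executor: it works on a serialized copy."""
    shipped = pickle.loads(pickle.dumps(variables))  # what the "remote node" receives
    func(shipped)
    return shipped


def increment(variables):
    variables["counter"] += 1  # mutates only the executor-side copy


driver_vars = {"counter": 0}
executor_vars = run_remote_task(increment, driver_vars)

print(executor_vars["counter"])  # 1: the copy was updated
print(driver_vars["counter"])    # 0: the driver never sees the update
```

This is why a plain Python counter incremented inside a Spark task appears unchanged on the driver, and why accumulators exist.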

## Broadcast

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. 

They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. 

Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. 

Spark automatically broadcasts the common data needed by tasks within each stage. 

The data broadcasted this way is cached in serialized form and deserialized before running each task. 

This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

In [2]:
euroRate = sc.broadcast(1.09)
euroRate.value

1.09

In [3]:
import random
# Generate 10 distinct random numbers between 1 and 99
sales = random.sample(range(1, 100), 10)
salesRDD=sc.parallelize(sales)
salesRDD.collect()

[16, 56, 50, 71, 26, 65, 77, 34, 81, 86]

In [4]:
# Use euroRate to convert each sale to dollars
salesRDD.map(lambda x: str(round(x*euroRate.value,2))+"$").collect()

['17.44$',
 '61.04$',
 '54.5$',
 '77.39$',
 '28.34$',
 '70.85$',
 '83.93$',
 '37.06$',
 '88.29$',
 '93.74$']

In [5]:
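# Caution: this function is not associative, so the rate is applied repeatedly
# and the result depends on partitioning. For the converted total, use
# salesRDD.sum() * euroRate.value instead.
salesRDD.reduce(lambda a,b:(a+b)*euroRate.value)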

808.4306706424289

### Notes
After the broadcast variable is created, it should be used instead of the original value v (the object passed to sc.broadcast) in any functions run on the cluster, so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast, in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).

To release the resources that the broadcast variable copied onto executors, call .unpersist(). If the broadcast is used again afterwards, it will be re-broadcast. To permanently release all resources used by the broadcast variable, call .destroy(). The broadcast variable can’t be used after that. Note that these methods do not block by default. To block until resources are freed, specify blocking=True when calling them.

## Accumulators

![](https://upload.wikimedia.org/wikipedia/en/4/4e/Electro_%28Max_Dillon%29.png)

Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. 

They can be used to implement counters (as in MapReduce) or sums. 

Spark natively supports accumulators of numeric types, and programmers can add support for new types.

For accumulator updates performed inside **actions** only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. 

In **transformations**, users should be aware that each task’s update may be applied more than once if tasks or job stages are re-executed.

In [6]:
accum = sc.accumulator(0)
accum

Accumulator<id=0, value=0>

In [7]:
data=sc.parallelize([1, 2, 3, 4])
data.foreach(lambda x: accum.add(x))

In [8]:
accum.value

10

Accumulators do not change the lazy evaluation model of Spark. 

If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. 

Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). 

In [9]:
accum = sc.accumulator(0) # Reset
accum

Accumulator<id=1, value=0>

In [10]:
data.collect()

[1, 2, 3, 4]

In [11]:
def g(x):
    accum.add(x)
    return x
data.map(g) 
# Here, accum is still 0 because no actions have caused the `map` to be computed.
accum

Accumulator<id=1, value=0>

In [12]:
data.map(g).foreach(lambda x: accum.add(1))

# What is the value of accum?

In [13]:
accum

Accumulator<id=1, value=14>
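
The value 14 follows from lazy evaluation: `map(g)` runs only when `foreach` triggers it, adding 1 + 2 + 3 + 4 = 10, and the `foreach` lambda adds 1 per element, i.e. 4 more. The sketch below is a plain-Python analogy, not Spark code: `LocalAccumulator` is a hypothetical single-process stand-in for a Spark accumulator, and Python's built-in lazy `map` plays the role of the transformation.

```python
class LocalAccumulator:
    """Hypothetical, single-process stand-in for a Spark accumulator."""

    def __init__(self, value=0):
        self.value = value

    def add(self, amount):
        self.value += amount


accum = LocalAccumulator(0)
data = [1, 2, 3, 4]


def g(x):
    accum.add(x)
    return x


mapped = map(g, data)              # lazy, like an RDD transformation: g has not run yet
value_before_action = accum.value

for _ in mapped:                   # consuming the iterator plays the role of foreach
    accum.add(1)

print(value_before_action)         # 0
print(accum.value)                 # 14 = (1 + 2 + 3 + 4) + 4
```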

# Spark Application

[Spark Documentation](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

A Spark application consists of a **driver** program that runs the user’s main function and executes various parallel operations on a cluster. 



![](https://miro.medium.com/max/700/1*B9lbB8uU7a_Xi0a1uDImRw.jpeg)

![](https://spark.apache.org/docs/latest/img/cluster-overview.png)

[Anatomy](https://medium.com/@meenakshisundaramsekar/anatomy-of-a-spark-application-in-a-nutshell-2e542d5f334e)
![](images/anatomyofsparkapp.png)

## Driver

The life of Spark programs starts and ends with the Spark Driver. 

The Spark driver is the process that clients use to submit a Spark program.

The driver is also responsible for planning and executing the Spark program and for returning its status and results to the client.

[Apache Spark Architecture](https://www.dezyre.com/article/apache-spark-architecture-explained-in-detail/338)
![](images/sparkarchiteture.png)

It is the central point and the entry point of the Spark Shell (Scala, Python, and R). 

The driver program runs the main() function of the application and is the place where the Spark Context is created. 

The Spark driver contains various components responsible for translating Spark user code into actual Spark jobs executed on the cluster.

![](https://tr.rbxcdn.com/b2337037a6a5440f80026530d20f84fd/768/432/Image/Png)

## DAG Scheduler
https://spark.apache.org/docs/1.2.1/api/java/org/apache/spark/scheduler/DAGScheduler.html

[SparkBasic](https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454)

DAGScheduler is the scheduling layer of Apache Spark that implements stage-oriented scheduling. 

It transforms a logical execution plan (i.e. RDD lineage of dependencies built using RDD transformations) to a physical execution plan (using stages).

```scala
val input = sc.textFile("log.txt")
val splitedLines = input
  .map(line => line.split(" "))
  .map(words => (words(0), 1))
  .reduceByKey { (a, b) => a + b }
```
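
As a rough illustration of stage-oriented scheduling, the sketch below cuts the lineage of the job above into stages at each shuffle. It is a simplified plain-Python model, not Spark's actual DAGScheduler: `split_into_stages` and the `WIDE_OPS` set are hypothetical names, and the model ignores details such as map-side combining.

```python
# Assumed, non-exhaustive set of operations that typically require a shuffle.
WIDE_OPS = {"reduceByKey", "groupByKey", "join", "repartition"}


def split_into_stages(lineage):
    """Group a linear chain of operations into stages, cutting before each wide operation."""
    stages = [[]]
    for op in lineage:
        if op in WIDE_OPS and stages[-1]:
            stages.append([])  # a shuffle closes the current stage
        stages[-1].append(op)
    return stages


lineage = ["textFile", "map", "map", "reduceByKey"]
stages = split_into_stages(lineage)

for index, stage in enumerate(stages):
    print(f"Stage {index}: {' -> '.join(stage)}")
# Stage 0: textFile -> map -> map
# Stage 1: reduceByKey
```

The narrow operations (`textFile`, `map`, `map`) are pipelined inside one stage, while `reduceByKey` needs data from all partitions and therefore starts a new one.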

![](https://miro.medium.com/max/700/1*1WfneX6c7Lc9fqAaR9MaGA.png)

![](https://miro.medium.com/v2/resize:fit:1400/1*2bdRFvxGs7baeKHDk-Z0sA.png)

## Another Resource

In Spark, the DAG Scheduler is responsible for transforming a sequence of RDD transformations and actions into a directed acyclic graph (DAG) of stages and tasks, which can be executed in parallel across a cluster of machines. 


[SparkByExample](https://sparkbyexamples.com/spark/what-is-dag-in-spark/#:~:text=In%20Spark%2C%20the%20DAG%20Scheduler,across%20a%20cluster%20of%20machines.)

## TaskScheduler
https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/scheduler/TaskScheduler.html
> Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl. This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks for a single SparkContext. These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running them, retrying if there are failures, and mitigating stragglers. They return events to the DAGScheduler.

## What does it do?
[Source](https://mallikarjuna_g.gitbooks.io/spark/content/spark-taskscheduler.html)



A TaskScheduler schedules tasks for a single Spark application according to scheduling mode.

![](https://mallikarjuna_g.gitbooks.io/spark/content/images/sparkstandalone-sparkcontext-taskscheduler-schedulerbackend.png)

## SchedulerBackend
https://spark.apache.org/docs/3.1.3/api/java/org/apache/spark/scheduler/SchedulerBackend.html

Spark comes with a pluggable backend mechanism called the scheduler backend (aka backend scheduler) to support various cluster managers, e.g. Apache Mesos, Hadoop YARN, Kubernetes, Spark’s own Spark Standalone and Spark local.

**Scheduler Backend** manages resources to schedule tasks in:

- Running mode
- Local mode
- Local cluster mode
- Cluster mode

![](images/SparkSchedulerBackend1.png)

![](images/SparkSchedulerBackend.png)

## BlockManager 

Spark's storage system is managed by the BlockManager, which runs on both the driver and the executors.

It is a key-value store of blocks of data (block storage), each identified by a block ID.

Among the types of data stored in blocks we can find:
- RDD 
- shuffle: in this category we can distinguish shuffle data, shuffle index and temporary shuffle files (intermediate results)
- broadcast - broadcasted data is organized in blocks too
- task results
- stream data
- temp data (including swap)
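
The key-value view of block storage can be sketched in a few lines. This is a toy, in-memory model, not Spark's BlockManager API: `ToyBlockStore` and its methods are hypothetical, and the block IDs below only imitate the naming style Spark uses for RDD and broadcast blocks.

```python
class ToyBlockStore:
    """Hypothetical in-memory key-value store mapping block IDs to data."""

    def __init__(self):
        self._blocks = {}

    def put(self, block_id, data):
        self._blocks[block_id] = data

    def get(self, block_id):
        return self._blocks.get(block_id)  # None when the block is unknown

    def remove(self, block_id):
        existed = block_id in self._blocks
        self._blocks.pop(block_id, None)
        return existed


store = ToyBlockStore()
store.put("rdd_0_0", [1, 2])      # partition 0 of a cached RDD (illustrative ID)
store.put("rdd_0_1", [3, 4])      # partition 1 of the same RDD
store.put("broadcast_0", 1.09)    # a broadcast value

print(store.get("rdd_0_1"))         # [3, 4]
print(store.remove("broadcast_0"))  # True
print(store.get("broadcast_0"))     # None
```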

# Repetita Iuvant

https://databricks.com/glossary/what-are-spark-applications
![](https://databricks.com/wp-content/uploads/2018/05/Spark-Applications.png)

The driver process:

- runs your main() function

- sits on a node in the cluster

- is responsible for three things: 
   1. maintaining information about the Spark Application;
   2. responding to a user’s program or input; 
   3. and analyzing, distributing, and scheduling work across the executors (defined momentarily). 

The driver process is absolutely essential: it’s the heart of a Spark Application and maintains all relevant information during the lifetime of the application.

## Spark Context 

The Spark Context is the application's instance, created by the Spark driver for each Spark program when it is first submitted by the user. It exists throughout the entire life of the Spark application.

It allows the Spark driver to access the cluster through a cluster resource manager, and it can be used to create RDDs, accumulators and broadcast variables on the cluster. The Spark Context also keeps track of live executors through regular heartbeat messages.

In code, it is usually referred to by the variable name sc.

The Spark Context terminates once the spark application completes. Only one Spark Context can be active per JVM. You must stop() the active Spark Context before creating a new one.

![](https://mallikarjuna_g.gitbooks.io/spark/diagrams/sparkcontext-createtaskscheduler.png)

## Example in Yarn

 https://luminousmen.com/post/spark-anatomy-of-spark-application

![](https://luminousmen.com/media/spark-yarn-architecture.jpg)

# Deploy

The spark-submit script in Spark’s bin directory is used to launch applications on a cluster. 

It can use all of Spark’s supported cluster managers through a uniform interface so you don’t have to configure your application specially for each one.

# Package

If your code depends on other projects, you will need to package them alongside your application in order to distribute the code to a Spark cluster. To do this, create an assembly jar (or “uber” jar) containing your code and its dependencies. Both sbt and Maven have assembly plugins. When creating assembly jars, list Spark and Hadoop as provided dependencies; these need not be bundled since they are provided by the cluster manager at runtime. 

Once you have an assembled jar, you can pass it to the bin/spark-submit script, as shown in the Launch section.

For Python, you can use the --py-files argument of spark-submit to add .py, .zip or .egg files to be distributed with your application. If you depend on multiple Python files we recommend packaging them into a .zip or .egg.
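
One way to build such an archive is with Python's standard `zipfile` module. The sketch below is a minimal example under stated assumptions: the helper `zip_python_sources` and the names `mylib` and `deps.zip` are hypothetical, and only `.py` files are bundled.

```python
import tempfile
import zipfile
from pathlib import Path


def zip_python_sources(source_dir, archive_path):
    """Bundle every .py file under source_dir, keeping the package folder as the top-level entry."""
    source_dir = Path(source_dir)
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(source_dir.rglob("*.py")):
            archive.write(path, path.relative_to(source_dir.parent).as_posix())
    return archive_path


# Demo on a throwaway package so the example is self-contained.
workdir = Path(tempfile.mkdtemp())
package = workdir / "mylib"
package.mkdir()
(package / "__init__.py").write_text("")
(package / "rates.py").write_text("EURO_RATE = 1.09\n")

archive_path = zip_python_sources(package, workdir / "deps.zip")
with zipfile.ZipFile(archive_path) as archive:
    names = archive.namelist()

print(names)  # ['mylib/__init__.py', 'mylib/rates.py']
```

The resulting archive could then be passed to spark-submit with `--py-files deps.zip`, after which `import mylib` should resolve inside tasks.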

# Launch

Once a user application is bundled, it can be launched using the bin/spark-submit script. 

This script takes care of setting up the classpath with Spark and its dependencies, and can support different cluster managers and deploy modes that Spark supports:
```bash
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
```

Some of the commonly used options are:

* --class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi)
* --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077)
* --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
* --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes.
* application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
* application-arguments: Arguments passed to the main method of your main class, if any

```bash
# Run application locally on 8 cores
# cd spark home
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[8] examples/jars/spark-examples_2.12-3.5.1.jar 1000

```
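
When launching from a script, the same command line can be assembled programmatically. The sketch below only builds and prints the argument list; `build_spark_submit` is a hypothetical helper, not part of Spark, and the jar path assumes the Spark 3.5.1 distribution downloaded earlier.

```python
import shlex


def build_spark_submit(application, app_args=(), main_class=None, master=None,
                       deploy_mode=None, conf=None, submit="./bin/spark-submit"):
    """Assemble a spark-submit argument list from the commonly used options."""
    cmd = [submit]
    if main_class:
        cmd += ["--class", main_class]
    if master:
        cmd += ["--master", master]
    if deploy_mode:
        cmd += ["--deploy-mode", deploy_mode]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(application)                  # the jar (or .py file) comes after the options
    cmd += [str(arg) for arg in app_args]    # application arguments come last
    return cmd


cmd = build_spark_submit(
    "examples/jars/spark-examples_2.12-3.5.1.jar",
    app_args=[1000],
    main_class="org.apache.spark.examples.SparkPi",
    master="local[8]",
    conf={"spark.executor.memory": "2g"},
)

# shlex.join quotes arguments such as local[8] so the line is safe to paste into a shell.
print(shlex.join(cmd))
```

Passing `cmd` to `subprocess.run(cmd, check=True)` from the Spark home directory would actually launch the job; keeping the arguments as a list avoids shell-quoting problems.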

More in https://spark.apache.org/docs/latest/submitting-applications.html

#  Spark Docker

- Code in tap/spark/code is copied into docker 

- Dataset is inside spark and linked into tap root (check previous example), reference as spark/dataset...

- [Hint] Test on machine and then test on Docker (including dependencies)

- https://github.com/apache/spark/tree/master/examples/src/main/python is a good source

# Run Python Example in Docker
cd repo

```bash
docker run --hostname spark -p 4040:4040 -it --rm -v /home/tap/tap-workspace/tap2024/spark/code/:/opt/tap/ -v /home/tap/tap-workspace/tap2024/spark/dataset:/tmp/dataset apache/spark /opt/spark/bin/spark-submit /opt/tap/simpleapp.py
```

# Biblio

- https://medium.com/@meenakshisundaramsekar/anatomy-of-a-spark-application-in-a-nutshell-2e542d5f334e
- https://medium.com/luckspark/scala-spark-tutorial-1-hello-world-7e66747faec
- https://luminousmen.com/post/spark-anatomy-of-spark-application
- https://medium.com/@goyalsaurabh66/spark-basics-rdds-stages-tasks-and-dag-8da0f52f0454
- http://cds.iisc.ac.in/wp-content/uploads/DS256.2017.L17.Spark_.Execution.pdf
- https://www.waitingforcode.com/apache-spark/apache-spark-blocks-explained/read
- https://hyperj.net/note.arts/asset/pdf/deep-dive-into-the-apache-spark-scheduler.pdf