<a href="https://colab.research.google.com/github/miladshiraniUCB/Spark-5-dsc-sparkcontext-lab/blob/master/index.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Understanding `SparkContext` - Lab

## Introduction

The PySpark lessons and labs in this section introduce **Apache Spark**, a leading framework for big data processing. You will work in Jupyter notebooks with PySpark, using a PySpark Docker image in standalone mode. These lessons also require you to explore the Spark documentation and practice methods and properties that are not directly covered in the labs. In this first lab, we'll create the entry point to a Spark application, called a SparkContext, and explore its properties.

## Objectives

In this lab you will: 

- Describe Spark's parallelism with master and executor nodes 
- List the major properties and methods of SparkContext 
- Define a SparkContext and explain why it is important to a Spark application 


## Cluster Resource Manager

Spark comes bundled with a **Cluster Resource Manager**, which divides and shares the physical resources of a cluster of machines among multiple Spark applications. Spark's **standalone cluster manager** runs in standalone mode and lets Spark manage its own cluster, including one simulated on a single local machine. We will mainly use the standalone cluster manager for resource allocation in these labs.

In Spark's computational model, communication routinely occurs between a **driver** and **executors**. The driver has Spark jobs that it needs to run, and these jobs are split into tasks that are submitted to the executors on the worker nodes for completion. The results from these tasks are delivered back to the driver.
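
The split-submit-collect cycle described above can be mimicked on one machine with Python's standard library. This is only an analogy: a thread pool stands in for the executors, and the names `split_into_tasks`, `task`, and `run_job` are hypothetical helpers, not part of the Spark API.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_tasks(data, num_tasks):
    """Cut the input into roughly equal chunks, one chunk per task."""
    chunk = max(1, -(-len(data) // num_tasks))  # ceiling division, at least 1
    return [data[i:i + chunk] for i in range(0, len(data), chunk)]


def task(chunk):
    """Work done by one 'executor': sum the squares of its chunk."""
    return sum(x * x for x in chunk)


def run_job(data, num_workers=4):
    """Play the role of the driver: split, submit, collect, combine."""
    tasks = split_into_tasks(data, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_results = list(pool.map(task, tasks))
    return sum(partial_results)


total = run_job(list(range(10)))
print(total)
```

Spark handles the same three steps for you, but across separate processes or machines rather than threads in one interpreter.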

This image, taken from the [Spark documentation](https://spark.apache.org/docs/latest/spark-standalone.html), illustrates this process.

![](https://github.com/miladshiraniUCB/Spark-5-dsc-sparkcontext-lab/blob/master/images/cluster.png?raw=1)


The Spark driver declares the transformations and actions on data and submits these requests to the **master**. 

> The machine on which the Spark cluster manager runs is called the **Master Node**. 

For our labs, this distributed arrangement will be simulated on a single machine, allowing you to initialize master and worker nodes. 

## `SparkContext()`

In order to use Spark and its API, we need a **SparkContext**. SparkContext is how we control what is happening in the Spark program from Python. When we run any Spark application, a driver program starts; it contains the main function, and your SparkContext is initialized there. The driver program then runs the operations inside the executors on worker nodes, as shown above.

SparkContext uses Py4J to create a bridge between Python and the Java Virtual Machine (JVM), on which Spark runs. Spark itself is written mostly in Scala, which compiles to JVM bytecode. Even though all the code we'll be executing is in Python, the work is carried out underneath the hood on the JVM through a `JavaSparkContext`. As a result, error messages will frequently contain errors related specifically to Java. 

*Py4J provides a bridge between Python and Java. [Click here](https://www.py4j.org/) to see more details on this. Here is a visual representation of how SparkContext functions, found in the [Apache documentation](https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals).* 

![](https://github.com/miladshiraniUCB/Spark-5-dsc-sparkcontext-lab/blob/master/images/spark_context.png?raw=1)

A Spark application's driver program launches parallel operations on executor Java Virtual Machines (JVMs). This can occur either locally on a single machine, using multiple cores for parallel processing, or across a cluster of computers controlled by a master computer. When running locally, "PySparkShell" is the driver program. The driver program contains the key instructions for the program, and it determines how best to distribute datasets across the cluster and apply operations to those datasets.

The key takeaways for SparkContext are listed below:

- SparkContext is a client of Spark’s execution environment and it acts as the master of the Spark application 
- SparkContext sets up internal services and establishes a connection to a Spark execution environment  
- The driver is the program that creates the SparkContext, connecting to a given Spark Master  

After creation, SparkContext asks the master for some cores to use to do work. The master sets these cores aside and they are used to complete whatever operation they are assigned to do. You can visualize the setup in the figure below:

<img src ="./images/spark_master_workers.png" width="280">

This image depicts the worker nodes at work. Every worker has 4 cores to work with, and the master allocates tasks to run on certain cores within each worker node.

As stated before, a SparkContext object (usually shown as `sc`) is the main entry point for Spark functionality and can be used to create _Resilient Distributed Datasets_ (RDDs) on a cluster as we will see in our next lab.

Let's start a Spark application by importing PySpark, creating a SparkContext as `sc`, and printing out the type of `sc`. For this SparkContext, we set the `master` parameter to `'local[*]'` to indicate that the work should be parallelized across the cores of our local machine.
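
The `master` string decides where the work runs and how widely it is spread: `local` uses a single worker thread, `local[K]` uses K threads, `local[*]` uses as many threads as the machine has logical cores, and `spark://host:port` points at a standalone cluster. As a rough guide to these forms, here is a sketch built around a hypothetical helper, `describe_master`. It is for illustration only: Spark does its own parsing and accepts more forms than the ones handled here.

```python
import os
import re


def describe_master(master):
    """Return (mode, worker_threads) for a few common master strings.

    Hypothetical helper for illustration; not part of PySpark.
    """
    if master == "local":
        return ("local", 1)
    match = re.fullmatch(r"local\[(\*|\d+)\]", master)
    if match:
        n = match.group(1)
        # "*" means one worker thread per logical core on this machine.
        threads = (os.cpu_count() or 1) if n == "*" else int(n)
        return ("local", threads)
    if master.startswith("spark://"):
        # Standalone cluster: the core count is decided by the cluster, not the URL.
        return ("standalone cluster", None)
    raise ValueError(f"unrecognized master string: {master!r}")


for m in ["local", "local[4]", "local[*]", "spark://host:7077"]:
    print(m, "->", describe_master(m))
```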

In [None]:
!pip install pyspark 

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=8061aa0f3c917fde48d05c971dd3b48cb68a7295d760574eaa7999d30d94b98f
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
# Create a local spark context with pyspark
import pyspark
sc = pyspark.SparkContext('local[*]')

In [None]:
# Display the type of the Spark Context
type(sc)

# pyspark.context.SparkContext

pyspark.context.SparkContext

In [None]:
# Create second spark context
sc1 = pyspark.SparkContext('local[*]')

ValueError: ignored

As the `ValueError` shows, only one SparkContext can be active within a Python kernel at a time!
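
To see why the second call fails, it can help to model the rule in plain Python. The class below is a toy stand-in, not the PySpark implementation: `ToyContext`, `_active`, and `get_or_create` are hypothetical names chosen for this sketch. It keeps a single class-level slot for the live context, refuses to create a second one, and frees the slot when the context is stopped.

```python
import threading


class ToyContext:
    """Toy stand-in for SparkContext; not the PySpark implementation."""

    _active = None              # the one live context, if any
    _lock = threading.Lock()    # protects _active during creation and stop

    def __init__(self, master):
        with ToyContext._lock:
            if ToyContext._active is not None:
                raise ValueError("Cannot run multiple contexts at once")
            self.master = master
            ToyContext._active = self

    @classmethod
    def get_or_create(cls, master="local[*]"):
        """Return the live context, creating one only if none exists."""
        if cls._active is None:
            cls(master)
        return cls._active

    def stop(self):
        """Release the slot so a new context can be created."""
        with ToyContext._lock:
            if ToyContext._active is self:
                ToyContext._active = None


first = ToyContext("local[*]")
try:
    ToyContext("local[*]")          # a second context is rejected
    second_failed = False
except ValueError:
    second_failed = True

same = ToyContext.get_or_create() is first   # reuses the live context
first.stop()
fresh = ToyContext.get_or_create()           # allowed again after stop()
print(second_failed, same, fresh is not first)
```

In PySpark itself, `SparkContext.getOrCreate()` plays the role that `get_or_create` plays here: it returns the already-running context instead of raising an error.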

### SparkContext attributes

We can use Python's `dir()` function to get a list of all the attributes (including methods) accessible through the `sc` object.

In [None]:
# Use Python's dir(obj) to get a list of all attributes of SparkContext

# Code here 
dir(sc)


['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_assert_on_driver',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'bina
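
Many names in the `dir()` listing start with an underscore and are internal. A small filter makes the public part easier to scan. The helper name `public_attributes` is hypothetical, and the snippet is demonstrated on a plain list so that it runs without Spark.

```python
def public_attributes(obj):
    """Return attribute names that do not start with an underscore."""
    return [name for name in dir(obj) if not name.startswith("_")]


# Demonstrated on a plain list so the snippet runs without Spark;
# in the lab you would call public_attributes(sc) instead.
names = public_attributes([])
print(names)
```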

Alternatively, you can use Python's `help()` function to get an easier-to-read listing of all the attributes of the `sc` object, including examples.

In [None]:
# Use Python's help ( help(object) ) function to get information on attributes and methods for sc object. 


# Code here 
help(sc)

Help on SparkContext in module pyspark.context object:

class SparkContext(builtins.object)
 |  SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
 |  
 |  Main entry point for Spark functionality. A SparkContext represents the
 |  connection to a Spark cluster, and can be used to create :class:`RDD` and
 |  broadcast variables on that cluster.
 |  
 |  When you create a new SparkContext, at least the master and app name should
 |  be set, either through the named parameters here or through `conf`.
 |  
 |  Parameters
 |  ----------
 |  master : str, optional
 |      Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
 |  appName : str, optional
 |      A name for your job, to display on the cluster web UI.
 |  sparkHome : str, optional
 |      Location where Spark is installed on cluster 

You should also have a look at [Spark's SparkContext Documentation Page](https://spark.apache.org/docs/0.6.0/api/core/spark/SparkContext.html) to explore these in further detail.

Let's check a few SparkContext attributes, including `SparkContext.version` and `SparkContext.defaultParallelism`, to see the current version of Apache Spark and the default level of parallelism, which in local mode corresponds to the number of cores being used for parallel processing.

In [None]:
# Check the number of cores being used

# Code here 
print("Number of Cores being used: ", sc.defaultParallelism)
# Check for the current version of Spark

# Code here 
print("Spark Version is:", sc.version)


# Default number of cores being used: 2
# Current version of Spark: 2.3.1

Number of Cores being used:  2
Spark Version is: 3.2.1
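
With a `local[*]` master, the default parallelism normally matches the number of logical cores on the machine, so a quick sanity check is to compare it with what Python reports. The snippet below uses only the standard library. The two numbers can differ in containers or restricted environments, where the JVM and Python may see different core counts.

```python
import os

# Logical cores visible to Python; os.cpu_count() may return None.
logical_cores = os.cpu_count() or 1
print("Logical cores reported by the OS:", logical_cores)
```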


Let's also check the name of the current application by using the `SparkContext.appName` attribute. 

In [None]:
# Check the name of application currently running in spark environment
print("Name of the Current Application: ", sc.appName)

# Code here 


# 'pyspark-shell'

Name of the Current Application:  pyspark-shell


We can access the configuration settings that have been set for the current SparkContext using the `_conf.getAll()` method. Note that `_conf` is a private attribute; `sc.getConf().getAll()` is the public way to get the same list. 

In [None]:
# Get all configuration settings

# Code here 
sc._conf.getAll()
# [('spark.driver.port', '36035'),
#  ('spark.rdd.compress', 'True'),
#  ('spark.driver.host', '588b1d2e9e9b'),
#  ('spark.serializer.objectStreamReset', '100'),
#  ('spark.master', 'local[*]'),
#  ('spark.executor.id', 'driver'),
#  ('spark.submit.deployMode', 'client'),
#  ('spark.ui.showConsoleProgress', 'true'),
#  ('spark.app.name', 'pyspark-shell'),
#  ('spark.app.id', 'local-1545010504175')]

[('spark.app.startTime', '1653206322417'),
 ('spark.driver.port', '38889'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1653206324085'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.host', 'cc8d2c0db169')]
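
The result is a list of `(key, value)` tuples, which is awkward to search by eye. Converting it to a dictionary makes individual lookups simple. The sketch below uses a few pairs copied by hand from the output above so that it runs without Spark; in the lab you would pass the list returned by `getAll()` instead.

```python
# A few of the (key, value) pairs from the output above, copied by hand.
settings = [
    ("spark.master", "local[*]"),
    ("spark.app.name", "pyspark-shell"),
    ("spark.submit.deployMode", "client"),
    ("spark.rdd.compress", "True"),
]

conf = dict(settings)  # list of pairs -> dictionary
print(conf["spark.master"])

# Safe lookup with a default for a key that was never set.
print(conf.get("spark.executor.memory", "<not set>"))

# All values are strings, so convert explicitly when you need another type.
compress = conf["spark.rdd.compress"].lower() == "true"
print(compress)
```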

A SparkContext can be shut down using the `SparkContext.stop()` method. Let's use it to shut down the current SparkContext. 

In [None]:
# Shut down SparkContext

# Code here
sc.stop()

Once it is shut down, you can no longer access Spark functionality until you start a new SparkContext. 

## Additional Resources

- [Apache Spark Context](https://data-flair.training/blogs/learn-apache-spark-sparkcontext/)

## Summary

In this short lab, we saw how SparkContext is used as an entry point to Spark applications. We learned how to start a SparkContext, how to list and use some of its attributes and methods, and how to shut it down. You are encouraged to explore other attributes and methods offered by the `sc` object. Some of these, namely creating and transforming datasets as RDDs, will be explored in later labs. 