#### Spark
Apache Spark is a fast, general-purpose engine for large-scale data processing.
It performs computations in memory where possible, which makes it well suited to iterative and near-real-time analysis.
Spark ships with its own standalone cluster manager, on which it can host applications.
It can also run applications on YARN, and it commonly uses HDFS (Hadoop Distributed File System) for storage.

#### PySpark
PySpark is the Python API for Spark, so you can work with RDDs from Python as well. This is possible because of a library called Py4J, which lets a Python program call into objects that live in a JVM.

#### PySpark - SparkContext
SparkContext is the entry point to any Spark functionality. When we run a Spark application, a driver program starts; it contains the main function, and the SparkContext is initialized there. The driver program then runs the operations inside the executors on worker nodes.

SparkContext uses Py4J to launch a JVM and create a JavaSparkContext. In the PySpark shell, a SparkContext is already available as sc, so creating a second one in the same session fails with an error. Reuse sc, or stop it with sc.stop() before creating a new one.

#### RDD (Resilient Distributed Dataset)
RDD stands for Resilient Distributed Dataset. An RDD is a collection of elements partitioned across multiple nodes so that it can be processed in parallel on a cluster. RDDs are immutable, which means that once you create an RDD you cannot change it. RDDs are also fault tolerant: if a partition is lost, Spark recomputes it automatically from the operations that produced it.

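There are two kinds of operations you can apply to an RDD: transformations and actions. Let us look at each in detail.

Transformation − These are operations that are applied to an RDD to create a new RDD. filter, groupBy and map are examples of transformations. Transformations are lazy: nothing is computed until an action needs the result.

Action − These are operations applied to an RDD that instruct Spark to perform the computation and send the result back to the driver. collect and count are examples of actions.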
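The split between transformations and actions can be illustrated without a cluster. The sketch below is a plain-Python analogy, not PySpark code: generator expressions stand in for lazy transformations, and list() stands in for an action such as collect(). The traced_square helper and the log list are hypothetical names, used only to show when the work actually happens.

```python
# Plain-Python analogy (assumption: generators stand in for RDD transformations).
log = []  # records which elements have actually been processed

def traced_square(x):
    """Hypothetical helper: squares x and records that it ran."""
    log.append(x)
    return x * x

data = [1, 2, 3, 4, 5]

# "Transformations": describe a pipeline, but compute nothing yet.
evens = (x for x in data if x % 2 == 0)       # plays the role of rdd.filter(...)
squares = (traced_square(x) for x in evens)   # plays the role of rdd.map(...)
work_done_before_action = list(log)           # snapshot: still empty

# "Action": forces the pipeline to run and hands the result to the caller.
result = list(squares)                        # plays the role of rdd.collect()

print(work_done_before_action)  # []
print(result)                   # [4, 16]
print(log)                      # [2, 4]
```

In Spark the same deferral is what lets the engine plan a whole chain of transformations before running it.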

#### Broadcast & Accumulator
For parallel processing, Apache Spark uses shared variables. When the driver sends a task to an executor on the cluster, a copy of each shared variable goes to that node so that the task can use it.

There are two types of shared variables: broadcast variables and accumulators.

Broadcast -
Broadcast variables keep a read-only copy of data on every node. The value is cached on each machine once, rather than being shipped to the machines along with every task.

Accumulator -
A shared variable that tasks can only add to, used to accumulate values (e.g., counters or sums) from multiple tasks. Only the driver can read its value.
Best for debugging or monitoring, not for returning results to your program logic.

In [1]:
from pyspark import SparkContext

sc = SparkContext("local", "Broadcast Example")

# A large read-only lookup dictionary
lookup_dict = {"a": 1, "b": 2, "c": 3}

# Broadcast it
broadcast_var = sc.broadcast(lookup_dict)

rdd = sc.parallelize(["a", "b", "c", "a", "b", "d"])

# Use the broadcast variable inside map
result = rdd.map(lambda x: broadcast_var.value.get(x, 0))

print(result.collect())  # Output: [1, 2, 3, 1, 2, 0]


[1, 2, 3, 1, 2, 0]

In [None]:
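from pyspark import SparkConf, SparkContext

# Reuse the running SparkContext if there is one; calling SparkContext(...)
# a second time in the same session would fail.
conf = SparkConf().setMaster("local").setAppName("Accumulator Example")
sc = SparkContext.getOrCreate(conf)

accum = sc.accumulator(0)

rdd = sc.parallelize([1, 2, 3, 4, 5])

def count_even(x):
    # Tasks can only add to the accumulator; they cannot read it
    if x % 2 == 0:
        accum.add(1)

# Update the accumulator inside an action (foreach) so that each
# element is counted exactly once, even if a task is retried
rdd.foreach(count_even)

print("Number of even numbers:", accum.value)  # Output: 2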


#### SparkConf
To run a Spark application, we need to set a few configurations and parameters.

We first create a SparkConf object with SparkConf(). Its most commonly used methods are listed below.


set(key, value) − To set a configuration property.

setMaster(value) − To set the master URL.

setAppName(value) − To set an application name.

get(key, defaultValue=None) − To get a configuration value of a key.

setSparkHome(value) − To set Spark installation path on worker nodes.

from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)

#### Spark Files
SparkContext.addFile(path) - Distributes a file (local or remote) to all worker nodes so each task can access it locally.

SparkFiles.get("filename") - Returns the local path to a file that was distributed via addFile.

SparkFiles.getRootDirectory() - Returns the directory where Spark stores files added via addFile.

#### Storage Level
StorageLevel decides how an RDD is stored: in memory, on disk, or both. It also decides whether the RDD is kept in serialized form and whether its partitions are replicated.

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
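To make the five constructor arguments concrete, the sketch below mirrors the signature above with a plain namedtuple, so it runs without a Spark installation. The Level type and the describe helper are hypothetical. The flag combinations are assumed to match PySpark's predefined MEMORY_ONLY, MEMORY_AND_DISK and DISK_ONLY_2 constants; verify them against your Spark version.

```python
from collections import namedtuple

# Hypothetical stand-in that mirrors the StorageLevel constructor arguments.
Level = namedtuple("Level", "useDisk useMemory useOffHeap deserialized replication")

# Assumed to match PySpark's predefined constants of the same names.
MEMORY_ONLY = Level(False, True, False, False, 1)
MEMORY_AND_DISK = Level(True, True, False, False, 1)
DISK_ONLY_2 = Level(True, False, False, False, 2)

def describe(level):
    """Summarize where partitions are kept and how many copies exist."""
    places = [name for flag, name in
              ((level.useMemory, "memory"),
               (level.useDisk, "disk"),
               (level.useOffHeap, "off-heap")) if flag]
    form = "deserialized" if level.deserialized else "serialized"
    return f"{' + '.join(places)}, {form}, {level.replication}x replicated"

for name, level in [("MEMORY_ONLY", MEMORY_ONLY),
                    ("MEMORY_AND_DISK", MEMORY_AND_DISK),
                    ("DISK_ONLY_2", DISK_ONLY_2)]:
    print(name, "->", describe(level))
```

With a real RDD, a level is applied by passing it to persist, for example rdd.persist(StorageLevel.MEMORY_AND_DISK).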

#### MLlib
Apache Spark offers a machine learning API called MLlib, and PySpark exposes this API in Python as well. It supports several kinds of algorithms, grouped into the packages listed below −

mllib.classification − The spark.mllib package supports various methods for binary classification, multiclass classification and regression analysis. Some of the most popular algorithms in classification are Random Forest, Naive Bayes, Decision Tree, etc.

mllib.clustering − Clustering is an unsupervised learning problem, whereby you aim to group subsets of entities with one another based on some notion of similarity.

mllib.fpm − Frequent pattern mining finds frequent items, itemsets, subsequences or other substructures, and is usually among the first steps in analyzing a large-scale dataset. It has been an active research topic in data mining for years.

mllib.linalg − MLlib utilities for linear algebra.

mllib.recommendation − Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix. spark.mllib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. It uses the Alternating Least Squares (ALS) algorithm to learn these latent factors.

mllib.regression − Linear regression belongs to the family of regression algorithms. The goal of regression is to find relationships and dependencies between variables. The interface for working with linear regression models and model summaries is similar to the logistic regression case.
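As a small illustration of what the mllib.fpm entry above means by frequent itemsets, the sketch below counts single items and pairs by brute force in plain Python. It is not how MLlib works internally (MLlib provides scalable algorithms such as FP-growth). The transactions and the min_support threshold are made-up values for illustration.

```python
from collections import Counter
from itertools import combinations

# Hypothetical shopping baskets; each set is one transaction.
transactions = [
    {"bread", "milk"},
    {"bread", "butter", "milk"},
    {"bread", "eggs", "milk"},
    {"butter"},
]
min_support = 0.5  # an itemset must appear in at least half of the baskets

# Count every 1-item and 2-item combination in every basket.
counts = Counter()
for basket in transactions:
    for size in (1, 2):
        for itemset in combinations(sorted(basket), size):
            counts[itemset] += 1

# Keep only the itemsets that meet the support threshold.
threshold = min_support * len(transactions)
frequent = {itemset: n for itemset, n in counts.items() if n >= threshold}

for itemset, n in sorted(frequent.items()):
    print(itemset, n)
```

Here only bread, milk, butter and the pair (bread, milk) reach the threshold; eggs and the other pairs appear too rarely.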

#### Serializer
PySpark uses serializers to convert Python objects into a byte stream for efficient storage or transmission across a network. Deserialization does the reverse, transforming the byte stream back into a usable object. 

1. Default Serializers:
Java Serializer:
This is the default serializer on the JVM side of Spark. It handles Spark's internal objects, such as RDD and DataFrame metadata. It is robust but comparatively slow.

Pickle Serializer:
This is the default serializer for Python objects in PySpark. It uses Python's pickle library and can serialize almost any Python object, making it versatile. However, it may not be as fast as more specialized serializers.


2. Other Serializers:
Marshal Serializer: Faster than PickleSerializer but supports fewer data types.

Kryo Serializer: A more efficient JVM-side serializer, enabled through the spark.serializer setting, that can provide better performance, especially for network-intensive applications. For best results, the classes being serialized should be registered with it.


3. Batched Serializers:
AutoBatchedSerializer: Chooses the batch size automatically, based on the size of the serialized objects.

FlattenedValuesSerializer: Used for streams of (key, list of values) pairs. It splits long value lists into chunks so that the serialized batches have similar sizes. It is involved in shuffle operations.
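The trade-off between the Pickle and Marshal serializers comes from the two standard-library modules they wrap. The sketch below uses those modules directly, without Spark, to show one type that pickle handles and marshal rejects. The sample values are arbitrary.

```python
import datetime
import marshal
import pickle

builtin_data = {"a": [1, 2, 3], "b": (4.5, "text")}
date_value = datetime.date(2025, 6, 4)

# Both modules round-trip plain built-in types.
assert pickle.loads(pickle.dumps(builtin_data)) == builtin_data
assert marshal.loads(marshal.dumps(builtin_data)) == builtin_data

# pickle also handles richer objects such as datetime.date.
pickle_ok = pickle.loads(pickle.dumps(date_value)) == date_value

# marshal only supports a fixed set of core types and raises
# ValueError for anything else.
try:
    marshal.dumps(date_value)
    marshal_ok = True
except ValueError:
    marshal_ok = False

print("pickle handles date:", pickle_ok)    # True
print("marshal handles date:", marshal_ok)  # False
```

This is why MarshalSerializer is only a safe choice when the data consists entirely of simple built-in types.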

##### Working of Serializer

Step 1: Data is prepared on the driver node and serialized into a binary format by the configured serializer.

Step 2: Serialized data is transmitted to worker nodes in the cluster.

Step 3: Worker nodes deserialize the data for processing.

Step 4: The results from worker nodes are serialized again before being sent back to the driver node.
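The four steps above can be sketched in a single process. This is only an analogy under stated assumptions: pickle stands in for the configured serializer, an ordinary function call stands in for the network, and worker_task is a hypothetical name for the code a worker would run.

```python
import pickle

def worker_task(payload: bytes) -> bytes:
    """Hypothetical worker: deserialize, process, serialize the result."""
    partition = pickle.loads(payload)        # Step 3: deserialize on the worker
    result = [x * 2 for x in partition]      # the actual processing
    return pickle.dumps(result)              # Step 4: serialize the result

partitions = [[1, 2], [3, 4], [5]]

# Step 1: the driver serializes each partition into bytes.
payloads = [pickle.dumps(p) for p in partitions]

# Step 2: the bytes are "transmitted" to workers (a plain call here).
replies = [worker_task(p) for p in payloads]

# Back on the driver, the returned bytes are deserialized again.
results = [pickle.loads(r) for r in replies]
print(results)  # [[2, 4], [6, 8], [10]]
```

Every hop costs one serialize and one deserialize, which is why the choice of serializer affects job performance.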

In [1]:
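from pyspark import SparkConf, SparkContext

# Reuse the running context if one exists instead of creating a second one
conf = SparkConf().setMaster("local").setAppName("Accumulator app")
sc = SparkContext.getOrCreate(conf)
num = sc.accumulator(10)  # start from an initial value of 10

def f(x):
    # Add each element to the accumulator; tasks cannot read its value
    num.add(x)

rdd = sc.parallelize([20, 30, 40, 50])
rdd.foreach(f)
final = num.value  # only the driver can read the accumulated value
print("Accumulated value is ->", final)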

Accumulated value is -> 150

In [17]:
num = sc.accumulator(10)

def f(x):
    num.add(x)

rdd = sc.parallelize([20, 30, 40, 50])
# foreach is an action that runs only for its side effects, so it returns None
print(type(rdd.foreach(f)))


print(type(num))

print(type(final))

<class 'NoneType'>
<class 'pyspark.accumulators.Accumulator'>
<class 'NoneType'>
