
####Problem : Functions like map() and filter() can use variables defined outside them in the driver program, but each task running on the cluster gets its own copy of each variable, and updates to these copies are not propagated back to the driver.
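
The copy semantics described above can be imitated without a cluster. The following is a plain-Python simulation, not Spark code: `run_task`, `driver_state` and the two-partition split are hypothetical names chosen for illustration, and `pickle` stands in for the serialization Spark performs when it ships a closure to a task.

```python
import pickle

def run_task(serialized_state, partition):
    # Each simulated task deserializes its own private copy of the driver state.
    state = pickle.loads(serialized_state)
    for x in partition:
        state["total"] += x  # mutates the task-local copy only
    return state["total"]

driver_state = {"total": 0}
payload = pickle.dumps(driver_state)  # what gets "shipped" to every task
partitions = [[1, 2], [3, 4, 5]]

task_totals = [run_task(payload, p) for p in partitions]
print(task_totals)   # [3, 12]
print(driver_state)  # {'total': 0}
```

Each task sees the initial value and updates its own copy, while the driver's variable stays at 0. This is the gap that shared variables are meant to close.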

The Solution : Spark provides two types of shared variables.

1.    Accumulators
2.    Broadcast variables

#####What is a Broadcast Variable
- Broadcast variables are immutable shared variables that are cached on each worker node in a Spark cluster.
- Another definition : Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.


- Imagine you want to make some information available to all of your workers so that the executors can use it while executing tasks. That information can be a variable, a collection of objects, a lookup table, or a dataset that has been collected to the driver; it only needs to be serializable. An RDD must be collected first, and a live database connection generally cannot be broadcast because it is not serializable. A broadcast variable is the mechanism that delivers this information to the executors.

#####Use Case
- Let's imagine I have a large lookup table of zip codes/pin codes and want to use it in a transformation on my data for analysis.

Here, it is neither feasible to send the large lookup table to the executors with every task, nor can we query the database every time. The solution is to convert the lookup table to a broadcast variable, which Spark caches on every executor for future reference.

This addresses two main problems: network overhead and time consumption.

- Broadcast Manager (BroadcastManager) is a Spark service that manages broadcast variables. It is created for a Spark application when SparkContext is initialized and is a simple wrapper around BroadcastFactory.

- ContextCleaner is a Spark service that is responsible for application-wide cleanup of shuffles, RDDs, broadcasts, and more.

In [0]:
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).collect()
print(result)

[('James', 'Smith', 'USA', 'California'), ('Michael', 'Rose', 'USA', 'New York'), ('Robert', 'Williams', 'USA', 'California'), ('Maria', 'Jones', 'USA', 'Florida')]


- Important : When you are finished with a broadcast variable, destroy it to release the memory it holds. Once destroyed, the variable cannot be used again.

broadcastStates.destroy()

In [0]:
# Broadcast example with DataFrames
states = {"NY":"New York", "CA":"California", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]

columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

def state_convert(code):
    return broadcastStates.value[code]

result = df.rdd.map(lambda x: (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)
result.show(truncate=False)

root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+

+---------+--------+-------+----------+
|firstname|lastname|country|state     |
+---------+--------+-------+----------+
|James    |Smith   |USA    |California|
|Michael  |Rose    |USA    |New York  |
|Robert   |Williams|USA    |California|
|Maria    |Jones   |USA    |Florida   |
+---------+--------+-------+----------+



#####Accumulators provide a simple syntax for aggregating values from worker nodes back to the driver program.
- Accumulators are variables that are initialized once on the driver and are write-only for tasks: tasks running on workers may only add to them, and those updates are propagated automatically to the driver program. Only the driver program can read an accumulator, using its value property.



In [0]:
accum=spark.sparkContext.accumulator(0)
rdd=spark.sparkContext.parallelize([1,2,3,4,5])
rdd.foreach(lambda x:accum.add(x))
print(accum.value)

accuSum=spark.sparkContext.accumulator(0)
def countFun(x):
    global accuSum
    accuSum+=x
rdd.foreach(countFun)
print(accuSum.value)

accumCount=spark.sparkContext.accumulator(0)
rdd2=spark.sparkContext.parallelize([1,2,3,4,5])
rdd2.foreach(lambda x:accumCount.add(1))
print(accumCount.value)

15
15
5



####Difference between client and cluster mode

- Whenever we submit a Spark application to a cluster, the driver (on YARN, hosted by the Spark application master) is started first. The driver then obtains N executors on the worker nodes through the cluster manager. The driver owns the SparkContext object, which it uses to share data and coordinate with the workers and the cluster manager across the cluster. The cluster manager can be Spark Standalone, Hadoop YARN, or Mesos. Executors are assigned tasks, and their results are consolidated and collected back at the driver. A Spark application runs on the cluster in one of two modes: cluster mode or client mode.

#####Cluster Mode
- In cluster mode, the Spark driver (or Spark application master) is started on one of the worker machines. The client that submits the application can disconnect after initiating it, or continue with other work. This is the "fire and forget" model.

##### Client Mode
- In client mode, the client that submits the Spark application starts the driver itself and maintains the SparkContext. The driver manages the tasks until the job finishes, so the client must stay connected to the cluster and remain online until the job completes.

- The default deployment mode is client mode.
- In client mode, if the machine or user session running spark-submit terminates, the application also terminates with a failed status.
- Pressing Ctrl-C after running the spark-submit command also terminates the application.
- Client mode is generally not used for production jobs; it is mainly used for testing and interactive work.
- Driver logs are accessible from the local machine itself.

In [0]:
#spark-submit --deploy-mode client --driver-memory xxxx  ......


#####Wide and Narrow Transformation
- Narrow Transformation : Narrow transformations are the result of functions such as map() and filter(). They compute data that lives on a single partition, so no data moves between partitions when they execute.
- Functions such as map(), mapPartitions(), flatMap(), filter(), and union() are some examples of narrow transformations.

#####Wide Transformation
- Wide transformations are the result of functions such as groupByKey() and reduceByKey(). They compute data that lives on many partitions, so data must move between partitions when they execute. Because they shuffle the data, they are also called shuffle transformations.
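
To make the difference concrete, here is a plain-Python sketch (not Spark code) of what a narrow step and a wide step do to partitions. The function names, the two-partition layout, and the use of CRC32 as the partitioning hash are all assumptions made for illustration; Spark's own partitioner uses a different hash.

```python
import zlib
from collections import defaultdict

def target_partition(key, num_partitions):
    # Stand-in partitioner (assumption): CRC32 of the key, modulo the partition count.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def narrow_map(partition):
    # Narrow step: each input partition is processed on its own, no data movement.
    return [(word, 1) for word in partition]

def shuffle(partitions, num_partitions):
    # Wide step: records are regrouped so that equal keys land in the same partition.
    out = [[] for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            out[target_partition(key, num_partitions)].append((key, value))
    return out

def reduce_partition(partition):
    # After the shuffle, every key can be reduced locally within its partition.
    totals = defaultdict(int)
    for key, value in partition:
        totals[key] += value
    return dict(totals)

input_partitions = [["Project", "Adventures", "in"], ["Project", "in", "Wonderland"]]
mapped = [narrow_map(p) for p in input_partitions]
shuffled = shuffle(mapped, num_partitions=2)
reduced = [reduce_partition(p) for p in shuffled]

word_counts = {}
for part in reduced:
    word_counts.update(part)
print(word_counts)
```

The `narrow_map` step never looks outside its own partition, whereas `shuffle` has to read every input partition to build each output partition. That cross-partition movement is what makes wide transformations expensive.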



In [0]:
#flatmap example

data = ["Project Gutenberg’s",
        "Alice’s Adventures in Wonderland",
        "Project Gutenberg’s",
        "Adventures in Wonderland",
        "Project Gutenberg’s"]
rdd=spark.sparkContext.parallelize(data)
for element in rdd.collect():
    print(element)

rdd2=rdd.flatMap(lambda x: x.split(" "))
for element in rdd2.collect():
    print(element)

Project Gutenberg’s
Alice’s Adventures in Wonderland
Project Gutenberg’s
Adventures in Wonderland
Project Gutenberg’s
Project
Gutenberg’s
Alice’s
Adventures
in
Wonderland
Project
Gutenberg’s
Adventures
in
Wonderland
Project
Gutenberg’s


In [0]:
#reducebykey example
data = [('Project', 1),
('Gutenberg’s', 1),
('Alice’s', 1),
('Adventures', 1),
('in', 1),
('Wonderland', 1),
('Project', 1),
('Gutenberg’s', 1),
('Adventures', 1),
('in', 1),
('Wonderland', 1),
('Project', 1),
('Gutenberg’s', 1)]

rdd=spark.sparkContext.parallelize(data)

rdd2=rdd.reduceByKey(lambda a,b: a+b)
for element in rdd2.collect():
    print(element)

('Gutenberg’s', 3)
('Adventures', 2)
('Wonderland', 2)
('Alice’s', 1)
('in', 2)
('Project', 3)


#####What is a UDF in Spark, and what are the challenges associated with using it?
-  PySpark UDF’s are similar to UDF on traditional databases. In PySpark, you create a function in a Python syntax and wrap it with PySpark SQL udf() or register it as udf and use it on DataFrame and SQL respectively.
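
Because a UDF starts life as an ordinary Python function, it can be written and checked locally before it is wrapped. The sketch below is one possible null-safe title-casing helper; `convert_case` is a hypothetical name, and the wrapping step is only mentioned in a comment because it needs a running Spark session.

```python
def convert_case(name):
    # Hypothetical helper: capitalise the first letter of each space-separated word.
    # Returning None for None input keeps null rows from raising inside the UDF.
    if name is None:
        return None
    return " ".join(word[:1].upper() + word[1:] for word in name.split(" "))

print(convert_case("john jones"))  # John Jones
print(convert_case(None))          # None

# With a Spark session available, the same function could then be wrapped, e.g.
#   udf(convert_case, StringType())
# using udf from pyspark.sql.functions and StringType from pyspark.sql.types.
```

Testing the plain function first keeps failures out of the executors, where Python errors are harder to read.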

#####Performance Considerations with UDFs
- Using UDFs can be expensive, as they may require data serialization and deserialization. It is therefore important to use UDFs judiciously, and only when built-in functions cannot meet your requirements.


- Serialization and Deserialization Overhead:
  - When using UDFs, data must be serialized and deserialized between the JVM and the Python process that runs the user-defined function, which can lead to significant performance overhead.
- Garbage Collection:
  - UDFs can create temporary objects that accumulate in the JVM heap, leading to garbage collection overhead.
- Resource Utilization:
  - UDFs can consume a large share of resources such as CPU and memory, so it is necessary to tune Spark configuration parameters such as spark.driver.memory, spark.executor.memory, spark.executor.cores, and spark.executor.instances carefully.
- Data Skew:
  - UDFs can cause data skew, where some partitions have significantly more data than others. This can result in performance degradation and resource contention.

  - Please practice more here : https://docs.databricks.com/en/udf/index.html

In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

def convertCase(text):
    # Capitalise the first letter of each word; a trailing space is kept after every word
    resStr = ""
    for word in text.split(" "):
        resStr = resStr + word[0:1].upper() + word[1:] + " "
    return resStr

""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z))

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
.show(truncate=False)

def upperCase(str):
    return str.upper()

upperCaseUDF = udf(lambda z:upperCase(z),StringType())    

df.withColumn("Cureated Name", upperCaseUDF(col("Name"))) \
.show(truncate=False)

""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)
     
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE " + \
          "where Name is not null and convertUDF(Name) like '%John%'") \
     .show(truncate=False)  
     
""" null check """

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders"),
    ('4',None)]

df2 = spark.createDataFrame(data=data,schema=columns)
df2.show(truncate=False)
df2.createOrReplaceTempView("NAME_TABLE2")
    
spark.udf.register("_nullsafeUDF", lambda s: convertCase(s) if s is not None else "", StringType())

spark.sql("select _nullsafeUDF(Name) from NAME_TABLE2") \
     .show(truncate=False)

spark.sql("select Seqno, _nullsafeUDF(Name) as Name from NAME_TABLE2 " + \
          " where Name is not null and _nullsafeUDF(Name) like '%John%'") \
     .show(truncate=False) 

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

+-----+-----------+
|Seqno|Name       |
+-----+-----------+
|1    |John Jones |
+-----+-----------+

+-----+------------+
|Seqno|Name        |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
|4    |null        |
+-----+------------+

+------------------+
|_nul

#####When should partitioning, bucketing, or both be used in Spark? 

- Partitioning in Spark

  - Partitioning is a way to split data into separate folders on disk based on one or multiple columns. This enables efficient parallelism and partition pruning in Spark. Partition pruning is a technique used to optimize queries by skipping reading parts of the data that are not required.
  - To partition a dataset, you specify one or more columns to partition by. The dataset is then written to disk split by the partitioning column, with each partition saved into a separate folder on disk.

In Spark, partitioning is implemented by the .partitionBy() method of the DataFrameWriter class.
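
The folder layout and the effect of partition pruning can be sketched in plain Python. This is a simulation rather than Spark code: `partition_paths`, `prune`, and the sample rows are hypothetical, and the only detail taken from Spark is the `column=value` folder naming used for partitioned output.

```python
from collections import defaultdict

def partition_paths(rows, column):
    # Group rows by partition value, using "column=value" as the folder name.
    # The partition column is encoded in the path, so it is dropped from the row data.
    layout = defaultdict(list)
    for row in rows:
        remaining = {k: v for k, v in row.items() if k != column}
        layout[f"{column}={row[column]}"].append(remaining)
    return dict(layout)

def prune(layout, column, wanted):
    # Partition pruning: only folders that match the filter are read at all.
    return {path: data for path, data in layout.items() if path == f"{column}={wanted}"}

rows = [
    {"id": 1, "date": "2024-01-01"},
    {"id": 2, "date": "2024-01-02"},
    {"id": 3, "date": "2024-01-01"},
]
layout = partition_paths(rows, "date")
selected = prune(layout, "date", "2024-01-01")

print(sorted(layout))  # ['date=2024-01-01', 'date=2024-01-02']
print(selected)        # {'date=2024-01-01': [{'id': 1}, {'id': 3}]}
```

A filter on the partition column touches one folder out of two here. On a real dataset with many partition values, this is where the savings come from.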

- Bucketing in Spark

  - Bucketing is a way to assign rows of a dataset to specific buckets and collocate them on disk. This enables efficient wide transformations in Spark, because rows with the same bucket key are already stored together and a shuffle can often be avoided. Wide transformations are operations that require shuffling data across partitions, which can be costly.

  - In Spark, bucketing is implemented by the .bucketBy() method of the DataFrameWriter class. To bucket a dataset, you provide the number of buckets you want to create and the column to bucket by. The bucket number for a given row is computed by hashing the bucket column and taking the result modulo the number of buckets.
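
The hash-then-modulo rule can be sketched in a few lines of plain Python. Note the assumption: CRC32 is used here only as a deterministic stand-in hash, so the bucket numbers it produces will not match the ones Spark assigns; `bucket_for` is a hypothetical helper.

```python
import zlib

def bucket_for(value, num_buckets):
    # Stand-in hash (assumption): CRC32 of the value's string form.
    # Spark uses its own hash function, so real bucket ids will differ.
    return zlib.crc32(str(value).encode("utf-8")) % num_buckets

ids = [101, 202, 101, 303, 202]
assignments = [(i, bucket_for(i, 10)) for i in ids]
print(assignments)
```

The property that matters is that equal values always map to the same bucket, which is what lets two tables bucketed the same way be joined on that column without moving rows around.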

In [0]:
#df = spark.read.format("csv").option("header", "true").load("path/to/dataset")

#df.write.partitionBy("date").format("parquet").save("path/to/partitioned/dataset")

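# Bucket the dataset by the "id" column into 10 buckets
# bucketBy() is only supported when saving as a table, so saveAsTable() is used instead of save();
# "bucketed_dataset" is a placeholder table name
#df.write.bucketBy(10, "id").sortBy("id").format("parquet").saveAsTable("bucketed_dataset")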