### Broadcast Variables

Broadcast Variables are a mechanism in Apache Spark for efficiently sharing read-only values across the nodes of a cluster. Instead of shipping a copy of the value with every task, Spark caches it once on each node. This reduces network I/O and can improve the performance of a Spark application, especially when many tasks reuse the same large value.

A Broadcast Variable is created with the `SparkContext.broadcast` method. Its value is stored on each node and shared by all tasks executing on that node. Inside operations such as `map` and `reduce`, tasks read it through the `value` attribute (for example, `broadcast.value`), and the driver can read it the same way.

Broadcast variables are read-only: tasks can read the value but cannot update it.

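Calling `unpersist()` on a broadcast variable deletes its cached copies from the executors. If the variable is used again afterwards, Spark re-sends it to each executor. `destroy()` goes further and removes all data and metadata related to the variable, after which it cannot be used again.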
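A typical use is a map-side lookup. A small dictionary is broadcast once, and each task reads it through `.value` while transforming its partition. The sketch below models that pattern in plain Python so it runs without a cluster. `FakeBroadcast`, the `gender_labels` table and the two-partition split are hypothetical. In PySpark the table would be created with `sc.broadcast(gender_labels)` and read as `bc.value` inside a function passed to `map` or `mapPartitions`.

```python
from typing import Dict, Iterable, List, Tuple

# Rows shaped like the notebook's sample data: (first, middle, last, id, gender, salary)
Row = Tuple[str, str, str, str, str, int]


class FakeBroadcast:
    """Hypothetical stand-in for a broadcast variable: a read-only holder of one value."""

    def __init__(self, value: Dict[str, str]) -> None:
        self._value = value

    @property
    def value(self) -> Dict[str, str]:
        # No setter is defined, so assigning to .value raises AttributeError
        return self._value


def label_partition(rows: Iterable[Row], bc: FakeBroadcast) -> List[Tuple[str, str]]:
    # Each "task" reads the shared table through bc.value instead of carrying its own copy
    table = bc.value
    return [(first, table.get(gender, "Unknown")) for first, _, _, _, gender, _ in rows]


gender_labels = {"M": "Male", "F": "Female"}  # hypothetical lookup table
bc = FakeBroadcast(gender_labels)

# Two hypothetical partitions of the sample data
partitions: List[List[Row]] = [
    [("James", "", "Smith", "36636", "M", 3000)],
    [("Michael", "Rose", "", "40288", "M", 4000),
     ("Robert", "", "Williams", "42114", "M", 4000)],
]

result = [pair for part in partitions for pair in label_partition(part, bc)]
print(result)  # [('James', 'Male'), ('Michael', 'Male'), ('Robert', 'Male')]
```

The point of the pattern is that the lookup table travels to each node once rather than once per task. For a table of a few entries the saving is negligible, but it matters when the table is large and the job has many tasks.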


In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Broadcast Variable").getOrCreate()
spark

In [9]:
data = [("James", "", "Smith", "36636", "M", 3000),
        ("Michael", "Rose", "", "40288", "M", 4000),
        ("Robert", "", "Williams", "42114", "M", 4000),
]

rdd_data = spark.sparkContext.parallelize(data)

In [10]:
rdd_data.collect()

[('James', '', 'Smith', '36636', 'M', 3000),
 ('Michael', 'Rose', '', '40288', 'M', 4000),
 ('Robert', '', 'Williams', '42114', 'M', 4000)]

In [11]:
#broadcast variable
brodcast_var = spark.sparkContext.broadcast(["M", "F"])

In [13]:
#accessing broadcast variable
brodcast_var.value

['M', 'F']

In [14]:
brodcast_var.destroy()

In [15]:
help(brodcast_var)

Help on Broadcast in module pyspark.broadcast object:

class Broadcast(typing.Generic)
 |  Broadcast(sc: Optional[ForwardRef('SparkContext')] = None, value: Optional[~T] = None, pickle_registry: Optional[ForwardRef('BroadcastPickleRegistry')] = None, path: Optional[str] = None, sock_file: Optional[BinaryIO] = None)
 |  
 |  A broadcast variable created with :meth:`SparkContext.broadcast`.
 |  Access its value through :attr:`value`.
 |  
 |  Examples
 |  --------
 |  >>> from pyspark.context import SparkContext
 |  >>> sc = SparkContext('local', 'test')
 |  >>> b = sc.broadcast([1, 2, 3, 4, 5])
 |  >>> b.value
 |  [1, 2, 3, 4, 5]
 |  >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
 |  [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
 |  >>> b.unpersist()
 |  
 |  >>> large_broadcast = sc.broadcast(range(10000))
 |  
 |  Method resolution order:
 |      Broadcast
 |      typing.Generic
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, sc: Optional[ForwardRef('S

### Accumulators

Accumulators are shared variables that tasks on the worker nodes can only add to. Spark propagates those updates back to the driver program, which is the only place the accumulated value can be read. They are typically used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types (in PySpark, by supplying a custom `AccumulatorParam`).

An accumulator is created and initialized on the driver. From the point of view of the worker nodes it is write-only: tasks add to it (with `add()` or `+=`) but cannot read its value. The driver reads the final result through `value` after an action on the RDD has completed.
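The add-only contract is what lets Spark merge updates from many tasks in any order. Each task accumulates into its own local copy, and the driver combines the per-task results with the same commutative, associative add. The plain-Python sketch below models that flow. `run_task`, `driver_merge` and the partition split are hypothetical names for illustration, not Spark internals.

```python
from functools import reduce
from typing import List


def run_task(partition: List[int]) -> int:
    # A task starts from the accumulator's zero value and only adds to it;
    # it never sees the driver's running total
    local = 0
    for x in partition:
        local += x
    return local


def driver_merge(task_results: List[int], initial: int = 0) -> int:
    # The driver folds the per-task updates into the accumulator.
    # Because + is commutative and associative, completion order does not matter.
    return reduce(lambda acc, update: acc + update, task_results, initial)


partitions = [[1, 2], [3, 4]]  # hypothetical split of parallelize([1, 2, 3, 4])
updates = [run_task(p) for p in partitions]
total = driver_merge(updates)
total_reversed = driver_merge(list(reversed(updates)))
print(updates, total, total_reversed)  # [3, 7] 10 10
```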

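Accumulators are commonly used as global counters that many tasks update at once. Their fault-tolerance guarantee depends on where the update happens:

- **Inside actions** (such as `foreach`): Spark applies each task's update only once, so a restarted task does not double-count.
- **Inside transformations** (such as `map`): there is no such guarantee, and an update may be applied more than once if a task or stage is re-executed. These updates are also applied lazily, only when an action runs.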
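The difference comes from lazy evaluation. An uncached RDD's transformations are recomputed every time an action needs them, so an accumulator updated inside `map` is incremented once per recomputation. The toy model below reproduces that effect in plain Python. `Counter` and `LazyMapped` are hypothetical stand-ins for an accumulator and an uncached RDD. In PySpark the equivalent pitfall is calling `count()` twice on an uncached RDD whose `map` function calls `accum.add(1)`.

```python
from typing import Callable, List


class Counter:
    """Hypothetical stand-in for an accumulator: updates only ever add."""

    def __init__(self) -> None:
        self.value = 0

    def add(self, n: int) -> None:
        self.value += n


class LazyMapped:
    """Hypothetical uncached RDD: the map function re-runs on every action."""

    def __init__(self, data: List[int], fn: Callable[[int], int]) -> None:
        self._data = data
        self._fn = fn

    def count(self) -> int:
        # Each action re-evaluates the lineage, calling fn again for every element
        return sum(1 for _ in (self._fn(x) for x in self._data))


seen = Counter()


def tag(x: int) -> int:
    seen.add(1)  # side effect inside a transformation
    return x * 2


rdd = LazyMapped([1, 2, 3, 4], tag)
rdd.count()
print(seen.value)  # 4
rdd.count()
print(seen.value)  # 8: the map ran again, so every element was counted twice
```

Moving the update into an action such as `foreach`, or caching the RDD before running several actions, avoids this kind of double counting.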

In [23]:
accum_v = spark.sparkContext.accumulator(0)
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
# foreach is an action, so the additions run now; it returns None, so there is no output
rdd.foreach(lambda x: accum_v.add(x))

In [31]:
spark.stop()

In [1]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

In [6]:
help(accum_v)

Help on Accumulator in module pyspark.accumulators object:

class Accumulator(typing.Generic)
 |  Accumulator(aid: int, value: ~T, accum_param: 'AccumulatorParam[T]')
 |  
 |  A shared variable that can be accumulated, i.e., has a commutative and associative "add"
 |  operation. Worker tasks on a Spark cluster can add values to an Accumulator with the `+=`
 |  operator, but only the driver program is allowed to access its value, using `value`.
 |  Updates from the workers get propagated automatically to the driver program.
 |  
 |  While :class:`SparkContext` supports accumulators for primitive data types like :class:`int` and
 |  :class:`float`, users can also define accumulators for custom types by providing a custom
 |  :py:class:`AccumulatorParam` object. Refer to its doctest for an example.
 |  
 |  Examples
 |  --------
 |  >>> a = sc.accumulator(1)
 |  >>> a.value
 |  1
 |  >>> a.value = 2
 |  >>> a.value
 |  2
 |  >>> a += 5
 |  >>> a.value
 |  7
 |  >>> sc.accumulator(1.0).val

In [7]:
accum_v.value

5

In [8]:
def g(x):
    accum_v.add(x)

In [9]:
rdd = sc.parallelize([1, 2, 3])

In [11]:
# rdd.foreach(g)

### RDD - Transformations - Joins
* join()
* leftOuterJoin()
* rightOuterJoin()
* fullOuterJoin()
* cartesian()
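The four pair-RDD joins differ only in which unmatched keys survive and how the missing side is filled. The plain-Python sketch below models those semantics on small lists of `(key, value)` pairs, using `None` for the missing side as PySpark does. `pair_join` and its `how` argument are hypothetical names. The sketch ignores partitioning and output order, which Spark does not guarantee.

```python
from typing import Dict, Hashable, List, Optional, Tuple

Pair = Tuple[Hashable, object]
Joined = Tuple[Hashable, Tuple[Optional[object], Optional[object]]]


def pair_join(left: List[Pair], right: List[Pair], how: str = "inner") -> List[Joined]:
    """Model of join / leftOuterJoin / rightOuterJoin / fullOuterJoin on (key, value) lists."""
    if how not in ("inner", "left", "right", "full"):
        raise ValueError(f"unknown join type: {how}")

    # Group each side's values by key (Spark does this with a shuffle)
    left_vals: Dict[Hashable, List[object]] = {}
    right_vals: Dict[Hashable, List[object]] = {}
    for k, v in left:
        left_vals.setdefault(k, []).append(v)
    for k, v in right:
        right_vals.setdefault(k, []).append(v)

    out: List[Joined] = []
    # Visit keys in first-seen order; Spark itself makes no ordering promise
    for k in dict.fromkeys(list(left_vals) + list(right_vals)):
        ls = left_vals.get(k, [])
        rs = right_vals.get(k, [])
        if ls and rs:
            # Matched key: one output row per (left value, right value) combination
            out.extend((k, (lv, rv)) for lv in ls for rv in rs)
        elif ls and how in ("left", "full"):
            # Key only on the left: keep it and fill the right side with None
            out.extend((k, (lv, None)) for lv in ls)
        elif rs and how in ("right", "full"):
            # Key only on the right: keep it and fill the left side with None
            out.extend((k, (None, rv)) for rv in rs)
    return out


left_pairs = [('C', 1), ('B', 2), ('A', 3), ('A', 4), ('B', 5), ('C', 6)]
right_pairs = [('A', 7), ('B', 8), ('C', 9), ('D', 10)]

for how in ("inner", "left", "right", "full"):
    print(how, pair_join(left_pairs, right_pairs, how))
```

With this data, `'D'` appears only in the right-hand list. It therefore shows up as `('D', (None, 10))` in the right and full joins and is dropped by the inner and left joins.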

In [1]:
import findspark
findspark.init()

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

In [2]:
rdd1 = sc.parallelize([('C',1),('B',2),('A',3),('A',4),('B',5),('C',6)])
rdd2 = sc.parallelize([('A',7),('B',8),('C',9),('D',10)])


In [3]:
# join() performs an inner join: only keys present in both RDDs appear in the result
inner_join = rdd1.join(rdd2)

In [4]:
inner_join.collect()

[('B', (2, 8)),
 ('B', (5, 8)),
 ('C', (1, 9)),
 ('C', (6, 9)),
 ('A', (3, 7)),
 ('A', (4, 7))]

In [5]:
# rightOuterJoin() keeps every key of rdd2; a key missing from rdd1 gets None on the left (see 'D')

rdd1 = sc.parallelize([('C',1),('B',2),('A',3),('A',4),('B',5),('C',6)])
rdd2 = sc.parallelize([('A',7),('B',8),('C',9),('D',10)])

rdd1.rightOuterJoin(rdd2).collect()

[('B', (2, 8)),
 ('B', (5, 8)),
 ('D', (None, 10)),
 ('C', (1, 9)),
 ('C', (6, 9)),
 ('A', (3, 7)),
 ('A', (4, 7))]

In [6]:
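# leftOuterJoin() keeps every key of rdd1; every key of rdd1 also exists in rdd2, so the result matches the inner join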
rdd1.leftOuterJoin(rdd2).collect()

[('B', (2, 8)),
 ('B', (5, 8)),
 ('C', (1, 9)),
 ('C', (6, 9)),
 ('A', (3, 7)),
 ('A', (4, 7))]

In [7]:
# cartesian() pairs every element of rdd1 with every element of rdd2 (a cross join): 6 x 4 = 24 pairs
rdd1.cartesian(rdd2).collect()

[(('C', 1), ('A', 7)),
 (('C', 1), ('B', 8)),
 (('C', 1), ('C', 9)),
 (('C', 1), ('D', 10)),
 (('B', 2), ('A', 7)),
 (('B', 2), ('B', 8)),
 (('B', 2), ('C', 9)),
 (('B', 2), ('D', 10)),
 (('A', 3), ('A', 7)),
 (('A', 3), ('B', 8)),
 (('A', 3), ('C', 9)),
 (('A', 3), ('D', 10)),
 (('A', 4), ('A', 7)),
 (('A', 4), ('B', 8)),
 (('A', 4), ('C', 9)),
 (('A', 4), ('D', 10)),
 (('B', 5), ('A', 7)),
 (('B', 5), ('B', 8)),
 (('B', 5), ('C', 9)),
 (('B', 5), ('D', 10)),
 (('C', 6), ('A', 7)),
 (('C', 6), ('B', 8)),
 (('C', 6), ('C', 9)),
 (('C', 6), ('D', 10))]
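`cartesian()` returns one pair for every combination of elements, so its size is the product of the two input sizes (6 × 4 = 24 here) and grows quickly on larger data. In plain Python the same pairing is `itertools.product`. The sketch below reproduces the count and the first pairs of the output above, though Spark does not promise this particular order.

```python
from itertools import product

left_pairs = [('C', 1), ('B', 2), ('A', 3), ('A', 4), ('B', 5), ('C', 6)]
right_pairs = [('A', 7), ('B', 8), ('C', 9), ('D', 10)]

# Every element of the left list is paired with every element of the right list
cross = list(product(left_pairs, right_pairs))
print(len(cross))  # 24
print(cross[:2])   # [(('C', 1), ('A', 7)), (('C', 1), ('B', 8))]
```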

In [9]:
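# cogroup() groups the values of both RDDs by key; each side comes back as an
# iterable object, so convert both sides to lists to make the result readable
rdd1.cogroup(rdd2).mapValues(lambda v: (list(v[0]), list(v[1]))).collect()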
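`cogroup()` is closely related to the joins above. For every key present in either RDD, it returns the key together with all of that key's values from each side, with an empty collection where a side has none. The plain-Python sketch below models that result with lists; PySpark returns iterable objects rather than lists. The `cogroup` function here is a hypothetical helper, not the PySpark method.

```python
from typing import Dict, Hashable, List, Tuple

Pair = Tuple[Hashable, object]
Grouped = Tuple[Hashable, Tuple[List[object], List[object]]]


def cogroup(left: List[Pair], right: List[Pair]) -> List[Grouped]:
    # Keys in first-seen order (Spark itself gives no ordering guarantee)
    keys = list(dict.fromkeys([k for k, _ in left] + [k for k, _ in right]))
    grouped: Dict[Hashable, Tuple[List[object], List[object]]] = {k: ([], []) for k in keys}
    for k, v in left:
        grouped[k][0].append(v)   # values from the left side
    for k, v in right:
        grouped[k][1].append(v)   # values from the right side
    return [(k, grouped[k]) for k in keys]


left_pairs = [('C', 1), ('B', 2), ('A', 3), ('A', 4), ('B', 5), ('C', 6)]
right_pairs = [('A', 7), ('B', 8), ('C', 9), ('D', 10)]

result = cogroup(left_pairs, right_pairs)
print(result)
# [('C', ([1, 6], [9])), ('B', ([2, 5], [8])), ('A', ([3, 4], [7])), ('D', ([], [10]))]
```

An inner join can be read off this structure by taking, for each key with both lists non-empty, every combination of a left value and a right value.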