### Working of Master and Worker Nodes

Master Node -->
    Runs the Driver Program, which creates the SparkContext
    Builds the execution plan (a DAG of stages, each split into tasks)
    Distributes the tasks to the Worker Nodes, typically one task per data partition
    Collects the results from the Worker Nodes and returns them to the end user
    YARN (when it is used as the cluster manager) -->
        It is a Resource/Cluster Manager: YARN monitors the CPU and memory usage of the Worker Nodes
        and shares this information with the SparkContext on the Master Node, which uses it to get executors allocated

Worker Node -->
    Runs executor processes that carry out the tasks sent by the Master Node
    Shuffles data with other Worker Nodes when an operation such as a join or groupBy requires it
    Sends task results back to the Master Node

### Joins in PySpark

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create a Spark session
spark = SparkSession.builder.appName("JoinExample").getOrCreate()

In [4]:
# Sample data for orders and customers
orders_data = [(1, "A", 100), (2, "B", 200), (3, "C", 150)]
customers_data = [("A", 25), ("B", 30), ("D", 28)]

# Create DataFrames
orders = spark.createDataFrame(orders_data, ["order_id", "customer", "total"])
customers = spark.createDataFrame(customers_data, ["customer", "age"])


In [5]:
orders.show(truncate=False)

+--------+--------+-----+
|order_id|customer|total|
+--------+--------+-----+
|1       |A       |100  |
|2       |B       |200  |
|3       |C       |150  |
+--------+--------+-----+



In [6]:
customers.show(truncate=False)

+--------+---+
|customer|age|
+--------+---+
|A       |25 |
|B       |30 |
|D       |28 |
+--------+---+



In [9]:
orders.join(customers, on=["customer"], how="inner").show(truncate=False)

+--------+--------+-----+---+
|customer|order_id|total|age|
+--------+--------+-----+---+
|A       |1       |100  |25 |
|B       |2       |200  |30 |
+--------+--------+-----+---+



In [11]:
orders.join(customers, on=["customer"], how="left").show(truncate=False)

+--------+--------+-----+----+
|customer|order_id|total|age |
+--------+--------+-----+----+
|A       |1       |100  |25  |
|B       |2       |200  |30  |
|C       |3       |150  |null|
+--------+--------+-----+----+



In [12]:
orders.join(customers, on=["customer"], how="right").show(truncate=False)

+--------+--------+-----+---+
|customer|order_id|total|age|
+--------+--------+-----+---+
|A       |1       |100  |25 |
|B       |2       |200  |30 |
|D       |null    |null |28 |
+--------+--------+-----+---+



In [13]:
orders.join(customers, on=["customer"], how="left").filter("age is null").show(truncate=False)

+--------+--------+-----+----+
|customer|order_id|total|age |
+--------+--------+-----+----+
|C       |3       |150  |null|
+--------+--------+-----+----+



In [28]:
orders.join(customers, on=["customer"], how="left_anti").show(truncate=False)

+--------+--------+-----+
|customer|order_id|total|
+--------+--------+-----+
|C       |3       |150  |
+--------+--------+-----+



In [44]:
customers.join(orders, on=["customer"], how="left_anti").show(truncate=False)

+--------+---+
|customer|age|
+--------+---+
|D       |28 |
+--------+---+



In [36]:
columns = ["customer", "order_id", "total", "age"]

In [40]:
df1 = orders.join(customers, on=["customer"], how="left").filter("age is null").select(columns)

In [41]:
df2 = customers.join(orders, on=["customer"], how="left").filter("order_id is null").select(columns)

In [42]:
df1.union(df2).show(truncate=False)

+--------+--------+-----+----+
|customer|order_id|total|age |
+--------+--------+-----+----+
|C       |3       |150  |null|
|D       |null    |null |28  |
+--------+--------+-----+----+


### Optimization 

In [1]:
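import os
import sys

# Use the interpreter running this notebook for both the Python workers on the
# executors (PYSPARK_PYTHON) and the driver (PYSPARK_DRIVER_PYTHON), so both sides
# have the same Python version and packages. Set these before creating the SparkSession.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable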