In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Get the SparkSession used by every cell below (getOrCreate reuses an
# already-running session instead of starting a second one).
spark = SparkSession.builder.appName('app').getOrCreate()

# ---- Employees ---------------------------------------------------------
# Only column names are passed as the schema, so the column types come from
# the Python values in the rows.
# NOTE(review): doj and dept_id are written as strings here, while
# dept_df.dept_id below is an integer column; the join plans further down
# show the resulting `cast(dept_id ... as bigint)`. Confirm this is intended.
emp_schema = ["emp_id","name","doj","dept_id","gender","salary"]

emp_data = [
    (10, "Nani", "2000", "100", "M", 2500000),
    (20, "Nikky", "2022", "200", "M", 50000000),
    (30, "Raghav", "2010", None, "F", 20000),   # no department
    (40, "Raja", "2012", "100", None, 450000),  # no gender
    (50, "Rama", "2014", "400", "F", 442000),
    (60, "Rasul", "2015", "500", "M", 562000),
]

emp_df = spark.createDataFrame(emp_data, emp_schema)
emp_df.show()

# ---- Departments -------------------------------------------------------
# Rows are (dept_name, dept_id); one department has no id at all.
dept_schema = ["dept_name","dept_id"]

dept_data = [
    ("Nani", 100),
    ("Janu", 200),
    ("Nikky", None),
    ("Lukky", 300),
]

dept_df = spark.createDataFrame(dept_data, dept_schema)
dept_df.show()

+------+------+----+-------+------+--------+
|emp_id|  name| doj|dept_id|gender|  salary|
+------+------+----+-------+------+--------+
|    10|  Nani|2000|    100|     M| 2500000|
|    20| Nikky|2022|    200|     M|50000000|
|    30|Raghav|2010|   null|     F|   20000|
|    40|  Raja|2012|    100|  null|  450000|
|    50|  Rama|2014|    400|     F|  442000|
|    60| Rasul|2015|    500|     M|  562000|
+------+------+----+-------+------+--------+

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|     Nani|    100|
|     Janu|    200|
|    Nikky|   null|
|    Lukky|    300|
+---------+-------+



### Inner Join

In [21]:
# Inner-join employees to departments on dept_id and total the salary per
# department. Employees with a null dept_id, or a dept_id that has no row in
# dept_df, do not appear in the result (only "Janu" and "Nani" survive).
#
# The chain is wrapped in parentheses rather than using backslash
# continuations, which break as soon as a trailing space follows the "\".
dfjoin = (
    emp_df.join(dept_df, emp_df.dept_id == dept_df.dept_id, "inner")
    # bonus is not read by the aggregation below; the parsed/analyzed plans
    # still list it, while the physical plan projects only salary and
    # dept_name.
    .withColumn("bonus", col("salary") * 0.1)
    .groupBy("dept_name")
    .sum("salary")
)
dfjoin.show()

+---------+-----------+
|dept_name|sum(salary)|
+---------+-----------+
|     Janu|   50000000|
|     Nani|    2950000|
+---------+-----------+



In [22]:
# Print the query plan for dfjoin. Called without arguments, the output
# consists of the "== Physical Plan ==" section only.
dfjoin.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[dept_name#532], functions=[sum(salary#500L)])
   +- Exchange hashpartitioning(dept_name#532, 200), ENSURE_REQUIREMENTS, [plan_id=955]
      +- HashAggregate(keys=[dept_name#532], functions=[partial_sum(salary#500L)])
         +- Project [salary#500L, dept_name#532]
            +- SortMergeJoin [cast(dept_id#498 as bigint)], [dept_id#533L], Inner
               :- Sort [cast(dept_id#498 as bigint) ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(cast(dept_id#498 as bigint), 200), ENSURE_REQUIREMENTS, [plan_id=947]
               :     +- Project [dept_id#498, salary#500L]
               :        +- Filter isnotnull(dept_id#498)
               :           +- Scan ExistingRDD[emp_id#495L,name#496,doj#497,dept_id#498,gender#499,salary#500L]
               +- Sort [dept_id#533L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(dept_id#533L, 200), ENSURE_REQUIREMENT

In [8]:
# mode="simple": prints only the physical plan, the same operator tree as the
# argument-less explain() call above.
# NOTE(review): the execution count (In [8]) and the expression ids in the
# captured output (#202 here vs #532 above) suggest this output is from an
# earlier run than the preceding cells; re-run the notebook top to bottom so
# the outputs are consistent.
dfjoin.explain(mode="simple")

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[dept_name#202], functions=[sum(salary#170L)])
   +- Exchange hashpartitioning(dept_name#202, 200), ENSURE_REQUIREMENTS, [plan_id=341]
      +- HashAggregate(keys=[dept_name#202], functions=[partial_sum(salary#170L)])
         +- Project [salary#170L, dept_name#202]
            +- SortMergeJoin [cast(dept_id#168 as bigint)], [dept_id#203L], Inner
               :- Sort [cast(dept_id#168 as bigint) ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(cast(dept_id#168 as bigint), 200), ENSURE_REQUIREMENTS, [plan_id=333]
               :     +- Project [dept_id#168, salary#170L]
               :        +- Filter isnotnull(dept_id#168)
               :           +- Scan ExistingRDD[emp_id#165L,name#166,doj#167,dept_id#168,gender#169,salary#170L]
               +- Sort [dept_id#203L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(dept_id#203L, 200), ENSURE_REQUIREMENT

In [9]:
# mode="extended": the captured output starts with the parsed and the analyzed
# logical plans (it is cut off after those). At this stage the plan still
# carries the bonus projection and every joined column.
dfjoin.explain(mode="extended")

== Parsed Logical Plan ==
'Aggregate ['dept_name], ['dept_name, sum(salary#170L) AS sum(salary)#251L]
+- Project [emp_id#165L, name#166, doj#167, dept_id#168, gender#169, salary#170L, dept_name#202, dept_id#203L, (cast(salary#170L as double) * 0.1) AS bonus#231]
   +- Join Inner, (cast(dept_id#168 as bigint) = dept_id#203L)
      :- LogicalRDD [emp_id#165L, name#166, doj#167, dept_id#168, gender#169, salary#170L], false
      +- LogicalRDD [dept_name#202, dept_id#203L], false

== Analyzed Logical Plan ==
dept_name: string, sum(salary): bigint
Aggregate [dept_name#202], [dept_name#202, sum(salary#170L) AS sum(salary)#251L]
+- Project [emp_id#165L, name#166, doj#167, dept_id#168, gender#169, salary#170L, dept_name#202, dept_id#203L, (cast(salary#170L as double) * 0.1) AS bonus#231]
   +- Join Inner, (cast(dept_id#168 as bigint) = dept_id#203L)
      :- LogicalRDD [emp_id#165L, name#166, doj#167, dept_id#168, gender#169, salary#170L], false
      +- LogicalRDD [dept_name#202, dept_id#203L

In [10]:
# mode="formatted": prints the physical plan as a tree of numbered operators,
# followed by a details section per operator number (Output / Input /
# Condition / Arguments).
dfjoin.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (15)
+- HashAggregate (14)
   +- Exchange (13)
      +- HashAggregate (12)
         +- Project (11)
            +- SortMergeJoin Inner (10)
               :- Sort (5)
               :  +- Exchange (4)
               :     +- Project (3)
               :        +- Filter (2)
               :           +- Scan ExistingRDD (1)
               +- Sort (9)
                  +- Exchange (8)
                     +- Filter (7)
                        +- Scan ExistingRDD (6)


(1) Scan ExistingRDD
Output [6]: [emp_id#165L, name#166, doj#167, dept_id#168, gender#169, salary#170L]
Arguments: [emp_id#165L, name#166, doj#167, dept_id#168, gender#169, salary#170L], MapPartitionsRDD[44] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter
Input [6]: [emp_id#165L, name#166, doj#167, dept_id#168, gender#169, salary#170L]
Condition : isnotnull(dept_id#168)

(3) Project
Output [2]: [dept_id#168, salary#170L]
Inp

In [11]:
# mode="cost": prints the optimized logical plan with a
# Statistics(sizeInBytes=...) annotation on every node, followed by the
# physical plan.
# NOTE(review): the sizes shown (e.g. 8.0 EiB) are presumably default
# estimates for an RDD-backed source rather than real data sizes; confirm.
dfjoin.explain(mode="cost")

== Optimized Logical Plan ==
Aggregate [dept_name#202], [dept_name#202, sum(salary#170L) AS sum(salary)#251L], Statistics(sizeInBytes=1.66E+37 B)
+- Project [salary#170L, dept_name#202], Statistics(sizeInBytes=1.66E+37 B)
   +- Join Inner, (cast(dept_id#168 as bigint) = dept_id#203L), Statistics(sizeInBytes=2.94E+37 B)
      :- Project [dept_id#168, salary#170L], Statistics(sizeInBytes=2.8 EiB)
      :  +- Filter isnotnull(dept_id#168), Statistics(sizeInBytes=8.0 EiB)
      :     +- LogicalRDD [emp_id#165L, name#166, doj#167, dept_id#168, gender#169, salary#170L], false, Statistics(sizeInBytes=8.0 EiB)
      +- Filter isnotnull(dept_id#203L), Statistics(sizeInBytes=8.0 EiB)
         +- LogicalRDD [dept_name#202, dept_id#203L], false, Statistics(sizeInBytes=8.0 EiB)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[dept_name#202], functions=[sum(salary#170L)], output=[dept_name#202, sum(salary)#251L])
   +- Exchange hashpartitioning(dept_name#202, 200), ENS