In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.streaming import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("SetOperations").getOrCreate()
sc = spark.sparkContext

In [5]:
# sql adaptive query execution adaptive.coalescePartitions.enabled will makr paritions dynamic
sc.setLogLevel("Error")
spark.conf.set("spark.sql.shuffle.partitions",3)
spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.adaptive.enabled","false")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled","false")

"""
Set vs Join operations
records present in both files and files present in one file and not the other

Set - Set operators such as intersect, subtract, union etc
Join - left semi, left anti etc
"""

#WithoutInferSchema
#headerTrue will read first row and assign column names but type is String for all
#spark job created to read first column
filepath = "file:///C:/Users/venka/PycharmProjects/pythonProject/dataset/"
masterdf = spark.read.option("header",True) \
                .option("delimiter","|") \
                .csv(filepath + "IntSetJoin1.csv",inferSchema=True)
masterdf.show()

+-------+------------+------+
|Roll_no|Student_name|Mobile|
+-------+------------+------+
|   1001|       Akash|123456|
|   1002|         Anu|903675|
|   1003|     Prakash| 23456|
|   1004|        Mani|337456|
|   1005|       Manju|820456|
|   1006|      Magesh|987456|
|   1007|        Balu|654321|
|   1008|      Barath|765456|
|   1009|     Patrick| 98765|
+-------+------------+------+



In [7]:
dailydf = spark.read.option("header",True) \
                .option("delimiter","|") \
                .csv(filepath + "IntSetJoin2.csv",inferSchema=True)
dailydf.show()

+-------+------------+------+
|Roll_no|Student_name|Mobile|
+-------+------------+------+
|   1001|       Akash|123456|
|   1002|         Anu|903675|
|   1003|     Prakash| 23456|
|   1004|        Mani|337456|
|   1008|      Barath|765456|
|   1009|     Patrick| 98765|
+-------+------------+------+



In [17]:
#Set operators - union, unionAll, intersect, intersectAll, subtract, exceptAll
#Join operators - inner, outer, leftOuter, rightOuter, leftSemi, leftAnti, cartesian (we dont have except, only exceptAll)

#union, unionAll is same , duplicates are accounted in unionAll
#intersect will indirectly call leftSemi through broadcast join
masterdf.intersect(dailydf).show()

+-------+------------+------+
|Roll_no|Student_name|Mobile|
+-------+------------+------+
|   1001|       Akash|123456|
|   1004|        Mani|337456|
|   1009|     Patrick| 98765|
|   1002|         Anu|903675|
|   1003|     Prakash| 23456|
|   1008|      Barath|765456|
+-------+------------+------+



In [19]:
masterdf.intersectAll(dailydf).show()
masterdf.intersectAll(dailydf).explain()

+-------+------------+------+
|Roll_no|Student_name|Mobile|
+-------+------------+------+
|   1001|       Akash|123456|
|   1004|        Mani|337456|
|   1009|     Patrick| 98765|
|   1002|         Anu|903675|
|   1003|     Prakash| 23456|
|   1008|      Barath|765456|
+-------+------------+------+

== Physical Plan ==
*(5) Project [Roll_no#40, Student_name#41, Mobile#42]
+- Generate replicaterows(min_count#381L, Roll_no#40, Student_name#41, Mobile#42), [Roll_no#40, Student_name#41, Mobile#42], false, [Roll_no#40, Student_name#41, Mobile#42]
   +- *(4) Project [Roll_no#40, Student_name#41, Mobile#42, if ((vcol1_count#378L > vcol2_count#380L)) vcol2_count#380L else vcol1_count#378L AS min_count#381L]
      +- *(4) Filter ((vcol1_count#378L >= 1) AND (vcol2_count#380L >= 1))
         +- *(4) HashAggregate(keys=[Roll_no#40, Student_name#41, Mobile#42], functions=[count(vcol1#373), count(vcol2#376)])
            +- Exchange hashpartitioning(Roll_no#40, Student_name#41, Mobile#42, 3), ENSUR

In [20]:
#leftSemi - whatever is common in leftside of table is returned
masterdf.join(dailydf, on=["Roll_no","Student_name","Mobile"],how="leftSemi").show()

masterdf.join(dailydf, on=["Roll_no","Student_name","Mobile"],how="leftSemi").explain()

+-------+------------+------+
|Roll_no|Student_name|Mobile|
+-------+------------+------+
|   1001|       Akash|123456|
|   1002|         Anu|903675|
|   1003|     Prakash| 23456|
|   1004|        Mani|337456|
|   1008|      Barath|765456|
|   1009|     Patrick| 98765|
+-------+------------+------+

== Physical Plan ==
*(2) BroadcastHashJoin [Roll_no#40, Student_name#41, Mobile#42], [Roll_no#120, Student_name#121, Mobile#122], LeftSemi, BuildRight, false
:- *(2) Filter ((isnotnull(Roll_no#40) AND isnotnull(Student_name#41)) AND isnotnull(Mobile#42))
:  +- FileScan csv [Roll_no#40,Student_name#41,Mobile#42] Batched: false, DataFilters: [isnotnull(Roll_no#40), isnotnull(Student_name#41), isnotnull(Mobile#42)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/venka/PycharmProjects/pythonProject/dataset/IntSetJoin1..., PartitionFilters: [], PushedFilters: [IsNotNull(Roll_no), IsNotNull(Student_name), IsNotNull(Mobile)], ReadSchema: struct<Roll_no:int,Student_name:string,Mob

In [21]:
#get records from one file

#subtract and itersect to be used, have same number of columns otherwise exception is shown
masterdf.exceptAll(dailydf).show()
masterdf.exceptAll(dailydf).explain()

+-------+------------+------+
|Roll_no|Student_name|Mobile|
+-------+------------+------+
|   1005|       Manju|820456|
|   1006|      Magesh|987456|
|   1007|        Balu|654321|
+-------+------------+------+

== Physical Plan ==
*(5) Project [Roll_no#40, Student_name#41, Mobile#42]
+- Generate replicaterows(sum#448L, Roll_no#40, Student_name#41, Mobile#42), [Roll_no#40, Student_name#41, Mobile#42], false, [Roll_no#40, Student_name#41, Mobile#42]
   +- *(4) Filter (isnotnull(sum#448L) AND (sum#448L > 0))
      +- *(4) HashAggregate(keys=[Roll_no#40, Student_name#41, Mobile#42], functions=[sum(vcol#445L)])
         +- Exchange hashpartitioning(Roll_no#40, Student_name#41, Mobile#42, 3), ENSURE_REQUIREMENTS, [plan_id=962]
            +- *(3) HashAggregate(keys=[Roll_no#40, Student_name#41, Mobile#42], functions=[partial_sum(vcol#445L)])
               +- Union
                  :- *(1) Project [1 AS vcol#445L, Roll_no#40, Student_name#41, Mobile#42]
                  :  +- FileScan csv 

In [22]:
#get record from one file, indirectly calls leftanti
masterdf.subtract(dailydf).show()
masterdf.subtract(dailydf).explain()

+-------+------------+------+
|Roll_no|Student_name|Mobile|
+-------+------------+------+
|   1005|       Manju|820456|
|   1006|      Magesh|987456|
|   1007|        Balu|654321|
+-------+------------+------+

== Physical Plan ==
*(2) HashAggregate(keys=[Roll_no#40, Student_name#41, Mobile#42], functions=[])
+- Exchange hashpartitioning(Roll_no#40, Student_name#41, Mobile#42, 3), ENSURE_REQUIREMENTS, [plan_id=1067]
   +- *(1) HashAggregate(keys=[Roll_no#40, Student_name#41, Mobile#42], functions=[])
      +- *(1) BroadcastHashJoin [coalesce(Roll_no#40, 0), isnull(Roll_no#40), coalesce(Student_name#41, ), isnull(Student_name#41), coalesce(Mobile#42, 0), isnull(Mobile#42)], [coalesce(Roll_no#120, 0), isnull(Roll_no#120), coalesce(Student_name#121, ), isnull(Student_name#121), coalesce(Mobile#122, 0), isnull(Mobile#122)], LeftAnti, BuildRight, false
         :- FileScan csv [Roll_no#40,Student_name#41,Mobile#42] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 

In [24]:
#get record from one file
masterdf.join(dailydf,on=["Roll_no","Student_name","Mobile"],how="leftanti").show()
masterdf.join(dailydf,on=["Roll_no","Student_name","Mobile"],how="leftanti").explain()

+-------+------------+------+
|Roll_no|Student_name|Mobile|
+-------+------------+------+
|   1005|       Manju|820456|
|   1006|      Magesh|987456|
|   1007|        Balu|654321|
+-------+------------+------+

== Physical Plan ==
*(2) BroadcastHashJoin [Roll_no#40, Student_name#41, Mobile#42], [Roll_no#120, Student_name#121, Mobile#122], LeftAnti, BuildRight, false
:- FileScan csv [Roll_no#40,Student_name#41,Mobile#42] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/venka/PycharmProjects/pythonProject/dataset/IntSetJoin1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Roll_no:int,Student_name:string,Mobile:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, int, false], input[1, string, false], input[2, int, false]),false), [plan_id=1150]
   +- *(1) Filter ((isnotnull(Roll_no#120) AND isnotnull(Student_name#121)) AND isnotnull(Mobile#122))
      +- FileScan csv [Roll_no#120,Student_name#121,Mobile#122] B