In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Joins").getOrCreate()

In [2]:
spark

In [7]:
from pyspark.sql.functions import col, lit

customers = [
    (1,"Lewis","Uk"),
    (2,"Max", "NL"),
    (3,"Charles","MC"),
    (4,"Lando", "UK"),
    (5,"Seb", "DE"),

]

orders = [

    (101,1, "2020-07-10", 250.0, "UK"),
    (102,1, "2020-08-12",99.5, "UK"),
    (103, 2,  "2020-07-12", 300.0, "NL"),
    (104, 2,  "2021-03-03", 150.0, "NL"),
    (105, 6,  "2021-01-01",  50.0, "CZ"),

]

custDF = spark.createDataFrame(customers, ["customerId","name","country"])
orderDF = spark.createDataFrame(orders, ["orderId","customerId","orderDate","amount","country"])
    

In [8]:
custDF.show()

+----------+-------+-------+
|customerId|   name|country|
+----------+-------+-------+
|         1|  Lewis|     Uk|
|         2|    Max|     NL|
|         3|Charles|     MC|
|         4|  Lando|     UK|
|         5|    Seb|     DE|
+----------+-------+-------+



In [9]:
orderDF.show()

+-------+----------+----------+------+-------+
|orderId|customerId| orderDate|amount|country|
+-------+----------+----------+------+-------+
|    101|         1|2020-07-10| 250.0|     UK|
|    102|         1|2020-08-12|  99.5|     UK|
|    103|         2|2020-07-12| 300.0|     NL|
|    104|         2|2021-03-03| 150.0|     NL|
|    105|         6|2021-01-01|  50.0|     CZ|
+-------+----------+----------+------+-------+



In [10]:
df1 = orderDF.join(custDF, "customerId", "inner")
df1.show()

+----------+-------+----------+------+-------+-----+-------+
|customerId|orderId| orderDate|amount|country| name|country|
+----------+-------+----------+------+-------+-----+-------+
|         1|    101|2020-07-10| 250.0|     UK|Lewis|     Uk|
|         1|    102|2020-08-12|  99.5|     UK|Lewis|     Uk|
|         2|    103|2020-07-12| 300.0|     NL|  Max|     NL|
|         2|    104|2021-03-03| 150.0|     NL|  Max|     NL|
+----------+-------+----------+------+-------+-----+-------+



In [11]:
df1 = orderDF.join(custDF, orderDF.customerId == custDF.customerId, "inner")
df1.show()

+-------+----------+----------+------+-------+----------+-----+-------+
|orderId|customerId| orderDate|amount|country|customerId| name|country|
+-------+----------+----------+------+-------+----------+-----+-------+
|    101|         1|2020-07-10| 250.0|     UK|         1|Lewis|     Uk|
|    102|         1|2020-08-12|  99.5|     UK|         1|Lewis|     Uk|
|    103|         2|2020-07-12| 300.0|     NL|         2|  Max|     NL|
|    104|         2|2021-03-03| 150.0|     NL|         2|  Max|     NL|
+-------+----------+----------+------+-------+----------+-----+-------+



In [15]:
df1 = orderDF.join(custDF, ["customerId", "country"], "inner")  
df1.show()

+----------+-------+-------+----------+------+----+
|customerId|country|orderId| orderDate|amount|name|
+----------+-------+-------+----------+------+----+
|         2|     NL|    103|2020-07-12| 300.0| Max|
|         2|     NL|    104|2021-03-03| 150.0| Max|
+----------+-------+-------+----------+------+----+



In [18]:
df1 =orderDF.join(custDF, (orderDF.customerId == custDF.customerId) & (orderDF.country == custDF.country), "inner")
df1.show()

+-------+----------+----------+------+-------+----------+----+-------+
|orderId|customerId| orderDate|amount|country|customerId|name|country|
+-------+----------+----------+------+-------+----------+----+-------+
|    103|         2|2020-07-12| 300.0|     NL|         2| Max|     NL|
|    104|         2|2021-03-03| 150.0|     NL|         2| Max|     NL|
+-------+----------+----------+------+-------+----------+----+-------+



In [19]:
df1 = orderDF.join(custDF, (orderDF.customerId == custDF.customerId) & (orderDF.country == custDF.country), "inner") \
.select(orderDF.orderId,orderDF.customerId,orderDF.orderDate,orderDF.amount,orderDF.country,custDF.country.alias('country_Customer'))
df1.show()

+-------+----------+----------+------+-------+----------------+
|orderId|customerId| orderDate|amount|country|country_Customer|
+-------+----------+----------+------+-------+----------------+
|    103|         2|2020-07-12| 300.0|     NL|              NL|
|    104|         2|2021-03-03| 150.0|     NL|              NL|
+-------+----------+----------+------+-------+----------------+



In [24]:
orderDF.createOrReplaceTempView("orders")
custDF.createOrReplaceTempView("customers")

spark.sql("""
    SELECT o.orderId, o.customerId, c.name, c.country
    FROM orders o
    JOIN customers c
    ON o.customerId = c.customerId
    """).show()


+-------+----------+-----+-------+
|orderId|customerId| name|country|
+-------+----------+-----+-------+
|    101|         1|Lewis|     Uk|
|    102|         1|Lewis|     Uk|
|    103|         2|  Max|     NL|
|    104|         2|  Max|     NL|
+-------+----------+-----+-------+



In [25]:
leftDF = orderDF.join(custDF, "customerId","left")
leftDF.show()

+----------+-------+----------+------+-------+-----+-------+
|customerId|orderId| orderDate|amount|country| name|country|
+----------+-------+----------+------+-------+-----+-------+
|         1|    101|2020-07-10| 250.0|     UK|Lewis|     Uk|
|         1|    102|2020-08-12|  99.5|     UK|Lewis|     Uk|
|         2|    103|2020-07-12| 300.0|     NL|  Max|     NL|
|         2|    104|2021-03-03| 150.0|     NL|  Max|     NL|
|         6|    105|2021-01-01|  50.0|     CZ| NULL|   NULL|
+----------+-------+----------+------+-------+-----+-------+



In [27]:
rightDF = orderDF.join(custDF, "customerId","right")
rightDF.show()

+----------+-------+----------+------+-------+-------+-------+
|customerId|orderId| orderDate|amount|country|   name|country|
+----------+-------+----------+------+-------+-------+-------+
|         1|    102|2020-08-12|  99.5|     UK|  Lewis|     Uk|
|         1|    101|2020-07-10| 250.0|     UK|  Lewis|     Uk|
|         2|    104|2021-03-03| 150.0|     NL|    Max|     NL|
|         2|    103|2020-07-12| 300.0|     NL|    Max|     NL|
|         3|   NULL|      NULL|  NULL|   NULL|Charles|     MC|
|         4|   NULL|      NULL|  NULL|   NULL|  Lando|     UK|
|         5|   NULL|      NULL|  NULL|   NULL|    Seb|     DE|
+----------+-------+----------+------+-------+-------+-------+



In [28]:
fullDF = orderDF.join(custDF, "customerId","full")
fullDF.show()

+----------+-------+----------+------+-------+-------+-------+
|customerId|orderId| orderDate|amount|country|   name|country|
+----------+-------+----------+------+-------+-------+-------+
|         1|    101|2020-07-10| 250.0|     UK|  Lewis|     Uk|
|         1|    102|2020-08-12|  99.5|     UK|  Lewis|     Uk|
|         2|    103|2020-07-12| 300.0|     NL|    Max|     NL|
|         2|    104|2021-03-03| 150.0|     NL|    Max|     NL|
|         3|   NULL|      NULL|  NULL|   NULL|Charles|     MC|
|         4|   NULL|      NULL|  NULL|   NULL|  Lando|     UK|
|         5|   NULL|      NULL|  NULL|   NULL|    Seb|     DE|
|         6|    105|2021-01-01|  50.0|     CZ|   NULL|   NULL|
+----------+-------+----------+------+-------+-------+-------+



In [29]:
crossDF = orderDF.join(custDF,how = "cross")
crossDF.show(30)

+-------+----------+----------+------+-------+----------+-------+-------+
|orderId|customerId| orderDate|amount|country|customerId|   name|country|
+-------+----------+----------+------+-------+----------+-------+-------+
|    101|         1|2020-07-10| 250.0|     UK|         1|  Lewis|     Uk|
|    101|         1|2020-07-10| 250.0|     UK|         2|    Max|     NL|
|    101|         1|2020-07-10| 250.0|     UK|         3|Charles|     MC|
|    101|         1|2020-07-10| 250.0|     UK|         4|  Lando|     UK|
|    101|         1|2020-07-10| 250.0|     UK|         5|    Seb|     DE|
|    102|         1|2020-08-12|  99.5|     UK|         1|  Lewis|     Uk|
|    102|         1|2020-08-12|  99.5|     UK|         2|    Max|     NL|
|    102|         1|2020-08-12|  99.5|     UK|         3|Charles|     MC|
|    102|         1|2020-08-12|  99.5|     UK|         4|  Lando|     UK|
|    102|         1|2020-08-12|  99.5|     UK|         5|    Seb|     DE|
|    103|         2|2020-07-12| 300.0|

In [30]:
crossDF.count()

25

In [31]:
semiDF = orderDF.join(custDF, "customerId", "semi")
semiDF.show()

+----------+-------+----------+------+-------+
|customerId|orderId| orderDate|amount|country|
+----------+-------+----------+------+-------+
|         1|    101|2020-07-10| 250.0|     UK|
|         1|    102|2020-08-12|  99.5|     UK|
|         2|    103|2020-07-12| 300.0|     NL|
|         2|    104|2021-03-03| 150.0|     NL|
+----------+-------+----------+------+-------+



In [32]:
antiDF = orderDF.join(custDF,"customerId","anti")
antiDF.show()

+----------+-------+----------+------+-------+
|customerId|orderId| orderDate|amount|country|
+----------+-------+----------+------+-------+
|         6|    105|2021-01-01|  50.0|     CZ|
+----------+-------+----------+------+-------+



In [33]:
antiDF = custDF.join(orderDF, "customerId","anti")
antiDF.show()

+----------+-------+-------+
|customerId|   name|country|
+----------+-------+-------+
|         3|Charles|     MC|
|         4|  Lando|     UK|
|         5|    Seb|     DE|
+----------+-------+-------+



In [34]:
from pyspark.sql.functions import col, lit

customers = [
    (1, "Lewis",   "UK"),
    (2, "Max",     "NL"),
    (3, "Charles", "MC"),
    (4, "Lando",   "UK"),
    (5, "Seb",     "DE"),
    (None, "Ghost", "IT")       # NULL id to show null join behavior
]
orders = [
    (101, 1,  "2020-07-10", 250.0, "UK"),
    (102, 1,  "2020-08-12",  99.5, "UK"),
    (103, 2,  "2020-07-12", 300.0, "NL"),
    (104, 2,  "2021-03-03", 150.0, "NL"),
    (105, 6,  "2021-01-01",  50.0, "CZ"),  # no matching customer (id=6)
    (106, None, "2021-05-05", 10.0, "SW")  # NULL id to show null join behavior
]

custDF = spark.createDataFrame(customers,  ["customerId", "name", "country"])
orderDF = spark.createDataFrame(orders,    ["orderId", "customerId", "orderDate", "amount", "country"])

In [35]:
df1 = orderDF.join(custDF, orderDF.customerId == custDF.customerId,"inner")
df1.show()

+-------+----------+----------+------+-------+----------+-----+-------+
|orderId|customerId| orderDate|amount|country|customerId| name|country|
+-------+----------+----------+------+-------+----------+-----+-------+
|    101|         1|2020-07-10| 250.0|     UK|         1|Lewis|     UK|
|    102|         1|2020-08-12|  99.5|     UK|         1|Lewis|     UK|
|    103|         2|2020-07-12| 300.0|     NL|         2|  Max|     NL|
|    104|         2|2021-03-03| 150.0|     NL|         2|  Max|     NL|
+-------+----------+----------+------+-------+----------+-----+-------+



In [37]:
orderDF.join(custDF,orderDF.customerId.eqNullSafe(custDF.customerId),"inner").show()

+-------+----------+----------+------+-------+----------+-----+-------+
|orderId|customerId| orderDate|amount|country|customerId| name|country|
+-------+----------+----------+------+-------+----------+-----+-------+
|    106|      NULL|2021-05-05|  10.0|     SW|      NULL|Ghost|     IT|
|    101|         1|2020-07-10| 250.0|     UK|         1|Lewis|     UK|
|    102|         1|2020-08-12|  99.5|     UK|         1|Lewis|     UK|
|    103|         2|2020-07-12| 300.0|     NL|         2|  Max|     NL|
|    104|         2|2021-03-03| 150.0|     NL|         2|  Max|     NL|
+-------+----------+----------+------+-------+----------+-----+-------+

