**JOINS**

A basic understanding of how Spark performs joins can help you avoid running out of memory and tackle problems you could not solve otherwise.

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CSV_Read").getOrCreate()

In [5]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")

person.show()


+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
+---+----------------+----------------+---------------+



In [6]:
graduateProgram = spark.createDataFrame([
  (0, "Masters", "School of Information", "UC Berkeley"),
  (2, "Masters", "EECS", "UC Berkeley"),
  (1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")

graduateProgram.show()


+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  2|Masters|                EECS|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In [7]:
sparkStatus = spark.createDataFrame([
  (500, "Vice President"),
  (250, "PMC Member"),
  (100, "Contributor")])\
.toDF("id", "status")

sparkStatus.show()

+---+--------------+
| id|        status|
+---+--------------+
|500|Vice President|
|250|    PMC Member|
|100|   Contributor|
+---+--------------+



In [9]:
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")

In [11]:
joinExpression = person["graduate_program"] == graduateProgram['id']
wrongJoinExpression = person["name"] == graduateProgram["school"]

In [12]:
person.join(graduateProgram, joinExpression).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [14]:
spark.sql("""
SELECT * FROM person JOIN graduateProgram
 ON person.graduate_program = graduateProgram.id

""").show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [15]:
joinType = "inner"

person.join(graduateProgram, joinExpression, joinType).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [18]:
spark.sql("""
SELECT * FROM person INNER JOIN graduateProgram
 ON person.graduate_program = graduateProgram.id

""").show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



**Outer Joins**

Outer joins evaluate the keys in both of the DataFrames or tables and include (and join together) the rows that evaluate to true or false. Where a row on one side has no equivalent row on the other, Spark inserts null for the missing columns.

In [19]:
joinType = "outer"

person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|NULL|            NULL|            NULL|           NULL|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



**Left Outer Joins**

Left outer joins evaluate the keys in both of the DataFrames or tables and include all rows from the left DataFrame as well as any rows in the right DataFrame that have a match in the left DataFrame. If there is no equivalent row in the right DataFrame, Spark inserts null.

In [20]:
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Masters|                EECS|UC Berkeley|NULL|            NULL|            NULL|           NULL|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



**Right Outer Joins**

Right outer joins evaluate the keys in both of the DataFrames or tables and include all rows from the right DataFrame as well as any rows in the left DataFrame that have a match in the right DataFrame. If there is no equivalent row in the left DataFrame, Spark inserts null.

In [21]:
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|NULL|            NULL|            NULL|           NULL|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In [25]:
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [graduate_program#35L], [id#54L], RightOuter
   :- Sort [graduate_program#35L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(graduate_program#35L, 200), ENSURE_REQUIREMENTS, [plan_id=1425]
   :     +- Project [_1#29L AS id#33L, _2#30 AS name#34, _3#31L AS graduate_program#35L, _4#32 AS spark_status#36]
   :        +- Filter isnotnull(_3#31L)
   :           +- Scan ExistingRDD[_1#29L,_2#30,_3#31L,_4#32]
   +- Sort [id#54L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#54L, 200), ENSURE_REQUIREMENTS, [plan_id=1426]
         +- Project [_1#50L AS id#54L, _2#51 AS degree#55, _3#52 AS department#56, _4#53 AS school#57]
            +- Scan ExistingRDD[_1#50L,_2#51,_3#52,_4#53]




**Left Semi Join**

Semi joins do not actually include any values from the right DataFrame.
They only compare values to see if the value exists in the second DataFrame.
If the value does exist, those rows are kept in the result.

In [22]:
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+
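
The key property of a semi join, that each left row appears at most once and no right-hand columns are added, can be illustrated with a plain-Python sketch. This is not how Spark implements the join; the `left_semi` helper and the tuple layout are assumptions made for illustration, and the rows mirror the DataFrames defined earlier.

```python
# Plain-Python sketch of left semi join semantics (not Spark's implementation).
graduate_program = [
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley"),
]
person = [
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100]),
]


def left_semi(left, right, left_key, right_key):
    """Keep a left row (unchanged, at most once) if its key occurs in right."""
    right_keys = {right_key(row) for row in right}
    return [row for row in left if left_key(row) in right_keys]


semi_rows = left_semi(
    graduate_program,
    person,
    left_key=lambda program: program[0],  # graduateProgram.id
    right_key=lambda p: p[2],             # person.graduate_program
)

for row in semi_rows:
    print(row)
```

Program 1 is matched by two people, yet it is kept only once, and program 2 is dropped because nobody references it.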



In [24]:
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [_1#50L AS id#54L, _2#51 AS degree#55, _3#52 AS department#56, _4#53 AS school#57]
   +- SortMergeJoin [_1#50L], [graduate_program#35L], LeftSemi
      :- Sort [_1#50L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(_1#50L, 200), ENSURE_REQUIREMENTS, [plan_id=1394]
      :     +- Filter isnotnull(_1#50L)
      :        +- Scan ExistingRDD[_1#50L,_2#51,_3#52,_4#53]
      +- Sort [graduate_program#35L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(graduate_program#35L, 200), ENSURE_REQUIREMENTS, [plan_id=1395]
            +- Project [_3#31L AS graduate_program#35L]
               +- Filter isnotnull(_3#31L)
                  +- Scan ExistingRDD[_1#29L,_2#30,_3#31L,_4#32]




**Cross (Cartesian) Joins**

Cross joins, in simplest terms, are inner joins that do not specify a predicate. A cross join pairs every single row in the left DataFrame with every single row in the right DataFrame, which causes an explosion in the number of rows in the resulting DataFrame.

If you have 1,000 rows in each DataFrame, the cross join of these will result in 1,000,000 (1,000 x 1,000) rows. For this reason, you must very explicitly state that you want a cross join by using the cross join keyword:

In [None]:
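joinType = "cross"
# A join expression is still supplied here, so the predicate is applied
# and the result has the same rows as the inner join shown earlier.
graduateProgram.join(person, joinExpression, joinType).show()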

**Joins on Complex Types**

In [23]:
from pyspark.sql.functions import expr

person.withColumnRenamed("id", "personId")\
.join(sparkStatus, expr("array_contains(spark_status, id)")).show()

+--------+----------------+----------------+---------------+---+--------------+
|personId|            name|graduate_program|   spark_status| id|        status|
+--------+----------------+----------------+---------------+---+--------------+
|       0|   Bill Chambers|               0|          [100]|100|   Contributor|
|       1|   Matei Zaharia|               1|[500, 250, 100]|500|Vice President|
|       1|   Matei Zaharia|               1|[500, 250, 100]|250|    PMC Member|
|       1|   Matei Zaharia|               1|[500, 250, 100]|100|   Contributor|
|       2|Michael Armbrust|               1|     [250, 100]|250|    PMC Member|
|       2|Michael Armbrust|               1|     [250, 100]|100|   Contributor|
+--------+----------------+----------------+---------------+---+--------------+



-- in SQL

 SELECT * FROM
 (SELECT id AS personId, name, graduate_program, spark_status FROM person)
 INNER JOIN sparkStatus ON array_contains(spark_status, id)

**How Spark Performs Joins**

To understand how Spark performs joins, you need to understand the two core resources at play: the node-to-node communication strategy and the per-node computation strategy.

Comprehending how Spark performs joins can mean the difference between a job that completes quickly and one that never completes at all.

**Communication Strategies**

Spark approaches cluster communication in two different ways during joins:

1. Shuffle join, which results in an all-to-all communication
2. Broadcast join, which copies the smaller DataFrame to every worker node
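
The difference between the two strategies can be sketched in plain Python. This is a toy model, not Spark's implementation: the two-node "cluster", the partition layout, and the helper names `shuffle_join` and `broadcast_join` are all assumptions made for illustration.

```python
from collections import defaultdict

NUM_NODES = 2  # hypothetical cluster size


def shuffle_join(left_parts, right_parts, left_key, right_key):
    """Repartition BOTH sides by key hash, then join within each node."""
    left_buckets, right_buckets = defaultdict(list), defaultdict(list)
    rows_moved = 0
    for parts, buckets, key in (
        (left_parts, left_buckets, left_key),
        (right_parts, right_buckets, right_key),
    ):
        for node, rows in enumerate(parts):
            for row in rows:
                target = hash(key(row)) % NUM_NODES
                if target != node:
                    rows_moved += 1  # this row crosses the network
                buckets[target].append(row)
    joined = [
        l + r
        for node in range(NUM_NODES)
        for l in left_buckets[node]
        for r in right_buckets[node]
        if left_key(l) == right_key(r)
    ]
    return joined, rows_moved


def broadcast_join(left_parts, small_rows, left_key, right_key):
    """Copy the small side to every node; the large side never moves."""
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[right_key(row)].append(row)
    joined = [l + r for rows in left_parts for l in rows for r in lookup[left_key(l)]]
    rows_copied = len(small_rows) * len(left_parts)  # one full copy per node
    return joined, rows_copied


# (id, name, graduate_program) rows, spread over two nodes.
person_parts = [
    [(1, "Matei Zaharia", 1), (0, "Bill Chambers", 0)],
    [(2, "Michael Armbrust", 1)],
]
# (id, degree) rows, spread over two nodes.
program_parts = [[(1, "Ph.D.")], [(0, "Masters"), (2, "Masters")]]
all_programs = [row for part in program_parts for row in part]

shuffled, moved = shuffle_join(
    person_parts, program_parts, lambda p: p[2], lambda g: g[0]
)
broadcasted, copied = broadcast_join(
    person_parts, all_programs, lambda p: p[2], lambda g: g[0]
)
print(sorted(shuffled) == sorted(broadcasted))
```

Both strategies produce the same joined rows. What differs is which rows travel: the shuffle may move rows from either side, while the broadcast only ever copies the small side, which is why it pays off when one DataFrame is much smaller than the other.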

In [29]:
from pyspark.sql.functions import col

joinexpr = person["graduate_program"] == graduateProgram["id"]
person.join(graduateProgram, joinexpr).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [graduate_program#35L], [id#54L], Inner
   :- Sort [graduate_program#35L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(graduate_program#35L, 200), ENSURE_REQUIREMENTS, [plan_id=1459]
   :     +- Project [_1#29L AS id#33L, _2#30 AS name#34, _3#31L AS graduate_program#35L, _4#32 AS spark_status#36]
   :        +- Filter isnotnull(_3#31L)
   :           +- Scan ExistingRDD[_1#29L,_2#30,_3#31L,_4#32]
   +- Sort [id#54L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#54L, 200), ENSURE_REQUIREMENTS, [plan_id=1460]
         +- Project [_1#50L AS id#54L, _2#51 AS degree#55, _3#52 AS department#56, _4#53 AS school#57]
            +- Filter isnotnull(_1#50L)
               +- Scan ExistingRDD[_1#50L,_2#51,_3#52,_4#53]




The SQL interface also includes the ability to provide hints for how joins are performed. These are not enforced, however, so the optimizer might choose to ignore them. You can set one of these hints by using a special comment syntax. MAPJOIN, BROADCAST, and BROADCASTJOIN all do the same thing and are all supported.

-- in SQL


 SELECT /*+ MAPJOIN(graduateProgram) */  * FROM person JOIN graduateProgram
 ON person.graduate_program = graduateProgram.id