# Joins

Spark applications often bring together a large number of different datasets, so joins are an essential part of nearly all Spark workloads. Because Spark can read from many kinds of data sources, you can tap into a variety of data across your company. This notebook covers not only which types of joins exist in Spark and how to use them, but also some of the basic internals, so that you can reason about how Spark actually executes a join on the cluster. This basic knowledge can help you avoid running out of memory and tackle problems that you could not solve before.

## Join Expressions

A join brings together two sets of data, the left and the right, by comparing the value of one or more keys of each and evaluating a join expression that determines whether Spark should combine a row from the left with a row from the right. The most common join expression, an equi-join, checks whether the specified keys in the left and right datasets are equal. If they are equal, Spark combines the matching rows; in an inner join, rows that do not have matching keys are discarded. Spark also allows for much more sophisticated join policies in addition to equi-joins. You can even use complex types and, for example, check whether a key exists within an array when you perform a join.
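
To make the idea of a join expression concrete, here is a minimal plain-Python sketch (not Spark code) that models a join as a nested loop keeping every left/right pair for which a predicate is true. The function name `nested_loop_join` and the sample rows are illustrative assumptions; Spark's real execution is far more optimized than this.

```python
# Conceptual model only: Spark does not execute joins as a Python nested loop.
def nested_loop_join(left, right, predicate):
    """Return every (left_row, right_row) pair for which predicate is true."""
    return [(lrow, rrow) for lrow in left for rrow in right if predicate(lrow, rrow)]

# Hypothetical rows, represented as dictionaries.
people = [
    {"name": "Bill", "program": 0, "statuses": [100]},
    {"name": "Matei", "program": 1, "statuses": [500, 250, 100]},
]
programs = [{"id": 0, "degree": "Masters"}, {"id": 1, "degree": "Ph.D."}]
statuses = [{"id": 500, "status": "Vice President"}, {"id": 100, "status": "Contributor"}]

# Equi-join: keep pairs whose keys are equal.
equi = nested_loop_join(people, programs, lambda p, g: p["program"] == g["id"])

# Join on a complex type: keep pairs where the right key appears in the left array.
membership = nested_loop_join(people, statuses, lambda p, s: s["id"] in p["statuses"])

equi_names = [(p["name"], g["degree"]) for p, g in equi]
membership_names = [(p["name"], s["status"]) for p, s in membership]
print(equi_names)
print(membership_names)
```

Only the predicate changes between the two joins; the pairing logic stays the same. In Spark SQL, the membership check in the second join corresponds to the `array_contains` function.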

## Join Types

Whereas the join expression determines whether two rows should join, the join type determines what should be in the result set. There are a variety of different join types available in Spark for you to use:

* Inner joins (keep rows with keys that exist in the left and right datasets)
* Outer joins (keep rows with keys in either the left or right datasets)
* Left outer joins (keep rows with keys in the left dataset)
* Right outer joins (keep rows with keys in the right dataset)
* Left semi joins (keep the rows in the left, and only the left, dataset where the key appears in the right dataset)
* Left anti joins (keep the rows in the left, and only the left, dataset where they do not appear in the right dataset)
* Natural joins (perform a join by implicitly matching the columns between the two datasets with the same names)
* Cross (or Cartesian) joins (match every row in the left dataset with every row in the right dataset)
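
Before turning to Spark itself, the list above can be sketched in plain Python. This is a conceptual model under simplifying assumptions (rows are dictionaries, the helper `join` and its `how` strings are hypothetical and merely mirror Spark's naming, and natural joins are not modeled); it is not how Spark implements joins.

```python
# Conceptual model of join types over lists of dict rows; not Spark's implementation.
def join(left, right, on, how="inner"):
    """Join two lists of dict rows. `on` is a predicate taking (left_row, right_row)."""
    left_null = {k: None for row in left for k in row}    # all-null stand-in for a left row
    right_null = {k: None for row in right for k in row}  # all-null stand-in for a right row
    out = []
    matched_right = set()  # indexes of right rows that matched at least once
    for lrow in left:
        matches = [i for i, rrow in enumerate(right) if on(lrow, rrow)]
        matched_right.update(matches)
        if how == "left_semi":
            if matches:
                out.append(lrow)       # left columns only, once per left row
        elif how == "left_anti":
            if not matches:
                out.append(lrow)       # left rows with no match on the right
        else:
            out.extend((lrow, right[i]) for i in matches)
            if not matches and how in ("left_outer", "outer"):
                out.append((lrow, right_null))
    if how in ("right_outer", "outer"):
        # Keep right rows that never matched, padded with nulls on the left.
        out.extend((left_null, rrow) for i, rrow in enumerate(right) if i not in matched_right)
    return out

people = [{"name": "Bill", "program": 0},
          {"name": "Matei", "program": 1},
          {"name": "Michael", "program": 1}]
programs = [{"id": 0}, {"id": 2}, {"id": 1}]

def on(person_row, program_row):
    return person_row["program"] == program_row["id"]

counts = {how: len(join(people, programs, on, how))
          for how in ["inner", "outer", "left_outer", "right_outer"]}
print(counts)

# Semi and anti joins act as filters on the left side (here: programs).
semi = join(programs, people, lambda g, p: on(p, g), "left_semi")
anti = join(programs, people, lambda g, p: on(p, g), "left_anti")
print([g["id"] for g in semi], [g["id"] for g in anti])
```

In this model, a cross join is simply an inner join whose predicate always returns `True`.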

If you have ever interacted with a relational database system, or even an Excel spreadsheet, the concept of joining different datasets together should not be too abstract. Let’s move on to showing examples of each join type. This will make it easy to understand exactly how you can apply these to your own problems. To do this, let’s create some simple datasets that we can use in our examples:

In [1]:
person = spark.createDataFrame([
    (0, "Bill Chambers", 0, [100]),
    (1, "Matei Zaharia", 1, [500, 250, 100]),
    (2, "Michael Armbrust", 1, [250, 100])])\
  .toDF("id", "name", "graduate_program", "spark_status")

graduateProgram = spark.createDataFrame([
    (0, "Masters", "School of Information", "UC Berkeley"),
    (2, "Masters", "EECS", "UC Berkeley"),
    (1, "Ph.D.", "EECS", "UC Berkeley")])\
  .toDF("id", "degree", "department", "school")

person.show()
graduateProgram.show()

+---+----------------+----------------+---------------+
| id|            name|graduate_program|   spark_status|
+---+----------------+----------------+---------------+
|  0|   Bill Chambers|               0|          [100]|
|  1|   Matei Zaharia|               1|[500, 250, 100]|
|  2|Michael Armbrust|               1|     [250, 100]|
+---+----------------+----------------+---------------+

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  2|Masters|                EECS|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



Next, let’s register these as temporary views so that we can query them with SQL:

In [2]:
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")

## Inner Joins

Inner joins evaluate the keys in both of the DataFrames or tables and include (and join together) only the rows that evaluate to true. In the following example, we join the graduateProgram DataFrame with the person DataFrame to create a new DataFrame:

In [3]:
joinExpression = person["graduate_program"] == graduateProgram['id']

Inner joins are the default, so we only need to call `join` on the left DataFrame and pass the right DataFrame along with the join expression:

In [4]:
person.join(graduateProgram, joinExpression).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



We can also specify this explicitly by passing in a third parameter, the `how`:

In [5]:
person.join(graduateProgram, 
            on = person["graduate_program"] == graduateProgram['id'],
            how = "inner").show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+



General join format:

```python
leftDF.join(rightDF, 
            on = leftDF["abc"] == rightDF["xyz"], 
            how = "joinType")
```

In SQL

```sql
SELECT * FROM person
INNER JOIN graduateProgram
ON person.graduate_program = graduateProgram.id
```

## Outer Joins

Outer joins evaluate the keys in both of the DataFrames or tables and include (and join together) the rows that evaluate to true or false. If there is no equivalent row in either the left or right DataFrame, Spark will insert null:

In [6]:
person.join(graduateProgram, 
            on = person["graduate_program"] == graduateProgram['id'],
            how = "outer").show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In SQL
```sql
SELECT * FROM person 
FULL OUTER JOIN graduateProgram
ON person.graduate_program = graduateProgram.id
```

## Left Outer Joins

Left outer joins evaluate the keys in both of the DataFrames or tables and include all rows from the left DataFrame as well as any rows in the right DataFrame that have a match in the left DataFrame. If there is no equivalent row in the right DataFrame, Spark will insert null:

In [7]:
graduateProgram.join(person, 
                     on = person["graduate_program"] == graduateProgram['id'],
                     how = "left_outer").show()

+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
| id| degree|          department|     school|  id|            name|graduate_program|   spark_status|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+
|  0|Masters|School of Informa...|UC Berkeley|   0|   Bill Chambers|               0|          [100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   1|   Matei Zaharia|               1|[500, 250, 100]|
|  1|  Ph.D.|                EECS|UC Berkeley|   2|Michael Armbrust|               1|     [250, 100]|
|  2|Masters|                EECS|UC Berkeley|null|            null|            null|           null|
+---+-------+--------------------+-----------+----+----------------+----------------+---------------+



In SQL
```sql
SELECT * FROM graduateProgram 
LEFT OUTER JOIN person
ON person.graduate_program = graduateProgram.id
```

## Right Outer Joins

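Right outer joins are exactly the same as left outer joins, except that all rows from the right DataFrame are kept and null is inserted where the left DataFrame has no match. Here we swap the order of the tables relative to the previous example, which yields the same rows with a different ordering of the columns: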

In [8]:
person.join(graduateProgram, 
            on = person["graduate_program"] == graduateProgram['id'],
            how = "right_outer").show()

+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+
|   0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|   1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|   2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|null|            null|            null|           null|  2|Masters|                EECS|UC Berkeley|
+----+----------------+----------------+---------------+---+-------+--------------------+-----------+



In SQL
```sql
SELECT * FROM person 
RIGHT OUTER JOIN graduateProgram
ON person.graduate_program = graduateProgram.id
```

## Left Semi Joins

Semi joins are a bit of a departure from the other joins. They do not actually include any values from the right DataFrame. They only compare values to see if the value exists in the second DataFrame. If the value does exist, those rows will be kept in the result, even if there are duplicate keys in the left DataFrame. **Think of left semi joins as filters on a DataFrame**, as opposed to the function of a conventional join:

In [9]:
graduateProgram.join(person, 
                     on = person["graduate_program"] == graduateProgram['id'],
                     how = "left_semi").show()

+---+-------+--------------------+-----------+
| id| degree|          department|     school|
+---+-------+--------------------+-----------+
|  0|Masters|School of Informa...|UC Berkeley|
|  1|  Ph.D.|                EECS|UC Berkeley|
+---+-------+--------------------+-----------+



In SQL
```sql
SELECT * FROM graduateProgram
LEFT SEMI JOIN person
ON graduateProgram.id = person.graduate_program
```

## Left Anti Joins

Left anti joins are the opposite of left semi joins. Like left semi joins, they do not actually include any values from the right DataFrame. They only compare values to see if the value exists in the second DataFrame. However, rather than keeping the values that exist in the second DataFrame, they keep only the values that do not have a corresponding key in the second DataFrame. Think of anti joins as a NOT IN SQL-style filter:

In [10]:
graduateProgram.join(person, 
                     on = person["graduate_program"] == graduateProgram['id'],
                     how = "left_anti").show()

+---+-------+----------+-----------+
| id| degree|department|     school|
+---+-------+----------+-----------+
|  2|Masters|      EECS|UC Berkeley|
+---+-------+----------+-----------+



In SQL
```sql
SELECT * FROM graduateProgram 
LEFT ANTI JOIN person
ON graduateProgram.id = person.graduate_program
```

## Cross (Cartesian) Joins

The last of our joins are cross joins, or Cartesian products. In simplest terms, cross joins are inner joins that do not specify a predicate. A cross join pairs every single row in the left DataFrame with every single row in the right DataFrame, which causes an absolute explosion in the number of rows in the resulting DataFrame. If you have 1,000 rows in each DataFrame, their cross join will result in 1,000,000 (1,000 x 1,000) rows. For this reason, you must state very explicitly that you want a cross join by calling the `crossJoin` method:

In [11]:
person.crossJoin(graduateProgram).show()

+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status| id| degree|          department|     school|
+---+----------------+----------------+---------------+---+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|  0|Masters|School of Informa...|UC Berkeley|
|  0|   Bill Chambers|               0|          [100]|  2|Masters|                EECS|UC Berkeley|
|  0|   Bill Chambers|               0|          [100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  0|Masters|School of Informa...|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  2|Masters|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|                EECS|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|  0|Masters|School of Informa...|UC 

# Challenges When Using Joins

When performing joins, there are some specific challenges and some common questions that arise.

## Handling Duplicate Column Names

One of the tricky things that come up in joins is dealing with duplicate column names in your results DataFrame. In a DataFrame, each column has a unique ID within Spark’s SQL Engine, Catalyst. This unique ID is purely internal and not something that you can directly reference. This makes it quite difficult to refer to a specific column when you have a DataFrame with duplicate column names.

This can occur in two distinct situations:

* The join expression that you specify does not remove one key from one of the input DataFrames and the keys have the same column name

* Two columns on which you are not performing the join have the same name

Let’s create a problem dataset that we can use to illustrate these problems:

In [12]:
gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")

df = person.join(gradProgramDupe,
                 on = person["graduate_program"] == gradProgramDupe["graduate_program"],
                 how = "inner")
df.show()

+---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status|graduate_program| degree|          department|     school|
+---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|               0|Masters|School of Informa...|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|               1|  Ph.D.|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|               1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+----------------+-------+--------------------+-----------+



Note that there are now two graduate_program columns, even though we joined on that key. If we now run the following command, we will get an error because of the ambiguous column name:

```python
df.select("graduate_program").show()
```

Error:
```
AnalysisException: "Reference 'graduate_program' is ambiguous, could be: graduate_program, graduate_program.;
```

### APPROACH 1: DIFFERENT JOIN EXPRESSION

When you have two keys that have the same name, probably the easiest fix is to change the join expression from a Boolean expression to a string or sequence. This automatically removes one of the columns for you during the join:

In [13]:
person.join(gradProgramDupe,"graduate_program").show()

+----------------+---+----------------+---------------+-------+--------------------+-----------+
|graduate_program| id|            name|   spark_status| degree|          department|     school|
+----------------+---+----------------+---------------+-------+--------------------+-----------+
|               0|  0|   Bill Chambers|          [100]|Masters|School of Informa...|UC Berkeley|
|               1|  2|Michael Armbrust|     [250, 100]|  Ph.D.|                EECS|UC Berkeley|
|               1|  1|   Matei Zaharia|[500, 250, 100]|  Ph.D.|                EECS|UC Berkeley|
+----------------+---+----------------+---------------+-------+--------------------+-----------+



### APPROACH 2: DROPPING THE COLUMN AFTER THE JOIN

Another approach is to drop the offending column after the join. When doing this, we need to refer to the column via the original source DataFrame. We can do this if the join uses the same key names or if the source DataFrames have columns that simply have the same name:

In [14]:
person.join(gradProgramDupe, 
           person["graduate_program"] == gradProgramDupe["graduate_program"])\
  .drop(person["graduate_program"]).show()

+---+----------------+---------------+----------------+-------+--------------------+-----------+
| id|            name|   spark_status|graduate_program| degree|          department|     school|
+---+----------------+---------------+----------------+-------+--------------------+-----------+
|  0|   Bill Chambers|          [100]|               0|Masters|School of Informa...|UC Berkeley|
|  2|Michael Armbrust|     [250, 100]|               1|  Ph.D.|                EECS|UC Berkeley|
|  1|   Matei Zaharia|[500, 250, 100]|               1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+---------------+----------------+-------+--------------------+-----------+



### APPROACH 3: RENAMING A COLUMN BEFORE THE JOIN

We can avoid this issue altogether if we rename one of our columns before the join:

In [15]:
gradProgram3 = graduateProgram.withColumnRenamed("id", "grad_id")

person.join(gradProgram3, 
           person["graduate_program"] == gradProgram3["grad_id"]).show()

+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+
| id|            name|graduate_program|   spark_status|grad_id| degree|          department|     school|
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+
|  0|   Bill Chambers|               0|          [100]|      0|Masters|School of Informa...|UC Berkeley|
|  2|Michael Armbrust|               1|     [250, 100]|      1|  Ph.D.|                EECS|UC Berkeley|
|  1|   Matei Zaharia|               1|[500, 250, 100]|      1|  Ph.D.|                EECS|UC Berkeley|
+---+----------------+----------------+---------------+-------+-------+--------------------+-----------+



# How Spark Performs Joins

To understand how Spark performs joins, you need to understand the two core resources at play: the node-to-node communication strategy and the per-node computation strategy. These internals are likely irrelevant to your business problem. However, comprehending how Spark performs joins can mean the difference between a job that completes quickly and one that never completes at all.

## Communication Strategies

Spark approaches cluster communication in three different ways during joins. It uses either a **sort-merge join**, which results in all-to-all communication, a **broadcast join**, or a **shuffle-hash join**. The core foundation of our simplified view of joins is that in Spark each table is either big or small. Although this is obviously a spectrum (and things do happen differently if you have a “medium-sized table”), it can help to be binary about the distinction for the sake of this explanation.

### BIG TABLE–TO–BIG TABLE

When you join a big table to another big table, you end up with a sort-merge join. This behavior is standard as of Spark 3.0 and as of the writing of this notebook. Prior to this version of Spark, shuffle-hash join was the default, but it has changed since then. The implementation of sort-merge join in Spark is similar to that of any other SQL engine, except that it happens over partitions because of the distributed nature of the data.

The illustration below represents how the data moves around different executors to make the join possible. If the DataFrames sharing the same partitioner are materialized by the same action, they will end up being co-located.

<img src="https://github.com/soltaniehha/Big-Data-Analytics-for-Business/blob/master/figs/06-03-Joining-two-big-tables.png?raw=true" width="700" align="center"/>

In a sort-merge join, every node talks to every other node and they share data according to which node has a certain key or set of keys (on which you are joining). These joins are expensive because the network can become congested with traffic, especially if your DataFrames are not co-partitioned.
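
The mechanics can be sketched in plain Python. This is a simplified single-process model, not Spark's implementation: the helper names (`shuffle`, `merge_join`, `sort_merge_join`) are hypothetical, partitions are ordinary lists, and the "network" is just a dictionary. It shows the three steps: hash-partition both sides on the join key, sort each partition, then merge.

```python
from collections import defaultdict

def shuffle(rows, key, num_partitions):
    """Model the exchange step: route each row to a partition by hashing its key."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions

def merge_join(left, right, left_key, right_key):
    """Inner-join two lists that are already sorted on their join keys."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][left_key], right[j][right_key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Pair this left row with the whole run of equal keys on the right.
            j_end = j
            while j_end < len(right) and right[j_end][right_key] == lk:
                out.append((left[i], right[j_end]))
                j_end += 1
            i += 1  # j stays at the start of the run for the next left row
    return out

def sort_merge_join(left, right, left_key, right_key, num_partitions=4):
    lparts = shuffle(left, left_key, num_partitions)
    rparts = shuffle(right, right_key, num_partitions)
    out = []
    for p in range(num_partitions):  # matching keys always land in the same partition
        lsorted = sorted(lparts.get(p, []), key=lambda r: r[left_key])
        rsorted = sorted(rparts.get(p, []), key=lambda r: r[right_key])
        out.extend(merge_join(lsorted, rsorted, left_key, right_key))
    return out

people = [{"name": "Bill", "graduate_program": 0},
          {"name": "Matei", "graduate_program": 1},
          {"name": "Michael", "graduate_program": 1}]
programs = [{"id": 0, "degree": "Masters"},
            {"id": 2, "degree": "Masters"},
            {"id": 1, "degree": "Ph.D."}]

joined = sort_merge_join(people, programs, "graduate_program", "id")
print([(p["name"], g["degree"]) for p, g in joined])
```

The `shuffle` step is the expensive part on a real cluster, because every row of both tables may have to move to a different node before any sorting or merging can begin.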

This join describes taking a big table of data and joining it to another big table of data. An example of this might be a company that receives billions of messages every day from the Internet of Things, and needs to identify the day-over-day changes that have occurred. The way to do this is by joining on *deviceId*, *messageType*, and *date* in one column, and *date - 1 day* in the other column.
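
As a rough illustration of that join condition, the following plain-Python sketch matches each message with the message from the same device and type one day earlier. The sample data and field layout are hypothetical, and a dictionary lookup stands in for the distributed join.

```python
from datetime import date, timedelta

# Hypothetical messages: (deviceId, messageType, date, value).
messages = [
    ("dev-1", "temp", date(2024, 1, 1), 20.0),
    ("dev-1", "temp", date(2024, 1, 2), 23.5),
    ("dev-2", "temp", date(2024, 1, 2), 18.0),
]

# Index one side of the self-join by its key: (deviceId, messageType, date).
by_key = {(dev, kind, day): value for dev, kind, day, value in messages}

# For each message, look up the same device and type at (date - 1 day).
changes = []
for dev, kind, day, value in messages:
    previous = by_key.get((dev, kind, day - timedelta(days=1)))
    if previous is not None:
        changes.append((dev, kind, day, value - previous))

print(changes)
```

At the scale described above, neither side fits in memory on one machine, which is why the join has to move data between nodes.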

In the figure above, DataFrame 1 and DataFrame 2 are both large DataFrames. This means that all worker nodes (and potentially every partition) will need to communicate with one another during the entire join process (with no intelligent partitioning of data).

### BIG TABLE–TO–SMALL TABLE

When one of the tables is small enough to fit into the memory of a single worker node, with some breathing room of course, we can optimize our join. Although we can use a big table–to–big table communication strategy, it can often be more efficient to use a broadcast join. This means that we replicate our small DataFrame onto every worker node in the cluster (be it located on one machine or many). This sounds expensive, but it spares us the all-to-all communication during the entire join process. Instead, we communicate only once at the beginning and then let each individual worker node perform the work without having to wait for or communicate with any other worker node, as is depicted in the figure below:

<img src="https://github.com/soltaniehha/Big-Data-Analytics-for-Business/blob/master/figs/06-03-A-broadcast-join.png?raw=true" width="700" align="center"/>

At the beginning of this join there will be a large communication, just like in the previous type of join. However, immediately after that first exchange, there will be no further communication between nodes. This means that joins will be performed on every single node individually, making CPU the biggest bottleneck. By looking at the explain plan, we can see that for our current set of data Spark has automatically chosen a sort-merge join:

In [16]:
person.join(graduateProgram, 
           person["graduate_program"] == graduateProgram["id"]).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [graduate_program#10L], [id#24L], Inner
   :- Sort [graduate_program#10L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(graduate_program#10L, 200), ENSURE_REQUIREMENTS, [id=#1552]
   :     +- Project [_1#0L AS id#8L, _2#1 AS name#9, _3#2L AS graduate_program#10L, _4#3 AS spark_status#11]
   :        +- Filter isnotnull(_3#2L)
   :           +- Scan ExistingRDD[_1#0L,_2#1,_3#2L,_4#3]
   +- Sort [id#24L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#24L, 200), ENSURE_REQUIREMENTS, [id=#1553]
         +- Project [_1#16L AS id#24L, _2#17 AS degree#25, _3#18 AS department#26, _4#19 AS school#27]
            +- Filter isnotnull(_1#16L)
               +- Scan ExistingRDD[_1#16L,_2#17,_3#18,_4#19]




With the DataFrame API, we can explicitly give the optimizer a hint that we would like to use a broadcast join by wrapping the small DataFrame in question in the `broadcast` function:

In [17]:
from pyspark.sql.functions import broadcast

person.join(broadcast(graduateProgram), 
           person["graduate_program"] == graduateProgram["id"]).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [graduate_program#10L], [id#24L], Inner, BuildRight, false
   :- Project [_1#0L AS id#8L, _2#1 AS name#9, _3#2L AS graduate_program#10L, _4#3 AS spark_status#11]
   :  +- Filter isnotnull(_3#2L)
   :     +- Scan ExistingRDD[_1#0L,_2#1,_3#2L,_4#3]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#1585]
      +- Project [_1#16L AS id#24L, _2#17 AS degree#25, _3#18 AS department#26, _4#19 AS school#27]
         +- Filter isnotnull(_1#16L)
            +- Scan ExistingRDD[_1#16L,_2#17,_3#18,_4#19]




If you try to broadcast something too large, you can crash your driver node, because the small DataFrame is first collected to the driver and that collect is expensive. This is likely an area for optimization in the future.
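
The idea behind a broadcast hash join can be sketched in plain Python. This is a simplified model under stated assumptions (the function name `broadcast_hash_join` is hypothetical, partitions of the big table are plain lists, and "broadcasting" is just sharing one lookup table), not Spark's implementation.

```python
from collections import defaultdict

def broadcast_hash_join(big_partitions, small_rows, big_key, small_key):
    """Inner-join a partitioned big table with a small table shared by every partition."""
    # Build phase: hash the small table once; this lookup is what gets "broadcast".
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[small_key]].append(row)

    # Probe phase: each partition is processed on its own; no rows of the
    # big table move between partitions.
    results = []
    for partition in big_partitions:
        local = [(row, match)
                 for row in partition
                 for match in lookup.get(row[big_key], [])]
        results.append(local)
    return results

# The big table stays where it is, split across two hypothetical partitions.
person_partitions = [
    [{"name": "Bill", "graduate_program": 0}],
    [{"name": "Matei", "graduate_program": 1},
     {"name": "Michael", "graduate_program": 1}],
]
programs = [{"id": 0, "degree": "Masters"},
            {"id": 2, "degree": "Masters"},
            {"id": 1, "degree": "Ph.D."}]

per_partition = broadcast_hash_join(person_partitions, programs, "graduate_program", "id")
print([[(p["name"], g["degree"]) for p, g in part] for part in per_partition])
```

The cost of this strategy is that the whole lookup table must fit in memory on every worker, which is why it only suits a genuinely small table.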

### LITTLE TABLE–TO–LITTLE TABLE

When performing joins with small tables, it’s usually best to let Spark decide how to join them. You can always force a broadcast join if you’re noticing strange behavior.