### Joins
* A join brings together two sets of data, the left and the right, by comparing the values of one or more keys from each side and evaluating the result of a join expression. That expression determines whether Spark should combine a row from the left set with a row from the right set.

`Syntax`

`join(other, on=None, how=None)`

##### Joins with another DataFrame, using the given join expression.

##### Parameters
* other – Right side of the join

* on – a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.

* how – str, default inner. Must be one of: `inner`, `cross`, `outer`, `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti` and `left_anti`.

### Join Types
* Whereas the join expression determines whether two rows should join, the join type determines what should be in the result set.
##### INNER JOIN
* The inner join is the default join in Spark SQL. It selects rows that have matching values in both relations.
##### CROSS JOIN
* A cross join returns the Cartesian product of two relations.
##### LEFT JOIN
* A left join returns all values from the left relation and the matched values from the right relation, or appends NULL if there is no match. It is also referred to as a left outer join.
##### RIGHT JOIN
* A right join returns all values from the right relation and the matched values from the left relation, or appends NULL if there is no match. It is also referred to as a right outer join.
##### FULL JOIN
* A full join returns all values from both relations, appending NULL values on the side that does not have a match. It is also referred to as a full outer join.
##### SEMI JOIN
* A semi join returns rows from the left relation that have a match in the right relation. It is also referred to as a left semi join.
##### ANTI JOIN
* An anti join returns rows from the left relation that have no match in the right relation. It is also referred to as a left anti join.
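
To make the join types above concrete, here is a plain-Python model of what each one keeps. It is not Spark code: the `emp`/`dept` rows and the `model_join` helper are hypothetical, rows are dicts, and `None` stands in for NULL.

```python
# Plain-Python model of Spark join-type semantics (not the Spark API).
# Hypothetical sample rows: GHOST has no department, OPERATIONS has no employees.
emp = [
    {"ename": "SMITH", "deptno": 20},
    {"ename": "ALLEN", "deptno": 30},
    {"ename": "KING", "deptno": 10},
    {"ename": "GHOST", "deptno": 99},
]
dept = [
    {"deptno": 10, "dname": "ACCOUNTING"},
    {"deptno": 20, "dname": "RESEARCH"},
    {"deptno": 30, "dname": "SALES"},
    {"deptno": 40, "dname": "OPERATIONS"},
]


def model_join(left, right, key, how="inner"):
    """Return (left_row, right_row) pairs, with None standing in for NULL.

    Semi and anti joins return only left rows, mirroring Spark.
    """
    def matches(lrow):
        return [rrow for rrow in right if rrow[key] == lrow[key]]

    if how == "semi":
        return [lrow for lrow in left if matches(lrow)]
    if how == "anti":
        return [lrow for lrow in left if not matches(lrow)]
    if how not in ("inner", "left", "right", "full"):
        raise ValueError(f"unsupported join type: {how}")

    out = []
    for lrow in left:
        found = matches(lrow)
        out.extend((lrow, rrow) for rrow in found)
        if not found and how in ("left", "full"):
            out.append((lrow, None))  # unmatched left row, NULL on the right
    if how in ("right", "full"):
        left_keys = {lrow[key] for lrow in left}
        # unmatched right rows, NULL on the left
        out.extend((None, rrow) for rrow in right if rrow[key] not in left_keys)
    return out


for how in ("inner", "left", "right", "full"):
    print(how, len(model_join(emp, dept, "deptno", how)))  # 3, 4, 4, 5
print([r["ename"] for r in model_join(emp, dept, "deptno", "semi")])  # ['SMITH', 'ALLEN', 'KING']
print([r["ename"] for r in model_join(emp, dept, "deptno", "anti")])  # ['GHOST']
```

The left, right, and full variants differ only in which unmatched rows they keep; semi and anti joins never emit right-side columns.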

In [0]:
emp_csv=spark.read.csv("/FileStore/tables/emp.csv",header=True,inferSchema=True)
dept_csv=spark.read.csv("/FileStore/tables/dept.csv",header=True,inferSchema=True)

In [0]:
dept_csv.show()

#### Natural Joins
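* Natural joins make implicit guesses at the columns on which you would like to join: they find the columns that have the same name in both datasets and join on those. Left, right, and full outer natural joins are all supported in Spark SQL (for example `NATURAL JOIN` and `NATURAL LEFT JOIN`).

* The DataFrame API has no dedicated natural-join method. The cell below is an explicit inner equi-join on `deptno`; because the condition is a Column expression, both `deptno` columns appear in the output.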

In [0]:
#emp_csv.join(dept_csv,on =emp_csv['deptno'] == dept_csv['deptno'],how='inner').show()
emp_csv.join(dept_csv,emp_csv['deptno'] == dept_csv['deptno'],'inner').show()
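
A natural join can be modeled in plain Python as an inner join on every column name the two sides share, with each shared column kept once. This is a hypothetical sketch (`natural_join` and the sample rows are not from the original) and assumes every row on a side has the same columns. In PySpark, passing the column name instead of an expression, as in `emp_csv.join(dept_csv, 'deptno')`, likewise keeps a single `deptno` column.

```python
def natural_join(left, right):
    """Inner natural join: match rows on all column names both sides share."""
    if not left or not right:
        return []
    shared = set(left[0]) & set(right[0])  # the implicitly chosen join columns
    return [
        {**lrow, **rrow}  # shared columns appear only once in the merged row
        for lrow in left
        for rrow in right
        if all(lrow[c] == rrow[c] for c in shared)
    ]


emp = [{"ename": "SMITH", "deptno": 20}, {"ename": "KING", "deptno": 10}]
dept = [{"deptno": 20, "dname": "RESEARCH"}, {"deptno": 40, "dname": "OPERATIONS"}]
print(natural_join(emp, dept))
# [{'ename': 'SMITH', 'deptno': 20, 'dname': 'RESEARCH'}]
```

In this model, if the sides shared an unintended column (say a hypothetical `loc` on both), it would silently join on that too, and with no shared columns at all it degenerates into a cross join. That is the risk of letting the join guess.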

#### Outer Joins
* Outer joins evaluate the keys in both of the DataFrames or tables and include (and join together) the rows whether the join expression evaluates to true or false. If there is no equivalent row in either the left or the right DataFrame, Spark inserts null.

* Outer joins (keep rows with keys in either the left or right datasets)

In [0]:
emp_csv.join(dept_csv,emp_csv['deptno'] == dept_csv['deptno'],'full_outer').show()

#### Left Outer Joins
* Left outer joins evaluate the keys in both of the DataFrames or tables and include all rows from the left DataFrame as well as any rows in the right DataFrame that have a match in the left DataFrame. If there is no equivalent row in the right DataFrame, Spark inserts null.

* Left outer joins (keep rows with keys in the left dataset)

In [0]:
emp_csv.join(dept_csv,emp_csv['deptno'] == dept_csv['deptno'],'left_outer').show()

#### Right Outer Joins
* Right outer joins evaluate the keys in both of the DataFrames or tables and include all rows from the right DataFrame as well as any rows in the left DataFrame that have a match in the right DataFrame. If there is no equivalent row in the left DataFrame, Spark inserts null.

* Right outer joins (keep rows with keys in the right dataset)

In [0]:
emp_csv.join(dept_csv,emp_csv['deptno'] == dept_csv['deptno'],'right').show()

#### Left Semi Joins
* Semi joins are a bit of a departure from the other joins. They do not actually include any values from the right DataFrame. They only compare values to see if the value exists in the second DataFrame. If the value does exist, those rows will be kept in the result, even if there are duplicate keys in the left DataFrame.

* Left semi joins (keep the rows in the left, and only the left, dataset where the key appears in the right dataset)


##### The essential differences between a semi join and a regular join are:

* A semi join either returns each row from the left input (A) or it does not. No row duplication can occur.
* A regular join duplicates rows if there are multiple matches on the join predicate.
* A semi join returns only columns from input A.
* A regular join may return columns from either (or both) join inputs.
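
The first two differences can be checked with a small plain-Python model. The sample rows and helper names are hypothetical; the right side deliberately repeats `deptno` 20 so that a regular inner join duplicates SMITH while the semi join does not.

```python
emp = [{"ename": "SMITH", "deptno": 20}, {"ename": "ALLEN", "deptno": 30}]
# Hypothetical right side with a duplicate key: deptno 20 appears twice.
dept = [{"deptno": 20, "loc": "DALLAS"}, {"deptno": 20, "loc": "AUSTIN"}]


def inner_join(left, right, key):
    """Regular join: one output row per matching (left, right) pair."""
    return [{**lrow, **rrow} for lrow in left for rrow in right if lrow[key] == rrow[key]]


def semi_join(left, right, key):
    """Semi join: keep each left row at most once, with left columns only."""
    right_keys = {rrow[key] for rrow in right}  # only key membership matters
    return [lrow for lrow in left if lrow[key] in right_keys]


print(len(inner_join(emp, dept, "deptno")))  # 2 (SMITH appears twice)
print(semi_join(emp, dept, "deptno"))        # [{'ename': 'SMITH', 'deptno': 20}]
```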

In [0]:
emp_csv.join(dept_csv,emp_csv['deptno'] == dept_csv['deptno'],'semi').show()

In [0]:
%sql
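-- SQL equivalent of the semi join above; requires the joins.emp and joins.dept tables created in the SQL JOINS section
SELECT * FROM joins.emp e WHERE EXISTS (SELECT * FROM joins.dept d WHERE e.deptno = d.deptno)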

#### Left Anti Joins
* Left anti joins are the opposite of left semi joins. Like left semi joins, they do not actually include any values from the right DataFrame.
* They only compare values to see if the value exists in the second DataFrame.
* However, rather than keeping the values that exist in the second DataFrame, they keep only the values that do not have a corresponding key in the second DataFrame.

* Left anti joins (keep the rows in the left, and only the left, dataset where they do not appear in the right dataset)

In [0]:
emp_csv.join(dept_csv,emp_csv['deptno'] == dept_csv['deptno'],'anti').show()

#### Cross (Cartesian) Joins
* The last of our joins are cross joins, or Cartesian products. In the simplest terms, cross joins are inner joins that do not specify a predicate: they join every single row in the left DataFrame to every single row in the right DataFrame.

* Cross (or Cartesian) joins (match every row in the left dataset with every row in the right dataset)
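
The output size of a cross join is the product of the two input sizes, which is why it grows so quickly. A minimal plain-Python illustration with hypothetical single-column rows, using `itertools.product`:

```python
from itertools import product

# Hypothetical single-column "tables".
emp = ["SMITH", "ALLEN", "KING"]
dept = ["ACCOUNTING", "RESEARCH", "SALES", "OPERATIONS"]

# A cross join pairs every left row with every right row; there is no predicate.
pairs = list(product(emp, dept))
print(len(pairs))  # 12, i.e. 3 * 4
print(pairs[0])    # ('SMITH', 'ACCOUNTING')
```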

In [0]:
emp_csv.crossJoin(dept_csv).show()

## SQL JOINS

###### Create a database named `joins` (if it does not already exist) for testing joins on the `EMP` and `DEPT` tables

In [0]:
%sql
create database if not exists joins

##### Create the `EMP` and `DEPT` tables from the `emp_csv` and `dept_csv` DataFrames, which are loaded from the CSV files.

In [0]:
emp_csv=spark.read.csv("/FileStore/tables/emp.csv",header=True,inferSchema=True)
dept_csv=spark.read.csv("/FileStore/tables/dept.csv",header=True,inferSchema=True)
emp_csv.write.mode('overwrite').saveAsTable('joins.emp')
dept_csv.write.mode('overwrite').saveAsTable('joins.dept')

In [0]:
%sql
-- Use emp and dept tables to demonstrate different types of joins.
-- Use emp and dept tables to demonstrate inner join.

SELECT * FROM joins.emp  INNER JOIN joins.dept on emp.deptno = dept.deptno 

In [0]:
%sql
-- Use emp and dept tables to demonstrate left join.
SELECT *  FROM joins.emp LEFT JOIN joins.dept ON emp.deptno = dept.deptno;

In [0]:
%sql
-- Use emp and dept tables to demonstrate right join.
SELECT *  FROM joins.emp RIGHT JOIN joins.dept ON emp.deptno = dept.deptno;

In [0]:
%sql
-- Use emp and dept tables to demonstrate full join.
SELECT *  FROM joins.emp FULL JOIN joins.dept ON emp.deptno = dept.deptno;

In [0]:
%sql
-- Use emp and dept tables to demonstrate cross join.
SELECT empno, ename, emp.deptno, dname FROM joins.emp CROSS JOIN joins.dept;

In [0]:
%sql
-- Use emp and dept tables to demonstrate semi join. Either LEFT SEMI JOIN or SEMI JOIN can be used.
SELECT * FROM joins.emp SEMI JOIN joins.dept ON emp.deptno = dept.deptno;

In [0]:
%sql
-- Use emp and dept tables to demonstrate anti join. Either LEFT ANTI JOIN or ANTI JOIN can be used.
SELECT * FROM joins.emp ANTI JOIN joins.dept ON emp.deptno = dept.deptno;

#### Broadcast join
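* Little table–to–little table: when performing joins with small tables, it’s usually best to let Spark decide how to join them. You can always force a broadcast join if you notice strange behavior.
* Big table–to–small table: when the small table fits in the memory of a single worker node, a broadcast join is often more efficient. Spark “broadcasts” the small DataFrame by sending all of its data to every node in the cluster. After the small DataFrame has been broadcast, Spark can perform the join without shuffling any of the data in the large DataFrame.
* Spark chooses a broadcast join automatically when it estimates one side's size to be at most `spark.sql.autoBroadcastJoinThreshold` (10 MB by default); `broadcast()` or a `BROADCAST` hint requests one explicitly.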
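
A broadcast hash join can be modeled in plain Python: build a hash table from the small side once, give every partition of the large side a copy, and probe it locally. The partition lists, sample rows, and `broadcast_hash_join` helper are hypothetical; the point is that each output partition is produced from exactly one input partition, so no large-side rows move.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small, key):
    """Inner join that never moves large-side rows between partitions."""
    table = defaultdict(list)  # build phase: hash the small side once
    for srow in small:
        table[srow[key]].append(srow)
    # Probe phase: each partition uses its own copy of `table` (the "broadcast").
    return [
        [{**lrow, **srow} for lrow in part for srow in table.get(lrow[key], [])]
        for part in large_partitions
    ]


emp_partitions = [
    [{"ename": "SMITH", "deptno": 20}, {"ename": "KING", "deptno": 10}],
    [{"ename": "ALLEN", "deptno": 30}, {"ename": "GHOST", "deptno": 99}],
]
dept = [
    {"deptno": 10, "dname": "ACCOUNTING"},
    {"deptno": 20, "dname": "RESEARCH"},
    {"deptno": 30, "dname": "SALES"},
]
result = broadcast_hash_join(emp_partitions, dept, "deptno")
print([len(p) for p in result])  # [2, 1]
```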

In [0]:

from pyspark.sql.functions import broadcast 

emp_csv.join(broadcast(dept_csv),emp_csv['deptno']==dept_csv['deptno']).explain() # Marks a DataFrame as small enough for use in broadcast joins.

In [0]:
import pyspark.sql.functions as f
emp_csv.persist()  # cache emp_csv so later actions reuse it instead of re-reading the CSV
emp_csv.filter(f.col('comm').isNull()).show()  # employees with no commission (COMM is null)

In [0]:
%sql
-- BROADCAST, BROADCASTJOIN and MAPJOIN are all accepted as broadcast hints
SELECT /*+ BROADCAST(d) */ * FROM joins.emp e JOIN joins.dept d ON e.deptno = d.deptno

#### Coalesce Hints for SQL Queries
* Coalesce hints allow Spark SQL users to control the number of output files, just like `coalesce`, `repartition`, and `repartitionByRange` in the Dataset API. They can be used for performance tuning and for reducing the number of output files. The `COALESCE` hint takes only a partition number as a parameter. The `REPARTITION` hint takes a partition number, columns, or both. The `REPARTITION_BY_RANGE` hint must have column names; the partition number is optional.
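
The difference between the `REPARTITION(n, col)` and `REPARTITION_BY_RANGE(n, col)` hints can be modeled in plain Python. This is a simplified sketch with hypothetical helpers: Spark hashes with Murmur3 (here `key % n` stands in for integer keys) and picks range boundaries by sampling (here they come from the sorted distinct keys).

```python
from bisect import bisect_left


def hash_partition(rows, key, n):
    """REPARTITION(n, col)-style: equal keys always land in the same partition."""
    parts = [[] for _ in range(n)]
    for row in rows:
        parts[row[key] % n].append(row)  # stand-in for Spark's Murmur3 hash
    return parts


def range_partition(rows, key, n):
    """REPARTITION_BY_RANGE(n, col)-style: each partition holds a contiguous key range."""
    parts = [[] for _ in range(n)]
    keys = sorted({row[key] for row in rows})
    if not keys:
        return parts
    step = -(-len(keys) // n)               # distinct keys per partition (ceiling)
    bounds = keys[step - 1::step][: n - 1]  # inclusive upper bound of each partition
    for row in rows:
        parts[bisect_left(bounds, row[key])].append(row)
    return parts


rows = [{"deptno": d} for d in (10, 20, 30, 20, 10, 30, 20)]
print([sorted({r["deptno"] for r in p}) for p in hash_partition(rows, "deptno", 3)])
# [[30], [10], [20]]
print([sorted({r["deptno"] for r in p}) for p in range_partition(rows, "deptno", 2)])
# [[10, 20], [30]]
```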

In [0]:
emp_csv.write.format('parquet').mode('overwrite').partitionBy('DEPTNO').saveAsTable('joins.emp')


In [0]:
%sql
show create table joins.emp

In [0]:
%sql
--EXPLAIN   SELECT /*+ COALESCE(3) */ * FROM joins.emp where deptno=10;
--EXPLAIN SELECT /*+ REPARTITION(3) */ * FROM joins.emp where deptno=10;
EXPLAIN SELECT /*+ REPARTITION(deptno) */ * FROM joins.emp where deptno=20;
--SELECT /*+ REPARTITION(3, deptno) */ * FROM joins.emp;
--SELECT /*+ REPARTITION_BY_RANGE(deptno) */ * FROM joins.emp;
--SELECT /*+ REPARTITION_BY_RANGE(3, deptno) */ * FROM joins.emp;

#### Join Hint Types

#### BROADCAST

* Suggests that Spark use broadcast join. The join side with the hint will be broadcast regardless of autoBroadcastJoinThreshold. If both sides of the join have the broadcast hints, the one with the smaller size (based on stats) will be broadcast. The aliases for BROADCAST are BROADCASTJOIN and MAPJOIN.

#### MERGE

* Suggests that Spark use shuffle sort merge join. The aliases for MERGE are SHUFFLE_MERGE and MERGEJOIN.
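
Within one partition, after both sides have been shuffled by the join key, a sort-merge join sorts each side and walks the two sorted lists in step. A minimal plain-Python sketch of that merge step (inner join only; the helper and rows are hypothetical):

```python
def sort_merge_join(left, right, key):
    """Inner join: sort both sides by key, then merge them in a single pass."""
    left = sorted(left, key=lambda row: row[key])
    right = sorted(right, key=lambda row: row[key])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lkey, rkey = left[i][key], right[j][key]
        if lkey < rkey:
            i += 1  # advance the side with the smaller key
        elif lkey > rkey:
            j += 1
        else:
            # Find the run of right rows with this key...
            j_end = j
            while j_end < len(right) and right[j_end][key] == lkey:
                j_end += 1
            # ...and pair it with every left row that has the same key.
            while i < len(left) and left[i][key] == lkey:
                out.extend({**left[i], **rrow} for rrow in right[j:j_end])
                i += 1
            j = j_end
    return out


emp = [{"ename": "ALLEN", "deptno": 30}, {"ename": "SMITH", "deptno": 20},
       {"ename": "ADAMS", "deptno": 20}]
dept = [{"deptno": 40, "dname": "OPERATIONS"}, {"deptno": 20, "dname": "RESEARCH"},
        {"deptno": 30, "dname": "SALES"}]
for row in sort_merge_join(emp, dept, "deptno"):
    print(row["ename"], row["dname"])
```

Only the current key's run of right rows is ever held at once, which is why this strategy copes with two large inputs better than building a full hash table.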

#### SHUFFLE_HASH

* Suggests that Spark use shuffle hash join. If both sides have the shuffle hash hints, Spark chooses the smaller side (based on stats) as the build side.
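
A shuffle hash join first repartitions both sides by the join key so that matching keys meet in the same partition, then builds an in-memory hash table from one side and probes it with the other. A hypothetical plain-Python model, choosing the smaller relation as the build side as described above (Python's `hash()` stands in for Spark's Murmur3 hash):

```python
from collections import defaultdict


def shuffle_hash_join(left, right, key, n=4):
    """Inner join: shuffle both sides by key, then hash-join each partition pair."""
    def shuffle(rows):
        parts = [[] for _ in range(n)]
        for row in rows:
            parts[hash(row[key]) % n].append(row)  # Spark uses Murmur3, not hash()
        return parts

    build_left = len(left) <= len(right)  # the smaller side becomes the build side
    out = []
    for lpart, rpart in zip(shuffle(left), shuffle(right)):
        build, probe = (lpart, rpart) if build_left else (rpart, lpart)
        table = defaultdict(list)
        for row in build:
            table[row[key]].append(row)
        for row in probe:
            for match in table.get(row[key], []):
                lrow, rrow = (match, row) if build_left else (row, match)
                out.append({**lrow, **rrow})  # left-side columns first
    return out


emp = [{"ename": name, "deptno": d}
       for name, d in [("SMITH", 20), ("ALLEN", 30), ("KING", 10), ("GHOST", 99)]]
dept = [{"deptno": 10, "dname": "ACCOUNTING"}, {"deptno": 20, "dname": "RESEARCH"}]
print(sorted(r["ename"] for r in shuffle_hash_join(emp, dept, "deptno")))  # ['KING', 'SMITH']
```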

#### SHUFFLE_REPLICATE_NL

* Suggests that Spark use shuffle-and-replicate nested loop join.

In [0]:
%sql
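-- Collect statistics for dept; NOSCAN records only the table size in bytes, without scanning the data
ANALYZE TABLE joins.dept COMPUTE STATISTICS NOSCAN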

In [0]:
%sql
SELECT * FROM joins.emp   JOIN joins.dept ON emp.deptno = dept.deptno;


| EMPNO | ENAME | JOB | MGR | HIREDATE | SAL | COMM | DEPTNO | Deptno | Dname | Loc |
|---|---|---|---|---|---|---|---|---|---|---|
| 7499 | ALLEN | SALESMAN | 7698.0 | 20-02-81 | 1600 | 300.0 | 30 | 30 | SALES | CHICAGO |
| 7521 | WARD | SALESMAN | 7698.0 | 22-02-81 | 1250 | 500.0 | 30 | 30 | SALES | CHICAGO |
| 7654 | MARTIN | SALESMAN | 7698.0 | 28-09-81 | 1250 | 1400.0 | 30 | 30 | SALES | CHICAGO |
| 7698 | SGR | MANAGER | 7839.0 | 01-05-81 | 2850 | | 30 | 30 | SALES | CHICAGO |
| 7844 | TURNER | SALESMAN | 7698.0 | 08-09-81 | 1500 | 0.0 | 30 | 30 | SALES | CHICAGO |
| 7900 | JAMES | CLERK | 7698.0 | 03-12-81 | 950 | | 30 | 30 | SALES | CHICAGO |
| 7369 | SMITH | CLERK | 7902.0 | 17-12-80 | 800 | | 20 | 20 | RESEARCH | DALLAS |
| 7566 | JONES | MANAGER | 7839.0 | 02-04-81 | 2975 | | 20 | 20 | RESEARCH | DALLAS |
| 7788 | SCOTT | ANALYST | 7566.0 | 19-04-87 | 3000 | | 20 | 20 | RESEARCH | DALLAS |
| 7876 | ADAMS | CLERK | 7788.0 | 23-05-87 | 1100 | | 20 | 20 | RESEARCH | DALLAS |


In [0]:
%sql
-- Join Hints for broadcast join
SELECT /*+ BROADCAST(joins.dept) */ * FROM joins.emp INNER JOIN joins.dept ON emp.deptno = dept.deptno;
SELECT /*+ BROADCASTJOIN (joins.emp) */ * FROM joins.emp left JOIN joins.dept ON emp.deptno = dept.deptno;
SELECT /*+ MAPJOIN(joins.dept) */ * FROM joins.emp right JOIN joins.dept ON emp.deptno = dept.deptno;

-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(joins.emp) */ * FROM joins.emp INNER JOIN joins.dept ON emp.deptno = dept.deptno;
SELECT /*+ MERGEJOIN(joins.dept) */ * FROM joins.emp INNER JOIN joins.dept ON emp.deptno = dept.deptno;
SELECT /*+ MERGE(joins.emp) */ * FROM joins.emp INNER JOIN joins.dept ON emp.deptno = dept.deptno;

-- Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(joins.emp) */ * FROM joins.emp INNER JOIN joins.dept ON emp.deptno = dept.deptno;

-- Join Hints for shuffle-and-replicate nested loop join
SELECT /*+ SHUFFLE_REPLICATE_NL(joins.emp) */ * FROM joins.emp INNER JOIN joins.dept ON emp.deptno = dept.deptno;

-- When different join strategy hints are specified on both sides of a join, Spark
-- prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint
-- over the SHUFFLE_REPLICATE_NL hint.
-- Spark will log a warning for the following example:
-- org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)
-- is overridden by another hint and will not take effect.
SELECT /*+ BROADCAST(joins.dept), MERGE(joins.emp, joins.dept) */ * FROM joins.emp INNER JOIN joins.dept ON emp.deptno = dept.deptno;
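
The priority rule in the comments above can be expressed as a small lookup. This plain-Python sketch is hypothetical (`effective_hint` is not a Spark API) and only models the ordering and aliases listed in this section; Spark also checks whether the chosen strategy is applicable to the join type, which the sketch ignores.

```python
# Join strategy hints from highest to lowest priority.
HINT_PRIORITY = ["BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"]
# Aliases listed in this section, mapped to their canonical hint.
ALIASES = {
    "BROADCASTJOIN": "BROADCAST",
    "MAPJOIN": "BROADCAST",
    "SHUFFLE_MERGE": "MERGE",
    "MERGEJOIN": "MERGE",
}


def effective_hint(hints):
    """Return the hint that takes effect; any others would be overridden."""
    canonical = {ALIASES.get(h.upper(), h.upper()) for h in hints}
    for hint in HINT_PRIORITY:
        if hint in canonical:
            return hint
    return None


print(effective_hint(["MERGE", "BROADCAST"]))         # BROADCAST
print(effective_hint(["shuffle_hash", "MERGEJOIN"]))  # MERGE
```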



In [0]:
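# Two id ranges: even numbers, and every fourth number starting at 2
df1 = spark.range(2, 1000000, 2)
df2 = spark.range(2, 1000000, 4)
step1 = df1.repartition(5)            # shuffle df1 into 5 partitions
step12 = df2.repartition(6)           # shuffle df2 into 6 partitions
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])    # equi-join on id (a single id column in the output)
step4 = step3.selectExpr("sum(id)")
step4.explain()                       # print the physical plan: exchanges and the chosen join strategy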