# Join
T1.join(T2) = {(k, (v1, v2))}

T2.join(T1) = {(k, (v2, v1))}

where k = k1 = k2,

(k, v1) in T1,

(k, v2) in T2
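
The definition above can be sketched in plain Python, independent of Spark. The helper name `join_pairs` and the small sample lists are illustrative assumptions, not part of any library.

```python
# Minimal sketch of the join definition on lists of (key, value) pairs.
# join_pairs is a hypothetical helper, not a Spark API.
def join_pairs(t1, t2):
    """Return (k, (v1, v2)) for every (k, v1) in t1 and (k, v2) in t2."""
    return [(k1, (v1, v2)) for (k1, v1) in t1 for (k2, v2) in t2 if k1 == k2]

t1 = [('a', 10), ('b', 100), ('c', 80)]
t2 = [('a', 40), ('b', 300), ('d', 90)]

t1_join_t2 = join_pairs(t1, t2)  # values ordered as (v1, v2)
t2_join_t1 = join_pairs(t2, t1)  # values ordered as (v2, v1)
print(t1_join_t2)
print(t2_join_t1)
```

Keys `c` and `d` appear in only one of the lists, so they are absent from both results.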


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pyspark-dataframe-join").master("local[*]").getOrCreate()

# Dataset 1

In [2]:
d1 = [('a', 10), ('a', 11), ('a', 12), ('b', 100), ('b', 200), ('c', 80)]
T1 = spark.createDataFrame(d1, ['id', 'v1'])
T1.show()

+---+---+
| id| v1|
+---+---+
|  a| 10|
|  a| 11|
|  a| 12|
|  b|100|
|  b|200|
|  c| 80|
+---+---+



# Dataset 2

In [3]:
d2 = [('a', 40), ('a', 50), ('b', 300), ('b', 400), ('d', 90)]
T2 = spark.createDataFrame(d2, ['id', 'v2'])
T2.show()

+---+---+
| id| v2|
+---+---+
|  a| 40|
|  a| 50|
|  b|300|
|  b|400|
|  d| 90|
+---+---+



# Join Operation

In [4]:
joined = T1.join(T2, (T1.id == T2.id))
joined.show(100, truncate=False)

+---+---+---+---+
|id |v1 |id |v2 |
+---+---+---+---+
|a  |10 |a  |40 |
|a  |10 |a  |50 |
|a  |11 |a  |40 |
|a  |11 |a  |50 |
|a  |12 |a  |40 |
|a  |12 |a  |50 |
|b  |100|b  |300|
|b  |100|b  |400|
|b  |200|b  |300|
|b  |200|b  |400|
+---+---+---+---+



# Inner join
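
In PySpark's DataFrame API, `join` performs an inner join when no `how` argument is given, so only ids present in both tables survive. The plain-Python sketch below mirrors that behaviour on the two datasets; `inner_join` is a hypothetical helper for illustration, not Spark code.

```python
# Plain-Python sketch of inner-join semantics (not Spark code).
d1 = [('a', 10), ('a', 11), ('a', 12), ('b', 100), ('b', 200), ('c', 80)]
d2 = [('a', 40), ('a', 50), ('b', 300), ('b', 400), ('d', 90)]

def inner_join(left, right):
    """Keep only ids present on both sides; each row is (id, v1, v2)."""
    return [(k, v1, v2) for (k, v1) in left for (k2, v2) in right if k == k2]

inner_rows = inner_join(d1, d2)
print(len(inner_rows))  # 3 * 2 rows for 'a' plus 2 * 2 rows for 'b'
```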

# Left join
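
A left join keeps every row of the left table and fills the right-hand column with a null when there is no match; in PySpark's DataFrame API this is requested with `how="left"`. The sketch below reproduces the semantics in plain Python, with `None` standing in for null. `left_join` is a hypothetical helper.

```python
# Plain-Python sketch of left-join semantics (not Spark code).
d1 = [('a', 10), ('a', 11), ('a', 12), ('b', 100), ('b', 200), ('c', 80)]
d2 = [('a', 40), ('a', 50), ('b', 300), ('b', 400), ('d', 90)]

def left_join(left, right):
    """Keep every left row; unmatched rows get None for the right value."""
    rows = []
    for k, v1 in left:
        matches = [v2 for (k2, v2) in right if k2 == k]
        if matches:
            rows.extend((k, v1, v2) for v2 in matches)
        else:
            rows.append((k, v1, None))
    return rows

left_rows = left_join(d1, d2)
print(left_rows[-1])  # the unmatched 'c' row
```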

# Right join
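
A right join is the mirror image: every row of the right table is kept, and the left-hand column is null when there is no match (`how="right"` in PySpark's DataFrame API). A plain-Python sketch of the semantics, using a hypothetical `right_join` helper:

```python
# Plain-Python sketch of right-join semantics (not Spark code).
d1 = [('a', 10), ('a', 11), ('a', 12), ('b', 100), ('b', 200), ('c', 80)]
d2 = [('a', 40), ('a', 50), ('b', 300), ('b', 400), ('d', 90)]

def right_join(left, right):
    """Keep every right row; unmatched rows get None for the left value."""
    rows = []
    for k, v2 in right:
        matches = [v1 for (k1, v1) in left if k1 == k]
        if matches:
            rows.extend((k, v1, v2) for v1 in matches)
        else:
            rows.append((k, None, v2))
    return rows

right_rows = right_join(d1, d2)
print(right_rows[-1])  # the unmatched 'd' row
```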

# Join in MapReduce

Assume we have two relations, R(K, B) and S(K, C), where K is a key common to both R and S, and B and C represent the remaining attributes of R and S respectively. How do we find the join of R and S? The goal of the join operation is to find tuples that agree on their key K. A MapReduce implementation of the natural join of R and S can be written as follows:

Map:
For a tuple (k, b) in R emit a (key, value) pair as (k, ("R", b))

For a tuple (k, c) in S, emit a (key, value) pair as (k, ("S", c))

Once mappers are done, then we can execute the reducer as:

Reduce:
If a reducer key k has value list [("R", v),("S", w)], then emit a single (key, value) pair as (k, (v, w)). Note that join(R, S) will produce (k, (v, w)), while join(S, R) will produce (k, (w, v)).

If a reducer key k has value list [("R", v1), ("R", v2), ("S", w1), ("S", w2)], then we will emit four (key, value) pairs as

(k, (v1, w1))
(k, (v1, w2))
(k, (v2, w1))
(k, (v2, w2))

Therefore, to perform a natural join between two relations R and S, we need two map functions (one per relation) and one reduce function.


# Map Phase

1.
key: relation R
value: (k, b) tuple in R

map(key, value) {

  emit(k, ("R", b))
  
}

2.
key: relation S

value: (k, c) tuple in S

map(key, value) {

  emit(k, ("S", c))
  
}


The output of mappers (as input to “sort and shuffle” phase) will be:

(k1, ("R", r1))

(k1, ("R", r2))

...

(k1, ("S", s1))

(k1, ("S", s2))

...

(k2, ("R", r3))

(k2, ("R", r4))

...

(k2, ("S", s3))

(k2, ("S", s4))

# Reducer Phase

Before we write a reducer function, we need to understand the magic of MapReduce: the “sort and shuffle” phase, which groups the mapper output by key (similar to SQL’s GROUP BY).

Once all mappers are done, the "sort and shuffle" phase will be applied to the output of mappers and it will produce input for reducers.

The output of “sort and shuffle” phase will be:

(k1, [("R", r1), ("R", r2), ..., ("S", s1), ("S", s2), ...])

(k2, [("R", r3), ("R", r4), ..., ("S", s3), ("S", s4), ...])
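
The map and “sort and shuffle” steps can be simulated in a few lines of plain Python. This is only a single-process sketch: the sample relations and the helper name `sort_and_shuffle` are assumptions made for illustration, and a real MapReduce framework performs the grouping across machines.

```python
from collections import defaultdict

# Hypothetical sample relations R(K, B) and S(K, C).
R = [('k1', 'r1'), ('k1', 'r2'), ('k2', 'r3')]
S = [('k1', 's1'), ('k2', 's3'), ('k2', 's4')]

# Map phase: tag each value with the name of its relation.
mapped = [(k, ('R', b)) for (k, b) in R] + [(k, ('S', c)) for (k, c) in S]

def sort_and_shuffle(pairs):
    """Group values by key and return the groups in key order."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())

shuffled = sort_and_shuffle(mapped)
for key, values in shuffled:
    print(key, values)
```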

The reducer function is presented below: per key k, we build 2 lists: 

list_R (which will hold R values/attributes)

list_S (which will hold S values/attributes)

Then we compute the Cartesian product of list_R and list_S to find the join tuples.


#key: a unique key
#values: [(relation, attrs)] where relation in {"R", "S"}
#and  attrs are the relation attributes

reduce(key, values) {

  list_R = []

  list_S = []

  for (tuple in values) {

    relation = tuple[0]

    attributes = tuple[1]

    if (relation == "R") {

       list_R.append(attributes)

    }

    else {

       list_S.append(attributes)

    }

  }

  if (len(list_R) == 0) OR (len(list_S) == 0) {

     # no common key
     return

  }

  #len(list_R) > 0 AND len(list_S) > 0
  #perform cartesian product of list_R and list_S
  for (r in list_R) {

    for (s in list_S) {

       emit(key, (r, s))

    }
    
  }

}
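
The reducer pseudocode translates almost line by line into Python. In this sketch `yield` stands in for `emit`, and the function name `reduce_join` is an assumption made for illustration.

```python
def reduce_join(key, values):
    """Emit (key, (r, s)) for every R value paired with every S value."""
    list_r, list_s = [], []
    for relation, attributes in values:
        if relation == "R":
            list_r.append(attributes)
        else:
            list_s.append(attributes)
    # If either list is empty the nested loop emits nothing,
    # which covers the "no common key" case.
    for r in list_r:
        for s in list_s:
            yield (key, (r, s))

pairs = list(reduce_join("k", [("R", "v1"), ("R", "v2"), ("S", "w1"), ("S", "w2")]))
unmatched = list(reduce_join("k3", [("R", "v9")]))
print(pairs)
print(unmatched)
```

The first call reproduces the four pairs listed earlier for the value list [("R", v1), ("R", v2), ("S", w1), ("S", w2)]; the second shows that a key seen in only one relation produces no output.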


# Implementation in PySpark

In [5]:
d1 = [('a', 10), ('a', 11), ('a', 12), ('b', 100), ('b', 200), ('c', 80)]
d2 = [('a', 40), ('a', 50), ('b', 300), ('b', 400), ('d', 90)]
T1 = spark.sparkContext.parallelize(d1)
T2 = spark.sparkContext.parallelize(d2)

# Next, we map these RDDs to include the name of the relation:

In [6]:
t1_mapped = T1.map(lambda x: (x[0], ("T1", x[1])))
t2_mapped = T2.map(lambda x: (x[0], ("T2", x[1])))

# Combine the two mapped RDDs into a single RDD with union().

In [10]:
combined = t1_mapped.union(t2_mapped)

# Perform the groupByKey() transformation on a single combined data set:

In [11]:
grouped = combined.groupByKey()

# Finally, perform a Cartesian product for the values of each grouped entry:

In [12]:
# entry[0]: key
# entry[1]: values as:
# [("T1", t11), ("T1", t12), ..., ("T2", t21), ("T2", t22), ...]

def cartesian_product(entry):
  key, values = entry
  # local lists; named so they do not shadow the T1 and T2 RDDs
  t1_values = []
  t2_values = []
  # split the grouped values by the relation they came from
  for relation, attributes in values:
    if relation == "T1":
      t1_values.append(attributes)
    else:
      t2_values.append(attributes)

  # no common key: one side is empty, so there is nothing to join
  if not t1_values or not t2_values:
    return []

  # perform cartesian product of the T1 and T2 values
  return [(key, (v, w)) for v in t1_values for w in t2_values]

joined = grouped.flatMap(cartesian_product)
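
As a possible variation, the per-key Cartesian product can also be written with `itertools.product` from the standard library. The sketch below runs locally on one hand-built grouped entry (the values for key `b` from d1 and d2); the name `cartesian_product_compact` is hypothetical.

```python
from itertools import product

def cartesian_product_compact(entry):
    """Same contract as cartesian_product: (key, values) -> list of (key, (v, w))."""
    key, values = entry
    values = list(values)  # materialize, since the values are scanned twice
    t1_values = [attrs for (relation, attrs) in values if relation == "T1"]
    t2_values = [attrs for (relation, attrs) in values if relation == "T2"]
    # product() of an empty list with anything yields nothing,
    # so keys missing from one side produce an empty result.
    return [(key, pair) for pair in product(t1_values, t2_values)]

entry_b = ('b', iter([("T1", 100), ("T1", 200), ("T2", 300), ("T2", 400)]))
entry_c = ('c', iter([("T1", 80)]))
joined_b = cartesian_product_compact(entry_b)
joined_c = cartesian_product_compact(entry_c)
print(joined_b)
print(joined_c)
```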

# Map-Side Join by RDDs

This section introduces the map-side join, which can reduce the cost of joining two tables.

Using this pattern, you can avoid shuffling and sorting all the data on its way to the reduce phase.

A map-side join is a process in which two data sets are joined by the mapper alone, rather than by a regular join (which needs a combination of a mapper and a reducer).

What are the advantages of using map-side join? The advantages of using map side join are as follows:

1. Map-side join might help in minimizing the cost that is incurred for sorting and merging in the shuffle and reduce stages.

2. Map-side join might help in improving the performance of the task by decreasing the time to finish the task.


Given two tables A and B, the map-side join is most suitable when table A is large and table B is small to medium.

Since table B is not very big, we create a hash table from B and broadcast it to all nodes.

Next, a mapper iterates over all elements of table A and looks up each key in the broadcast hash table (which represents table B).
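
The idea can be sketched without Spark: build the hash table once from the small table, then make a single pass over the large table. The table contents below are made-up sample data, and in a real cluster the dictionary would be broadcast rather than shared in one process.

```python
# Hypothetical small table B (the one that would be broadcast) and large table A.
table_b = [(10, 'ACCOUNTING'), (20, 'RESEARCH')]
table_a = [(10, 'alex'), (20, 'mat'), (10, 'joe'), (30, 'sam')]

# Built once from the small table.
hash_table = dict(table_b)

# One pass over the large table: each row needs only a dictionary lookup,
# so rows of A are never grouped or sorted by key.
joined_ab = [
    (key, (a_value, hash_table[key]))
    for (key, a_value) in table_a
    if key in hash_table
]
print(joined_ab)
```

The row with key 30 has no partner in B and is dropped, which matches inner-join behaviour.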


# Creating RDD


In [3]:
EMP = spark.sparkContext.parallelize(
[
  (10, (1000, 'alex')),
  (10, (2000, 'ted')),
  (20, (3000, 'mat')),
  (20, (4000, 'max')),
  (10, (5000, 'joe'))
])

In [5]:
DEPT = spark.sparkContext.parallelize(
[ (10, ('ACCOUNTING', 'NEW YORK, NY')),
  (20, ('RESEARCH', 'DALLAS, TX')),
  (30, ('SALES', 'CHICAGO, IL')),
  (40, ('OPERATIONS', 'BOSTON, MA')),
  (50, ('MARKETING', 'Sunnyvale, CA')),
  (60, ('SOFTWARE', 'Stanford, CA'))
])

In [6]:
sorted(EMP.join(DEPT).collect())

[(10, ((1000, 'alex'), ('ACCOUNTING', 'NEW YORK, NY'))),
 (10, ((2000, 'ted'), ('ACCOUNTING', 'NEW YORK, NY'))),
 (10, ((5000, 'joe'), ('ACCOUNTING', 'NEW YORK, NY'))),
 (20, ((3000, 'mat'), ('RESEARCH', 'DALLAS, TX'))),
 (20, ((4000, 'max'), ('RESEARCH', 'DALLAS, TX')))]

How will the map-side join optimize the task?

Here, I assume that EMP is a large data set and DEPT is a relatively small data set. To join EMP with DEPT on dept_id using a map-side join, we create a broadcast variable from the small table (this is a custom hash builder from an RDD):

In [9]:
def to_hash_table(dept_as_list):
  # build {dept_id: (dept_name, dept_location)} from a list of (key, value) pairs
  hash_table = {}
  for d in dept_as_list:
    dept_id = d[0]
    dept_name_location = d[1]
    hash_table[dept_id] = dept_name_location
  return hash_table

In [10]:
dept_hash_table = to_hash_table(DEPT.collect())

Alternatively, you may build the same hash table with the Spark action collectAsMap(), which returns the key-value pairs in this RDD (DEPT) to the master as a dictionary.

In [11]:
dept_hash_table = DEPT.collectAsMap()
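
Because the result is an ordinary dictionary, it can hold only one value per key. That is fine for DEPT, where each dept_id appears once, but it matters if the small table has repeated keys. The plain-Python sketch below illustrates the effect with made-up pairs and shows one way to keep all values instead.

```python
from collections import defaultdict

# Hypothetical pairs in which key 10 appears twice.
pairs = [(10, 'ACCOUNTING'), (20, 'RESEARCH'), (10, 'AUDIT')]

# A plain dict keeps one value per key; later pairs overwrite earlier ones.
as_dict = dict(pairs)

# A dict of lists keeps every value for a repeated key.
multi = defaultdict(list)
for key, value in pairs:
    multi[key].append(value)

print(as_dict)
print(dict(multi))
```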

Now, using pyspark.SparkContext.broadcast, you can broadcast a read-only variable (dept_hash_table) to the Spark cluster, where it is available to all kinds of transformations (including mappers and reducers).

In [12]:
sc = spark.sparkContext
hash_table_broadcasted = sc.broadcast(dept_hash_table)

Next, I show the map side join by using a map() transformation:



In [13]:
def map_side_join(e):
  dept_id = e[0]
  # get hash_table from broadcasted object
  hash_table = hash_table_broadcasted.value
  dept_name_location = hash_table[dept_id]
  return (e, dept_name_location)
  
joined = EMP.map(map_side_join)
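
Note that `hash_table[dept_id]` raises a `KeyError` if an employee refers to a dept_id that is missing from DEPT. A possible variant, sketched below in plain Python, returns a list so that unmatched employees are simply dropped, and shapes each result like the output of `EMP.join(DEPT)`. The function name `map_side_join_safe` and the extra employee with dept_id 70 are assumptions added for illustration.

```python
# Local stand-ins for the notebook's data; no Spark is needed for this sketch.
emp = [
    (10, (1000, 'alex')), (10, (2000, 'ted')), (20, (3000, 'mat')),
    (20, (4000, 'max')), (10, (5000, 'joe')),
    (70, (6000, 'amy')),  # hypothetical employee whose dept_id is not in DEPT
]
dept_hash_table = {
    10: ('ACCOUNTING', 'NEW YORK, NY'),
    20: ('RESEARCH', 'DALLAS, TX'),
}

def map_side_join_safe(e, hash_table):
    """Return [(dept_id, (emp_attrs, dept_attrs))], or [] if dept_id is unknown."""
    dept_id, emp_attrs = e
    dept_attrs = hash_table.get(dept_id)
    if dept_attrs is None:
        return []
    return [(dept_id, (emp_attrs, dept_attrs))]

# Plain-Python equivalent of applying the function with flatMap.
joined_safe = [pair for e in emp for pair in map_side_join_safe(e, dept_hash_table)]
print(sorted(joined_safe))
```

With Spark, the same function could be applied as `EMP.flatMap(lambda e: map_side_join_safe(e, hash_table_broadcasted.value))`.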

Therefore, with a map-side join you avoid the shuffle, which can be expensive, and with it significant network I/O.
With a map-side join, we just apply a map() function to each row of the EMP table and retrieve the dimension values (such as dept_name and dept_location) from the broadcast hash table.

If the EMP table is very large, the map() function is executed concurrently on its partitions, and each task reads the copy of the hash table that was broadcast to its worker node.
The map-side join approach lets us avoid shuffling the fact table (i.e., EMP) and still get quite good join performance.

# Advantages of using map-side join

The true join cost can be reduced, since we minimize the cost incurred for sorting and merging in the shuffle and reduce stages.

Map-side join improves the performance of the join task by decreasing the time needed to finish the join operation.

# Disadvantages of map-side join

The map-side join pattern is appropriate only when one of the RDDs/tables being joined is small enough to fit into memory.

If neither of the RDDs/tables is small, a map-side join is not suitable.