# Join optimization

In this notebook we will cover examples of inefficient joins in Apache Spark and how they can be optimized. The difference can be drastic: from waiting _days_ for a job to finish down to _minutes_. Most often the fix involves a "trick" such as splitting the processing or salting part of the data. However, once you get to know _what Spark is actually doing_, those steps become obvious and logical.

### Housekeeping

Run the cells below to import the necessary packages and mount the data.

In [0]:
import pyspark.sql.functions as pysparkf

def write_display_parquet(df, path): 
    df.write.mode("overwrite").parquet(path)
    display(spark.read.parquet(path))

In [None]:
s3_bucket = "us-west-2-eternalsh-com-workshop-spark"
access_key = ""
secret_key = ""

current_mounts = [mount.mountPoint for mount in dbutils.fs.mounts()]
expected_mounts = ["invoices", "invoices-items", "stores"]

secret_key = secret_key.replace("+", "%2B")

for mount in expected_mounts:
    mount_path = f"/mnt/{mount}"

    if mount_path in current_mounts:    
        dbutils.fs.unmount(mount_path)

    dbutils.fs.mount(f"s3a://{access_key}:{secret_key}@{s3_bucket}/{mount}/", mount_path)

display(dbutils.fs.mounts())

In [0]:
invoices       = spark.read.option("header", True).csv("/mnt/invoices/")
invoices_items = spark.read.option("header", True).csv("/mnt/invoices-items/10000000/")
stores         = spark.read.option("header", True).csv("/mnt/stores/")

## Complex join predicate
Our first example covers joins with a non-trivial predicate: one that contains anything beyond equality comparisons combined with AND. Such joins show up from time to time in data pipelines.

### Our example
Our pipeline joins _invoices_ with _stores_ to enrich the data with store metadata. Some invoices are not issued in any store but online: they have no "store" (their _store_id_ is null), yet we do not want to lose them. A simple outer join that keeps every invoice is not an option either: to ensure data quality we want to drop every invoice whose _store_id_ is not null but does not match any store. The current approach is very SQL-like: we put the whole condition described above into the join:

In [0]:
write_display_parquet(invoices
    .join(stores, (invoices.store_id == stores.store_id) | invoices.store_id.isNull())
    .drop(invoices.store_id)
, "/FileStore/joins-conditions/join-condition-or")


### NestedLoopJoin is slow
This job takes a massive amount of time (it does not complete within an hour), despite using a broadcast. The main reason is the _NestedLoopJoin_, the slowest join algorithm: every row from each partition is compared with every row of the broadcast dataset. It is the slowest strategy, but also the most flexible one in terms of supported join conditions.
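To make the cost concrete, here is a minimal plain-Python sketch of the nested-loop idea. It is not Spark's implementation; the function name `nested_loop_join` and the toy rows are made up for illustration. Because the predicate is treated as an opaque function, it has to be evaluated for every pair of rows, even when it is a simple equality:

```python
def nested_loop_join(left_rows, right_rows, predicate):
    """Inner join that tests the predicate on every (left, right) pair."""
    joined = []
    evaluations = 0
    for left in left_rows:
        for right in right_rows:
            evaluations += 1  # one predicate call per pair of rows
            if predicate(left, right):
                joined.append({**left, **right})
    return joined, evaluations


# Toy stand-ins for the notebook's datasets (all values are made up).
toy_invoices = [
    {"invoice_id": 1, "store_id": 10},
    {"invoice_id": 2, "store_id": 20},
    {"invoice_id": 3, "store_id": 10},
    {"invoice_id": 4, "store_id": 30},
]
toy_stores = [
    {"store_id": 10, "name": "north"},
    {"store_id": 20, "name": "south"},
    {"store_id": 40, "name": "east"},
]

rows, evaluations = nested_loop_join(
    toy_invoices,
    toy_stores,
    lambda invoice, store: invoice["store_id"] == store["store_id"],
)
print(len(rows), "joined rows after", evaluations, "predicate evaluations")
```

With 4 invoices and 3 stores the predicate runs 4 × 3 = 12 times; the count grows with the product of the two dataset sizes.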

### Other join strategies are faster
If the NestedLoopJoin is so slow, how can we persuade Spark to use a more efficient strategy? The deciding factor is the join condition. In the previous case the condition contained an OR clause, which made the join ineligible for HashJoin or ShuffleSortMergeJoin. Other reasons for Spark to pick the slowest algorithm include less-than / greater-than operators or UDFs within the condition.
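Why does an equality condition matter so much? A hash-based join can index one side by the join key and look matches up directly, which only works when the condition is an equality on that key. The following plain-Python sketch illustrates the idea; it is a simplification rather than Spark's actual code, and `hash_join` and the toy rows are hypothetical names and data:

```python
from collections import defaultdict


def hash_join(left_rows, right_rows, key):
    """Inner equi-join: build a hash table on the right side, probe with the left."""
    # Build phase: index the (smaller) right side by the join key.
    # Null keys are skipped because a null never equals anything in SQL.
    buckets = defaultdict(list)
    for right in right_rows:
        if right[key] is not None:
            buckets[right[key]].append(right)

    # Probe phase: one dictionary lookup per left row.
    joined = []
    probes = 0
    for left in left_rows:
        probes += 1
        for right in buckets.get(left[key], []):
            joined.append({**left, **right})
    return joined, probes


# Toy stand-ins for the notebook's datasets (all values are made up).
sample_invoices = [
    {"invoice_id": 1, "store_id": 10},
    {"invoice_id": 2, "store_id": 20},
    {"invoice_id": 3, "store_id": None},  # an "online" invoice
    {"invoice_id": 4, "store_id": 30},
]
sample_stores = [
    {"store_id": 10, "name": "north"},
    {"store_id": 20, "name": "south"},
    {"store_id": 40, "name": "east"},
]

matched, probes = hash_join(sample_invoices, sample_stores, "store_id")
print(len(matched), "joined rows after", probes, "hash lookups")
```

Each invoice costs one dictionary lookup instead of a pass over every store. An OR clause or a less-than comparison cannot be expressed as a single key lookup, which is why such conditions fall back to comparing all pairs.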

### Kicking the OR outside the predicate
In this particular case we can move the OR clause out of the predicate by taking a _union_ of two datasets: the invoices joined with stores (which excludes null _store_id_ values) and the invoices with a null _store_id_. Despite reading the invoices data twice, this implementation runs much faster:

In [0]:
write_display_parquet(invoices
    .join(stores, "store_id")
    .unionByName(invoices
        .filter(invoices.store_id.isNull())
        .withColumns({ 
            "name": pysparkf.lit(None), 
            "location_lat": pysparkf.lit(None), 
            "location_long": pysparkf.lit(None)
        })
    )
, "/FileStore/joins-conditions/join-condition-union")

store_id,invoice_id,client_id,chain_id,issue_date,name,location_lat,location_long
50528,2000000,230,2,2019-03-14,gtvrnqkc,-42.31509855380381,-176.4245706939443
1639,2000001,28,0,2019-03-14,nefgubph,-9.566980663688597,-11.912545435725631
48,2000002,-1,3,2019-03-14,lcxqeofp,-4.838814593315021,129.75379749615223
62447,2000003,69,2,2019-03-14,pfehgnws,34.22950093529241,-82.66108748415616
24602,2000004,-1,7,2019-03-14,glnqjtmr,78.34053378002955,-151.79615751067607
68358,2000005,29,4,2019-03-14,pazkvmrc,49.35159034529056,145.93012125135527
12398,2000006,-1,-1,2019-03-14,kwporsix,57.724454984591176,-39.42519116148628
90899,2000007,-1,-1,2019-03-14,uxvhzeat,-15.090105069161892,-24.77821246143995
28334,2000008,150,2,2019-03-14,ofmtyagk,3.997061011470621,-21.6762818126017
90045,2000009,-1,-1,2019-03-14,izpvtlgh,-88.28651174981053,5.864864860220365


This time the whole job took about 3 minutes to finish. The biggest difference in the plan is a _BroadcastHashJoin_ in place of the full cross product.
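One way to confirm which strategy was picked is to read the physical plan, for example with `df.explain()`. The sketch below is a small convenience wrapper, assuming `explain()` writes the plan to Python's standard output; the helper names `captured_plan` and `join_operators` are made up for this notebook:

```python
import contextlib
import io
import re


def captured_plan(df):
    """Return the text that df.explain() prints (assumes it prints to stdout)."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def join_operators(plan_text):
    """List every operator name ending in 'Join', in order of appearance."""
    return re.findall(r"\w*Join\b", plan_text)


# In the notebook (assuming `invoices` and `stores` are loaded as above):
# slow = invoices.join(
#     stores,
#     (invoices.store_id == stores.store_id) | invoices.store_id.isNull(),
# )
# print(join_operators(captured_plan(slow)))
# print(join_operators(captured_plan(invoices.join(stores, "store_id"))))
```

An operator name containing `NestedLoopJoin` is the warning sign; a hash or sort-merge join indicates that Spark recognized an equality key.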


### Is it the only option? Nope.

There are a number of other solutions we can apply here, for example:
1. Filter out all invoices with an incorrect _store_id_ using an anti-join, then do an outer join that keeps all remaining invoices.
2. Add a placeholder store for online invoices (the ones with a null _store_id_) and do a plain inner join.

All of these let you escape the OR clause and non-equality comparison traps; it is up to you to decide how to solve this puzzle. The key point is that you understand:
1. How to identify a slow join strategy?
2. What can be done to implement the logic efficiently?
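
As one possible take on the second question, the first option listed above (anti-join, then outer join) could look roughly like this. It is a sketch, not a tuned implementation: the function name is made up, and it assumes that `invoice_id` uniquely identifies an invoice and that `invoices` sits on the left side of the join, so keeping all remaining invoices means a left outer join:

```python
def join_keeping_online_invoices(invoices, stores):
    """Drop invoices with an unknown store_id, keep online (null) ones."""
    # Invoices that name a store which does not exist in `stores`.
    # Nulls are filtered out first: a null key never matches, so the
    # anti-join would otherwise report online invoices as unmatched too.
    unmatched_ids = (
        invoices
        .filter(invoices.store_id.isNotNull())
        .join(stores, "store_id", "left_anti")
        .select("invoice_id")  # assumption: invoice_id is unique
    )
    # Remove those invoices, then enrich the rest. Every join here uses a
    # plain equality key, so no OR clause is needed anywhere.
    return (
        invoices
        .join(unmatched_ids, "invoice_id", "left_anti")
        .join(stores, "store_id", "left")
    )


# In the notebook (assuming `invoices` and `stores` are loaded as above):
# write_display_parquet(
#     join_keeping_online_invoices(invoices, stores),
#     "/FileStore/joins-conditions/join-condition-anti",  # hypothetical path
# )
```

Whether this beats the union variant depends on the data; checking the physical plan for each version is the safest way to compare them.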

# Skewed data in the join