### Joins & Higher Order Functions in Spark

In [1]:
import findspark
findspark.init()

In [None]:
## Create a Spark session
### SparkSession is the entry point for a Spark program.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("HOFs").getOrCreate()
## local[*] - run locally, using all the cores on the machine
### local[2] - run locally, using 2 cores of the CPU
### On a cluster the master is a URL such as spark://<ip-address>:<port> (standalone), or simply "yarn" for YARN

In [None]:
import os
os.getcwd()

In [5]:
custDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("file:///C:\\Users\\Jayanth\\Python_DataAnalysis\\PySpark-lessons\\datasets\\Customers.csv")
salesDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("file:///C:\\Users\\Jayanth\\Python_DataAnalysis\\PySpark-lessons\\datasets\\Sales.csv")

In [9]:
custDF = custDF.withColumnRenamed('ID','customer_id')

In [10]:
custDF.show(3)

+-----------+----------+---------+------+-------+--------------------+
|customer_id|First Name|Last Name|Gender|Company|          Occupation|
+-----------+----------+---------+------+-------+--------------------+
|          1|    Joseph|  Perkins|  Male|Dynazzy|Community Outreac...|
|          2|  Jennifer|  Alvarez|Female|   DabZ|Senior Quality En...|
|          3|     Roger|    Black|  Male|Tagfeed|   Account Executive|
+-----------+----------+---------+------+-------+--------------------+
only showing top 3 rows



In [11]:
salesDF = salesDF.withColumnRenamed('Customer ID','customer_id')
salesDF.show(3)

+-----------+-------+
|customer_id|Food ID|
+-----------+-------+
|        537|      9|
|         97|      4|
|        658|      1|
+-----------+-------+
only showing top 3 rows



#### Broadcast Join Hint in Spark 2.x and 3.x

- Working:
   - A broadcast join is the usual way to join a small table (dimension table) with a big table (fact table) while avoiding a costly shuffle of the big table.
- When to use a broadcast join:
  - Broadcast joins are a good fit for joining a large DataFrame with a small DataFrame.
  - Because the large side is not shuffled, they are usually cheaper to run on a cluster.
  - Spark “broadcasts” the small DataFrame by sending a full copy of it to all nodes in the cluster.
  - After the broadcast, Spark can perform the join without shuffling any of the data in the large DataFrame.
  - The `custDF.hint("broadcast")` cell below shows an example.

The broadcast dataset must fit in memory on the driver as well as on the executor nodes. The driver first collects the dataset from the executor side and then broadcasts it to all the worker nodes that hold partitions of the larger dataset.

Spark picks a broadcast join automatically when the estimated size of a table is below `spark.sql.autoBroadcastJoinThreshold` (default value 10485760 bytes, i.e. 10 MB).

The property is configurable and has a maximum limit of 8 GB.
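
To make the mechanism concrete, here is a minimal plain-Python sketch of the idea behind a broadcast hash join. It is a single-process model, not Spark's implementation: the function name `broadcast_hash_join`, the sample rows, and the list-of-lists standing in for partitions are all assumptions made for illustration.

```python
from collections import defaultdict


def broadcast_hash_join(small_rows, large_partitions, key):
    """Inner-join each partition of the large side against one shared hash table."""
    # Build the hash table once from the small side; in Spark this is the
    # structure that would be copied ("broadcast") to every executor.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)

    joined = []
    # Each partition of the large side is probed where it already lives:
    # no row of the large side moves to another partition.
    for partition in large_partitions:
        for row in partition:
            for match in lookup.get(row[key], []):
                joined.append({**match, **row})
    return joined


# Hypothetical sample data: a small "customers" table and a partitioned "sales" table.
customers = [
    {"customer_id": 1, "name": "Joseph"},
    {"customer_id": 2, "name": "Jennifer"},
]
sales_partitions = [
    [{"customer_id": 2, "food_id": 4}],
    [{"customer_id": 1, "food_id": 9}, {"customer_id": 3, "food_id": 1}],
]

result = broadcast_hash_join(customers, sales_partitions, "customer_id")
print(result)
```

The sale for customer 3 has no match in the small table, so an inner join drops it. The memory cost is the hash table, which is why the small side has to fit on every node.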

In [14]:
broadcastJoin = custDF.hint("broadcast").join(salesDF,"customer_id")
broadcastJoin.show(5)

+-----------+----------+---------+------+---------+--------------------+-------+
|customer_id|First Name|Last Name|Gender|  Company|          Occupation|Food ID|
+-----------+----------+---------+------+---------+--------------------+-------+
|        537|    Cheryl|  Carroll|Female| Zoombeat|    Registered Nurse|      9|
|         97|    Amanda|  Watkins|Female|      Ozu| Account Coordinator|      4|
|        658|   Patrick|     Webb|  Male|Browsebug|Community Outreac...|      1|
|        202|     Louis| Campbell|  Male|Rhynoodle|Account Represent...|      2|
|        155|   Carolyn|     Diaz|Female| Gigazoom|Database Administ...|      9|
+-----------+----------+---------+------+---------+--------------------+-------+
only showing top 5 rows



#### Shuffle Hash Join

- A shuffle hash join shuffles the data based on the join keys and then performs the join.
   - When the smaller table is still too large to broadcast without causing driver-side or executor-side memory issues, a shuffle hash join can be the right choice.
   - It is an expensive join because it involves both shuffling and hashing. It also requires memory and computation to maintain a hash table.
   - A shuffle hash join is performed in two steps:
       - Shuffling: the data from both join tables is partitioned by the join key, so that records with the same join key are assigned to the same partition.
       - Hash join: a classic single-node hash join is performed on the data in each partition.

It follows the classic map-reduce pattern:

    First it maps over the two tables (DataFrames).
    It uses the join keys as the output key.
    It shuffles both DataFrames by the output key, so that rows with the same key from both tables are moved to the same machine.
    In the reduce phase, it joins the two datasets.

When to use:

A shuffle hash join works well
1. when the DataFrames are evenly distributed across the keys you are joining on, and
2. when the DataFrames have enough distinct keys for parallelism.

In a shuffle hash join both DataFrames are partitioned with the same partitioner, so matching join keys fall in the same partitions.
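
The two steps described above can be sketched in plain Python. This is a simplified single-process model under stated assumptions: the helper names (`shuffle`, `hash_join`, `shuffle_hash_join`), the partition count, and the sample rows are hypothetical, and Python's built-in `hash` stands in for Spark's partitioner.

```python
from collections import defaultdict


def shuffle(rows, key, num_partitions):
    """Route each row to a partition chosen by hashing its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions


def hash_join(build_rows, probe_rows, key):
    """Classic single-node hash join: build a table on one side, probe with the other."""
    table = defaultdict(list)
    for row in build_rows:
        table[row[key]].append(row)
    return [{**b, **p} for p in probe_rows for b in table.get(p[key], [])]


def shuffle_hash_join(left, right, key, num_partitions=4):
    # Step 1: shuffle both sides with the same partitioner, so equal keys
    # end up at the same partition index on both sides.
    left_parts = shuffle(left, key, num_partitions)
    right_parts = shuffle(right, key, num_partitions)
    # Step 2: run an independent hash join per pair of partitions.
    joined = []
    for left_part, right_part in zip(left_parts, right_parts):
        joined.extend(hash_join(left_part, right_part, key))
    return joined


# Hypothetical sample rows.
customers = [
    {"customer_id": 833, "name": "Carolyn"},
    {"customer_id": 148, "name": "Patricia"},
]
sales = [
    {"customer_id": 833, "food_id": 7},
    {"customer_id": 833, "food_id": 3},
    {"customer_id": 148, "food_id": 5},
    {"customer_id": 999, "food_id": 1},
]

joined_rows = shuffle_hash_join(customers, sales, "customer_id")
print(joined_rows)
```

If most rows shared one key, a single partition would receive nearly all the work, which is why an even key distribution matters for this strategy.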

In [15]:
shuffleHashJoin = custDF.hint("shuffle_hash").join(salesDF,"customer_id")
shuffleHashJoin.show(5)

+-----------+----------+---------+------+------------+--------------------+-------+
|customer_id|First Name|Last Name|Gender|     Company|          Occupation|Food ID|
+-----------+----------+---------+------+------------+--------------------+-------+
|        833|   Carolyn|     Dunn|Female|       Yadel|           Recruiter|      7|
|        833|   Carolyn|     Dunn|Female|       Yadel|           Recruiter|      3|
|        148|  Patricia|     Reid|Female|Shuffledrive|Analog Circuit De...|      5|
|        737|  Patricia|      Cox|Female|     Camimbo| Geological Engineer|      3|
|        540|   Jessica|    Hicks|Female|     Cogibox|Structural Analys...|      3|
+-----------+----------+---------+------+------------+--------------------+-------+
only showing top 5 rows



#### SortMerge Join

- A sort-merge join sorts both datasets first and then merges them.
   - A shuffle sort-merge join (SMJ) shuffles the data so that rows with the same join key land on the same worker, and then performs the sort-merge join at the partition level on the worker nodes.
   - This is Spark’s default join strategy: since Spark 2.3 the default value of spark.sql.join.preferSortMergeJoin is true.

It has 3 phases:

    Shuffle phase: both large tables are repartitioned by the join keys across the partitions in the cluster.
    Sort phase: the data within each partition is sorted, with partitions processed in parallel.
    Merge phase: the sorted, partitioned data is joined by iterating over the elements and joining the rows that have the same value for the join keys.
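
As an illustration of the sort and merge phases, the following plain-Python sketch joins the rows of a single partition. The shuffle phase is left out, and the function name `sort_merge_join` and the sample rows are assumptions for the example rather than anything from Spark's code.

```python
def sort_merge_join(left, right, key):
    """Inner join of two row lists by sorting both and merging with two cursors."""
    # Sort phase: order both sides by the join key.
    left = sorted(left, key=lambda r: r[key])
    right = sorted(right, key=lambda r: r[key])

    joined = []
    i = j = 0
    # Merge phase: advance whichever cursor points at the smaller key.
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][key], right[j][key]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of rows sharing this key on each side.
            i_end = i
            while i_end < len(left) and left[i_end][key] == left_key:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][key] == left_key:
                j_end += 1
            # Emit every pairing within the two runs, then skip past them.
            for left_row in left[i:i_end]:
                for right_row in right[j:j_end]:
                    joined.append({**left_row, **right_row})
            i, j = i_end, j_end
    return joined


# Hypothetical sample rows.
customers = [
    {"customer_id": 833, "name": "Carolyn"},
    {"customer_id": 148, "name": "Patricia"},
]
sales = [
    {"customer_id": 833, "food_id": 7},
    {"customer_id": 833, "food_id": 3},
    {"customer_id": 148, "food_id": 5},
    {"customer_id": 999, "food_id": 1},
]

merged = sort_merge_join(customers, sales, "customer_id")
print(merged)
```

Unlike the hash-based joins, the merge step needs no hash table in memory, only the two sorted streams and the current run of equal keys.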

#### Cartesian Product

A Cartesian product is a join in which every row of one DataFrame is paired with every row of the other.

This join can be forced with the `shuffle_replicate_nl` hint.
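
A small plain-Python sketch shows why the row count of a Cartesian product grows so quickly. The helper `cross_join` and the tiny sample tables are hypothetical.

```python
from itertools import product


def cross_join(left, right):
    """Pair every row of `left` with every row of `right`."""
    return [{**l, **r} for l, r in product(left, right)]


# Hypothetical sample rows: 3 customers and 2 foods.
customers = [{"customer_id": c} for c in (1, 2, 3)]
foods = [{"food_id": f} for f in (4, 9)]

pairs = cross_join(customers, foods)
print(len(pairs))  # 3 * 2 = 6
```

The output size is always the product of the two input sizes, so this join is only practical when at least one side is small.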

In [18]:
cartesianProduct = custDF.hint("shuffle_replicate_nl").join(salesDF)

In [19]:
cartesianProduct.count()

250000

#### Adaptive Query Execution
- Adaptive Query Execution (AQE) adjusts the number of shuffle partitions at runtime and can convert joins to broadcast joins based on runtime statistics.
- Spark checks the stage statistics to detect skewed joins and optimizes them by splitting the larger partitions into smaller ones (matching the partition size on the other table/DataFrame).
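
The partition-coalescing part of AQE can be pictured with a small plain-Python model. This is a simplified sketch, not Spark's actual algorithm: the function `coalesce_partitions`, the greedy rule, and the sample sizes are assumptions. In Spark the target size is related to the `spark.sql.adaptive.advisoryPartitionSizeInBytes` setting.

```python
def coalesce_partitions(partition_sizes, target_size):
    """Greedily merge adjacent partitions while the merged size stays within target_size."""
    groups = []
    current, current_size = [], 0
    for index, size in enumerate(partition_sizes):
        # Close the current group if adding this partition would exceed the target.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical shuffle partition sizes in MB, known only after the stage has run.
sizes_mb = [1, 2, 1, 40, 3, 2, 60, 1]
groups = coalesce_partitions(sizes_mb, target_size=64)
print(groups)  # [[0, 1, 2, 3, 4, 5], [6, 7]]
```

Eight small partitions collapse into two groups here, which is the same kind of effect as the partition count dropping after AQE is enabled.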

In [None]:
### run the code before AQE
df1 = custDF.groupBy("company").count()
df1.rdd.getNumPartitions()

In [21]:
spark.conf.set("spark.sql.adaptive.enabled",True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled",True)

In [22]:
df1 = custDF.groupBy("company").count()
df1.rdd.getNumPartitions()

1

#### Higher Order Functions

**transform**

Returns an array of elements after applying a transformation to each element in the input array.

   - col: name of column or expression
   - f: a function that is applied to each element of the input array.
     Can take one of the following forms:
     1. Unary `(x: Column) -> Column: …`
     2. Binary `(x: Column, i: Column) -> Column…`, where the second argument is a 0-based index of the element.
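
For intuition, the two forms of `f` behave like the following plain-Python list operations. This is only an analogy on ordinary lists (the helper `transform_list` is hypothetical), not how Spark evaluates column expressions.

```python
def transform_list(values, f, with_index=False):
    """Apply f to every element; optionally pass the 0-based index as a second argument."""
    if with_index:
        return [f(x, i) for i, x in enumerate(values)]
    return [f(x) for x in values]


# Unary form: the function receives the element only.
doubled = transform_list([1, 2, 3, 4], lambda x: x * 2)

# Binary form: the function receives the element and its 0-based index.
alternated = transform_list(
    [1, 2, 3, 4], lambda x, i: x if i % 2 == 0 else -x, with_index=True
)

print(doubled)     # [2, 4, 6, 8]
print(alternated)  # [1, -2, 3, -4]
```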



In [29]:
from pyspark.sql.functions import transform,when

In [26]:
df = spark.createDataFrame([(1, [1, 2, 3, 4])], ("key", "values"))
df.select(transform("values", lambda x: x * 2).alias("doubled")).show()

+------------+
|     doubled|
+------------+
|[2, 4, 6, 8]|
+------------+



In [30]:
def alternate(x, i):

    return when(i % 2 == 0, x).otherwise(-x)

In [31]:
df.select(transform("values", alternate).alias("alternated")).show()

+--------------+
|    alternated|
+--------------+
|[1, -2, 3, -4]|
+--------------+



- Example 3

In [32]:
spark.sql("select array(1,2,3) as orig_array,transform(array(1,2,3),x->x+1) as new_array").show() 

+----------+---------+
|orig_array|new_array|
+----------+---------+
| [1, 2, 3]|[2, 3, 4]|
+----------+---------+



- Example 4

In [34]:
arrayData = [(3,[1,2,3]),(4,[5,6,7,8])] 

df = spark.createDataFrame(data=arrayData, schema = ['col1','col2']) 

df.printSchema() 

df.createOrReplaceTempView("mytesttable") 

spark.sql("select * from mytesttable").show() 


root
 |-- col1: long (nullable = true)
 |-- col2: array (nullable = true)
 |    |-- element: long (containsNull = true)

+----+------------+
|col1|        col2|
+----+------------+
|   3|   [1, 2, 3]|
|   4|[5, 6, 7, 8]|
+----+------------+



- Example 5

In [39]:
spark.sql("select col1 ,col2 as orig_array,transform(filter(col2,(x,i)->i=2),x->x+5) as new_array from mytesttable").show(truncate=False) 

+----+------------+---------+
|col1|orig_array  |new_array|
+----+------------+---------+
|3   |[1, 2, 3]   |[8]      |
|4   |[5, 6, 7, 8]|[12]     |
+----+------------+---------+



- Example 6

In [40]:
spark.sql("select col1,col2 as orig_array,transform(col2,(x,i)->x+col1) as new_array from mytesttable").show(truncate=False) 

+----+------------+---------------+
|col1|orig_array  |new_array      |
+----+------------+---------------+
|3   |[1, 2, 3]   |[4, 5, 6]      |
|4   |[5, 6, 7, 8]|[9, 10, 11, 12]|
+----+------------+---------------+



In [41]:
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

                                                     
ws_schema = StructType() \
.add("ws_id", StringType()) \
.add("source", MapType(StringType(), StructType() \
.add("description", StringType()) \
.add("id", IntegerType()) \
.add("temp", ArrayType(IntegerType()))))


In [46]:
sc = spark.sparkContext

In [47]:
def convertJson(json,schema):
    reader = spark.read.schema(schema)

    return reader.json(sc.parallelize([json]))


In [48]:
ws_DF = convertJson( """{


    "ws_id": "ws_southeast",
    "source": {
        "weather-station": {
        "id": 1,
        "description": "Weather station located at heathrow Airport",                                          
        "temp":[15,16,17,20,22,25,22,20,19,18,17,16]            
      },
      "thermostat": {
        "id": 5,
        "description": "Thermometer reading at Kew Gardens",
        "temp": [15,15,15,18,21,22,21,21,20,18,17,17]
      }
    }
  }""", ws_schema)




DataFrame[ws_id: string, source: map<string,struct<description:string,id:int,temp:array<int>>>]

In [51]:
display(ws_DF)
ws_DF.show(truncate=False)

DataFrame[ws_id: string, source: map<string,struct<description:string,id:int,temp:array<int>>>]

+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ws_id       |source                                                                                                                                                                                                                          |
+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ws_southeast|{weather-station -> {Weather station located at heathrow Airport, 1, [15, 16, 17, 20, 22, 25, 22, 20, 19, 18, 17, 16]}, thermostat -> {Thermometer reading at Kew Gardens, 5, [15, 15, 15, 18, 21, 22, 21, 21, 20, 18, 17, 17]}}|
+------------+--------------------------

In [52]:
exploded_DF = ws_DF.select("ws_id", explode("source") )
exploded_DF.show(truncate=False)

+------------+---------------+--------------------------------------------------------------------------------------------------+
|ws_id       |key            |value                                                                                             |
+------------+---------------+--------------------------------------------------------------------------------------------------+
|ws_southeast|weather-station|{Weather station located at heathrow Airport, 1, [15, 16, 17, 20, 22, 25, 22, 20, 19, 18, 17, 16]}|
|ws_southeast|thermostat     |{Thermometer reading at Kew Gardens, 5, [15, 15, 15, 18, 21, 22, 21, 21, 20, 18, 17, 17]}         |
+------------+---------------+--------------------------------------------------------------------------------------------------+
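
What `explode` does to a map column can be mimicked on plain Python dictionaries. The helper `explode_map` is hypothetical and the temperature lists are shortened for readability. It is a sketch of the row-multiplying behaviour only.

```python
def explode_map(rows, map_column):
    """Emit one output row per (key, value) entry of the map column."""
    exploded = []
    for row in rows:
        # Keep every column except the map itself.
        rest = {k: v for k, v in row.items() if k != map_column}
        for key, value in row[map_column].items():
            exploded.append({**rest, "key": key, "value": value})
    return exploded


# Shortened, illustrative version of the weather-station record.
stations = [{
    "ws_id": "ws_southeast",
    "source": {
        "weather-station": {"id": 1, "temp": [15, 16, 17]},
        "thermostat": {"id": 5, "temp": [15, 15, 15]},
    },
}]

exploded_rows = explode_map(stations, "source")
for r in exploded_rows:
    print(r)
```

One input row with a two-entry map becomes two output rows, each carrying `ws_id` plus the `key` and `value` columns.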



In [53]:
tempDataDF = exploded_DF.select("ws_id","key", col("value.id").alias("id"),"value.temp")
tempDataDF.createOrReplaceTempView("temp_table")

spark.sql("select * from temp_table").show(truncate=False)  

+------------+---------------+---+------------------------------------------------+
|ws_id       |key            |id |temp                                            |
+------------+---------------+---+------------------------------------------------+
|ws_southeast|weather-station|1  |[15, 16, 17, 20, 22, 25, 22, 20, 19, 18, 17, 16]|
|ws_southeast|thermostat     |5  |[15, 15, 15, 18, 21, 22, 21, 21, 20, 18, 17, 17]|
+------------+---------------+---+------------------------------------------------+



- exists - Returns whether a predicate holds for one or more elements in the array.
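
On an ordinary Python list the same idea is what `any` expresses. The sketch below is an analogy that ignores null handling, and the helper `array_exists` is hypothetical.

```python
def array_exists(values, predicate):
    """True if the predicate holds for at least one element."""
    return any(predicate(x) for x in values)


rows = [[1, 2, 3, 4], [3, -1, 0]]
any_negative = [array_exists(values, lambda x: x < 0) for values in rows]
print(any_negative)  # [False, True]
```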

In [54]:
from pyspark.sql.functions import exists

In [55]:
df = spark.createDataFrame([(1, [1, 2, 3, 4]), (2, [3, -1, 0])],("key", "values"))
df.select(exists("values", lambda x: x < 0).alias("any_negative")).show()

+------------+
|any_negative|
+------------+
|       false|
|        true|
+------------+



- forall - Returns whether a predicate holds for every element in the array.
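
The plain-Python counterpart is `all`. The sketch below is an analogy on ordinary lists that ignores null handling. The helper `array_forall` is hypothetical, and `re.search` stands in for the regular-expression match.

```python
import re


def array_forall(values, predicate):
    """True if the predicate holds for every element."""
    return all(predicate(x) for x in values)


rows = [["bar"], ["foo", "bar"], ["foobar", "foo"]]
all_foo = [
    array_forall(values, lambda x: re.search("foo", x) is not None)
    for values in rows
]
print(all_foo)  # [False, False, True]
```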

In [56]:
df = spark.createDataFrame(
    [(1, ["bar"]), (2, ["foo", "bar"]), (3, ["foobar", "foo"])],
    ("key", "values")
)
df.select(forall("values", lambda x: x.rlike("foo")).alias("all_foo")).show()

+-------+
|all_foo|
+-------+
|  false|
|  false|
|   true|
+-------+

