# Join of Spark Dataframes: Study and Optimization

In this notebook, we study how to optimize one of the most common operations in data science: the inner join of two tables.



## Data Generator

In [1]:
import org.apache.spark.sql.SparkSession

In [2]:
val spark = SparkSession.builder.appName("JoinExample").master("spark://spark-master:7077").getOrCreate()
val sc = spark.sparkContext

spark = org.apache.spark.sql.SparkSession@68e0a774
sc = org.apache.spark.SparkContext@df72a2d


In [3]:
import spark.implicits._

In [4]:
// Print the session handle as a quick check that the session object exists.
println(spark)

org.apache.spark.sql.SparkSession@68e0a774


In [5]:
import org.apache.spark.sql.types.{StructField, StringType, IntegerType, StructType, DoubleType}
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.Row

/** Builds small synthetic two-column DataFrames (`id` plus one random value column)
  * that serve as join inputs in this notebook.
  *
  * Uses the notebook-level `spark` session and `sc` context. The object is
  * `Serializable` because its random helpers are called inside RDD `map` closures.
  */
object DataFrameGenerator extends Serializable {

    // Shared pseudo-random source (the global scala.util.Random instance).
    val r = scala.util.Random

    /** Returns a pseudo-random integer in `[0, maxValue)`. */
    def getRandomInt(maxValue: Int): Int = r.nextInt(maxValue)

    /** Returns a pseudo-random double obtained by scaling a value in `[0, 1)` by `maxValue`. */
    def getRandomDouble(maxValue: Float): Double = r.nextDouble() * maxValue

    /** Returns an RDD holding the ids `1..numObs` (empty when `numObs < 1`).
      *
      * The `Range` is handed to `parallelize` directly, so no intermediate
      * collection containing every id is built on the driver first.
      */
    def getIdsRdd(numObs: Int): org.apache.spark.rdd.RDD[Int] =
        sc.parallelize(1 to numObs)

    /** Schema shared by both generators: a nullable integer `id` plus one nullable value column. */
    private def pairSchema(columnName: String,
                           valueType: org.apache.spark.sql.types.DataType): StructType =
        StructType(Seq(StructField("id", IntegerType, nullable = true),
                       StructField(columnName, valueType, nullable = true)))

    /** Turns the rows into a DataFrame and puts them in random order. */
    private def toShuffledDf(rows: org.apache.spark.rdd.RDD[Row],
                             schema: StructType): org.apache.spark.sql.DataFrame =
        spark.createDataFrame(rowRDD = rows, schema = schema).orderBy(rand())

    /** Builds a randomly ordered DataFrame `(id, columnName)` with integer values.
      *
      * @param numObs     number of rows; ids run from 1 to `numObs`
      * @param maxValue   exclusive upper bound of the random integers
      * @param columnName name of the value column
      */
    def getPairIntDf(numObs: Int, maxValue: Int, columnName: String): org.apache.spark.sql.DataFrame = {
        val rddRows = getIdsRdd(numObs).map(id => Row(id, getRandomInt(maxValue)))
        toShuffledDf(rddRows, pairSchema(columnName, IntegerType))
    }

    /** Builds a randomly ordered DataFrame `(id, columnName)` with double values.
      *
      * @param numObs     number of rows; ids run from 1 to `numObs`
      * @param maxValue   scale applied to the random doubles drawn from `[0, 1)`
      * @param columnName name of the value column
      */
    def getPairDoubleDf(numObs: Int, maxValue: Float, columnName: String): org.apache.spark.sql.DataFrame = {
        val rddRows = getIdsRdd(numObs).map(id => Row(id, getRandomDouble(maxValue)))
        toShuffledDf(rddRows, pairSchema(columnName, DoubleType))
    }

}

defined object DataFrameGenerator


In [6]:
// Preview the integer generator: 100 rows, ages drawn from [0, 75).
DataFrameGenerator.getPairIntDf(numObs = 100, maxValue = 75, columnName = "age").show()

+---+---+
| id|age|
+---+---+
|  3| 20|
| 54| 17|
| 14| 22|
| 35| 21|
| 93| 31|
| 26|  1|
| 15| 42|
| 51| 30|
| 92|  2|
| 87| 38|
| 81|  5|
| 13| 34|
|  5| 36|
| 86|  0|
| 78| 23|
| 32| 10|
|  7| 65|
| 46| 74|
| 25| 51|
| 22| 23|
+---+---+
only showing top 20 rows



In [7]:
// Preview the double generator: 100 rows, balances scaled by 125.
DataFrameGenerator.getPairDoubleDf(numObs = 100, maxValue = 125, columnName = "balance").show()

+---+------------------+
| id|           balance|
+---+------------------+
| 84| 66.62744825515291|
| 20| 83.92155102965894|
|  3| 40.71240944885693|
|100|119.78321397507078|
| 69|  4.88308752545051|
| 88| 15.48028558840793|
| 22|15.929961755244928|
| 40|116.11776635989675|
| 32| 92.46336330395428|
| 11|28.828790918351956|
| 31|106.12498589051039|
| 64| 66.21497372059905|
| 41|114.94860346168521|
| 93| 32.09598493807166|
| 54|3.4428875504493166|
| 30|  39.3428290650875|
| 43| 46.13014463237125|
| 36| 89.44160888039372|
| 25|16.425128380903626|
| 77|25.073698534942054|
+---+------------------+
only showing top 20 rows



## Inner Join Mode 1: No Partitioning

In [11]:
// Baseline: join two freshly generated tables without any explicit partitioning.
val numObsMode1 = 250000
val df1 = DataFrameGenerator.getPairIntDf(numObs = numObsMode1, maxValue = 75, columnName = "age")
val df2 = DataFrameGenerator.getPairDoubleDf(numObs = numObsMode1, maxValue = 125, columnName = "balance").select($"balance", $"id")
// Wall-clock start, in seconds.
val iniTime = System.currentTimeMillis() / 1000.0
val dfJoined = df1.join(df2, usingColumns = Seq("id"), joinType = "inner")
// count() is the action that actually runs the job.
dfJoined.count()
val computationTimeMode1 = System.currentTimeMillis() / 1000.0 - iniTime

numObsMode1 = 250000
df1 = [id: int, age: int]
df2 = [balance: double, id: int]
iniTime = 1.532382462584E9
dfJoined = [id: int, age: int ... 1 more field]
computationTimeMode1 = 7.1579999923706055


7.1579999923706055

In [12]:
// Report the elapsed seconds measured for the Mode 1 join.
println(s"Total computational time (s) - Inner Join Mode 1: No Partitioning - No Cache: $computationTimeMode1")

Total computational time (s) - Inner Join Mode 1: No Partitioning - No Cache: 7.1579999923706055


We can check the number of partitions of the joined DataFrame (200 is the default value of `spark.sql.shuffle.partitions`):

In [13]:
// Number of partitions of the RDD backing the joined DataFrame.
dfJoined.rdd.partitions.length

200

In [14]:
// Inspect the partitioner (if any) attached to the RDD backing the joined DataFrame.
dfJoined.rdd.partitioner

None

## Inner Join Mode 2: Tuning Number of Partitions

In [20]:
/** Times an inner join on `id` between two generated tables that were each
  * repartitioned into `numPartitions` partitions.
  *
  * `count()` is the only action, so the measured interval covers everything the
  * job has to execute to produce the count, not just the join operator.
  *
  * @param numPartitions target partition count applied to both inputs
  * @param numObs        rows generated per table (defaults to the 2,500,000 used originally)
  * @return elapsed wall-clock time in seconds
  */
def getExeTimeNumPartitions(numPartitions: Int, numObs: Int = 2500000): Double = {
    val df1 = DataFrameGenerator.getPairIntDf(numObs, 75, "age").repartition(numPartitions)
    val df2 = DataFrameGenerator.getPairDoubleDf(numObs, 125, "balance")
        .select("balance", "id")
        .repartition(numPartitions)
    val iniTime = System.currentTimeMillis()/1000.0
    // Force execution of the join.
    df1.join(df2, Seq("id"), "inner").count()
    System.currentTimeMillis()/1000.0 - iniTime
}

getExeTimeNumPartitions: (numPartitions: Int)Double


In [21]:
// Empty placeholder for the benchmark results (numPartitions -> elapsed seconds).
// Typed explicitly: a bare Map() would be inferred as Map[Nothing, Nothing].
val timeMap: Map[Int, Double] = Map.empty

timeMap = Map()


Map()

In [22]:
// Run the benchmark once per candidate partition count (powers of two, 1 to 1024)
// and keep the results as numPartitions -> elapsed seconds.
val timeMap = {
    val partitionCounts = Array(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024)
    partitionCounts.map(numPartitions => numPartitions -> getExeTimeNumPartitions(numPartitions)).toMap
}

timeMap = Map(1024 -> 50.712000131607056, 1 -> 5.406999826431274, 512 -> 27.753999948501587, 256 -> 18.930999994277954, 128 -> 14.734999895095825, 2 -> 7.053999900817871, 32 -> 7.3450000286102295, 64 -> 7.319000005722046, 16 -> 7.259999990463257, 8 -> 7.228000164031982, 4 -> 6.940999984741211)


Map(1024 -> 50.712000131607056, 1 -> 5.406999826431274, 512 -> 27.753999948501587, 256 -> 18.930999994277954, 128 -> 14.734999895095825, 2 -> 7.053999900817871, 32 -> 7.3450000286102295, 64 -> 7.319000005722046, 16 -> 7.259999990463257, 8 -> 7.228000164031982, 4 -> 6.940999984741211)

In [23]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, DoubleType, StructType, StructField}

// Two nullable columns: the partition count and the measured time in seconds.
val schema = StructType(Seq(
    StructField("numPartitions", IntegerType, nullable = true),
    StructField("time", DoubleType, nullable = true)))

// Convert the (numPartitions -> seconds) map into a DataFrame so it can be sorted and shown.
val timeDf = {
    val timingRows = sc.parallelize(timeMap.toSeq).map { case (partitions, seconds) => Row(partitions, seconds) }
    spark.createDataFrame(timingRows, schema)
}

schema = StructType(StructField(numPartitions,IntegerType,true), StructField(time,DoubleType,true))
timeDf = [numPartitions: int, time: double]


[numPartitions: int, time: double]

In [24]:
// Fastest configuration first.
timeDf.sort("time").show()

+-------------+------------------+
|numPartitions|              time|
+-------------+------------------+
|            1| 5.406999826431274|
|            4| 6.940999984741211|
|            2| 7.053999900817871|
|            8| 7.228000164031982|
|           16| 7.259999990463257|
|           64| 7.319000005722046|
|           32|7.3450000286102295|
|          128|14.734999895095825|
|          256|18.930999994277954|
|          512|27.753999948501587|
|         1024|50.712000131607056|
+-------------+------------------+



A common rule of thumb for choosing the number of partitions: `numPartitions = numWorkerNodes * numCpuCoresPerWorker`

## Inner Join Mode 3: Pre-Partitioning

In [None]:
// Inputs for the pre-partitioning experiment: two tables of 500,000 rows each.
val numObsMode1 = 500000
val df1 = DataFrameGenerator.getPairIntDf(numObs = numObsMode1, maxValue = 75, columnName = "age")
val df2 = DataFrameGenerator.getPairDoubleDf(numObs = numObsMode1, maxValue = 125, columnName = "balance").select($"balance", $"id")
// Make sure neither input is served from the cache.
df1.unpersist()
df2.unpersist()

In [None]:
// Time the hash-partitioning of both inputs by the join key.
val iniTime = System.currentTimeMillis()/1000.0
val df1Par = df1.repartition($"id")
val df2Par = df2.repartition($"id")
// repartition is a lazy transformation: without an action the stopwatch would only
// measure query-plan construction. count() forces each job to run.
// NOTE(review): the measured time therefore also includes computing df1/df2 themselves.
df1Par.count()
df2Par.count()
val partitionTimeMode2 = System.currentTimeMillis()/1000.0 - iniTime

In [None]:
// End-to-end timing: generate both tables, partition them by the join key, then join.
val iniTime = System.currentTimeMillis() / 1000.0
val numObsMode1 = 500000
val df1 = DataFrameGenerator.getPairIntDf(numObs = numObsMode1, maxValue = 75, columnName = "age")
val df2 = DataFrameGenerator.getPairDoubleDf(numObs = numObsMode1, maxValue = 125, columnName = "balance").select($"balance", $"id")
// Make sure neither input is served from the cache.
df1.unpersist()
df2.unpersist()
// Hash-partition both sides on the join key.
val df1Par = df1.repartition($"id")
val df2Par = df2.repartition($"id")
// count() is the action that runs the whole pipeline.
df1Par.join(df2Par, usingColumns = Seq("id"), joinType = "inner").count()
val joinTimeMode2 = System.currentTimeMillis() / 1000.0 - iniTime

In [None]:
// Persist both pre-partitioned tables to HDFS as Parquet, replacing any previous copy.
df1Par.write.mode("overwrite").parquet("hdfs://hadoop:9000/data/df1Par.parquet")
df2Par.write.mode("overwrite").parquet("hdfs://hadoop:9000/data/df2Par.parquet")

In [None]:
// Time a join whose inputs are read back from the Parquet files written to HDFS.
val iniTime = System.currentTimeMillis() / 1000.0
val df1ParLoaded = spark.read.parquet("hdfs://hadoop:9000/data/df1Par.parquet")
val df2ParLoaded = spark.read.parquet("hdfs://hadoop:9000/data/df2Par.parquet")
// count() triggers both the reads and the join.
df1ParLoaded.join(df2ParLoaded, usingColumns = Seq("id"), joinType = "inner").count()
val joinTimeMode3 = System.currentTimeMillis() / 1000.0 - iniTime

## Inner Join Mode 4: Broadcast Join

In [27]:
// Reference run for the broadcast experiment: a large table joined with a tiny one,
// without a broadcast hint.
val numObsMode1 = 250000
val df1 = DataFrameGenerator.getPairIntDf(numObs = numObsMode1, maxValue = 75, columnName = "age")
// Integer division: 250000/10000 = 25 rows on the small side.
val df2 = DataFrameGenerator.getPairDoubleDf(numObs = numObsMode1/10000, maxValue = 125, columnName = "balance").select($"balance", $"id")
val iniTime = System.currentTimeMillis() / 1000.0
val dfJoined = df1.join(df2, usingColumns = Seq("id"), joinType = "inner")
// count() is the action that actually runs the job.
dfJoined.count()
val computationTimeMode1 = System.currentTimeMillis() / 1000.0 - iniTime

numObsMode1 = 250000
df1 = [id: int, age: int]
df2 = [balance: double, id: int]
iniTime = 1.532382278788E9
dfJoined = [id: int, age: int ... 1 more field]
computationTimeMode1 = 3.435999870300293


3.435999870300293

In [28]:
import org.apache.spark.sql.functions.broadcast

In [29]:
// Same large-vs-tiny join as the reference run, but with an explicit broadcast hint
// on the small table.
val numObsMode1 = 250000
val df1 = DataFrameGenerator.getPairIntDf(numObs = numObsMode1, maxValue = 75, columnName = "age")
// Integer division: 250000/10000 = 25 rows on the small side.
val df2 = DataFrameGenerator.getPairDoubleDf(numObs = numObsMode1/10000, maxValue = 125, columnName = "balance").select($"balance", $"id")
val iniTime = System.currentTimeMillis() / 1000.0
val dfJoined = df1.join(broadcast(df2), usingColumns = Seq("id"), joinType = "inner")
// count() is the action that actually runs the job.
dfJoined.count()
val computationTimeMode1 = System.currentTimeMillis() / 1000.0 - iniTime

numObsMode1 = 250000
df1 = [id: int, age: int]
df2 = [balance: double, id: int]
iniTime = 1.532382286528E9
dfJoined = [id: int, age: int ... 1 more field]
computationTimeMode1 = 1.8939998149871826


1.8939998149871826