In [1]:
from __future__ import print_function  # Python 2 compatibility shim; has no effect on Python 3
import findspark
findspark.init()  # locate the local Spark installation so the `pyspark` imports below resolve
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Build (or reuse an already-running) Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Create Dataframes")
    .getOrCreate()
)

In [4]:
from collections import namedtuple

# Spark infers the DataFrame schema from the namedtuple field names and the
# Python value types (str -> string, int -> long, bool -> boolean).
user_row = namedtuple('user_row', ['dob', 'age', 'is_fan'])
data = [
    user_row(dob='1990-05-03', age=29, is_fan=True),
    user_row(dob='1994-09-23', age=25, is_fan=False),
]

user_df = spark.createDataFrame(data)
user_df.printSchema()

root
 |-- dob: string (nullable = true)
 |-- age: long (nullable = true)
 |-- is_fan: boolean (nullable = true)



In [25]:
from collections import namedtuple

# Left side of the join: one row per user. Note that user_4 has no matching
# transaction in the next cell.
UserData = namedtuple('UserData', 'user_id age')
user_data = [
    UserData(user_id='user_1', age=30),
    UserData(user_id='user_2', age=34),
    UserData(user_id='user_4', age=36),
]
userData_df = spark.createDataFrame(user_data)
userData_df.printSchema()
userData_df.show()

root
 |-- user_id: string (nullable = true)
 |-- age: long (nullable = true)

+-------+---+
|user_id|age|
+-------+---+
| user_1| 30|
| user_2| 34|
| user_4| 36|
+-------+---+



In [26]:
# Right side of the join: transaction amounts keyed by user_id. Note that
# user_3 has no matching row in userData_df.
TransactionData = namedtuple('TransactionData', 'user_id amount')
transaction_data = [
    TransactionData(user_id='user_1', amount=3000),
    TransactionData(user_id='user_2', amount=5000),
    TransactionData(user_id='user_3', amount=10000),
]
transactionData_df = spark.createDataFrame(transaction_data)
transactionData_df.printSchema()
transactionData_df.show()

root
 |-- user_id: string (nullable = true)
 |-- amount: long (nullable = true)

+-------+------+
|user_id|amount|
+-------+------+
| user_1|  3000|
| user_2|  5000|
| user_3| 10000|
+-------+------+



In [27]:
# Inner join on user_id: users without transactions (user_4) and transactions
# without a user (user_3) are dropped. Joining on a column expression keeps
# BOTH sides' user_id columns in the result.
user_transaction_df = userData_df.join(
    transactionData_df,
    on=userData_df.user_id == transactionData_df.user_id,
    how="inner",
)

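Both inputs are shuffled for this join. The physical plan below shows an `Exchange hashpartitioning(user_id, 200)` on each side. Rows from userData and from transactionData are therefore redistributed across executors by the join column. They are then sorted and merged in a sort-merge join.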

In [28]:
# Print the physical plan; every `Exchange hashpartitioning` node in it is a shuffle.
user_transaction_df.explain()

== Physical Plan ==
*(5) SortMergeJoin [user_id#150], [user_id#163], Inner
:- *(2) Sort [user_id#150 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(user_id#150, 200), true, [id=#388]
:     +- *(1) Filter isnotnull(user_id#150)
:        +- *(1) Scan ExistingRDD[user_id#150,age#151L]
+- *(4) Sort [user_id#163 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(user_id#163, 200), true, [id=#394]
      +- *(3) Filter isnotnull(user_id#163)
         +- *(3) Scan ExistingRDD[user_id#163,amount#164L]




In [29]:
import datetime  # retained in case later cells still rely on `datetime`
import time

# Time the action that triggers the join. perf_counter() is monotonic and
# high-resolution, so the elapsed value cannot be skewed by wall-clock
# adjustments the way datetime.now().timestamp() can.
# NOTE(review): a single cold run also pays job-startup overhead; repeat the
# measurement before comparing it with other timings.
start = time.perf_counter()
user_transaction_df.show(truncate=False)
end = time.perf_counter()

print(end - start)

+-------+---+-------+------+
|user_id|age|user_id|amount|
+-------+---+-------+------+
|user_1 |30 |user_1 |3000  |
|user_2 |34 |user_2 |5000  |
+-------+---+-------+------+

7.367485046386719


In [30]:
# Hash-partition both join inputs on the join key ahead of the join
# (the plans below show hashpartitioning(user_id, 200) for each).
userData_df1, transactionData_df1 = (
    frame.repartition('user_id') for frame in (userData_df, transactionData_df)
)

In [31]:
# Plan of the repartitioned user data: the repartition itself is an Exchange (shuffle).
userData_df1.explain()

== Physical Plan ==
Exchange hashpartitioning(user_id#150, 200), false, [id=#459]
+- *(1) Scan ExistingRDD[user_id#150,age#151L]




In [32]:
# Plan of the repartitioned transaction data: again a single hash-partitioning Exchange.
transactionData_df1.explain()

== Physical Plan ==
Exchange hashpartitioning(user_id#163, 200), false, [id=#468]
+- *(1) Scan ExistingRDD[user_id#163,amount#164L]




In [33]:
# Same inner join as before, now on the pre-repartitioned inputs; the result
# again carries both sides' user_id columns.
user_transaction_df1 = userData_df1.join(
    transactionData_df1,
    on=userData_df1.user_id == transactionData_df1.user_id,
    how="inner",
)

In [34]:
# Compare with the first join plan. Here each side still has exactly one
# Exchange, flagged `false` like the standalone repartition plans, which
# suggests the join reuses the repartition shuffle rather than adding another.
# NOTE(review): the total number of shuffles is therefore unchanged.
user_transaction_df1.explain()

== Physical Plan ==
*(5) SortMergeJoin [user_id#150], [user_id#163], Inner
:- *(2) Sort [user_id#150 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(user_id#150, 200), false, [id=#501]
:     +- *(1) Filter isnotnull(user_id#150)
:        +- *(1) Scan ExistingRDD[user_id#150,age#151L]
+- *(4) Sort [user_id#163 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(user_id#163, 200), false, [id=#507]
      +- *(3) Filter isnotnull(user_id#163)
         +- *(3) Scan ExistingRDD[user_id#163,amount#164L]




In [35]:
import time

# Time the join on the repartitioned inputs with a monotonic, high-resolution
# clock (wall-clock timestamps can jump and are unsuitable for durations).
# Each input is still shuffled once (by repartition), so a similar elapsed
# time to the un-repartitioned join is expected.
start = time.perf_counter()
user_transaction_df1.show(truncate=False)
end = time.perf_counter()

print(end - start)

+-------+---+-------+------+
|user_id|age|user_id|amount|
+-------+---+-------+------+
|user_1 |30 |user_1 |3000  |
|user_2 |34 |user_2 |5000  |
+-------+---+-------+------+

7.313449144363403
