In [1]:
from pyspark.sql import SparkSession
import getpass
# Current OS user; used below to build a per-user Hive warehouse path.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled SparkSession that runs on YARN.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# DDL-style schema string for the orders CSV, one column per fragment.
# The fragments concatenate to exactly the original single-line string.
orders_schema = (
    "order_id long , "
    "order_date string, "
    "customer_id long,"
    "order_status string"
)

In [3]:
# Load the orders CSV using the explicit schema string instead of inferring types.
orders_df = (
    spark.read
    .format("csv")
    .schema(orders_schema)
    .load("/public/trendytech/orders/orders_1gb.csv")
)

In [4]:
# Report how many partitions back orders_df after the CSV read.
orders_df.rdd.getNumPartitions() 

9

The result of the groupBy aggregation will be written to HDFS as CSV.

In [5]:
# Count orders per status and write the result as CSV, replacing any previous output.
# "output101" is a relative path — presumably resolved against the user's HDFS home; verify.
(
    orders_df
    .groupBy("order_status")
    .count()
    .write
    .format("csv")
    .mode("overwrite")
    .save("output101")
)

If I don't want to write the output to disk but still want the entire flow to execute (for testing purposes), I can write with the `noop` format:

noop = no operation 

In [6]:
# Same aggregation as the CSV write, but sent to the "noop" sink:
# the job is executed while nothing is persisted.
(
    orders_df
    .groupBy("order_status")
    .count()
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

Please check the resulting job in the Spark UI.

Join

In [7]:
# DDL-style schema string for the customers data, one column per fragment.
# The fragments concatenate to exactly the original string, trailing space included.
customer_schema = (
    "customerid long , "
    "customer_fname string , "
    "customer_lname string , "
    "user_name string,"
    "password string , "
    "address string, "
    "city string, "
    "state string, "
    "pincode long "
)

In [8]:
# Load the customers data as CSV using the explicit schema string.
customers_df = (
    spark.read
    .format("csv")
    .schema(customer_schema)
    .load("/public/trendytech/retail_db/customers")
)

In [9]:
# Print a preview of the customers data to confirm the schema was applied.
# NOTE(review): the rendered output contains customer names and addresses —
# consider clearing it before sharing the notebook.
customers_df.show()

+----------+--------------+--------------+---------+---------+--------------------+-------------+-----+-------+
|customerid|customer_fname|customer_lname|user_name| password|             address|         city|state|pincode|
+----------+--------------+--------------+---------+---------+--------------------+-------------+-----+-------+
|         1|       Richard|     Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|  Brownsville|   TX|  78521|
|         2|          Mary|       Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|    Littleton|   CO|  80126|
|         3|           Ann|         Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|       Caguas|   PR|    725|
|         4|          Mary|         Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common|   San Marcos|   CA|  92069|
|         5|        Robert|        Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...|       Caguas|   PR|    725|
|         6|          Mary|         Smith|XXXXXXXXX|XXXXXXXXX|3151 Sleepy Quail...|      Passaic|   NJ| 

In [10]:
# Print a preview of the orders data to confirm the schema was applied.
orders_df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [11]:
# Read the current automatic-broadcast-join threshold; the value is shown as the cell output.
spark.conf.get('spark.sql.autoBroadcastJoinThreshold')

'10485760b'

In [12]:
# Convert the threshold from bytes to mebibytes: bytes / (1024 * 1024).
10485760 / (1024 * 1024) 

10.0

Disable automatic broadcast joins (set the threshold to -1), even when one of the tables is small enough to be broadcast.

In [13]:
# Set the threshold to -1 to turn off automatic broadcast joins for this session.
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '-1') 

Re-enable automatic broadcast joins by restoring the original 10 MB threshold.

In [None]:
# Restore the threshold to the value read earlier ('10485760b'), turning automatic broadcast joins back on.
# NOTE(review): this cell has no execution count in the recorded run. On a top-to-bottom run it
# would undo the '-1' setting before the join cells below execute — confirm that is intended.
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', '10485760b') 

In [14]:
# Inner join of orders to customers on the customer key; print a preview of the joined rows.
(
    orders_df
    .join(
        customers_df,
        on=orders_df.customer_id == customers_df.customerid,
        how="inner",
    )
    .show()
)

+--------+--------------------+-----------+------------+----------+--------------+--------------+---------+---------+--------------------+--------+-----+-------+
|order_id|          order_date|customer_id|order_status|customerid|customer_fname|customer_lname|user_name| password|             address|    city|state|pincode|
+--------+--------------------+-----------+------------+----------+--------------+--------------+---------+---------+--------------------+--------+-----+-------+
|   64185|2014-03-28 00:00:...|         26|     PENDING|        26|        Johnny|          Hood|XXXXXXXXX|XXXXXXXXX|9576 Middle Hills...|Glenview|   IL|  60025|
|   21730|2013-12-05 00:00:...|         26|      CLOSED|        26|        Johnny|          Hood|XXXXXXXXX|XXXXXXXXX|9576 Middle Hills...|Glenview|   IL|  60025|
|   26441|2014-01-05 00:00:...|         26|    COMPLETE|        26|        Johnny|          Hood|XXXXXXXXX|XXXXXXXXX|9576 Middle Hills...|Glenview|   IL|  60025|
|   30480|2014-01-30 00:00:.

Inner Join

In [15]:
# Inner join: only orders with a matching customer are kept.
# Written to the "noop" sink so the whole join runs without persisting anything.
(
    orders_df
    .join(
        customers_df,
        on=orders_df.customer_id == customers_df.customerid,
        how="inner",
    )
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

Right Join

In [16]:
# Right join: every customers_df row is kept, matched to orders where possible.
# Written to the "noop" sink so the whole join runs without persisting anything.
(
    orders_df
    .join(
        customers_df,
        on=orders_df.customer_id == customers_df.customerid,
        how="right",
    )
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

Left Join

In [17]:
# Left join with orders on the left: every orders_df row is kept.
# Written to the "noop" sink so the whole join runs without persisting anything.
(
    orders_df
    .join(
        customers_df,
        on=orders_df.customer_id == customers_df.customerid,
        how="left",
    )
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

In [18]:
# Left join with customers on the left: every customers_df row is kept.
# Written to the "noop" sink so the whole join runs without persisting anything.
(
    customers_df
    .join(
        orders_df,
        on=orders_df.customer_id == customers_df.customerid,
        how="left",
    )
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)

Broadcast Join

To request a broadcast join explicitly, wrap the DataFrame to be broadcast in `broadcast()`.

In [19]:
# Wildcard import: brings every public name from pyspark.sql.functions into the notebook namespace.
# NOTE(review): only `broadcast` is used in the cells visible here. The wildcard can shadow
# Python built-ins of the same name (e.g. sum, min, max) — consider importing names explicitly.
from pyspark.sql.functions import *

In [2]:
# Left join with an explicit broadcast hint on the right-hand side (orders_df),
# written to the "noop" sink so the join runs without persisting anything.
# NOTE(review): this cell's execution count (In [2]) is out of sequence with the cells above,
# which suggests it ran after a kernel restart — confirm the notebook works on Restart & Run All.
(
    customers_df
    .join(
        broadcast(orders_df),
        on=orders_df.customer_id == customers_df.customerid,
        how="left",
    )
    .write
    .format("noop")
    .mode("overwrite")
    .save()
)