In [1]:
# Spark Session
from pyspark.sql import SparkSession

# Build (or reuse) the session for this notebook one setting at a time.
# No master URL is set here; the standalone-cluster master stays disabled:
#   .master("spark://197e20b418a6:7077")
_builder = SparkSession.builder.appName("Optimizing Skewness and Spillage")
_builder = _builder.config("spark.cores.max", 8)
_builder = _builder.config("spark.executor.cores", 4)
_builder = _builder.config("spark.executor.memory", "512M")
spark = _builder.getOrCreate()

# Last expression of the cell: render the session summary.
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/04 22:14:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/02/04 22:14:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/02/04 22:14:02 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/02/04 22:14:02 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [45]:
from pyspark.sql.types import  StructType,StructField,StringType,DoubleType, IntegerType,TimestampType, _parse_datatype_string, _parse_datatype_json_string
import pyspark.sql.functions as F

In [2]:
# Turn off adaptive query execution (and its partition coalescing) and
# automatic broadcast joins, so the join below is planned without them.
for _conf_key, _conf_value in (
    ("spark.sql.adaptive.enabled", False),
    ("spark.sql.adaptive.coalescePartitions.enabled", False),
    ("spark.sql.autoBroadcastJoinThreshold", -1),
):
    spark.conf.set(_conf_key, _conf_value)

In [3]:
# Load the employee CSV with an explicit DDL schema; the file is read with
# header=False, so the first line is treated as data.
# NOTE(review): the salted_emp.show() output in this notebook prints NULL for
# hire_date and job_id, so these two declared types may not match the file --
# confirm against the CSV.
_schema ="employee_id int,first_name string ,last_name string,email string,phone string ,hire_date Timestamp,job_id int ,salary double, commition_cp string, manager_id int, departement_id int"

emp = spark.read.load(
    "file:///Users/rahmanidriss/Desktop/dataScience/spark_cluster/notebooks/pyspark-zero-to-hero/datasets/employees.csv",
    format="csv",
    schema=_schema,
    header=False,
)

In [4]:


# Keep only the employee rows that carry a department id.
emp = emp.where(emp["departement_id"].isNotNull())
#emp.groupBy("departement_id").agg(F.count(F.lit(1))).show()

In [5]:
# Load the department CSV with an explicit DDL schema; header=True makes
# Spark consume the first line as a header row.
_dept_schema = "departement_id int, departement_name string,manager_id int ,location_id int"
dept = spark.read.load(
    "file:///Users/rahmanidriss/Desktop/dataScience/spark_cluster/notebooks/pyspark-zero-to-hero/datasets/departments.csv",
    format="csv",
    schema=_dept_schema,
    header=True,
)

In [6]:
# Left-outer join: every employee row is kept and matched to its department
# through the departement_id column present on both sides.
_same_dept = emp["departement_id"] == dept["departement_id"]
df_joined = emp.join(dept, on=_same_dept, how="left_outer")

In [7]:
# Trigger the join by writing with the "noop" format in overwrite mode.
df_joined.write.save(format="noop", mode="overwrite")

24/02/04 22:14:29 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: DEPARTMENT_ID, DEPARTMENT_NAME, MANAGER_ID, LOCATION_ID
 Schema: departement_id, departement_name, manager_id, location_id
Expected: departement_id but found: DEPARTMENT_ID
CSV file: file:///Users/rahmanidriss/Desktop/dataScience/spark_cluster/notebooks/pyspark-zero-to-hero/datasets/departments.csv
                                                                                

In [8]:
# Print the physical plan of the un-salted join (simple mode is the default).
df_joined.explain(mode="simple")

== Physical Plan ==
*(5) SortMergeJoin [departement_id#10], [departement_id#23], LeftOuter
:- *(2) Sort [departement_id#10 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(departement_id#10, 200), ENSURE_REQUIREMENTS, [plan_id=94]
:     +- *(1) Filter isnotnull(departement_id#10)
:        +- FileScan csv [employee_id#0,first_name#1,last_name#2,email#3,phone#4,hire_date#5,job_id#6,salary#7,commition_cp#8,manager_id#9,departement_id#10] Batched: false, DataFilters: [isnotnull(departement_id#10)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/rahmanidriss/Desktop/dataScience/spark_cluster/notebooks/p..., PartitionFilters: [], PushedFilters: [IsNotNull(departement_id)], ReadSchema: struct<employee_id:int,first_name:string,last_name:string,email:string,phone:string,hire_date:tim...
+- *(4) Sort [departement_id#23 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(departement_id#23, 200), ENSURE_REQUIREMENTS, [plan_id=102]
      +- *(3) Filter isnotnull(depa

In [9]:
# Check the partition details to understand distribution
from pyspark.sql.functions import spark_partition_id, count, lit

# Tag each joined row with the id of the partition it sits in, then count
# the rows per partition.
_joined_with_pid = df_joined.withColumn("partition_num", spark_partition_id())
part_df = (
    _joined_with_pid
    .groupBy("partition_num")
    .agg(count(lit(1)).alias("count"))
)

part_df.show()

24/02/04 22:18:49 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: DEPARTMENT_ID
 Schema: departement_id
Expected: departement_id but found: DEPARTMENT_ID
CSV file: file:///Users/rahmanidriss/Desktop/dataScience/spark_cluster/notebooks/pyspark-zero-to-hero/datasets/departments.csv
                                                                                

+-------------+-----+
|partition_num|count|
+-------------+-----+
|           53|    3|
|          103|    6|
|          122|    1|
|          164|    2|
|          136|    1|
|          181|    6|
|          150|    8|
|          124|   23|
+-------------+-----+



In [11]:
# Verify the employee row count per departement_id (the join key)
from pyspark.sql.functions import count, lit, desc, col

# Count employee rows for each departement_id and print the table.
_rows_per_dept = emp.groupBy("departement_id").agg(count(lit(1)))
_rows_per_dept.show()

+--------------+--------+
|departement_id|count(1)|
+--------------+--------+
|            20|       2|
|            40|       1|
|           100|       6|
|            10|       1|
|            50|      23|
|            70|       1|
|            90|       3|
|            60|       5|
|           110|       2|
|            30|       6|
+--------------+--------+



In [12]:
# Set shuffle partitions to a lesser number - 32 (the plan printed earlier used 200)

spark.conf.set("spark.sql.shuffle.partitions", 32)

In [55]:
# Let's prepare the salt
import random
from pyspark.sql.functions import udf

# Number of salt buckets. The employee-side salt (salt_udf) and the
# department-side salt values (salt_df) must cover exactly the same set,
# 0 .. SALT_BUCKETS - 1; otherwise some salted employee keys can never match.
SALT_BUCKETS = 32

# UDF to return a random number every time and add to Employee as salt
@udf
def salt_udf():
    """Return a random salt bucket in the half-open range [0, SALT_BUCKETS).

    random.randint(0, 32) includes 32 itself, a value that spark.range(0, 32)
    never produces, so rows salted with 32 would find no department in the
    salted join. randrange excludes the upper bound and keeps both sides aligned.
    """
    return random.randrange(SALT_BUCKETS)

# The UDF is random by design: mark it non-deterministic so Spark does not
# assume repeated evaluations return the same value when optimizing the plan.
salt_udf = salt_udf.asNondeterministic()

# Salt Data Frame to add to department: one row per bucket, ids 0 .. SALT_BUCKETS - 1
salt_df = spark.range(0, SALT_BUCKETS)

#salt_df.withColumn("random", F.round(F.rand()*(32),0).cast(IntegerType())).show()

In [57]:
# Salted Employee
from pyspark.sql.functions import lit, concat
#F.round(F.rand()*(32),0).cast(IntegerType())
# Build the salted key "<departement_id>_<salt>" for every employee row,
# attach it as a new column, and preview the result.
_emp_salt_key = concat("departement_id", lit("_"), salt_udf())
salted_emp = emp.withColumn("salted_dept_id", _emp_salt_key)

salted_emp.show()

+-----------+----------+---------+--------+------------+---------+------+-------+------------+----------+--------------+--------------+
|employee_id|first_name|last_name|   email|       phone|hire_date|job_id| salary|commition_cp|manager_id|departement_id|salted_dept_id|
+-----------+----------+---------+--------+------------+---------+------+-------+------------+----------+--------------+--------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|     NULL|  NULL| 2600.0|          - |       124|            50|         50_19|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|     NULL|  NULL| 2600.0|          - |       124|            50|         50_22|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|     NULL|  NULL| 4400.0|          - |       101|            10|         10_23|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|     NULL|  NULL|13000.0|          - |       100|            20|          20_5|
|        202|       Pat|      Fay|    PFAY|603.1

In [60]:
# Replicate every department once per salt value (cross join with salt_df),
# then derive the same "<departement_id>_<salt>" key used on the employee side.
_dept_per_salt = dept.join(salt_df, how="cross")
salted_dept = _dept_per_salt.withColumn(
    "salted_dept_id",
    concat("departement_id", lit("_"), "id"),
)

# Optional spot check of a single department:
#salted_dept.where("departement_id = 50").show()
salted_dept.count()

864

In [49]:
# Join the two salted frames on the salted key instead of the raw department id.
_same_salted_key = salted_emp["salted_dept_id"] == salted_dept["salted_dept_id"]
salted_joined_df = salted_emp.join(salted_dept, on=_same_salted_key, how="left_outer")


In [50]:
# Trigger the salted join by writing with the "noop" format in overwrite mode.
salted_joined_df.write.save(format="noop", mode="overwrite")

24/02/05 10:09:08 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: DEPARTMENT_ID, DEPARTMENT_NAME, MANAGER_ID, LOCATION_ID
 Schema: departement_id, departement_name, manager_id, location_id
Expected: departement_id but found: DEPARTMENT_ID
CSV file: file:///Users/rahmanidriss/Desktop/dataScience/spark_cluster/notebooks/pyspark-zero-to-hero/datasets/departments.csv
24/02/05 10:09:08 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: DEPARTMENT_ID, DEPARTMENT_NAME, MANAGER_ID, LOCATION_ID
 Schema: departement_id, departement_name, manager_id, location_id
Expected: departement_id but found: DEPARTMENT_ID
CSV file: file:///Users/rahmanidriss/Desktop/dataScience/spark_cluster/notebooks/pyspark-zero-to-hero/datasets/departments.csv
24/02/05 10:09:08 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: DEPARTMENT_ID, DEPARTMENT_NAME, MANAGER_ID, LOCATION_ID
 Schema: departement_id, departement_name, manager_id, location_i

In [53]:
# Check the partition details to understand distribution
from pyspark.sql.functions import spark_partition_id, count

# Tag each salted-join row with the id of the partition it sits in, then
# count the rows per partition.
_salted_with_pid = salted_joined_df.withColumn("partition_num", spark_partition_id())
part_df = (
    _salted_with_pid
    .groupBy("partition_num")
    .agg(count(lit(1)).alias("count"))
)

part_df.show()

24/02/05 10:09:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: DEPARTMENT_ID
 Schema: departement_id
Expected: departement_id but found: DEPARTMENT_ID
CSV file: file:///Users/rahmanidriss/Desktop/dataScience/spark_cluster/notebooks/pyspark-zero-to-hero/datasets/departments.csv
24/02/05 10:09:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: DEPARTMENT_ID
 Schema: departement_id
Expected: departement_id but found: DEPARTMENT_ID
CSV file: file:///Users/rahmanidriss/Desktop/dataScience/spark_cluster/notebooks/pyspark-zero-to-hero/datasets/departments.csv
24/02/05 10:09:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: DEPARTMENT_ID
 Schema: departement_id
Expected: departement_id but found: DEPARTMENT_ID
CSV file: file:///Users/rahmanidriss/Desktop/dataScience/spark_cluster/notebooks/pyspark-zero-to-hero/datasets/departments.csv
24/02/05 10:09:58 WARN CSVHeaderChecker: CSV header does not conform to the 

+-------------+-----+
|partition_num|count|
+-------------+-----+
|           12|    4|
|           18|    1|
|           10|    6|
|            1|    2|
|           20|    3|
|           29|    1|
|           13|    1|
|            6|    1|
|            9|    1|
|           23|    1|
|            7|    2|
|           11|    3|
|           26|    1|
|           30|    2|
|           28|    1|
|            0|    2|
|           24|    2|
|           16|    1|
|            5|    1|
|           22|    4|
+-------------+-----+
only showing top 20 rows

