In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840629 sha256=ea10d8c8966ced1c3a6c72ccbe00d80717fa556c9e123a08a3624ba10f22d3f6
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pyspark_practice_4").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/25 08:09:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.sparkContext.setLogLevel("ERROR")

## SHUFFLING - SKEWNESS - data distribution across the partitions

### **1. Run a schuffle *groupByKey* to see how the skew effect computation resources**

In [4]:
data_sample = [(1,4),(2,2),(2,1),(3,5),(2,5),(2,10),(2,7),(3,4),(2,1),(2,4),(4,4)]
rdd_sample = spark.sparkContext.parallelize(data_sample, 3)
rdd_sample.glom().collect()

                                                                                

[[(1, 4), (2, 2), (2, 1)],
 [(3, 5), (2, 5), (2, 10)],
 [(2, 7), (3, 4), (2, 1), (2, 4), (4, 4)]]

In [5]:
rdd_sample_grouped = rdd_sample.groupByKey()

for item in rdd_sample_grouped.collect():
    print(item[0], [value for value in item[1]])

# Output:

# 3 [5, 4]
# 1 [4]
# 4 [4]
# 2 [2, 1, 5, 10, 7, 1, 4] --> Skewness uneven data distribution


[Stage 1:>                                                          (0 + 3) / 3]

3 [5, 4]
1 [4]
4 [4]
2 [2, 1, 5, 10, 7, 1, 4]


                                                                                

In [6]:
# show partitions (result = 4)
rdd_sample_grouped.glom().collect()

# output:
# [[(3, <pyspark.resultiterable.ResultIterable at 0x7cb6b85026e0>)],
#  [(1, <pyspark.resultiterable.ResultIterable at 0x7cb6b8502ec0>),
#   (4, <pyspark.resultiterable.ResultIterable at 0x7cb6b8503760>)],
#  [(2, <pyspark.resultiterable.ResultIterable at 0x7cb6b85031f0>)]]

[[(3, <pyspark.resultiterable.ResultIterable at 0x7fe6633085b0>)],
 [(1, <pyspark.resultiterable.ResultIterable at 0x7fe6633089a0>),
  (4, <pyspark.resultiterable.ResultIterable at 0x7fe663308a00>)],
 [(2, <pyspark.resultiterable.ResultIterable at 0x7fe663308a60>)]]

### **Solution use *SALTING***

In [7]:
# redistribute data by adding random value

import numpy as np
import random

# generate skew values
key_1 = ['a'] * 10
key_2 = ['b'] * 6000000
key_3 = ['c'] * 800
key_4 = ['d'] * 10000

keys = key_1 + key_2 + key_3 + key_4

random.shuffle(keys)

value_1 = list(np.random.randint(low = 1, high = 100, size = len(key_1)))
value_2 = list(np.random.randint(low = 1, high = 100, size = len(key_2)))
value_3 = list(np.random.randint(low = 1, high = 100, size = len(key_3)))
value_4 = list(np.random.randint(low = 1, high = 100, size = len(key_4)))

values = value_1 + value_2 + value_3 + value_4

pair_skew = list(zip(keys, values))

In [8]:
# load skew values into RDD
rdd = spark.sparkContext.parallelize(pair_skew, 8)

In [9]:
%time
rdd_group = rdd.groupByKey().cache()
# run a simple data transformation on skewed data and see the result on cluster DAG
rdd_group.map(lambda pair: (pair[0], [i for i in pair[1]])).count()

# Output:
# 4 --> we have 4 partitions with value that represent number of keys in groupByKey

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 7.15 µs


                                                                                

4

### *groupByKey* **Migrate data skewness: *SALTING***

In [10]:
#  We define the salting method for data redistribution
def salting(val):
    tmp = val + "_" + str(random.randint(0,5)) # increasing the random ('high') we have better results 
    return tmp

salting("10")

'10_0'

In [11]:
# Apply salting method to skewed data for data redistribution
%time
rdd_salting = rdd.map(lambda x: (salting(x[0]), x[1]))

rdd_grouped = rdd_salting.groupByKey().cache()
# run a simple data transformation on skewed data and see the result on cluster (job/DAG/stage)
rdd_grouped.map(lambda pair: (pair[0], [i for i in pair[1]])).count()

# Output:
# 23 --> we get 23 partitions after repartitioning with randaom data
# increasing the random value ('high') in 'salting' method, we can have better results

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 7.15 µs


                                                                                

23

### **2. Run a schuffle *sortByKey* to see how the skew effect computation resources**

In [12]:
rdd_sort = rdd.sortByKey(ascending=False, numPartitions=4)
rdd_sort.count()

                                                                                

6010810

### *sortByKey* **Migrate data skewness: *SALTING***

In [13]:
rdd_sorted = rdd_salting.sortByKey(ascending=False, numPartitions=4)
rdd_sorted.count()

# Output:

# We have the same size data in the result (6010810)
# The redistribution data in the partitions are better than the previous one.
# That is confirmed by the time taken by the computation, 2 sec # 18 sec

                                                                                

6010810

### **3. Run a schuffle *join* to see how the skew effect computation resources**

In [14]:
small_rdd1 = spark.sparkContext.parallelize([(2,3),(1,3),(1,4),(3,1),(5,1)], 3)
small_rdd2 = spark.sparkContext.parallelize([(3,4),(0,1),(1,2),(2,1)], 2)

print(small_rdd1.collect())
print(small_rdd2.collect())

[(2, 3), (1, 3), (1, 4), (3, 1), (5, 1)]
[(3, 4), (0, 1), (1, 2), (2, 1)]


In [15]:
join1 = small_rdd1.join(small_rdd2)
join1.collect()

                                                                                

[(1, (3, 2)), (1, (4, 2)), (2, (3, 1)), (3, (1, 4))]

In [16]:
join1.getNumPartitions()

5

In [17]:
join1.glom().collect()

[[], [(1, (3, 2)), (1, (4, 2))], [(2, (3, 1))], [(3, (1, 4))], []]

In [18]:
# generate skew values for 'join'
key_1 = ['a'] * 5
key_2 = ['b'] * 60
key_3 = ['c'] * 100

keys = key_1 + key_2 + key_3

random.shuffle(keys)

value_1 = list(np.random.randint(low = 1, high = 100, size = len(key_1)))
value_2 = list(np.random.randint(low = 1, high = 100, size = len(key_2)))
value_3 = list(np.random.randint(low = 1, high = 100, size = len(key_3)))

values = value_1 + value_2 + value_3

pair_skew_j = list(zip(keys, values))

In [19]:
small_rdd = spark.sparkContext.parallelize(pair_skew_j, 2)

In [20]:
small_rdd.count()

165

In [21]:
join1.count()

4

In [22]:
# join without salting
%time
rdd_j = rdd.join(small_rdd)
rdd_j.map(lambda x: int(x[1][0] + x[1][1])).reduce(lambda x,y: x+y)

CPU times: user 0 ns, sys: 4 µs, total: 4 µs
Wall time: 7.39 µs


                                                                                

37978350685

### *join* **Migrate data skewness: *SALTING***

In [23]:
# add a random value to the key --> (key,randint)
rdd_new = rdd.map(lambda x: ((x[0], random.randint(0, 10)), x[1])).cache() # increase random value for better results

# Replicate the small data
small_rdd_new = small_rdd.cartesian(spark.sparkContext.parallelize(range(0, 11))).map(lambda x: ((x[0][0], x[1]), x[0][1])).cache()

In [29]:
# join with salting
%time
rdd_join = rdd_new.join(small_rdd_new)
rdd_join.map(lambda x: int(x[1][0] + x[1][1])).reduce(lambda x,y: x+y)

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 6.91 µs


                                                                                

37978350685

### **4. Loading data Skew**

In [30]:
import numpy as np
import pandas as pd
import random

# sale dataset:
# table 1: OrderId, Qty, Sales, Discount, (yes=1, no=0)
# table 2: ProductID, OrderID, Product, Price

#################### Table 1 #####################

key_1 = [101] * 1
key_2 = [201] * 8
key_3 = [301] * 4
key_4 = [401] * 2

OrderId = key_1 + key_2 + key_3 + key_4

random.shuffle(OrderId)

Qty_1 = list(np.random.randint(low = 1, high = 100, size = len(key_1)))
Qty_2 = list(np.random.randint(low = 1, high = 200, size = len(key_2)))
Qty_3 = list(np.random.randint(low = 1, high = 1000, size = len(key_3)))
Qty_4 = list(np.random.randint(low = 1, high = 50, size = len(key_4)))

Qty = Qty_1 + Qty_2 + Qty_3 + Qty_4

Sales_1 = list(np.random.randint(low = 1, high = 100, size = len(key_1)))
Sales_2 = list(np.random.randint(low = 1, high = 3400, size = len(key_2)))
Sales_3 = list(np.random.randint(low = 1, high = 2000, size = len(key_3)))
Sales_4 = list(np.random.randint(low = 1, high = 1000, size = len(key_4)))

Sales = Sales_1 + Sales_2 + Sales_3 + Sales_4

Discount = list(np.random.randint(low = 1, high = 2, size = len(OrderId)))
data1 = list(zip(OrderId, Qty, Sales, Discount))

# create pandas dataFrame
data_skew = pd.DataFrame(data1, columns=['OrderID', 'Qty', 'Sales', 'Discount'])

In [31]:
#################### Table 2 #####################
data2 = [[1, 101, 'pencil', 4.99],
         [2, 101, 'book', 9.5],
         [3, 101, 'scissors', 14],
         [4, 301, 'glue', 7],
         [5, 201, 'marker', 8.49],
         [6, 301, 'label', 2],
         [7, 201, 'calculator', 3.99],
         [8, 501, 'eraser', 1.55],
        ]

data_small = pd.DataFrame(data2, columns=['ProductID', 'OrderID', 'Product', 'Price'])

In [32]:
# create pyspark DF from Pandas
# Optimize conversion between pySpark and Pandas DF: Enable arrow-based columar data transfers

spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df_skew = spark.createDataFrame(data_skew)
df_skew.printSchema()
df_skew.show()
print(df_skew.rdd.getNumPartitions())

root
 |-- OrderID: long (nullable = true)
 |-- Qty: long (nullable = true)
 |-- Sales: long (nullable = true)
 |-- Discount: long (nullable = true)

+-------+---+-----+--------+
|OrderID|Qty|Sales|Discount|
+-------+---+-----+--------+
|    201| 77|    6|       1|
|    301|185|  564|       1|
|    201| 56|  803|       1|
|    301|110| 2743|       1|
|    201|181| 2532|       1|
|    301| 59|  936|       1|
|    101|114|  288|       1|
|    201|167|  138|       1|
|    201|131| 1402|       1|
|    401|215|  209|       1|
|    201|576|  655|       1|
|    301|339|  767|       1|
|    201|256|  203|       1|
|    201| 46|  173|       1|
|    401| 22|  908|       1|
+-------+---+-----+--------+

4


In [33]:
df_small = spark.createDataFrame(data_small)
df_small.printSchema()
df_small.show()
df_small.rdd.getNumPartitions()

root
 |-- ProductID: long (nullable = true)
 |-- OrderID: long (nullable = true)
 |-- Product: string (nullable = true)
 |-- Price: double (nullable = true)

+---------+-------+----------+-----+
|ProductID|OrderID|   Product|Price|
+---------+-------+----------+-----+
|        1|    101|    pencil| 4.99|
|        2|    101|      book|  9.5|
|        3|    101|  scissors| 14.0|
|        4|    301|      glue|  7.0|
|        5|    201|    marker| 8.49|
|        6|    301|     label|  2.0|
|        7|    201|calculator| 3.99|
|        8|    501|    eraser| 1.55|
+---------+-------+----------+-----+



4

### **4.1. Run a schuffle *join* with small size of data**

In [34]:
joined_df = df_skew.join(df_small, df_skew.OrderID == df_small.OrderID, how="inner")

In [35]:
joined_df.count()

27

In [36]:
joined_df.show(30)

+-------+---+-----+--------+---------+-------+----------+-----+
|OrderID|Qty|Sales|Discount|ProductID|OrderID|   Product|Price|
+-------+---+-----+--------+---------+-------+----------+-----+
|    201| 77|    6|       1|        7|    201|calculator| 3.99|
|    201| 77|    6|       1|        5|    201|    marker| 8.49|
|    301|185|  564|       1|        6|    301|     label|  2.0|
|    301|185|  564|       1|        4|    301|      glue|  7.0|
|    201| 56|  803|       1|        7|    201|calculator| 3.99|
|    201| 56|  803|       1|        5|    201|    marker| 8.49|
|    301|110| 2743|       1|        6|    301|     label|  2.0|
|    301|110| 2743|       1|        4|    301|      glue|  7.0|
|    201|181| 2532|       1|        7|    201|calculator| 3.99|
|    201|181| 2532|       1|        5|    201|    marker| 8.49|
|    301| 59|  936|       1|        6|    301|     label|  2.0|
|    301| 59|  936|       1|        4|    301|      glue|  7.0|
|    101|114|  288|       1|        3|  

In [37]:
joined_df.rdd.getNumPartitions()

4

In [38]:
joined_df.rdd.glom().collect()

[[Row(OrderID=201, Qty=77, Sales=6, Discount=1, ProductID=7, OrderID=201, Product='calculator', Price=3.99),
  Row(OrderID=201, Qty=77, Sales=6, Discount=1, ProductID=5, OrderID=201, Product='marker', Price=8.49),
  Row(OrderID=301, Qty=185, Sales=564, Discount=1, ProductID=6, OrderID=301, Product='label', Price=2.0),
  Row(OrderID=301, Qty=185, Sales=564, Discount=1, ProductID=4, OrderID=301, Product='glue', Price=7.0),
  Row(OrderID=201, Qty=56, Sales=803, Discount=1, ProductID=7, OrderID=201, Product='calculator', Price=3.99),
  Row(OrderID=201, Qty=56, Sales=803, Discount=1, ProductID=5, OrderID=201, Product='marker', Price=8.49)],
 [Row(OrderID=301, Qty=110, Sales=2743, Discount=1, ProductID=6, OrderID=301, Product='label', Price=2.0),
  Row(OrderID=301, Qty=110, Sales=2743, Discount=1, ProductID=4, OrderID=301, Product='glue', Price=7.0),
  Row(OrderID=201, Qty=181, Sales=2532, Discount=1, ProductID=7, OrderID=201, Product='calculator', Price=3.99),
  Row(OrderID=201, Qty=181, Sa

In [None]:
# 