In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *
#import pyarrow as pa

In [None]:
spark = SparkSession \
  .builder \
  .appName("studying perf optimization") \
  .enableHiveSupport() \
  .config("spark.sql.warehousr.dir","/user/hive/warehouse") \
  .config("spark.sql.shuffle.partitions",10) \
  .config("spark.sql.execution.arrow.enabled", "true") \
  .master("yarn") \
  .getOrCreate()

In [74]:
spark

In [None]:
sc = spark.sparkContext

In [None]:
sc.getConf().getAll()

In [None]:
file_name = '/user/forgcpmak/data/data/hr_db/employees'


employee_schema = spark.read \
    .options(sep = '\\t') \
    .csv(file_name,
        header= False,
        inferSchema=True
       ).cache().checkpoint()

In [None]:
employee_schema.printSchema()

In [None]:
employee_schema.cache()
employee_schema.storageLevel

In [None]:
employee_schema.unpersist()
employee_schema.storageLevel

In [None]:
.storageLevel

In [None]:
employee_schema.rdd.getNumPartitions()

In [None]:
data_distribution = employee_schema.rdd.glom().map(len).collect()

In [None]:
print(data_distribution)

In [None]:
employee_schema.bucketby(8,col('_c0')).sortBy(_c0)

In [None]:
employee_schema.write.saveAsTable('EmpTblDF')

In [61]:
spark.table('EmpTblDF').write.bucketBy(10,"_c0").saveAsTable("EmpTblDF_Bucketed")

In [63]:
spark.sql("drop table if exists EmpTblDF")

In [68]:
employee_schema_salted = employee_schema.withColumn('_c1_salted', concat(employee_schema['_c1'], lit('_'), lit(floor(rand(seed=17) * 5) + 1)))

In [69]:
employee_schema_salted.show()

+---+-----------+----------+--------+------------+----------+----------+-------+----+----+----+-------------+
|_c0|        _c1|       _c2|     _c3|         _c4|       _c5|       _c6|    _c7| _c8| _c9|_c10|   _c1_salted|
+---+-----------+----------+--------+------------+----------+----------+-------+----+----+----+-------------+
|100|     Steven|      King|   SKING|515.123.4567|1987-06-17|   AD_PRES|24000.0|null|null|  90|     Steven_5|
|101|      Neena|   Kochhar|NKOCHHAR|515.123.4568|1989-09-21|     AD_VP|17000.0|null| 100|  90|      Neena_1|
|102|        Lex|   De Haan| LDEHAAN|515.123.4569|1993-01-13|     AD_VP|17000.0|null| 100|  90|        Lex_1|
|103|  Alexander|    Hunold| AHUNOLD|590.423.4567|1990-01-03|   IT_PROG| 9000.0|null| 102|  60|  Alexander_3|
|104|      Bruce|     Ernst|  BERNST|590.423.4568|1991-05-21|   IT_PROG| 6000.0|null| 103|  60|      Bruce_2|
|105|      David|    Austin| DAUSTIN|590.423.4569|1997-06-25|   IT_PROG| 4800.0|null| 103|  60|      David_4|
|106|     

In [70]:
df_medium = employee_schema.withColumn('_c1_exploded', explode(array([lit(i) for i in range(1,6)])))


In [71]:
df_medium.show()

+---+---------+-------+--------+------------+----------+-------+-------+----+----+----+------------+
|_c0|      _c1|    _c2|     _c3|         _c4|       _c5|    _c6|    _c7| _c8| _c9|_c10|_c1_exploded|
+---+---------+-------+--------+------------+----------+-------+-------+----+----+----+------------+
|100|   Steven|   King|   SKING|515.123.4567|1987-06-17|AD_PRES|24000.0|null|null|  90|           1|
|100|   Steven|   King|   SKING|515.123.4567|1987-06-17|AD_PRES|24000.0|null|null|  90|           2|
|100|   Steven|   King|   SKING|515.123.4567|1987-06-17|AD_PRES|24000.0|null|null|  90|           3|
|100|   Steven|   King|   SKING|515.123.4567|1987-06-17|AD_PRES|24000.0|null|null|  90|           4|
|100|   Steven|   King|   SKING|515.123.4567|1987-06-17|AD_PRES|24000.0|null|null|  90|           5|
|101|    Neena|Kochhar|NKOCHHAR|515.123.4568|1989-09-21|  AD_VP|17000.0|null| 100|  90|           1|
|101|    Neena|Kochhar|NKOCHHAR|515.123.4568|1989-09-21|  AD_VP|17000.0|null| 100|  90|    

In [72]:
df_medium_1 = df_medium.withColumn('_c1_exploded', concat(df_medium['_c1'], lit('_'), df_medium['_c1_exploded'])). \
            drop('_c1').withColumnRenamed('_c1_exploded', '_c1')

In [73]:
df_medium_1.show()

+---+-------+--------+------------+----------+-------+-------+----+----+----+-----------+
|_c0|    _c2|     _c3|         _c4|       _c5|    _c6|    _c7| _c8| _c9|_c10|        _c1|
+---+-------+--------+------------+----------+-------+-------+----+----+----+-----------+
|100|   King|   SKING|515.123.4567|1987-06-17|AD_PRES|24000.0|null|null|  90|   Steven_1|
|100|   King|   SKING|515.123.4567|1987-06-17|AD_PRES|24000.0|null|null|  90|   Steven_2|
|100|   King|   SKING|515.123.4567|1987-06-17|AD_PRES|24000.0|null|null|  90|   Steven_3|
|100|   King|   SKING|515.123.4567|1987-06-17|AD_PRES|24000.0|null|null|  90|   Steven_4|
|100|   King|   SKING|515.123.4567|1987-06-17|AD_PRES|24000.0|null|null|  90|   Steven_5|
|101|Kochhar|NKOCHHAR|515.123.4568|1989-09-21|  AD_VP|17000.0|null| 100|  90|    Neena_1|
|101|Kochhar|NKOCHHAR|515.123.4568|1989-09-21|  AD_VP|17000.0|null| 100|  90|    Neena_2|
|101|Kochhar|NKOCHHAR|515.123.4568|1989-09-21|  AD_VP|17000.0|null| 100|  90|    Neena_3|
|101|Kochh

In [76]:
import pandas as pd
import numpy as np
from pyspark.sql import functions as F

In [91]:
# set smaller number of partitions so they can fit the screen
spark.conf.set('spark.sql.shuffle.partitions', 8)
# disable broadcast join to see the shuffle
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")

In [77]:
length = 100
names = np.random.choice(['Bob', 'James', 'Marek', 'Johannes', None], length)
amounts = np.random.randint(0, 1000000, length)

In [79]:
names

array(['Marek', 'Marek', 'Marek', 'Bob', None, 'James', 'Marek', 'Bob',
       'Johannes', 'James', 'Johannes', 'Marek', 'James', 'Marek',
       'Johannes', 'James', 'James', 'James', 'James', 'Johannes', None,
       'Johannes', 'Johannes', 'James', None, None, 'Marek', 'James',
       'Bob', 'Marek', 'Bob', None, 'Johannes', 'Marek', 'James', 'James',
       'Marek', None, None, 'Marek', 'Johannes', 'Marek', 'Marek', 'Bob',
       None, 'Bob', 'James', None, 'Bob', 'Bob', 'Johannes', 'James',
       None, 'Johannes', 'Marek', 'Marek', 'Johannes', 'Marek',
       'Johannes', 'Bob', 'Johannes', 'James', 'Bob', 'Bob', 'Bob', 'Bob',
       None, 'Marek', None, 'Bob', 'Marek', 'Bob', 'James', 'Marek', None,
       'James', 'Johannes', 'Bob', 'Johannes', None, 'Marek', 'Johannes',
       None, 'James', 'Johannes', 'Bob', 'Bob', 'Marek', None, None,
       'Marek', 'Bob', 'Johannes', None, 'Johannes', 'James', 'James',
       'Bob', None, 'Marek'], dtype=object)

In [80]:
amounts

array([625428, 354905, 786862, 329732, 385746, 883413, 327843, 776204,
       344419,  33482, 849905, 318942, 699952, 482184, 287383, 928853,
       897561, 502373, 274584, 537028, 842104, 154472, 114269, 905650,
       803617, 840777, 999541, 511334,  13475, 532444, 224162, 262648,
       410714, 232624, 632641, 495081, 824554,  12672, 942438, 735861,
       278370, 753731,  57537, 527781, 925971, 369108, 181062, 939915,
       728786, 769452, 415113, 222877, 476512, 659694, 694243, 732471,
       289787, 643635, 518212, 850919, 564583, 899643, 379627, 912322,
       445168,  34704, 191905, 568279, 913129, 596860, 618421, 532874,
       321354, 849430, 810338, 818161, 489120, 450119, 325739, 543400,
       877071,  39297,  49298, 923425, 473428, 334309, 593521, 770562,
       336717, 210286, 865504, 118600, 506354, 829402, 429917, 499996,
       877848, 142872, 748436, 851767])

In [81]:
country = np.random.choice(
    ['United Kingdom', 'Poland', 'USA', 'Germany', 'Russia'], 
    length,
    p = [0.05, 0.05, 0.8, 0.05, 0.05]
)

In [82]:
country

array(['USA', 'USA', 'Russia', 'USA', 'Russia', 'USA', 'USA', 'USA',
       'USA', 'USA', 'Germany', 'Russia', 'USA', 'USA', 'USA', 'USA',
       'USA', 'USA', 'Germany', 'USA', 'USA', 'United Kingdom', 'USA',
       'USA', 'USA', 'USA', 'Germany', 'USA', 'USA', 'USA', 'USA', 'USA',
       'USA', 'USA', 'USA', 'USA', 'USA', 'USA', 'USA', 'United Kingdom',
       'United Kingdom', 'USA', 'Germany', 'USA', 'USA', 'USA', 'USA',
       'USA', 'USA', 'Poland', 'USA', 'USA', 'USA', 'USA', 'USA', 'USA',
       'USA', 'Russia', 'USA', 'USA', 'USA', 'USA', 'USA', 'USA', 'USA',
       'USA', 'Poland', 'USA', 'USA', 'USA', 'Germany', 'Russia', 'USA',
       'Poland', 'USA', 'USA', 'USA', 'USA', 'United Kingdom', 'USA',
       'USA', 'USA', 'USA', 'USA', 'USA', 'Germany', 'USA', 'USA',
       'Russia', 'USA', 'USA', 'USA', 'USA', 'USA', 'USA', 'USA', 'USA',
       'USA', 'USA', 'USA'], dtype='<U14')

In [83]:
##this is python pada data frame
data = pd.DataFrame({'name': names, 'amount': amounts, 'country': country})

In [88]:
type(data)

pandas.core.frame.DataFrame

In [89]:
data

Unnamed: 0,name,amount,country
0,Marek,625428,USA
1,Marek,354905,USA
2,Marek,786862,Russia
3,Bob,329732,USA
4,,385746,Russia
...,...,...,...
95,James,499996,USA
96,James,877848,USA
97,Bob,142872,USA
98,,748436,USA


In [93]:
#this is spark dataframe
transactions = spark.createDataFrame(data).repartition('country')

In [111]:
transactions.rdd.getNumPartitions()

8

In [94]:
transactions.show()

+--------+------+--------------+
|    name|amount|       country|
+--------+------+--------------+
|   Marek|643635|        Russia|
|     Bob|532874|        Russia|
|    null|336717|        Russia|
|   Marek|786862|        Russia|
|    null|385746|        Russia|
|   Marek|318942|        Russia|
|Johannes|154472|United Kingdom|
|   Marek|735861|United Kingdom|
|Johannes|278370|United Kingdom|
|Johannes|325739|United Kingdom|
|Johannes|415113|           USA|
|   James|222877|           USA|
|    null|476512|           USA|
|Johannes|659694|           USA|
|   Marek|694243|           USA|
|   Marek|732471|           USA|
|Johannes|289787|           USA|
|Johannes|518212|           USA|
|     Bob|850919|           USA|
|Johannes|564583|           USA|
+--------+------+--------------+
only showing top 20 rows



In [95]:
countries = spark.createDataFrame(pd.DataFrame({
    'id': [11, 12, 13, 14, 15], 
    'country': ['United Kingdom', 'Poland', 'USA', 'Germany', 'Russia']
}))

In [96]:
countries.show()

+---+--------------+
| id|       country|
+---+--------------+
| 11|United Kingdom|
| 12|        Poland|
| 13|           USA|
| 14|       Germany|
| 15|        Russia|
+---+--------------+



In [97]:
df = transactions.join(countries, 'country')

In [98]:
df.show()

+--------------+--------+------+---+
|       country|    name|amount| id|
+--------------+--------+------+---+
|        Russia|   Marek|786862| 15|
|        Russia|    null|385746| 15|
|        Russia|   Marek|318942| 15|
|        Russia|   Marek|643635| 15|
|        Russia|     Bob|532874| 15|
|        Russia|    null|336717| 15|
|United Kingdom|Johannes|325739| 11|
|United Kingdom|Johannes|154472| 11|
|United Kingdom|   Marek|735861| 11|
|United Kingdom|Johannes|278370| 11|
|       Germany|Johannes|849905| 14|
|       Germany|   James|274584| 14|
|       Germany|   Marek|999541| 14|
|       Germany|   Marek| 57537| 14|
|       Germany|   Marek|618421| 14|
|       Germany|     Bob|334309| 14|
|        Poland|     Bob|769452| 12|
|        Poland|    null|191905| 12|
|        Poland|   Marek|849430| 12|
|           USA|   Marek|625428| 13|
+--------------+--------+------+---+
only showing top 20 rows



In [112]:
df.rdd.glom().collect()

[[],
 [],
 [Row(country='Russia', name='Marek', amount=786862, id=15),
  Row(country='Russia', name=None, amount=385746, id=15),
  Row(country='Russia', name='Marek', amount=318942, id=15),
  Row(country='Russia', name='Marek', amount=643635, id=15),
  Row(country='Russia', name='Bob', amount=532874, id=15),
  Row(country='Russia', name=None, amount=336717, id=15)],
 [Row(country='United Kingdom', name='Johannes', amount=325739, id=11),
  Row(country='United Kingdom', name='Johannes', amount=154472, id=11),
  Row(country='United Kingdom', name='Marek', amount=735861, id=11),
  Row(country='United Kingdom', name='Johannes', amount=278370, id=11)],
 [],
 [],
 [Row(country='Germany', name='Johannes', amount=849905, id=14),
  Row(country='Germany', name='James', amount=274584, id=14),
  Row(country='Germany', name='Marek', amount=999541, id=14),
  Row(country='Germany', name='Marek', amount=57537, id=14),
  Row(country='Germany', name='Marek', amount=618421, id=14),
  Row(country='Germany'

In [99]:
for i, part in enumerate(df.rdd.glom().collect()):
    print({i: part})

{0: []}
{1: []}
{2: [Row(country='Russia', name='Marek', amount=643635, id=15), Row(country='Russia', name='Bob', amount=532874, id=15), Row(country='Russia', name=None, amount=336717, id=15), Row(country='Russia', name='Marek', amount=786862, id=15), Row(country='Russia', name=None, amount=385746, id=15), Row(country='Russia', name='Marek', amount=318942, id=15)]}
{3: [Row(country='United Kingdom', name='Johannes', amount=154472, id=11), Row(country='United Kingdom', name='Marek', amount=735861, id=11), Row(country='United Kingdom', name='Johannes', amount=278370, id=11), Row(country='United Kingdom', name='Johannes', amount=325739, id=11)]}
{4: []}
{5: []}
{6: [Row(country='Germany', name='Marek', amount=618421, id=14), Row(country='Germany', name='Bob', amount=334309, id=14), Row(country='Germany', name='Johannes', amount=849905, id=14), Row(country='Germany', name='James', amount=274584, id=14), Row(country='Germany', name='Marek', amount=999541, id=14), Row(country='Germany', name

In [100]:
df.rdd.getNumPartitions()

8

In [104]:
replication_high = 7
high = broadcast(spark.range(replication_high).withColumnRenamed('id', 'replica_id'))
replication_low = 2
low = F.broadcast(spark.range(replication_low).withColumnRenamed('id', 'replica_id'))

In [107]:
type(high)
type(low)

pyspark.sql.dataframe.DataFrame

In [106]:
high.show()

+----------+
|replica_id|
+----------+
|         0|
|         1|
|         2|
|         3|
|         4|
|         5|
|         6|
+----------+



In [121]:
# determine which keys are highly over-represented, broadcast them
skewed_keys = broadcast(
	transactions.freqItems(['country'], 0.6)
)

In [122]:
type(skewed_keys)

pyspark.sql.dataframe.DataFrame

In [123]:
skewed_keys.show()

+-----------------+
|country_freqItems|
+-----------------+
|            [USA]|
+-----------------+



In [124]:
# determine which keys are highly over-represented, broadcast them
skewed_keys = F.broadcast(
	transactions.freqItems(['country'], 0.6)
	.select(F.explode('country_freqItems').alias('country_freqItems'))
)

In [117]:
skewed_keys.show()

+-----------------+
|country_freqItems|
+-----------------+
|              USA|
+-----------------+



In [125]:
# there is data frame named high above 

countries_skewed_keys = (
    countries
    .join(
        skewed_keys, 
        countries.country == skewed_keys.country_freqItems, 
        how='inner'
    )
    .crossJoin(high)
    .withColumn('composite_key', F.concat('country', F.lit('@'), 'replica_id'))
)

In [126]:
countries_skewed_keys.show()

+---+-------+-----------------+----------+-------------+
| id|country|country_freqItems|replica_id|composite_key|
+---+-------+-----------------+----------+-------------+
| 13|    USA|              USA|         0|        USA@0|
| 13|    USA|              USA|         1|        USA@1|
| 13|    USA|              USA|         2|        USA@2|
| 13|    USA|              USA|         3|        USA@3|
| 13|    USA|              USA|         4|        USA@4|
| 13|    USA|              USA|         5|        USA@5|
| 13|    USA|              USA|         6|        USA@6|
+---+-------+-----------------+----------+-------------+



In [127]:
countries_rest = (
    countries
    .join(
        skewed_keys, 
        countries.country == skewed_keys.country_freqItems, 
        how='leftanti'
    )
    .crossJoin(low)
    .withColumn('composite_key', F.concat('country', F.lit('@'), 'replica_id'))
    .withColumn('country_freqItems', F.lit(None))
)

In [128]:
countries_rest.show()

+---+--------------+----------+----------------+-----------------+
| id|       country|replica_id|   composite_key|country_freqItems|
+---+--------------+----------+----------------+-----------------+
| 11|United Kingdom|         0|United Kingdom@0|             null|
| 11|United Kingdom|         1|United Kingdom@1|             null|
| 12|        Poland|         0|        Poland@0|             null|
| 12|        Poland|         1|        Poland@1|             null|
| 14|       Germany|         0|       Germany@0|             null|
| 14|       Germany|         1|       Germany@1|             null|
| 15|        Russia|         0|        Russia@0|             null|
| 15|        Russia|         1|        Russia@1|             null|
+---+--------------+----------+----------------+-----------------+



In [129]:
# this is now the entire uniform dataset replicated differently
countries_replicated = countries_skewed_keys.union(countries_rest)

In [130]:
countries_replicated.show()

+---+--------------+-----------------+----------------+-------------+
| id|       country|country_freqItems|      replica_id|composite_key|
+---+--------------+-----------------+----------------+-------------+
| 13|           USA|              USA|               0|        USA@0|
| 13|           USA|              USA|               1|        USA@1|
| 13|           USA|              USA|               2|        USA@2|
| 13|           USA|              USA|               3|        USA@3|
| 13|           USA|              USA|               4|        USA@4|
| 13|           USA|              USA|               5|        USA@5|
| 13|           USA|              USA|               6|        USA@6|
| 11|United Kingdom|                0|United Kingdom@0|         null|
| 11|United Kingdom|                1|United Kingdom@1|         null|
| 12|        Poland|                0|        Poland@0|         null|
| 12|        Poland|                1|        Poland@1|         null|
| 14|       Germany|

In [131]:
transactions_tagged = (
    transactions
    .join(
        skewed_keys, 
        transactions.country == skewed_keys.country_freqItems, 
        how='left'
    )
    .withColumn('replica_id',
        F.when(
            F.isnull(F.col('country_freqItems')), 
            (F.rand() * replication_low).cast('int'),
        )
        .otherwise((F.rand() * replication_high).cast('int'))
    )
    .withColumn('composite_key', F.concat('country', F.lit('@'), 'replica_id'))
)

In [132]:
# now we can join on the composite key
df = transactions_tagged.join(countries_replicated, 'composite_key')

for i, part in enumerate(df.rdd.glom().collect()):
    print({i: part})

{0: []}
{1: [Row(composite_key='USA@0', name='Marek', amount=625428, country='USA', country_freqItems='USA', replica_id=0, id=13, country='USA', country_freqItems='USA', replica_id='0'), Row(composite_key='USA@0', name='Bob', amount=776204, country='USA', country_freqItems='USA', replica_id=0, id=13, country='USA', country_freqItems='USA', replica_id='0'), Row(composite_key='USA@0', name='Marek', amount=482184, country='USA', country_freqItems='USA', replica_id=0, id=13, country='USA', country_freqItems='USA', replica_id='0'), Row(composite_key='USA@0', name='James', amount=905650, country='USA', country_freqItems='USA', replica_id=0, id=13, country='USA', country_freqItems='USA', replica_id='0'), Row(composite_key='USA@0', name='Marek', amount=824554, country='USA', country_freqItems='USA', replica_id=0, id=13, country='USA', country_freqItems='USA', replica_id='0'), Row(composite_key='USA@0', name='James', amount=181062, country='USA', country_freqItems='USA', replica_id=0, id=13, co