In [1]:
from pyspark.sql import SparkSession
import getpass

# Name of the user running the kernel; it is interpolated into the
# warehouse directory path below.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled SparkSession whose master is YARN.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# DDL-style schema string applied to every CSV read of the orders data.
# The pieces below concatenate to exactly the original one-line string.
orders_schema = (
    "order_id long , "
    "order_date string, "
    "cust_id long,"
    "order_status string"
)

In [3]:
# Load the source orders CSV using the explicit schema defined earlier.
orders_df = (
    spark.read
    .schema(orders_schema)
    .format("csv")
    .load("/public/trendytech/orders/orders_1gb.csv")
)

In [4]:
# Preview the first rows of the frame loaded from the source CSV.
orders_df.show()

+--------+--------------------+-------+---------------+
|order_id|          order_date|cust_id|   order_status|
+--------+--------------------+-------+---------------+
|       1|2013-07-25 00:00:...|  11599|         CLOSED|
|       2|2013-07-25 00:00:...|    256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|  12111|       COMPLETE|
|       4|2013-07-25 00:00:...|   8827|         CLOSED|
|       5|2013-07-25 00:00:...|  11318|       COMPLETE|
|       6|2013-07-25 00:00:...|   7130|       COMPLETE|
|       7|2013-07-25 00:00:...|   4530|       COMPLETE|
|       8|2013-07-25 00:00:...|   2911|     PROCESSING|
|       9|2013-07-25 00:00:...|   5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|   5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|    918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|   1837|         CLOSED|
|      13|2013-07-25 00:00:...|   9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|   9842|     PROCESSING|
|      15|2013-07-25 00:00:...|   2568|       CO

In [5]:
# Inspect the session's spark.sql.files.maxPartitionBytes setting.
spark.conf.get("spark.sql.files.maxPartitionBytes")

'134217728b'

In [6]:
# Express the 134217728-byte value reported above in MiB.
134217728 / (1024 * 1024)

128.0

In [7]:
# Inspect the default parallelism reported by the SparkContext.
spark.sparkContext.defaultParallelism

2

In [8]:
# Number of partitions of the frame read from the source CSV.
orders_df.rdd.getNumPartitions()

9

In [9]:
# Collapse the orders frame into a single partition.
new_orders_df = orders_df.repartition(numPartitions=1)

In [10]:
# Confirm the partition count of the repartitioned frame.
new_orders_df.rdd.getNumPartitions()

1

In [11]:
# Write the single-partition frame as gzip-compressed CSV, replacing any
# existing contents of "orders_gz".
# "compression" is the documented CSV writer option; "gzip" is the short name
# that resolves to org.apache.hadoop.io.compress.GzipCodec.
(
    new_orders_df.write
    .format("csv")
    .mode("overwrite")
    .option("compression", "gzip")
    .save("orders_gz")
)

In [12]:
# Read the "orders_gz" output back with the same schema.
# NOTE: this rebinds `orders_df`, so later cells that use that name see this frame.
orders_df = (
    spark.read
    .schema(orders_schema)
    .format("csv")
    .load("orders_gz")
)

In [13]:
# Preview the rows read back from "orders_gz".
orders_df.show()

+--------+--------------------+-------+---------------+
|order_id|          order_date|cust_id|   order_status|
+--------+--------------------+-------+---------------+
|   51049|2014-06-09 00:00:...|   4983|     PROCESSING|
|   51050|2014-06-09 00:00:...|   1840|        ON_HOLD|
|   51051|2014-06-09 00:00:...|   8207|       COMPLETE|
|   51052|2014-06-09 00:00:...|   6254|       COMPLETE|
|   51053|2014-06-09 00:00:...|    348|        PENDING|
|   51054|2014-06-09 00:00:...|   1468|       COMPLETE|
|   51055|2014-06-09 00:00:...|   3843|PENDING_PAYMENT|
|   51056|2014-06-09 00:00:...|   7178|PENDING_PAYMENT|
|   51057|2014-06-09 00:00:...|    749|       COMPLETE|
|   51058|2014-06-09 00:00:...|   5146|        PENDING|
|   51059|2014-06-09 00:00:...|   4645|         CLOSED|
|   51060|2014-06-09 00:00:...|    247|       COMPLETE|
|   51061|2014-06-09 00:00:...|   6551|        PENDING|
|   51062|2014-06-09 00:00:...|   5548|     PROCESSING|
|   51063|2014-06-09 00:00:...|   7020|         

In [14]:
# Partition count of the frame read back from "orders_gz".
orders_df.rdd.getNumPartitions()

1

In [15]:
# Write the single-partition frame as snappy-compressed CSV, replacing any
# existing contents of "orders_snappy".
# "compression" is the documented CSV writer option (the original used the
# legacy "codec" alias).
(
    new_orders_df.write
    .format("csv")
    .mode("overwrite")
    .option("compression", "snappy")
    .save("orders_snappy")
)

In [16]:
# Read the "orders_snappy" output back with the same schema.
# NOTE: this rebinds `orders_df`, so later cells that use that name see this frame.
orders_df = (
    spark.read
    .schema(orders_schema)
    .format("csv")
    .load("orders_snappy")
)

In [17]:
# Preview the rows read back from "orders_snappy".
orders_df.show()

+--------+--------------------+-------+---------------+
|order_id|          order_date|cust_id|   order_status|
+--------+--------------------+-------+---------------+
|   33204|2014-02-15 00:00:...|   4512|         CLOSED|
|   33205|2014-02-15 00:00:...|   9436|PENDING_PAYMENT|
|   33206|2014-02-15 00:00:...|   2351|       COMPLETE|
|   33207|2014-02-15 00:00:...|   6009|       COMPLETE|
|   33208|2014-02-15 00:00:...|   2595|       COMPLETE|
|   33209|2014-02-15 00:00:...|    913|PENDING_PAYMENT|
|   33210|2014-02-15 00:00:...|   9204|         CLOSED|
|   33211|2014-02-15 00:00:...|    113|PENDING_PAYMENT|
|   33212|2014-02-15 00:00:...|   7686|PENDING_PAYMENT|
|   33213|2014-02-15 00:00:...|   8638|       COMPLETE|
|   33214|2014-02-15 00:00:...|   7740|PENDING_PAYMENT|
|   33215|2014-02-15 00:00:...|   1216|       COMPLETE|
|   33216|2014-02-15 00:00:...|   4227|       COMPLETE|
|   33217|2014-02-15 00:00:...|   4301|       COMPLETE|
|   33218|2014-02-15 00:00:...|    524|     PROC

In [18]:
# Partition count of the frame read back from "orders_snappy".
orders_df.rdd.getNumPartitions()

1

In [19]:
# Write the single-partition frame to "orders_parquet_new", replacing any
# existing contents. The format is stated explicitly so the output does not
# depend on the session's spark.sql.sources.default setting.
(
    new_orders_df.write
    .format("parquet")
    .mode("overwrite")
    .save("orders_parquet_new")
)

In [20]:
# Read "orders_parquet_new" back. The format is stated explicitly so the read
# does not depend on the session's spark.sql.sources.default setting.
orders_new_df = (
    spark.read
    .format("parquet")
    .load("orders_parquet_new")
)

In [21]:
# Preview the rows read back from "orders_parquet_new".
orders_new_df.show()

+--------+--------------------+-------+---------------+
|order_id|          order_date|cust_id|   order_status|
+--------+--------------------+-------+---------------+
|   51049|2014-06-09 00:00:...|   4983|     PROCESSING|
|   51050|2014-06-09 00:00:...|   1840|        ON_HOLD|
|   51051|2014-06-09 00:00:...|   8207|       COMPLETE|
|   51052|2014-06-09 00:00:...|   6254|       COMPLETE|
|   51053|2014-06-09 00:00:...|    348|        PENDING|
|   51054|2014-06-09 00:00:...|   1468|       COMPLETE|
|   51055|2014-06-09 00:00:...|   3843|PENDING_PAYMENT|
|   51056|2014-06-09 00:00:...|   7178|PENDING_PAYMENT|
|   51057|2014-06-09 00:00:...|    749|       COMPLETE|
|   51058|2014-06-09 00:00:...|   5146|        PENDING|
|   51059|2014-06-09 00:00:...|   4645|         CLOSED|
|   51060|2014-06-09 00:00:...|    247|       COMPLETE|
|   51061|2014-06-09 00:00:...|   6551|        PENDING|
|   51062|2014-06-09 00:00:...|   5548|     PROCESSING|
|   51063|2014-06-09 00:00:...|   7020|         

In [22]:
# Partition count of the frame read back from "orders_parquet_new".
orders_new_df.rdd.getNumPartitions()

2

In [23]:
# Re-check the partition count of the in-memory repartitioned frame
# (the one that was written out, not the one read back).
new_orders_df.rdd.getNumPartitions()

1

In [24]:
# Spread the single-partition frame across 20 partitions.
final_orders_df = new_orders_df.repartition(numPartitions=20)

In [25]:
# Confirm the partition count after repartitioning to 20.
final_orders_df.rdd.getNumPartitions()

20

In [26]:
# Write the 20-partition frame as CSV, replacing any existing contents of
# "orders_final".
(
    final_orders_df.write
    .mode("overwrite")
    .format("csv")
    .save("orders_final")
)

In [27]:
# Read the "orders_final" output back with the same schema.
# NOTE: this rebinds `orders_df`, so later cells that use that name see this frame.
orders_df = (
    spark.read
    .schema(orders_schema)
    .format("csv")
    .load("orders_final")
)

In [28]:
# Preview the rows read back from "orders_final".
orders_df.show()

+--------+--------------------+-------+---------------+
|order_id|          order_date|cust_id|   order_status|
+--------+--------------------+-------+---------------+
|   17523|2013-11-11 00:00:...|   9277|         CLOSED|
|   17523|2013-11-11 00:00:...|   9277|         CLOSED|
|   42679|2014-04-14 00:00:...|   1479|        ON_HOLD|
|   42679|2014-04-14 00:00:...|   1479|        ON_HOLD|
|   42679|2014-04-14 00:00:...|   1479|        ON_HOLD|
|   33615|2014-02-17 00:00:...|   8581|       COMPLETE|
|   33615|2014-02-17 00:00:...|   8581|       COMPLETE|
|   32649|2014-02-11 00:00:...|   7926|PENDING_PAYMENT|
|   32649|2014-02-11 00:00:...|   7926|PENDING_PAYMENT|
|    3520|2013-08-14 00:00:...|  12180|       COMPLETE|
|    3520|2013-08-14 00:00:...|  12180|       COMPLETE|
|   39976|2014-03-29 00:00:...|   7426|         CLOSED|
|   39976|2014-03-29 00:00:...|   7426|         CLOSED|
|   39976|2014-03-29 00:00:...|   7426|         CLOSED|
|   27250|2014-01-10 00:00:...|  10025|         

In [29]:
# Partition count of the frame read back from "orders_final".
orders_df.rdd.getNumPartitions()

10

In [30]:
# Inspect the session's spark.sql.files.openCostInBytes setting.
spark.conf.get("spark.sql.files.openCostInBytes")

'4194304'

In [31]:
# Convert the 4194304-byte value reported above to KiB.
4194304/1024

4096.0

In [32]:
# Convert the 4096.0 KiB figure to MiB.
4096.0/1024

4.0

In [33]:
# Spread the single-partition frame across 500 partitions.
# NOTE: this rebinds `final_orders_df`, replacing the 20-partition frame.
final_orders_df = new_orders_df.repartition(numPartitions=500)

In [34]:
# Confirm the partition count after repartitioning to 500.
final_orders_df.rdd.getNumPartitions()

500

In [35]:
# Write the 500-partition frame as CSV, replacing any existing contents of
# "orders_final_new".
(
    final_orders_df.write
    .mode("overwrite")
    .format("csv")
    .save("orders_final_new")
)

In [36]:
# Read the "orders_final_new" output back with the same schema.
# NOTE: this rebinds `orders_df`, so later cells that use that name see this frame.
orders_df = (
    spark.read
    .schema(orders_schema)
    .format("csv")
    .load("orders_final_new")
)

In [37]:
# Partition count of the frame read back from "orders_final_new".
orders_df.rdd.getNumPartitions()

24