# CHAPTER 7. Optimizing and Tuning Spark Applications
> 6장에서는 스파크가 어떻게 메모리 관리를 하고, 고급 API 를 통해서 데이터셋을 구성하는 지에 대해 학습했으며, 이번 장에서는 최적화를 위한 스파크 설정과, 조인 전략들을 살펴보고, 스파크 UI 를 통해 안좋은 영향을 줄 수 있는 것들에 대한 힌트를 얻고자 합니다.

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession
    .builder
    .config("spark.sql.session.timeZone", "Asia/Seoul")
    .getOrCreate()
)


## 7.1 Optimizing and Tuning Spark for Efficiency
> 스파크는 [튜닝](https://spark.apache.org/docs/latest/tuning.html)을 위한 다양한 설정을 제공하며, [설정](https://spark.apache.org/docs/latest/configuration.html)값을 통해 확인할 수 있습니다

### 7.1.1 Viewing and Setting Apache Spark Configurations
> 아래의 순서대로 스파크는 설정값을 읽어들이며, 가장 마지막에 변경된 값이 반영됩니다

#### 1. 설치된 스파크 경로의 conf/spark-default.conf 파일을 생성 및 수정

#### 2. 스파크 실행 시에 옵션을 지정하는 방법
```bash
$ spark-submit --conf spark.sql.shuffle.partitions=5 --conf "spark.executor.memory=2g" --class main.scala.chapter7.SparkConfig_7_1 jars/mainscala-chapter7_2.12-1.0.jar
```

#### 3. 스파크 코드 내에서 직접 지정하는 방법
```scala
SparkSession.builder
.config("spark.sql.shuffle.partitions", 5)
.config("spark.executor.memroy", "2g")
...
```

In [4]:
# 파이스파크 내에서는 sparkContext 를 통해서 해당 정보를 가져올 수 있습니다
def printConfigs(session):
    for x in sorted(session.sparkContext.getConf().getAll()):
        print(x)

printConfigs(spark)

('spark.app.id', 'local-1619576347079')
('spark.app.name', 'pyspark-shell')
('spark.driver.host', 'ddd9c0cd39a3')
('spark.driver.port', '42483')
('spark.executor.id', 'driver')
('spark.master', 'local[*]')
('spark.rdd.compress', 'True')
('spark.serializer.objectStreamReset', '100')
('spark.sql.session.timeZone', 'Asia/Seoul')
('spark.submit.deployMode', 'client')
('spark.submit.pyFiles', '')
('spark.ui.showConsoleProgress', 'true')


In [5]:
# SparkSQL의 경우 내부적으로 사용되는 설정값이 다르기 때문에 더 많은 정보가 출력됩니다
spark.sql("SET -v").select("key", "value").where("key like '%spark.sql%'").show(n=5, truncate=False)

+---------------------------------------------------------+----------------------------------------------------------------+
|key                                                      |value                                                           |
+---------------------------------------------------------+----------------------------------------------------------------+
|spark.sql.adaptive.advisoryPartitionSizeInBytes          |<value of spark.sql.adaptive.shuffle.targetPostShuffleInputSize>|
|spark.sql.adaptive.coalescePartitions.enabled            |true                                                            |
|spark.sql.adaptive.coalescePartitions.initialPartitionNum|<undefined>                                                     |
|spark.sql.adaptive.coalescePartitions.minPartitionNum    |<undefined>                                                     |
|spark.sql.adaptive.enabled                               |false                                                           |


* 스파크 UI 를 통해서도 확인이 가능합니다

![spakr-ui](images/spark-ui.png)

In [6]:
# 스파크 기본 설정 spark.sql.shuffle.partitions 값을 확인하고, 프로그램 상에서 변경 후 테스트 합니다
num_partitions = spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.shuffle.partitions", 5)
mod_partitions = spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.shuffle.partitions", num_partitions)
print(num_partitions, mod_partitions)

200 5


### 7.1.2 Scaling Spark for Large Workloads

#### 1. 정적 vs 동적 리소스 할당의 선택
> CPU 및 Memory 사용을 애플리케이션에 따라 지정하는 정적 리소스 할당과 동적 리소스 할당은 처리해야 할 데이터의 특성에 따라 선택할 수 있으며, 환경설정을 다르게 구성해야 합니다.

* 데이터의 크기가 일정하지 않고, 유동적
* 특히 데이터의 크기가 고르지 않은 스트리밍 처리
* 멀티테넌시 환경의 분석용 클러스터의 데이터 리소스 관리

#### 2. 동적 리소스 할당 설정 가이드
* 기본 설정은 false 이므로 아래의 값들에 대한 설정이 별도로 되어야 하며, REPL 환경에서 지원하지 않는 값들도 존재하므로, 프로그램을 통한 수정이 필요합니다
```
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min
```
* 아래의 과정을 통해 동적 리소스를 관리합니다
  - 1. 스파크 드라이버가 클러스터 매니저에 2개(minExecutors)의 익스큐터를 요청합니다
  - 2. 작업 큐의 백로그가 증가하여, 백로그 타임아웃(schedulerBacklogTimeout)이 발생하는 경우 새로운 익스큐터 요청이 발생합니다
  - 3. 스케줄링 된 작업들이 1분 이상 지연되는 경우 드라이버는 새로운 익스큐터를 최대 20개(maxExecutors) 까지 요청합니다
  - 4. 스파크 드라이버는 2분 이상 (executorIdleTimeout) 작업이 할당되지 않는 익스큐터 들을 종료시킵니다

#### 3. 스파크 익스큐터의 메모리와 셔플 서비스의 설정 가이드
![external-memory-layout](images/external-memory-layout.png)
* 맵, 스필 그리고 병합 프로세스들이 I/O 부족에 따른 문제점을 갖지 않으며, 최종 셔플 파티션이 디스크에 저장되기 전에 버퍼 메모리를 확보할 수 있도록 설정을 아래와 같이 조정할 수 있습니다
![spark-conf-io](images/spark-conf-io.png)

#### 4. 스파크 병렬성을 최대화
> 스파크가 데이터를 어떻게 저장소로부터 메모리에 적재하는지, 스파크에 있어서 파티션이 어떻게 활용되는지를 이해해야 합니다

* 매 스테이지 마다 많은 타스크들이 존재하지만, 스파크는 기껏해야 코어당 작업당 하나의 스레드만 할당하며, 개별 타스크는 독립된 파티션 하나를 처리합니다.
* 리소스 사용을 최적화하고, 병렬성을 최대화 하려면 익스큐터에 존재하는 코어수들 만큼 많은 파티션들이 존재해야 합니다. (유휴 코어를 두지 않기 위함)
![figure.7-3](images/figure.7-3.png)


#### 5. 파티션은 구성에 대한 이해와 재구성
* 분산 저장소에 저장시에 구성되는 경우
  - HDFS, S3 등의 저장소의 기본 파일블록의 크기는 64mb, 128mb 이며, 파일 크기가 작고 많아질 수록 파티션당 할당해야 하는 코어수가 모자라기 때문에 "small file problem" 을 피해야 합니다
* 스파크의 셔플링을 통해 생성되는 경우
  - 집계함수나 조인과 같은 Wide Transformation 과정에서 셔플링이 발생 (Network & Disk I/O 비용)
  - 기본 셔플 파티션 수는 200개인데 작은 데이터집합이나, 스트리밍 워크로드 등에는 **충분히 많은 수이기 때문에 조정이 필요**합니다
  - 최종 결과 테이블의 용량 및 사용 용도에 따라 의도적인 파티션 수를 조정할 수 있습니다 (repartition, coalesce)

#### 질문과 답변
* 대부분 dynamic allocation 을 쓰면 좋을거 아닐까?
  - 워크로드가 예상된다면 동적할당은 필요없는 리소스 및 관리 비용이 더 들어가기 때문에 성능에 영향을 줄 수 있습니다
* REPL 이 뭔가?
  - Read-Evaluate-Print Loop 의 약자
* dynamic allocation 은 수시로 변경할 수 없는가? 왜 그런가?
* off-heap 이 좋으면 모두 off-heap 사용하지 왜 jvm 메모리를 이렇게나 많이 사용하는가?
  - 자바에서 사용하는 구조화된 API의 장점과 네이티브 라이브러리의 데이터 송수신 및 읽고 쓰기의 장점을 모두 취하기 위함
* execution vs storage 메모리의 비율을 어떻게 확인할 수 있는가? 오히려 삽질 아닌가?
  - 직접 셋팅하기 보다는 관련 옵션을 조정하면서 튜닝합니다
* spark 작업에서의 spill 절차는 무엇이고 왜 발생하며 어떻게 해결할 수 있는가?
  - 스파크 익스큐터가 위의 각 레이어에 할당된 메모리를 모두 사용한 경우 디스크로 저장하는 경우를 Spill 이라고 합니다
  - Disk I/O 는 성능에 큰 영향을 미치기 때문에 SSD 를 사용한다면 좋은 성능을 효과를 기대할 수 있습니다
```
operations, the shuffle will spill results to executors’ local disks at the location specified in spark.local.directory. Having performant SSD disks for this operation will boost the performance.
```

In [7]:
numDF = spark.range(1000).repartition(16)
numDF.rdd.getNumPartitions()

16

## 7.2 Caching and Persistence of Data
> cache() 와 persist() 는 거의 동일하지만, persist() 의 경우 persistent level 을 결정할 수 있습니다 (메모리, 디스크, 직렬화, 비직렬화 등)

### 7.2.1 DataFrame.cache()
* DataFrame 은 부분적으로 캐시가 가능하지만, 파티션은 그렇지 못 합니다. 예를 들어 8개의 파티션 중 4.5개 정도를 사용할 메모리가 있는 경우 4개의 파티션만 캐시됩니다
  - 캐시되지 않은 데이터를 읽는 데에는 문제가 없지만, 모두 다시 계산되어야 하는 비용이 발생합니다
* cache() 혹은 persist() 호출 시에 DataFrame 은 take(1) 같은 경우 첫 번째 파티션만 캐싱이 이루어지고, count() 같은 action 수행 시에는 모든 데이터가 캐싱이 된다는 점을 알고 있어야 합니다
  - rdd.cache()는 persist(StorageLevel.MEMORY_ONLY) 로 
  - df.cache()는 persist(StorageLevel.MEMORY_AND_DISK) 로 동작합니다

![persist-storage](images/persist-storage.png)

In [8]:
# 반복적으로 수행하는 경우 노트북 프로그램의 캐싱될 수 있기 때문에, 매번 다른 프로그램 수행을 위해서 랜덤 시드숫자를 매번 더해줍니다.
import random
seed = random.randint(1,100)
print("seed number is {}".format(seed))
cached = spark.range(10 * 1000 * 1000 + seed).toDF("id").withColumn("square", expr("id * id"))
import time
start = time.time()
cached.cache() # 데이터를 캐싱
cached.count() # Materialize the cache
print(time.time()-start)

seed number is 18
3.8046398162841797


In [9]:
start = time.time()
cached.count()
print(time.time()-start)

0.1505875587463379


### 7.2.2 DataFrame.persist()
![table.7-2](images/table.7-2.png)
![figure.7-5](images/figure.7-5.png)

* 테이블 캐시를 사용하는 경우도 cache() 와 동일한 결과를 보여줍니다
![figure.7-5-1](images/figure.7-5-1.png)

In [10]:
# 반복적으로 수행하는 경우 노트북 프로그램의 캐싱될 수 있기 때문에, 매번 다른 프로그램 수행을 위해서 랜덤 시드숫자를 매번 더해줍니다.
import random
seed = random.randint(1,100)
print("seed number is {}".format(seed))
persisted = spark.range(10 * 1000 * 1000 + seed).toDF("id").withColumn("square", expr("id * id"))
import time
start = time.time()
from pyspark import StorageLevel
persisted.persist(StorageLevel.DISK_ONLY) # 데이터를 캐싱
persisted.count() # Materialize the cache
print(time.time()-start)

seed number is 37
2.6302409172058105


In [11]:
start = time.time()
persisted.count()
print(time.time()-start)

0.19655108451843262


In [12]:
# 반복적으로 수행하는 경우 노트북 프로그램의 캐싱될 수 있기 때문에, 매번 다른 프로그램 수행을 위해서 랜덤 시드숫자를 매번 더해줍니다.
import random
seed = random.randint(1,100)
print("seed number is {}".format(seed))
table_cached = spark.range(10 * 1000 * 1000 + seed).toDF("id").withColumn("square", expr("id * id"))
table_cached.createOrReplaceTempView("square")
import time
start = time.time()
spark.sql("CACHE TABLE square") # 데이터를 캐싱
spark.sql("SELECT COUNT(1) FROM square") # Materialize the cache
print(time.time()-start)

seed number is 39
2.4470584392547607


In [13]:
start = time.time()
spark.sql("SELECT COUNT(1) FROM square")
print(time.time()-start)

0.02222752571105957


### 7.2.3 When to Cache and Persist
> 대용량 테이블을 자주 쿼리하는 경우 혹은 변환에 활용되는 경우에 사용합니다
* 기계학습 훈련 시와 같이 반복 적인 데이터프레임의 조회
* ETL 데이터 파이프라인의 변환작업에 빈번하게 사용되는 공통 테이블의 사용

### 7.2.4 When Not to Cache and Persist
> 너무 크거나, 자주 사용되지 않는 테이블의 경우는 지양합니다. 왜냐하면 데이터의 직렬화, 역직렬화에 따른 비용이 상당하기 때문에 오히려 전체적인 처리시간에 악영향을 줄 수 있습니다.
* 메모리에 들어가지 않을 만큼 큰 데이터
* 크기에 비해서 자주 사용되지 않는 데이터


## 7.3 A Family of Spark Joins

```text
/**
   * Select the proper physical plan for join based on join strategy hints, the availability of
   * equi-join keys and the sizes of joining relations. Below are the existing join strategies,
   * their characteristics and their limitations.
   *
   * - Broadcast hash join (BHJ):
   *     Only supported for equi-joins, while the join keys do not need to be sortable.
   *     Supported for all join types except full outer joins.
   *     BHJ usually performs faster than the other join algorithms when the broadcast side is
   *     small. However, broadcasting tables is a network-intensive operation and it could cause
   *     OOM or perform badly in some cases, especially when the build/broadcast side is big.
   *
   * - Shuffle hash join:
   *     Only supported for equi-joins, while the join keys do not need to be sortable.
   *     Supported for all join types.
   *     Building hash map from table is a memory-intensive operation and it could cause OOM
   *     when the build side is big.
   *
   * - Shuffle sort merge join (SMJ):
   *     Only supported for equi-joins and the join keys have to be sortable.
   *     Supported for all join types.
   *
   * - Broadcast nested loop join (BNLJ):
   *     Supports both equi-joins and non-equi-joins.
   *     Supports all the join types, but the implementation is optimized for:
   *       1) broadcasting the left side in a right outer join;
   *       2) broadcasting the right side in a left outer, left semi, left anti or existence join;
   *       3) broadcasting either side in an inner-like join.
   *     For other cases, we need to scan the data multiple times, which can be rather slow.
   *
   * - Shuffle-and-replicate nested loop join (a.k.a. cartesian product join):
   *     Supports both equi-joins and non-equi-joins.
   *     Supports only inner like joins.
   */
```
### 7.3.1 Broadcast Hash Join
> 드라이버 혹은 익스큐터의 메모리 보다 충분히 작은 경우에 해당 데이터를 broadcast 변수에 담아, 상대적으로 큰 데이터가 존재하는 노드로 변수를 전달하기 때문에 map 단계에서 join 이 일어나게 되어 *map-side-join* 이라고 부르며, 조인 성능에 가장 큰 영향을 미치는 셔플이 발생하지 않게 되어 성능이 좋습니다.

#### When to use a broadcast hash join
* 작고 큰 데이터 집합의 개별 키가 스파크에 의해서 같은 파티션에 해시되어 있는 경우
  - 버킷 등을 통해 이미 동일한 노드에 저장되어 있는 경우로 추측
  - hash-join 과 같이 hash table 을 사용하는 것처럼 보이지는 않으나 확인이 필요함
* 하나의 데이터 집합이 다른 데이터 집합에 비해 훨씬 작을 때 (그리고 기본 구성 메모리가 충분한 경우 10MB 이상)
* 정렬되지 않은 키들의 매칭을 기반으로 두 데이터집합 들을 결합하기 위해서, 동등 조인을 수행하기를 워한는 경우
  - 해시 조인이기 때문에 정렬되지 않은 상태의 Equi-join 이 가능하기 때문
  - [non equi-join](https://www.essentialsql.com/non-equi-join-sql-purpose/) 은 anti-join 혹은 range-join 이 있다
* 모든 스파크 익스큐터들에 작은 데이터가 브로드캐스트 될 것이 명확해서, 네트워크 밴드나 OOM 오류를 걱정할 필요가 없을때

#### spark.sql.autoBroadcastJoinThreshold 값으로 설정을 변경할 수 있으며, 기본 값은 10m 입니다
![figure.7-6](images/figure.7-6.png)

In [14]:
from pyspark.sql.functions import *
animal = spark.createDataFrame([("Cat", 1), ("Dog", 1), ("Monkey", 2), ("Lion", 3), ("Tiger", 3)], ["name", "type"])
animal.show(truncate=False)
category = spark.createDataFrame([("Fat", 1), ("Animal", 2), ("Beast", 3)], ["category", "id"])
animal.join(category, animal.type == category.id, "left_outer").select("name", "category").show()

+------+----+
|name  |type|
+------+----+
|Cat   |1   |
|Dog   |1   |
|Monkey|2   |
|Lion  |3   |
|Tiger |3   |
+------+----+

+------+--------+
|  name|category|
+------+--------+
|   Cat|     Fat|
|   Dog|     Fat|
|  Lion|   Beast|
| Tiger|   Beast|
|Monkey|  Animal|
+------+--------+



In [15]:
animal.join(broadcast(category), animal.type == category.id, "left_outer").select("name", "category").show()

+------+--------+
|  name|category|
+------+--------+
|   Cat|     Fat|
|   Dog|     Fat|
|Monkey|  Animal|
|  Lion|   Beast|
| Tiger|   Beast|
+------+--------+



* 스파크 3.0 에서 추가된 기능으로 explain 모드를 입력할 수 있으며 simple, extended, codegen, cost, formatted 등의 옵션을 제공합니다

In [16]:
animal.join(category, animal.type == category.id, "left_outer").explain("simple")
animal.join(broadcast(category), animal.type == category.id, "left_outer").explain("formatted")

== Physical Plan ==
SortMergeJoin [type#282L], [id#295L], LeftOuter
:- *(2) Sort [type#282L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(type#282L, 200), true, [id=#277]
:     +- *(1) Scan ExistingRDD[name#281,type#282L]
+- *(4) Sort [id#295L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#295L, 200), true, [id=#282]
      +- *(3) Filter isnotnull(id#295L)
         +- *(3) Scan ExistingRDD[category#294,id#295L]


== Physical Plan ==
* BroadcastHashJoin LeftOuter BuildRight (5)
:- * Scan ExistingRDD (1)
+- BroadcastExchange (4)
   +- * Filter (3)
      +- * Scan ExistingRDD (2)


(1) Scan ExistingRDD [codegen id : 2]
Output [2]: [name#281, type#282L]
Arguments: [name#281, type#282L], MapPartitionsRDD[65] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Scan ExistingRDD [codegen id : 1]
Output [2]: [category#294, id#295L]
Arguments: [category#294, id#295L], MapPartitionsRDD[72] at applySchemaToPythonRDD at 

| 소트머지조인 | 브로드캐스트 조인 |
| --- | --- |
| ![join_shuffle](images/join_shuffle.png) | ![join_broadcast](images/join_broadcast.png) |


### 7.3.2 Shuffle Sort Merge Join
> 두개의 대용량 데이터 집합을 조인하는 가장 효과적인 알고리즘이며, 기본 설정은 spark.sql.join.preferSortMergeJoin 은 enabled 된 상태입니다.


In [17]:
spark.conf.set("spark.sql.autoBroadcastJoinThreashold", "10485760b") # default value
spark.conf.set("spark.sql.autoBroadcastJoinThreashold", "-1") # force sortMergeJoin

In [18]:
states = spark.createDataFrame([(0, "AZ"), (1, "CO"), (3, "TX"), (4, "N"), (5, "MI")], ["id", "state"])
items = spark.createDataFrame([(0, "SKU-0"), (1, "SKU-1"), (2, "SKU-2"), (3, "SKU-3"), (4, "SKU-4"), (5, "SKU-5")], ["id", "item"])
animal.show()
items.show()

+------+----+
|  name|type|
+------+----+
|   Cat|   1|
|   Dog|   1|
|Monkey|   2|
|  Lion|   3|
| Tiger|   3|
+------+----+

+---+-----+
| id| item|
+---+-----+
|  0|SKU-0|
|  1|SKU-1|
|  2|SKU-2|
|  3|SKU-3|
|  4|SKU-4|
|  5|SKU-5|
+---+-----+



In [19]:
import random

spark.conf.set("spark.sql.autoBroadcastJoinThreashold", "-1") # force sortMergeJoin

states = {0:"AZ", 1:"CO", 2:"CA", 3: "TX", 4: "NY", 5:"MI"}
items = {0:"SKU-0", 1:"SKU-1", 2:"SKU-2", 3: "SKU-3", 4: "SKU-4", 5:"SKU-5"}

usersDF = spark.range(0, 10000).rdd.map(lambda id: (id[0], "user_{}".format(id[0]), "user_{}@databricks.com".format(id[0]), states[random.randint(0, 5)])).toDF(["uid", "login", "email", "user_state"])
ordersDF = spark.range(0, 10000).rdd.map(lambda r: (r[0], r[0], random.randint(0, 10000), 10 * r[0] * 0.2, states[random.randint(0, 5)], items[random.randint(0,5)])).toDF(["transaction_id", "quantity", "users_id", "amount", "state", "items"])

# usersDF.show(truncate=False)
# ordersDF.show(truncate=False)

usersOrdersDF = ordersDF.join(usersDF, ordersDF.users_id == usersDF.uid)
usersOrdersDF.show(truncate=False)

+--------------+--------+--------+-------+-----+-----+----+---------+------------------------+----------+
|transaction_id|quantity|users_id|amount |state|items|uid |login    |email                   |user_state|
+--------------+--------+--------+-------+-----+-----+----+---------+------------------------+----------+
|8590          |8590    |964     |17180.0|AZ   |SKU-5|964 |user_964 |user_964@databricks.com |TX        |
|4464          |4464    |1677    |8928.0 |NY   |SKU-2|1677|user_1677|user_1677@databricks.com|CO        |
|9042          |9042    |1677    |18084.0|CO   |SKU-1|1677|user_1677|user_1677@databricks.com|CO        |
|5549          |5549    |1697    |11098.0|MI   |SKU-5|1697|user_1697|user_1697@databricks.com|CO        |
|7950          |7950    |1806    |15900.0|MI   |SKU-3|1806|user_1806|user_1806@databricks.com|MI        |
|2924          |2924    |2040    |5848.0 |CA   |SKU-1|2040|user_2040|user_2040@databricks.com|TX        |
|8071          |8071    |2040    |16142.0|CA  

In [20]:
usersOrdersDF.explain("formatted")

== Physical Plan ==
* SortMergeJoin Inner (9)
:- * Sort (4)
:  +- Exchange (3)
:     +- * Filter (2)
:        +- * Scan ExistingRDD (1)
+- * Sort (8)
   +- Exchange (7)
      +- * Filter (6)
         +- * Scan ExistingRDD (5)


(1) Scan ExistingRDD [codegen id : 1]
Output [6]: [transaction_id#406L, quantity#407L, users_id#408L, amount#409, state#410, items#411]
Arguments: [transaction_id#406L, quantity#407L, users_id#408L, amount#409, state#410, items#411], MapPartitionsRDD[123] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter [codegen id : 1]
Input [6]: [transaction_id#406L, quantity#407L, users_id#408L, amount#409, state#410, items#411]
Condition : isnotnull(users_id#408L)

(3) Exchange
Input [6]: [transaction_id#406L, quantity#407L, users_id#408L, amount#409, state#410, items#411]
Arguments: hashpartitioning(users_id#408L, 200), true, [id=#417]

(4) Sort [codegen id : 2]
Input [6]: [transaction_id#406L, quantity#407L, user

#### Optimizing the shuffle sort merge join
> Sort-Merge 조인의 가장 큰 비용인 Exchange Stage 를 제거하여 성능향상을 도모할 수 있습니다. 이는 버킷을 통해 해당 데이터를 생성하는 시점에 미리 정렬해 두는 접근입니다. 즉 자주 사용되는 equi-join 의 컬럼을 기준으로 버킷 수준에서 정렬해둔다고 보시면 됩니다.

![join_exchange](images/join_exchange.png)
![join_bucket](images/join_bucket.png)

In [27]:
# spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","true")
%rm -rf "spark-warehouse/userstbl"
%rm -rf "spark-warehouse/orderstbl"

from pyspark.sql.types import *
(
    usersDF.orderBy(asc("uid"))
    .write
    .mode("overwrite")
    .format("parquet")
    .bucketBy(8, "uid")
    .saveAsTable("UsersTbl")
)

(
    ordersDF.orderBy(asc("users_id"))
    .write
    .mode("overwrite")
    .format("parquet")
    .bucketBy(8, "users_id")
    .saveAsTable("OrdersTbl")
)

spark.sql("cache table UsersTbl")
spark.sql("cache table OrdersTbl")

usersBucketDF = spark.table("UsersTbl")
ordersBucketDF = spark.table("OrdersTbl")

In [28]:
joinUsersOrdersBucketDF = ordersBucketDF.join(usersBucketDF, ordersBucketDF.users_id == usersBucketDF.uid)
joinUsersOrdersBucketDF.show(truncate=False)

+--------------+--------+--------+-------+-----+-----+---+--------+-----------------------+----------+
|transaction_id|quantity|users_id|amount |state|items|uid|login   |email                  |user_state|
+--------------+--------+--------+-------+-----+-----+---+--------+-----------------------+----------+
|4959          |4959    |2       |9918.0 |AZ   |SKU-4|2  |user_2  |user_2@databricks.com  |CA        |
|1458          |1458    |2       |2916.0 |NY   |SKU-3|2  |user_2  |user_2@databricks.com  |CA        |
|3986          |3986    |12      |7972.0 |CA   |SKU-5|12 |user_12 |user_12@databricks.com |AZ        |
|9767          |9767    |28      |19534.0|CA   |SKU-1|28 |user_28 |user_28@databricks.com |TX        |
|4833          |4833    |29      |9666.0 |MI   |SKU-1|29 |user_29 |user_29@databricks.com |CO        |
|4368          |4368    |30      |8736.0 |CO   |SKU-2|30 |user_30 |user_30@databricks.com |NY        |
|8507          |8507    |42      |17014.0|CO   |SKU-1|42 |user_42 |user_4

#### When to use a shuffle sort merge join
* 두 개의 큰 데이터 집합의 각 키를 정렬하고 동일한 파티션으로 해시 할 수 있는 경우
* 정렬된 키들의 매칭을 기반으로 두 데이터집합 들을 결합하기 위해서, 동등 조인을 수행하기를 워한는 경우
  - SortMerge 조인이기 때문에 이미 정렬된 상태의 Equi-join 이 가능하기 때문
* 네트워크를 통해 대용량 셔플 파일을 저장 시에 Exchange 와 Sort 연산을 피하고 싶을 때
  - Bucket 기법을 활용하는 예제를 고려하라는 말로 추측


## 7.4 Inspecting the Spark UI
### 7.4.1 Journey Through the Spark UI Tabs


#### Jobs and Stages
> Duration 항목을 기준으로 문제가 되는 Job, Stage 및 Task 를 추측합니다

* 확인 및 모니터링 대상 지표
  - Average Duration 시간
  - GC 에 소모되는 시간
  - Shuffle bytes/records 정보

![ch7-ui-1](images/ch7-ui-1.png)
![ch7-ui-2](images/ch7-ui-2.png)

![ch7-ui-3](images/ch7-ui-3.png)
![ch7-ui-4](images/ch7-ui-4.png)
![ch7-ui-5](images/ch7-ui-5.png)
![ch7-ui-6](images/ch7-ui-6.png)
![ch7-ui-7](images/ch7-ui-7.png)
![ch7-ui-8](images/ch7-ui-8.png)

## 7.5 Summary


## 추가로 학습할 내용들
* [Tuning Apache Spark for Large Scale Workloads - Sital Kedia & Gaoxiang Liu](https://www.youtube.com/watch?v=5dga0UT4RI8)
* [Hive Bucketing in Apache Spark - Tejas Patil](https://www.youtube.com/watch?v=6BD-Vv-ViBw)
* [How does Facebook tune Apache Spark for Large-Scale Workloads?](https://towardsdatascience.com/how-does-facebook-tune-apache-spark-for-large-scale-workloads-3238ddda0830)
* [External Shuffle Service in Apache Spark](https://www.waitingforcode.com/apache-spark/external-shuffle-service-apache-spark/read)
* [Spark Internal Part 2. Spark의 메모리 관리(2)](https://medium.com/@leeyh0216/spark-internal-part-2-spark%EC%9D%98-%EB%A9%94%EB%AA%A8%EB%A6%AC-%EA%B4%80%EB%A6%AC-2-db1975b74d2f)
* [Why You Should Care about Data Layout in the Filesystem](https://databricks.com/session/why-you-should-care-about-data-layout-in-the-filesystem)
* [Five distinct join strategies](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L111)