In [1]:
spark

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.0.7:4042
SparkContext available as 'sc' (version = 3.1.2, master = local[*], app id = local-1644049317799)
SparkSession available as 'spark'


res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7214d648


스파크의 저수준 API에는 RDD 인터페이스 외에도 두 번째 유형인 '분산형 공유 변수'가 있음<br/>
**분산형 공유 변수에는 브로드캐스트 변수와 어큐뮬레이터라는 2개의 타입이 존재함**<br/>
클러스터에서 실행할 때 특별한 속성을 가진 사용자 정의 함수(예: RDD나 DataFrame을 다루는 map 함수)에서 이 변수를 사용할 수 있음<br/>
특히 **어큐뮬레이터**를 사용하면 모든 태스크의 데이터를 공유 결과에 추가할 수 있음<br/>
예를 들어 job의 입력 레코드를 파싱하면서 얼마나 많은 오류가 발생했는지 확인하는 카운터를 구현할 수 있음<br/>
반면 **브로드캐스트 변수**를 사용하면 모든 워커 노드에 큰 값을 저장하므로 재전송 없이 많은 스파크 액션에서 재사용할 수 있음<br/>
이 장에서는 분산형 공유 변수 타입이 만들어지게 된 계기와 사용 방법에 대해 알아보겠음<br/>

# 14.1 브로드캐스트 변수

브로드캐스트 변수는 변하지 않는 값을 closure 함수의 변수로 캡슐화하지 않고 클러스터에서 효율적으로 공유하는 방법을 제공함<br/>
태스크에서 드라이버 노드의 변수를 사용할 때는 closure 함수 내부에서 단순하게 참조하는 방법을 사용함<br/>
하지만 이 방법은 비효율적임<br/>
특히 룩업 테이블이나 머신러닝 모델 같은 큰 변수를 사용하는 경우 더 비효율적임<br/>
그 이유는 closure 함수에서 변수를 사용할 때 워커 노드에서 여러 번(태스크당 1번) 역직렬화가 일어나기 때문임<br/>
게다가 여러 스파크 액션과 job에서 동일한 변수를 사용하면 job을 실행할 때마다 워커로 큰 변수를 재전송함<br/>

이런 상황에서는 브로캐스트 변수를 사용해야 함<br/>
브로드캐스트 변수는 모든 테스크마다 직렬화하지 않고 클러스터의 모든 머신에 캐시하는 불변성 공유 변수임<br/>
executor 메모리 크기에 맞는 조회용 테이블을 전달하고 함수에서 사용하는 것이 대표적인 예임<br/>

클로저의 개념은 [이 블로그 글](https://poiemaweb.com/js-closure)을 참고할 것<br/>

예를 들어 다음 예제처럼 단어나 값의 목록을 가지고 가지고 있다고 가정하자<br/>

In [2]:
val myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(" ")
val words = spark.sparkContext.parallelize(myCollection, 2)

myCollection: Array[String] = Array(Spark, The, Definitive, Guide, :, Big, Data, Processing, Made, Simple)
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:25


수 킬로바이트, 메가바이트, 기가바이트 크기를 가진 다른 정보와 함께 단어 목록을 추가해야 할 수도 있음<br/>
이 처리는 SQL의 오른쪽 조인에 해당함<br/>

In [3]:
val supplementalData = Map("Spark" -> 1000, "Definitive" -> 200, "Big" -> -300, "Simple" -> 100)

supplementalData: scala.collection.immutable.Map[String,Int] = Map(Spark -> 1000, Definitive -> 200, Big -> -300, Simple -> 100)


이 구조체를 스파크에 브로드캐스트할 수 있으며 suppBroadcast 변수를 이용해 참조함<br/>
이 값은 불변성이며 액션을 실행할 때 클러스터의 모든 노드에 지연 처리 방식으로 복제됨<br/>

In [4]:
val suppBroadcast = spark.sparkContext.broadcast(supplementalData)

suppBroadcast: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,Int]] = Broadcast(0)


suppBroadcast 변수의 value 메서드를 사용해 위 예제에서 브로드캐스트된 supplementalData 값을 참조할 수 있음<br/>
value 메서드는 직렬화된 함수에서 브로드캐스트된 데이터를 직렬화하지 않아도 접근할 수 있음<br/>
**스파크는 브로드캐스트 기능을 이용해 데이터를 보다 효율적으로 전송하므로 직렬화와 역직렬화에 대한 부하를 크게 줄일 수 있음**<br/>

In [5]:
suppBroadcast.value

res1: scala.collection.immutable.Map[String,Int] = Map(Spark -> 1000, Definitive -> 200, Big -> -300, Simple -> 100)


이제 브로드캐스트된 데이터를 사용해 RDD를 변환할 수 있음<br/>
다음 예제에서는 맵 연산의 처리 과정에 따라 키-값 쌍 데이터를 생성함<br/>
값이 비어 있는 경우 간단하게 0으로 치환함<br/>

In [6]:
words.map(word => (word, suppBroadcast.value.getOrElse(word, 0)))
    .sortBy(wordPair => wordPair._2)
    .collect()

res2: Array[(String, Int)] = Array((Big,-300), (The,0), (Guide,0), (:,0), (Data,0), (Processing,0), (Made,0), (Simple,100), (Definitive,200), (Spark,1000))


브로드캐스트 변수를 사용한 방식이 클로저에 담아 전달하는 방식보다 훨씬 더 효율적<br/>
물론 데이터의 총량과 executor 수에 따라 다를 수 있으며 아주 작은 데이터(수 KB 정도)를 작은 클러스터에서 돌린다면 크게 차이 나지 않을 수 있음<br/>
브로드캐스트 변수에 작은 크기의 dictionary 타입을 사용한다면 큰 부하가 발생하지 않음<br/>
하지만 훨씬 큰 크기의 데이터를 사용하는 경우, 전체 태스크에서 데이터를 직렬화하는 데 발생하는 부하가 매우 커질 수 있음<br/>

여기서 한 가지 더 주목할 점은 RDD 영역에서 브로드캐스트 변수를 사용했다는 것<br/>
그리고 UDF나 Dataset에서도 사용할 수 있으며 동일한 효과를 얻을 수 있음<br/>

# 14.2 어큐뮬레이터

어큐뮬레이터는 스파크의 2번째 공유 변수 타입임<br/>
어큐뮬레이터는 transformation 내부의 다양한 값을 갱신하는 데 사용함<br/>
그리고 내결함성을 보장하면서 효율적인 방식으로 드라이버에 값을 전달할 수 있음<br/>

**어큐뮬레이터는 스파크 클러스터에서 row 단위로 안전하게 값을 갱신할 수 있는 변경 가능한 변수를 제공함**<br/>
그리고 디버깅용이나 저수준 집계 생성용으로 사용할 수 있음<br/>
ex) 파티션별로 특정 변수의 값을 추적하는 용도로 사용할 수 있으며 시간이 흐를수록 더 유용하게 사용됨<br/>
어큐뮬레이터는 결합성과 가환성을 가진 연산을 통해서만 더할 수 있는 변수이므로 병렬 처리 과정에서 효율적으로 사용할 수 있음<br/>
어큐뮬레이터는 카운터(맵리듀스의 카운터와 같은)나 합계를 구하는 용도로 사용할 수 있음<br/>
스파크는 기본적으로 수치형 어큐뮬레이터를 지원하며 사용자 정의 어큐뮬레이터를 만들어 사용할 수도 있음<br/>

어큐뮬레이터의 값은 **액션**을 처리하는 과정에서만 갱신됨<br/>
*스파크는 각 태스크에서 어큐뮬레이터를 한 번만 갱신하도록 제어함<br/>
따라서 재시작한 태스크는 어큐뮬레이터값을 갱신할 수 없음*<br/>
transformation에서 태스크나 job 스테이지를 재처리하는 경우 각 태스크의 갱신 작업이 두 번 이상 적용될 수 있음<br/>

어큐뮬레이터는 스파크의 지연 연산 모델에 영향을 주지 않음<br/>
어큐뮬레이터가 RDD 처리 중에 갱신되면 RDD 연산이 실제로 수행된 시점, 즉 특정 RDD나 그 RDD의 부모 RDD에 액션을 실행하는 시점에 딱 한 번만 값을 갱신함<br/>
따라서 map 함수 같은 지연 처리 형태의 transformation에서 어큐뮬레이터 갱신 작업 수행하는 경우 실제 실행 전까지는 어큐뮬레이터가 갱신되지 않음<br/>

어큐뮬레이터의 이름은 선택적으로 지정할 수 있음<br/>
이름이 지정된 어큐뮬레이터의 실행 결과는 스파크 UI에 표시되며, 이름이 지정되지 않은 어큐뮬레이터의 경우 스파크 UI에 표시되지 않음<br/>

## 14.2.1 기본 예제

이전에 만들었던 항공운항 데이터셋에 사용자 정의 집계를 수행하면서 어큐뮬레이터를 실험해보겠음<br/>
다음 예제는 RDD API가 아닌 Dataset API를 사용함<br/>

In [10]:
import spark.implicits._

case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: BigInt)
val flights = spark.read
    .parquet("Downloads/Spark-The-Definitive-Guide/data/flight-data/parquet/2010-summary.parquet")
    .as[Flight]

import spark.implicits._
defined class Flight
flights: org.apache.spark.sql.Dataset[Flight] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


이제 출발지나 도착지가 중국인 항공편의 수를 구하는 어큐뮬레이터를 생성하자<br/>
이런 유형의 집계는 SQL로 처리할 수 있음<br/>
하지만 어큐뮬레이터를 사용해 프로그래밍 방식으로 처리해보겠음<br/>
다음 예제는 이름이 지정되지 않은 어큐뮬레이터를 생성함<br/>

In [11]:
import org.apache.spark.util.LongAccumulator

val accUnnamed = new LongAccumulator
val acc = spark.sparkContext.register(accUnnamed)

import org.apache.spark.util.LongAccumulator
accUnnamed: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 101, name: None, value: 0)
acc: Unit = ()


이 예제에는 이름이 지정된 어큐뮬레이터가 적합함<br/>
어큐뮬레이터를 만드는 가장 간단한 방법은 SparkContext를 사용하는 것임<br/>
아니면 직접 어큐뮬레이터를 생성하고 이름을 붙여 등록할 수도 있음<br/>

In [12]:
val accChina = new LongAccumulator
val accChina2 = spark.sparkContext.longAccumulator("China")

spark.sparkContext.register(accChina, "China")

accChina: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 103, name: Some(China), value: 0)
accChina2: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 102, name: Some(China), value: 0)


함수의 파라미터로 문자열 값을 전달하거나 register 함수의 두 번째 파라미터를 사용해 이름을 지정할 수 있음<br/>
이름이 지정된 어큐뮬레이터는 실행 결과를 스파크 UI에서 확인할 수 있으며 이름이 지정되지 않았다면 스파크 UI에서 확인할 수 없음<br/>

다음은 어큐뮬레이터에 값을 더하는 방법을 정의하는 단계임<br/>
다음 예제의 함수는 직관적인 형태로 구성되어 있음<br/>

In [13]:
def accChinaFunc(flight_row: Flight) = {
    val destination = flight_row.DEST_COUNTRY_NAME
    val origin = flight_row.ORIGIN_COUNTRY_NAME
    
    if (destination == "China") {
        accChina.add(flight_row.count.toLong)
    }
    if (origin == "China") {
        accChina.add(flight_row.count.toLong)
    }
}

accChinaFunc: (flight_row: Flight)Unit


이제 foreach 메서드를 사용해 항공운항 데이터셋의 전체 로우를 처리해보겠음<br/>
이렇게 하는 이유는 foreach 메서드가 액션이고, 스파크는 액션에서만 어큐뮬레이터의 실행을 보장하기 때문<br/>
foreach 메서드는 입력 DataFrame의 매 로우마다 함수를 한 번씩 적용해 어큐뮬레이터 값을 증가시킴<br/>

In [14]:
flights.foreach(flight_row => accChinaFunc(flight_row))

이 연산은 상당히 빨리 종료됨<br/>
스파크 UI에서 executor 단위로 어큐뮬레이터 값을 확인할 수 있음<br/>

어큐뮬레이터 값을 프로그래밍 방식으로 조회할 수도 있음<br/>
이 경우 value 속성을 사용함

In [15]:
accChina.value

res5: Long = 953


## 14.2.2 사용자 정의 어큐뮬레이터

스파크는 몇 가지 기본 어큐뮬레이터를 제공함<br/>
하지만 때에 따라 사용자 정의 어큐뮬레이터가 필요할 수도 있음<br/>
**어큐뮬레이터를 직접 정의하려면 AccumulatorV2 클래스를 상속받아야 함**<br/>
다음 예제와 같이 구현해야 하는 추상 메서드가 몇 가지 있음<br/>
다음은 어큐뮬레이터에 짝수값만 더하는 예제임<br/>
아주 간단하지만 사용자 정의 어큐뮬레이터를 구현하는 과정이 얼마나 쉬운지 알 수 있음<br/>

In [26]:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.AccumulatorV2

val arr = ArrayBuffer[BigInt]()

class EvenAccumulator extends AccumulatorV2[BigInt, BigInt] {
  private var num:BigInt = 0
  def reset(): Unit = {
    this.num = 0
  }
  def add(intValue: BigInt): Unit = {
    if (intValue % 2 == 0) {
        this.num += intValue
    }
  }
  def merge(other: AccumulatorV2[BigInt,BigInt]): Unit = {
    this.num += other.value
  }
  def value():BigInt = {
    this.num
  }
  def copy(): AccumulatorV2[BigInt,BigInt] = {
    new EvenAccumulator
  }
  def isZero():Boolean = {
    this.num == 0
  }
}
val acc = new EvenAccumulator
val newAcc = sc.register(acc, "evenAcc")

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.AccumulatorV2
arr: scala.collection.mutable.ArrayBuffer[BigInt] = ArrayBuffer()
defined class EvenAccumulator
acc: EvenAccumulator = EvenAccumulator(id: 290, name: Some(evenAcc), value: 0)
newAcc: Unit = ()


In [28]:
acc.value // 0

res14: BigInt = 0


In [32]:
flights.foreach(flight_row => acc.add(flight_row.count))
acc.value // 31390

org.apache.spark.SparkException:  Job aborted due to stage failure: Exception while getting task result: java.io.IOException: java.lang.ClassNotFoundException: EvenAccumulator

위 코드에서 에러가 나는 이유를 모르겠음<br/>

에러 메시지에 java.io.IOException: java.lang.ClassNotFoundException: EvenAccumulator 문구가 있으므로 클래스를 정의한 코드와 커맨드를 함께 실행하는 경우 아래와 같은 다른 에러 메시지가 뜸 <br/>

In [33]:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.AccumulatorV2

val arr = ArrayBuffer[BigInt]()

class EvenAccumulator extends AccumulatorV2[BigInt, BigInt] {
  private var num:BigInt = 0
  def reset(): Unit = {
    this.num = 0
  }
  def add(intValue: BigInt): Unit = {
    if (intValue % 2 == 0) {
        this.num += intValue
    }
  }
  def merge(other: AccumulatorV2[BigInt,BigInt]): Unit = {
    this.num += other.value
  }
  def value():BigInt = {
    this.num
  }
  def copy(): AccumulatorV2[BigInt,BigInt] = {
    new EvenAccumulator
  }
  def isZero():Boolean = {
    this.num == 0
  }
}
val acc = new EvenAccumulator
val newAcc = sc.register(acc, "evenAcc")

flights.foreach(flight_row => acc.add(flight_row.count))
acc.value // 31390

org.apache.spark.SparkException:  Job aborted due to stage failure: Exception while getting task result: java.io.IOException: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.BoundReference.accessor of type scala.Function2 in instance of org.apache.spark.sql.catalyst.expressions.BoundReference

나중에 에러의 원인을 찾아내보겠다<br/>

# 14.3 정리

이 장에서는 분산형 공유 변수를 알아보았음<br/>
**분산형 공유 변수는 디버깅이나 최적화 작업에 유용한 도구임**<br/>
다음 장에서는 분산형 공유 변수가 언제 도움이 되는지 이해하기 위해 클러스터 환경에서 스파크가 동작하는 방식에 대해 알아보겠음<br/>