# 스파크 스트리밍 실습 : 카프카 스트리밍 애플리케이션

> 플루언트디를 통해 생성된 `movies` 토픽의 메시지를 스파크 스트리밍 애플리케이션을 통해 `kor_movies` 토픽으로 저장합니다

## 학습 목표
* `Kafka`에 저장된 스트리밍 데이터를 처리합니다
  - 카프카 메시지 프로듀서를 통해서 카프카에 스트림 데이터를 생성합니다
* [Streaming Kafka Application](https://spark.apache.org/docs/3.2.1/streaming-kafka-0-10-integration.html) 예제 참고하여 개발 합니다.


## 참고 사항
> [Streaming update trait SupportsStreamingUpdate from Spark 3.0.0 has been renamed to SupportsStreamingUpdateAsAppend in Spark 3.1.0.](https://enterprise.aerospike.com/enterprise/download/connectors/aerospike-spark/notes.html) 에서 보면 클래스 이름이 변경되면서 발생하는 오류 
* 스파크 버전을 3.0.0 버전으로 낮추는 방법도 있으나, /usr/local/spark/jars 경로에 있는 kafka-streaming 버전이 3.0.0 버전이라 발생
* 해당 버전을 패치하면서 jars 추가 하여 다시 빌드하였으나 이번에는 requirements.txt 파일에 pyspark 버전이 올라가면서 다른 오류 발생
* requirements.txt 파일을 data-engineer-notebook:1.7.2 버전 기준으로 requirements.txt 파일 버전을 명시해서 다시 빌드 하여 해결


In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, display_pretty, clear_output, JSON

spark = (
    SparkSession
    .builder
    .config("spark.sql.session.timeZone", "Asia/Seoul")
    .getOrCreate()
)

# 노트북에서 테이블 형태로 데이터 프레임 출력을 위한 설정을 합니다
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # display enabled
spark.conf.set("spark.sql.repl.eagerEval.truncate", 100) # display output columns size

# 공통 데이터 위치
home_jovyan = "/home/jovyan"
work_data = f"{home_jovyan}/work/data"
work_dir=!pwd
work_dir = work_dir[0]

# 로컬 환경 최적화
spark.conf.set("spark.sql.shuffle.partitions", 5) # the number of partitions to use when shuffling data for joins or aggregations.
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

# 현재 기동된 스파크 애플리케이션의 포트를 확인하기 위해 스파크 정보를 출력합니다
spark

In [2]:
# 스트림 테이블을 주기적으로 조회하는 함수 (name: 이름, sql: Spark SQL, iterations: 반복횟수, sleep_secs: 인터벌)
def displayStream(name, sql, iterations, sleep_secs):
    from time import sleep
    i = 1
    for x in range(iterations):
        clear_output(wait=True)              # 출력 Cell 을 지웁니다
        display('[' + name + '] Iteration: '+str(i)+', Query: '+sql)
        display(spark.sql(sql))              # Spark SQL 을 수행합니다
        sleep(sleep_secs)                    # sleep_secs 초 만큼 대기합니다
        i += 1

# 스트림 쿼리의 상태를 주기적으로 조회하는 함수 (name: 이름, query: Streaming Query, iterations: 반복횟수, sleep_secs: 인터벌)
def displayStatus(name, query, iterations, sleep_secs):
    from time import sleep
    i = 1
    for x in range(iterations):
        clear_output(wait=True)      # Output Cell 의 내용을 지웁니다
        display('[' + name + '] Iteration: '+str(i)+', Status: '+query.status['message'])
        display(query.lastProgress)  # 마지막 수행된 쿼리의 상태를 출력합니다
        sleep(sleep_secs)            # 지정된 시간(초)을 대기합니다
        i += 1

In [3]:
kafkaReader = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka:9093") \
  .option("subscribe", "movies") \
  .option("startingOffsets", "earliest") \
  .load()

kafkaReader.printSchema()

# {"movie":"10225","title":"핑크 팬더 6 - 핑크 팬더의 추적","title_eng":"Trail Of The Pink Panther , 1982","year":0,"grade":"PG","time":"2022-07-17 04:19:42"}
kafkaSchema = (
    StructType()
    .add(StructField("movie", StringType()))
    .add(StructField("title", StringType()))
    .add(StructField("title_eng", StringType()))
    .add(StructField("year", IntegerType()))
    .add(StructField("grade", StringType()))
    .add(StructField("time", StringType()))
)

# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)

kafkaSelector = (
    kafkaReader
    .select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), kafkaSchema).alias("movies")
    )
    # .selectExpr("movies.title as title", "movies.year as year")
    .selectExpr("movies.movie as key", "to_json(struct(movies.*)) as value")
)

kafkaSelector.printSchema()

# root
#  |-- title: string (nullable = true)
#  |-- year: integer (nullable = true)


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [4]:
# 노트북 로그 콘솔로 출력

queryName = "consoleSink"
kafkaWriter = (
    kafkaSelector.select("key", "value")
    .writeStream
    .queryName(queryName)
    .format("memory")
    .outputMode("append")
)

checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation

kafkaTrigger = (
    kafkaWriter
    .trigger(processingTime="5 second")
    .option("checkpointLocation", checkpointLocation)
)

kafkaQuery = kafkaTrigger.start()

# 파이썬의 경우 콘솔 디버깅이 노트북 표준출력으로 나오기 때문에, 별도 메모리 테이블로 조회
displayStream(queryName, f"select * from {queryName} order by key desc", 4, 5)
kafkaQuery.stop()

'[consoleSink] Iteration: 5, Query: select * from consoleSink'

key,value
movie,"{""movie"":""movie"",""title"":""title"",""title_eng"":""title_eng"",""year"":0,""grade"":""grade""}"
10001,"{""movie"":""10001"",""title"":""시네마 천국"",""title_eng"":""Cinema Paradiso , 1988"",""year"":2013,""grade"":""전체 관람가""}"
10002,"{""movie"":""10002"",""title"":""빽 투 더 퓨쳐"",""title_eng"":""Back To The Future , 1985"",""year"":2015,""grade"":""..."
10003,"{""movie"":""10003"",""title"":""빽 투 더 퓨쳐 2"",""title_eng"":""Back To The Future Part 2 , 1989"",""year"":2015,..."
10004,"{""movie"":""10004"",""title"":""빽 투 더 퓨쳐 3"",""title_eng"":""Back To The Future Part III , 1990"",""year"":199..."
10005,"{""movie"":""10005"",""title"":""스타워즈 에피소드 4 - 새로운 희망"",""title_eng"":""Star Wars , 1977"",""year"":1997,""grade..."
10006,"{""movie"":""10006"",""title"":""스타워즈 에피소드 5 - 제국의 역습"",""title_eng"":""Star Wars Episode V: The Empire Stri..."
10007,"{""movie"":""10007"",""title"":""스타워즈 에피소드 6 - 제다이의 귀환"",""title_eng"":""Star Wars: Episode VI: Return Of Th..."
10008,"{""movie"":""10008"",""title"":""슈퍼맨"",""title_eng"":""Superman , 1978"",""year"":1979,""grade"":""PG""}"
10009,"{""movie"":""10009"",""title"":""슈퍼맨 2"",""title_eng"":""Superman II , 1980"",""year"":1981,""grade"":""PG""}"


In [4]:
# 카프카로 다시 저장

queryName = "kafkaSink"
kafkaWriter = (
    kafkaSelector.select("key", "value")
    .writeStream
    .queryName(queryName)
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("topic", "korean_movies")
    .outputMode("append")
)

checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation

kafkaTrigger = (
    kafkaWriter
    .trigger(processingTime="5 second")
    .option("checkpointLocation", checkpointLocation)
)

kafkaQuery = kafkaTrigger.start()

displayStatus(queryName, kafkaQuery, 100, 10)
kafkaQuery.stop()

'[kafkaSink] Iteration: 55, Status: Waiting for next trigger'

{'id': '44b5d609-99e3-4728-968b-4449bae9012d',
 'runId': '64e20cbf-9a01-411a-a322-ee76d50e6aaa',
 'name': 'kafkaSink',
 'timestamp': '2022-07-17T07:48:25.000Z',
 'batchId': 54,
 'numInputRows': 10,
 'inputRowsPerSecond': 2.0,
 'processedRowsPerSecond': 14.88095238095238,
 'durationMs': {'addBatch': 543,
  'getBatch': 0,
  'latestOffset': 3,
  'queryPlanning': 8,
  'triggerExecution': 672,
  'walCommit': 54},
 'stateOperators': [],
 'sources': [{'description': 'KafkaV2[Subscribe[movies]]',
   'startOffset': {'movies': {'0': 646}},
   'endOffset': {'movies': {'0': 656}},
   'latestOffset': {'movies': {'0': 656}},
   'numInputRows': 10,
   'inputRowsPerSecond': 2.0,
   'processedRowsPerSecond': 14.88095238095238,
   'metrics': {'avgOffsetsBehindLatest': '0.0',
    'maxOffsetsBehindLatest': '0',
    'minOffsetsBehindLatest': '0'}}],
 'sink': {'description': 'org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@77c87b5a',
  'numOutputRows': 10}}

KeyboardInterrupt: 

In [5]:
kafkaQuery.stop()