# 스파크 스트리밍 실습 5교시 : 스트림 집계 연산

> 스트림 데이터에 대하여 상태를 이용하여 시간에 기반하지 않는 혹은 이벤트 타임에 기반한 다양한 집계함수를 학습합니다

## 학습 목표
* 스테이프풀 스트리밍에 대해 이해합니다
  - 시간에 기반하지 않는 집계함수인 전역, 그룹 별 집계함수를 학습합니다
  - 이벤트 타임에 기반한 다양한 윈도우(텀블링, 슬라이딩 등)를 활용한 집계를 학습합니다

## 목차
* [1. 스테이트풀 스트리밍 집계](#1.-스테이트풀-스트리밍-집계)
  - [1.1 시간에 기반하지 않는 집계](#1.1-시간에-기반하지-않는-집계)
  - [1.2 이벤트타임 윈도우 기반 집계](#1.2-이벤트타임-윈도우-기반-집계)


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, display_pretty, clear_output, JSON

spark = (
    SparkSession
    .builder
    .config("spark.sql.session.timeZone", "Asia/Seoul")
    .getOrCreate()
)

# 노트북에서 테이블 형태로 데이터 프레임 출력을 위한 설정을 합니다
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # display enabled
spark.conf.set("spark.sql.repl.eagerEval.truncate", 100) # display output columns size

# 공통 데이터 위치
home_jovyan = "/home/jovyan"
work_data = f"{home_jovyan}/work/data"
work_dir=!pwd
work_dir = work_dir[0]

# 로컬 환경 최적화
spark.conf.set("spark.sql.shuffle.partitions", 5) # the number of partitions to use when shuffling data for joins or aggregations.
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

# 현재 기동된 스파크 애플리케이션의 포트를 확인하기 위해 스파크 정보를 출력합니다
spark

In [2]:
# 스트림 테이블을 주기적으로 조회하는 함수 (name: 이름, sql: Spark SQL, iterations: 반복횟수, sleep_secs: 인터벌)
def displayStream(name, sql, iterations, sleep_secs):
    from time import sleep
    i = 1
    for x in range(iterations):
        clear_output(wait=True)              # 출력 Cell 을 지웁니다
        display('[' + name + '] Iteration: '+str(i)+', Query: '+sql)
        display(spark.sql(sql))              # Spark SQL 을 수행합니다
        sleep(sleep_secs)                    # sleep_secs 초 만큼 대기합니다
        i += 1

# 스트림 쿼리의 상태를 주기적으로 조회하는 함수 (name: 이름, query: Streaming Query, iterations: 반복횟수, sleep_secs: 인터벌)
def displayStatus(name, query, iterations, sleep_secs):
    from time import sleep
    i = 1
    for x in range(iterations):
        clear_output(wait=True)      # Output Cell 의 내용을 지웁니다
        display('[' + name + '] Iteration: '+str(i)+', Status: '+query.status['message'])
        display(query.lastProgress)  # 마지막 수행된 쿼리의 상태를 출력합니다
        sleep(sleep_secs)            # 지정된 시간(초)을 대기합니다
        i += 1

<br>

## 1. 스테이트풀 스트리밍 집계

> 집계의 경우 반드시 상태가 필요하며, 크게 시간에 기반한 집계와 그렇지 않은 집계로 구분할 수 있습니다


### 1.1 시간에 기반하지 않는 집계

#### 1.1.1 Global aggregations - 스트림 데이터 전역적인 집계함수

* 스트리밍의 경우 **데이터프레임에 직접 빈도 함수를 호출할 수 없습**니다 (bounded vs. unbounded)

```python
runningCount = sensorReadings.groupBy().count()
```

<br>

#### 1.1.2 Grouped aggregations - 특징 그룹필드에 대한 집계함수

* [All built-in aggregation functions](https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html#aggregate-functions)

```python
baselineValues = sensorReadings.groupBy("sensorId").mean("value")
```
  - sum, mean, stddev, countDistinct, collect_set, approx_count_distinct 등

<br>

#### 1.1.3 Multiple aggregations computed together - 다수의 집계 연산
```python
multipleAggs = (
    sensorReadings
    .groupBy("sensorId")
    .agg(
        count("*")
        , mean("value").alias("baselineValue")
        , collect_set("errorCode").alias("allErrorCodes")
    )
)
```
<br>

#### 1.1.4 [User-defined aggregation functions](https://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html#untyped-user-defined-aggregate-functions)
* 이용자 정의 집계함수는 Jar 형태로 배포 및 등록 되어야 하므로, Scala, Java 언어만 지원하며, 등록된 UDF, UDAF 등을 SQL에서 사용할 수 있습니다

#### 1.1.5 시간을 기준으로 하지 않는 집계의 경우 유의해야 할 2가지
* ***1. Output Mode*** : 워터마크를 사용할 수 없으므로 출력 모드가 제한적이다
* ***2. Planning the resource usage by state*** : 워터마크 사용이 불가하므로 특정 상태에 과도한 메모리 사용이 발생할 수 있다


<br>

### 1.2 이벤트타임 윈도우 기반 집계

> 수신되는 전체 데이터의 집계 대신 '타임윈도우'에 근거한 집계를 하려고 합니다. 예를 들어 비정상적으로 많은 빈도를 나타내는 센서를 인식하는 등의 임의의 단위시간 내의 빈도를 측정할 수 있습니다

* 애플리케이션의 강건성을 확보하기 위하여, 센서 데이터가 단위 시간 당 생성되는 빈도 혹은 양을 기준으로 인터벌을 정해야만 합니다. 
* 실제 사건이 발생한 시간을 나타내는 *event time* 기준으로 데이터의 타임 윈도우를 측정합니다
  - 아래의 예제에서는 eventTime 컬럼을 기준으로 5분 단위 윈도우를 구성하여 계산합니다
```python
from pyspark.sql.functions import *
(
    sensorReadings
    .groupBy("sensorId", window("eventTime", "5 minute"))
    .count()
)
```
* 데이터가 발생한 시간기준으로 단위시간 5분 내 이벤트를
* sensorId 키를 기준으로 복합 그룹을 통해 계산하며
* 복합 그룹 내의 빈도를 갱신합니다


#### 1.2.1 `5분 텀플링 윈도우` (non-overlapping)

![figure.8-7](images/figure.8-7.png)



In [16]:
queryName = "tumbling-stream"
tumblingSchema = (
    StructType()
    .add(StructField("emp_id", IntegerType()))
    .add(StructField("emp_name", StringType()))
    .add(StructField("timestamp", TimestampType()))
    .add(StructField("time", StringType()))
)
tumblingSource = f"{work_dir}/data/{queryName}"
tumblingReader = (
    spark
    .readStream
    .format("json")
    .schema(tumblingSchema)
    .load(tumblingSource)
)

tumblingCounter = (
    tumblingReader
    .groupBy("emp_id", window("timestamp", "5 minutes"))
    .count()
)

tumblingWriter = (
    tumblingCounter
    .writeStream
    .format("console")
    .outputMode("complete")
)

checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation
tumblingTrigger = (
    tumblingWriter
    .trigger(processingTime="3 second")
    .option("checkpointLocation", checkpointLocation)
    .option("truncate", "false")  # console 출력에서 너무 긴 경우 자르는 옵션(default=true)
)

tumblingQuery = tumblingTrigger.start()
tumblingQuery.awaitTermination(10)
tumblingQuery.stop()

-------------------------------------------
Batch: 0
-------------------------------------------
+------+------------------------------------------+-----+
|emp_id|window                                    |count|
+------+------------------------------------------+-----+
|1     |{2016-03-20 12:20:00, 2016-03-20 12:25:00}|3    |
|2     |{2016-03-20 12:35:00, 2016-03-20 12:40:00}|1    |
|1     |{2016-03-20 15:05:00, 2016-03-20 15:10:00}|1    |
|3     |{2016-03-20 11:45:00, 2016-03-20 11:50:00}|1    |
|4     |{2016-03-20 12:50:00, 2016-03-20 12:55:00}|2    |
+------+------------------------------------------+-----+



> 텀블링 윈도우 데이터 생성 및 결과 확인을 위해 예제 데이터를 생성하고 파일스트리밍 결과를 확인합니다. 집계함수의 complete 모드이지만 정렬은 되지 않았으나, 정해진 타임슬롯에 전체 데이터를 출력할 수 있었습니다

```bash
notebook  | -------------------------------------------
notebook  | Batch: 0
notebook  | -------------------------------------------
notebook  | +------+------------------------------------------+-----+
notebook  | |emp_id|window                                    |count|
notebook  | +------+------------------------------------------+-----+
notebook  | |1     |[2016-03-20 15:05:00, 2016-03-20 15:10:00]|1    |
notebook  | |4     |[2016-03-20 12:50:00, 2016-03-20 12:55:00]|2    |
notebook  | |2     |[2016-03-20 12:35:00, 2016-03-20 12:40:00]|1    |
notebook  | |3     |[2016-03-20 11:45:00, 2016-03-20 11:50:00]|1    |
notebook  | |1     |[2016-03-20 12:20:00, 2016-03-20 12:25:00]|3    |
notebook  | +------+------------------------------------------+-----+
```

<br>

#### 1.2.2 `5분 오버랩 10분 슬라이딩 윈도우` (overlapping)



![figure.8-8](images/figure.8-8.png)

> 이전 윈도우와 오버랩되는 구간이 발생하는 슬라이딩 윈도우 (윈도우는 5분이고, 윈도우 간에 5분씩 오버랩)


In [17]:
queryName = "sliding-stream"
slidingSchema = (
    StructType()
    .add(StructField("emp_id", IntegerType()))
    .add(StructField("emp_name", StringType()))
    .add(StructField("timestamp", TimestampType()))
    .add(StructField("time", StringType()))
)

slidingSource = f"{work_dir}/data/{queryName}"
slidingReader = (
    spark
    .readStream
    .format("json")
    .schema(slidingSchema)
    .load(slidingSource)
)

slidingCounter = (
    slidingReader
    .groupBy("emp_id", window("timestamp", "10 minute", "5 minute"))
    .count()
)

slidingWriter = (
    slidingCounter
    .writeStream
    .format("console")
    .outputMode("complete")
)

checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation
slidingTrigger = (
    slidingWriter
    .option("checkpointLocation", checkpointLocation)
    .option("truncate", "false")
)

slidingQuery = slidingTrigger.start()
slidingQuery.awaitTermination(10)
slidingQuery.stop()

-------------------------------------------
Batch: 0
-------------------------------------------
+------+------------------------------------------+-----+
|emp_id|window                                    |count|
+------+------------------------------------------+-----+
|3     |{2016-03-20 11:45:00, 2016-03-20 11:55:00}|1    |
|2     |{2016-03-20 12:30:00, 2016-03-20 12:40:00}|1    |
|3     |{2016-03-20 11:40:00, 2016-03-20 11:50:00}|1    |
|1     |{2016-03-20 12:15:00, 2016-03-20 12:25:00}|3    |
|4     |{2016-03-20 12:45:00, 2016-03-20 12:55:00}|2    |
|4     |{2016-03-20 12:50:00, 2016-03-20 13:00:00}|2    |
|1     |{2016-03-20 12:20:00, 2016-03-20 12:30:00}|3    |
|1     |{2016-03-20 15:00:00, 2016-03-20 15:10:00}|1    |
|2     |{2016-03-20 12:35:00, 2016-03-20 12:45:00}|1    |
|1     |{2016-03-20 15:05:00, 2016-03-20 15:15:00}|1    |
+------+------------------------------------------+-----+



> 슬라이딩 윈도우의 경우 5분 오버랩 윈도우를 통해 겹치는 구간이 존재하도록 출력됩니다 

```bash
notebook  | -------------------------------------------
notebook  | Batch: 0
notebook  | -------------------------------------------
notebook  | +------+------------------------------------------+-----+
notebook  | |emp_id|window                                    |count|
notebook  | +------+------------------------------------------+-----+
notebook  | |3     |[2016-03-20 11:40:00, 2016-03-20 11:50:00]|1    |
notebook  | |1     |[2016-03-20 12:20:00, 2016-03-20 12:30:00]|3    |
notebook  | |1     |[2016-03-20 12:15:00, 2016-03-20 12:25:00]|3    |
notebook  | |4     |[2016-03-20 12:45:00, 2016-03-20 12:55:00]|2    |
notebook  | |4     |[2016-03-20 12:50:00, 2016-03-20 13:00:00]|2    |
notebook  | |3     |[2016-03-20 11:45:00, 2016-03-20 11:55:00]|1    |
notebook  | |1     |[2016-03-20 15:05:00, 2016-03-20 15:15:00]|1    |
notebook  | |2     |[2016-03-20 12:30:00, 2016-03-20 12:40:00]|1    |
notebook  | |1     |[2016-03-20 15:00:00, 2016-03-20 15:10:00]|1    |
notebook  | |2     |[2016-03-20 12:35:00, 2016-03-20 12:45:00]|1    |
notebook  | +------+------------------------------------------+-----+
```

> 이와 같이 누적으로 모든 데이터를 담고 있다면 리소스 사용 관점에서 지속적으로 늘어나는 상태를 관리하기 어려울 것이므로, 최신 데이터만 유지하되, 어느정도 까지 데이터 입력지연을 감내할 것인지를 정의해야 합니다. 이러한 쿼리의 바운드의 범위를 ***watermarks*** 라고 부릅니다.

<br>

#### 1.2.3 `워터마크를 통한 지연 데이터 처리`

> ***watermark***는 처리된 데이터 내의 쿼리에 의해 발견된 **maximum event time** 이후에 발생하는 **이벤트 타임의 임계치 값**이라고 말할 수 있습니다.

* 주어진 그룹에 더 이상 데이터가 도달하지 않는다는 것을 아는 시점을 말하며, 엔진에 의해서 자동적으로 해당 집계가 완료됨을 인지할 수 있습니다.


![figure.8-9](images/figure.8-9.png)


In [17]:
queryName = "watermark-stream"

watermarkSchema = (
    StructType()
    .add(StructField("emp_id", IntegerType()))
    .add(StructField("emp_name", StringType()))
    .add(StructField("timestamp", TimestampType()))
    .add(StructField("time", StringType()))
)
watermarkSource = f"{work_dir}/data/{queryName}"
watermarkReader = (
    spark
    .readStream
    .format("json")
    .schema(watermarkSchema)
    .load(watermarkSource)
)

watermarkCounter = (
    watermarkReader
    .withWatermark("timestamp", "5 minutes")
    .groupBy("emp_id", window("timestamp", "30 minute", "30 minute"))
    .count()
)

watermarkWriter = (
    watermarkCounter
    .writeStream
    .format("console")
    .outputMode("complete")
)

checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation
watermarkTrigger = (
    watermarkWriter
    .option("checkpointLocation", checkpointLocation)
    .option("truncate", "false")
)

watermarkQuery = watermarkTrigger.start()
watermarkQuery.awaitTermination(10)
watermarkQuery.stop()

> 파일스트림의 경우 하나의 파일내의 순서와 무관하게 병렬처리하게 되어 한 번에 동시에 온 것처럼 인식되어 워터마크와 딜레이 영향을 받지 않는 것 처럼 보인다

<br>

#### 워터마크 사용시의 유의사항

* 예를 들어 센서 데이터는 아무리 늦어도 10분 이상 지연되지 않는다는 것을 안다면, **워터마크를 10분으로** 지정합니다
  - 반드시 groupBy 집계 이전에 withWatermark 선언이 되어야만 합니다.
  - 쿼리가 수행될 때에 '구조화된 스트리밍'은 ***이벤트 타임*** 의 최대 값을 계속해서 관찰합니다
  - 워터마크를 갱신함에 따라서 ***너무 늦은*** 데이터를 필터할 수 있게 됩니다
  - 그리고나서 '오래된 상태'를 정리하게 됩니다. 여기서 ***10분 이상 지연된 어떠한 데이터도 무시***된다는 의미입니다
  - 이벤트 타임 기준으로 가장 늦게 발생한 입력 데이터 보다 ***10분 이상 오래된 모든 타임 윈도우는 상태에서 정리***되게 됩니다

```python
df = (
    sensorReadings
    .withWatermark("eventTime", "10 minutes") # 최대 10분까지 지연 로그를 대기한다는 의미
    .groupBy("sensorId", window("eventTime", "10 minutes", "5 minutes")) # window(windowLength: '타임 윈도우 크기', slideInterval: '오버랩 시간')
    .mean("value")
)
```
<br>

### 1.2.4 윈도우 그룹 빈도수에 대한 워터마킹 동작 방식

* `매 5분 단위로 트리거링`이 발생하며, 집계연산이 수행됩니다
* `X축은 Processing Time` 즉, Scheduler 를 통해 트리거링이 발생하는 '프로세싱 타임'을 말하며, `Y축은 실제 사건이 발생한 '이벤트 타임'`을 가리킵니다
* 12:05 트리거링 시에는 어떠한 데이터도 수신되지 않았기 때문에 `'쿼리 테이블'에는 아무런 데이터도 없습`니다
* 12:10 트리거링 시에는 `2건의 데이터가 수신`되었고, '이벤트 타임'이 각 각 "12:07,id1 12:08,id2" 으로 사건이 발생한 시간과 수신된 시간이 거의 동일합니다
  - X, Y 축의 위치가 일치한 점으로 알 수 있으며 기울기가 1인 위치에 있는 데이터는 지연이 거의 없다고 말할 수 있습니다
* 12:10 트리거링 시에 윈도우가 10분이고, 슬라이딩이 5분이기 때문에 `2개의 '타임 슬라이드' 그룹에 대한 계산`이 이루어져야 합니다
  - "12:00 ~ 12:10" 과 "12:05 ~ 12:15" 2개가 그것입니다
* 12:15 트리거링 시에는 지연된 데이터 "12:09,id3"가 도착했고, 워터마크 내에 존재하여 '쿼리 테이블'에 반영되어집니다
  - **워터마크 계산**은 `'프로세싱 타임' 기준으로 가장 최신에 수신된 '이벤트 타임' - '12:14,id2' 기준으로 10분 과거 시간까지 허용`합니다
  - 즉, watermark = 12:14 - 10mins = 12:04 분이전 데이터는 지연된 데이터로 버리고, 그 이후 데이터는 워터마크 내의 데이터입니다
* 12:10 트리거링 시에는 총 4개의 데이터가 수신되었고, 2개는 정상, 2개는 지연된 데이터가 존재합니다만, 버리는 경우는 없습니다
  - **지연을 결정짓는 가장 큰 기준은 절대적인 시간이 아니며**, `수신된 데이터의 maximum(eventTime) 시간을 기준으로 계산`합니다
* 현재 '프로세싱 타임'이 아무리 흘러도, 수신되는 '이벤트 타임' 시간이 천천히 흘러간다면 지연이 아니게 됩니다 
  - Y 축으로 늘어나는 시간은 '이벤트 타임' 기준으로 거리가 일정하고 정확하지만, 상대적으로 X 축은 그렇지 못 합니다
  - 일례로 "12:07 ~ 12:08" 과 "12:14 ~ 12:15" 의 Y 축의 거리는 일정하고 동일하지만, X 축의 거리는 전자가 다소 짧습니다 (상대적인 워터마크 계산임을 알아야 합니다)
* '스파크 스트리밍'은 그림에서의 ***파랑색 점선*** 을 항상 모니터링 하면서 '워터마크'를 계산합니다
  - 항상 가장 최신의 '이벤트 타임'을 저장하고 있다가, 트리거링 되는 시점에 watermark 시간을 계산하여 너무 지연된 데이터를 필터링 하게 됩니다
  - 결국, **'쿼리 테이블'도 '이벤트 타임'기준으로 슬라이딩 하면서 갱신**한다고 보면 됩니다
  
  
> ***워터마킹이 적용된 스트리밍 처리는 가장 최신 이벤트 타임 기준으로 워터마크 시간(그림은 10분)만큼 지연된 데이터까지 처리하는 것을 보장합니다*** 
최대 지연 시간의 데이터는 확실히 보장하지만, 누락되지 않는 방향은 확실히 보장하지만, 반면에 ***지연이 되었다고 해서 반드시 누락되는 것을 보장하지는 않습니다.***
이러한 누락되도 되는 데이터를 집계하는 것은 *레코드가 수신된 시점과, 마이크로 배치 처리가 트리거링된 시점*에 기인합니다.


![figure.8-10](images/figure.8-10.png)


<br>

### 1.2.4 카프카 워터마크 이용 예제


In [None]:
queryName = "kafka-watermark-stream"

kafkaWatermarkReader = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("subscribe", "events")
    .option("startingOffsets", "latest") # earliest
    .load()
)

kafkaWatermarkSchema = (
    StructType()
    .add(StructField("emp_id", IntegerType()))
    .add(StructField("emp_name", StringType()))
    .add(StructField("timestamp", TimestampType()))
    .add(StructField("time", StringType()))
)

kafkaWatermarkCounter = (
    kafkaWatermarkReader
    .select(from_json(col("value").cast("string"), kafkaWatermarkSchema).alias("x"))
    .selectExpr("x.emp_id as emp_id", "x.timestamp as timestamp")
    .withWatermark("timestamp", "25 minutes")
    .groupBy("emp_id", window("timestamp", "25 minute", "25 minute"))
    .count()
)
kafkaWatermarkWriter = (
    kafkaWatermarkCounter
    .writeStream
    .format("console")
    .outputMode("append")
)

kafkaWatermarkTrigger = (
    kafkaWatermarkWriter
    .option("truncate", False)
)


> 스트리밍 애플리케이션을 기동하고, 파이썬 프로듀서를 통해서 카프카 메시지를 생성합니다.


In [None]:
kafkaWatermarkQuery = kafkaWatermarkTrigger.start()
kafkaWatermarkQuery.awaitTermination(30)
kafkaWatermarkQuery.stop()


* 노트북 터미널을 통해서 아래의 경로로 이동하여 수행합니다.
```bash
cd /home/jovyan/work/lgde-spark-stream/python 
python watermarks.py
```

<br>

#### 지원하는 출력 모드 - Supported output modes

> '시간을 포함하지 않은 스트리밍 집계'와는 다르게 '이벤트 타임을 포함한 스트리밍 집계'는 3가지 모드를 모두 지원합니다

* Update mode
  - 매 '마이크로 배치'집계 결과에서 변경된 사항만 출력으로 생성되고, 워터마킹은 주기적으로 '상태'를 갱신합니다
  - 하지만, ***Parquet, ORC 와 같은 파일 기반의 싱크의 경우는 '업데이트 모드'를 사용할 수 없***습니다 (단, Delta Lake 는 제외)
  - "수시로 변경 사항만을 지속적으로 갱신하는 경우"

* Complete mode
  - 시간 혹은 변화에 무관하게 모든 출력을 항상 만들어내고 갱신합니다
  - 이 모드는 ***워터마크를 명시하였다고 하더라도 '상태'는 클린업 되지 않으며, 모든 과거의 '상태'를 유지***합니다.
  - 이러한 이유로 무한히 상태가 증가하여 메모리 사용에 항상 주의해야만 합니다

* Append mode
  - ***event-time window 와 watermarking 을 사용했을 때***에만 이용 가능한 모드이며, 이전 결과(상태)에 대해 수정이 불가합니다
  - *워터마크를 명시하지 않은 집계*의 경우는 미래의 데이터로 사용되므로, append 모드에서 *출력이 불가*능합니다
  - 즉, *워터마크가 명시되어야만, 해당 집계 그룹 내의 데이터가 더 이상 업데이트 되지 않음을 알 수* 있습니다
  - append 모드의 경우 '키'와 '집계결과'가 더 이상 과거의 데이터를 갱신하지 않는다는 것을 보장하는 경우에만 *최종 집계 결과를 출력*합니다
  - 이러한 이유로, ***파일과 같은 싱크에 append-only** 출력이 가능하지만, 반면에 워터마크 시간 만큼 ***실시간 처리가 지연***되게 됩니다


<br>

### <font color=blue>1. [중급]</font> "1.2.4 카프카 워터마크 이용 예제" 를 참고하여 아래의 조건에 맞도록 구현하세요

* 카프카 소스
  - 호스트 : kafka
  - 포트 : 9093
* 변환 작업
  - 카프카로부터 전달 받은 value 값의 emp_id 와 timestamp 값을 기준으로 위의 예제와 동일하게 count 를 계산하세요
  - `워터마크 윈도우`를 활용하여 windowLength: 25분, watermarkLength: 25분 으로 수행하세요
* 메모리 싱크
  - 쿼리 : kafkaWatermarkStream
  - 포맷 : memory
  - 모드 : append
* 애플리케이션
  - 타임아웃 : 1~3분 내외 (카프카 프로듀서 실행 할 시간)
  - 화면출력 : displayStream 함수를 이용하여 3초 슬립 총 10회 이상 출력
  - 테스트 후 애플리케이션을 종료해 주세요

<details><summary>[정답] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다

```python
# 아래에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
queryName = "kafkaWatermarkStream"

kafkaWatermarkReader = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load()
)

kafkaWatermarkSchema = (
    StructType()
    .add(StructField("emp_id", IntegerType()))
    .add(StructField("emp_name", StringType()))
    .add(StructField("timestamp", TimestampType()))
    .add(StructField("time", StringType()))
)

kafkaWatermarkCounter = (
    kafkaWatermarkReader
    .select(from_json(col("value").cast("string"), kafkaWatermarkSchema).alias("x"))
    .selectExpr("x.emp_id as emp_id", "x.timestamp as timestamp")
    .withWatermark("timestamp", "25 minutes")
    .groupBy("emp_id", window("timestamp", "25 minute", "25 minute"))
    .count()
)
kafkaWatermarkWriter = (
    kafkaWatermarkCounter
    .writeStream
    .queryName(queryName)
    .format("memory")
    .outputMode("append")
)

kafkaWatermarkTrigger = (
    kafkaWatermarkWriter
    .option("truncate", False)
)

kafkaWatermarkQuery = kafkaWatermarkTrigger.start()
displayStream("kafkaWatermarkStream", f"select * from {queryName}", 10, 3)
kafkaWatermarkQuery.stop()
```

</details>

In [26]:
# 아래에 실습 코드를 작성하고 실행하세요 (Shift+Enter)


'[kafkaWatermarkStream] Iteration: 10, Query: select * from kafkaWatermarkStream'

emp_id,window,count
3,"{2021-07-03 00:25:00, 2021-07-03 00:50:00}",1
2,"{2021-07-03 00:00:00, 2021-07-03 00:25:00}",1
1,"{2021-07-03 00:00:00, 2021-07-03 00:25:00}",4


<br>

### <font color=blue>2. [중급]</font> "1.2.4 카프카 워터마크 이용 예제" 를 참고하여 아래의 조건에 맞도록 구현하세요

* 카프카 소스
  - 호스트 : kafka
  - 포트 : 9093
* 변환 작업
  - 카프카로부터 전달 받은 value 값의 emp_id 와 timestamp 값을 기준으로 위의 예제와 동일하게 count 를 계산하세요
  - `텀블링 윈도우`를 활용하여 windowLength: 30분
* 메모리 싱크
  - 쿼리 : kafkaTumblingStream
  - 포맷 : memory
  - 모드 : append
* 애플리케이션
  - 타임아웃 : 1~3분 내외 (카프카 프로듀서 실행 할 시간)
  - 화면출력 : displayStream 함수를 이용하여 3초 슬립 총 10회 이상 출력
  - 정렬순서 : 1순위: 시간(window) 오름차순, 2순위: 빈도수(count) 내림차순
  - 테스트 후 애플리케이션을 종료해 주세요

<details><summary>[정답] 출력 결과 확인 </summary>

> 아래와 유사하게 방식으로 작성 되었다면 정답입니다

```python
# 아래에 실습 코드를 작성하고 실행하세요 (Shift+Enter)
queryName = "kafkaTumblingStream"

kafkaTumblingReader = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load()
)

kafkaTumblingSchema = (
    StructType()
    .add(StructField("emp_id", IntegerType()))
    .add(StructField("emp_name", StringType()))
    .add(StructField("timestamp", TimestampType()))
    .add(StructField("time", StringType()))
)

kafkaTumblingCounter = (
    kafkaTumblingReader
    .select(from_json(col("value").cast("string"), kafkaTumblingSchema).alias("x"))
    .selectExpr("x.emp_id as emp_id", "x.timestamp as timestamp")
    .groupBy("emp_id", window("timestamp", "30 minute"))
    .count()
)
kafkaTumblingWriter = (
    kafkaTumblingCounter
    .writeStream
    .queryName(queryName)
    .format("memory")
    .outputMode("complete")
)

kafkaTumblingTrigger = (
    kafkaTumblingWriter
    .option("truncate", False)
)

kafkaTumblingQuery = kafkaTumblingTrigger.start()
displayStream("kafkaTumblingStream", f"select * from {queryName} order by window asc, count desc", 10, 3)
kafkaTumblingQuery.stop()
```

</details>

In [31]:
# 아래에 실습 코드를 작성하고 실행하세요 (Shift+Enter)


'[kafkaTumblingStream] Iteration: 10, Query: select * from kafkaTumblingStream order by window asc, count desc'

emp_id,window,count
1,"{2021-07-03 00:00:00, 2021-07-03 00:30:00}",4
2,"{2021-07-03 00:00:00, 2021-07-03 00:30:00}",1
3,"{2021-07-03 00:30:00, 2021-07-03 01:00:00}",1
4,"{2021-07-03 01:00:00, 2021-07-03 01:30:00}",1
4,"{2021-07-03 01:30:00, 2021-07-03 02:00:00}",1
