# 스파크 스트리밍 실습 4교시 : 외부 인터페이스

> 파일, 카프카 등의 외부 소스로부터 데이터를 읽고, 카산드라 혹은 MySQL 등의 외부 저장소에 저장하는 실습을 합니다

## 학습 목표
* `File`에 저장된 데이터를 처리합니다
  - 로컬 JSON 파일을 직접 생성 및 수정을 통해 스트리밍 데이터를 생성합니다
* `Kafka`에 저장된 스트리밍 데이터를 처리합니다
  - 카프카 메시지 프로듀서를 통해서 카프카에 스트림 데이터를 생성합니다
* `Cassandra`에 스트림 데이터를 저장합니다
  - 로컬 JSON 파일을 읽어서 처리하는 애플리케이션을 구현합니다
  - 직접 연동이 어렵기 때문에 foreachBatch 메소드를 통해 카산드라에 저장합니다
* `MySQL`에 스트림 데이터를 저장합니다
  - 로컬 JSON 파일을 읽어서 처리하는 애플리케이션을 구현합니다
  - 레코드 단위의 트랜잭션 지원을 위해 foreach 메소드를 통해 MySQL에 저장합니다

## 목차
* [1. File 소스 스트리밍 실습](#1.-File-소스-스트리밍-실습)
* [2. Kafka 소스 스트리밍 실습](#2.-Kafka-소스-스트리밍-실습)
* [3. Cassandra 싱크 스트리밍 실습](#3.-Cassandra-싱크-스트리밍-실습)
* [4. MySQL 싱크 스트리밍 실습](#4.-MySQL-싱크-스트리밍-실습)


In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, display_pretty, clear_output, JSON

spark = (
    SparkSession
    .builder
    .config("spark.sql.session.timeZone", "Asia/Seoul")
    .getOrCreate()
)

# 노트북에서 테이블 형태로 데이터 프레임 출력을 위한 설정을 합니다
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # display enabled
spark.conf.set("spark.sql.repl.eagerEval.truncate", 100) # display output columns size

# 공통 데이터 위치
home_jovyan = "/home/jovyan"
work_data = f"{home_jovyan}/work/data"
work_dir=!pwd
work_dir = work_dir[0]

# 로컬 환경 최적화
spark.conf.set("spark.sql.shuffle.partitions", 5) # the number of partitions to use when shuffling data for joins or aggregations.
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

# 현재 기동된 스파크 애플리케이션의 포트를 확인하기 위해 스파크 정보를 출력합니다
spark

22/09/09 11:49:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# 스트림 테이블을 주기적으로 조회하는 함수 (name: 이름, sql: Spark SQL, iterations: 반복횟수, sleep_secs: 인터벌)
def displayStream(name, sql, iterations, sleep_secs):
    from time import sleep
    i = 1
    for x in range(iterations):
        clear_output(wait=True)              # 출력 Cell 을 지웁니다
        display('[' + name + '] Iteration: '+str(i)+', Query: '+sql)
        display(spark.sql(sql))              # Spark SQL 을 수행합니다
        sleep(sleep_secs)                    # sleep_secs 초 만큼 대기합니다
        i += 1

# 스트림 쿼리의 상태를 주기적으로 조회하는 함수 (name: 이름, query: Streaming Query, iterations: 반복횟수, sleep_secs: 인터벌)
def displayStatus(name, query, iterations, sleep_secs):
    from time import sleep
    i = 1
    for x in range(iterations):
        clear_output(wait=True)      # Output Cell 의 내용을 지웁니다
        display('[' + name + '] Iteration: '+str(i)+', Status: '+query.status['message'])
        display(query.lastProgress)  # 마지막 수행된 쿼리의 상태를 출력합니다
        sleep(sleep_secs)            # 지정된 시간(초)을 대기합니다
        i += 1

## 1. File 소스 스트리밍 실습

> 기본으로 제공되는 stream 싱크와 소스를 이용하여 데이터 프레임 생성이 가능하며 SparkSession.readStream(), DataFrame.writeStream() 을 이용합니다

* 파일로부터 읽기
  - 기본적으로 텍스트 파일(CSV, TSV 등)의 경우 스키마가 없는 경우가 많기 때문에 스키마를 직접 생성해 줍니다


In [3]:
filePath = f"{work_data}/json"
fileJson = spark.read.json(filePath)
fileJson.printSchema() # 기본적으로 스키마를 추론하는 경우 integer 가 아니라 long 으로 잡는다
display(fileJson)

                                                                                

root
 |-- emp_id: long (nullable = true)
 |-- emp_name: string (nullable = true)



emp_id,emp_name
1,OO전자
2,OO화학
3,OO에너지솔루션
100,OO


In [4]:
filePath = f"{work_data}/json"
queryName = "fileSource"
checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation
fileSchema = (
    StructType()
    .add(StructField("emp_id", LongType()))
    .add(StructField("emp_name", StringType()))
)
fileReader = (
    spark
    .readStream
    .format("json")
    .schema(fileSchema)
    .load(filePath)
)
fileSelector = fileReader.select("emp_id", "emp_name")
fileWriter = (
    fileSelector
    .writeStream
    .queryName(queryName)
    .format("memory")
    .outputMode("update")
)
fileTrigger = (
    fileWriter
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpointLocation)
)
fileQuery = fileTrigger.start()
displayStream("fileSource", f"select * from {queryName}", 3, 3)
fileQuery.stop()

'[fileSource] Iteration: 3, Query: select * from fileSource'

emp_id,emp_name
1,OO전자
2,OO화학
3,OO에너지솔루션
100,OO


## 2. Kafka 소스 스트리밍 실습

![table.8-1](images/table.8-1.png)

#### 1. 카프카에 데이터 쓰기 - producer.py
```python
#!/usr/bin/env python

import sys
from time import sleep
from json import dumps
import json
from kafka import KafkaProducer
import names

# 카프카에 데이터를 전송하는 코드
def produce(port):
    try:
        hostname="kafka:%d" % port
        producer = KafkaProducer(bootstrap_servers=[hostname],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
        data = {}
        for seq in range(9999):
            print("Sequence", seq)
            first_name = names.get_first_name()
            last_name = names.get_last_name()
            data["first"] = first_name
            data["last"] = last_name
            producer.send('events', value=data)
            sleep(0.5)
    except:
        import traceback
        traceback.print_exc(sys.stdout)

# 카프카의 데이터 수신을 위한 내부 포트는 9093
if __name__ == "__main__":
    port = 9093
    if len(sys.argv) == 2:
        port = int(sys.argv[1])
    produce(port)
```
#### 2. 터미널에 접속하여 카프카 메시지를 생성합니다"
```bash
(base) cd /home/jovyan/work/lgde-spark-stream/python
(base) python producer.py
```

#### 3. 스파크를 통해 생성된 스트리밍 데이터를 확인합니다

In [5]:
# 컨테이너 내부에서 토픽에 접근하기 위한 포트는 9093
kafkaReader = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("subscribe", "events")
    .load()
)

kafkaSchema = (
    StructType()
    .add(StructField("first", StringType()))
    .add(StructField("last", StringType()))
)

kafkaSelector = (
    kafkaReader
    .select(
        from_json(col("value").cast("string"), kafkaSchema).alias("name")
    )
    .selectExpr("name.first as first", "name.last as last")
    .groupBy("first")
    .count().alias("count")
)

queryName = "kafkaSource"
kafkaWriter = (
    kafkaSelector
    .writeStream
    .queryName(queryName)
    .format("memory")
    .outputMode("complete")
)

checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation
kafkaTrigger = (
    kafkaWriter
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpointLocation)
)
kafkaQuery = kafkaTrigger.start()
displayStream("kafkaSource", f"select first, count from {queryName} order by count desc limit 5", 10, 3)
kafkaQuery.stop()

'[kafkaSource] Iteration: 10, Query: select first, count from kafkaSource order by count desc limit 5'

first,count
Richard,2
Mary,2
Nicole,2
Barbara,2
Lori,2


In [6]:
kafkaQuery.status
kafkaQuery.stop()

<br>

## 3. Cassandra 싱크 스트리밍 실습

> 빌트인으로 지원하지 않는 저장소 엔진(예: Cassandra 등)의 경우 foreachBatch() 혹은 foreach() 메소드를 통해 커스텀 로직을 추가할 수 있습니다.


### 3.1 모든 스토리지 시스템에 쓰기

> 현재 '스파크 스트리밍'에는 카산드라에 스트리밍 데이터프레임을 저장할 수 있는 방법은 없기 때문에, Batch DataFrame 방식으로 저장합니다

* foreachBatch : 임의의 작업들 혹은 매 '마이크로 배치'의 결과에 대해 커스텀 로직을 적용할 수 있는 배치 메소드
  - foreachBatch(arg1: DataFrame or Dataset, arg2: Unique Identifier)
* 로컬 파일을 카산드라 데이터베이스에 저장합니다
  - 로컬에 저장된 JSON 파일을 읽어서 스트리밍 처리를 합니다
  - 초기 test_keyspace 와 test_table 에 저정합니다

In [7]:
cassandraAddress = "cassandra"
cassandraKeyspace = "test_keyspace"
cassandraTableName = "test_table"
spark.conf.set("spark.cassandra.connection.host", cassandraAddress)

def writeCountsToCassandra(updatedCountsDF, batchId):
    # Use Cassandra batch data source to write the updated counts
    (
        updatedCountsDF
        .write
        .format("org.apache.spark.sql.cassandra")
        .mode("append")
        .options(table=cassandraTableName, keyspace=cassandraKeyspace)
        .save()
    )

In [8]:
cassandraSchema = (
    StructType()
    .add(StructField("emp_id", IntegerType()))
    .add(StructField("emp_name", StringType()))
)
cassandraPath = f"{work_data}/json"
cassandraReader = (
    spark
    .readStream
    .format("json")
    .schema(cassandraSchema)
    .load(cassandraPath)
)

cassandraSelector = cassandraReader.select("emp_id", "emp_name")

queryName = "cassandraSink"
checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation

cassandraWriter = (
    cassandraSelector
    .writeStream
    .foreachBatch(writeCountsToCassandra)
    .outputMode("update")
)

cassandraTrigger = (
    cassandraWriter
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpointLocation)
)

cassandraQuery = cassandraTrigger.start()

22/09/09 12:10:16 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 2008 milliseconds


* 카산드라에 접속하여, 데이터를 조회합니다

```bash
$> docker-compose exec cassandra cqlsh
cqlsh> use test_keyspace;
cqlsh:test_keyspace> select emp_id, emp_name from test_table;
```

* 아래와 같이 나오면 성공입니다

```bash
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.10 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
cqlsh> use test_keyspace;
cqlsh:test_keyspace> select emp_id, emp_name from test_table;

 emp_id | emp_name
--------+------------------
      1 |         엘지전자
      2 |         엘지화학
    100 |             엘지
      3 | 엘지에너지솔루션

(4 rows)
```

In [9]:
cassandraQuery.stop()

#### foreachBatch() 로 아래의 동작들이 가능합니다

* 이전에 존재하는 배치 데이터 소스들을 재사용 하기 - *Reuse existing batch data sources* 
  - 이전 예제와 같이 배치에서 사용하는 데이터 소스들을 그대로 사용할 수 있습니다

* 다수의 경로에 저장하기 - *Write to multiple locations*
  - OLAP 및 OLTP 데이터베이스 등에 다양한 위치로 저장할 수 있습니다
  - 출력 데이터에 대해 데이터프레임 캐시를 통해 재계산을 회피할 수 있습니다

```python
# In Python
def writeCountsToMultipleLocations(updatedCountsDF, batchId):
    updatedCountsDF.persist()
    updatedCountsDF.write.format(...).save() # Location 1
    updatedCountsDF.write.format(...).save() # Location 2
    updatedCountsDF.unpersist()
}
```

* 추가적인 데이터프레임 연산 하기 - *Apply additional DataFrame operations*
  - Incremental Plan 생성이 어렵기 때문에 '스트리밍 데이터프레임'에 대한 연산 지원이 되지 않습니다 - **아래**
  - '마이크로 배치' 출력에 대하여 foreachBatch 메소드를 통해 작업 수행이 가능합니다.
  - **at-least-once** 지원만 가능하며, **exactly-once** 는 후처리 dedup 작업을 통해 가능합니다


#### '스파크 스트리밍'에 지원되지 않는 연산 - Unsupported Operations

> 스트리밍 '데이터프레임/데이터셋' 에서는 몇 가지 지원하지 않는 연산이 있습니다. 실행 시에는 ***DataFrame/Datasets 에서 지원하지 않는 연산이라는 AnalysisException*** 을 반환합니다. 생각해보면 끝없이 수신되는 데이터에 대한 정렬을 하는 것은 모호한 동작이며, 수신된 데이터를 모두 모니터링 해야 하므로, 비효율적인 연산일 가능성이 높습니다.

| 연산자 | 설명 |
| --- | --- |
| 다수 스트리밍 집계 연산 | 여러개의 스트리밍 데이터프레임의 집계는 현재 지원하지 않습니다 |
| limit or take | 스트리밍 데이터셋은 일부 로우를 가져오는 연산은 지원하지 않습니다 |
| distinct | 스트리밍 데이터셋은 유일 데이터 연산은 지원하지 않습니다 |
| sort | 집계 연산 이후의 Complete 출력모드에서만 정렬을 사용할 수 있습니다 |
| outer join | 스트림 사이드에 대해서만 left inner, outer 조인을 지원 |
| count | 단일 빈도수를 반환하지 않기 때문에, 실행 횟수를 반환하는 ds.groupBy().count() 를 사용하세요 |
| foreach | 대신 ds.writeStream.foreach(...) 를 사용하세요 (다음 챕터에서) |
| show | 콘솔 sink 를 사용하세요 (다음 챕터에서) |
| cube, rollup | 연산 비용 및 실용성 면에서 지원하지 않는 연산 UnsupportedOperationException |

* [support-matrix-for-joins-in-streaming-queries](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-queries)
  - 양쪽이 모두 스트림인 경우는 워터마크를 사용해야만 합니다


<br>

## 4. MySQL 싱크 스트리밍 실습

### 4.1 foreach 메소드를 이용하여 화면에 직접 출력 하는 예제

* 커스텀 저장 로직을 매 로우마다 적용하는 메소드 (고정된 map 함수 같은 방식)
  - 배치저장을 위한 인터페이스가 제공되지 않는 경우 직접 데이터를 저장하는 로직을 구현해야 하며, open, process, close 단계에 해당하는 구현을 해야만 합니다


In [None]:
mysqlSchema = (
    StructType()
    .add(StructField("emp_id", IntegerType()))
    .add(StructField("emp_name", StringType()))
)
mysqlPath = f"{work_data}/json"
mysqlReader = (
    spark
    .readStream
    .format("json")
    .schema(mysqlSchema)
    .load(mysqlPath)
)

# 화면에 데이터를 그대로 출력하는 예제
def process_row(row):
    print("%d, %s" % (row[0], row[1]))

mysqlWriter = (
    mysqlReader
    .writeStream
    .foreach(process_row)
)

queryName = "mysqlSink"
checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation
mysqlTrigger = (
    mysqlWriter
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpointLocation)
)

In [None]:
mysqlQuery = mysqlTrigger.start()
mysqlQuery.awaitTermination(5)
mysqlQuery.stop()

<br>

### 4.2 ForeachWriter 제 실습을 위하여 python 기반의 mysql connector 예제 (참고)

> foreach 구문에서는 디버깅이 어렵기 때문에 온전하게 동작하는 mysql python 예제를 미리 작성합니다

In [None]:
import mysql.connector
from mysql.connector import errorcode

cnx = mysql.connector.connect(user='sqoop', password='sqoop', host='mysql', database='testdb')
cursor = cnx.cursor()

def createEmployee(cursor):
    try:
        employee = "CREATE TABLE employees ( emp_id int, emp_name varchar(30) )"
        cursor.execute(employee)
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
            return "employees table already exists."
        else:
            return err.msg
    else:
        return "employees table created."

print(createEmployee(cursor))

def deleteEmployee(delete_employee):
    try:
        cursor.execute(delete_employee)
    except:
        import traceback
        traceback.print_exc()
        return "failed to delete employees"
    return "employees table deleted."

delete_employee = "DELETE FROM employees"
print(deleteEmployee(delete_employee))

def insertEmployee(add_employee, data_employee):
    emp_no = -1
    try:
        cursor.execute(add_employee, data_employee)
        emp_no = cursor.lastrowid
    except:
        import traceback
        traceback.print_exc()
        return emp_no
    return emp_no

add_employee = "INSERT INTO employees ( emp_id, emp_name ) VALUES ( %s, %s )"
data_employee = ( 1, '박수혁' )
print(insertEmployee(add_employee, data_employee))

cnx.commit()
cursor.close()
cnx.close()

<br>

#### 저장된 데이터를 확인합니다
```bash
$ docker-compose exec mysql mysql -usqoop -psqoop
mysql> use testdb;
mysql> select emp_id, emp_name from employees;
+--------+--------------------------+
| emp_id | emp_name                 |
+--------+--------------------------+
|      1 | 엘지전자                   |
|      2 | 엘지화학                   |
|      3 | 엘지에너지솔루션             |
|    100 | 엘지                      |
+--------+--------------------------+
```


### 4.3 ForeachWriter 클래스를 통해서 외부 데이터베이스에 저장하는 예제

> 아래의 경우는 foreach 메소드를 사용하는 예제를 설명하기 위함이며, 실무에서는 ***사용하지 사용해서는 안 되는 예제*** 입니다.

In [10]:
import mysql.connector
from mysql.connector import errorcode

class ForeachWriter:    
    
    def createEmployee(self, cursor):
        try:
            employee = "CREATE TABLE employees ( emp_id int, emp_name varchar(30) )"
            cursor.execute(employee)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
                return "employees table already exists."
            else:
                return err.msg
        else:
            return "employees table created."
        
    def deleteEmployee(self, cursor):
        try:
            delete_employee = "DELETE FROM employees"
            cursor.execute(delete_employee)
        except:
            import traceback
            traceback.print_exc()
            return "failed to delete employees"
        return "employees table deleted."
        
    def insertEmployee(self, cursor, data_employee):
        emp_no = -1
        try:
            add_employee = "INSERT INTO employees ( emp_id, emp_name ) VALUES ( %s, %s )"
            cursor.execute(add_employee, data_employee)
            emp_no = cursor.lastrowid
        except:
            import traceback
            traceback.print_exc()
            return emp_no
        return emp_no
    
    def insertDocument(self, data_employee):
        emp_id = -1
        try:
            cnx = mysql.connector.connect(user='sqoop', password='sqoop', host='mysql', database='testdb')
            cursor = cnx.cursor()
            emp_id = self.insertEmployee(cursor, data_employee)
            cnx.commit()
        except:
            import traceback
            traceback.print_exc()
        finally:
            cursor.close()
            cnx.close()
        return emp_id
    
    def selectEmployee(self, cursor):
        emp_no = -1
        try:
            get_employees = "SELECT emp_id, emp_name FROM employees"
            cursor.execute(get_employees)
            for (emp_id, emp_name) in cursor:
                print("{} in '{}' has retrieved.".format(emp_id, emp_name))
        except:
            import traceback
            traceback.print_exc()
            return emp_no
        return emp_no
    
    # 데이터베이스 커넥션, 테이블 생성 및 테이블 데이터 삭제
    def open(self, partitionId, epochId):
        try:
            cnx = mysql.connector.connect(user='sqoop', password='sqoop', host='mysql', database='testdb')
            cursor = cnx.cursor()
            print(self.createEmployee(cursor))
            print(self.deleteEmployee(cursor))
            cnx.commit()
        except:
            import traceback
            traceback.print_exc()
        finally:
            cursor.close()
            cnx.close()
        return True
    
    def process(self, row):
        data_employee = (row[0], row[1])
        result = self.insertDocument(data_employee)
        if (result >= 0):
            print("[%s] '%s' is inserted" % data_employee)
    
    def close(self, error):
        try:
            cnx = mysql.connector.connect(user='sqoop', password='sqoop', host='mysql', database='testdb')
            cursor = cnx.cursor()
            self.selectEmployee(cursor)
        except:
            import traceback
            traceback.print_exc()
        finally:
            cursor.close()
            cnx.close()

In [11]:
mysqlSchema = (
    StructType()
    .add(StructField("emp_id", IntegerType()))
    .add(StructField("emp_name", StringType()))
)
mysqlPath = f"{work_data}/json"
mysqlReader = (
    spark
    .readStream
    .format("json")
    .schema(mysqlSchema)
    .load(mysqlPath)
)

# ForeachWriter 객체를 생성하여 전달합니다
mysqlWriter = (
    mysqlReader
    .writeStream
    .foreach(ForeachWriter())
)

queryName = "mysqlSink"
checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation
mysqlTrigger = (
    mysqlWriter
    .trigger(processingTime="1 second")
    .option("checkpointLocation", checkpointLocation)
)

In [12]:
mysqlQuery = mysqlTrigger.start()
mysqlQuery.awaitTermination(5)
mysqlQuery.stop()

employees table created.
employees table deleted.
[1] 'OO전자' is inserted
[2] 'OO화학' is inserted
[3] 'OO에너지솔루션' is inserted
[100] 'OO' is inserted
1 in 'OO전자' has retrieved.
2 in 'OO화학' has retrieved.
3 in 'OO에너지솔루션' has retrieved.
100 in 'OO' has retrieved.
