In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, display_pretty, clear_output, JSON

spark = (
    SparkSession
    .builder
    .config("spark.sql.session.timeZone", "Asia/Seoul")
    .getOrCreate()
)

# 노트북에서 테이블 형태로 데이터 프레임 출력을 위한 설정을 합니다
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # display enabled
spark.conf.set("spark.sql.repl.eagerEval.truncate", 100) # display output columns size

# 공통 데이터 위치
home_jovyan = "/home/jovyan"
work_data = f"{home_jovyan}/work/data"
work_dir=!pwd
work_dir = work_dir[0]

# 로컬 환경 최적화
spark.conf.set("spark.sql.shuffle.partitions", 5) # the number of partitions to use when shuffling data for joins or aggregations.
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")

# 현재 기동된 스파크 애플리케이션의 포트를 확인하기 위해 스파크 정보를 출력합니다
spark

In [2]:
def displayStream(name, sql, iterations, sleep_secs):
    """Periodically run a Spark SQL query and show its result in the output cell.

    Each iteration clears the output cell, displays a header line with the
    iteration number and the query text, displays the result of
    ``spark.sql(sql)``, and then sleeps.

    Args:
        name: Label shown in the header line.
        sql: Spark SQL text executed on every iteration.
        iterations: Number of times to run and display the query.
        sleep_secs: Seconds to wait after each iteration.
    """
    from time import sleep

    # Counting from 1 lets the loop variable double as the displayed iteration number.
    for i in range(1, iterations + 1):
        clear_output(wait=True)   # clear the output cell before redrawing
        display(f"[{name}] Iteration: {i}, Query: {sql}")
        display(spark.sql(sql))   # run the Spark SQL and render the result
        sleep(sleep_secs)         # wait sleep_secs seconds before the next refresh

def displayStatus(name, query, iterations, sleep_secs):
    """Periodically show the status and last progress of a streaming query.

    Each iteration clears the output cell, displays a header line with the
    iteration number and ``query.status['message']``, displays
    ``query.lastProgress``, and then sleeps.

    Args:
        name: Label shown in the header line.
        query: Streaming query object exposing ``status`` and ``lastProgress``.
        iterations: Number of times to poll and display.
        sleep_secs: Seconds to wait after each iteration.
    """
    from time import sleep

    # Counting from 1 lets the loop variable double as the displayed iteration number.
    for i in range(1, iterations + 1):
        clear_output(wait=True)      # clear the output cell before redrawing
        display(f"[{name}] Iteration: {i}, Status: {query.status['message']}")
        display(query.lastProgress)  # progress report of the most recently executed batch
        sleep(sleep_secs)            # wait the given number of seconds

In [44]:
# {"hello":"ssm-seoul","id":0,"time":"2022-09-30 16:42:01"}

# Streaming source: subscribe to the "events" topic on kafka:9093, starting from the latest offsets.
kafkaReader = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9093",
        "subscribe": "events",
        "startingOffsets": "latest",
    })
    .load()
)
kafkaReader.printSchema()

# Schema used to parse the JSON text carried in the Kafka value column.
kafkaSchema = StructType([
    StructField("id", LongType()),
    StructField("hello", StringType()),
    StructField("time", StringType()),
])

# Cast key/value to strings, parse the value as JSON into an "events" struct,
# then flatten the struct and add mod_id = id % 10.
kafkaSelector = (
    kafkaReader
    .select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), kafkaSchema).alias("events"),
    )
    .selectExpr("events.id % 10 as mod_id", "events.*")
)

kafkaSelector.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- mod_id: long (nullable = true)
 |-- id: long (nullable = true)
 |-- hello: string (nullable = true)
 |-- time: string (nullable = true)



In [46]:
# 노트북 로그 콘솔로 출력
queryName = "consoleSink"
kafkaWriter = (
    kafkaSelector.select("*")
    .writeStream
    .queryName(queryName)
    .format("memory")
    .outputMode("append")
)

checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation

kafkaTrigger = (
    kafkaWriter
    .trigger(processingTime="5 second")
    .option("checkpointLocation", checkpointLocation)
)

# 파이썬의 경우 콘솔 디버깅이 노트북 표준출력으로 나오기 때문에, 별도 메모리 테이블로 조회
kafkaQuery = kafkaTrigger.start()
displayStream(queryName, f"select * from {queryName} order by mod_id desc", 4, 5)
kafkaQuery.stop()

'[consoleSink] Iteration: 4, Query: select * from consoleSink order by mod_id desc'

mod_id,id,hello,time
9,2849,ssm-seoul,2022-09-30 17:38:46
9,2859,ssm-seoul,2022-09-30 17:38:56
8,2848,ssm-seoul,2022-09-30 17:38:45
8,2858,ssm-seoul,2022-09-30 17:38:55
7,2857,ssm-seoul,2022-09-30 17:38:54
6,2856,ssm-seoul,2022-09-30 17:38:53
5,2855,ssm-seoul,2022-09-30 17:38:52
4,2854,ssm-seoul,2022-09-30 17:38:51
3,2853,ssm-seoul,2022-09-30 17:38:50
2,2852,ssm-seoul,2022-09-30 17:38:49


In [47]:
# Static lookup data, read as JSON from <work_dir>/data/names.
namePath = f"{work_dir}/data/names"
# NOTE(review): "inferSchema" is documented as a CSV reader option; the JSON reader
# appears to infer the schema regardless — confirm before removing it.
nameStatic = spark.read.option("inferSchema", "true").json(namePath)

# Print the schema, then the rows without truncating column values.
nameStatic.printSchema()
nameStatic.show(truncate=False)

root
 |-- name: string (nullable = true)
 |-- uid: long (nullable = true)

+-----+---+
|name |uid|
+-----+---+
|zero |0  |
|one  |1  |
|two  |2  |
|three|3  |
|four |4  |
|five |5  |
+-----+---+



In [None]:
# Left outer join of the Kafka stream with the static names table on mod_id == uid.
joinExpression = kafkaSelector["mod_id"] == nameStatic["uid"]
staticSelector = kafkaSelector.join(nameStatic, on=joinExpression, how="leftOuter")

In [51]:
queryName = "memorySink"
staticWriter = (
    staticSelector
    .selectExpr("time", "id as user_id", "name as user_name", "hello", "uid")
    .selectExpr("user_id as key", "to_json(struct(*)) as value")
    .writeStream
    .queryName(queryName)
    .format("memory")
    .outputMode("append")
)

checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation

staticTrigger = (
    staticWriter
    .trigger(processingTime="5 second")
    .option("checkpointLocation", checkpointLocation)
)

In [52]:
# Start the query, poll the in-memory table, print the query plan, then stop.
staticQuery = staticTrigger.start()
try:
    displayStream(queryName, f"select * from {queryName} order by key desc", 10, 6)
    staticQuery.explain(True)
finally:
    # Stop the query even when the display loop raises or the cell is interrupted,
    # so no streaming query is left running in the background.
    staticQuery.stop()

'[kafkaSink] Iteration: 4, Query: select * from kafkaSink order by key desc'

key,value
2959,"{""time"":""2022-09-30 17:40:36"",""user_id"":2959,""hello"":""ssm-seoul""}"
2958,"{""time"":""2022-09-30 17:40:35"",""user_id"":2958,""hello"":""ssm-seoul""}"
2957,"{""time"":""2022-09-30 17:40:34"",""user_id"":2957,""hello"":""ssm-seoul""}"
2956,"{""time"":""2022-09-30 17:40:33"",""user_id"":2956,""hello"":""ssm-seoul""}"
2955,"{""time"":""2022-09-30 17:40:32"",""user_id"":2955,""user_name"":""five"",""hello"":""ssm-seoul"",""uid"":5}"
2954,"{""time"":""2022-09-30 17:40:31"",""user_id"":2954,""user_name"":""four"",""hello"":""ssm-seoul"",""uid"":4}"
2953,"{""time"":""2022-09-30 17:40:30"",""user_id"":2953,""user_name"":""three"",""hello"":""ssm-seoul"",""uid"":3}"
2952,"{""time"":""2022-09-30 17:40:29"",""user_id"":2952,""user_name"":""two"",""hello"":""ssm-seoul"",""uid"":2}"
2951,"{""time"":""2022-09-30 17:40:28"",""user_id"":2951,""user_name"":""one"",""hello"":""ssm-seoul"",""uid"":1}"
2950,"{""time"":""2022-09-30 17:40:27"",""user_id"":2950,""user_name"":""zero"",""hello"":""ssm-seoul"",""uid"":0}"


== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@38b40bf9
+- Project [user_id#3466L AS key#3473L, to_json(struct(time, time#3122, user_id, user_id#3466L, user_name, user_name#3467, hello, hello#3121, uid, uid#3406L), Some(Asia/Seoul)) AS value#3474]
   +- Project [time#3122, id#3120L AS user_id#3466L, name#3405 AS user_name#3467, hello#3121, uid#3406L]
      +- Join LeftOuter, (mod_id#3119L = uid#3406L)
         :- Project [(events#3115.id % cast(10 as bigint)) AS mod_id#3119L, events#3115.id AS id#3120L, events#3115.hello AS hello#3121, events#3115.time AS time#3122]
         :  +- Project [cast(key#3101 as string) AS key#3116, from_json(StructField(id,LongType,true), StructField(hello,StringType,true), StructField(time,StringType,true), cast(value#3102 as string), Some(Asia/Seoul)) AS events#3115]
         :     +- StreamingDataSourceV2Relation [key#3101, value#3102, topic#3103, partition#3104, offset#3105L, timestamp#3106

In [58]:
# Manual cleanup: stop the streaming query currently bound to staticQuery.
# NOTE(review): presumably used when an earlier cell was interrupted before its own stop() call ran — confirm.
staticQuery.stop()

In [63]:
queryName = "kafkaSink"
staticWriter = (
    staticSelector
    .selectExpr("time", "id as user_id", "name as user_name", "hello", "uid")
    .selectExpr("cast(user_id as string) as key", "to_json(struct(*)) as value")
    .writeStream
    .queryName(queryName)
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("topic", "events_enrich")
    .outputMode("append")
)

checkpointLocation = f"{work_dir}/tmp/{queryName}"
!rm -rf $checkpointLocation

staticTrigger = (
    staticWriter
    .trigger(processingTime="5 second")
    .option("checkpointLocation", checkpointLocation)
)

staticQuery = staticTrigger.start()
displayStatus(queryName, staticQuery, 1000, 10)
staticQuery.stop()

'[kafkaSink] Iteration: 21, Status: Waiting for next trigger'

{'id': '66571cc2-b231-4eab-9a22-faae530abc19',
 'runId': '6ee2022a-4731-44c2-bcab-f3bfc2176d56',
 'name': 'kafkaSink',
 'timestamp': '2022-09-30T08:53:35.000Z',
 'batchId': 39,
 'numInputRows': 4,
 'inputRowsPerSecond': 0.8001600320064013,
 'processedRowsPerSecond': 5.649717514124294,
 'durationMs': {'addBatch': 584,
  'getBatch': 0,
  'latestOffset': 2,
  'queryPlanning': 7,
  'triggerExecution': 708,
  'walCommit': 60},
 'stateOperators': [],
 'sources': [{'description': 'KafkaV2[Subscribe[events]]',
   'startOffset': {'events': {'0': 3760}},
   'endOffset': {'events': {'0': 3764}},
   'latestOffset': {'events': {'0': 3764}},
   'numInputRows': 4,
   'inputRowsPerSecond': 0.8001600320064013,
   'processedRowsPerSecond': 5.649717514124294,
   'metrics': {'avgOffsetsBehindLatest': '0.0',
    'maxOffsetsBehindLatest': '0',
    'minOffsetsBehindLatest': '0'}}],
 'sink': {'description': 'org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@2a913d0c',
  'numOutputRows': 4}}

KeyboardInterrupt: 

In [None]:
# Read JSON data from /source/json.
# NOTE(review): the variable name `json` matches the stdlib module of the same name;
# it is kept so any other cell referring to it still works — consider renaming.
json = spark.read.json("/source/json")

# Query through the DataFrame API.
# A bare expression in the middle of a cell is discarded, so the result is displayed explicitly.
display(json.select("emp_id", "emp_name"))

# Query through Spark SQL.
json.createOrReplaceTempView("simple")
display(spark.sql("select emp_id, emp_name from simple"))

# Print the data.
json.show()

# Save to a path.
# NOTE(review): no save mode is set; Spark's default is to fail when the target path
# already exists — confirm whether re-runs of this cell should overwrite instead.
json.write.json("/target/json")