In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, display_pretty, clear_output, JSON
# Common data locations
home_jovyan = "/home/jovyan"
work_data = f"{home_jovyan}/work/data"
# IPython shell capture: `!pwd` runs the shell command and binds its output
# lines (a list-like object) to work_dir.
work_dir=!pwd
# Keep only the first output line, i.e. the notebook's current working directory.
work_dir = work_dir[0]


# Create (or reuse) the Spark session, pinning the SQL session time zone.
session_builder = SparkSession.builder.config("spark.sql.session.timeZone", "Asia/Seoul")
spark = session_builder.getOrCreate()

# Runtime settings applied to the live session, in this order:
#   - eagerEval.enabled / eagerEval.truncate: render DataFrames as tables in
#     the notebook and set the displayed column width.
#   - shuffle.partitions: number of partitions used when shuffling data for
#     joins or aggregations (kept small for a local environment).
#   - forceDeleteTempCheckpointLocation: streaming checkpoint housekeeping flag.
for conf_key, conf_value in (
    ("spark.sql.repl.eagerEval.enabled", True),
    ("spark.sql.repl.eagerEval.truncate", 100),
    ("spark.sql.shuffle.partitions", 5),
    ("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true"),
):
    spark.conf.set(conf_key, conf_value)

# Streaming source: subscribe to the "bts" Kafka topic, reading from the
# earliest available offsets.
kafkaReader = (
    spark.readStream
    .format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": "kafka:9093",
            "subscribe": "bts",
            "startingOffsets": "earliest",
        }
    )
    .load()
)


# Schema used to parse the JSON payload carried in the Kafka message value.
# NOTE(review): user_id / lat / long are declared as 32-bit integers and time
# as a date -- confirm these match the producer's payload (wide ids, fractional
# coordinates or full timestamps would not fit these types).
kafkaSchema = StructType(
    [
        StructField("status", StringType()),
        StructField("id", StringType()),
        StructField("screen_name", StringType()),
        StructField("user_id", IntegerType()),
        StructField("profile", StringType()),
        StructField("time", DateType()),
        StructField("text", StringType()),
        StructField("retweet_count", IntegerType()),
        StructField("favorite_count", IntegerType()),
        StructField("lat", IntegerType()),
        StructField("long", IntegerType()),
    ]
)


# Decode the Kafka record: cast key/value to strings and parse the JSON value
# into a struct column named "movies".
parsed_records = kafkaReader.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), kafkaSchema).alias("movies"),
)

# Flatten the struct into top-level columns. The payload's id is exposed under
# the name "key"; every other field keeps its own name.
kafkaSelector = parsed_records.selectExpr(
    "movies.id as key",
    *[
        f"movies.{field_name}"
        for field_name in (
            "status",
            "screen_name",
            "user_id",
            "profile",
            "time",
            "text",
            "retweet_count",
            "favorite_count",
            "lat",
            "long",
        )
    ],
)

def processing(s):
    """Strip every character outside the allowed set from *s*.

    Allowed characters are: space, Hangul jamo and syllables, ASCII letters
    and digits, the punctuation ``. / % # :`` and newline. Each run of any
    other characters is removed.

    Args:
        s: Text to clean, or ``None``.

    Returns:
        The cleaned string, or ``None`` when *s* is ``None`` (so missing
        values pass through instead of raising ``TypeError``).
    """
    import re

    if s is None:
        return None
    # re.sub goes through the re module's pattern cache, so the pattern is not
    # recompiled on every call.
    return re.sub('[^ ㄱ-ㅣ가-힣a-zA-Z0-9./%#:\n]+', '', s)
    
# Expose the text cleaner as a Spark UDF. Null text is passed through as null
# rather than being handed to the regex-based cleaner.
processing_udf = udf(
    lambda text: None if text is None else processing(text),
    StringType(),
)

kafkaSelector = (
    kafkaSelector
    # Filter while `status` is still part of the frame, before it is projected away.
    .where("status='ORIGINAL'")
    .withColumn("processed_text", processing_udf(col("text")))
    .select(
        # The upstream projection exposes the payload's id under the name
        # "key"; restore the `id` name for the outgoing record.
        col("key").alias("id"),
        "user_id",
        "profile",
        "time",
        "processed_text",
        "retweet_count",
        "favorite_count",
    )
)
kafkaSelector.printSchema()

# Pack every column into one JSON string. The Kafka sink reads the message
# payload from a column named "value".
p_kafkaSelector = kafkaSelector.select(to_json(struct("*")).alias("value"))

qname = "temp"
kafkaWriter_origin = (
    p_kafkaSelector.select("*")
    .writeStream
    .queryName(qname)
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9093")
    .option("topic", "groups")
    .outputMode("append")
)
checkpointLocation = f"{work_dir}/tmp/{queryName1}"
!rm -rf $checkpointLocation
kafkaTrigger = (
    kafkaWriter_origin
    .trigger(processingTime="2 second")
    .option("checkpointLocation", checkpointLocation)
)
kafkaQuery = kafkaTrigger.start()

# Poll the running query and show its latest status/progress in the notebook.
import time

poll_interval_sec = 1  # pause between polls so the cell output is readable

try:
    for _ in range(1000):
        # Nothing more to report once the query has terminated.
        if not kafkaQuery.isActive:
            break
        # Replace the previous snapshot instead of appending to the cell output.
        clear_output(wait=True)
        display(kafkaQuery.status['message'])
        display(kafkaQuery.lastProgress)
        time.sleep(poll_interval_sec)
finally:
    # Always stop the query, even when the cell is interrupted mid-loop.
    kafkaQuery.stop()