In [1]:
# Home directory used as the base for the work/data paths built in the next cell.
home_jovyan = "/home/jovyan"

In [2]:
# Data folder and work folder, both derived from the home directory.
work_data = f"{home_jovyan}/work/data"
workspace=f"{home_jovyan}/work"
# `!pwd` captures the shell output as a list of lines, so the directory
# itself is the first entry. It is used later to build the checkpoint path.
work_dir=!pwd
work_dir = work_dir[0]

In [3]:
import os
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, display_pretty, clear_output, JSON
import codecs

In [4]:
# Version of the Spark installation, taken from the container environment.
# Falls back to the release this notebook was written against when the
# variable is not set.
spark_version = os.environ.get('APACHE_SPARK_VERSION', '3.4.0')

# Ask spark-submit to download the Kafka connector that matches the running
# Spark version, so the connector cannot drift from the installation.
# The Scala suffix (_2.12) is still fixed here.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    f"--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_version} pyspark-shell"
)



In [5]:
# Create (or reuse) the session that talks to the standalone master and
# requests three single-core executors with 4G of memory each.
spark = (
    SparkSession.builder
    .appName("KafkaConsumer")
    .master("spark://spark-master:17077")
    .config("spark.executor.instances", "3")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "4G")
    .config("spark.sql.session.timeZone", "Asia/Seoul")
    .getOrCreate()
)

# Notebook display settings, applied to the live session.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)  # render DataFrames when displayed
spark.conf.set("spark.sql.repl.eagerEval.truncate", 100)  # width limit for displayed values


# Kafka settings

In [6]:
# Kafka connection settings, kept in one place for the cells that follow.
kafka_config = {
    "bootstrap.servers": "kafka:19092",   # broker address
    "group.id": "seoulcity_to_kafka",     # group label handed to the reader
    "topic.name": "seoulcity",            # topic the stream subscribes to
}

In [7]:
# Streaming source subscribed to the configured topic.
# NOTE(review): the Kafka source appears to forward only `kafka.`-prefixed
# options to the consumer, so the bare "group.id" below is probably ignored.
# Confirm against the Kafka integration guide before relying on a fixed group.
kafka_reader = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_config["bootstrap.servers"])
    .option("group.id", kafka_config["group.id"])
    .option("subscribe", kafka_config["topic.name"])
    .load()
)

# Schema

In [8]:
# Schema used by from_json to parse the JSON value of each Kafka message.
# Every leaf is declared as a string; the preprocessing functions convert the
# numeric ones with int()/float() where they need them.
# NOTE(review): the printed schema further down reports the parsed fields as
# nullable = true, so the nullable=False flags do not seem to survive
# from_json. Confirm before relying on them.
seoulcity_schema = StructType([
    # Later re-used as the `key` column of the outgoing messages.
    StructField("AREA_NM",StringType()),
    # Road traffic section: one aggregate struct plus an array of per-link records.
    StructField("ROAD_TRAFFIC_STTS",StructType([
        StructField("AVG_ROAD_DATA",StructType([
            StructField("ROAD_MSG", StringType()),
            StructField("ROAD_TRAFFIC_IDX", StringType()),
            # NOTE(review): spelled "TRFFIC" here; presumably mirrors the upstream payload (verify).
            StructField("ROAD_TRFFIC_TIME", StringType()),
            StructField("ROAD_TRAFFIC_SPD", StringType())
        ]), nullable=False),
        StructField("ROAD_TRAFFIC_STTS",ArrayType(StructType([
            StructField("LINK_ID", StringType()),
            StructField("ROAD_NM", StringType()),
            StructField("START_ND_CD", StringType()),
            StructField("START_ND_NM", StringType()),
            StructField("START_ND_XY", StringType()),
            StructField("END_ND_CD", StringType()),
            StructField("END_ND_NM", StringType()),
            StructField("END_ND_XY", StringType()),
            StructField("DIST", StringType()),
            StructField("SPD", StringType()),
            StructField("IDX", StringType()),
            # "|"-separated list of "_"-joined coordinate pairs (parsed in rts_preprocessing).
            StructField("XYLIST", StringType())
        ])), nullable=False),
    ]),nullable=False),
    # Parking section: array of parking-lot records (consumed by prks_preprocessing).
    StructField("PRK_STTS",StructType([
        StructField("PRK_STTS",ArrayType(StructType([
            StructField("PRK_NM", StringType()),
            StructField("PRK_CD", StringType()),
            StructField("CPCTY", StringType()),
            StructField("CUR_PRK_CNT", StringType()),
            StructField("CUR_PRK_TIME", StringType()),
            StructField("CUR_PRK_YN", StringType()),
            StructField("PAY_YN", StringType()),
            StructField("RATES", StringType()),
            StructField("TIME_RATES", StringType()),
            StructField("ADD_RATES", StringType()),
            StructField("ADD_TIME_RATES", StringType()),
            StructField("ADDRESS", StringType()),
            StructField("ROAD_ADDR", StringType()),
            StructField("LNG", StringType()),
            StructField("LAT", StringType())
        ])), nullable=False),
    ]),nullable=False),
    # SBIKE section: array of station records (consumed by sbikes_preprocessing).
     StructField("SBIKE_STTS",StructType([
        StructField("SBIKE_STTS",ArrayType(StructType([
            StructField("SBIKE_SPOT_NM", StringType()),
            StructField("SBIKE_SPOT_ID", StringType()),
            StructField("SBIKE_SHARED", StringType()),
            StructField("SBIKE_PARKING_CNT", StringType()),
            StructField("SBIKE_RACK_CNT", StringType()),
            StructField("SBIKE_X", StringType()),
            StructField("SBIKE_Y", StringType())
        ])), nullable=False),
    ]),nullable=False)
])

In [9]:
# def decode_ascii_to_utf8(value):
#     decoded_value = codecs.decode(value.encode("latin1"), "unicode_escape").encode("latin1").decode("utf-8")
#     return decoded_value

In [10]:
pwd

'/home/jovyan/work'

# UDF 

## Transformation

In [11]:
def rts_preprocessing(ROAD_TRAFFIC_STTS):
    """Convert raw road-link records into map-ready dictionaries.

    Args:
        ROAD_TRAFFIC_STTS: iterable of link records exposing XYLIST, IDX,
            ROAD_NM, START_ND_NM, END_ND_NM, DIST and SPD as attributes,
            or None.

    Returns:
        A list with one dict per link, holding the keys "KEY", "COLOR",
        "XYLIST" (list of [second, first] float pairs) and "road_info".
        An empty list is returned when the input is None.

    Missing values no longer abort the whole batch: a missing or blank
    XYLIST yields an empty coordinate list, unusable coordinate pairs are
    skipped, and text fields built from a missing value are emitted as None.
    """
    # Congestion label -> line colour; any other label (or None) is drawn green.
    colors = {"혼잡": "red", "정체": "red", "서행": "orange"}

    def parse_path(xylist):
        """Turn "a_b|a_b|..." into [[b, a], ...], skipping unusable pairs."""
        points = []
        for pair in (xylist or "").split('|'):
            parts = pair.split('_')
            try:
                # The two numbers of each pair are swapped on output.
                points.append([float(parts[1]), float(parts[0])])
            except (IndexError, ValueError):
                # Blank or malformed pair (e.g. a trailing "|"): drop it
                # instead of failing the whole record.
                continue
        return points

    def with_unit(value, unit):
        """Append a unit to a present value; keep missing values as None."""
        return None if value is None else value + unit

    result = []
    if ROAD_TRAFFIC_STTS is None:
        return result

    for element in ROAD_TRAFFIC_STTS:
        IDX = element.IDX
        start, end = element.START_ND_NM, element.END_ND_NM
        # The section label needs both end points to be meaningful.
        section = None if start is None or end is None else start + " ~ " + end

        road_info = {
            "도로명": element.ROAD_NM,
            "구간명": section,
            "구간 거리": with_unit(element.DIST, "m"),
            "평균 속도": with_unit(element.SPD, "km/h"),
            "혼잡도": IDX
        }

        result.append({
            "KEY": 1,
            "COLOR": colors.get(IDX, "green"),
            "XYLIST": parse_path(element.XYLIST),
            "road_info": road_info
        })
    return result


In [12]:
def prks_preprocessing(PRK_STTS):
    """Convert raw parking-lot records into map-ready dictionaries.

    Args:
        PRK_STTS: iterable of parking records exposing PRK_NM, ADDRESS,
            CUR_PRK_YN, CUR_PRK_CNT, TIME_RATES, ADD_TIME_RATES, LAT and LNG
            as attributes, or None.

    Returns:
        A list with one dict per lot, holding the keys "실시간여부", "COLOR",
        "XYLIST" ([LAT, LNG] as floats) and "prk_info". An empty list is
        returned when the input is None.

    The price label is computed per lot: a lot whose rate fields are missing
    or not numeric gets an empty label instead of inheriting the label of the
    previous lot. Non-numeric counts leave the colour at "black" and missing
    coordinates yield an empty XYLIST rather than raising.
    """
    def to_int(value):
        """Parse an integer string; None when missing or not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    def to_float(value):
        """Parse a float string; None when missing or not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    result = []
    if PRK_STTS is None:
        return result

    for element in PRK_STTS:
        # Reset for every lot so a label never leaks from the previous one.
        PRK_PRICE = ""
        add_minutes = to_int(element.ADD_TIME_RATES)
        time_rates = to_int(element.TIME_RATES)
        if add_minutes is not None and time_rates is not None and add_minutes > 0:
            # NOTE(review): this multiplies by TIME_RATES rather than a fee
            # field such as ADD_RATES; confirm the intended 10-minute formula.
            amount = (10 / add_minutes) * time_rates
            if amount <= 1000:
                PRK_PRICE = "1000원 이하 (10분)"
            elif amount <= 1500:
                PRK_PRICE = "1000원~1500원 (10분)"
            else:
                PRK_PRICE = "1500원 이상 (10분)"

        # "black" marks lots without usable real-time information.
        PARK_YN = "black"
        CUR_PRK_NUM = ""
        if element.CUR_PRK_YN == "Y":
            count = to_int(element.CUR_PRK_CNT)
            if count is not None:
                PARK_YN = "green" if count > 0 else "red"
            if element.CUR_PRK_CNT is not None:
                CUR_PRK_NUM = element.CUR_PRK_CNT

        prk_info = {
            "주차장명": element.PRK_NM,
            "주소": element.ADDRESS,
            "주차가능 수": CUR_PRK_NUM,
            "가격": PRK_PRICE
        }

        lat, lng = to_float(element.LAT), to_float(element.LNG)
        xylist = [lat, lng] if lat is not None and lng is not None else []

        result.append({
            "실시간여부": element.CUR_PRK_YN,
            "COLOR": PARK_YN,
            "XYLIST": xylist,
            "prk_info": prk_info
        })
    return result

In [13]:
def sbikes_preprocessing(SBIKE_STTS):
    """Convert raw SBIKE station records into map-ready dictionaries.

    Args:
        SBIKE_STTS: iterable of station records exposing SBIKE_SPOT_NM,
            SBIKE_SHARED, SBIKE_RACK_CNT, SBIKE_PARKING_CNT, SBIKE_X and
            SBIKE_Y as attributes, or None.

    Returns:
        A list with one dict per station, holding the keys "대여가능여부"
        (1, 0, or -1 when the count is unknown), "COLOR", "XYLIST"
        ([SBIKE_Y, SBIKE_X] as floats) and "sbike_info". An empty list is
        returned when the input is None.

    Counts that are missing or not numeric are treated as unknown, and
    missing coordinates yield an empty XYLIST, instead of raising.
    """
    def to_int(value):
        """Parse an integer string; None when missing or not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    def to_float(value):
        """Parse a float string; None when missing or not numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    result = []
    if SBIKE_STTS is None:
        return result

    for element in SBIKE_STTS:
        # The literal string "None" is the label used when no ratio is available.
        SBIKE_RATE = "None"
        shared = to_int(element.SBIKE_SHARED)
        racks = to_int(element.SBIKE_RACK_CNT)
        if shared is not None and racks is not None and racks != 0:
            ratio = shared / racks
            if 0 <= ratio <= 0.7:
                SBIKE_RATE = "낮음"
            elif 0.7 < ratio <= 1.3:
                SBIKE_RATE = "보통"
            else:
                SBIKE_RATE = "높음"

        # -1 / "black" means the parked-bike count is unknown.
        IS_SURPLUS = -1
        SBIKE_SURPLUS = "black"
        parked = to_int(element.SBIKE_PARKING_CNT)
        if parked is not None:
            if parked > 0:
                IS_SURPLUS = 1
                if parked <= 2:
                    SBIKE_SURPLUS = "orange"
                elif parked <= 4:
                    SBIKE_SURPLUS = "green"
                else:
                    SBIKE_SURPLUS = "blue"
            else:
                IS_SURPLUS = 0

        y, x = to_float(element.SBIKE_Y), to_float(element.SBIKE_X)
        xylist = [y, x] if y is not None and x is not None else []

        sbike_info = {
            "정류장명": element.SBIKE_SPOT_NM,
            "대여가능 수": element.SBIKE_PARKING_CNT,
            "반납율": SBIKE_RATE,
        }

        result.append({
            "대여가능여부": IS_SURPLUS,
            "COLOR": SBIKE_SURPLUS,
            "XYLIST": xylist,
            "sbike_info": sbike_info
        })
    return result

In [14]:
# Spark UDF around rts_preprocessing. The declared return type mirrors the
# dictionaries that function builds: one struct per road link.
rts_udf = udf(
    f=rts_preprocessing,
    returnType=ArrayType(
        StructType([
            StructField("KEY", IntegerType()),
            StructField("COLOR", StringType()),
            # Each link is a list of two-element coordinate lists.
            StructField("XYLIST", ArrayType(ArrayType(DoubleType()))),
            StructField(
                "road_info",
                StructType([
                    StructField("도로명", StringType()),
                    StructField("구간명", StringType()),
                    StructField("구간 거리", StringType()),
                    StructField("평균 속도", StringType()),
                    StructField("혼잡도", StringType()),
                ]),
            ),
        ])
    ),
)

In [15]:
# Spark UDF around prks_preprocessing. The declared return type mirrors the
# dictionaries that function builds: one struct per parking lot.
prks_udf = udf(
    f=prks_preprocessing,
    returnType=ArrayType(
        StructType([
            StructField("실시간여부", StringType()),
            StructField("COLOR", StringType()),
            # A single coordinate pair per lot.
            StructField("XYLIST", ArrayType(DoubleType())),
            StructField(
                "prk_info",
                StructType([
                    StructField("주차장명", StringType()),
                    StructField("주소", StringType()),
                    StructField("주차가능 수", StringType()),
                    StructField("가격", StringType()),
                ]),
            ),
        ])
    ),
)

In [16]:
# Spark UDF around sbikes_preprocessing. The declared return type mirrors the
# dictionaries that function builds: one struct per station.
sbikes_udf = udf(
    f=sbikes_preprocessing,
    returnType=ArrayType(
        StructType([
            StructField("대여가능여부", IntegerType(), nullable=True),
            StructField("COLOR", StringType()),
            # A single coordinate pair per station.
            StructField("XYLIST", ArrayType(DoubleType())),
            StructField(
                "sbike_info",
                StructType([
                    StructField("정류장명", StringType()),
                    StructField("대여가능 수", StringType()),
                    StructField("반납율", StringType()),
                ]),
            ),
        ])
    ),
)

# Processing

In [17]:

# Parse the JSON value of every Kafka record with seoulcity_schema, lift the
# nested sections to top-level columns, then replace the three array columns
# with the output of their preprocessing UDFs.
kafka_selector = (
    kafka_reader
    .select(
        # The record key selected here is not carried forward: the selectExpr
        # below keeps only fields of `seoulcitydata` and re-uses the name `key`.
        col("key").cast("string"),
        from_json(col("value").cast("string"),seoulcity_schema).alias("seoulcitydata")
    )
    .selectExpr("seoulcitydata.AREA_NM as key",
                "seoulcitydata.ROAD_TRAFFIC_STTS.AVG_ROAD_DATA", 
                "seoulcitydata.ROAD_TRAFFIC_STTS.ROAD_TRAFFIC_STTS", 
                "seoulcitydata.PRK_STTS.PRK_STTS",
                "seoulcitydata.SBIKE_STTS.SBIKE_STTS"
               )
    .withColumn("ROAD_TRAFFIC_STTS", rts_udf(col("ROAD_TRAFFIC_STTS")))
    .withColumn("PRK_STTS",prks_udf(col("PRK_STTS")))
    .withColumn("SBIKE_STTS",sbikes_udf(col("SBIKE_STTS")))
)

# Leftover experiment; the column alias it refers to is not defined above.
# selected_data = kafka_selector.select("서울시데이터.ROAD_TRAFFIC_STTS.AVG_ROAD_DATA")
kafka_selector.printSchema()
# Process or output the data as needed.
# Example: print the data to the console.
# This starts a streaming query immediately; nothing in the visible notebook
# stops it again.
streaming_query = (
    kafka_selector.writeStream
    .format("console")
    .option("header",False)
    .option("truncate", False)
    .outputMode("append")
    .start()
)


root
 |-- key: string (nullable = true)
 |-- AVG_ROAD_DATA: struct (nullable = true)
 |    |-- ROAD_MSG: string (nullable = true)
 |    |-- ROAD_TRAFFIC_IDX: string (nullable = true)
 |    |-- ROAD_TRFFIC_TIME: string (nullable = true)
 |    |-- ROAD_TRAFFIC_SPD: string (nullable = true)
 |-- ROAD_TRAFFIC_STTS: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- KEY: integer (nullable = true)
 |    |    |-- COLOR: string (nullable = true)
 |    |    |-- XYLIST: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |    |-- road_info: struct (nullable = true)
 |    |    |    |-- 도로명: string (nullable = true)
 |    |    |    |-- 구간명: string (nullable = true)
 |    |    |    |-- 구간 거리: string (nullable = true)
 |    |    |    |-- 평균 속도: string (nullable = true)
 |    |    |    |-- 혼잡도: string (nullable = true)
 |-- PRK_STTS: array (nullable = true)
 |    |-- e

In [18]:

# Serialise the processed columns into a single JSON string column named
# "value", the payload column selected by the Kafka writer cell.
p_kafkaSelector = kafka_selector.withColumn(
    "value",
    to_json(
        struct("key", "AVG_ROAD_DATA", "ROAD_TRAFFIC_STTS", "PRK_STTS", "SBIKE_STTS")
    ),
)


In [19]:
def displayStatus(name, query, iterations, sleep_secs):
    """Repeatedly redraw the status of a streaming query in the output cell.

    Args:
        name: label shown in front of every status line (a string).
        query: object exposing ``status['message']`` and ``lastProgress``.
        iterations: how many times the status is refreshed.
        sleep_secs: pause, in seconds, after each refresh.
    """
    from time import sleep

    # The counter shown to the user is 1-based.
    for iteration in range(1, iterations + 1):
        # Wipe the previous output so only the latest status is visible.
        clear_output(wait=True)
        headline = ''.join([
            '[', name, '] Iteration: ', str(iteration),
            ', Status: ', query.status['message'],
        ])
        display(headline)
        # Progress report of the most recently executed batch.
        display(query.lastProgress)
        # Wait before polling again.
        sleep(sleep_secs)

In [20]:


qname = "kafkaQ"
kafkaWriter_origin = (
    p_kafkaSelector.select("key","value")
    .writeStream
    .queryName(qname)
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:19092")
    .option("topic", "seoulcity_visual")
    .outputMode("append")
)

checkpointLocation = f"{work_dir}/tmp/{qname}"
!rm -rf $checkpointLocation
kafkaTrigger = (
    kafkaWriter_origin
    .trigger(processingTime="5 second")
    .option("checkpointLocation", checkpointLocation)
)

kafkaQuery = kafkaTrigger.start()

displayStatus(qname, kafkaQuery, 1000, 5)
    
kafkaQuery.stop()

'[kafkaQ] Iteration: 1000, Status: Waiting for next trigger'

{'id': '5f8c00ea-a78a-411c-9fb1-180ca3321401',
 'runId': '63067b7c-e409-45cc-9b5d-04d4bc1c13a4',
 'name': 'kafkaQ',
 'timestamp': '2023-06-05T01:14:00.000Z',
 'batchId': 20,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'latestOffset': 1, 'triggerExecution': 1},
 'stateOperators': [],
 'sources': [{'description': 'KafkaV2[Subscribe[seoulcity]]',
   'startOffset': {'seoulcity': {'2': 1254, '1': 1230, '0': 1226}},
   'endOffset': {'seoulcity': {'2': 1254, '1': 1230, '0': 1226}},
   'latestOffset': {'seoulcity': {'2': 1254, '1': 1230, '0': 1226}},
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0,
   'metrics': {'avgOffsetsBehindLatest': '0.0',
    'maxOffsetsBehindLatest': '0',
    'minOffsetsBehindLatest': '0'}}],
 'sink': {'description': 'org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@6905e09c',
  'numOutputRows': 0}}