In [1]:
# 사내 서버 카프카에 "json_topic" 토픽 추가 하였다, 없으면 추가
# {"id":1,"firstname":"James ","middlename":"","lastname":"Smith","dob_year":2018,"dob_month":1,"gender":"M","salary":3000}
# {"id":2,"firstname":"Michael ","middlename":"Rose","lastname":"","dob_year":2010,"dob_month":3,"gender":"M","salary":4000}
# {"id":3,"firstname":"Robert ","middlename":"","lastname":"Williams","dob_year":2010,"dob_month":3,"gender":"M","salary":4000}
# {"id":4,"firstname":"Maria ","middlename":"Anne","lastname":"Jones","dob_year":2005,"dob_month":5,"gender":"F","salary":4000}
# {"id":5,"firstname":"Jen","middlename":"Mary","lastname":"Brown","dob_year":2010,"dob_month":7,"gender":"","salary":-1}

#setting
import os
os.chdir("..")
import nb_init

os.environ['CEP_ENV'] = 'local'


In [2]:
# 필요 Pyspark 모듈 import
from cep_common import spark_utils as su
from cep_common import config as con

from pyspark.sql.types import *
import pyspark.sql.functions as F

# Create SparkSession
spark = su.create_spark_session(appName="kafka-sample")

In [3]:
# 스파크 스트리밍으로 카프카 데이터 읽어오기
topic_name = "json_topic"

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", con.KAFKA_NODES) \
  .option("subscribe", topic_name) \
  .option("startingOffsets", "earliest") \
  .load()

df.printSchema()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
#df에서 value값만 가져와서 binary에서 string 형태로 캐스팅한다
df = df.selectExpr("CAST(value AS STRING)")

In [5]:
#커스텀 스키마 형태
schema = StructType().add("id", "integer") \
      .add("firstname", "string") \
      .add("middlename","string") \
      .add("lastname","string") \
      .add("dob_year","integer") \
      .add("dob_month","integer") \
      .add("gender","string") \
      .add("salary","integer")

In [6]:
#df에서 value 값을 스키마 형태로 변형
df = df.select(F.from_json(df.value, schema).alias("data")).select("data.*")

df = df.withColumn("lastname_starting_char", F.substring(F.col("lastname"), 1, 1))

In [7]:
# delta 형태로 저장 Lastname 첫 알파벳으로 파티션한다
save_path = "hdfs://{}/datalake/kafka/topics/{}".format(con.HADOOP_NODE, topic_name)

df.writeStream \
      .format("delta") \
      .outputMode("append") \
      .partitionBy("lastname_starting_char") \
      .trigger(processingTime='1 seconds') \
      .option("checkpointLocation", save_path+"/check_points") \
      .option("path", save_path) \
      .start()

<pyspark.sql.streaming.StreamingQuery at 0x15f26511d88>

In [9]:
# 저장된 데이터 로드하여 보여주기
load_df = spark.read \
    .format("delta") \
    .load(save_path)

load_df.show()

+---+---------+----------+--------+--------+---------+------+------+----------------------+
| id|firstname|middlename|lastname|dob_year|dob_month|gender|salary|lastname_starting_char|
+---+---------+----------+--------+--------+---------+------+------+----------------------+
|  2| Michael |      Rose|        |    2010|        3|     M|  4000|                  null|
|  1|   James |          |   Smith|    2018|        1|     M|  3000|                     S|
+---+---------+----------+--------+--------+---------+------+------+----------------------+



In [10]:
spark.stop()