스파크 세선을 성성하고 버전을 확인합니다.
상세한 가이드는 Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.2.1 Documentation 를 참고 합니다.

In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from IPython.display import display, display_pretty, clear_output, JSON
# 공통 데이터 위치
home_jovyan = "/home/jovyan"
work_data = f"{home_jovyan}/work/data"
work_dir=!pwd
work_dir = work_dir[0]

# mongodb 관련 conf
mongo_conn = "mongodb://root:123456@127.0.0.1:27017"

spark = (
    SparkSession
    .builder
    .config("spark.sql.session.timeZone", "Asia/Seoul")
    .config('spark.mongodb.input.uri', 'mongodb://root:123456@mongo/tweetDB.test')
    .config('spark.mongodb.output.uri', 'mongodb://root:123456@mongo/tweetDB.test')
    .getOrCreate()
)

# 노트북에서 테이블 형태로 데이터 프레임 출력을 위한 설정을 합니다
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # display enabled
spark.conf.set("spark.sql.repl.eagerEval.truncate", 100) # display output columns size

# 로컬 환경 최적화
spark.conf.set("spark.sql.shuffle.partitions", 5) # the number of partitions to use when shuffling data for joins or aggregations.
spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")


# 현재 기동된 스파크 애플리케이션의 포트를 확인하기 위해 스파크 정보를 출력합니다
spark

### kafka reader 생성

카프카로부터 메시지 수신을 위한 카프카 리더를 생성합니다
earliest 옵션으로 kafka:9093 의 rawdata 토픽으로부터 메시지를 읽어옵니다.

In [2]:
# kafkaReader 생성 (spark.readStream) -> 카프카에서 메세지 수신
# server.properties 에서 advertised.listeners=INSIDE://kafka:9093,OUTSIDE://localhost:9092

kafkaReader = (
    spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9093")
  .option("subscribe", "rawdata")
  .option("startingOffsets", "earliest")
  .load()
)
kafkaReader.printSchema()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### mongoDB로 저장

In [None]:
kafkaSchema = (
    StructType()
    .add(StructField("status", StringType()))
    .add(StructField("id", StringType()))
    .add(StructField("screen_name", StringType()))
    .add(StructField("user_id", IntegerType()))
    .add(StructField("profile", StringType()))
    .add(StructField("time", DateType()))
    .add(StructField("text", StringType()))
    .add(StructField("retweet_count", IntegerType()))
    .add(StructField("favorite_count", IntegerType()))
    .add(StructField("lat", IntegerType()))
    .add(StructField("long", IntegerType()))
)


kafkaSelector = (
    kafkaReader
    .select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), kafkaSchema).alias("groups")
    )
    .selectExpr("groups.id as key", "to_json(struct(groups.*)) as value")
)

def write_row(batch_df , batch_id):
    clear_output(wait=True)
    display(batch_id, batch_df)
    batch_df.write.format("mongo").mode("append").save()
    pass

kafkaSelector.writeStream.foreachBatch(write_row).start().awaitTermination()



65

key,value
1603384048797175808,"{""status"":""ORIGINAL"",""id"":""1603384048797175808"",""screen_name"":""KINGOFAKESUBJIN"",""profile"":""http:/..."
