# Kafka Streaming + PySpark 예제

### 1. findspark를 통해 pyspark 등 라이브러리 추가

In [36]:
import findspark
findspark.init("/usr/local/lib/spark-3.3.2-bin-hadoop3")

from pyspark import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import udf, col, from_json, pandas_udf, split

sconf = SparkConf()
sconf.setAppName("Jupyter_Notebook").set("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,com.datastax.spark:spark-cassandra-connector_2.12:3.3.0")

sc = SparkContext(conf=sconf)

23/03/21 16:34:42 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/03/21 16:34:47 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.12-3.3.2.jar added multiple times to distributed cache.
23/03/21 16:34:47 WARN Client: Same path resource file:///root/.ivy2/jars/com.datastax.spark_spark-cassandra-connector_2.12-3.3.0.jar added multiple times to distributed cache.
23/03/21 16:34:47 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.spark_spark-token-provider-kafka-0-10_2.12-3.3.2.jar added multiple times to distributed cache.
23/03/21 16:34:47 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.kafka_kafka-clients-2.8.1.jar added multiple times to distributed cache.
23/03/21 16:34:47 WARN Client: Same path resource file:///root/.ivy2/jars/org.apache.commons_commons-pool2-2.11.1.jar added multiple times to distributed cache.
23/03/21 16:3

### 2. SparkConf를 통해 configuration 추가하고, SparkContext 생성
spark-kafka와 spark-cassandra 의존성이 추가되어야 한다.

### 3. Kafka Topic에 sobscribe하여 Session의 readStream을 정의
printSchema() 메소드를 통해 Kafka의 스키마를 확인할 수 있다.

In [37]:
kafka_bootstrap_servers = 'master01:9092,master02:9092,slave01:9092,slave02:9092,slave03:9092'
topic = 'tagmanager'
schema = StructType(
        [
                StructField("serviceToken", StringType()),
                StructField("clientId", LongType()),
                StructField("serviceId", LongType()),
                StructField("sessionId", StringType()),
                StructField("event", StringType()),
                StructField("targetId", StringType()),
                StructField("positionX", IntegerType()),
                StructField("positionY", IntegerType()),
                StructField("location", StringType()),
                StructField("timestamp", LongType())
        ]
)

session = SparkSession(sc)
streaming_df = session \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
  .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \
  .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") \
  .option("failOnDataLoss","False") \
  .option("subscribe", topic) \
  .load() \
  .withColumn("key", col("key").cast("string")) \
  .withColumn("value", from_json(col("value").cast("string"), schema))
streaming_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- serviceToken: string (nullable = true)
 |    |-- clientId: long (nullable = true)
 |    |-- serviceId: long (nullable = true)
 |    |-- sessionId: string (nullable = true)
 |    |-- event: string (nullable = true)
 |    |-- targetId: string (nullable = true)
 |    |-- positionX: integer (nullable = true)
 |    |-- positionY: integer (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### 4. Binary 형태인 key, value를 String으로 cast하여 전처리
key는 null값을 가지고 있어, 임의로 interger로 cast한 timestamp를 넣어주었다.

In [42]:
streaming_query = streaming_df.select("key", "value.*") \
    .withColumnRenamed("serviceToken", "service_token") \
    .withColumnRenamed("clientId", "client_id") \
    .withColumnRenamed("serviceId", "service_id") \
    .withColumnRenamed("sessionId", "session_id") \
    .withColumnRenamed("event", "event") \
    .withColumnRenamed("targetId", "target_id") \
    .withColumnRenamed("positionX", "position_x") \
    .withColumnRenamed("positionY", "position_y") \
    .withColumnRenamed("location", "location") \
    .withColumnRenamed("timestamp", "creation_timestamp")
streaming_query.printSchema()

root
 |-- key: string (nullable = true)
 |-- service_token: string (nullable = true)
 |-- client_id: long (nullable = true)
 |-- service_id: long (nullable = true)
 |-- session_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- target_id: string (nullable = true)
 |-- position_x: integer (nullable = true)
 |-- position_y: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- creation_timestamp: long (nullable = true)



In [47]:
import time
cassandra_keyspace = "mykeyspace"
cassandra_table = "stream"

query = streaming_query.writeStream.outputMode("append") \
      .format("org.apache.spark.sql.cassandra") \
  .option("checkpointLocation", "/") \
  .option("spark.cassandra.connection.host", "master01") \
  .option("spark.cassandra.connection.port", 9042) \
  .option("keyspace", cassandra_keyspace) \
  .option("table", cassandra_table) \
  .option("spark.cassandra.connection.remoteConnectionsPerExecutor", 10) \
  .option("spark.cassandra.output.concurrent.writes", 1000) \
  .option("spark.cassandra.concurrent.reads", 512) \
  .option("spark.cassandra.output.batch.grouping.buffer.size", 1000) \
  .option("spark.cassandra.connection.keep_alive_ms", 600000000) \
      .start()
time.sleep(10)
query.stop()


23/03/21 16:44:01 WARN DeprecatedConfigParameter: spark.cassandra.connection.keep_alive_ms is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.connection.keepAliveMS. 
23/03/21 16:44:01 WARN DeprecatedConfigParameter: spark.cassandra.connection.keep_alive_ms is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.connection.keepAliveMS. 
23/03/21 16:44:01 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/03/21 16:44:01 WARN StreamingQueryManager: Stopping existing streaming query [id=cc9e88f7-bd3e-4113-b0d0-d7680ad346c6, runId=451ed4d4-d321-4401-81f3-b84d302b5b7b], as a new run is being started.
23/03/21 16:44:04 WARN DeprecatedConfigParameter: spark.cassandra.connection.keep_alive_ms is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.connection.keepAliveMS. 
23/03/21 16:44:04 WARN Deprecat

                                                                                

23/03/21 16:44:07 WARN DeprecatedConfigParameter: spark.cassandra.connection.keep_alive_ms is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.connection.keepAliveMS. 
23/03/21 16:44:07 WARN DeprecatedConfigParameter: spark.cassandra.connection.keep_alive_ms is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.connection.keepAliveMS. 
23/03/21 16:44:07 WARN DeprecatedConfigParameter: spark.cassandra.connection.keep_alive_ms is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.connection.keepAliveMS. 
23/03/21 16:44:07 WARN DeprecatedConfigParameter: spark.cassandra.connection.keep_alive_ms is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.connection.keepAliveMS. 
23/03/21 16:44:07 WARN DeprecatedConfigParameter: spark.cassandra.connection.keep_alive_ms is deprecated (DSE 6.0.0) and has been automatically replaced with parame

In [31]:
query = streamming_query.writeStream.format("console").outputMode("complete").start()
time.sleep(10)
query.stop()

23/03/21 16:04:42 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-09e2486d-f3f2-49bc-bfa8-bb373fd0ec8e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/03/21 16:04:42 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
Project [key#185, service_token#220, client_id#231L, session_id#242, event#253, target_id#264, position_x#275, positionY#207, location#286, timestamp#209L AS creation_timestamp#297L]
+- Project [key#185, service_token#220, client_id#231L, session_id#242, event#253, target_id#264, position_x#275, positionY#207, location#208 AS location#286, timestamp#209L]
   +- Project [key#185, service_token#220, client_id#231L, session_id#242, event#253, target_id#264, positionX#206 AS position_x#275, positionY#207, location#208, timestamp#209L]
      +- Project [key#185, service_token#220, client_id#231L, session_id#242, event#253, targetId#205 AS target_id#264, positionX#206, positionY#207, location#208, timestamp#209L]
         +- Project [key#185, service_token#220, client_id#231L, session_id#242, event#204 AS event#253, targetId#205, positionX#206, positionY#207, location#208, timestamp#209L]
            +- Project [key#185, service_token#220, client_id#231L, sessionId#203 AS session_id#242, event#204, targetId#205, positionX#206, positionY#207, location#208, timestamp#209L]
               +- Project [key#185, service_token#220, clientId#202L AS client_id#231L, sessionId#203, event#204, targetId#205, positionX#206, positionY#207, location#208, timestamp#209L]
                  +- Project [key#185, serviceToken#201 AS service_token#220, clientId#202L, sessionId#203, event#204, targetId#205, positionX#206, positionY#207, location#208, timestamp#209L]
                     +- Project [key#185, value#193.serviceToken AS serviceToken#201, value#193.clientId AS clientId#202L, value#193.sessionId AS sessionId#203, value#193.event AS event#204, value#193.targetId AS targetId#205, value#193.positionX AS positionX#206, value#193.positionY AS positionY#207, value#193.location AS location#208, value#193.timestamp AS timestamp#209L]
                        +- Project [key#185, from_json(StructField(serviceToken,StringType,true), StructField(clientId,LongType,true), StructField(sessionId,StringType,true), StructField(event,StringType,true), StructField(targetId,StringType,true), StructField(positionX,IntegerType,true), StructField(positionY,IntegerType,true), StructField(location,StringType,true), StructField(timestamp,LongType,true), cast(value#172 as string), Some(Etc/UTC)) AS value#193, topic#173, partition#174, offset#175L, timestamp#176, timestampType#177]
                           +- Project [cast(key#171 as string) AS key#185, value#172, topic#173, partition#174, offset#175L, timestamp#176, timestampType#177]
                              +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@12494232, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@2d04270, [key.deserializer=org.apache.kafka.common.serialization.StringDeserializer, subscribe=tagmanager, failOnDataLoss=False, value.deserializer=org.apache.kafka.common.serialization.StringDeserializer, kafka.bootstrap.servers=master01:9092,master02:9092,slave01:9092,slave02:9092,slave03:9092], [key#171, value#172, topic#173, partition#174, offset#175L, timestamp#176, timestampType#177], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6700d4dc,kafka,List(),None,List(),None,Map(key.deserializer -> org.apache.kafka.common.serialization.StringDeserializer, subscribe -> tagmanager, failOnDataLoss -> False, value.deserializer -> org.apache.kafka.common.serialization.StringDeserializer, kafka.bootstrap.servers -> master01:9092,master02:9092,slave01:9092,slave02:9092,slave03:9092),None), kafka, [key#164, value#165, topic#166, partition#167, offset#168L, timestamp#169, timestampType#170]


### 5. Spark Cassandra Connector를 사용해 Cassandra 연결, Streaming되는 dataframe 출력
20초 간 INSERT 후에 자동으로 writeStream 쿼리를 종료한다. Cassandra의 keyspace, table은 그 형태가 미리 정의되어있어야 한다.

In [17]:
# query = streamming_query.writeStream.format("console").start()

query = streamming_query.writeStream.outputMode("append") \
      .format("org.apache.spark.sql.cassandra") \
  .option("checkpointLocation", "/") \
  .option("spark.cassandra.connection.host", "master01") \
  .option("spark.cassandra.connection.port", 9042) \
  .option("keyspace", cassandra_keyspace) \
  .option("table", cassandra_table) \
  .option("spark.cassandra.connection.remoteConnectionsPerExecutor", 10) \
  .option("spark.cassandra.output.concurrent.writes", 1000) \
  .option("spark.cassandra.concurrent.reads", 512) \
  .option("spark.cassandra.output.batch.grouping.buffer.size", 1000) \
  .option("spark.cassandra.connection.keep_alive_ms", 600000000) \
      .start()

query.awaitTermination()

23/03/21 15:25:37 WARN DeprecatedConfigParameter: spark.cassandra.connection.keep_alive_ms is deprecated (DSE 6.0.0) and has been automatically replaced with parameter spark.cassandra.connection.keepAliveMS. 


AnalysisException: Couldn't find tagmanager or any similarly named keyspaces

### 6. Session과 Context 종료

In [35]:
session.stop()
sc.stop()