In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, pandas_udf, split
from pyspark.sql import SparkSession

# Kafka connection settings used by the streaming source below.
topic = 'coin'
kafka_bootstrap_servers = 'kafka:9092'

# Create (or reuse) the SparkSession. The Kafka source is supplied by the
# spark-sql-kafka connector pulled in through spark.jars.packages; its Scala
# (2.12) and Spark (3.3.0) versions should match the running Spark install.
kafka_connector_package = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"

spark = (
    SparkSession.builder
    .appName("kafka-ex")
    .config("spark.jars.packages", kafka_connector_package)
    .getOrCreate()
)

# Only show ERROR-level logs so the console sink output stays readable.
spark.sparkContext.setLogLevel("ERROR")

In [2]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType

from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, BooleanType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Streaming source: subscribe to `topic` and replay from the earliest offset
# Kafka still retains. The message payload arrives in the `value` column.
kafka_source_options = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "startingOffsets": "earliest",
    "subscribe": topic,
}

df = spark.readStream.format("kafka").options(**kafka_source_options).load()

# Schema of the JSON payload carried in each Kafka message; every field is nullable.
# NOTE(review): o/c/h/l look like open/close/high/low prices and v/q/V/Q like
# volumes — presumably a candlestick payload; confirm against the producer.
# NOTE(review): `t` is parsed straight into TimestampType. If the producer sends
# it as an integer epoch in milliseconds, Spark's JSON reader interprets integer
# timestamps as seconds — verify the incoming format.
payload_field_types = [
    ("t", TimestampType()),
    ("s", StringType()),
    ("i", StringType()),
    ("o", FloatType()),
    ("c", FloatType()),
    ("h", FloatType()),
    ("l", FloatType()),
    ("v", StringType()),
    ("n", IntegerType()),
    ("x", BooleanType()),
    ("q", StringType()),
    ("V", StringType()),
    ("Q", StringType()),
    ("B", StringType()),
]

schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in payload_field_types]
)



# Convert the data to JSON: cast the Kafka `value` bytes to a string and parse it
# with `schema` into a single struct column, `parsed_value`.
df = (
    df.selectExpr("CAST(value AS STRING)")
    .withColumn("parsed_value", from_json("value", schema))
)

# Flatten the parsed struct into top-level columns.
# `t` is declared as TimestampType in `schema`, so from_json already yields a
# timestamp; no separate to_timestamp()/drop() step is needed. (A dotted name
# such as "parsed_value.t" passed to drop() does not remove a nested field.)
parsed_stream_df = df.selectExpr("parsed_value.*")


In [3]:
# Display the parsed stream; its repr lists every column name and type.
parsed_stream_df

DataFrame[t: timestamp, s: string, i: string, o: float, c: float, h: float, l: float, v: string, n: int, x: boolean, q: string, V: string, Q: string, B: string]

In [13]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

windowDuration = "10 minutes"

# Non-overlapping ("tumbling") windows: the slide passed to F.window equals the
# window length. The watermark gives event-time column `t` a 10-minute
# lateness threshold.
watermarked_df = parsed_stream_df.withWatermark("t", "10 minutes")
tumbling_window = F.window(parsed_stream_df.t, windowDuration, "10 minutes")

# Average close-column value `c` per window, exposing the window bounds as
# plain start/end columns.
windowed_df = (
    watermarked_df
    .groupBy(tumbling_window)
    .agg(F.avg("c").alias("avg_c"))
    .select("window.start", "window.end", "avg_c")
)


In [15]:
# Print the windowed averages to the console, untruncated.
# NOTE(review): "complete" mode keeps and re-emits all aggregate state, so the
# watermark on `windowed_df` cannot drop old windows; consider "update" or
# "append" if this stream is meant to run for a long time.
query = (
    windowed_df.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start()
)

# awaitTermination() blocks until the query ends; interrupting the kernel raises
# KeyboardInterrupt here. Stop the query on the way out so it does not keep
# running in the background after the cell is interrupted.
try:
    query.awaitTermination()
finally:
    query.stop()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [4]:
# Stop any streaming query left running by an earlier cell before rebinding
# `query`; otherwise the old query keeps consuming Kafka with no notebook
# variable referencing it.
for active_query in spark.streams.active:
    active_query.stop()

# Echo each parsed row to the console. With no aggregation, "update" behaves
# like "append": each micro-batch prints only the newly arrived rows.
query = (
    parsed_stream_df.writeStream
    .outputMode("update")
    .format("console")
    .start()
)

# Block until interrupted, then make sure the query is stopped.
try:
    query.awaitTermination()
finally:
    query.stop()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [16]:
# Stop every streaming query still active in this session, not only the one
# currently bound to `query`: an earlier query may have been shadowed when
# `query` was reassigned and would otherwise keep running.
for active_query in spark.streams.active:
    active_query.stop()


In [4]:
import pyspark

# Report the installed PySpark version; it should agree with the Spark version
# of the spark-sql-kafka package configured when the session was built.
installed_pyspark_version = pyspark.__version__
print(installed_pyspark_version)


3.3.0
