In [1]:
pip install pyspark

Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.7
Note: you may need to restart the kernel to use updated packages.


In [1]:
## Structured Streaming 

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType


In [18]:
# Create the SparkSession with the Kafka source connector on the classpath.
#
# The spark-sql-kafka artifact version has to match the Spark runtime, so it
# is taken from the installed PySpark instead of being pinned to 3.1.2 (this
# environment ships py4j 0.10.9.7, which is not the py4j of a Spark 3.1.x
# install).
# NOTE(review): the `_2.12` Scala suffix is kept from the original coordinate;
# confirm it matches the Scala build of the Spark installation.
import pyspark

kafka_connector = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"

spark = (
    SparkSession.builder
    .appName("KafkaStructuredStreaming")
    .config("spark.sql.streaming.schemaInference", "true")
    .config("spark.jars.packages", kafka_connector)
    .getOrCreate()
)

In [26]:
# Set the log level of the session's SparkContext to INFO.
spark.sparkContext.setLogLevel("INFO")

ConnectionRefusedError: [Errno 111] Connection refused

In [19]:
# Schema of the JSON log records expected in the Kafka message value.
# All fields are nullable; "@timestamp" is the only timestamp-typed field,
# every other field is a plain string.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("@version", StringType()),
        ("type", StringType()),
        ("time", StringType()),
        ("host", StringType()),
        ("stream", StringType()),
        ("@timestamp", TimestampType()),
        ("log", StringType()),
        ("path", StringType()),
    )
])

In [20]:
# Open the Kafka topic as a streaming source, starting from the earliest
# offsets.
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("startingOffsets", "earliest")
    .option("subscribe", "new_test_topic")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .load()
)

# Keep only the message payload, cast to a string column named `value`.
value_df = kafka_df.selectExpr("CAST(value AS STRING)")

In [21]:
# Parse the JSON text in `value` against `schema`, then flatten the resulting
# struct so that every JSON field becomes a top-level column.
json_df = (
    value_df
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)

In [22]:
# Bare expression: the cell output shows the parsed DataFrame's column types.
json_df

DataFrame[@version: string, type: string, time: string, host: string, stream: string, @timestamp: timestamp, log: string, path: string]

In [23]:
# Example filter: keep only the log records emitted by one specific host.
host_matches = json_df["host"] == "4839c495357b"
filtered_df = json_df.filter(host_matches)

In [24]:
# Bare expression: the cell output shows the filtered DataFrame's column types.
filtered_df

DataFrame[@version: string, type: string, time: string, host: string, stream: string, @timestamp: timestamp, log: string, path: string]

In [None]:
# # Write the data to HDFS as Parquet (disabled: this whole cell is commented out)
# query = filtered_df.writeStream \
#     .format("parquet") \
#     .outputMode("append") \
#     .option("checkpointLocation", "hdfs://namenode:9000/user/your_user/checkpoint") \
#     .option("path", "hdfs://namenode:9000/user/your_user/datasets/logs") \
#     .start()

# query.awaitTermination()

In [25]:
# Stream the filtered records to the console sink in append mode.
query = (
    filtered_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# awaitTermination() blocks the kernel until the query ends. If the wait is
# interrupted or fails, the query would otherwise be left running on the
# Spark side, so always stop it on the way out. Any exception still
# propagates to the notebook.
try:
    query.awaitTermination()
finally:
    query.stop()

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f

Py4JError: py4j does not exist in the JVM

In [1]:
import pyspark
from pyspark.sql import SparkSession

# Create the SparkSession with the Kafka source connector on the classpath.
#
# The spark-sql-kafka artifact version has to match the Spark runtime, so it
# is taken from the installed PySpark instead of being pinned to 3.1.2.
# NOTE(review): the `_2.12` Scala suffix is kept from the original coordinate;
# confirm it matches the Scala build of the Spark installation.
kafka_connector = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"

spark = (
    SparkSession.builder
    .appName("KafkaStructuredStreaming")
    .config("spark.sql.streaming.schemaInference", "true")
    .config("spark.jars.packages", kafka_connector)
    .getOrCreate()
)

# Set the log level of the session's SparkContext to INFO.
spark.sparkContext.setLogLevel("INFO")

In [2]:
# Open the Kafka topic as a streaming source, consuming only messages that
# arrive after the query starts ("latest" offsets).
# NOTE(review): another cell in this notebook points at kafka:9092 rather than
# localhost:9092 — confirm which address is reachable from the driver.
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("startingOffsets", "latest")
    .option("subscribe", "new_test_topic")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .load()
)

# Kafka delivers key and value as binary, so cast both to strings.
df = raw_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [3]:
# Bare expression: the cell output shows the DataFrame's column types.
df

DataFrame[key: string, value: string]

In [4]:
# Drop messages that carry no payload (null `value`).
has_payload = df.value.isNotNull()
filtered_df = df.filter(has_payload)

In [5]:
# Bare expression: the cell output shows the filtered DataFrame's column types.
filtered_df

DataFrame[key: string, value: string]

In [6]:
# Stream the filtered messages to the console sink in append mode.
query = (
    filtered_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# awaitTermination() blocks the kernel until the query ends. Interrupting the
# cell (as in the recorded KeyboardInterrupt) would otherwise leave the query
# running on the Spark side, so always stop it on the way out. The interrupt
# or error still propagates to the notebook.
try:
    query.awaitTermination()
finally:
    query.stop()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [7]:
# Print the master URL the current SparkContext is attached to.
print(spark.sparkContext.master)

local[*]


In [3]:
import pyspark
from pyspark.sql import SparkSession

# getOrCreate() returns an already-active session as-is, in which case the
# .master(...) below would be silently ignored. Stop any active session first
# so the new master URL actually takes effect.
# NOTE(review): the execution counts suggest this cell ran after another
# session had been created in the same kernel — confirm.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    active_session.stop()

# The spark-sql-kafka artifact version has to match the Spark runtime, so it
# is taken from the installed PySpark instead of being pinned to 3.1.2.
# NOTE(review): the `_2.12` Scala suffix is kept from the original coordinate;
# confirm it matches the Scala build of the Spark installation.
kafka_connector = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"

# Connect to the standalone master at the given address.
spark = (
    SparkSession.builder
    .appName("KafkaStructuredStreaming")
    .master("spark://43.202.6.17:7077")
    .config("spark.sql.streaming.schemaInference", "true")
    .config("spark.jars.packages", kafka_connector)
    .getOrCreate()
)

In [None]:
spark-submit --master spark://:7077 --status

In [2]:
import pyspark
from pyspark.sql import SparkSession

# Create the SparkSession against the standalone master, with the Kafka
# source connector on the classpath.
#
# The spark-sql-kafka artifact version has to match the Spark runtime, so it
# is taken from the installed PySpark instead of being pinned to 3.1.2.
# NOTE(review): the `_2.12` Scala suffix is kept from the original coordinate;
# confirm it matches the Scala build of the Spark installation.
kafka_connector = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"

spark = (
    SparkSession.builder
    .appName("KafkaStructuredStreaming")
    .config("spark.sql.streaming.schemaInference", "true")
    .config("spark.jars.packages", kafka_connector)
    .master("spark://182d782c5905:7077")
    .getOrCreate()
)

In [4]:
import os

# Kafka broker address. It can be overridden through the
# KAFKA_BOOTSTRAP_SERVERS environment variable; the default is the value this
# cell originally hard-coded.
# NOTE(review): the recorded run of this cell ended in a Kafka describeTopics
# TimeoutException, so the broker is presumably not reachable at
# localhost:9092 from the driver (another cell in this notebook uses
# kafka:9092) — confirm the right address.
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

# Open the Kafka topic as a streaming source, consuming only messages that
# arrive after the query starts ("latest" offsets).
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", "new_test_topic")
    .option("startingOffsets", "latest")
    .load()
)

# Kafka delivers key and value as binary, so cast both to strings.
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Keep only messages whose value is not null.
filtered_df = df.filter(df['value'].isNotNull())

# Stream the filtered messages to the console sink in append mode.
query = (
    filtered_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# awaitTermination() blocks the kernel until the query ends. Always stop the
# query on the way out so an interrupt or failure does not leave it running
# on the Spark side. The original exception still propagates to the notebook.
try:
    query.awaitTermination()
finally:
    query.stop()

StreamingQueryException: [STREAM_FAILED] Query [id = af337cce-a366-4615-82a5-01ad93dba2e0, runId = 7e207cd0-f70e-4af0-8199-774757c2686f] terminated with exception: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1725804640245, tries=1, nextAllowedTryMs=1725804640346) timed out at 1725804640246 after 1 attempt(s)