In [1]:
import os

# Use YARN as the cluster manager: point Spark at the YARN/Hadoop client
# configuration directory. Set before getOrCreate() below so the launched
# driver/JVM sees it.
os.environ["YARN_CONF_DIR"] = "/mcw/spark3/conf2"

from pyspark.sql import SparkSession

# Build (or reuse) the SparkSession on YARN with the Kafka connector on the
# classpath and a fixed executor footprint (5 executors x 2 cores x 2g).
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("yarn-spark")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
    .config("spark.executor.instances", "5")
    .config("spark.executor.memory", "2g")
    .config("spark.executor.cores", "2")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/usr/local/lib/python3.8/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/spark/.ivy2/cache
The jars for the packages stored in: /home/spark/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-02f547c0-36f9-4312-afe8-b5553c559b2d;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	

In [2]:
from pyspark.ml.classification import RandomForestClassificationModel

# Load the previously trained random-forest classifier from HDFS; it is
# applied to the streaming feature vectors further down.
rf_model = RandomForestClassificationModel.load(
    "hdfs://spark-master-01:9000/mcw/model/ml_model_pf"
)

                                                                                

In [3]:
# Streaming source: subscribe to the "heat" Kafka topic, reading only records
# that arrive after the query starts (startingOffsets=latest).
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092",
        "subscribe": "heat",
        "startingOffsets": "latest",
    })
    .load()
)

In [4]:
# Explicit imports instead of `from pyspark.sql.types import *`, so the
# notebook namespace only gains the names actually used here.
from pyspark.sql.types import (
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Schema of the JSON messages on the "heat" topic, used by from_json when
# parsing the Kafka value. Per-phase (R/S/T) electrical measurements, their
# averages, temperature and frequency are doubles. All fields are nullable.
# NOTE(review): `timestamp` is a double, presumably epoch seconds — confirm
# against the producer.
schema = StructType([
    StructField("R상무효전력", DoubleType(), True),
    StructField("R상선간전압", DoubleType(), True),
    StructField("R상역률", DoubleType(), True),
    StructField("R상유효전력", DoubleType(), True),
    StructField("R상전류", DoubleType(), True),
    StructField("R상전류고조파", DoubleType(), True),
    StructField("R상전압", DoubleType(), True),
    StructField("R상전압고조파", DoubleType(), True),
    StructField("S상무효전력", DoubleType(), True),
    StructField("S상선간전압", DoubleType(), True),
    StructField("S상역률", DoubleType(), True),
    StructField("S상유효전력", DoubleType(), True),
    StructField("S상전류", DoubleType(), True),
    StructField("S상전류고조파", DoubleType(), True),
    StructField("S상전압", DoubleType(), True),
    StructField("S상전압고조파", DoubleType(), True),
    StructField("T상무효전력", DoubleType(), True),
    StructField("T상선간전압", DoubleType(), True),
    StructField("T상역률", DoubleType(), True),
    StructField("T상유효전력", DoubleType(), True),
    StructField("T상전류", DoubleType(), True),
    StructField("T상전류고조파", DoubleType(), True),
    StructField("T상전압", DoubleType(), True),
    StructField("T상전압고조파", DoubleType(), True),
    StructField("누적전력량", DoubleType(), True),
    StructField("무효전력평균", DoubleType(), True),
    StructField("상전압평균", DoubleType(), True),
    StructField("선간전압평균", DoubleType(), True),
    StructField("역률평균", DoubleType(), True),
    StructField("온도", DoubleType(), True),
    StructField("유효전력평균", DoubleType(), True),
    StructField("전류고조파평균", DoubleType(), True),
    StructField("전류평균", DoubleType(), True),
    StructField("전압고조파평균", DoubleType(), True),
    StructField("주파수", DoubleType(), True),
    StructField("label", StringType(), True),
    StructField("sensor_id", IntegerType(), True),
    StructField("timestamp", DoubleType(), True),
])

In [5]:
from pyspark.ml.feature import VectorAssembler
# Namespaced import: `from pyspark.sql.functions import *` would shadow Python
# builtins such as pow, sum, max, min, abs and round for the rest of the notebook.
from pyspark.sql import functions as F


def _power_factor(active_col, reactive_col):
    """Build a Column computing the power factor P / sqrt(P^2 + Q^2).

    Args:
        active_col: name of the active-power column (P).
        reactive_col: name of the reactive-power column (Q).

    Returns:
        A Column equal to P / sqrt(P^2 + Q^2) when P != 0, otherwise 0.
        A null P makes the comparison null, so it also falls through to 0;
        a null Q with non-zero P yields null.
    """
    active = F.col(active_col)
    reactive = F.col(reactive_col)
    # The P != 0 guard also guarantees a strictly positive denominator.
    return F.when(
        active != 0, active / F.sqrt(F.pow(active, 2) + F.pow(reactive, 2))
    ).otherwise(0)


# Parse the Kafka value bytes as JSON using `schema` and flatten the fields.
parsed_df = (
    kafka_df
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
    .select("data.*")
)

# Derive per-sensor features: temperature plus overall and per-phase power factor.
preprocessed_df = parsed_df.select(
    F.struct(
        F.col("sensor_id"),
        F.col("온도"),
        _power_factor("유효전력평균", "무효전력평균").alias("pf"),
        _power_factor("R상유효전력", "R상무효전력").alias("R_pf"),
        _power_factor("S상유효전력", "S상무효전력").alias("S_pf"),
        _power_factor("T상유효전력", "T상무효전력").alias("T_pf"),
    ).alias("data")
)

# Assemble the feature vector. NOTE(review): the column order presumably has to
# match the order used when rf_model was trained — confirm.
feature_cols = ["온도", "pf", "R_pf", "S_pf", "T_pf"]
# handleInvalid="skip" drops rows whose features are null/NaN (e.g. a message
# missing the temperature field). With the default "error", a single such
# message would fail the whole streaming query. The unfiltered raw records are
# still available through parsed_df.
assembler = VectorAssembler(
    inputCols=feature_cols, outputCol="features", handleInvalid="skip"
)
input_df = assembler.transform(preprocessed_df.select("data.*"))

# Run the model; adds a numeric `prediction` column.
predictions = rf_model.transform(input_df)

# Map the predicted class to a status label: 2.0 -> "경고" (warning),
# 1.0 -> "주의" (caution), anything else -> "정상" (normal).
status = (
    F.when(F.col("prediction") == 2.0, "경고")
    .when(F.col("prediction") == 1.0, "주의")
    .otherwise("정상")
)
preprocessed_df2 = predictions.select(
    F.struct(
        F.col("sensor_id"),
        F.col("온도"),
        F.col("pf"),
        F.col("R_pf"),
        F.col("S_pf"),
        F.col("T_pf"),
        status.alias("pred"),
    ).alias("data")
)

# Serialize each row to a JSON string in the `value` column (the Kafka sink
# writes this column as the message value).
output_df = preprocessed_df2.select(F.to_json("data").alias("value"))

23/05/24 07:22:46 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [6]:
def check_df(batch_df, batch_id):
    """Debug helper: print a micro-batch DataFrame to stdout.

    Not attached to any query in this notebook. Its (batch_df, batch_id)
    signature matches what ``DataStreamWriter.foreachBatch`` passes, so it can be
    wired in as ``.writeStream.foreachBatch(check_df)`` while inspecting a
    stream. ``batch_id`` is unused.
    """
    batch_df.show()


# Archive every parsed (pre-model) record as JSON files under
# hdfs://.../mcw/raw, with its own checkpoint directory.
writeDW = (
    parsed_df.writeStream
    .outputMode("append")
    .format("json")
    .option("checkpointLocation", "hdfs://spark-master-01:9000/checkpoint/writeDW")
    .option("path", "hdfs://spark-master-01:9000/mcw/raw")
    .queryName("writeDW")
    .start()
)

# To block this cell until any active streaming query terminates:
# spark.streams.awaitAnyTermination()

23/05/24 09:41:07 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [12]:
# Stop the raw-JSON archive query (writeDW) started earlier.
writeDW.stop()

In [7]:
# Publish the processed predictions (JSON strings in output_df's `value`
# column) to the "heat_pf" Kafka topic, with its own checkpoint directory.
pf_pred_to_kafka = (
    output_df.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "spark-worker-01:9092,spark-worker-02:9092,spark-worker-03:9092")
    .option("topic", "heat_pf")
    .option("checkpointLocation", "hdfs://spark-master-01:9000/checkpoint/writestreaming")
    .queryName("pf_pred_to_kafka")
    .start()
)

# To block this cell until the query terminates:
# pf_pred_to_kafka.awaitTermination()

23/05/24 09:41:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [13]:
# Stop the prediction-to-Kafka streaming query.
pf_pred_to_kafka.stop()

In [14]:
# Shut down the SparkSession; run only after the streaming queries above are stopped.
spark.stop()