In [105]:
import findspark
findspark.init()


In [106]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CSVReader") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://127.0.0.1:9001") \
    .getOrCreate()

In [107]:
# HDFS 디렉토리 내 모든 JSON 파일 읽기
file_path = "hdfs://127.0.0.1:9001/songho/*.json"

# 모든 JSON 파일 로드
df = spark.read.option("multiline", "true").json(file_path)


In [108]:
from pyspark.sql.functions import explode

# TimeSeriesData를 개별 레코드로 변환 및 평탄화
df_exploded = df.select(explode("TimeSeriesData").alias("TimeSeries"))

flattened_df = df_exploded.select(
    "TimeSeries.TimeStamp",
    "TimeSeries.SM_Sensor.DataType",
    "TimeSeries.SM_Sensor.HeartRate",
    "TimeSeries.SM_Sensor.BreathRate",
    "TimeSeries.SM_Sensor.SPO2",
    "TimeSeries.SM_Sensor.SkinTemperature",
    "TimeSeries.SM_Sensor.SleepPhase",
    "TimeSeries.SM_Sensor.SleepScore",
    "TimeSeries.SM_Sensor.WalkingSteps",
    "TimeSeries.SM_Sensor.StressIndex", 
    "TimeSeries.SM_Sensor.ActivityIntensity",
    "TimeSeries.SM_Sensor.CaloricExpenditure",
    "TimeSeries.SM_Sensor.Label",
    "TimeSeries.Counseling.DataType",
    "TimeSeries.Counseling.Counseling",
    "TimeSeries.Counseling.Memo",
    "TimeSeries.Counseling.Information",
    "TimeSeries.Total_Labeling.Estimation",
    "TimeSeries.Total_Labeling.Reason"
)


In [109]:
# 상위 N개의 행만 확인
flattened_df.show(20, truncate=False)  # 20개의 행을 출력
print(f"데이터 개수: {flattened_df.count()}")

+----------------+---------+---------+----------+----+---------------+----------+----------+------------+-----------+-----------------+------------------+-----+----------+--------------+--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+------------------------------------+
|TimeStamp       |DataType |HeartRate|BreathRate|SPO2|SkinTemperature|SleepPhase|SleepScore|WalkingSteps|StressIndex|ActivityIntensity|CaloricExpenditure|Label|DataType  |Counseling    |Memo          |Information                                                                                                                                                                 |Estimation|Reason                              |
+----------------+---------+---------+----------+----+---------------+----------+----------+------------+-----------+-----------------+------------------+

                                                                                

In [110]:
flattened_df.printSchema()


root
 |-- TimeStamp: string (nullable = true)
 |-- DataType: string (nullable = true)
 |-- HeartRate: long (nullable = true)
 |-- BreathRate: long (nullable = true)
 |-- SPO2: long (nullable = true)
 |-- SkinTemperature: double (nullable = true)
 |-- SleepPhase: long (nullable = true)
 |-- SleepScore: long (nullable = true)
 |-- WalkingSteps: long (nullable = true)
 |-- StressIndex: long (nullable = true)
 |-- ActivityIntensity: long (nullable = true)
 |-- CaloricExpenditure: long (nullable = true)
 |-- Label: string (nullable = true)
 |-- DataType: string (nullable = true)
 |-- Counseling: string (nullable = true)
 |-- Memo: string (nullable = true)
 |-- Information: string (nullable = true)
 |-- Estimation: string (nullable = true)
 |-- Reason: string (nullable = true)



In [111]:
print(flattened_df.columns)


['TimeStamp', 'DataType', 'HeartRate', 'BreathRate', 'SPO2', 'SkinTemperature', 'SleepPhase', 'SleepScore', 'WalkingSteps', 'StressIndex', 'ActivityIntensity', 'CaloricExpenditure', 'Label', 'DataType', 'Counseling', 'Memo', 'Information', 'Estimation', 'Reason']


In [112]:
from pyspark.sql.functions import col
flattened_df.groupby("Information").count().orderBy(col("count").desc()).show(40,truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|Information                                                                                                                                                                 |count|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|취침 습관은, 규칙적인 수면환경임, 3끼 규칙적인 식사(7시반, 12시 반, 17시 반 전후)                                                                                           |10658|
|취침 습관은, 새벽까지 잠을 잘 못자고, 자주 깨서 깊은 잠을 잘 못 주무심, 주로 점심과 저녁 식사하심, 병원진료, 분기별 친구와 외출, 방에 운동기구가 있어 집에서 아침에 운동하심|6397 |
|취침 습관은, 21~9시경, 침대생활하고 새벽에 자주 깸, 하루에 2~3끼 식사, 병원 월 1~2회 방문. 거동 불편하여 외출이 힘듦                                                        |6299 |
|취침 습관은, 21~5시, 최근 숙면 취하심, 식사 주로 3번, 주 2~3회 이상 외출하심, 점심

                                                                                

In [113]:
flattened_df.groupby('Counseling').count().orderBy(col("count").desc()).show(40)

+---------------------------------+-----+
|                       Counseling|count|
+---------------------------------+-----+
|                   특이사항 없음.|85630|
|                        정기확인_|  848|
| 정기확인-건강에는 이상이 없으...|  551|
|         정기확인_ 특이사항 없음.|  432|
|                         정기확인|  432|
|  정기확인_ 응급호출 오늘도 한...|  144|
|    정기확인_ 어르신 잘 주무셨음.|  144|
|  정기확인- 어르신 경로당 나가...|  144|
| 정기확인_어르신 컨디션 좋아보...|  144|
| 정기확인_소화불량으로 인해 배...|  144|
| 정기확인_관절 마디마디에 통증...|  144|
| 정기확인-건강에는 아무 이상이...|  144|
| 정기확인- 주말에 성당다녀오고...|  144|
| 정기확인-응급벨을 눌러 안부확...|  144|
|  정기확인- 벨 누르는거 깜박하...|  144|
| 정기확인-건강에는 이상이 없으...|  144|
| 정기확인_몸 상태가 좋아졌다고...|  144|
|  정기확인-통화를 했을 때 운동...|  144|
| 정기확인_생활지원사와 오전에 ...|  144|
|  정기확인-주말 사이 건강에 이...|  144|
|                        정기확인.|  144|
| 정기확인_생활지원사를 통해 허...|  144|
|  정기확인_ 얼굴에 멍은 여전하...|  144|
| 정기확인_다리가 저리고 아프다...|  144|
|                         정기확인|  144|
| 정기확인_ 노인일자리 참여하러...|  144|
| 정기확인-건강에는 전혀 이상이...|  144|
|  정기확인-응급호출 누르는 법 ...|  144|
|  정기확인_

                                                                                

In [114]:
# Reason 컬럼의 값별 빈도수 계산
reason_df = flattened_df.select(col("Reason"))
reason_frequency = reason_df.groupBy("Reason").count()

reason_frequency.orderBy(col("count").desc()).show(40)

+------------------------------------+-----+
|                              Reason|count|
+------------------------------------+-----+
|특이사항 없음으로 일상 상황으로 판단|78803|
|   분당 심박 기준으로 주의 상황으...| 7859|
|    분당 심박, 분당 호흡 기준으로...| 2827|
|   분당 호흡 기준으로 주의 상황으...| 2215|
|  피부온도변화 기준으로 위험 상황...| 1886|
|  피부온도변화 기준으로 주의 상황...| 1739|
|   분당 심박, 피부온도변화 기준으...| 1520|
|     분당 심박, 분당 호흡 등 기준...|  760|
|   분당 심박 기준으로 위험 상황으...|  719|
|   분당 호흡, 피부온도변화 기준으...|  578|
|   분당 호흡 기준으로 위험 상황으...|  397|
|  혈중산소농도 기준으로 주의 상황...|  370|
|응급버튼 기준으로 위험 상황으로 판단|  268|
|   분당 심박, 혈중산소농도 기준으...|  224|
|   분당 호흡, 혈중산소농도 기준으...|  108|
|   분당 심박, 피부온도변화 기준으...|   70|
|  혈중산소농도, 피부온도변화 기준...|   60|
|   분당 호흡, 피부온도변화 기준으...|   55|
|    분당 호흡, 혈중산소농도 등 기...|   40|
|    분당 심박, 혈중산소농도 등 기...|   36|
|    분당 심박, 분당 호흡 기준으로...|   25|
|     분당 심박, 분당 호흡 등 기준...|   11|
|    분당 심박, 피부온도변화 등 기...|    8|
|응급음성 기준으로 위험 상황으로 판단|    8|
|  혈중산소농도 기준으로 위험 상황...|    5|
|  피부온도변화, 응급버튼 기준으로...|    4|
|   분당 심박, 혈중산소농도 기준으...|    2|
|  특이사항

                                                                                

In [115]:
# Reason과 Label의 관계 분석
reason_label_fre = flattened_df.groupBy("Reason", "Label").count()
reason_label_fre.orderBy(col("count").desc()).show(truncate=False)


+------------------------------------------------------+-----+-----+
|Reason                                                |Label|count|
+------------------------------------------------------+-----+-----+
|특이사항 없음으로 일상 상황으로 판단                  |일상 |78377|
|분당 심박 기준으로 주의 상황으로 판단                 |일상 |7786 |
|분당 심박, 분당 호흡 기준으로 주의 상황으로 판단      |일상 |2802 |
|분당 호흡 기준으로 주의 상황으로 판단                 |일상 |2186 |
|피부온도변화 기준으로 위험 상황으로 판단              |일상 |1865 |
|피부온도변화 기준으로 주의 상황으로 판단              |일상 |1720 |
|분당 심박, 피부온도변화 기준으로 주의 상황으로 판단   |일상 |1511 |
|분당 심박, 분당 호흡 등 기준으로 주의 상황으로 판단   |일상 |756  |
|분당 심박 기준으로 위험 상황으로 판단                 |일상 |716  |
|분당 호흡, 피부온도변화 기준으로 주의 상황으로 판단   |일상 |574  |
|특이사항 없음으로 일상 상황으로 판단                  |주의 |411  |
|분당 호흡 기준으로 위험 상황으로 판단                 |일상 |394  |
|혈중산소농도 기준으로 주의 상황으로 판단              |일상 |369  |
|응급버튼 기준으로 위험 상황으로 판단                  |일상 |267  |
|분당 심박, 혈중산소농도 기준으로 주의 상황으로 판단   |일상 |221  |
|분당 호흡, 혈중산소농도 기준으로 주의 상황으로 판단   |일상 |106  |
|분당 심박, 피부온도변화 기

                                                                                

In [116]:
# TimeSeriesData를 개별 레코드로 변환 및 평탄화
data = df_exploded.select(
    "TimeSeries.SM_Sensor.HeartRate",
    "TimeSeries.SM_Sensor.BreathRate",
    "TimeSeries.SM_Sensor.SPO2",
    "TimeSeries.SM_Sensor.SkinTemperature",
    "TimeSeries.SM_Sensor.SleepPhase",
    "TimeSeries.SM_Sensor.SleepScore",
    "TimeSeries.SM_Sensor.WalkingSteps",
    "TimeSeries.SM_Sensor.StressIndex",
    "TimeSeries.SM_Sensor.ActivityIntensity",
    "TimeSeries.SM_Sensor.CaloricExpenditure",
    "TimeSeries.Total_Labeling.Reason"
)

from pyspark.sql.functions import monotonically_increasing_id
# 기존 DataFrame(data)에 인덱스 컬럼 추가
data = data.withColumn("Index", monotonically_increasing_id())
# Index를 첫 번째 컬럼으로 이동(*는 언리스트 하여 select에 전달하는 역할 )
data = data.select("Index", *[col for col in data.columns if col != "Index"])


In [117]:
data.show(20, truncate=False)  # 20개의 행을 출력
print(f"데이터 개수: {data.count()}")

+-----+---------+----------+----+---------------+----------+----------+------------+-----------+-----------------+------------------+------------------------------------+
|Index|HeartRate|BreathRate|SPO2|SkinTemperature|SleepPhase|SleepScore|WalkingSteps|StressIndex|ActivityIntensity|CaloricExpenditure|Reason                              |
+-----+---------+----------+----+---------------+----------+----------+------------+-----------+-----------------+------------------+------------------------------------+
|0    |73       |16        |98  |0.0            |9         |0         |0           |0          |102              |0                 |특이사항 없음으로 일상 상황으로 판단|
|1    |73       |16        |98  |0.0            |9         |0         |0           |0          |0                |0                 |특이사항 없음으로 일상 상황으로 판단|
|2    |73       |16        |98  |0.0            |9         |0         |0           |0          |0                |0                 |특이사항 없음으로 일상 상황으로 판단|
|3    |74       |16   

                                                                                

In [118]:
from pyspark.sql.functions import col

# "Reason" 컬럼에서 "응급버튼"을 포함하지 않은 행만 남기기
data = data.where(~col("Reason").contains("응급버튼"))
print(f"데이터 개수: {data.count()}")

데이터 개수: 100330


                                                                                

## 모델을 위한 전처리

In [119]:
from pyspark.sql.functions import when

#  다중 레이블을 위한 변환 (심박, 호흡, 피부온도, 혈중산소농도, 일상)
data = data.withColumn("심박", when(col("Reason").contains("심박"), 1).otherwise(0)) \
           .withColumn("호흡", when(col("Reason").contains("호흡"), 1).otherwise(0)) \
           .withColumn("피부온도", when(col("Reason").contains("피부온도"), 1).otherwise(0)) \
           .withColumn("혈중산소농도", when(col("Reason").contains("혈중산소농도"), 1).otherwise(0)) \
           .withColumn("일상", when(col("Reason").contains("특이사항"), 1).otherwise(0)) \
           .withColumn("상태", 
                          when(col("Reason").contains("일상"), 0)
                          .when(col("Reason").contains("주의"), 1)
                          .when(col("Reason").contains("위험"), 2)
                          .otherwise(None))  # 어떤 카테고리에도 포함되지 않으면 Null 값
            

In [120]:
data = data.drop('Reason')

In [121]:
data.show(40)

+-----+---------+----------+----+---------------+----------+----------+------------+-----------+-----------------+------------------+----+----+--------+------------+----+----+
|Index|HeartRate|BreathRate|SPO2|SkinTemperature|SleepPhase|SleepScore|WalkingSteps|StressIndex|ActivityIntensity|CaloricExpenditure|심박|호흡|피부온도|혈중산소농도|일상|상태|
+-----+---------+----------+----+---------------+----------+----------+------------+-----------+-----------------+------------------+----+----+--------+------------+----+----+
|    0|       73|        16|  98|            0.0|         9|         0|           0|          0|              102|                 0|   0|   0|       0|           0|   1|   0|
|    1|       73|        16|  98|            0.0|         9|         0|           0|          0|                0|                 0|   0|   0|       0|           0|   1|   0|
|    2|       73|        16|  98|            0.0|         9|         0|           0|          0|                0|                 0|   0|

In [122]:
data.groupby("상태").count().orderBy(col("count").desc()).show(40,truncate=False)

+----+-----+
|상태|count|
+----+-----+
|0   |78804|
|1   |18345|
|2   |3181 |
+----+-----+



                                                                                

In [123]:
# 변환된 컬럼들이 실제로 존재하는지 확인
print("Columns in DataFrame:", data.columns)

# 일부 데이터 출력하여 컬럼 확인
data.select("심박", "호흡", "피부온도", "혈중산소농도","일상","상태").show(20, False)

Columns in DataFrame: ['Index', 'HeartRate', 'BreathRate', 'SPO2', 'SkinTemperature', 'SleepPhase', 'SleepScore', 'WalkingSteps', 'StressIndex', 'ActivityIntensity', 'CaloricExpenditure', '심박', '호흡', '피부온도', '혈중산소농도', '일상', '상태']
+----+----+--------+------------+----+----+
|심박|호흡|피부온도|혈중산소농도|일상|상태|
+----+----+--------+------------+----+----+
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0   |
|0   |0   |0       |0           |1   |0 

In [124]:
from pyspark.sql.functions import col, percentile_approx  # percentile_approx 추가
# "상태(Status) == 0" 인 데이터만 필터링
data_0 = data.filter(col("상태") == 0)

# IQR을 적용할 컬럼 리스트 지정
iqr_columns = ['HeartRate', 'BreathRate', 'SPO2', 'SkinTemperature', 'SleepPhase', 'SleepScore', 'WalkingSteps', 'StressIndex', 'ActivityIntensity', 'CaloricExpenditure']

# IQR 계산 및 이상치 제거
for column in iqr_columns:
    # Q1, Q3 계산
    q1, q3 = data_0.select(
        percentile_approx(col(column), 0.25).alias("Q1"),
        percentile_approx(col(column), 0.75).alias("Q3")
    ).first()

    iqr = q3 - q1
    lower_bound = q1 - 6 * iqr  # 1.5 → 3배로 완화
    upper_bound = q3 + 6 * iqr

    # 이상치 제거 (해당 컬럼에서 IQR 범위를 벗어난 값 제거)
    data_0 = data_0.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))

# 이상치 제거 후 "상태 = 1" 데이터와 다시 결합
data_final = data_0.union(data.filter(col("상태") == 1))
data_final = data_final.union(data.filter(col("상태") == 2))


                                                                                

In [125]:
data_final.groupby("상태").count().orderBy(col("count").desc()).show(40,truncate=False)



+----+-----+
|상태|count|
+----+-----+
|0   |46129|
|1   |18345|
|2   |3181 |
+----+-----+



                                                                                

In [None]:
# from sqlalchemy import create_engine
# # SQLAlchemy 엔진 생성
# ssl_cert_path = r'/home/azureuser/Desktop/config/DigiCertGlobalRootG2.crt.pem'
# engine = create_engine(
#     f"mysql+pymysql://human:!q1w2e3r4@human-mysql.mysql.database.azure.com/humandb",
#     connect_args={"ssl_ca": ssl_cert_path}
# )
# # spark to pandas 
# data = data_final.select('*').toPandas()
# # 학습 결과를 DB에 저장
# data.to_sql('modeling_final', con=engine, if_exists='replace', index=False)
# print("결과가 데이터베이스에 저장되었습니다!")

                                                                                

결과가 데이터베이스에 저장되었습니다!


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import OneVsRest, RandomForestClassifier
from pyspark.ml.feature import VectorAssembler

# 특징 벡터 변환
feature_cols = ["HeartRate", "BreathRate", "SPO2", "SkinTemperature",
                "SleepPhase", "SleepScore", "WalkingSteps", 
                "StressIndex", "ActivityIntensity", "CaloricExpenditure"]
label_cols = ["심박", "호흡", "피부온도", "혈중산소농도"]

# VectorAssembler 사용하여 특징 벡터 변환
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data_transformed = assembler.transform(data).select("features", *label_cols)

# 다중 레이블 학습을 위해 반복 실행 (각 레이블마다 개별 모델 생성)
models = {}
for label in label_cols:
    print(f"Training model for {label}...")

    # 랜덤 포레스트 모델 정의 (각 레이블별 학습을 위해 labelCol 명확히 지정)
    rf = RandomForestClassifier(labelCol=label, featuresCol="features", numTrees=100)

    # OneVsRest 실행 시 labelCol 명확히 지정
    one_vs_rest = OneVsRest(classifier=rf, labelCol=label)
    model = one_vs_rest.fit(data_transformed)

    # 모델 저장
    models[label] = model
    print(f"Model for {label} trained successfully!")


In [None]:
# 모든 레이블에 대해 예측 수행 (join 최적화)
predictions = data_transformed.cache()

for label, model in models.items():
    print(f"Predicting for {label}...")
    pred = model.transform(predictions).select("features", label, "prediction")
    
    # 새로운 열 이름 변경 (충돌 방지)
    pred = pred.withColumnRenamed("prediction", f"prediction_{label}")
    
    # `join` 최적화
    predictions = predictions.drop(label).join(pred, on="features", how="inner")

# 최적화된 예측 결과 출력
predictions.show(10)

In [None]:
# 모든 레이블에 대해 예측 수행
predictions = data_transformed
for label, model in models.items():
    pred = model.transform(data_transformed).select("features", label, "prediction")
    predictions = predictions.join(pred, on=["features", label], how="inner")

# 예측 결과 출력
predictions.show(10)

### retrival을 위한 데이터 전처리

In [None]:
# from pyspark.ml.feature import VectorAssembler
# from pyspark.sql.functions import udf
# import json
# from pyspark.sql.types import StringType

# # VectorAssembler로 수치형 데이터 벡터화
# assembler = VectorAssembler(
#     inputCols=['HeartRate', 'BreathRate', 'SPO2', 'SkinTemperature', 'SleepPhase', 'SleepScore', 'WalkingSteps', 'StressIndex', 'ActivityIntensity', 'CaloricExpenditure'],
#     outputCol="features"
# )

# vectorized_df = assembler.transform(data)

# # 벡터 데이터를 JSON으로 변환하는 UDF
# vector_to_json = udf(lambda v: json.dumps(v.toArray().tolist()), StringType())

# # JSON으로 변환하여 저장할 준비
# vectorized_df = vectorized_df.withColumn("features_json", vector_to_json(vectorized_df["features"]))



In [None]:
# # 필요한 컬럼 선택
# final_df = vectorized_df.select("features_json", "Reason")

# # 결과 확인
# final_df.show(truncate=False)

In [None]:
# # json 데이터 sql로 보내기
# from sqlalchemy import create_engine
# # SQLAlchemy 엔진 생성
# ssl_cert_path = r'/home/azureuser/Desktop/config/DigiCertGlobalRootG2.crt.pem'
# engine = create_engine(
#     f"mysql+pymysql://human:!q1w2e3r4@human-mysql.mysql.database.azure.com/humandb",
#     connect_args={"ssl_ca": ssl_cert_path}
# )
# # spark to pandas 
# final_df = final_df.toPandas()
# # 학습 결과를 DB에 저장
# final_df.to_sql('modeling_retrival', con=engine, if_exists='replace', index=False)
# print("결과가 데이터베이스에 저장되었습니다!")