# Transformation - 데이터 전처리

# 1. bronze layer 에서 추출된 데이터 불러오기

In [None]:
# Load the bronze-layer table produced by the extraction step.
df = spark.read.table('training.default.bronze')

In [None]:
# Show the DataFrame. (The column names are the company's real, confidential
# column names, so the output is intentionally not included here.)
df.show()

In [None]:
# Print summary statistics (count, mean, stddev, min, max) for each column.
df.describe().show()

+-------+-------+--------------------+--------------------+
|summary| eqp_id|           record_id|                data|
+-------+-------+--------------------+--------------------+
|  count|  11671|               11671|               11671|
|   mean|   null|4.645852527284817...|                null|
| stddev|   null|2.634030846180855...|                null|
|    min|003MNUO|      44353453588310|++0eIDIwMjYwODA2M...|
|    max|abDVTAR| 9220932448465947920|zvwjIDIwMjMwOTIxM...|
+-------+-------+--------------------+--------------------+



In [None]:
# Print the number of rows in the bronze table.
df.count()

11671

# 2. 데이터 전처리

### 1) 컬럼명 바꾸기(대외비 보호)

In [None]:
# Rename the bronze columns (the real names are confidential) and give every
# column an explicit Spark SQL type by describing the rows as a StructType.
# Spark runs on the JVM (it is written in Scala), and each field - including
# the nested ones - is declared with an explicit Spark SQL type here.
# Columns `a` and `b` are themselves structs (columns inside a column), which
# is why nested StructTypes are used.
# NOTE(review): this schema is applied to `df.rdd` with spark.createDataFrame
# in the next cell; row values are matched to fields by position, so the field
# order below must follow the bronze table's column order exactly.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, LongType, BooleanType
new_schema = StructType([
    StructField("id", StringType()),
    StructField("data_create_time", TimestampType()),
    StructField("record_id", LongType()),
    # Nested struct `a`; a.am / a.an are used by the record filter further below.
    StructField("a", StructType([
        StructField("aa", StringType()),
        StructField("ab", IntegerType()),
        StructField("ac", StringType()),
        StructField("ad", StringType()),
        StructField("ae", StringType()),
        StructField("af", StringType()),
        StructField("ag", StringType()),
        StructField("ah", IntegerType()),
        StructField("ai", StringType()),
        StructField("aj", StringType()),
        StructField("ak", StringType()),
        StructField("al", IntegerType()),
        StructField("am", LongType()),
        StructField("an", LongType()),
                                ])),
    
    # Nested struct `b`; its field names overlap with `a` but live in a separate struct.
    StructField("b", StructType([
        StructField("ba", StringType()),
        StructField("bb", StringType()),
        StructField("bc", StringType()),
        StructField("ad", StringType()),
        StructField("ae", IntegerType()),
        StructField("af", StringType()),
        StructField("ag", StringType()),
        StructField("ah", StringType()),
        StructField("ai", StringType()),
        StructField("aj", StringType()),
        StructField("ak", StringType()),
        StructField("al", StringType()),
        StructField("am", StringType()),
        StructField("an", IntegerType()),
        StructField("ao", StringType()),
        StructField("ap", BooleanType()),
        StructField("aq", IntegerType()),
                                ])),
    
    # Base64-encoded, Snappy-compressed payload (decoded in section 3).
    StructField("data", StringType()),
 
])

In [None]:
# Rebuild the DataFrame from the bronze rows with `new_schema`: this renames
# every column and re-types it in one step.
# NOTE(review): values are bound to schema fields by position, not by name.
df_renamed = spark.createDataFrame(df.rdd, new_schema)

In [None]:
# Show the renamed / re-typed DataFrame.
df_renamed.show()

+-------+-------------------+-------------------+--------------------+--------------------+--------------------+
|     id|   data_create_time|          record_id|                   a|                   b|                data|
+-------+-------------------+-------------------+--------------------+--------------------+--------------------+
|7VBbJa4|2027-09-18 23:16:00|3890904313625369530|{AAA, 449828909, ...|{AAAA, YW8Q, b9Lb...|ia0WIDIwMjcwOTE5M...|
|I7QS5WL|2032-01-06 03:03:00|1919815607339019300|{AAA, 993055219, ...|{AAAA, T5IN, b9Lb...|scUhIDIwMzIwMTA2M...|
|PWaQ00W|2024-12-19 10:14:00|4403797140526972380|{AAA, 921859806, ...|{AAAA, T5IN, 2IJV...|tpsrIDIwMjQxMjE5M...|
|7VBbJa4|2027-09-19 23:11:00|3890904313625369530|{AAA, 449828909, ...|{AAAA, YW8Q, b9Lb...|lfoOHDIwMjcwOTIwF...|
|I7QS5WL|2032-01-06 03:08:00|1919815607339019300|{AAA, 993055219, ...|{AAAA, T5IN, b9Lb...|qPMmIDIwMzIwMTA2M...|
|7VBbJa4|2027-09-20 15:31:00|8972448469089846440|{AAA, 1836905390,...|{AAAA, RMT3, 2IJV...|n/UuI

In [None]:
# Print the schema of the renamed / re-typed DataFrame.
df_renamed.printSchema()

root
 |-- id: string (nullable = true)
 |-- data_create_time: timestamp (nullable = true)
 |-- record_id: long (nullable = true)
 |-- a: struct (nullable = true)
 |    |-- aa: string (nullable = true)
 |    |-- ab: integer (nullable = true)
 |    |-- ac: string (nullable = true)
 |    |-- ad: string (nullable = true)
 |    |-- ae: string (nullable = true)
 |    |-- af: string (nullable = true)
 |    |-- ag: string (nullable = true)
 |    |-- ah: integer (nullable = true)
 |    |-- ai: string (nullable = true)
 |    |-- aj: string (nullable = true)
 |    |-- ak: string (nullable = true)
 |    |-- al: integer (nullable = true)
 |    |-- am: long (nullable = true)
 |    |-- an: long (nullable = true)
 |-- b: struct (nullable = true)
 |    |-- ba: string (nullable = true)
 |    |-- bb: string (nullable = true)
 |    |-- bc: string (nullable = true)
 |    |-- ad: string (nullable = true)
 |    |-- ae: integer (nullable = true)
 |    |-- af: string (nullable = true)
 |    |-- ag: string (nul

In [None]:
# Select the records with few missing values: of the 11671 records, the ones
# whose a.am / a.an ratio is below 0.001 are kept (the analysis keeps 9).
#   a.am : described as the total number of records in the 2-D array in `data`
#   a.an : described as the total number of missing values in that array
# NOTE(review): with those meanings a missing-value ratio would be an / am,
# not am / an - confirm which field is which. Also, rows where a.an == 0 get a
# NULL ratio (Spark's non-ANSI division) and are therefore dropped.

from pyspark.sql import functions as F
filtered_df = df_renamed.where(
    (F.col("a.am") / F.col("a.an")) < 0.001
)
filtered_df.show()

+-------+-------------------+-------------------+--------------------+--------------------+--------------------+
|     id|   data_create_time|          record_id|                   a|                   b|                data|
+-------+-------------------+-------------------+--------------------+--------------------+--------------------+
|4PEE803|2023-06-02 06:21:00|4269170469969724980|{AAA, 1807480370,...|{AAAA, RMT3, b9Lb...|8/kvIDIwMjMwNjAyM...|
|H6VMCFC|2026-01-10 10:19:00|3552028701557523310|{AAA, 1866077324,...|{AAAA, RMT3, b9Lb...|uqwbHDIwMjYwMTEwF...|
|1G8TW3J|2040-04-12 23:46:00| 931117701050083260|{AAA, 1612291914,...|{AAAA, RMT3, 2IJV...|iJYaIDIwNDAwNDEzM...|
|CW9J9VG|2050-04-04 05:58:00|8767117699235549170|{AAA, 1512679046,...|{AAAA, 522J, 2IJV...|yugkIDIwNTAwNDA0M...|
|MQ2FWMH|2024-02-10 02:21:00| 964520930587333440|{AAA, 1131422925,...|{AAAA, AZ8T, 6XaS...|45cjHDIwMjQwMjEwF...|
|XEVbYST|2034-10-12 03:59:00|1411758500928517400|{AAA, 625507585, ...|{AAAA, RMT3, 2IJV...|5cMiI

### 2) 필요한 라이브러리를 Databricks클라우드 환경에 설치

In [None]:
%sh
# Upgrade pip before installing the extra packages below.
pip install --upgrade pip

Collecting pip
  Using cached pip-23.1.2-py3-none-any.whl (2.1 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 22.2.2
    Uninstalling pip-22.2.2:
      Successfully uninstalled pip-22.2.2
Successfully installed pip-23.1.2


In [None]:
# Library used to decompress Snappy data (imported by the decode UDF below).
# NOTE(review): Databricks documents that `%sh` commands run on the driver node
# only; the decode UDF runs on executors, so on a multi-node cluster they may
# lack this package. `%pip install` (placed at the top of the notebook, since it
# resets Python state) installs notebook-scoped libraries on all nodes - confirm.
%sh 
pip install python-snappy

Collecting python-snappy
  Using cached python_snappy-0.6.1-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (55 kB)
Installing collected packages: python-snappy
Successfully installed python-snappy-0.6.1


### 3) Base64 인코딩·Snappy 압축된 데이터 디코딩 및 압축 해제

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import base64
import snappy


# Body of the UDF that turns the encoded `data` column back into plain text.
def decode_base64_and_decompress(data):
    """Base64-decode *data*, Snappy-decompress it, and return it as UTF-8 text.

    Args:
        data: Base64 text of a Snappy-compressed payload, or ``None`` (SQL NULL).

    Returns:
        The decompressed payload decoded as UTF-8, or ``None`` when *data* is
        ``None``. Previously a NULL value reached ``base64.b64decode(None)``,
        which raises ``TypeError`` and fails the whole Spark task.
    """
    if data is None:
        return None
    decoded_data = base64.b64decode(data)
    decompressed_data = snappy.uncompress(decoded_data)
    return decompressed_data.decode('utf-8')

# Register the decoder directly as a string-returning Spark UDF
# (no wrapper lambda is needed).
decode_udf = udf(decode_base64_and_decompress, returnType=StringType())

# Apply the UDF to the `data` column only; every other column passes through unchanged.
decoded_df = filtered_df.withColumn("data", decode_udf("data"))


In [None]:
# Inspect the decoded result.
decoded_df.show()

# The `data` values go from e.g. 8/kvIDIwMjMwNjAyM... to 20230602000000000... after decoding.

+-------+-------------------+-------------------+--------------------+--------------------+--------------------+
|     id|   data_create_time|          record_id|                   a|                   b|                data|
+-------+-------------------+-------------------+--------------------+--------------------+--------------------+
|4PEE803|2023-06-02 06:21:00|4269170469969724980|{AAA, 1807480370,...|{AAAA, RMT3, b9Lb...|20230602000000000...|
|H6VMCFC|2026-01-10 10:19:00|3552028701557523310|{AAA, 1866077324,...|{AAAA, RMT3, b9Lb...|20260110000000000...|
|1G8TW3J|2040-04-12 23:46:00| 931117701050083260|{AAA, 1612291914,...|{AAAA, RMT3, 2IJV...|20400413000000000...|
|CW9J9VG|2050-04-04 05:58:00|8767117699235549170|{AAA, 1512679046,...|{AAAA, 522J, 2IJV...|20500404000000000...|
|MQ2FWMH|2024-02-10 02:21:00| 964520930587333440|{AAA, 1131422925,...|{AAAA, AZ8T, 6XaS...|20240210000000000...|
|XEVbYST|2034-10-12 03:59:00|1411758500928517400|{AAA, 625507585, ...|{AAAA, RMT3, 2IJV...|20341

### 4) 디코딩 및 압축 해제된 데이터 저장 - silver layer(`silver_decoded`)로 넘기기

In [None]:
# Persist the decoded rows as the silver table (left commented out; the next
# section reads this table back).
# decoded_df.write.saveAsTable("training.default.silver_decoded")

### 5) 9개의 레코드를 각각 하나의 테이블로 전환

In [None]:
# Read the decoded silver table back.
# NOTE(review): the save in section 4 is commented out, so this relies on the
# table already existing from an earlier run.
decoded_df = spark.read.table("training.default.silver_decoded")

In [None]:
# Peek at the '^'-separated segments of the first decoded payload.
# NOTE(review): data_string1 is only assigned in a later cell, so running the
# notebook top to bottom raises NameError here.
data_string1.split('^')

['20341012000000000|20341012000000100|20341012000000200|20341012000000300|20341012000000400|20341012000000500|20341012000000600|20341012000000700|20341012000000800|20341012000000900|20341012000001000|20341012000001100|20341012000001200|20341012000001300|20341012000001400|20341012000001500|20341012000001600|20341012000001700|20341012000001800|20341012000001900|20341012000002000|20341012000002100|20341012000002200|20341012000002300|20341012000002400|20341012000002500|20341012000002600|20341012000002700|20341012000002800|20341012000002900|20341012000003000|20341012000003100|20341012000003200|20341012000003300|20341012000003400|20341012000003500|20341012000003600|20341012000003700|20341012000003800|20341012000003900|20341012000004000|20341012000004100|20341012000004200|20341012000004300|20341012000004400|20341012000004500|20341012000004600|20341012000004700|20341012000004800|20341012000004900|20341012000005000|20341012000005100|20341012000005200|20341012000005300|20341012000005400|20341012

In [None]:
# 열간 구분자 ^, 행간 구분자 |로 2차원 데이터 생성
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import col, to_timestamp, expr,lit

def convert_string_to_df(data_string):
    splited_string = data_string.split('^')
    
    timestamp_col = splited_string[0].split('|')
    step_col = splited_string[1].split('|')
    value_col = splited_string[2].split('|')
    us_col = splited_string[3].split('|')
    ls_col = splited_string[4].split('|')
    validity_col = splited_string[5].split('|')

    # Dataframe을 만들기 위한 형태변환
    data = []
    flag = 0 
    idx = 0    
    while flag == 0:

        try:
            a = timestamp_col[idx]
            b = step_col[idx]
            c = value_col[idx]
            d = us_col[idx]
            e = ls_col[idx]
            f = validity_col[idx]

            if c == 'NaN' :
                pass
            else :
                #data.append((a, c))
                data.append((a, b, c, d, e, f))
            idx += 1
        except:
            flag = 1
            
    # 스키마 생성
    schema = StructType([
    StructField("time", StringType()),
    StructField("cs", StringType()),
    StructField("value", StringType()),
    StructField("lower_spec", StringType()),
    StructField("upper_spec", StringType()),
    StructField("validity", StringType()),
                    ])
    
    # dataframe 생성
    df = spark.createDataFrame(data=data, schema=schema)

    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
    
    # data_create_time 컬럼을 timestamp 타입으로 변환
    df = df.withColumn("time", to_timestamp(col("time"),"yyyyMMddHHmmssSSS"))
    df = df.withColumn("value", col("value").cast("double"))
    df = df.withColumn("lower_spec", col("lower_spec").cast("double"))
    df = df.withColumn("upper_spec", col("upper_spec").cast("double"))

    return df

In [None]:
# Turn the selected decoded records into one long-format DataFrame each (df1 ... df9).
#
# The rows are fetched ONCE. Previously decoded_df.take(10) was called 18
# times: each call launches a separate Spark job, and because decoded_df has no
# explicit ordering, nothing guaranteed that every call returned the rows in
# the same order - so a frame's payload and its `id` could come from different records.
decoded_rows = decoded_df.take(10)

# Positions within decoded_rows that are converted; position 7 is skipped, as in
# the original selection (the reason is not recorded in this notebook).
SELECTED_ROW_INDICES = (0, 1, 2, 3, 4, 5, 6, 8, 9)
# Years subtracted from each frame's `time` column, aligned with
# SELECTED_ROW_INDICES. NOTE(review): hard-coded per row - confirm their origin.
YEAR_OFFSETS = (12, 5, 1, 4, 27, 1, 0, 17, 1)

selected_rows = [decoded_rows[i] for i in SELECTED_ROW_INDICES]

# Raw decoded payloads, kept as named globals because other cells read them
# (e.g. data_string1.split('^')).
(
    data_string1, data_string2, data_string3,
    data_string4, data_string5, data_string6,
    data_string7, data_string8, data_string9,
) = [row["data"] for row in selected_rows]


def _to_equipment_df(row, years):
    """Build the long-format DataFrame for one decoded record.

    Parses the record's `data` payload with convert_string_to_df, shifts `time`
    back by *years* years, and appends the record's `id` as a constant column.
    """
    frame = convert_string_to_df(row["data"])
    frame = frame.withColumn("time", expr(f"time - INTERVAL {years} YEARS"))
    return frame.withColumn("id", lit(row["id"]))


df1, df2, df3, df4, df5, df6, df7, df8, df9 = [
    _to_equipment_df(row, years)
    for row, years in zip(selected_rows, YEAR_OFFSETS)
]

### 6) 결과

In [None]:
# Display df1.
# Observation: in the rows shown, upper_spec and lower_spec are the same on
# every row, so `value` is the only informative column; with constant features
# a machine-learning model has nothing to learn from.
display(df1)

time,cs,value,lower_spec,upper_spec,validity,id
2022-10-12T00:00:00.000+0000,step0,0.7462742731831129,-4207.775320051697,3873.321841797008,T,XEVbYST
2022-10-12T00:00:00.100+0000,step0,0.5908762536554375,-4207.775320051697,3873.321841797008,T,XEVbYST
2022-10-12T00:00:00.200+0000,step0,0.7156824964975331,-4207.775320051697,3873.321841797008,F,XEVbYST
2022-10-12T00:00:00.300+0000,step0,1.3013396880367225,-4207.775320051697,3873.321841797008,T,XEVbYST
2022-10-12T00:00:00.400+0000,step0,1.9268138382523747,-4207.775320051697,3873.321841797008,T,XEVbYST
2022-10-12T00:00:00.500+0000,step0,2.758611200066647,-4207.775320051697,3873.321841797008,T,XEVbYST
2022-10-12T00:00:00.600+0000,step0,2.77985506960018,-4207.775320051697,3873.321841797008,T,XEVbYST
2022-10-12T00:00:00.700+0000,step0,2.780590383608615,-4207.775320051697,3873.321841797008,T,XEVbYST
2022-10-12T00:00:00.800+0000,step0,2.393093278946119,-4207.775320051697,3873.321841797008,T,XEVbYST
2022-10-12T00:00:00.900+0000,step0,3.255230473276993,-4207.775320051697,3873.321841797008,T,XEVbYST


In [None]:
# Group by `id` to summarise the `value` column per process equipment.
# Uses the `F.` namespace: the previous
# `from pyspark.sql.functions import min, max, avg, sum, stddev, expr`
# shadowed Python's built-in min / max / sum for the rest of the notebook.
# NOTE(review): `merged_df` is created in section 8 below, so this cell only
# works after that cell has been run.
from pyspark.sql import functions as F

agg_df = merged_df.groupBy('id').agg(
    F.min('value').alias('min_value'),
    F.max('value').alias('max_value'),
    F.avg('value').alias('avg_value'),
    F.expr('percentile_approx(value, 0.5)').alias('median_value'),
    F.sum('value').alias('sum_value'),
    F.stddev('value').alias('std_value'),
)

display(agg_df)


id,min_value,max_value,avg_value,median_value,sum_value,std_value
XEVbYST,-76.00042803695712,7.648109730337248,-23.506747021655947,-24.773614172065507,-159775.35950619547,17.53501573233937
NaGXJE1,-24.483684864853878,54.16974102664124,16.92441257715337,17.615694628071655,148545.56918967512,17.507958242120065
4PEE803,-41.690770590749615,29.386629511485467,-14.095848210340604,-14.99256441585494,-132937.94447172224,14.809203708562569
H6VMCFC,-23.835979816930777,29.49735293879746,5.044909272411667,5.591676744376673,27298.00407301953,13.747083016432544
CW9J9VG,-2.5189442130302098,78.67932956356701,41.81490792556921,47.62024145080135,309597.5782809144,18.057718847849102
MQ2FWMH,-22.275374308680227,47.27260777674275,11.928752475686547,12.105458879746148,83191.119765438,18.15389887873317
ES5EZb9,-15.835808501989488,47.99792334148774,15.02192164125922,16.504524064376504,88178.68003419162,13.965963379059096
1G8TW3J,-21.97419886004947,36.64047830650508,5.6568312168400725,4.693165957117209,29296.728872014733,14.379439734461949
KaNM5T0,-7.772436882227855,19.33477781910081,6.961737317772145,6.506515886538618,8187.003085700042,5.933821963261426


#### 7-1) 시각화를 위해 value칼럼을 남겨두고 나머지는 버리기

In [None]:
# Display df1 for visualisation.
# NOTE(review): the output recorded below shows only time, value, id, so a step
# that drops the other columns (per the 7-1 heading) appears to have been run
# but is not present in this cell - confirm.
display(df1)

time,value,id
2022-10-12T00:00:00.000+0000,0.7462742731831129,XEVbYST
2022-10-12T00:00:00.100+0000,0.5908762536554375,XEVbYST
2022-10-12T00:00:00.200+0000,0.7156824964975331,XEVbYST
2022-10-12T00:00:00.300+0000,1.3013396880367225,XEVbYST
2022-10-12T00:00:00.400+0000,1.9268138382523747,XEVbYST
2022-10-12T00:00:00.500+0000,2.758611200066647,XEVbYST
2022-10-12T00:00:00.600+0000,2.77985506960018,XEVbYST
2022-10-12T00:00:00.700+0000,2.780590383608615,XEVbYST
2022-10-12T00:00:00.800+0000,2.393093278946119,XEVbYST
2022-10-12T00:00:00.900+0000,3.255230473276993,XEVbYST


### 8) 모든 데이터프레임 합쳐서 하나의 파일로 저장 -> gold layer로 넘기기

In [None]:
# Collect the nine per-equipment DataFrames (all share the same columns).
dataframes = [df1, df2, df3, df4, df5, df6, df7, df8, df9]

# Stack them into one DataFrame. The loop variable used to be named `df`, which
# silently rebound the notebook's global `df` (the bronze table from section 1)
# to the last frame.
merged_df = dataframes[0]
for frame in dataframes[1:]:
    merged_df = merged_df.union(frame)

In [None]:
# Check the schema of the merged DataFrame.
merged_df.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- cs: string (nullable = true)
 |-- value: double (nullable = true)
 |-- lower_spec: double (nullable = true)
 |-- upper_spec: double (nullable = true)
 |-- validity: string (nullable = true)
 |-- id: string (nullable = false)



In [None]:
# Save the merged data as the gold-layer table.
# NOTE(review): saveAsTable's default save mode (per Spark docs) errors if the
# table already exists, so re-running this cell fails.
merged_df.write.saveAsTable('training.default.gold')

In [None]:
# NOTE(review): this writes the same merged_df as the `gold` table above, so the
# two tables hold identical data; the name suggests a version with the full
# feature set was intended - confirm which DataFrame belongs here.
merged_df.write.saveAsTable('training.default.gold_with_full_feature')

In [None]:
# Per-equipment mean of every numeric column (avg() with no arguments).
merged_df.groupBy('id').avg().show()

+-------+-------------------+
|     id|         avg(value)|
+-------+-------------------+
|XEVbYST|-23.506747021655947|
|NaGXJE1|  16.92441257715337|
|4PEE803|-14.095848210340604|
|H6VMCFC|  5.044909272411667|
|CW9J9VG|  41.81490792556921|
|MQ2FWMH| 11.928752475686549|
|ES5EZb9|  15.02192164125922|
|1G8TW3J| 5.6568312168400725|
|KaNM5T0|  6.961737317772145|
+-------+-------------------+



In [None]:
# Display the merged gold data.
display(merged_df)




time,value,id
2022-10-12T00:00:00.000+0000,0.7462742731831129,XEVbYST
2022-10-12T00:00:00.100+0000,0.5908762536554375,XEVbYST
2022-10-12T00:00:00.200+0000,0.7156824964975331,XEVbYST
2022-10-12T00:00:00.300+0000,1.3013396880367225,XEVbYST
2022-10-12T00:00:00.400+0000,1.9268138382523747,XEVbYST
2022-10-12T00:00:00.500+0000,2.758611200066647,XEVbYST
2022-10-12T00:00:00.600+0000,2.77985506960018,XEVbYST
2022-10-12T00:00:00.700+0000,2.780590383608615,XEVbYST
2022-10-12T00:00:00.800+0000,2.393093278946119,XEVbYST
2022-10-12T00:00:00.900+0000,3.255230473276993,XEVbYST


In [None]:
# End of file