In [None]:
# label preprocessing code layout

"""
*원시 json 라벨 파일들 샘플링 및 지정 폴더에 복사
*원시 json 라벨 파일들 병합 -> 단일 데이터프레임
*필요 컬럼 추출
    *images.fname
    *images.width, height
    *environments: "_value"
    *annotations: "bbox", "id", "status" 
    *categories: "id", "name"
    *disease_class
    *disease_cause_method

*추출 컬럼 데이터 분리(각 데이터는 딕셔너리 데이터를 요소로 갖는 리스트 형태)
    *딕셔너리 변환: json type -> pandas type
        *변환 전 type: str
        *변환 후 type: list
        *변환 case1(None 없는 경우): .replace("'", "\"")
        *변환 case2(None 있는 경우): .replace("None", "null").replace("'", "\"")
            *주의: pandas에서도 {'key':'value'}, {'key': None}와 같이 (', None)로 출력되기 때문에 헷갈리기 쉽다. 출력 형태는 같아도 내부에서 인식하는 type이 다름을 유의할 것

    *딕셔너리 분리: key, value
    *value 변환(문자열 외 다른 데이터형이 필요한 경우): str -> list, number
        *str -> list변환: ast.literal_eval(arg)
        *str -> num_type변환: 대상 리스트 내에서 num_type(arg) 반복 수행(예: list_num = [float(itme) for item in list if item is not None])
        *주의
            *위의 과정으로 분리된 value가 리스트 형태일 때, 이 값의 type은 리스트가 아니라 리스트 형태의 문자열이므로, 리스트로 변환하는 작업이 필요하다.
            *각 리스트 내에서 '1.5'와 같이 숫자형 데이터가 ''안에 있다면, 숫자형 문자열이므로 숫자형 데이터로 변환하는 작업이 필요하다(자동 변환 안됨)
"""

In [3]:
import pandas as pd
import json
import numpy as np
import ast
import os
from pathlib import Path
import shutil

In [None]:
# 원시 데이터 샘플링

input_path_1 = './data_raw/train_label/TL_01.딸기_001.설향_01.정상'
input_path_2 = './data_raw/train_label/TL_01.딸기_001.설향_02.역병'
input_path_3 = './data_raw/train_label/TL_01.딸기_001.설향_03.시들음병'
input_path_4 = './data_raw/train_label/TL_01.딸기_001.설향_04.잎끝마름'
input_path_5 = './data_raw/train_label/TL_01.딸기_001.설향_05.황화'
output_path_1 = './data_raw/train_label/sampling_normal'
output_path_2 = './data_raw/train_label/sampling_blight'
output_path_3 = './data_raw/train_label/sampling_wilt'
output_path_4 = './data_raw/train_label/sampling_scorch'
output_path_5 = './data_raw/train_label/sampling_chlorosis'

input_paths = [input_path_1, input_path_2, input_path_3, input_path_4, input_path_5]
output_paths = [output_path_1, output_path_2, output_path_3, output_path_4, output_path_5]
num_files = 12000

def copy_files(input_path, output_path, num_files=12000):
    # output폴더를 여기서 생성하게 할 경우에는 아래의 Path 기능 사용
    # Path(output_path).mkdir(parents=True, exist_ok=True)
    
    files = [f for f in os.listdir(input_path) if f.endswith('.json')]
    files_to_copy = files[-num_files:]

    for file in files_to_copy:
        shutil.copy(os.path.join(input_path, file), os.path.join(output_path, file))

    return f"{len(files_to_copy)} files coppied from {input_path} to {output_path}"

In [None]:
# 원시 데이터 샘플링 실행

copy_results = [copy_files(input_path, output_path, num_files) for input_path, output_path in zip(input_paths, output_paths)]
copy_results

In [None]:
# 샘플링한 원시 데이터를 데이터프레임으로 변환 및 병합

def json_to_df(folder_path):
    df_list = []
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        if filename.endswith('.json'):
            try:
                with open(file_path, 'r', encoding='utf-8') as file:
                    data = json.load(file)

                df_list.append(pd.json_normalize(data))
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON in file {filename}: {e}")

    if df_list:
        merged_df = pd.concat(df_list, ignore_index=True)
    else:
        print("No valid JSON files found")

    print('merged_df')
    return merged_df

In [None]:
folder_paths = [r'./data_raw/train_label/sampling_normal', r'./data_raw/train_label/sampling_blight',
                r'./data_raw/train_label/sampling_wilt', r'./data_raw/train_label/sampling_scorch',
                r'./data_raw/train_label/sampling_chlorosis']

merged_normal = json_to_df(folder_paths[0])
merged_blight = json_to_df(folder_paths[1])
merged_wilt = json_to_df(folder_paths[2])
merged_scorch = json_to_df(folder_paths[3])
merged_chlorosis = json_to_df(folder_paths[4])

In [None]:
merged_normal.to_csv('./data_raw/train_label/merged_df/merged_raw_normal.csv', encoding='utf-8', index=False)
merged_blight.to_csv('./data_raw/train_label/merged_df/merged_raw_blight.csv', encoding='utf-8', index=False)
merged_wilt.to_csv('./data_raw/train_label/merged_df/merged_raw_wilt.csv', encoding='utf-8', index=False)
merged_scorch.to_csv('./data_raw/train_label/merged_df/merged_raw_scorch.csv', encoding='utf-8', index=False)
merged_chlorosis.to_csv('./data_raw/train_label/merged_df/merged_raw_chlorosis.csv', encoding='utf-8', index=False)

In [4]:
# 데이터프레임 정제: 필요 컬럼 추출 및 변환

raw_normal = pd.read_csv('./data_raw/train_label/merged_df/merged_raw_normal.csv', encoding='utf-8')
raw_blight = pd.read_csv('./data_raw/train_label/merged_df/merged_raw_blight.csv', encoding='utf-8')
raw_wilt = pd.read_csv('./data_raw/train_label/merged_df/merged_raw_wilt.csv', encoding='utf-8')
raw_scorch = pd.read_csv('./data_raw/train_label/merged_df/merged_raw_scorch.csv', encoding='utf-8')
raw_chlorosis = pd.read_csv('./data_raw/train_label/merged_df/merged_raw_chlorosis.csv', encoding='utf-8')

In [6]:
# 필요 컬럼 추출
col_selection = ['environments', 'annotations', 'categories', 'images.fname', 'images.width', 'images.height', 'images.disease_class', 'images.disease_cause_method']

normal_data = raw_normal[col_selection].copy()
blight_data = raw_blight[col_selection].copy()
wilt_data = raw_wilt[col_selection].copy()
scorch_data = raw_scorch[col_selection].copy()
chlorosis_data = raw_chlorosis[col_selection].copy()

In [41]:
# 데이터 분리 및 변환: environments
    # '_value'에 해당하는 값들 분리하여 각각 컬럼으로 할당
    # environments의 id는 annotations, categories의 id와 불일치하므로 별도 처리
        # environments의 id는 모두 0~4의 값만 가짐: 각 부위에 대한 측정이 아니라, 측정 회차를 의미하는 것으로 추정됨(예: 각 샘플마다 5회 측정)
    # 모든 회차의 수치를 컬럼화 하기에는 비효율적이므로, 평균값으로 통일

normal_sample = normal_data[:500].copy()

def json_data_to_pandas(df, target_col, saving_col):
    df[saving_col] = None
    
    for idx, row in df.iterrows():
        json_str = row[target_col]
        str_corrected = json_str.replace("None", "null").replace("'", "\"")
        corrected_data = json.loads(str_corrected)
        df.at[idx, saving_col] = corrected_data

    dict_format = df[saving_col][0]
    extraction_keys = dict_format[0].keys()
    key_list = list(extraction_keys)
    df_pandas_type = df.drop([target_col], axis=1)

    print(type(df[saving_col]))
    return df_pandas_type, key_list

def extraction_env_values(df_pandas_type, key_list):
    df = df_pandas_type
    df['value_types'] = None
    value_names = []

    for i in range(len(key_list)):
        candidate = key_list[i]
        if candidate.endswith('_value'):
            value_names.append(candidate)

    for idx, row in df.iterrows():
        value_dict = {}
        dict_list = row['values']
        for dictionary in dict_list:
            for key, value in dictionary.items():
                if key in value_names:
                    if key not in value_dict:
                        value_dict[key] = [value]
                    else:
                        value_dict[key].append(value)
        df.at[idx, 'value_types'] = value_dict
    
    for idx, row in df.iterrows():
        value_to_split = row['value_types']
        for key, value in value_to_split.items():
            if key not in df.columns:
                df[key] = pd.NA
            df.at[idx, key] = value

    df_splited_env_values = df.drop(['values', 'value_types'], axis=1)
    print(df_splited_env_values.columns)
    return df_splited_env_values

def str_to_num(string_list):
    if isinstance(string_list, str):
        try:
            list_obj = ast.literal_eval(string_list)
        except ValueError:
            return string_list
    else:
        list_obj = string_list

    if all(item is None for item in list_obj):
        return None
    else:
        list_num = [float(item) for item in list_obj if item is not None]
        return list_num
    
def string_to_number(df):
    col_list = [col for col in df.columns if col.endswith('_value')]
    
    for col in col_list:
        df[col] = df[col].apply(str_to_num)
        
    return df

def calculate_avg(df):
    col_list = [col for col in df.columns if col.endswith('_value')]

    for value_name in col_list:
        for idx, row in df.iterrows():
            values = row[value_name]
            if values is not None:
                df.at[idx, value_name] = np.mean(values)

            else:
                df.at[idx, value_name] = None

    return df



In [42]:
normal_test, env_list = json_data_to_pandas(normal_sample, target_col='environments', saving_col='values')
normal_test_env = extraction_env_values(normal_test, env_list)
normal_test_values = string_to_number(normal_test_env)
normal_test_avg = calculate_avg(normal_test_values)

<class 'pandas.core.series.Series'>
Index(['annotations', 'categories', 'images.fname', 'images.width',
       'images.height', 'images.disease_class', 'images.disease_cause_method',
       'ti_value', 'hi_value', 'ci_value', 'ir_value', 'tl_value', 'ei_value',
       'pl_value', 'sr_value', 'cl_value', 'el_value', 'hl_value', 'pi_value',
       'rp_value'],
      dtype='object')


In [43]:
normal_test_avg.to_csv('normal_avg_test.csv', encoding='utf-8', index=False)

In [None]:
# 데이터 분리 및 변환: annotations, categories
    # annotation의 id와 categories의 id-name은 연결되므로 같이 처리