<a href="https://colab.research.google.com/github/kimheeseo/LSCNS/blob/main/colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**1.preform.py**

In [1]:
import pandas as pd
import os
from collections import Counter

def run_preform(choice, input_excel="ab.xlsx", desktop_path=None, year="2025"):
    """Split one year of rows from an Excel sheet into per-month sheets.

    The workbook is read without a header row and de-duplicated on its second
    column (rit_no). Every row whose third column, rendered as text, starts
    with ``year`` followed by a two-digit month ("01".."12") is filed under
    that month. Values rendered differently (for example "2025-01-15") do
    not produce a valid month key and are skipped.

    Args:
        choice: "1" extracts rit_no (second column, counted by its first
            three characters); "2" extracts resin_type (fifth column,
            counted by its first character).
        input_excel: File name of the input workbook inside ``desktop_path``.
        desktop_path: Folder holding the input workbook and receiving the
            output folder. Defaults to the current user's ``~/Desktop``.
        year: Year prefix the third column must start with. Defaults to
            "2025"; non-string values are converted with ``str``.

    Returns:
        A tuple ``(output_path, summary_lines)``: the path of the written
        workbook and the lines of the per-month prefix summary.

    Raises:
        ValueError: If ``choice`` is neither "1" nor "2".
        FileNotFoundError: If the input workbook does not exist.
    """
    if choice == "1":
        folder_name = "month_rit_number"
        column_title = "rit_no"
        main_col = 1
        prefix_length = 3
    elif choice == "2":
        folder_name = "month_resin_type"
        column_title = "resin_type"
        main_col = 4
        prefix_length = 1
    else:
        raise ValueError("잘못된 choice 값입니다. 1 또는 2만 허용.")

    if desktop_path is None:
        desktop_path = os.path.join(os.path.expanduser("~"), "Desktop")

    # Validate the input before touching the disk so that a failed run does
    # not leave an empty output folder behind.
    file_path = os.path.join(desktop_path, input_excel)
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"입력 파일이 존재하지 않습니다: {file_path}")

    folder_path = os.path.join(desktop_path, folder_name)
    os.makedirs(folder_path, exist_ok=True)

    df = pd.read_excel(file_path, header=None, engine='openpyxl')
    # Drop rows that repeat a rit_no (second column); the first one is kept.
    df = df.drop_duplicates(subset=1).reset_index(drop=True)

    # Per-month row lists and per-month prefix counters.
    month_keys = [f'mon_{str(m).zfill(2)}' for m in range(1, 13)]
    month_data = {key: [] for key in month_keys}
    month_count = {key: Counter() for key in month_keys}

    year_text = str(year)
    month_start = len(year_text)
    month_end = month_start + 2

    for row in df.itertuples(index=False, name=None):
        date_text = str(row[2])
        if not date_text.startswith(year_text) or len(date_text) < month_end:
            continue
        key = f'mon_{date_text[month_start:month_end]}'
        if key not in month_data:
            # The two characters after the year are not a month "01".."12".
            continue
        value_main = str(row[main_col])
        value_work = str(row[3])
        month_data[key].append([value_main, value_work])
        month_count[key][value_main[:prefix_length]] += 1

    # Human-readable summary, also stored on the "Summary" sheet.
    summary_lines = [f"📊 {column_title} 월별 분포 요약:\n"]
    for month in sorted(month_count):
        counts = month_count[month]
        if counts:
            summary_lines.append(f"▶ {month}:")
            summary_lines.extend(
                f"  - {prefix}: {cnt}개" for prefix, cnt in counts.items()
            )
        else:
            summary_lines.append(f"▶ {month}: 없음")

    output_path = os.path.join(folder_path, f"mon_split_{column_title}.xlsx")
    sheet_columns = [column_title, "work_time"]
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        df_summary = pd.DataFrame(summary_lines, columns=[f"{column_title}_summary"])
        df_summary.to_excel(writer, sheet_name="Summary", index=False)
        for month, data in month_data.items():
            # Months without rows still get a sheet carrying only the header.
            if data:
                df_month = pd.DataFrame(data, columns=sheet_columns)
            else:
                df_month = pd.DataFrame(columns=sheet_columns)
            df_month.to_excel(writer, sheet_name=month, index=False)

    return output_path, summary_lines

**2.grouping.py**

In [2]:
import pandas as pd
import os

def run_grouping(input_folder, output_base_folder):
    """Split every workbook in a folder into one workbook per 4th-column value.

    Each ``<prefix>.xlsx`` in ``input_folder`` is read without a header; its
    first row is treated as the header row and repeated at the top of every
    output file. The remaining rows are grouped by the text form of their
    fourth column and written to ``output_base_folder/<prefix>/<value>.xlsx``.

    Args:
        input_folder: Folder with the workbooks to split
            (e.g. 'grouped_by_prefix').
        output_base_folder: Folder receiving one sub-folder per input
            workbook (e.g. 'grouped_by_prefix_split'). Created if missing.

    Returns:
        List of the paths of all workbooks written.
    """
    os.makedirs(output_base_folder, exist_ok=True)
    results = []
    for file in os.listdir(input_folder):
        # "~$name.xlsx" files are the lock files Excel leaves next to an open
        # workbook; they are not readable workbooks and would abort the run.
        if not file.endswith(".xlsx") or file.startswith("~$"):
            continue
        file_path = os.path.join(input_folder, file)
        prefix = os.path.splitext(file)[0]

        df = pd.read_excel(file_path, header=None)

        # Checked before slicing so an empty workbook is skipped as well
        # instead of failing on the header-row lookup.
        if df.shape[1] <= 3:
            print(f"⚠ '{file}'는 4번째 열이 없어 건너뜁니다.")
            continue

        header_row = df.iloc[[0]]
        df_body = df.iloc[1:].reset_index(drop=True)

        output_folder = os.path.join(output_base_folder, prefix)
        os.makedirs(output_folder, exist_ok=True)

        # One pass over the rows; sort=False keeps groups in order of first
        # appearance. Keys are strings, so missing values form a "nan" group.
        preform_keys = df_body[3].astype(str)
        group_count = 0
        for val, group in df_body.groupby(preform_keys, sort=False):
            # Only cells holding the text "0" are blanked out here.
            group = group.replace("0", "")
            result_df = pd.concat([header_row, group], ignore_index=True)
            save_path = os.path.join(output_folder, f"{val}.xlsx")
            result_df.to_excel(save_path, index=False, header=False)
            results.append(save_path)
            group_count += 1
        print(f"✅ '{file}' 처리 완료 → {group_count}개 파일 저장됨")
    return results

**3.classification.py**

In [None]:
import pandas as pd
import os

def normalize_prefix(user_prefix):
    """Return the prefix upper-cased, with every letter "O" replaced by the digit "0"."""
    upper_cased = user_prefix.upper()
    letter_o_to_zero = str.maketrans("O", "0")
    return upper_cased.translate(letter_o_to_zero)

def run_classification(prefix_list, input_excel="alls.xlsx", output_dir="grouped_by_prefix"):
    """Filter the source workbook and write one workbook per preform prefix.

    Rows are kept only when their third column is a string whose
    second-to-last character is '0'. The kept rows (plus the header row) are
    written to "remove_not_zero.xlsx" in the current working directory, and
    then split by the upper-cased first three characters of the fourth
    column into ``output_dir/<prefix>.xlsx``.

    Args:
        prefix_list: Prefixes such as ["20M", "L0E", ...]. Each one is passed
            through ``normalize_prefix``; repeated prefixes are processed
            only once.
        input_excel: Path of the source workbook.
        output_dir: Folder receiving the per-prefix workbooks.

    Returns:
        List of the paths written, in the order of the (de-duplicated)
        prefixes. Prefixes without any matching row produce no file.

    Raises:
        ValueError: If no prefix was given or no row passes the filter.
        FileNotFoundError: If ``input_excel`` does not exist.
    """
    # dict.fromkeys removes repeats while keeping first-seen order, so a
    # prefix entered twice is neither written twice nor counted twice.
    preform_prefix_list = list(dict.fromkeys(normalize_prefix(p) for p in prefix_list))
    if not preform_prefix_list:
        raise ValueError("입력된 프리폼 접두사가 없습니다.")

    if not os.path.exists(input_excel):
        raise FileNotFoundError(f"입력 파일이 존재하지 않습니다: {input_excel}")

    df_all = pd.read_excel(input_excel, header=None)
    header_row = df_all.iloc[[0]]
    df_data = df_all.iloc[1:]

    def _second_to_last_is_zero(value):
        # Non-string cells (numbers, blanks) never qualify.
        return isinstance(value, str) and len(value) >= 2 and value[-2] == '0'

    keep_mask = df_data.iloc[:, 2].map(_second_to_last_is_zero).astype(bool)
    if not keep_mask.any():
        raise ValueError("조건(3번째 열의 뒤에서 두 번째 문자가 '0')에 맞는 행이 없습니다.")

    df_filtered = df_data[keep_mask]
    df_final = pd.concat([header_row, df_filtered], ignore_index=True)
    df_final.to_excel("remove_not_zero.xlsx", index=False, header=False)

    # NOTE(review): the data is read back from the file that was just
    # written. Kept as-is because the round trip may change how cell values
    # are typed; confirm before replacing it with the in-memory frame.
    df = pd.read_excel("remove_not_zero.xlsx", header=None)
    header = df.iloc[[0]]
    df_body = df.iloc[1:]

    os.makedirs(output_dir, exist_ok=True)
    saved_files = []

    # The comparison key does not depend on the prefix, so compute it once.
    body_prefixes = df_body[3].astype(str).str[:3].str.upper()
    for prefix in preform_prefix_list:
        group = df_body[body_prefixes == prefix]
        if group.empty:
            continue
        df_prefixed = pd.concat([header, group], ignore_index=True)
        filename = os.path.join(output_dir, f"{prefix}.xlsx")
        df_prefixed.to_excel(filename, index=False, header=False)
        saved_files.append(filename)

    return saved_files

**4.average.py**

In [None]:
import os
import pandas as pd

# File-name endings of workbooks produced by this pipeline; such files must
# never be treated as measurement input again when the pipeline is re-run.
_GENERATED_SUFFIXES = (
    "_zerox.xlsx",
    "_average.xlsx",
    "_final.xlsx",
    "_final_result_report.xlsx",
)


def _is_source_workbook(filename):
    """Return True for input workbooks, False for lock files and pipeline outputs."""
    if not filename.endswith(".xlsx") or filename.startswith("~$"):
        return False
    return not filename.endswith(_GENERATED_SUFFIXES)


def run_average(root_folder):
    """Average every workbook in each sub-folder and collect the averages.

    For each source workbook ``<name>.xlsx`` in a sub-folder this writes:

    * ``<name>_zerox.xlsx`` - the same data with every 0 blanked out;
    * ``<name>_average.xlsx`` - the first data row followed by one row of
      per-column averages (rounded to 4 decimals) over the rows whose fourth
      column equals ``<name>``; the second column of that row holds ``<name>``.

    Afterwards ``<subfolder>_final.xlsx`` is written with the first data row
    of the first average file followed by the average row of every
    ``*_average.xlsx`` file, in file-name order.

    A failure on one workbook is reported on stdout and does not stop the
    remaining workbooks from being processed.

    Args:
        root_folder: Folder whose sub-folders hold the per-preform workbooks
            (e.g. grouped_by_prefix_split).

    Returns:
        List of the ``*_final.xlsx`` paths written.
    """
    save_count = 0
    result_files = []
    for subfolder in os.listdir(root_folder):
        subfolder_path = os.path.join(root_folder, subfolder)
        if not os.path.isdir(subfolder_path):
            continue

        for file in os.listdir(subfolder_path):
            # Skips Excel lock files and everything this pipeline generated
            # earlier (zerox/average/final/report workbooks).
            if not _is_source_workbook(file):
                continue
            file_path = os.path.join(subfolder_path, file)
            try:
                df = pd.read_excel(file_path, engine='openpyxl')
                df_zerox = df.where(df != 0, "")
                filename_wo_ext = os.path.splitext(file)[0]
                zerox_path = os.path.join(subfolder_path, f"{filename_wo_ext}_zerox.xlsx")
                df_zerox.to_excel(zerox_path, index=False)
                save_count += 1
                if save_count % 100 == 0:
                    print(f"✅ 총 {save_count}개 파일 저장 완료")

                # Re-read the file just written so the blanked cells come
                # back the way pandas loads empty cells.
                df_zerox = pd.read_excel(zerox_path, engine='openpyxl')
                col_4_name = df_zerox.columns[3]
                filtered_df = df_zerox[df_zerox[col_4_name].astype(str) == filename_wo_ext]

                avg_values = []
                for col in filtered_df.columns:
                    # Non-numeric and blank cells become NaN and are left
                    # out of both the sum and the count.
                    values = pd.to_numeric(filtered_df[col], errors='coerce')
                    count = values.notna().sum()
                    avg_values.append(round(values.sum() / count, 4) if count > 0 else "")

                average_row = pd.DataFrame([avg_values], columns=df_zerox.columns)
                average_row = average_row.astype("object")
                # Label the average row with the preform name.
                average_row.iat[0, 1] = filename_wo_ext
                first_row = df_zerox.iloc[[0]]
                result_df = pd.concat([first_row, average_row], ignore_index=True)
                average_path = os.path.join(subfolder_path, f"{filename_wo_ext}_average.xlsx")
                result_df.to_excel(average_path, index=False)
            except Exception as e:
                # Best effort: report the failure and continue with the rest.
                print(f"⚠️ 오류 발생 ({file}): {e}")

        # Collect only the average rows into the final summary workbook.
        average_rows = []
        header_row = None
        # Sorted so the row order of the summary does not depend on the
        # directory listing order.
        for file in sorted(os.listdir(subfolder_path)):
            if not file.endswith("_average.xlsx") or file.startswith("~$"):
                continue
            try:
                df_avg = pd.read_excel(os.path.join(subfolder_path, file), engine='openpyxl')
                if header_row is None:
                    header_row = df_avg.iloc[[0]]
                average_rows.append(df_avg.iloc[[-1]])
            except Exception as e:
                print(f"⚠️ 평균 파일 처리 오류 ({file}): {e}")

        if average_rows and header_row is not None:
            final_df = pd.concat([header_row] + average_rows, ignore_index=True)
            final_filename = f"{subfolder}_final.xlsx"
            final_path = os.path.join(subfolder_path, final_filename)
            final_df.to_excel(final_path, index=False)
            result_files.append(final_path)
            print(f"📦 최종 요약 저장 완료: {final_filename}")
    return result_files

**5.auto_analyzer.py**

In [3]:
import os
import pandas as pd

# One entry per report column, in output order:
# (report column title, 0-based column index in the final workbook or None,
#  calculation type: None | 'delta' | 'mac' | 'scale', scale factor or None).
# Entries with an empty title and no source are blank spacer columns.
_REPORT_COLUMNS = [
    ("spoolno2", 1, None, None),
    ("OTDR length", 9, None, None),
    ("Attenuation 1310 I/E", 5, None, None),
    ("Attenuation 1310 O/E", 6, None, None),
    ("Attenuation 1383 I/E", 73, None, None),
    ("Attenuation 1383 O/E", 74, None, None),
    ("Attenuation 1550 I/E", 7, None, None),
    ("Attenuation 1550 O/E", 8, None, None),
    ("Attenuation 1625 I/E", 75, None, None),
    ("Attenuation 1625 O/E", 76, None, None),
    ("MFD 1310nm I/E", 12, None, None),
    ("MFD 1310nm O/E", 13, None, None),
    ("", None, None, None),
    ("", None, None, None),
    ("", None, None, None),
    ("", None, None, None),
    ("", None, None, None),
    ("", None, None, None),
    ("Cutoff 2m I/E", 14, None, None),
    ("Cutoff 2m O/E", 15, None, None),
    ("Cutoff 22m", 24, None, None),
    ("delta 2m-22m", None, 'delta', None),
    ("Mac value", None, 'mac', None),
    ("Clad Dia. I/E", 16, None, None),
    ("Clad Dia. O/E", 17, None, None),
    ("Clad Ovality I/E", 18, None, None),
    ("Clad Ovality O/E", 19, None, None),
    ("Core Ovality I/E", 20, None, None),
    ("Core Ovality O/E", 21, None, None),
    ("ECC I/E", 22, None, None),
    ("ECC O/E", 23, None, None),
    ("Zero Dispersion Wave.", 30, None, None),
    ("dispslope at ZDW", 31, None, None),
    ("Dispersion 1285", 32, None, None),
    ("Dispersion 1290", 33, None, None),
    ("Dispersion 1330", 34, None, None),
    ("Dispersion 1550", 35, None, None),
    ("", None, None, None),
    ("PMD", 37, None, None),
    ("R7.5mm 1t 1550", 26, 'scale', 0.1),
    ("R7.5mm 1t 1625", 69, 'scale', 0.1),
    ("R10mm 1t 1550", 70, 'scale', 0.1),
    ("R10mm 1t 1625", 71, 'scale', 0.1),
    ("R15mm 10t 1550", 81, 'scale', 0.5),
    ("R15mm 10t 1625", 82, 'scale', 0.5),
]


def _scaled(value, factor):
    """Return ``value * factor`` rounded to 4 decimals, or "" if not numeric."""
    try:
        return round(float(value) * factor, 4)
    except (TypeError, ValueError):
        return ""


def _find_final_workbook(subfolder_path, subfolder):
    """Return the name of the folder's final workbook, or None if there is none."""
    candidates = sorted(
        name for name in os.listdir(subfolder_path)
        if name.endswith("final.xlsx") and not name.startswith("~$")
    )
    expected = f"{subfolder}_final.xlsx"
    if expected in candidates:
        return expected
    # Fall back to any other "...final.xlsx", chosen deterministically.
    return candidates[0] if candidates else None


def _build_report(df):
    """Build the report frame (title row, then one row per data row of ``df``)."""
    # Rows 0 and 1 of the final workbook are skipped; data starts at row 2.
    body = df.iloc[2:]
    by_title = {}
    value_columns = []

    def numeric(title):
        # Blank or non-numeric cells become NaN instead of raising.
        raw = pd.Series(by_title[title], dtype=object)
        return pd.to_numeric(raw, errors='coerce').astype(float)

    for title, src_col, calc_type, factor in _REPORT_COLUMNS:
        if calc_type == 'delta':
            values = (numeric("Cutoff 2m O/E") - numeric("Cutoff 22m")).round(4).tolist()
        elif calc_type == 'mac':
            # NOTE(review): divides the O/E MFD by the I/E cutoff, matching
            # the column positions used so far; confirm the mix is intended.
            ratio = numeric("MFD 1310nm O/E") / numeric("Cutoff 2m I/E")
            values = (ratio * 1000).round(2).tolist()
        elif calc_type == 'scale':
            values = [_scaled(val, factor) for val in body.iloc[:, src_col]]
        elif src_col is not None:
            values = body.iloc[:, src_col].tolist()
        else:
            values = [None] * len(body)
        if title:
            by_title[title] = values
        value_columns.append(values)

    titles = [info[0] for info in _REPORT_COLUMNS]
    rows = [titles] + [list(row) for row in zip(*value_columns)]
    return pd.DataFrame(rows)


def run_auto_analyzer(root_folder):
    """Write a fixed-layout result report for every sub-folder's final workbook.

    For each sub-folder of ``root_folder`` the final workbook
    (``<subfolder>_final.xlsx``, or failing that any ``*final.xlsx``) is read
    without a header. Selected columns are copied, scaled, or combined as
    described by ``_REPORT_COLUMNS`` and the result is saved next to it as
    ``<subfolder>_final_result_report.xlsx``.

    A failure in one sub-folder is reported on stdout and does not stop the
    remaining sub-folders from being processed.

    Args:
        root_folder: Folder whose sub-folders hold the final workbooks
            (e.g. grouped_by_prefix_split).

    Returns:
        List of the report paths written.
    """
    saved_reports = []
    for subfolder in os.listdir(root_folder):
        subfolder_path = os.path.join(root_folder, subfolder)
        if not os.path.isdir(subfolder_path):
            continue

        final_name = _find_final_workbook(subfolder_path, subfolder)
        if final_name is None:
            print(f"❌ '{subfolder}' 폴더에는 final.xlsx 파일이 없습니다.")
            continue

        final_path = os.path.join(subfolder_path, final_name)
        try:
            df = pd.read_excel(final_path, header=None, engine='openpyxl')
            df_result = _build_report(df)
            save_path = os.path.join(subfolder_path, f"{subfolder}_final_result_report.xlsx")
            df_result.to_excel(save_path, index=False, header=False)
            print(f"✅ 저장 완료: {save_path}")
            saved_reports.append(save_path)
        except Exception as e:
            # Best effort per folder: report the failure and keep going.
            print(f"⚠️ 오류 발생 ({subfolder}): {e}")
    return saved_reports

**main.py**

In [None]:
import os
import time
from preform import run_preform
from classification import run_classification
from grouping import run_grouping
from average import run_average
from auto_analyzer import run_auto_analyzer

def main():
    """Run the whole pipeline: preform (twice), classification, grouping,
    averaging and report generation.

    The two preform runs use the fixed choices "1" and "2". The preform
    prefixes of interest are then read interactively until 'a' is entered;
    if none is given the function returns without running the later steps.
    """
    # 1. First automatic choice
    print("월별 어떤 값을 추출할 것입니까?\n1. rit_no\n2. resin_type")
    time.sleep(2)
    print("1이 자동으로 입력됩니다.")
    result_path_1, summary_1 = run_preform("1")
    print("\n[1번 결과]")
    print("\n".join(summary_1))
    print(f"✔️ preform 완료: {result_path_1}\n")

    # 2. Second automatic choice
    time.sleep(2)
    print("2이 자동으로 입력됩니다.")
    result_path_2, summary_2 = run_preform("2")
    print("\n[2번 결과]")
    print("\n".join(summary_2))
    print(f"✔️ preform 완료: {result_path_2}\n")

    # 3. Read the preform prefixes of interest
    print("\n관심 프리폼 접두사를 하나씩 입력하세요. (끝내려면 'a' 입력)")
    prefixes = []
    while True:
        pf = input("▶ 프리폼 입력: ").strip()
        if pf.lower() == 'a':
            break
        if pf:
            prefixes.append(pf)
    if not prefixes:
        print("❗입력된 프리폼이 없습니다. 프로그램을 종료합니다.")
        return

    desktop_path = os.path.join(os.path.expanduser("~"), "Desktop")
    # A single location shared by grouping, averaging and analysis, so the
    # later steps read exactly what grouping wrote even when the script is
    # not launched from the Desktop folder.
    split_folder = os.path.join(desktop_path, "grouped_by_prefix_split")

    # 4. Run classification
    # NOTE(review): classification still reads "alls.xlsx" and writes
    # "grouped_by_prefix" relative to the current working directory.
    print("\n[2/5] classification.py 실행중...")
    classified_files = run_classification(prefixes)
    print(f"✔️ classification 완료: {len(classified_files)}개 파일 분류")

    # 5. Run grouping
    print("\n[3/5] grouping.py 실행중...")
    grouped_files = run_grouping("grouped_by_prefix", split_folder)
    print(f"✔️ grouping 완료: {len(grouped_files)}개 파일 생성")

    # 6. Run averaging
    print("\n[4/5] average.py 실행중...")
    average_files = run_average(split_folder)
    print(f"✔️ average 완료: {len(average_files)}개 요약 파일 생성")

    # 7. Run the report generator
    print("\n[5/5] auto_analyzer.py 실행중...")
    run_auto_analyzer(split_folder)
    print("✔️ auto_analyzer 완료")

    print("\n🎉 전체 자동화 파이프라인이 종료되었습니다.")

# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()