In [1]:
import glob
import os

csv_dir = "/mnt/c/Users/Admin/MATLAB/Projects/my_project/csv_results"
all_files = glob.glob(os.path.join(csv_dir, "*.csv"))

variables = [
    "Backpr", "Extruder", "Flow_Rate", "Inject", "Nozzle",
    "Piston_Position", "Piston_Pressure", "Piston_Velocity", "Retract", "Volume"
]
selected_files = {}

for var in variables:
    matched = [f for f in all_files if var.lower() in os.path.basename(f).lower() and 'RID014' in f]
    matched_sorted = sorted(matched)
    selected_files[var] = matched_sorted[:120]
# 결과 확인
for var, files in selected_files.items():
    print(f"{var}: {len(files)} files")

Backpr: 109 files
Extruder: 109 files
Flow_Rate: 109 files
Inject: 109 files
Nozzle: 109 files
Piston_Position: 109 files
Piston_Pressure: 109 files
Piston_Velocity: 109 files
Retract: 109 files
Volume: 109 files


In [5]:
import pandas as pd
import numpy as np
import glob
import os

# === 함수 정의 ===

def compute_max_after(time, signal, t_threshold):
    mask = time > t_threshold
    if np.any(mask):
        return np.max(signal[mask])
    return np.nan

def compute_mean_after(time, signal, t_threshold):
    mask = time > t_threshold
    if np.any(mask):
        return np.mean(signal[mask])
    return np.nan

def compute_flat_ratio(time, signal, threshold=1e-6):
    grad = np.gradient(signal)
    flat_mask = np.abs(grad) < threshold
    return np.sum(flat_mask) / len(signal)

def compute_flat_ratio_after(time, signal, t_threshold, threshold=1e-6):
    mask = time > t_threshold
    if not np.any(mask): return np.nan
    grad = np.gradient(signal[mask])
    flat_mask = np.abs(grad) < threshold
    return np.sum(flat_mask) / len(grad)

def compute_flat_duration_after(time, signal, t_threshold=8.0, dt=None, threshold=1e-4):
    mask = time > t_threshold
    values = signal[mask]
    if len(values) == 0:
        return np.nan
    if dt is None:
        dt = np.mean(np.diff(time))
    gradient = np.gradient(values)
    flat_mask = np.abs(gradient) < threshold
    return np.sum(flat_mask) * dt

def compute_max_gradient_duration(time, signal, threshold=3.5e-6):
    grad = np.abs(np.gradient(np.abs(signal)))
    dt = np.mean(np.diff(time))
    duration = np.sum(grad > threshold) * dt
    return duration

# === 시나리오 분류 규칙 ===

def rule_label(feats):
    v = feats.get("Volume_max", np.nan)

    if v < 48:
        if feats.get("Flow_Rate_flat_ratio", 0) > 0.8:
            return "Short shot - No flow"
        if feats.get("Piston_Pressure_max_t6", np.inf) < 0.99 * 5e6:
            return "Short shot - Low sustained pressure"
        if feats.get("Piston_Position_max", 0) < -0.01:
            return "Short shot - Piston undertravel"
        return "Short shot - Unknown origin"

    elif v > 52:
        if feats.get("Piston_Pressure_max_t6", 0) > 1.05 * 5.5e6:
            return "Overpacking / Flash - Excessive pressure"
        if feats.get("Piston_Position_max", 0) > 0.005:
            return "Overpacking / Flash - Over stroke"
        return "Overpacking / Flash - Unknown origin"

    else:  # 정상 충진 범위
        if feats.get("Flow_Rate_jetting_duration", 0) > 0.07:
            return "Jetting / Burn mark - Rapid surge"
        if feats.get("Piston_Velocity_flat_duration_t8", 0) > 1.5:
            return "Sticking / Slip - Velocity stalled in post-holding phase"
        if feats.get("Flow_Rate_flat_ratio", 1.0) > 0.95 and feats.get("Piston_Pressure_mean_t8", 1e9) < 2e6:
            return "Voids / Air trap - Early flow stop, low pressure"
        return "Normal"

# === CSV 로드 및 feature + 시나리오 추출 ===

csv_dir = "/mnt/c/Users/Admin/MATLAB/Projects/my_project/csv_results"
variables = ["Volume", "Piston_Pressure", "Piston_Position", "Piston_Velocity", "Flow_Rate"]

# 사전 정렬된 selected_files 사용 시 이 부분은 주석처리 가능
all_files = glob.glob(os.path.join(csv_dir, "*.csv"))
selected_files = {}
for var in variables:
    matched = [f for f in all_files if var.lower() in os.path.basename(f).lower() and "RID014" in f]
    matched_sorted = sorted(matched)
    selected_files[var] = matched_sorted[:109]

rows = []

for i in range(109):
    row = {}
    for var in selected_files:
        df = pd.read_csv(selected_files[var][i], header=None)
        time = df[0].values
        signal = df[1].values

        if var == "Volume":
            row["Volume_max"] = np.max(signal)

        elif var == "Piston_Pressure":
            row["Piston_Pressure_max_t6"] = compute_max_after(time, signal, 6.0)
            row["Piston_Pressure_mean_t8"] = compute_mean_after(time, signal, 8.0)

        elif var == "Piston_Position":
            row["Piston_Position_max"] = np.max(signal)

        elif var == "Piston_Velocity":
            row["Piston_Velocity_flat_ratio_t6"] = compute_flat_ratio_after(time, signal, 6.0)
            row["Piston_Velocity_flat_duration_t8"] = compute_flat_duration_after(time, signal, 8.0)

        elif var == "Flow_Rate":
            row["Flow_Rate_flat_ratio"] = compute_flat_ratio(time, signal)
            row["Flow_Rate_jetting_duration"] = compute_max_gradient_duration(time, signal)

    row["scenario"] = rule_label(row)
    rows.append(row)

df_result = pd.DataFrame(rows)
df_result.to_csv("scenario_results.csv", index=False)

KeyError: 'Value'

In [8]:
import pandas as pd
import numpy as np
import glob
import os

# 경로 설정
csv_dir = "/mnt/c/Users/Admin/MATLAB/Projects/my_project/csv_results/"
variables = ["Volume", "Piston_Pressure", "Flow_Rate", "Piston_Position", "Piston_Velocity"]

# === Feature 추출 함수 ===

def compute_flat_ratio(time, signal, threshold=1e-4):
    grad = np.gradient(signal)
    flat = np.abs(grad) < threshold
    return np.sum(flat) / len(flat)

def compute_flat_duration_after(time, signal, t_threshold=8.0):
    mask = time > t_threshold
    values = signal[mask]
    if len(values) == 0:
        return np.nan
    dt = np.mean(np.diff(time))
    gradient = np.gradient(values)
    flat_mask = np.abs(gradient) < 1e-4
    return np.sum(flat_mask) * dt

def compute_max_gradient_duration(time, signal, threshold=3.5e-6):
    abs_grad = np.abs(np.gradient(np.abs(signal)))
    dt = np.mean(np.diff(time))
    duration_above_th = np.sum(abs_grad > threshold) * dt
    return duration_above_th

def compute_max_after(time, signal, t_threshold):
    mask = time > t_threshold
    return np.max(signal[mask]) if np.any(mask) else np.nan

def compute_mean_after(time, signal, t_threshold):
    mask = time > t_threshold
    return np.mean(signal[mask]) if np.any(mask) else np.nan

# === 이상 판정 함수 ===

def rule_label(feats):
    v = feats.get("Volume_max", np.inf)
    scenario = "Normal"

    if v < 48:
        if feats.get("Flow_Rate_flat_ratio", 0) > 0.8:
            scenario = "Short shot - No flow"
        elif feats.get("Piston_Pressure_max_t6", np.inf) < 0.99 * 5e6:
            scenario = "Short shot - Low sustained pressure"
        elif feats.get("Piston_Position_max", 0) < -0.01:
            scenario = "Short shot - Piston undertravel"
        else:
            scenario = "Short shot - Unknown origin"

    elif v > 52:
        if feats.get("Piston_Pressure_max_t6", 0) > 1.05 * 5.5e6:
            scenario = "Overpacking / Flash - Excessive pressure"
        elif feats.get("Piston_Position_max", 0) > 0.005:
            scenario = "Overpacking / Flash - Over stroke"
        else:
            scenario = "Overpacking / Flash - Unknown origin"

    elif feats.get("Flow_Rate_jetting_duration", 0) > 0.07:
        scenario = "Jetting - High flow surge"

    elif feats.get("Piston_Velocity_flat_duration_t8", 0) > 1.5:
        scenario = "Sticking / Slip - Velocity stalled in post-holding phase"

    elif feats.get("Volume_max", 0) > 48 and feats.get("Volume_max", 0) < 52:
        if feats.get("Piston_Pressure_mean_t6", 1e9) < 1.5e6:
            scenario = "Sink mark - Low holding pressure"

    elif feats.get("Flow_Rate_flat_ratio_t6", 0) > 0.95:
        scenario = "Voids / Air trap - Flow stopped during packing"

    return scenario

# === Feature 추출 및 시나리오 분류 ===

selected_files = {var: sorted(glob.glob(os.path.join(csv_dir, f"{var}_*RID014.csv"))) for var in variables}
num_cycles = len(selected_files["Volume"])

rows = []
for i in range(num_cycles):
    row = {"cycle_id": os.path.basename(selected_files["Volume"][i]).split("_", 1)[1].rsplit(".", 1)[0]}
    for var in variables:
        df = pd.read_csv(selected_files[var][i])
        time = df.iloc[:, 0].astype(float).values
        signal = df.iloc[:, 1].astype(float).values


        if var == "Volume":
            row["Volume_max"] = np.max(signal)

        elif var == "Piston_Pressure":
            row["Piston_Pressure_max_t6"] = compute_max_after(time, signal, 6.0)
            row["Piston_Pressure_mean_t6"] = compute_mean_after(time, signal, 6.0)

        elif var == "Piston_Position":
            row["Piston_Position_max"] = np.max(signal)
            row["Piston_Position_min"] = np.min(signal)

        elif var == "Piston_Velocity":
            row["Piston_Velocity_flat_duration_t8"] = compute_flat_duration_after(time, signal, 8.0)

        elif var == "Flow_Rate":
            row["Flow_Rate_flat_ratio"] = compute_flat_ratio(time, signal)
            row["Flow_Rate_jetting_duration"] = compute_max_gradient_duration(time, signal)

    row["scenario"] = rule_label(row)
    rows.append(row)

df_scenarios = pd.DataFrame(rows)
import ace_tools as tools; tools.display_dataframe_to_user(name="이상 판정 결과 요약", dataframe=df_scenarios)


ModuleNotFoundError: No module named 'ace_tools'

In [10]:
import pandas as pd
import numpy as np
import os
import glob

# ----- 유틸 함수 정의 -----
def clean_time_column(col):
    return col.str.replace("초", "", regex=False).str.replace("s", "", regex=False).astype(float)

def compute_flat_ratio(time, signal):
    gradient = np.gradient(signal)
    return np.sum(np.abs(gradient) < 1e-4) / len(signal)

def compute_flat_ratio_after(time, signal, t_threshold):
    mask = time > t_threshold
    if not np.any(mask):
        return np.nan
    gradient = np.gradient(signal[mask])
    return np.sum(np.abs(gradient) < 1e-4) / len(gradient)

def compute_flat_duration_after(time, signal, t_threshold=8.0, dt=None):
    mask = time > t_threshold
    values = signal[mask]
    if len(values) == 0:
        return np.nan
    if dt is None:
        dt = np.mean(np.diff(time))
    gradient = np.gradient(values)
    flat_mask = np.abs(gradient) < 1e-4
    return np.sum(flat_mask) * dt

def compute_max_after(time, signal, t_threshold):
    mask = time > t_threshold
    return np.max(signal[mask]) if np.any(mask) else np.nan

def compute_mean_after(time, signal, t_threshold):
    mask = time > t_threshold
    return np.mean(signal[mask]) if np.any(mask) else np.nan

def compute_max_gradient_duration(time, signal, threshold=3.5e-6):
    abs_grad = np.abs(np.gradient(np.abs(signal)))
    dt = np.mean(np.diff(time))
    return np.sum(abs_grad > threshold) * dt

# ----- 이상 판정 룰 -----
def rule_label(feats):
    v = feats.get("Volume_max", np.nan)

    if v < 48:
        if feats.get("Flow_Rate_flat_ratio", 0) > 0.8:
            return "Short shot - No flow"
        elif feats.get("Piston_Pressure_max_t6", np.inf) < 0.99 * 5e6:
            return "Short shot - Low sustained pressure"
        elif feats.get("Piston_Position_max", np.inf) < -0.01:
            return "Short shot - Piston undertravel"
        else:
            return "Short shot - Unknown origin"

    elif v > 52:
        if feats.get("Piston_Pressure_max_t6", 0) > 1.05 * 5.5e6:
            return "Overpacking / Flash - Excessive pressure"
        elif feats.get("Piston_Position_max", 0) > 0.005:
            return "Overpacking / Flash - Over stroke"
        else:
            return "Overpacking / Flash - Unknown origin"

    elif feats.get("Piston_Velocity_flat_ratio_t6", 0) > 0.9:
        return "Sticking / Slip - Late velocity stall"
    elif feats.get("Piston_Velocity_flat_duration_t8", 0) > 1.5:
        return "Sticking / Slip - Velocity stalled in post-holding phase"

    elif feats.get("Piston_Velocity_flat_duration_t8", 0) > 1.5:
        return "Sink mark - Velocity stalled during packing end"

    elif feats.get("Flow_Rate_jetting_duration", 0) > 0.07:
        return "Jetting - High flow surge"

    return "Normal"

# ----- 파일 읽기 및 처리 -----
csv_dir = "/mnt/c/Users/Admin/MATLAB/Projects/my_project/csv_results/"
variables = ["Volume", "Piston_Pressure", "Piston_Position", "Piston_Velocity", "Flow_Rate"]
file_dict = {var: sorted(glob.glob(os.path.join(csv_dir, f"{var}_*.csv"))) for var in variables}
num_files = len(next(iter(file_dict.values())))

rows = []
for i in range(num_files):
    row = {}
    for var in variables:
        df = pd.read_csv(file_dict[var][i])
        time_raw = df.iloc[:, 0].astype(str)
        time = clean_time_column(time_raw)
        signal = df.iloc[:, 1].astype(float).values

        if var == "Volume":
            row["Volume_max"] = np.max(signal)

        elif var == "Piston_Pressure":
            row["Piston_Pressure_max_t6"] = compute_max_after(time, signal, 6.0)
            row["Piston_Pressure_mean_t6"] = compute_mean_after(time, signal, 6.0)

        elif var == "Piston_Position":
            row["Piston_Position_max"] = np.max(signal)
            row["Piston_Position_min"] = np.min(signal)

        elif var == "Piston_Velocity":
            row["Piston_Velocity_flat_ratio_t6"] = compute_flat_ratio_after(time, signal, 6.0)
            row["Piston_Velocity_flat_duration_t8"] = compute_flat_duration_after(time, signal, 8.0)

        elif var == "Flow_Rate":
            row["Flow_Rate_flat_ratio"] = compute_flat_ratio(time, signal)
            row["Flow_Rate_jetting_duration"] = compute_max_gradient_duration(time, signal)

    row["scenario"] = rule_label(row)
    rows.append(row)

df_scenarios = pd.DataFrame(rows)
df_scenarios.to_csv("classified_scenarios.csv", index=False)
print(df_scenarios.head(10))
print("✅ classified_scenarios.csv 저장 완료")


   Volume_max  Piston_Pressure_max_t6  Piston_Pressure_mean_t6  \
0   49.635846            5.000309e+06             1.190352e+06   
1   49.635846            5.000309e+06             1.190352e+06   
2   49.635846            5.000309e+06             1.190352e+06   
3   49.635846            5.000309e+06             1.190352e+06   
4   49.635846            5.000309e+06             1.190352e+06   
5   49.635846            5.000309e+06             1.190352e+06   
6   49.635846            5.000309e+06             1.190352e+06   
7   49.635846            5.000309e+06             1.190352e+06   
8   49.635846            5.000309e+06             1.190352e+06   
9   49.635846            5.000309e+06             1.190352e+06   

   Piston_Position_max  Piston_Position_min  Piston_Velocity_flat_ratio_t6  \
0            -0.000244            -0.051932                         0.9625   
1            -0.000244            -0.051932                         0.9625   
2            -0.000244            -0.05

In [11]:
import pandas as pd

# CSV 로드
df = pd.read_csv("/mnt/data/classified_scenarios.csv")

# 수정된 기준으로 Volume 범주 지정
def classify_volume_range(v):
    if v < 47:
        return "Under 47"
    elif v <= 53:
        return "Between 47–53"
    else:
        return "Over 53"

df["volume_range_adj"] = df["Volume_max"].apply(classify_volume_range)

# 시나리오도 연동해서 재분류
def adjust_scenario(row):
    v = row["Volume_max"]
    if v < 47:
        if row["Flow_Rate_flat_ratio"] > 0.8:
            return "Short shot - No flow"
        elif row["Piston_Pressure_max_t6"] < 0.99 * 5e6:
            return "Short shot - Low sustained pressure"
        elif row["Piston_Position_max"] < -0.01:
            return "Short shot - Piston undertravel"
        else:
            return "Short shot - Unknown origin"
    elif v > 53:
        if row["Piston_Pressure_max_t6"] > 1.05 * 5.5e6:
            return "Overpacking / Flash - Excessive pressure"
        elif row["Piston_Position_max"] > 0.005:
            return "Overpacking / Flash - Over stroke"
        else:
            return "Overpacking / Flash - Unknown origin"
    else:
        return row["scenario"]  # 기존 시나리오 유지 (중립 구간은 나머지 판정 기준 사용)

df["scenario"] = df.apply(adjust_scenario, axis=1)

# 새로운 CSV 저장
output_path = "/mnt/data/classified_scenarios_02.csv"
df.to_csv(output_path, index=False)
output_path

FileNotFoundError: [Errno 2] No such file or directory: '/mnt/data/classified_scenarios.csv'

In [14]:
import pandas as pd
import numpy as np
import os
import glob

# ----- 유틸 함수 정의 -----
def clean_time_column(col):
    return col.str.replace("초", "", regex=False).str.replace("s", "", regex=False).astype(float)

def compute_flat_ratio(time, signal):
    gradient = np.gradient(signal)
    return np.sum(np.abs(gradient) < 1e-4) / len(signal)

def compute_flat_ratio_after(time, signal, t_threshold):
    mask = time > t_threshold
    if not np.any(mask):
        return np.nan
    gradient = np.gradient(signal[mask])
    return np.sum(np.abs(gradient) < 1e-4) / len(gradient)

def compute_flat_duration_after(time, signal, t_threshold=8.0, dt=None):
    mask = time > t_threshold
    values = signal[mask]
    if len(values) == 0:
        return np.nan
    if dt is None:
        dt = np.mean(np.diff(time))
    gradient = np.gradient(values)
    flat_mask = np.abs(gradient) < 1e-4
    return np.sum(flat_mask) * dt

def compute_max_after(time, signal, t_threshold):
    mask = time > t_threshold
    return np.max(signal[mask]) if np.any(mask) else np.nan

def compute_mean_after(time, signal, t_threshold):
    mask = time > t_threshold
    return np.mean(signal[mask]) if np.any(mask) else np.nan

def compute_max_gradient_duration(time, signal, threshold=3.5e-6):
    abs_grad = np.abs(np.gradient(np.abs(signal)))
    dt = np.mean(np.diff(time))
    return np.sum(abs_grad > threshold) * dt

# ----- 이상 판정 룰 -----
def rule_label(feats):
    v = feats.get("Volume_max", np.nan)

    if v < 47:
        if feats.get("Flow_Rate_flat_ratio", 0) > 0.8:
            return "Short shot - No flow"
        elif feats.get("Piston_Pressure_max_t6", np.inf) < 0.99 * 5e6:
            return "Short shot - Low sustained pressure"
        elif feats.get("Piston_Position_max", np.inf) < -0.01:
            return "Short shot - Piston undertravel"
        else:
            return "Short shot - Unknown origin"

    elif v > 53:
        if feats.get("Piston_Pressure_max_t6", 0) > 1.05 * 5.5e6:
            return "Overpacking / Flash - Excessive pressure"
        elif feats.get("Piston_Position_max", 0) > 0.005:
            return "Overpacking / Flash - Over stroke"
        else:
            return "Overpacking / Flash - Unknown origin"

    elif feats.get("Piston_Velocity_flat_ratio_t6", 0) < 0.9:
        return "Sticking / Slip - Unstable motion after holding start"

    elif feats.get("Piston_Velocity_flat_duration_t8", 0) > 1.95:
        return "Sink mark - Velocity stalled during packing end"

    elif feats.get("Flow_Rate_jetting_duration", 0) > 0.07:
        return "Jetting - High flow surge"

    return "Normal"

# ----- 파일 읽기 및 처리 -----
csv_dir = "/mnt/c/Users/Admin/MATLAB/Projects/my_project/csv_results/"
variables = ["Volume", "Piston_Pressure", "Piston_Position", "Piston_Velocity", "Flow_Rate"]
file_dict = {var: sorted(glob.glob(os.path.join(csv_dir, f"{var}_*.csv"))) for var in variables}
num_files = len(next(iter(file_dict.values())))

rows = []
for i in range(num_files):
    row = {}
    for var in variables:
        df = pd.read_csv(file_dict[var][i])
        time_raw = df.iloc[:, 0].astype(str)
        time = clean_time_column(time_raw)
        signal = df.iloc[:, 1].astype(float).values

        if var == "Volume":
            row["Volume_max"] = np.max(signal)

        elif var == "Piston_Pressure":
            row["Piston_Pressure_max_t6"] = compute_max_after(time, signal, 6.0)
            row["Piston_Pressure_mean_t6"] = compute_mean_after(time, signal, 6.0)

        elif var == "Piston_Position":
            row["Piston_Position_max"] = np.max(signal)
            row["Piston_Position_min"] = np.min(signal)

        elif var == "Piston_Velocity":
            row["Piston_Velocity_flat_ratio_t6"] = compute_flat_ratio_after(time, signal, 6.0)
            row["Piston_Velocity_flat_duration_t8"] = compute_flat_duration_after(time, signal, 8.0)

        elif var == "Flow_Rate":
            row["Flow_Rate_flat_ratio"] = compute_flat_ratio(time, signal)
            row["Flow_Rate_jetting_duration"] = compute_max_gradient_duration(time, signal)

    row["scenario"] = rule_label(row)
    rows.append(row)

df_scenarios = pd.DataFrame(rows)
df_scenarios.to_csv("classified_scenarios_004.csv", index=False)
print(df_scenarios.head(10))
print("✅ classified_scenarios_004.csv 저장 완료")

   Volume_max  Piston_Pressure_max_t6  Piston_Pressure_mean_t6  \
0   49.635846            5.000309e+06             1.190352e+06   
1   49.635846            5.000309e+06             1.190352e+06   
2   49.635846            5.000309e+06             1.190352e+06   
3   49.635846            5.000309e+06             1.190352e+06   
4   49.635846            5.000309e+06             1.190352e+06   
5   49.635846            5.000309e+06             1.190352e+06   
6   49.635846            5.000309e+06             1.190352e+06   
7   49.635846            5.000309e+06             1.190352e+06   
8   49.635846            5.000309e+06             1.190352e+06   
9   49.635846            5.000309e+06             1.190352e+06   

   Piston_Position_max  Piston_Position_min  Piston_Velocity_flat_ratio_t6  \
0            -0.000244            -0.051932                         0.9625   
1            -0.000244            -0.051932                         0.9625   
2            -0.000244            -0.05

In [22]:
import pandas as pd
import numpy as np
import os
import glob

# ----- 유틸 함수 정의 -----
def clean_time_column(col):
    return col.str.replace("초", "", regex=False).str.replace("s", "", regex=False).astype(float)

def compute_flat_ratio(time, signal):
    gradient = np.gradient(signal)
    return np.sum(np.abs(gradient) < 1e-4) / len(signal)

def compute_flat_ratio_after(time, signal, t_threshold):
    mask = time > t_threshold
    if not np.any(mask):
        return np.nan
    gradient = np.gradient(signal[mask])
    return np.sum(np.abs(gradient) < 1e-4) / len(gradient)

def compute_flat_duration_after(time, signal, t_threshold=8.0, dt=None):
    mask = time > t_threshold
    values = signal[mask]
    if len(values) == 0:
        return np.nan
    if dt is None:
        dt = np.mean(np.diff(time))
    gradient = np.gradient(values)
    flat_mask = np.abs(gradient) < 1e-4
    return np.sum(flat_mask) * dt

def compute_max_after(time, signal, t_threshold):
    mask = time > t_threshold
    return np.max(signal[mask]) if np.any(mask) else np.nan

def compute_mean_after(time, signal, t_threshold):
    mask = time > t_threshold
    return np.mean(signal[mask]) if np.any(mask) else np.nan

def compute_max_gradient_duration(time, signal, threshold=1e-5):
    abs_grad = np.abs(np.gradient(np.abs(signal)))
    dt = np.mean(np.diff(time))
    return np.sum(abs_grad > threshold) * dt

# ----- 이상 판정 룰 -----
def rule_label(feats):
    v = feats.get("Volume_max", np.nan)

    if v < 45:
        if feats.get("Flow_Rate_flat_ratio", 0) > 0.8:
            return "Short shot - No flow"
        elif feats.get("Piston_Pressure_max_t6", np.inf) < 0.99 * 5e6:
            return "Short shot - Low sustained pressure"
        elif feats.get("Piston_Position_max", np.inf) < -0.01:
            return "Short shot - Piston undertravel"
        else:
            return "Short shot - Unknown origin"

    elif v > 55:
        if feats.get("Piston_Pressure_max_t6", 0) > 1.05 * 5.5e6:
            return "Overpacking / Flash - Excessive pressure"
        elif feats.get("Piston_Position_max", 0) > 0.005:
            return "Overpacking / Flash - Over stroke"
        else:
            return "Overpacking / Flash - Unknown origin"

    elif feats.get("Piston_Velocity_flat_ratio_t6", 0) < 0.9:
        return "Sticking / Slip - Unstable motion after holding start"

    elif feats.get("Piston_Velocity_flat_duration_t8", 0) > 1.95:
        return "Sink mark - Velocity stalled during packing end"

    elif feats.get("Flow_Rate_jetting_duration", 0) > 0.6:
        return "Jetting - High flow surge"

    return "Normal"

# ----- 파일 읽기 및 처리 -----
csv_dir = "/mnt/c/Users/Admin/MATLAB/Projects/my_project/csv_results/"
variables = ["Volume", "Piston_Pressure", "Piston_Position", "Piston_Velocity", "Flow_Rate"]
file_dict = {var: sorted(glob.glob(os.path.join(csv_dir, f"{var}_*.csv"))) for var in variables}
num_files = len(next(iter(file_dict.values())))

rows = []
for i in range(num_files):
    row = {}
    for var in variables:
        df = pd.read_csv(file_dict[var][i])
        time_raw = df.iloc[:, 0].astype(str)
        time = clean_time_column(time_raw)
        signal = df.iloc[:, 1].astype(float).values

        if var == "Volume":
            row["Volume_max"] = np.max(signal)

        elif var == "Piston_Pressure":
            row["Piston_Pressure_max_t6"] = compute_max_after(time, signal, 6.0)
            row["Piston_Pressure_mean_t6"] = compute_mean_after(time, signal, 6.0)

        elif var == "Piston_Position":
            row["Piston_Position_max"] = np.max(signal)
            row["Piston_Position_min"] = np.min(signal)

        elif var == "Piston_Velocity":
            row["Piston_Velocity_flat_ratio_t6"] = compute_flat_ratio_after(time, signal, 6.0)
            row["Piston_Velocity_flat_duration_t8"] = compute_flat_duration_after(time, signal, 8.0)

        elif var == "Flow_Rate":
            row["Flow_Rate_flat_ratio"] = compute_flat_ratio(time, signal)
            row["Flow_Rate_jetting_duration"] = compute_max_gradient_duration(time, signal)

    row["scenario"] = rule_label(row)
    rows.append(row)

df_scenarios = pd.DataFrame(rows)
df_scenarios.to_csv("classified_scenarios_011.csv", index=False)
print(df_scenarios.head(10))
print("✅ classified_scenarios_011.csv 저장 완료")

   Volume_max  Piston_Pressure_max_t6  Piston_Pressure_mean_t6  \
0   49.635846            5.000309e+06             1.190352e+06   
1   49.635846            5.000309e+06             1.190352e+06   
2   49.635846            5.000309e+06             1.190352e+06   
3   49.635846            5.000309e+06             1.190352e+06   
4   49.635846            5.000309e+06             1.190352e+06   
5   49.635846            5.000309e+06             1.190352e+06   
6   49.635846            5.000309e+06             1.190352e+06   
7   49.635846            5.000309e+06             1.190352e+06   
8   49.635846            5.000309e+06             1.190352e+06   
9   49.635846            5.000309e+06             1.190352e+06   

   Piston_Position_max  Piston_Position_min  Piston_Velocity_flat_ratio_t6  \
0            -0.000244            -0.051932                         0.9625   
1            -0.000244            -0.051932                         0.9625   
2            -0.000244            -0.05

In [23]:
import pandas as pd
import numpy as np
import os
import glob

# ----- 유틸 함수 정의 -----
def clean_time_column(col):
    return col.str.replace("초", "", regex=False).str.replace("s", "", regex=False).astype(float)

def compute_flat_ratio(time, signal):
    gradient = np.gradient(signal)
    return np.sum(np.abs(gradient) < 1e-4) / len(signal)

def compute_flat_ratio_after(time, signal, t_threshold):
    mask = time > t_threshold
    if not np.any(mask):
        return np.nan
    gradient = np.gradient(signal[mask])
    return np.sum(np.abs(gradient) < 1e-4) / len(gradient)

def compute_flat_duration_after(time, signal, t_threshold=8.0, dt=None):
    mask = time > t_threshold
    values = signal[mask]
    if len(values) == 0:
        return np.nan
    if dt is None:
        dt = np.mean(np.diff(time))
    gradient = np.gradient(values)
    flat_mask = np.abs(gradient) < 1e-4
    return np.sum(flat_mask) * dt

def compute_max_after(time, signal, t_threshold):
    mask = time > t_threshold
    return np.max(signal[mask]) if np.any(mask) else np.nan

def compute_mean_after(time, signal, t_threshold):
    mask = time > t_threshold
    return np.mean(signal[mask]) if np.any(mask) else np.nan

def compute_max_gradient_duration(time, signal, threshold=1e-5):
    abs_grad = np.abs(np.gradient(np.abs(signal)))
    dt = np.mean(np.diff(time))
    return np.sum(abs_grad > threshold) * dt

# ----- 이상 판정 룰 -----
def rule_label(feats):
    v = feats.get("Volume_max", np.nan)

    if v < 45:
        return "Short shot"

    elif v > 55:
        return "Overpacking / Flash"
        
    elif feats.get("Piston_Velocity_flat_ratio_t6", 0) < 0.9:
        return "Sticking / Slip - Late velocity stall"

    elif feats.get("Piston_Velocity_flat_duration_t8", 0) > 1.95:
        return "Sink mark - Velocity stalled during packing end"

    elif feats.get("Flow_Rate_jetting_duration", 0) > 0.6:
        return "Jetting - High flow surge"

    return "Normal"

# ----- 파일 읽기 및 처리 -----
csv_dir = "/mnt/c/Users/Admin/MATLAB/Projects/my_project/csv_results/"
variables = ["Volume", "Piston_Pressure", "Piston_Position", "Piston_Velocity", "Flow_Rate"]
file_dict = {var: sorted(glob.glob(os.path.join(csv_dir, f"{var}_*.csv"))) for var in variables}
num_files = len(next(iter(file_dict.values())))

rows = []
for i in range(num_files):
    row = {}
    for var in variables:
        df = pd.read_csv(file_dict[var][i])
        time_raw = df.iloc[:, 0].astype(str)
        time = clean_time_column(time_raw)
        signal = df.iloc[:, 1].astype(float).values

        if var == "Volume":
            row["Volume_max"] = np.max(signal)

        elif var == "Piston_Pressure":
            row["Piston_Pressure_max_t6"] = compute_max_after(time, signal, 6.0)
            row["Piston_Pressure_mean_t6"] = compute_mean_after(time, signal, 6.0)

        elif var == "Piston_Position":
            row["Piston_Position_max"] = np.max(signal)
            row["Piston_Position_min"] = np.min(signal)

        elif var == "Piston_Velocity":
            row["Piston_Velocity_flat_ratio_t6"] = compute_flat_ratio_after(time, signal, 6.0)
            row["Piston_Velocity_flat_duration_t8"] = compute_flat_duration_after(time, signal, 8.0)

        elif var == "Flow_Rate":
            row["Flow_Rate_flat_ratio"] = compute_flat_ratio(time, signal)
            row["Flow_Rate_jetting_duration"] = compute_max_gradient_duration(time, signal)

    row["scenario"] = rule_label(row)
    rows.append(row)

df_scenarios = pd.DataFrame(rows)
df_scenarios.to_csv("classified_scenarios_012.csv", index=False)
print(df_scenarios.head(10))
print("✅ classified_scenarios_012.csv 저장 완료")

   Volume_max  Piston_Pressure_max_t6  Piston_Pressure_mean_t6  \
0   49.635846            5.000309e+06             1.190352e+06   
1   49.635846            5.000309e+06             1.190352e+06   
2   49.635846            5.000309e+06             1.190352e+06   
3   49.635846            5.000309e+06             1.190352e+06   
4   49.635846            5.000309e+06             1.190352e+06   
5   49.635846            5.000309e+06             1.190352e+06   
6   49.635846            5.000309e+06             1.190352e+06   
7   49.635846            5.000309e+06             1.190352e+06   
8   49.635846            5.000309e+06             1.190352e+06   
9   49.635846            5.000309e+06             1.190352e+06   

   Piston_Position_max  Piston_Position_min  Piston_Velocity_flat_ratio_t6  \
0            -0.000244            -0.051932                         0.9625   
1            -0.000244            -0.051932                         0.9625   
2            -0.000244            -0.05