In [9]:
import math
import numpy as np
import pandas as pd
import math
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, classification_report

In [10]:
def get_amp_phase(data, position, width, is_calibrated=False, calibration=None, lookup_size=100):
    """
    Calculate the amplitude and phase of the Lissajous curve.

    :param data: List of tuples [(x1, y1), (x2, y2), ...] representing the data points.
    :param position: Central position in the data to analyze.
    :param width: Window width around the position.
    :param is_calibrated: Whether to apply calibration to amplitude.
    :param calibration: Dictionary with calibration factors {'voltage': float, 'amplitude': float}.
    :param lookup_size: Size of the lookup range for filtering distant points.
    :return: Tuple (amplitude, phase) where amplitude is the distance and phase is in degrees.
    """
    n_points = len(data)
    p1 = max(0, position - width // 2)
    p2 = min(n_points - 1, position + width // 2)

    # Initialize indices and min/max values
    A = B = C = D = p1
    min_x = max_x = data[p1][0]
    min_y = max_y = data[p1][1]

    # Find extreme points based on X values
    for i in range(p1 + 1, p2 + 1):
        if data[i][0] < min_x:
            min_x = data[i][0]
            A = i
        if data[i][0] > max_x:
            max_x = data[i][0]
            B = i

    # Find extreme points based on Y values
    for i in range(p1 + 1, p2 + 1):
        if data[i][1] < min_y:
            min_y = data[i][1]
            C = i
        if data[i][1] > max_y:
            max_y = data[i][1]
            D = i

    # Adjust ranges for lookup
    if C < A:
        A, C = C, A
    if D < B:
        B, D = D, B

    if (C - A) > lookup_size:
        tA = (A + C - lookup_size) // 2
        tC = (A + C + lookup_size) // 2
        A, C = tA, tC
    if (D - B) > lookup_size:
        tB = (B + D - lookup_size) // 2
        tD = (B + D + lookup_size) // 2
        B, D = tB, tD

    # Find the most distant points
    max_dist = 0
    q1 = q2 = None
    for s1 in range(A, C + 1):
        for s2 in range(B, D + 1):
            dx = data[s1][0] - data[s2][0]
            dy = data[s1][1] - data[s2][1]
            dist = dx**2 + dy**2
            if dist > max_dist:
                max_dist = dist
                q1, q2 = s1, s2

    # Ensure indices are valid
    q1 = min(q1, n_points - 1)
    q2 = min(q2, n_points - 1)

    # Swap if necessary to maintain order
    if q1 < q2:
        q1, q2 = q2, q1

    # Calculate amplitude and phase
    x1, y1 = data[q1]
    x2, y2 = data[q2]
    amplitude = math.sqrt((x2 - x1)**2 + (y2 - y1)**2)
    if is_calibrated and calibration:
        amplitude *= calibration['voltage'] / calibration['amplitude']

    dx = x1 - x2
    dy = y2 - y1
    phase = math.degrees(math.atan2(dy, dx))
    phase = phase % 360  # Normalize phase to [0, 360)

    return amplitude, phase

def extract_features(slice_df):
    """
    Extract amplitude and phase features for each slice of data.
    :param slice_df: DataFrame containing x and y data points (columns x0 and x1).
    :return: Feature vector containing amplitude and phase.
    """
    # Prepare data as a list of tuples [(x, y), ...]
    data = list(zip(slice_df["0"], slice_df["1"]))

    # Compute amplitude and phase around the center of the slice
    position = len(data) // 2  # Use center of the slice
    width = len(data)         # Use the full slice length as the window
    amplitude, phase = get_amp_phase(data, position, width)

    return np.array([amplitude, phase])

In [11]:
df = pd.read_csv("saved/train_data/data_with_defects.csv")
print(df.columns)

Index(['0_X', '0_Y', '1_X', '1_Y', '2_X', '2_Y', '3_X', '3_Y', 'slice_number',
       'phase_0', 'phase_1', 'phase_2', 'phase_3'],
      dtype='object')


In [14]:
first_pair_df = df[['0_X', '0_Y', 'slice_number', 'phase_0']]
print(first_pair_df)
print(df['slice_number'].unique())

           0_X       0_Y  slice_number  phase_0
0    -6.918437  2.223775           NaN      NaN
1    -6.908662  2.203251           NaN      NaN
2    -6.930338  2.226994           NaN      NaN
3    -6.918437  2.223775           NaN      NaN
4    -6.925968  2.205377           NaN      NaN
...        ...       ...           ...      ...
5047 -0.860414 -0.140208           NaN      NaN
5048 -0.856103 -0.133712           NaN      NaN
5049 -0.853917 -0.144520           NaN      NaN
5050 -0.870130 -0.147798           NaN      NaN
5051 -0.869037 -0.153202           NaN      NaN

[5052 rows x 4 columns]
[nan -1.  3.  4.  1.  2.]


In [15]:
first_pair_df['target'] = np.where(first_pair_df['slice_number'].isna(), 0, 1)
print(first_pair_df)
print(first_pair_df['target'].unique())
print(len(first_pair_df[first_pair_df['target']==1]))

           0_X       0_Y  slice_number  phase_0  target
0    -6.918437  2.223775           NaN      NaN       0
1    -6.908662  2.203251           NaN      NaN       0
2    -6.930338  2.226994           NaN      NaN       0
3    -6.918437  2.223775           NaN      NaN       0
4    -6.925968  2.205377           NaN      NaN       0
...        ...       ...           ...      ...     ...
5047 -0.860414 -0.140208           NaN      NaN       0
5048 -0.856103 -0.133712           NaN      NaN       0
5049 -0.853917 -0.144520           NaN      NaN       0
5050 -0.870130 -0.147798           NaN      NaN       0
5051 -0.869037 -0.153202           NaN      NaN       0

[5052 rows x 5 columns]
[0 1]
180


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  first_pair_df['target'] = np.where(first_pair_df['slice_number'].isna(), 0, 1)


In [17]:
X = first_pair_df[['0_X', '0_Y']]
y = first_pair_df['target']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

rf = GradientBoostingClassifier(n_estimators=300, max_depth=5, random_state=42)
rf.fit(X_train, y_train)

y_pred = rf.predict(X_test)
print("Random Forest Test Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:")
print(classification_report(y_test, y_pred))

Random Forest Test Accuracy: 0.973293768545994
Classification Report:
              precision    recall  f1-score   support

           0       0.98      0.99      0.99       974
           1       0.73      0.43      0.54        37

    accuracy                           0.97      1011
   macro avg       0.85      0.71      0.76      1011
weighted avg       0.97      0.97      0.97      1011



In [18]:
y_pred_probs = rf.predict_proba(X_test)[:, 1]
y_pred = (y_pred_probs > 0.15).astype(int)
print("Test Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:")
print(classification_report(y_test, y_pred))

Test Accuracy: 0.9614243323442137
Classification Report:
              precision    recall  f1-score   support

           0       0.98      0.98      0.98       974
           1       0.47      0.46      0.47        37

    accuracy                           0.96      1011
   macro avg       0.73      0.72      0.72      1011
weighted avg       0.96      0.96      0.96      1011



In [24]:

# 1. Load your new data
new_df = pd.read_csv("saved/test/test2.csv")


# 3. If your model was simply trained on columns [0, 1], do:
X_new = new_df[['0_X', '0_Y']]

# 5. If you want probabilities (for example, to apply a threshold), use:
y_new_pred_proba = rf.predict_proba(X_new)[:, 1]

# Add 'defect_proba_1' column to the DataFrame
new_df['defect_proba_1'] = y_new_pred_proba

# Save the updated DataFrame back to the same file
new_df.to_csv("saved/test_gboost.csv", index=False)