## XGBoost

In [None]:
import os
import numpy as np
import pandas as pd
from scipy.signal import find_peaks
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score
import joblib

# Import XGBoost's sklearn API
from xgboost import XGBClassifier

def extract_features_from_csv(csv_path):
    """
    Summarise one packet-trace CSV as a flat dictionary of features.

    The CSV must provide the columns 'Time (s)', 'Frame Length',
    'Direction' and 'Interarrival'.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        dict mapping feature name to value, in this key order:
        mean_frame, std_frame, avg_direction, proportion_uplink,
        proportion_downlink, mean_ia, std_ia, max_ia, min_ia.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If the CSV contains no data rows.
    """
    df = pd.read_csv(csv_path)
    # Aggregate over the rows in time order (non-mutating sort).
    df = df.sort_values(by='Time (s)')

    frame_vals = df['Frame Length'].values
    direction_vals = df['Direction'].values
    interarrival_vals = df['Interarrival'].values

    # Fail with a clear message instead of an opaque NumPy reduction error.
    if len(df) == 0:
        raise ValueError(f"'{csv_path}' has no data rows; cannot extract features")

    # Only the features that are returned are computed. Not part of the
    # feature set: max_frame, min_frame, num_peaks_frame, num_peaks_ia,
    # duration.
    features = {
        # Frame length statistics (np.std is the population std, ddof=0)
        'mean_frame': np.mean(frame_vals),
        'std_frame': np.std(frame_vals),

        # Direction statistics: values > 0 count as uplink, values < 0 as
        # downlink; zeros count towards neither proportion.
        'avg_direction': np.mean(direction_vals),
        'proportion_uplink': (direction_vals > 0).mean(),
        'proportion_downlink': (direction_vals < 0).mean(),

        # Interarrival statistics
        'mean_ia': np.mean(interarrival_vals),
        'std_ia': np.std(interarrival_vals),
        'max_ia': np.max(interarrival_vals),
        'min_ia': np.min(interarrival_vals),
    }

    return features

def main():
    """
    Train an XGBoost classifier on per-file features, report accuracy, save it.

    Every '.csv' file in the Class A folder is labelled 0 and every '.csv'
    file in the Class B folder is labelled 1. The model is evaluated on a
    stratified 30% hold-out split and with 5-fold cross-validation, then the
    tuple (model, feature_cols) is written to 'xgb_model_no.pkl'.

    Raises:
        ValueError: If neither folder contains a '.csv' file.
    """
    # Folders containing your Class A/B CSV files
    # (alternative layout: 'classA/csv_a' and 'classB/csv_b')
    classA_dir = "tst/csv/a_o"  # CSV files for Class A
    classB_dir = "tst/csv/b_o"  # CSV files for Class B

    X = []
    y = []

    # label=0 for Class A, label=1 for Class B
    for label, class_dir in ((0, classA_dir), (1, classB_dir)):
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # sample order, and therefore the seeded split below, reproducible.
        for filename in sorted(os.listdir(class_dir)):
            if not filename.endswith('.csv'):
                continue
            csv_path = os.path.join(class_dir, filename)
            X.append(extract_features_from_csv(csv_path))
            y.append(label)

    if not X:
        raise ValueError(
            f"No CSV files found in '{classA_dir}' or '{classB_dir}'"
        )

    # Convert feature dicts to a DataFrame
    df_features = pd.DataFrame(X)
    df_features['label'] = y

    # Separate features and labels
    feature_cols = [c for c in df_features.columns if c != 'label']
    X_data = df_features[feature_cols].values
    y_data = df_features['label'].values

    # Stratified split needs at least 2 samples per class; if that does not
    # hold, consider removing stratify or adjusting test_size.
    X_train, X_test, y_train, y_test = train_test_split(
        X_data, y_data, test_size=0.3, random_state=42, stratify=y_data
    )

    # Create an XGBoost classifier
    # You can tune hyperparameters like n_estimators, learning_rate, max_depth, etc.
    # NOTE(review): use_label_encoder appears to be deprecated in recent
    # XGBoost releases -- confirm against the installed version.
    xgb_clf = XGBClassifier(
        n_estimators=30,
        learning_rate=0.01,
        max_depth=3,
        random_state=43,
        use_label_encoder=False,
        eval_metric='logloss'  # needed in newer XGBoost versions
    )

    # Train the model
    xgb_clf.fit(X_train, y_train)

    # Evaluate on test set
    y_pred = xgb_clf.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    print(f"Test Accuracy (XGBoost): {acc:.3f}")

    # Cross-validation over the full data set for a more robust estimate
    # (cross_val_score is already imported at the top of this cell).
    scores = cross_val_score(xgb_clf, X_data, y_data, cv=5)
    print(f"5-Fold CV Accuracy: {scores.mean():.3f} ± {scores.std():.3f}")

    # Save the trained XGBoost model + feature columns so that prediction
    # code can rebuild the feature vector in the training order.
    joblib.dump((xgb_clf, feature_cols), 'xgb_model_no.pkl')
    print("Trained XGBoost model saved as 'xgb_model_no.pkl'")

# Run XGBoost training only when this code is executed directly, not on import.
if __name__ == '__main__':
    main()


### Test result of XGBoost

In [None]:
import os
import pandas as pd
import numpy as np
import joblib
from scipy.signal import find_peaks

def extract_features_from_csv(csv_path):
    """
    Summarise one packet-trace CSV as a flat dictionary of features.

    The CSV must provide the columns 'Time (s)', 'Frame Length',
    'Direction' and 'Interarrival'. The returned keys are looked up by name
    against the feature columns stored with the trained model.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        dict mapping feature name to value, in this key order:
        mean_frame, std_frame, avg_direction, proportion_uplink,
        proportion_downlink, mean_ia, std_ia, max_ia, min_ia.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If the CSV contains no data rows.
    """
    df = pd.read_csv(csv_path)
    # Aggregate over the rows in time order (non-mutating sort).
    df = df.sort_values(by='Time (s)')

    frame_vals = df['Frame Length'].values
    direction_vals = df['Direction'].values
    interarrival_vals = df['Interarrival'].values

    # Fail with a clear message instead of an opaque NumPy reduction error.
    if len(df) == 0:
        raise ValueError(f"'{csv_path}' has no data rows; cannot extract features")

    # Only the features that are returned are computed. Not part of the
    # feature set: max_frame, min_frame, num_peaks_frame, num_peaks_ia,
    # duration.
    features = {
        # Frame length statistics (np.std is the population std, ddof=0)
        'mean_frame': np.mean(frame_vals),
        'std_frame': np.std(frame_vals),

        # Direction statistics: values > 0 count as uplink, values < 0 as
        # downlink; zeros count towards neither proportion.
        'avg_direction': np.mean(direction_vals),
        'proportion_uplink': (direction_vals > 0).mean(),
        'proportion_downlink': (direction_vals < 0).mean(),

        # Interarrival statistics
        'mean_ia': np.mean(interarrival_vals),
        'std_ia': np.std(interarrival_vals),
        'max_ia': np.max(interarrival_vals),
        'min_ia': np.min(interarrival_vals),
    }

    return features

def main():
    """
    Classify every '.csv' file in the test folder with the saved XGBoost model.

    Loads (model, feature_cols) from 'xgb_model_no.pkl' and prints one line
    per file: '<filename> => Class A' for prediction 0, otherwise
    '<filename> => Class B'.
    """
    # Folder whose CSV files are classified.
    # (other folders used: "classA/testA", "classB/testB", "tst/csv")
    folder_name = "Test/csv_all"

    # 1. Load the trained XGBoost model + feature columns.
    # joblib.load unpickles arbitrary objects: only load model files that
    # come from a trusted source.
    xgb_clf, feature_cols = joblib.load('xgb_model_no.pkl')

    # os.listdir() order is arbitrary; sort so the report order is stable.
    for filename in sorted(os.listdir(folder_name)):
        if not filename.endswith('.csv'):
            continue
        csv_file = os.path.join(folder_name, filename)

        # 2. Extract features from this CSV
        feat_dict = extract_features_from_csv(csv_file)

        # 3. Arrange features in the same order as training
        feat_values = [feat_dict[col] for col in feature_cols]
        X_new = np.array([feat_values])  # shape (1, num_features)

        # 4. Predict
        predicted_class = xgb_clf.predict(X_new)[0]

        # Print the result
        if predicted_class == 0:
            print(f"{filename} => Class A")
        else:
            print(f"{filename} => Class B")

# Run XGBoost prediction only when this code is executed directly, not on import.
if __name__ == '__main__':
    main()


## Random Forest

In [None]:
import os
import numpy as np
import pandas as pd
from scipy.signal import find_peaks

# Scikit-learn imports
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score
import joblib

def extract_features_from_csv(csv_path):
    """
    Summarise one packet-trace CSV as a flat dictionary of features.

    The CSV must provide the columns 'Time (s)', 'Frame Length',
    'Direction' and 'Interarrival'.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        dict mapping feature name to value, in this key order:
        mean_frame, std_frame, avg_direction, proportion_uplink,
        proportion_downlink, mean_ia, std_ia, max_ia, min_ia.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If the CSV contains no data rows.
    """
    df = pd.read_csv(csv_path)

    # Sort by time column (adjust if your CSV has a different time column name).
    # Non-mutating sort instead of inplace=True.
    df = df.sort_values(by='Time (s)')

    # Extract numeric arrays
    frame_vals = df['Frame Length'].values
    direction_vals = df['Direction'].values
    interarrival_vals = df['Interarrival'].values

    # Fail with a clear message instead of an opaque NumPy reduction error.
    if len(df) == 0:
        raise ValueError(f"'{csv_path}' has no data rows; cannot extract features")

    # Only the features that are returned are computed. Not part of the
    # feature set: max_frame, min_frame, num_peaks_frame, num_peaks_ia,
    # duration.
    features = {
        # Frame Length features (np.std is the population std, ddof=0)
        'mean_frame': np.mean(frame_vals),
        'std_frame': np.std(frame_vals),

        # Direction features: values > 0 count as uplink, values < 0 as
        # downlink; zeros count towards neither proportion.
        'avg_direction': np.mean(direction_vals),
        'proportion_uplink': (direction_vals > 0).mean(),
        'proportion_downlink': (direction_vals < 0).mean(),

        # Interarrival features
        'mean_ia': np.mean(interarrival_vals),
        'std_ia': np.std(interarrival_vals),
        'max_ia': np.max(interarrival_vals),
        'min_ia': np.min(interarrival_vals),
    }

    return features

def main():
    """
    Train a Random Forest on per-file features, report accuracy, save it.

    Every '.csv' file in the Class A folder is labelled 0 and every '.csv'
    file in the Class B folder is labelled 1. The model is evaluated on a
    stratified 30% hold-out split and with 5-fold cross-validation, then the
    tuple (model, feature_cols) is written to 'rf_model_nos.pkl'.

    Raises:
        ValueError: If neither folder contains a '.csv' file.
    """
    # Paths to your training data
    # (alternative layout: 'classA/csv_a' and 'classB/csv_b')
    classA_dir = "tst/csv/a_o"  # CSV files for Class A
    classB_dir = "tst/csv/b_o"  # CSV files for Class B

    X = []
    y = []

    # Label 0 for Class A, label 1 for Class B
    for label, class_dir in ((0, classA_dir), (1, classB_dir)):
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # sample order, and therefore the seeded split below, reproducible.
        for filename in sorted(os.listdir(class_dir)):
            if not filename.endswith('.csv'):
                continue
            csv_path = os.path.join(class_dir, filename)
            X.append(extract_features_from_csv(csv_path))
            y.append(label)

    if not X:
        raise ValueError(
            f"No CSV files found in '{classA_dir}' or '{classB_dir}'"
        )

    # Convert list of feature dicts to a DataFrame
    df_features = pd.DataFrame(X)
    df_features['label'] = y

    # Separate features and labels
    feature_cols = [c for c in df_features.columns if c != 'label']
    X_data = df_features[feature_cols].values
    y_data = df_features['label'].values

    # Train-test split (70% train, 30% test). Stratified like the XGBoost
    # and SVM training code in this file, so the hold-out keeps the class
    # balance and the three models are scored on comparable splits.
    X_train, X_test, y_train, y_test = train_test_split(
        X_data, y_data, test_size=0.3, random_state=42, stratify=y_data
    )

    # Create a Random Forest
    clf = RandomForestClassifier(n_estimators=60, random_state=42)
    clf.fit(X_train, y_train)

    # Evaluate on the test set
    y_pred = clf.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    print(f"Test Accuracy (Random Forest): {acc:.3f}")

    # Cross-validation over the full data set for a more robust estimate
    scores = cross_val_score(clf, X_data, y_data, cv=5)
    print(f"5-Fold CV Accuracy: {scores.mean():.3f} ± {scores.std():.3f}")

    # Save the trained model + feature columns. The message reports the
    # file name that is actually written.
    model_path = 'rf_model_nos.pkl'
    joblib.dump((clf, feature_cols), model_path)
    print(f"Trained Random Forest model saved as '{model_path}'")

# Run Random Forest training only when this code is executed directly, not on import.
if __name__ == '__main__':
    main()


### Test result of Random Forest

In [None]:
import os
import numpy as np
import pandas as pd
import joblib
from scipy.signal import find_peaks

def extract_features_from_csv(csv_path):
    """
    Summarise one packet-trace CSV as a flat dictionary of features.

    The CSV must provide the columns 'Time (s)', 'Frame Length',
    'Direction' and 'Interarrival'. The returned keys are looked up by name
    against the feature columns stored with the trained model.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        dict mapping feature name to value, in this key order:
        mean_frame, std_frame, avg_direction, proportion_uplink,
        proportion_downlink, mean_ia, std_ia, max_ia, min_ia.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If the CSV contains no data rows.
    """
    df = pd.read_csv(csv_path)
    # Aggregate over the rows in time order (non-mutating sort).
    df = df.sort_values(by='Time (s)')

    frame_vals = df['Frame Length'].values
    direction_vals = df['Direction'].values
    interarrival_vals = df['Interarrival'].values

    # Fail with a clear message instead of an opaque NumPy reduction error.
    if len(df) == 0:
        raise ValueError(f"'{csv_path}' has no data rows; cannot extract features")

    # Only the features that are returned are computed. Not part of the
    # feature set: max_frame, min_frame, num_peaks_frame, num_peaks_ia,
    # duration.
    features = {
        # Frame length statistics (np.std is the population std, ddof=0)
        'mean_frame': np.mean(frame_vals),
        'std_frame': np.std(frame_vals),

        # Direction statistics: values > 0 count as uplink, values < 0 as
        # downlink; zeros count towards neither proportion.
        'avg_direction': np.mean(direction_vals),
        'proportion_uplink': (direction_vals > 0).mean(),
        'proportion_downlink': (direction_vals < 0).mean(),

        # Interarrival statistics
        'mean_ia': np.mean(interarrival_vals),
        'std_ia': np.std(interarrival_vals),
        'max_ia': np.max(interarrival_vals),
        'min_ia': np.min(interarrival_vals),
    }

    return features

def main():
    """
    Classify every '.csv' file in the test folder with the saved Random Forest.

    Loads (model, feature_cols) from 'rf_model_nos.pkl' and prints one line
    per file: '<filename> => Class A' for prediction 0, otherwise
    '<filename> => Class B'.
    """
    # Folder whose CSV files are classified.
    # (other folders used: "classA/testA", "classB/testB")
    folder_name = "Test/csv_all"

    # 1. Load the trained Random Forest model and feature columns.
    # joblib.load unpickles arbitrary objects: only load model files that
    # come from a trusted source.
    clf, feature_cols = joblib.load('rf_model_nos.pkl')

    # os.listdir() order is arbitrary; sort so the report order is stable.
    for filename in sorted(os.listdir(folder_name)):
        if not filename.endswith('.csv'):
            continue
        csv_file = os.path.join(folder_name, filename)

        # 2. Extract features from the CSV
        feat_dict = extract_features_from_csv(csv_file)

        # 3. Arrange feature values in the same order as training
        feat_values = [feat_dict[col] for col in feature_cols]
        X_new = np.array([feat_values])  # shape (1, num_features)

        # 4. Predict
        predicted_class = clf.predict(X_new)[0]

        # Print result
        if predicted_class == 0:
            print(f"{filename} => Class A")
        else:
            print(f"{filename} => Class B")

# Run Random Forest prediction only when this code is executed directly, not on import.
if __name__ == '__main__':
    main()


## SVM Model

In [None]:
import os
import numpy as np
import pandas as pd
from scipy.signal import find_peaks

# Scikit-learn imports
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import accuracy_score
from sklearn.pipeline import Pipeline

# For saving the trained model
import joblib

def extract_features_from_csv(csv_path):
    """
    Summarise one packet-trace CSV as a flat dictionary of features.

    The CSV must provide the columns 'Time (s)', 'Frame Length',
    'Direction' and 'Interarrival'.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        dict mapping feature name to value, in this key order:
        mean_frame, std_frame, avg_direction, proportion_uplink,
        proportion_downlink, mean_ia, std_ia, max_ia, min_ia.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If the CSV contains no data rows.
    """
    df = pd.read_csv(csv_path)
    # Aggregate over the rows in time order (non-mutating sort).
    df = df.sort_values(by='Time (s)')

    frame_vals = df['Frame Length'].values
    direction_vals = df['Direction'].values
    interarrival_vals = df['Interarrival'].values

    # Fail with a clear message instead of an opaque NumPy reduction error.
    if len(df) == 0:
        raise ValueError(f"'{csv_path}' has no data rows; cannot extract features")

    # Only the features that are returned are computed. Not part of the
    # feature set: max_frame, min_frame, num_peaks_frame, num_peaks_ia,
    # duration.
    features = {
        # Frame Length features (np.std is the population std, ddof=0)
        'mean_frame': np.mean(frame_vals),
        'std_frame': np.std(frame_vals),

        # Direction features: values > 0 count as uplink, values < 0 as
        # downlink; zeros count towards neither proportion.
        'avg_direction': np.mean(direction_vals),
        'proportion_uplink': (direction_vals > 0).mean(),
        'proportion_downlink': (direction_vals < 0).mean(),

        # Interarrival features
        'mean_ia': np.mean(interarrival_vals),
        'std_ia': np.std(interarrival_vals),
        'max_ia': np.max(interarrival_vals),
        'min_ia': np.min(interarrival_vals),
    }

    return features

def main():
    """
    Train a scaled RBF-kernel SVM on per-file features, report accuracy, save it.

    Every '.csv' file in the Class A folder is labelled 0 and every '.csv'
    file in the Class B folder is labelled 1. The StandardScaler + SVC
    pipeline is evaluated on a stratified 30% hold-out split and with 5-fold
    cross-validation, then written to 'model_svm_no.pkl'.

    Raises:
        ValueError: If neither folder contains a '.csv' file.
    """
    # Paths to your training data
    # (alternative layout: 'classA/csv_a' and 'classB/csv_b')
    classA_dir = "tst/csv/a_o"  # CSV files for Class A
    classB_dir = "tst/csv/b_o"  # CSV files for Class B

    X = []
    y = []

    # label 0 for Class A, label 1 for Class B
    for label, class_dir in ((0, classA_dir), (1, classB_dir)):
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # sample order, and therefore the seeded split below, reproducible.
        for filename in sorted(os.listdir(class_dir)):
            if not filename.endswith('.csv'):
                continue
            csv_path = os.path.join(class_dir, filename)
            X.append(extract_features_from_csv(csv_path))
            y.append(label)

    if not X:
        raise ValueError(
            f"No CSV files found in '{classA_dir}' or '{classB_dir}'"
        )

    # Convert to DataFrame
    df_features = pd.DataFrame(X)
    df_features['label'] = y

    # Split into features & labels
    feature_cols = [c for c in df_features.columns if c != 'label']
    X_data = df_features[feature_cols].values
    y_data = df_features['label'].values

    # Train-test split (stratified so both splits keep the class balance)
    X_train, X_test, y_train, y_test = train_test_split(
        X_data, y_data, test_size=0.3, random_state=42, stratify=y_data
    )

    # Create a pipeline: scaling + SVM. Scaling lives inside the pipeline so
    # it is re-fitted on each training fold during cross-validation.
    pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('svm', SVC(kernel='rbf', random_state=42))
    ])

    # Fit the pipeline
    pipe.fit(X_train, y_train)

    # Evaluate on test set
    y_pred = pipe.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    print(f"Test Accuracy: {acc:.3f}")

    # Optional cross-validation over the full data set
    scores = cross_val_score(pipe, X_data, y_data, cv=5)
    print(f"5-Fold CV Accuracy: {scores.mean():.3f} ± {scores.std():.3f}")

    # Save the trained pipeline. Only the pipeline is stored (no feature
    # column list), so prediction code must build its feature vector in the
    # same column order as feature_cols above.
    joblib.dump(pipe, 'model_svm_no.pkl')
    print("Trained model saved as model_svm_no.pkl")

# Run SVM training only when this code is executed directly, not on import.
if __name__ == '__main__':
    main()


### Test result of SVM

In [None]:
import os
import pandas as pd
import joblib
import numpy as np
from scipy.signal import find_peaks

def extract_features_from_csv(csv_path):
    """
    Summarise one packet-trace CSV as a flat dictionary of features.

    The CSV must provide the columns 'Time (s)', 'Frame Length',
    'Direction' and 'Interarrival'. The caller picks values out of the
    returned dict by feature name.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        dict mapping feature name to value, in this key order:
        mean_frame, std_frame, avg_direction, proportion_uplink,
        proportion_downlink, mean_ia, std_ia, max_ia, min_ia.

    Raises:
        KeyError: If a required column is missing.
        ValueError: If the CSV contains no data rows.
    """
    df = pd.read_csv(csv_path)
    # Aggregate over the rows in time order (non-mutating sort).
    df = df.sort_values(by='Time (s)')

    frame_vals = df['Frame Length'].values
    direction_vals = df['Direction'].values
    interarrival_vals = df['Interarrival'].values

    # Fail with a clear message instead of an opaque NumPy reduction error.
    if len(df) == 0:
        raise ValueError(f"'{csv_path}' has no data rows; cannot extract features")

    # Only the features that are returned are computed. Not part of the
    # feature set: max_frame, min_frame, num_peaks_frame, num_peaks_ia,
    # duration.
    features = {
        # Frame length statistics (np.std is the population std, ddof=0)
        'mean_frame': np.mean(frame_vals),
        'std_frame': np.std(frame_vals),

        # Direction statistics: values > 0 count as uplink, values < 0 as
        # downlink; zeros count towards neither proportion.
        'avg_direction': np.mean(direction_vals),
        'proportion_uplink': (direction_vals > 0).mean(),
        'proportion_downlink': (direction_vals < 0).mean(),

        # Interarrival statistics
        'mean_ia': np.mean(interarrival_vals),
        'std_ia': np.std(interarrival_vals),
        'max_ia': np.max(interarrival_vals),
        'min_ia': np.min(interarrival_vals),
    }

    return features

def main():
    """
    Classify every '.csv' file in the test folder with the saved SVM pipeline.

    Loads the pipeline from 'model_svm_no.pkl' and prints one line per file:
    '<filename> => Class A' for prediction 0, otherwise
    '<filename> => Class B'.
    """
    # Folder containing new CSV files to predict
    # (other folders used: "classA/testA", "classB/testB", "tst/csv",
    # "tst/csv/t")
    folder_name = "Test/csv_all"

    # 1. Load the saved model (pipeline).
    # joblib.load unpickles arbitrary objects: only load model files that
    # come from a trusted source.
    model = joblib.load('model_svm_no.pkl')

    # The saved file holds only the pipeline, so the training column order is
    # repeated here. Defined once, outside the loop, because it never changes.
    feature_cols = [
        'mean_frame', 'std_frame',
        'avg_direction', 'proportion_uplink', 'proportion_downlink',
        'mean_ia', 'std_ia', 'max_ia', 'min_ia'
    ]

    # 2. Loop over all CSV files in that folder.
    # os.listdir() order is arbitrary; sort so the report order is stable.
    for filename in sorted(os.listdir(folder_name)):
        if not filename.endswith(".csv"):
            continue
        csv_path = os.path.join(folder_name, filename)

        # Extract features
        feat_dict = extract_features_from_csv(csv_path)

        # Convert dict to the same order of columns used in training
        feat_values = [feat_dict[col] for col in feature_cols]
        X_new = np.array([feat_values])  # shape (1, 9): one row, nine features

        # Predict
        predicted_class = model.predict(X_new)[0]

        # Print the result
        if predicted_class == 0:
            print(f"{filename} => Class A")
        else:
            print(f"{filename} => Class B")

# Run SVM prediction only when this code is executed directly, not on import.
if __name__ == '__main__':
    main()


## Using the results to calculate the accuracy