In [None]:
!pip install optuna

In [None]:
# Snapshot the installed versions of this notebook's dependencies into a
# pip-style requirements file.
# NOTE(review): the target path is on Google Drive, which is only mounted in a
# later cell; on a fresh kernel this cell has to run after the mount.
from importlib.metadata import PackageNotFoundError, version

deps = [
    "pandas",
    "numpy",
    "lightgbm",
    "optuna",
    "scikit-learn",
    "imbalanced-learn",  # distribution name; "imblearn" is only the import name
    "joblib",
    "matplotlib"
]

missing_deps = []
with open("/content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/requirements_2.txt", "w") as f:
    for pkg in deps:
        try:
            f.write(f"{pkg}=={version(pkg)}\n")
        except PackageNotFoundError:
            # Still best-effort, but no longer silent: remember what was skipped.
            missing_deps.append(pkg)

if missing_deps:
    print("Not installed, left out of the requirements file:", missing_deps)

In [None]:
import pandas as pd
import numpy as np
import lightgbm as lgb
import optuna
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
from sklearn.metrics import f1_score, recall_score, roc_auc_score

In [None]:
# Show the version of the `python` executable found on the shell PATH.
!python --version

In [None]:
# Mount Google Drive; every data, study and model path in this notebook lives under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Load the training table; later cells read its `label` and `attack_cat_enc` columns as targets.
data = pd.read_csv('/content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/Dataset/preprocess/train_df.csv')

### **Data Viz**

In [None]:
# Column dtypes, non-null counts and memory usage.
data.info()

In [None]:
# Peek at the first rows.
data.head()

In [None]:
# Class balance of `label`, the target of the binary model trained below.
data['label'].value_counts()

In [None]:
# Class balance of `attack_cat_enc`, the target of the multiclass model trained below.
data['attack_cat_enc'].value_counts()

In [None]:
# Count nulls per column and report only the columns that have any.
null_counts = data.isnull().sum()
missing_values = null_counts[null_counts > 0]

print("Missing values per kolom:")
print(missing_values)

In [None]:
# Same check as a table: absolute null count and percentage per column,
# restricted to columns with at least one null.
null_mask = data.isnull()
missing_values = null_mask.sum()
missing_percent = null_mask.mean() * 100

missing = pd.concat([missing_values, missing_percent], axis=1)
missing.columns = ['Jumlah Missing', 'Persen']
missing = missing.loc[missing['Jumlah Missing'] > 0]
print(missing)

In [None]:
# Build one frame per task from the same source table.
df_label = data.copy()

# Binary task: keep `label`, drop the attack-category column.
data_label = df_label.drop(columns='attack_cat_enc')

# Multiclass task: keep `attack_cat_enc`, drop `label`, and exclude category 5.
# NOTE(review): 5 is presumably the encoded "normal / no attack" category — confirm against the encoder.
data_attack = data.loc[data['attack_cat_enc'] != 5].drop(columns='label')

### **Deteksi Serangan (1/0)**

In [None]:
# Peek at the frame used for the binary task.
data_label.head()

In [None]:
# Target first, then every remaining column as a feature.
y_label = data_label['label']
X_label = data_label.drop(columns='label')

In [None]:
# 80/20 hold-out split, stratified on the label so both parts keep the class ratio.
X_train, X_test, y_train, y_test = train_test_split(
    X_label,
    y_label,
    test_size=0.2,
    random_state=42,
    stratify=y_label,
)

In [None]:
def evaluate_binary_model(model, X_train, y_train, X_test, y_test):
    """Print F1 and recall for class 1, plus ROC-AUC, on the train and test data.

    Args:
        model: fitted classifier exposing ``predict`` and ``predict_proba``.
        X_train, y_train: features / labels reported under the "Training" banner.
        X_test, y_test: features / labels reported under the "Testing" banner.

    Returns:
        None; the metrics are only printed.
    """
    splits = (
        ("==== Evaluasi pada Binary Data Training ====", X_train, y_train),
        ("\n==== Evaluasi pada Binary Data Testing ====", X_test, y_test),
    )
    for banner, features, target in splits:
        print(banner)
        hard_preds = model.predict(features)
        # Second probability column, used as the class-1 score for ROC-AUC.
        pos_scores = model.predict_proba(features)[:, 1]
        print(f"F1-score (class 1): {f1_score(target, hard_preds, pos_label=1):.4f}")
        print(f"Recall (class 1):   {recall_score(target, hard_preds, pos_label=1):.4f}")
        print(f"ROC-AUC score:      {roc_auc_score(target, pos_scores):.4f}")

**Optuna Param Tuning**

In [None]:
def objective(trial):
    """Optuna objective for the binary model: mean 5-fold CV F1 of class 1.

    Samples a LightGBM configuration, trains one classifier per fold of
    ``X_train`` / ``y_train`` with early stopping on that fold's validation
    part, and scores the same part.

    Reads the notebook globals ``X_train``, ``y_train`` and ``scale_pos_weight``.

    Args:
        trial: the ``optuna`` trial used to sample hyperparameters.

    Returns:
        Mean F1-score of class 1 over the folds. Mean recall and ROC AUC are
        stored on the trial as the user attributes ``recall_macro`` and ``roc_auc``.
    """
    max_depth = trial.suggest_int("max_depth", 3, 8)
    # Couple num_leaves to the sampled depth: between 2**(depth-1) and 2**depth, capped at 200.
    max_leaves = min(200, 2 ** max_depth)
    min_leaves = min(max_leaves, 2 ** (max_depth - 1))

    # `colsample_bytree` and `subsample_freq` used to be sampled here as well.
    # LightGBM documents them as aliases of `feature_fraction` and `bagging_freq`,
    # so they were extra search dimensions that never reached the model.
    param = {
        "objective": "binary",
        "verbosity": -1,
        "boosting_type": "gbdt",
        "random_state": 42,
        "n_jobs": -1,
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
        "num_leaves": trial.suggest_int("num_leaves", min_leaves, max_leaves),
        "max_depth": max_depth,
        "min_child_samples": trial.suggest_int("min_child_samples", 10, 100),
        "feature_fraction": trial.suggest_float("feature_fraction", 0.6, 1.0),
        "bagging_fraction": trial.suggest_float("bagging_fraction", 0.6, 1.0),
        "bagging_freq": trial.suggest_int("bagging_freq", 1, 7),
        "lambda_l1": trial.suggest_float("lambda_l1", 1e-8, 10.0, log=True),
        "lambda_l2": trial.suggest_float("lambda_l2", 1e-8, 10.0, log=True),
        "scale_pos_weight": scale_pos_weight,
    }

    # Stratify so every fold keeps the class ratio that scale_pos_weight was
    # computed from; this also matches the multiclass objective.
    kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    f1_scores = []
    recall_scores = []
    roc_auc_scores = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(X_train, y_train)):
        X_tr, X_val_ = X_train.iloc[train_idx], X_train.iloc[val_idx]
        y_tr, y_val_ = y_train.iloc[train_idx], y_train.iloc[val_idx]

        # n_estimators is only an upper bound; early stopping picks the actual round.
        model = lgb.LGBMClassifier(n_estimators=1000, **param)
        model.fit(
            X_tr, y_tr,
            eval_set=[(X_val_, y_val_)],
            eval_metric="binary_logloss",
            callbacks=[lgb.early_stopping(50), lgb.log_evaluation(0)]
        )

        y_pred = model.predict(X_val_)
        y_proba = model.predict_proba(X_val_)[:, 1]  # needed for ROC AUC

        f1 = f1_score(y_val_, y_pred, pos_label=1)
        recall = recall_score(y_val_, y_pred, pos_label=1)
        auc = roc_auc_score(y_val_, y_proba)

        print(f"Fold {fold+1} — F1: {f1:.4f} | Recall: {recall:.4f} | ROC AUC: {auc:.4f}")

        f1_scores.append(f1)
        recall_scores.append(recall)
        roc_auc_scores.append(auc)

    # The attribute is named "recall_macro" but holds the mean class-1 recall;
    # the key is kept so trials already stored in the study stay comparable.
    trial.set_user_attr("recall_macro", np.mean(recall_scores))
    trial.set_user_attr("roc_auc", np.mean(roc_auc_scores))

    return np.mean(f1_scores)

In [None]:
# SQLite file on Drive that backs the Optuna study.
db_path = "/content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/Dataset/optuna/optuna_study.db"

# Ratio of the two class counts in y_train; `objective` reads `scale_pos_weight` as a global.
# NOTE(review): assumes the labels are exactly 0 and 1 — the two-way unpacking fails otherwise.
neg, pos = np.bincount(y_train)
scale_pos_weight = neg / pos

# Create the study, or reuse it if a study with this name already exists in the storage.
study = optuna.create_study(
    study_name="lgbm_f1score_optuna_no_pruner",
    direction='maximize',
    storage=f"sqlite:///{db_path}",
    load_if_exists=True
)

study.optimize(objective, n_trials=100, show_progress_bar=True)

In [None]:
# Resume cell: reload the stored study and run additional trials.
# scale_pos_weight is recomputed here so the cell also works when the creation cell was not run.
neg, pos = np.bincount(y_train)
scale_pos_weight = neg / pos

study = optuna.load_study(
    study_name="lgbm_f1score_optuna_no_pruner",
    storage = "sqlite:////content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/Dataset/optuna/optuna_study.db"
)
study.optimize(objective, n_trials=26, show_progress_bar=True)

In [None]:
import json

# Persist the best trial's sampled hyperparameters as JSON on Drive.
binary_params_path = "/content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/Dataset/optuna/lgbm_params.json"
with open(binary_params_path, "w") as f:
    json.dump(study.best_trial.params, f)

In [None]:
# The study maximises the value returned by `objective`, which is the mean
# cross-validated F1 of class 1, so the value is labelled as that.
print("Best trial:")
print("  Mean CV F1 (class 1):", study.best_value)
print("  Best Params:")
for key, value in study.best_params.items():
    print(f"    {key}: {value}")

In [None]:
# Final binary model: refit with the best hyperparameters found by Optuna.
neg, pos = np.bincount(y_train)
scale_pos_weight = neg / pos

# Add the settings that `objective` fixed rather than sampled.
best_params = study.best_params
best_params.update({
    "objective": "binary",
    "random_state": 42,
    "scale_pos_weight": scale_pos_weight,
    "n_jobs": -1
})

# Early stopping needs a validation set that is NOT the test set; otherwise the
# number of trees is tuned on the data later used to report test metrics.
# Hold out 10% of the training split for that purpose.
X_fit, X_val, y_fit, y_val = train_test_split(
    X_train, y_train, test_size=0.1, random_state=42, stratify=y_train
)

lgbm_bin = lgb.LGBMClassifier(
    n_estimators=1000,  # upper bound; early stopping picks the actual round
    **best_params
)

lgbm_bin.fit(X_fit, y_fit,
             eval_set=[(X_val, y_val)],
             eval_metric="auc",
             callbacks=[lgb.early_stopping(50), lgb.log_evaluation(10)])

# Report on the rows actually fitted and on the untouched test split.
evaluate_binary_model(lgbm_bin, X_fit, y_fit, X_test, y_test)

In [None]:
import joblib

# Serialise the fitted binary classifier to Drive.
binary_model_path = '/content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/Model/lgbm_binary_model.pkl'
joblib.dump(lgbm_bin, binary_model_path)

### **Deteksi Jenis Serangan**

In [None]:
# Peek at the frame used for the multiclass task.
data_attack.head()

In [None]:
# Target first, then every remaining column as a feature.
y_att = data_attack['attack_cat_enc']
X_att = data_attack.drop(columns='attack_cat_enc')

In [None]:
# 80/20 hold-out split, stratified on the attack category.
X_train_att, X_test_att, y_train_att, y_test_att = train_test_split(
    X_att,
    y_att,
    test_size=0.2,
    random_state=42,
    stratify=y_att,
)

In [None]:
# List the feature names; the SMOTENC cell below looks up categorical columns by name.
print(X_att.columns.tolist())

In [None]:
from collections import Counter
from imblearn.over_sampling import SMOTENC

# Columns SMOTENC must treat as categorical rather than interpolate.
categorical_cols = [
    'is_sm_ips_ports',
    'ct_state_ttl',
    'ct_flw_http_mthd',
    'is_ftp_login',
    'stime_hour',
    'stime_weekday',
    'ltime_hour',
    'ltime_weekday',
]

# SMOTENC is given positional indices here, so translate the names.
categorical_features = list(map(X_att.columns.get_loc, categorical_cols))

# Oversample the training split only; the test split is left as-is.
smote_nc = SMOTENC(random_state=42, categorical_features=categorical_features)
X_train_smt, y_train_smt = smote_nc.fit_resample(X_train_att, y_train_att)

print("Distribusi setelah SMOTENC:", Counter(y_train_smt))

In [None]:
from sklearn.metrics import f1_score, recall_score, classification_report, confusion_matrix

def evaluate_multiclass_model(model, X_train, y_train, X_test, y_test):
    """Print macro F1, macro recall and a classification report for train and test data.

    Args:
        model: fitted classifier exposing ``predict``.
        X_train, y_train: features / labels reported under the "Training" banner.
        X_test, y_test: features / labels reported under the "Testing" banner.

    Returns:
        None; the metrics are only printed.
    """
    stages = (
        ("==== Evaluasi pada Multi Class Data Training ====", "Train", X_train, y_train),
        ("\n==== Evaluasi pada Multi Class Data Testing ====", "Test", X_test, y_test),
    )
    for banner, tag, features, target in stages:
        print(banner)
        predicted = model.predict(features)
        print("F1-score (macro):", f1_score(target, predicted, average='macro'))
        print("Recall (macro):  ", recall_score(target, predicted, average='macro'))
        print(f"\nClassification Report ({tag}):")
        print(classification_report(target, predicted))

**Optuna Param Tuning**

In [None]:
def objective_multiclass(trial):
    """Optuna objective for the attack-type model: mean 5-fold CV macro F1.

    Samples a LightGBM configuration, trains one classifier per stratified fold
    of ``X_train_smt`` / ``y_train_smt`` with early stopping on that fold's
    validation part, and scores the same part.

    Reads the notebook globals ``y_train_att``, ``X_train_smt`` and ``y_train_smt``.

    NOTE(review): the folds are cut from data that was already oversampled with
    SMOTENC, so validation folds contain synthetic rows and the scores may be
    optimistic compared with unseen data.

    Args:
        trial: the ``optuna`` trial used to sample hyperparameters.

    Returns:
        Mean macro F1-score over the folds.
    """
    num_class = y_train_att.nunique()

    max_depth = trial.suggest_int("max_depth", 3, 10)
    # Couple num_leaves to the sampled depth: between 2**(depth-1) and 2**depth, capped at 255.
    max_leaves = min(255, 2 ** max_depth)
    min_leaves = min(max_leaves, 2 ** (max_depth - 1))

    # `colsample_bytree` and `subsample_freq` used to be sampled here as well.
    # LightGBM documents them as aliases of `feature_fraction` and `bagging_freq`,
    # so they were extra search dimensions that never reached the model.
    param = {
        "objective": "multiclass",
        "num_class": num_class,
        "verbosity": -1,
        "boosting_type": "gbdt",
        "random_state": 42,
        "n_jobs": -1,
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
        "num_leaves": trial.suggest_int("num_leaves", min_leaves, max_leaves),
        "max_depth": max_depth,
        "min_child_samples": trial.suggest_int("min_child_samples", 10, 100),
        "feature_fraction": trial.suggest_float("feature_fraction", 0.6, 1.0),
        "bagging_fraction": trial.suggest_float("bagging_fraction", 0.6, 1.0),
        "bagging_freq": trial.suggest_int("bagging_freq", 1, 7),
        "lambda_l1": trial.suggest_float("lambda_l1", 1e-8, 10.0, log=True),
        "lambda_l2": trial.suggest_float("lambda_l2", 1e-8, 10.0, log=True),
        "class_weight": "balanced"  # weight classes inversely to their frequency
    }

    kf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    f1_scores = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(X_train_smt, y_train_smt)):
        X_tr, X_val = X_train_smt.iloc[train_idx], X_train_smt.iloc[val_idx]
        y_tr, y_val = y_train_smt.iloc[train_idx], y_train_smt.iloc[val_idx]

        # n_estimators is only an upper bound; early stopping picks the actual round.
        model = lgb.LGBMClassifier(n_estimators=1000, **param)

        model.fit(
            X_tr, y_tr,
            eval_set=[(X_val, y_val)],
            eval_metric="multi_logloss",
            callbacks=[lgb.early_stopping(50), lgb.log_evaluation(0)]
        )

        y_pred = model.predict(X_val)
        f1 = f1_score(y_val, y_pred, average="macro")
        f1_scores.append(f1)

        print(f"Fold {fold+1} — Macro F1: {f1:.4f}")

    return np.mean(f1_scores)

In [None]:
# Create the multiclass study in its own SQLite file on Drive, or reuse it if it already exists.
study_multi = optuna.create_study(
    study_name="lgbm_multiclass_optuna",
    direction="maximize",
    storage="sqlite:////content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/Dataset/optuna/study_multiclass_classweight.db",
    load_if_exists=True
)

study_multi.optimize(objective_multiclass, n_trials=50, show_progress_bar=True)

In [None]:
# Resume cell: reload the stored multiclass study and run additional trials.
study_multi = optuna.load_study(
    study_name="lgbm_multiclass_optuna",
    storage = "sqlite:////content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/Dataset/optuna/study_multiclass_classweight.db"
)
study_multi.optimize(objective_multiclass, n_trials=12, show_progress_bar=True)

In [None]:
import json

# Persist the best multiclass trial's sampled hyperparameters as JSON on Drive.
with open("/content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/Dataset/optuna/lgbm_params_multi_classweight.json", "w") as f:
    f.write(json.dumps(study_multi.best_trial.params))

In [None]:
import json

# Read the saved hyperparameters back, so the final fit below does not need a live study.
with open("/content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/Dataset/optuna/lgbm_params_multi_classweight.json", "r") as f:
    best_params_multi = json.loads(f.read())

In [None]:
# Final multiclass model: refit with the hyperparameters loaded from JSON.
# (With a live study in memory, `study_multi.best_params` holds the same values.)
best_params_multi.update({
    "objective": "multiclass",
    "num_class": y_train_smt.nunique(),
    "random_state": 42,
    "n_jobs": -1,
})

# Early stopping needs a validation set that is NOT the test set; otherwise the
# number of trees is tuned on the data later used to report test metrics.
# Hold out 10% of the resampled training data for that purpose.
# NOTE(review): this hold-out still contains SMOTENC-synthesised rows, like the
# CV folds used during tuning; the test split stays untouched.
X_fit_smt, X_val_smt, y_fit_smt, y_val_smt = train_test_split(
    X_train_smt, y_train_smt, test_size=0.1, random_state=42, stratify=y_train_smt
)

# n_estimators is only an upper bound; early stopping picks the actual round.
lgbm_multi = lgb.LGBMClassifier(n_estimators=1000, **best_params_multi)

lgbm_multi.fit(
    X_fit_smt, y_fit_smt,
    eval_set=[(X_val_smt, y_val_smt)],
    eval_metric="multi_logloss",
    callbacks=[lgb.early_stopping(50), lgb.log_evaluation(10)]
)

# Report on the rows actually fitted and on the untouched test split.
evaluate_multiclass_model(lgbm_multi, X_fit_smt, y_fit_smt, X_test_att, y_test_att)

In [None]:
def predict_with_confidence(model, X, threshold=0.6):
    """Predict class labels, marking low-confidence rows as unknown (-1).

    Args:
        model: fitted classifier exposing ``predict_proba``; if it also has a
            ``classes_`` attribute, that is used to map probability columns
            back to class labels.
        X: samples to score.
        threshold: minimum top-class probability for a prediction to be kept.

    Returns:
        Tuple ``(preds_with_unknown, max_confidence)``: the predicted label per
        row (``-1`` where the top probability is below ``threshold``) and the
        top probability per row.
    """
    proba = model.predict_proba(X)
    max_confidence = np.max(proba, axis=1)
    top_idx = np.argmax(proba, axis=1)

    # argmax yields a column position, not a label. The two differ whenever the
    # labels are not exactly 0..K-1 (here one category is filtered out before
    # training), so translate through classes_ when the model provides it.
    classes = getattr(model, "classes_", None)
    preds = top_idx if classes is None else np.asarray(classes)[top_idx]

    preds_with_unknown = np.where(max_confidence < threshold, -1, preds)

    return preds_with_unknown, max_confidence

In [None]:
from sklearn.metrics import classification_report

# Score the test split, then report only the rows whose prediction cleared the threshold.
y_pred, confidences = predict_with_confidence(lgbm_multi, X_test_att, threshold=0.6)

is_unknown = y_pred == -1
mask_valid = ~is_unknown
print(classification_report(y_test_att[mask_valid], y_pred[mask_valid]))

print("Total prediksi unknown:", np.sum(is_unknown))

In [None]:
# Top-class probability per test row, used for the histogram below.
y_proba = lgbm_multi.predict_proba(X_test_att)
max_proba = np.max(y_proba, axis=1)

In [None]:
import matplotlib.pyplot as plt

# Histogram of the top-class probability per test row; axis labels added so
# the figure can be read on its own.
fig, ax = plt.subplots()
ax.hist(max_proba, bins=20)
ax.set_title("Distribusi Confidence Prediksi")
ax.set_xlabel("Confidence (probabilitas maksimum)")
ax.set_ylabel("Jumlah sampel")
plt.show()

In [None]:
import joblib

# Serialise the fitted multiclass classifier to Drive.
multiclass_model_path = '/content/drive/MyDrive/Datathon 2025 (HidupJ0kow1)/Model/lgbm_multiclass_model.pkl'
joblib.dump(lgbm_multi, multiclass_model_path)