In [5]:
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.neural_network import MLPRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.covariance import EllipticEnvelope
from sklearn.neural_network import MLPClassifier
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_X_y, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.exceptions import NotFittedError
from sklearn.ensemble import VotingClassifier
import numpy as np

# Load the data
train_data = pd.read_csv("train.csv")
test_data = pd.read_csv("test.csv")

# Convert timestamp column to numeric type before calling to_datetime
train_data["timestamp"] = pd.to_datetime(train_data["timestamp"].astype(int), unit="s", errors="coerce")
test_data["timestamp"] = pd.to_datetime(test_data["timestamp"].astype(int), unit="s", errors="coerce")

# Convert is_anomaly column to numeric type
train_data["is_anomaly"] = pd.to_numeric(train_data["is_anomaly"], errors="coerce")

# Split the data into features (X) and target (y)
X = train_data.drop(["is_anomaly", "timestamp"], axis=1)
y = train_data["is_anomaly"]

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Scale the data using StandardScaler
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Define a list of models to evaluate
models = [
    OneClassSVM(kernel="rbf", gamma=0.1, nu=0.1),
    LocalOutlierFactor(n_neighbors=20, contamination=0.1, novelty=True),
    EllipticEnvelope(contamination=0.1),
    MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=1000),
    RandomForestClassifier(n_estimators=100, random_state=42)
]

# Evaluate each model and print the F1 score
for model in models:
    model.fit(X_train_scaled, y_train)
    y_pred = model.predict(X_train_scaled)
    print(f"Model: {model.__class__.__name__}, F1 Score: {f1_score(y_train, y_pred, average='macro'):.4f}")

class IsolationForestClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, contamination=0.1, random_state=42):
        self.contamination = contamination
        self.random_state = random_state
        self.iforest_ = IsolationForest(contamination=contamination, random_state=random_state)

    def fit(self, X, y):
        self.iforest_.fit(X)
        return self

    def predict(self, X):
        scores = self.iforest_.decision_function(X)
        pred = np.where(scores > 0, 1, 0)
        return pred
    def predict_proba(self, X):
        # Local Outlier Factor does not provide probability estimates, so we can return a dummy probability
        return np.ones((X.shape[0], 2))

class OneClassSVMClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, kernel="rbf", gamma=0.1, nu=0.1):
        self.kernel = kernel
        self.gamma = gamma
        self.nu = nu
        self.ocsvm_ = OneClassSVM(kernel=kernel, gamma=gamma, nu=nu)

    def fit(self, X, y):
        self.ocsvm_.fit(X)
        return self

    def predict(self, X):
        pred = self.ocsvm_.predict(X)
        return np.where(pred == 1, 1, 0)

    def predict_proba(self, X):
        # One-class SVM does not provide probability estimates, so we can return a dummy probability
        return np.ones((X.shape[0], 2))

class LocalOutlierFactorClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, n_neighbors=20, contamination=0.1, novelty=True):
        self.n_neighbors = n_neighbors
        self.contamination = contamination
        self.novelty = novelty
        self.lof_ = LocalOutlierFactor(n_neighbors=n_neighbors, contamination=contamination, novelty=novelty)

    def fit(self, X, y):
        self.lof_.fit(X)
        return self

    def predict(self, X):
        pred = self.lof_.predict(X)
        return np.where(pred == 1, 1, 0)

    def predict_proba(self, X):
        # Local Outlier Factor does not provide probability estimates, so we can return a dummy probability
        return np.ones((X.shape[0], 2))

estimators = [
    ("one_class_svm", OneClassSVMClassifier(kernel="rbf", gamma=0.1, nu=0.1)),
    ("local_outlier_factor", LocalOutlierFactorClassifier(n_neighbors=20, contamination=0.1, novelty=True)),
    ("isolation_forest", IsolationForestClassifier(contamination=0.1, random_state=42)),
    ("mlp", MLPClassifier(hidden_layer_sizes=(100, 50), max_iter=1000)),
    ("random_forest", RandomForestClassifier(n_estimators=100, random_state=42))
]


voting_model = VotingClassifier(estimators=estimators, voting="soft")
voting_model.fit(X_train_scaled, y_train)

# Make predictions on the test data using the voting model
test_pred = voting_model.predict(scaler.transform(test_data.drop(["timestamp"], axis=1)))

# Create a submission.csv file
submission = pd.DataFrame({"timestamp": pd.to_numeric(test_data["timestamp"], downcast="integer"), "is_anomaly": test_pred})
submission.to_csv("submission.csv", index=False)

Model: OneClassSVM, F1 Score: 0.0239
Model: LocalOutlierFactor, F1 Score: 0.0206
Model: EllipticEnvelope, F1 Score: 0.0166
Model: MLPClassifier, F1 Score: 0.8513
Model: RandomForestClassifier, F1 Score: 0.9987
