In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
import pandas as pd

# Initialize Spark
spark = SparkSession.builder.appName("KDDCupAnomalyDetection").getOrCreate()

# Load column names
with open('/content/kddcup.names.txt', 'r') as f:
    col_names_raw = f.readlines()
col_names_cleaned = [line.split(':')[0].strip() for line in col_names_raw if ':' in line]
col_names_cleaned.append('result')

# Load dataset
df = spark.read.csv('/content/kddcup.data.corrected', header=False, inferSchema=True)
df = df.toDF(*col_names_cleaned)

# Filter http + normal
df_http = df.filter(col('service') == 'http')
df_http_normal = df_http.filter(col('result') == 'normal.')

# Drop non-numerical/unwanted cols
cols_to_drop = ['protocol_type', 'service', 'flag', 'land', 'logged_in',
                'is_host_login', 'is_guest_login', 'result',
                'wrong_fragment', 'urgent',
                'su_attempted', 'num_file_creations', 'num_outbound_cmds']
df_http_normal = df_http_normal.drop(*cols_to_drop)

# Cast all to float
for c in df_http_normal.columns:
    df_http_normal = df_http_normal.withColumn(c, col(c).cast(FloatType()))

# 5. Train/Test split (20% test, 80% train)
test, train = df_http_normal.randomSplit([0.2, 0.8], seed=42)
print(f"Train count = {train.count()}, Test count = {test.count()}")
# Assemble & scale on TRAIN
assembler = VectorAssembler(inputCols=train.columns, outputCol="features")
df_features = assembler.transform(df_http_normal)
train_feats = assembler.transform(train)
scaler     = StandardScaler(inputCol="features", outputCol="scaled_features",
                             withMean=True, withStd=True)
scaler_model = scaler.fit(train_feats)
train_scaled = scaler_model.transform(train_feats)
df_scaled = scaler_model.transform(df_features)

# PCA on TRAIN
k = 14
pca = PCA(k=k, inputCol="scaled_features", outputCol="pca_features")
pca_model = pca.fit(train_scaled)
train_pca = pca_model.transform(train_scaled).select("pca_features")
df_pca = pca_model.transform(df_scaled).select("pca_features")
pdf = df_pca.toPandas()
pca_cols = ['PCA_' + str(i) for i in range(k)]
pdf_pca = pd.DataFrame(pdf['pca_features'].tolist(), columns=pca_cols)
pdf_pca.head()
pdf_train = train_pca.toPandas()
pdf_train_pca = pd.DataFrame(
    pdf_train['pca_features'].tolist(), columns=pca_cols
)

pca_cols = ['PCA_' + str(i) for i in range(k)]

# Sliding Windows
import numpy as np
import tqdm

def get_windows(df, window_size=10, stride=10):
    windows = []
    for i in tqdm.tqdm(range(0, len(df) - window_size + 1, stride)):
        windows.append(df.iloc[i:i + window_size].to_numpy())
    return np.array(windows)

window_size = 10
stride = 10
windows_arr = get_windows(pdf_train_pca, window_size, stride)

# Shuffle
np.random.shuffle(windows_arr)


Train count = 111358, Test count = 28115


100%|██████████| 11135/11135 [00:00<00:00, 63867.00it/s]


In [5]:
import tensorflow as tf
from tensorflow.keras.layers import LSTM, TimeDistributed, Dense, RepeatVector
from tensorflow.keras.models import Sequential
import tensorflow.keras.backend as K

K.clear_session()

# Encoder
encoder = Sequential([
    LSTM(80, return_sequences=True, activation='selu', input_shape=(window_size, k)),
    LSTM(50, return_sequences=True, activation='selu'),
    LSTM(20, activation='selu'),
], name='encoder')

# Decoder
decoder = Sequential([
    RepeatVector(window_size),
    LSTM(50, return_sequences=True, activation='selu'),
    LSTM(80, return_sequences=True, activation='selu'),
    TimeDistributed(Dense(k, activation='linear'))
], name='decoder')

# Autoencoder
autoencoder = Sequential([encoder, decoder], name='autoencoder')
autoencoder.compile(optimizer='adam', loss=tf.keras.losses.Huber(100.))
autoencoder.fit(windows_arr, windows_arr[:, :, ::-1], epochs=5, batch_size=64, validation_split=0.2)

# Save Model
autoencoder.save("autoencoder.h5")


  super().__init__(**kwargs)


Epoch 1/5
[1m140/140[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 54ms/step - loss: 0.7974 - val_loss: 0.5839
Epoch 2/5
[1m140/140[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 51ms/step - loss: 0.6491 - val_loss: 0.5707
Epoch 3/5
[1m140/140[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 59ms/step - loss: 0.5706 - val_loss: 0.5643
Epoch 4/5
[1m140/140[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 58ms/step - loss: 0.7109 - val_loss: 0.5466
Epoch 5/5
[1m140/140[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 47ms/step - loss: 0.4508 - val_loss: 0.5405




In [6]:
converter = tf.lite.TFLiteConverter.from_keras_model(autoencoder)
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,
    tf.lite.OpsSet.SELECT_TF_OPS  # Enable Select TF ops
]
converter._experimental_lower_tensor_list_ops = False  # Avoid lowering TensorList ops
converter.experimental_enable_resource_variables = True  # Enable resource variable support
tflite_model = converter.convert()



# Save TFLite model
with open("autoencoder.tflite", "wb") as f:
    f.write(tflite_model)


Saved artifact at '/tmp/tmprga22hu5'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 10, 14), dtype=tf.float32, name='keras_tensor_4')
Output Type:
  TensorSpec(shape=(None, 10, 14), dtype=tf.float32, name=None)
Captures:
  138684717502864: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138684717511504: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138684717514576: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138684717510544: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138684717511312: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138684717514384: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138684717513616: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138684717513424: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138684717510928: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138684715516752: TensorSpec(shape=(), dtype=tf.resource, name=None)
  138684715519

In [7]:
!pip install tflite-runtime


Collecting tflite-runtime
  Downloading tflite_runtime-2.14.0-cp311-cp311-manylinux2014_x86_64.whl.metadata (1.4 kB)
Downloading tflite_runtime-2.14.0-cp311-cp311-manylinux2014_x86_64.whl (2.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m21.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tflite-runtime
Successfully installed tflite-runtime-2.14.0


In [8]:
!pip install numpy<2


/bin/bash: line 1: 2: No such file or directory


In [9]:

import tensorflow as tf

interpreter = tf.lite.Interpreter(model_path="autoencoder.tflite")

interpreter.allocate_tensors()

# Get input/output details
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Function to predict a single sample
def predict_tflite(sample_batch):
    predictions = []
    for i in range(sample_batch.shape[0]):
        sample = np.expand_dims(sample_batch[i], axis=0).astype(np.float32)
        interpreter.set_tensor(input_details[0]['index'], sample)
        interpreter.invoke()
        output = interpreter.get_tensor(output_details[0]['index'])
        predictions.append(output[0])
    return np.array(predictions)

# Load test data
test_df = spark.read.csv('/content/kddcup.data.corrected', header=False, inferSchema=True)
test_df = test_df.toDF(*col_names_cleaned)
test_df_http = test_df.filter(col('service') == 'http')
test_df_http = test_df_http.withColumn("anomaly_indicator", when(col('result') == 'normal.', 0).otherwise(1))
test_df_http = test_df_http.drop(*cols_to_drop)

# Cast all test columns
for c in test_df_http.columns:
    if c != "anomaly_indicator":
        test_df_http = test_df_http.withColumn(c, col(c).cast(FloatType()))

assembler_test = VectorAssembler(inputCols=[c for c in test_df_http.columns if c != "anomaly_indicator"], outputCol="features")
test_df_features = assembler_test.transform(test_df_http)
test_df_scaled = scaler_model.transform(test_df_features)
test_df_pca = pca_model.transform(test_df_scaled).select("pca_features")

pdf_test = test_df_pca.toPandas()
pdf_test_pca = pd.DataFrame(pdf_test['pca_features'].tolist(), columns=pca_cols)

test_windows = get_windows(pdf_test_pca, window_size=10, stride=10)
tflite_preds = predict_tflite(test_windows)



100%|██████████| 22369/22369 [00:00<00:00, 30234.30it/s]


In [10]:
# Calculate reconstruction errors
def get_recon_errors(true_windows, pred_windows):
    recon_errors = []
    for i in range(true_windows.shape[0]):
        diff = true_windows[i] - pred_windows[i]
        error = np.mean(np.linalg.norm(diff, axis=1))
        recon_errors.append(error)
    return np.array(recon_errors).reshape(-1, 1)

recon_errors = get_recon_errors(test_windows, tflite_preds)

from sklearn.preprocessing import MinMaxScaler
mm_scaler = MinMaxScaler()
anomaly_scores = mm_scaler.fit_transform(recon_errors).flatten()

from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score

test_labels_series = test_df_http.select("anomaly_indicator").toPandas()
test_labels_arr = test_labels_series.values.flatten()
test_window_labels = [1 if np.sum(test_labels_arr[i:i+window_size]) > 0 else 0
                      for i in range(0, len(test_labels_arr) - window_size + 1, stride)]

thresholds = np.linspace(0.0, 1.0, num=50)
anomaly_combinations = [(anomaly_scores > thr).astype(int) for thr in thresholds]
f1_scores = [f1_score(test_window_labels, pred) for pred in anomaly_combinations]
max_f1_score = np.max(f1_scores)
best_threshold = thresholds[f1_scores.index(max_f1_score)]
print('Best threshold =', best_threshold)

anomaly_indicator = (anomaly_scores > best_threshold).astype(int)
print('Precision:', precision_score(test_window_labels, anomaly_indicator))
print('Recall:', recall_score(test_window_labels, anomaly_indicator))
print('F1:', f1_score(test_window_labels, anomaly_indicator))
print('Accuracy:', accuracy_score(test_window_labels, anomaly_indicator))


Best threshold = 0.3877551020408163
Precision: 1.0
Recall: 1.0
F1: 1.0
Accuracy: 1.0


In [11]:
with open('/content/kddcup.names.txt', 'r') as f:
    col_names_raw = f.readlines()
col_names_cleaned = [line.split(':')[0].strip() for line in col_names_raw if ':' in line]
col_names_cleaned.append('result')

# Load dataset
df = spark.read.csv('/content/kddcup.data.corrected', header=False, inferSchema=True)
df = df.toDF(*col_names_cleaned)

In [12]:
df

DataFrame[duration: int, protocol_type: string, service: string, flag: string, src_bytes: int, dst_bytes: int, land: int, wrong_fragment: int, urgent: int, hot: int, num_failed_logins: int, logged_in: int, num_compromised: int, root_shell: int, su_attempted: int, num_root: int, num_file_creations: int, num_shells: int, num_access_files: int, num_outbound_cmds: int, is_host_login: int, is_guest_login: int, count: int, srv_count: int, serror_rate: double, srv_serror_rate: double, rerror_rate: double, srv_rerror_rate: double, same_srv_rate: double, diff_srv_rate: double, srv_diff_host_rate: double, dst_host_count: int, dst_host_srv_count: int, dst_host_same_srv_rate: double, dst_host_diff_srv_rate: double, dst_host_same_src_port_rate: double, dst_host_srv_diff_host_rate: double, dst_host_serror_rate: double, dst_host_srv_serror_rate: double, dst_host_rerror_rate: double, dst_host_srv_rerror_rate: double, result: string]

In [13]:
df.write.csv("/content/cleaned_data_spark", header=True)



In [14]:
# Zip the folder
!zip -r /content/cleaned_data_spark.zip /content/cleaned_data_spark

# Download
from google.colab import files
files.download("/content/cleaned_data_spark.zip")


  adding: content/cleaned_data_spark/ (stored 0%)
  adding: content/cleaned_data_spark/_SUCCESS (stored 0%)
  adding: content/cleaned_data_spark/part-00000-471da543-a3aa-49fd-b157-e3f2493d3242-c000.csv (deflated 94%)
  adding: content/cleaned_data_spark/._SUCCESS.crc (stored 0%)
  adding: content/cleaned_data_spark/.part-00000-471da543-a3aa-49fd-b157-e3f2493d3242-c000.csv.crc (deflated 15%)
  adding: content/cleaned_data_spark/part-00001-471da543-a3aa-49fd-b157-e3f2493d3242-c000.csv (deflated 93%)
  adding: content/cleaned_data_spark/.part-00001-471da543-a3aa-49fd-b157-e3f2493d3242-c000.csv.crc (deflated 0%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [15]:
import pandas as pd

# Create a DataFrame to store results
results_df = pd.DataFrame({
    'Anomaly_Score': anomaly_scores,
    'Predicted_Label': anomaly_indicator,
    'True_Label': test_window_labels
})

# Save to CSV
results_df.to_csv("anomaly_detection_results.csv", index=False)
print("Results saved to 'anomaly_detection_results.csv'")


Results saved to 'anomaly_detection_results.csv'


In [16]:
results_df["Reconstruction_Error"] = recon_errors.flatten()


In [17]:
results_df.to_csv("anomaly_detection_results.csv", index=False)

In [18]:
import joblib
import json
import os

# Make a folder to store metadata
os.makedirs("model_metadata", exist_ok=True)

# Save Spark StandardScaler model
scaler_model.save("model_metadata/scaler_model")

# Save Spark PCA model
pca_model.save("model_metadata/pca_model")

# Save column names used for features
with open("model_metadata/feature_columns.json", "w") as f:
    json.dump(df_http_normal.columns, f)

# Save PCA column names (for Pandas)
with open("model_metadata/pca_columns.json", "w") as f:
    json.dump(pca_cols, f)

# Save window size and stride
params = {
    "window_size": window_size,
    "stride": stride
}
with open("model_metadata/preprocessing_params.json", "w") as f:
    json.dump(params, f)


In [19]:
with open('/content/kddcup.names.txt', 'r') as f:
    col_names_raw = f.readlines()
col_names_cleaned = [line.split(':')[0].strip() for line in col_names_raw if ':' in line]
col_names_cleaned.append('result')

In [20]:
col_names_cleaned

['duration',
 'protocol_type',
 'service',
 'flag',
 'src_bytes',
 'dst_bytes',
 'land',
 'wrong_fragment',
 'urgent',
 'hot',
 'num_failed_logins',
 'logged_in',
 'num_compromised',
 'root_shell',
 'su_attempted',
 'num_root',
 'num_file_creations',
 'num_shells',
 'num_access_files',
 'num_outbound_cmds',
 'is_host_login',
 'is_guest_login',
 'count',
 'srv_count',
 'serror_rate',
 'srv_serror_rate',
 'rerror_rate',
 'srv_rerror_rate',
 'same_srv_rate',
 'diff_srv_rate',
 'srv_diff_host_rate',
 'dst_host_count',
 'dst_host_srv_count',
 'dst_host_same_srv_rate',
 'dst_host_diff_srv_rate',
 'dst_host_same_src_port_rate',
 'dst_host_srv_diff_host_rate',
 'dst_host_serror_rate',
 'dst_host_srv_serror_rate',
 'dst_host_rerror_rate',
 'dst_host_srv_rerror_rate',
 'result']

In [21]:
!unzip "kddcup.data.corrected.zip"

unzip:  cannot find or open kddcup.data.corrected.zip, kddcup.data.corrected.zip.zip or kddcup.data.corrected.zip.ZIP.


In [22]:
# 1) Zip the folder
!zip -r model_metadata.zip model_metadata

# 2) Download the zip to your computer
from google.colab import files
files.download("model_metadata.zip")


  adding: model_metadata/ (stored 0%)
  adding: model_metadata/pca_columns.json (deflated 62%)
  adding: model_metadata/scaler_model/ (stored 0%)
  adding: model_metadata/scaler_model/data/ (stored 0%)
  adding: model_metadata/scaler_model/data/part-00000-036fa528-e5e4-4368-9972-3ab9c37d8492-c000.snappy.parquet (deflated 58%)
  adding: model_metadata/scaler_model/data/_SUCCESS (stored 0%)
  adding: model_metadata/scaler_model/data/.part-00000-036fa528-e5e4-4368-9972-3ab9c37d8492-c000.snappy.parquet.crc (stored 0%)
  adding: model_metadata/scaler_model/data/._SUCCESS.crc (stored 0%)
  adding: model_metadata/scaler_model/metadata/ (stored 0%)
  adding: model_metadata/scaler_model/metadata/_SUCCESS (stored 0%)
  adding: model_metadata/scaler_model/metadata/._SUCCESS.crc (stored 0%)
  adding: model_metadata/scaler_model/metadata/.part-00000.crc (stored 0%)
  adding: model_metadata/scaler_model/metadata/part-00000 (deflated 43%)
  adding: model_metadata/preprocessing_params.json (deflated 9

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>