<a href="https://colab.research.google.com/github/jullazarovych/SSHbruteforsedetection_ml/blob/main/brute_force_attack_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime
import re
from datetime import datetime
import tensorflow as tf
from tensorflow import keras
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score
from tensorflow.keras import layers
from sklearn.utils import class_weight
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
def calculate_average_time_between_attempts(timestamps, window_size=3600):
    """Return a weighted average of the gaps (in seconds) between attempts.

    The timestamps are sorted, consecutive differences are taken, and each gap
    is weighted by ``exp(-gap / window_size)`` so that short gaps dominate the
    average.  When the weights are unusable (they underflow to ~0 or are not
    finite) the plain arithmetic mean of the gaps is returned instead.

    Args:
        timestamps: Sequence of ``datetime`` objects (any order).
        window_size: Decay scale, in seconds, of the exponential weighting.

    Returns:
        The weighted average gap as a float, or ``0.0`` when fewer than two
        timestamps are supplied.
    """
    if len(timestamps) <= 1:
        return 0.0

    epoch_seconds = np.array([ts.timestamp() for ts in sorted(timestamps)])
    time_diffs = np.diff(epoch_seconds)
    plain_mean = float(np.mean(time_diffs))

    weights = np.exp(-time_diffs / window_size)
    weight_sum = np.sum(weights)

    # Every gap is so large relative to window_size that the weights vanished
    # (or the weights overflowed / became NaN): weighting is meaningless.
    if not np.isfinite(weight_sum) or weight_sum < 1e-10:
        return plain_mean

    result = float(np.average(time_diffs, weights=weights / weight_sum))
    return result if np.isfinite(result) else plain_mean

In [None]:
# Weights of the linear heuristic that process_ip_data combines into
# `bruteforce_score`; an IP is labelled as brute force when its score is
# strictly greater than `threshold`.

# Multiplies norm_time_between = 1 / (1 + avg_gap / 10), which approaches 1 as
# the average gap between attempts shrinks (and equals 1 when the gap is 0).
# NOTE(review): with a negative weight, rapid attempts LOWER the score by up to
# 10 -- confirm this sign is intended.
w_avg_time_between_attempts = -10

# Multiplies log1p(total_attempts).
w_total_attempts = 2

# The remaining weights multiply the raw per-IP counts.
w_unique_usernames = 3
w_root_attempts = 3
w_error_failed_password = 2
w_error_invalid_user = 3
w_error_unable_to_negotiate = 1
w_error_max_authentication = 4

# Decision boundary applied to bruteforce_score.
threshold = 105

In [None]:
def process_ip_data(current_ip_data, time_threshold_seconds=600):
    """Turn per-IP aggregates into a feature table with a heuristic label.

    Args:
        current_ip_data: Mapping ``ip -> info`` where ``info`` provides the keys
            ``'timestamps'``, ``'usernames'``, ``'attempts'``,
            ``'root_attempts'`` and ``'error_types'`` (a dict of counts keyed by
            error category).
        time_threshold_seconds: Passed to
            ``calculate_average_time_between_attempts`` as its ``window_size``.

    Returns:
        A DataFrame with one row per IP containing the raw features,
        ``norm_time_between``, ``bruteforce_score`` (linear combination using
        the module-level ``w_*`` weights) and the 0/1 ``bruteforce`` label
        (score strictly above the module-level ``threshold``).  An empty
        mapping yields an empty frame that still has every column.
    """
    error_columns = {
        error_type: f'error_{error_type.lower().replace(" ", "_")}'
        for error_type in ('failed password', 'invalid user',
                           'unable to negotiate', 'max authentication')
    }
    columns = [
        'ip',
        'avg_time_between_attempts',
        'unique_usernames',
        'total_attempts',
        'root_attempts',
        *error_columns.values(),
    ]

    records = []
    for ip, ip_info in current_ip_data.items():
        record = {
            'ip': ip,
            'avg_time_between_attempts': calculate_average_time_between_attempts(
                ip_info['timestamps'],
                time_threshold_seconds
            ),
            'unique_usernames': len(ip_info['usernames']),
            'total_attempts': ip_info['attempts'],
            'root_attempts': ip_info['root_attempts'],
        }
        for error_type, column in error_columns.items():
            record[column] = ip_info['error_types'].get(error_type, 0)
        records.append(record)

    # Passing the schema explicitly keeps the columns present even when there
    # are no records (a bare pd.DataFrame([]) has no columns at all, which made
    # the column lookups below raise KeyError).
    df = pd.DataFrame(records, columns=columns)
    if df.empty:
        # Columns of an empty frame default to object dtype; make them numeric
        # so the arithmetic below is well defined.
        df = df.astype({
            col: float if col == 'avg_time_between_attempts' else int
            for col in columns if col != 'ip'
        })

    # Maps the average gap into (0, 1]: 1 for a zero gap, tending to 0 as the
    # gap grows.
    df['norm_time_between'] = 1.0 / (1.0 + df['avg_time_between_attempts'] / 10)

    df['bruteforce_score'] = (
        w_avg_time_between_attempts * df['norm_time_between'] +
        w_total_attempts * np.log1p(df['total_attempts']) +
        w_unique_usernames * df['unique_usernames'] +
        w_root_attempts * df['root_attempts'] +
        w_error_failed_password * df['error_failed_password'] +
        w_error_invalid_user * df['error_invalid_user'] +
        w_error_unable_to_negotiate * df['error_unable_to_negotiate'] +
        w_error_max_authentication * df['error_max_authentication']
    )
    df['bruteforce'] = (df['bruteforce_score'] > threshold).astype(int)

    return df

In [None]:
# Syslog pads single-digit days with an extra space ("Mar  1 00:00:01"), so the
# separators must accept runs of whitespace.
_SYSLOG_TIMESTAMP_RE = re.compile(r'^(\w+\s+\d+\s+\d+:\d+:\d+)')
_IP_RE = re.compile(r'(\d+\.\d+\.\d+\.\d+)')
_REPEAT_RE = re.compile(r"message repeated (\d+) times")
_USERNAME_RES = (re.compile(r'user (\w+)'), re.compile(r'for (\w+) from'))

# Lines containing any of these substrings are ignored entirely.
_SKIP_MARKERS = ("PAM", "pam", "Received disconnect", "kex_exchange_identification",
                 "error: type", "Connection", "Disconnected", "Disconnecting")


def _classify_error(line):
    """Return the error category of a log line, or None if it matches none."""
    if "Failed password" in line:
        return "failed password"
    if "Invalid user" in line or "Failed none" in line:
        return "invalid user"
    if "Unable to negotiate" in line:
        return "unable to negotiate"
    if "error: maximum authentication" in line:
        return "max authentication"
    return None


def _extract_username(line):
    """Return the first username found by the username patterns, else None."""
    for pattern in _USERNAME_RES:
        match = pattern.search(line)
        if match:
            return match.group(1)
    return None


def process_log_file(file_path, time_threshold_seconds=600):
    """Parse an SSH auth log and build the per-IP feature table.

    Each line that has at least five whitespace-separated fields, contains none
    of the skip markers, and carries both a syslog timestamp and an IPv4
    address is aggregated under that address.  A line counts as
    ``1 + repeat_count`` occurrences, where ``repeat_count`` comes from a
    "message repeated N times" prefix.  Lines that match no known error
    category still count as attempts but are not attributed to any category.

    Args:
        file_path: Path of the log file to read.
        time_threshold_seconds: Forwarded to ``process_ip_data``.

    Returns:
        The DataFrame produced by ``process_ip_data``.
    """
    current_ip_data = {}

    # errors='replace' keeps one undecodable byte (e.g. in an attacker-supplied
    # username) from aborting the whole run.
    with open(file_path, 'r', errors='replace') as file:
        for raw_line in file:
            line = raw_line.strip()

            if len(line.split(None, 4)) < 5:
                continue
            if any(marker in line for marker in _SKIP_MARKERS):
                continue

            ip_match = _IP_RE.search(line)
            timestamp_match = _SYSLOG_TIMESTAMP_RE.search(line)
            if not ip_match or not timestamp_match:
                # Without both pieces the line cannot be attributed; skipping
                # it avoids reusing the previous line's timestamp.
                continue

            try:
                # NOTE(review): the log carries no year, so strptime yields
                # year 1900; "Feb 29" therefore fails to parse and is reported
                # below -- confirm whether a year should be supplied.
                timestamp = datetime.strptime(timestamp_match.group(1), '%b %d %H:%M:%S')
            except ValueError as e:
                print(f"Error processing line: {line}")
                print(f"Error: {str(e)}")
                continue

            ip = ip_match.group(1)
            if ip not in current_ip_data:
                current_ip_data[ip] = {
                    'timestamps': [],
                    'usernames': set(),
                    'error_types': {},
                    'attempts': 0,
                    'root_attempts': 0
                }
            ip_info = current_ip_data[ip]

            repeat_match = _REPEAT_RE.search(line)
            repeat_count = int(repeat_match.group(1)) if repeat_match else 0
            occurrences = 1 + repeat_count

            ip_info['timestamps'].append(timestamp)

            username = _extract_username(line)
            if username:
                ip_info['usernames'].add(username)
                if username == 'root':
                    # Repeats are credited to root only when the line is
                    # actually about root.
                    ip_info['root_attempts'] += occurrences

            # Classified per line so a category can never leak from an earlier
            # line.
            error_type = _classify_error(line)
            if error_type is not None:
                ip_info['error_types'][error_type] = \
                    ip_info['error_types'].get(error_type, 0) + occurrences

            ip_info['attempts'] += occurrences

    return process_ip_data(current_ip_data, time_threshold_seconds)

In [None]:
def prepare_features(df):
    """Select the identifier, label and model feature columns from ``df``.

    Any feature column missing from ``df`` is added with the value 0.  The
    input frame is left untouched; a new frame is returned.

    Args:
        df: DataFrame that contains at least ``'ip'`` and ``'bruteforce'``.

    Returns:
        A new DataFrame with columns ``['ip', 'bruteforce']`` followed by the
        eight feature columns, in a fixed order.

    Raises:
        KeyError: If ``'ip'`` or ``'bruteforce'`` is absent from ``df``.
    """
    feature_columns = [
        'avg_time_between_attempts',
        'unique_usernames',
        'total_attempts',
        'root_attempts',
        'error_failed_password',
        'error_invalid_user',
        'error_unable_to_negotiate',
        'error_max_authentication'
    ]

    # assign() returns a new frame, so the caller's DataFrame is not mutated.
    missing = {col: 0 for col in feature_columns if col not in df.columns}
    prepared = df.assign(**missing)

    return prepared[['ip', 'bruteforce'] + feature_columns]

In [None]:
def create_nn_model(input_shape):
    """Build and compile a small binary classifier.

    The network is Dense(16, relu) -> Dense(8, relu) -> Dense(1, sigmoid),
    compiled with the Adam optimizer, binary cross-entropy loss and accuracy
    as the reported metric.

    Args:
        input_shape: Shape tuple of a single sample, e.g. ``(n_features,)``.

    Returns:
        The compiled ``keras.Sequential`` model.
    """
    model = keras.Sequential([
        # Declare the input with an explicit Input object rather than passing
        # input_shape to the first Dense layer, which Keras warns against.
        keras.Input(shape=input_shape),
        layers.Dense(16, activation="relu"),
        layers.Dense(8, activation="relu"),
        layers.Dense(1, activation="sigmoid")
    ])

    model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
    return model

In [None]:
def evaluate_model(y_true, y_pred, model_name):
    """Print and return accuracy, precision, recall and F1 for a model.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        model_name: Name used in the printed header and in the result.

    Returns:
        A dict with the keys ``'model'``, ``'accuracy'``, ``'precision'``,
        ``'recall'`` and ``'f1'``.
    """
    scores = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred),
    }

    # (printed label, key in `scores`) in display order.
    display_rows = (
        ("Accuracy", 'accuracy'),
        ("Precision", 'precision'),
        ("Recall", 'recall'),
        ("F1 Score", 'f1'),
    )

    print(f"\n--- {model_name} ---")
    for label, key in display_rows:
        print(f"{label}: {scores[key]:.4f}")
    print("Classification Report:")
    print(classification_report(y_true, y_pred))

    return {'model': model_name, **scores}

In [None]:
# Parse the training log into a per-IP table, then keep the id/label/feature
# columns used for modelling.
# NOTE(review): the path is hard-coded under /content/drive; presumably Google
# Drive must already be mounted -- no mount call is visible in this notebook.
print("Processing training data...")
train_df = process_log_file('/content/drive/MyDrive/data/train.log')
train_features = prepare_features(train_df)
# Prints the whole frame (pandas truncates long frames in the display).
print(train_df)
# Persist the full table, including the score columns, for inspection.
train_df.to_csv('df.csv', index=False)
print("\nResults have been saved to 'df.csv'.")
# Features are everything except the identifier and the label.
X_train = train_features.drop(columns=["ip", "bruteforce"])
y_train = train_features["bruteforce"]

# The scaler is fitted on the training features only; later cells reuse it to
# transform the test features.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)

Processing training data...
                  ip  avg_time_between_attempts  unique_usernames  \
0       218.92.0.213                1042.415321                 0   
1      92.255.57.132                  15.216826                65   
2       218.92.0.204                1061.001869                 0   
3     36.110.228.254                   2.810713                13   
4     159.89.170.239                   2.144905                25   
...              ...                        ...               ...   
996   185.213.165.72                  41.192834                 7   
997   77.221.147.239                  40.303067                 8   
998     51.75.20.198                  36.182043                 6   
999    54.37.233.240                  39.517384                 7   
1000    14.103.27.46                   2.000000                 1   

      total_attempts  root_attempts  error_failed_password  \
0                 53              0                      0   
1                15

TEST

In [None]:
print("\nTraining Neural Network...")
# Seed Python, NumPy and TensorFlow before the model is built so weight
# initialisation and batch shuffling are repeatable, matching the fixed
# random_state=42 used for the Random Forest and SVM.
keras.utils.set_random_seed(42)
model = create_nn_model((X_train_scaled.shape[1],))
# NOTE(review): validation_split holds out the trailing 20% of the rows as
# given; the rows are in first-seen-IP order, so the split is not random --
# confirm this is acceptable.
history = model.fit(X_train_scaled, y_train, epochs=10, batch_size=8, validation_split=0.2)


Training Neural Network...


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 5ms/step - accuracy: 0.5306 - loss: 0.6793 - val_accuracy: 0.9204 - val_loss: 0.5976
Epoch 2/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - accuracy: 0.7743 - loss: 0.5721 - val_accuracy: 0.9851 - val_loss: 0.3965
Epoch 3/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - accuracy: 0.8249 - loss: 0.4350 - val_accuracy: 0.9851 - val_loss: 0.2708
Epoch 4/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - accuracy: 0.8152 - loss: 0.3727 - val_accuracy: 0.9851 - val_loss: 0.2079
Epoch 5/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.8416 - loss: 0.3383 - val_accuracy: 0.9851 - val_loss: 0.1758
Epoch 6/10
[1m100/100[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - accuracy: 0.9701 - loss: 0.2930 - val_accuracy: 0.9801 - val_loss: 0.1384
Epoch 7/10
[1m100/100[0m 

In [None]:
# Fit a 100-tree Random Forest on the unscaled training features, with a fixed
# random_state.
print("\nTraining Random Forest...")
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)


Training Random Forest...


In [None]:
# Fit an RBF-kernel SVM on the standardised training features (unlike the
# Random Forest, which is fitted on the unscaled ones).
print("\nTraining SVM...")
svm_model = SVC(kernel='rbf', C=1.0, random_state=42)
svm_model.fit(X_train_scaled, y_train)


Training SVM...


In [None]:
# Build the test set with the same pipeline as the training set.  The labels in
# y_test come from the same rule-based bruteforce_score threshold, so the
# metrics below measure how well each model reproduces that heuristic.
print("Processing test data...")
test_df = process_log_file('/content/drive/MyDrive/data/test.log')
test_features = prepare_features(test_df)

X_test = test_features.drop(columns=["ip", "bruteforce"])
y_test = test_features["bruteforce"]

# Reuse the scaler fitted on the training data (transform only, no refit).
X_test_scaled = scaler.transform(X_test)

Processing test data...


In [None]:
# Threshold the model outputs at 0.5 to get hard 0/1 labels and flatten them to
# a 1-D array before scoring.
nn_predictions = (model.predict(X_test_scaled) > 0.5).astype(int).flatten()
nn_results = evaluate_model(y_test, nn_predictions, "Neural Network")

[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step

--- Neural Network ---
Accuracy: 0.9809
Precision: 0.9958
Recall: 0.9696
F1 Score: 0.9825
Classification Report:
              precision    recall  f1-score   support

           0       0.96      0.99      0.98       399
           1       1.00      0.97      0.98       493

    accuracy                           0.98       892
   macro avg       0.98      0.98      0.98       892
weighted avg       0.98      0.98      0.98       892



In [None]:
# The Random Forest was fitted on unscaled features, so it predicts on the
# unscaled test features as well.
rf_predictions = rf_model.predict(X_test)
rf_results = evaluate_model(y_test, rf_predictions, "Random Forest")


--- Random Forest ---
Accuracy: 0.9910
Precision: 1.0000
Recall: 0.9838
F1 Score: 0.9918
Classification Report:
              precision    recall  f1-score   support

           0       0.98      1.00      0.99       399
           1       1.00      0.98      0.99       493

    accuracy                           0.99       892
   macro avg       0.99      0.99      0.99       892
weighted avg       0.99      0.99      0.99       892



In [None]:
# The SVM was fitted on standardised features, so it predicts on the
# standardised test features.
svm_predictions = svm_model.predict(X_test_scaled)
svm_results = evaluate_model(y_test, svm_predictions, "Support Vector Machine")


--- Support Vector Machine ---
Accuracy: 0.9585
Precision: 0.9978
Recall: 0.9270
F1 Score: 0.9611
Classification Report:
              precision    recall  f1-score   support

           0       0.92      1.00      0.96       399
           1       1.00      0.93      0.96       493

    accuracy                           0.96       892
   macro avg       0.96      0.96      0.96       892
weighted avg       0.96      0.96      0.96       892



In [None]:
print("\nEvaluating model on test data...")

# One row per model, one column per metric.
results = [nn_results, rf_results, svm_results]
results_df = pd.DataFrame(results)
metrics = ['accuracy', 'precision', 'recall', 'f1']
models = results_df['model']

# 2x2 grid with one bar chart per metric, all on a common 0-1 scale.
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
for ax, metric in zip(axes.flat, metrics):
    metric_label = metric.capitalize()
    sns.barplot(x='model', y=metric, data=results_df, ax=ax)
    ax.set_title(f"{metric_label} by Model")
    ax.set_ylabel(metric_label)
    ax.set_xlabel("")
    ax.set_ylim(0, 1)

# The figure is written to disk and closed rather than shown inline.
fig.tight_layout()
fig.savefig('model_comparison.png')
plt.close(fig)

results_df.to_csv('model_comparison_results.csv', index=False)
print("Detailed results saved to model_comparison_results.csv")


Evaluating model on test data...
Detailed results saved to model_comparison_results.csv
