In [1]:
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np

In [2]:
# Read the two feature tables (judging by the file names: one without and one
# with SQL injection samples) and stack them row-wise. The original row labels
# of each file are kept here.
df_clean, df_sql = (
    pd.read_csv(csv_path)
    for csv_path in (
        'dataset_clean_sql_injection.csv',
        'with_sql_injection_dataset.csv',
    )
)
df_combined = pd.concat([df_clean, df_sql])

In [3]:
# Shuffle every row reproducibly, renumber the index, then make the label an
# integer column and replace the method column by its categorical codes.
df = (
    df_combined
    .sample(frac=1, random_state=42)
    .reset_index(drop=True)
    .assign(
        is_sql_injection=lambda frame: frame['is_sql_injection'].astype(int),
        method=lambda frame: pd.Categorical(frame['method']).codes,
    )
)

In [4]:
# Preview the first rows of the shuffled, encoded frame.
df.head()

Unnamed: 0,method,entropy,packet_length,sqli_keywords,number_of_special_chars,url_length,query_param_count,query_param_length,path_depth,is_sql_injection
0,1,4.061977,0,5,7,36,0,0,1,1
1,0,2.0,0,0,1,4,0,0,1,0
2,1,4.564986,0,5,7,39,0,0,1,1
3,1,4.081495,0,7,9,39,0,0,1,1
4,1,3.83804,0,0,3,48,1,10,1,0


In [6]:
from sklearn.model_selection import train_test_split

# Target is the injection flag; every remaining column is used as a feature.
y = df['is_sql_injection']
X = df.drop(columns=['is_sql_injection'])

# Hold out 20% of the rows for evaluation, with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

print("Training set size:", X_train.shape)
print("Testing set size:", X_test.shape)

Training set size: (716, 9)
Testing set size: (180, 9)


In [7]:
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

# Fit a logistic regression with default settings on the training split
# (fit() returns the estimator itself) and score it on the held-out rows.
model = LogisticRegression().fit(X_train, y_train)
y_pred = model.predict(X_test)

print("Accuracy:", accuracy_score(y_test, y_pred))
print(
    "\nClassification Report:\n",
    classification_report(y_test, y_pred),
)
print(
    "\nConfusion Matrix:\n",
    confusion_matrix(y_test, y_pred),
)

Accuracy: 0.9944444444444445

Classification Report:
               precision    recall  f1-score   support

           0       1.00      0.99      0.99        91
           1       0.99      1.00      0.99        89

    accuracy                           0.99       180
   macro avg       0.99      0.99      0.99       180
weighted avg       0.99      0.99      0.99       180


Confusion Matrix:
 [[90  1]
 [ 0 89]]


## Pre-process data

In [67]:
from collections import Counter
import math
from urllib.parse import urlparse, parse_qs
from urllib.parse import unquote

def calculate_entropy(text: str) -> float:
    """Return the Shannon entropy (in bits) of the characters in ``text``.

    Args:
        text: String whose character distribution is measured.

    Returns:
        The entropy of the per-character frequencies, or ``0.0`` when
        ``text`` is not a string or contains only whitespace.
    """
    if not (isinstance(text, str) and text.strip()):
        return 0.0

    n_chars = len(text)
    # Relative frequency of every distinct character.
    probabilities = [occurrences / n_chars for occurrences in Counter(text).values()]

    return sum(-p * math.log2(p) for p in probabilities)

In [111]:
def packet_length(packet_length: str) -> int:
    """Return the length of a request body after percent-decoding it.

    Args:
        packet_length: The raw body text. The parameter shares the function's
            name, so the function cannot refer to itself inside its own body.

    Returns:
        Number of characters in the decoded body, or ``0`` when the argument
        is not a string or is blank.
    """
    if isinstance(packet_length, str) and packet_length.strip():
        return len(unquote(packet_length))
    return 0

In [156]:
import re

# Regular-expression fragments treated as SQL-injection indicators: SQL
# keywords and function names, comment/quote/punctuation tokens, and a few
# composite shapes (numeric comparisons, hex literals, UNION SELECT, ...).
# They are OR-ed into one case-insensitive pattern below.
#
# NOTE(review): Python tries alternatives left to right and keeps the first
# one that matches at a position, so entries repeated later in the list
# ("--", "/\*", "\*/", "(\|\|)", "#") cannot add matches, and composite
# entries such as "UNION\s+SELECT" are usually pre-empted by the single
# keyword entries listed before them. Changing the list changes the
# `sqli_keywords` feature, so any stored feature data would presumably need
# to be regenerated together with it - confirm before cleaning up.
# NOTE(review): "\bDATABASE\(\)\b" and "\bSCHEMA\(\)\b" end in "\)\b", which
# only matches when a word character directly follows the ")" - verify that
# this is intended.
sql_injection_patterns = [
    r"\bSELECT\b", r"\bINSERT\b", r"\bUPDATE\b", r"\bDELETE\b", r"\bDROP\b", r"\bUNION\b", r"\bJOIN\b",
    r"\bWHERE\b", r"\bORDER\s+BY\b", r"\bGROUP\s+BY\b", r"\bHAVING\b", r"\bEXEC\b", r"\bDECLARE\b",
    r"\bCASE\b", r"\bWHEN\b", r"\bTHEN\b", r"\bEND\b", r"\bIF\b", r"\bELSE\b", r"\bCAST\b", r"\bCONVERT\b",
    r"\bTRUNCATE\b", r"\bALTER\b", r"\bCREATE\b", r"\bREPLACE\b", r"\bRENAME\b", r"\bGRANT\b", r"\bREVOKE\b",
    r"\bMERGE\b", r"\bINTERSECT\b", r"\bEXCEPT\b", r"\bEXECUTE\b", r"\bFETCH\b", r"\bOPEN\b", r"\bCLOSE\b",
    r"\bDEALLOCATE\b", r"\bUSE\b", r"\bLIMIT\b", r"\bOFFSET\b", r"\bISNULL\b", r"\bCOALESCE\b",
    r"\bXP_CMDSHELL\b", r"\bWAITFOR\s+DELAY\b", r"\bBENCHMARK\b", r"\bCHAR\b", r"\bASCII\b", r"\bHEX\b",
    r"\bCONCAT\b", r"\bSUBSTRING\b", r"\bMID\b", r"\bIFNULL\b", r"\bLOAD_FILE\b", r"\bOUTFILE\b",
    r"\bINTO\b", r"\bDUMPFILE\b", r"\bFLOOR\b", r"\bRAND\b", r"\bMD5\b", r"\bSHA1\b",
    r"\bCURRENT_USER\b", r"\bSESSION_USER\b", r"\bSYSTEM_USER\b", r"\bUSER\b", r"\bVERSION\b",
    r"\bFOUND_ROWS\b", r"\bROW_COUNT\b", r"\bDATABASE\(\)\b", r"\bSCHEMA\(\)\b", r"\bTABLE_NAME\b",
    r"\bCOLUMN_NAME\b", r"\bCURRENT_TIMESTAMP\b", r"\bCURRENT_DATE\b", r"\bCURRENT_TIME\b",
    r"\bSESSION_ID\b", r"\bWAITFOR\s+TIME\b", r"\bEXEC\s+sp_executesql\b", r"\bEXEC\s+sp_sqlexec\b",
    r"\bSYSOBJECTS\b", r"\bSYSCOLUMNS\b", r"\bPG_SLEEP\b", r"\bEXTRACTVALUE\b", r"\bUPDATEXML\b",
    r"\bLTRIM\b", r"\bRTRIM\b", r"\bUPPER\b", r"\bLOWER\b", r"\bSYSADMIN\b", r"\bEXEC\s+MASTER\.DBO\.XP_CMDSHELL\b",
    r"\bINFORMATION_SCHEMA\.TABLES\b", r"\bINFORMATION_SCHEMA\.COLUMNS\b",
    r"--", r";", r"'", r"\"", r"/\*", r"\*/", r"\(", r"\)", r"\{", r"\}", r"\[", r"\]", r"<", r">",
    r"\|\|", r"\|", r"\^", r"\\", r"\@", r"\#", r"\%", r"\!", r"\$", r"\+", r"-", r"/", r"\*",
    r"\bOR\b", r"\bAND\b", r"\bLIKE\b", r"\bSLEEP\b", r"\bREGEXP\b", r"\bRLIKE\b",
    r"ORDER\s+BY\s+1--", r"UNION\s+SELECT", r"DROP\s+TABLE", r"ALTER\s+TABLE", r"INTO\s+OUTFILE",
    r"\b\d+\s*(=|!=|<|>|<=|>=)\s*\d+\b",
    r"\b\d+\s*[\+\-\*/%]\s*\d+\b",
    r"(\|\|)", r"\bCONCAT\s*\(", r"\bCHAR\s*\(", r"\bASCII\s*\(", r"\bUNHEX\s*\(",
    r"\b0x[0-9A-Fa-f]+\b",
    r"\b\d+\s*(&|\||\^)\s*\d+\b",
    r"--", r"#", r"/\*", r"\*/",
    r"\)\s*AND\s*\(",
    r"\bAND\b.*&.*\b",
    r"::", r"\@\@", r"\bAS\s+\w+",
    r"-\d+'?",
    r"^-?\d+'?",
]

# One alternation of all fragments, matched case-insensitively.
sql_injection_regex = re.compile("|".join(sql_injection_patterns), re.IGNORECASE)

def count_sql_keywords(body: str) -> int:
    """Count the non-overlapping matches of ``sql_injection_regex`` in ``body``.

    Args:
        body: Text to scan (a request body or a URL).

    Returns:
        The number of matches, or ``0`` when ``body`` is empty or ``None``.
    """
    return len(sql_injection_regex.findall(body)) if body else 0

In [157]:
def number_of_special_chars(request: str) -> int:
    """Count punctuation/symbol characters in ``request``.

    Args:
        request: Text to inspect.

    Returns:
        How many characters belong to the special-character class below, or
        ``0`` when ``request`` is not a string or is blank.
    """
    if not (isinstance(request, str) and request.strip()):
        return 0

    special_chars_pattern = r"[!@#$%^&*()_+\-=\[\]{};:'\"\\|,.<>?/`~]"
    # The pattern matches one character at a time, so every hit is one symbol.
    return sum(1 for _ in re.finditer(special_chars_pattern, request))

In [158]:
from typing import Union, Dict

def url_length(request: Union[Dict, str]) -> int:
    """Return the length of a URL given directly or inside a request dict.

    Args:
        request: Either the URL string itself or a mapping whose ``'url'``
            entry holds it.

    Returns:
        The URL's character count; ``0`` for a dict without ``'url'`` or for
        any other argument type.
    """
    if isinstance(request, dict):
        return len(request.get('url', ''))
    if isinstance(request, str):
        return len(request)
    return 0

In [159]:
def process_row(row):
    """Replace a missing value in the row's relevant text field by ``None``.

    For rows whose ``'method'`` is ``'GET'`` the ``'url'`` field is checked;
    for every other method the ``'body'`` field is checked. The row is
    modified in place and also returned.
    """
    field = 'url' if row['method'] == 'GET' else 'body'
    if pd.isna(row[field]):
        row[field] = None
    return row

In [160]:
def count_query_params(url: str) -> int:
    """Count the distinct query parameter names in ``url``.

    Args:
        url: The request URL.

    Returns:
        Number of distinct parameter names in the query string. Parameters
        with a blank value are not counted (``parse_qs`` drops them by
        default) and a repeated name counts once. Returns ``0`` when ``url``
        is not a string or is blank, like the other feature extractors.
    """
    # Missing URLs arrive as None/NaN (see process_row); urlparse cannot
    # handle NaN, so short-circuit instead of raising.
    if not isinstance(url, str) or not url.strip():
        return 0
    parsed_url = urlparse(url)
    query_params = parse_qs(parsed_url.query)
    return len(query_params)

In [161]:
def calculate_query_param_length(url: str) -> int:
    """Return the length of the raw query string of ``url``.

    Args:
        url: The request URL.

    Returns:
        Character count of the part between ``?`` and any ``#`` fragment
        (not percent-decoded). Returns ``0`` when there is no query string
        or when ``url`` is not a string or is blank, like the other feature
        extractors.
    """
    # Missing URLs arrive as None/NaN (see process_row); urlparse cannot
    # handle NaN, so short-circuit instead of raising.
    if not isinstance(url, str) or not url.strip():
        return 0
    return len(urlparse(url).query)

In [162]:
def calculate_path_depth(url: str) -> int:
    """Return the number of ``/``-separated segments in the path of ``url``.

    Args:
        url: The request URL.

    Returns:
        Segment count of the URL path after stripping leading and trailing
        slashes. Returns ``0`` when ``url`` is not a string.

    Note:
        A URL with an empty path (``"http://host"`` or ``"http://host/"``)
        yields ``1``, because splitting an empty string still produces one
        (empty) segment. That result is intentionally left unchanged for
        string input so the feature keeps the values this function has
        always produced.
        NOTE(review): decide whether an empty path should count as depth 0
        and, if so, regenerate any stored features alongside the change.
    """
    # Missing URLs arrive as None/NaN (see process_row); without this guard
    # the parsing below raises on such values.
    if not isinstance(url, str):
        return 0
    parsed_url = urlparse(url)
    path_segments = parsed_url.path.strip("/").split("/")
    return len(path_segments)

In [194]:
# Hand-built sample request used to try the trained model: a form POST whose
# body carries a UNION/SELECT payload in the `uname` field.
request_test = {
    "request": {
        "url": "http://testphp.vulnweb.com/login.php",
        "method": "POST",
        "headers": {
            "Content-Length": "77",
            "Host": "testphp.vulnweb.com",
            "Cache-Control": "max-age=0",
            "Origin": "http://testphp.vulnweb.com",
            "Content-Type": "application/x-www-form-urlencoded",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
            "Sec-GPC": "1",
            "Accept-Language": "en-US,en;q=0.5",
            "Referer": "http://testphp.vulnweb.com/login.php",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "close"
        },
        "body": "uname=admin' UNION/*+*/SELECT/**/1,0x73656C656374,3--&pass=123"
    }
}

In [195]:
# Unpack the sample request defined above.
request = request_test["request"]
url = request["url"]
method = request["method"]
headers = request["headers"]  # kept for inspection; no feature reads it yet
# A request without a body (typical for GET) must not raise KeyError; the
# body-based extractors already map None to 0.
body = request.get("body")

# Text the content features are computed from. The choice uses the same
# GET / non-GET split as the "method" flag below and as process_row, so a
# request flagged as body-carrying (e.g. PUT) is also scored on its body.
payload = url if method == "GET" else body

# Key order follows the column order shown by df.head() for the training data.
features = {
    "method": 0 if method == "GET" else 1,
    "entropy": calculate_entropy(payload),
    "packet_length": packet_length(body),
    "sqli_keywords": count_sql_keywords(payload),
    "number_of_special_chars": number_of_special_chars(payload),
    "url_length": url_length(url),
    "query_param_count": count_query_params(url),
    "query_param_length": calculate_query_param_length(url),
    "path_depth": calculate_path_depth(url),
}
# Single-row frame in the shape model.predict expects.
features_df = pd.DataFrame([features])

In [196]:
# Classify the single feature row and report the verdict in words.
prediction = model.predict(features_df)
if prediction[0] == 1:
    label = "SQL Injection"
else:
    label = "Legitimate"
print(f"Prediction: {label}")

Prediction: SQL Injection
