In [None]:
import numpy as np
import pandas as pd
import urllib
import os
from bs4 import BeautifulSoup
import re
import pickle
import nltk
import string
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import seaborn as sns

from nltk.tokenize import RegexpTokenizer
from nltk.stem import SnowballStemmer
from nltk.corpus import stopwords
from collections import Counter
from PIL import Image
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler

In [None]:
def loadFile(name):
    """Read a query file from the current working directory and URL-decode it.

    Duplicate raw lines are dropped (the first occurrence wins, so the result
    keeps the order of the file and is identical on every run), then each
    remaining line is percent-decoded. Line terminators are left in place.

    Args:
        name: File name, joined onto the current working directory.

    Returns:
        list[str]: The de-duplicated, URL-decoded lines.
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import unquote

    filepath = os.path.join(os.getcwd(), name)
    with open(filepath, 'r') as f:
        # dict.fromkeys de-duplicates while preserving file order; a plain
        # set() would hand the lines back in an arbitrary order.
        unique_lines = list(dict.fromkeys(f))
    return [unquote(line) for line in unique_lines]

In [None]:
badQueries = loadFile("badqueries.txt")
goodQueries = loadFile("goodqueries.txt")

In [None]:
bad_df = pd.DataFrame(badQueries)
good_df = pd.DataFrame(goodQueries)

In [None]:
bad_df.columns = ["query"]
good_df.columns = ["query"]

In [None]:
bad_df.head()

In [None]:
good_df.head()

In [None]:
text = good_df.loc[0].str
text = text[:-2]
text

In [None]:
def clean_newline(column):
    """Remove the trailing line terminator from a single query string.

    Only carriage-return / line-feed characters at the end of the string are
    stripped, so a query that ends in a bare "\\n", in "\\r\\n", or that has no
    terminator at all never loses any of its real characters.

    Args:
        column: One query string (a single cell of the "query" column).

    Returns:
        str: The query without its trailing newline characters.
    """
    return column.rstrip("\r\n")

In [None]:
good_df["query"] = good_df["query"].apply(clean_newline)

In [None]:
good_df.head()

In [None]:
bad_df["query"] = bad_df["query"].apply(clean_newline)

In [None]:
bad_df.head()

In [None]:
good_df["label"] = 0
bad_df["label"] = 1

In [None]:
good_df.shape[0] + bad_df.shape[0]

In [None]:
df = pd.concat([good_df, bad_df], axis=0)

In [None]:
df.shape[0]

In [None]:
df.head()

In [None]:
# Shuffle the rows with a fixed seed so a fresh "Restart & Run All" reproduces
# the same row order (same seed value as the train/test split below).
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

In [None]:
df.head()

In [None]:
df["label"].value_counts().plot(kind="pie", autopct="%.2f%%").set_title("MALICIOUS - BENIGN")

In [None]:
def clear_first_char(column):
    """Return ``column`` without its first character when that character is "/".

    Strings that do not start with "/" are returned unchanged; only a single
    leading slash is removed.
    """
    has_leading_slash = column.startswith("/")
    return column[1:] if has_leading_slash else column

In [None]:
df["query"] = df["query"].apply(clear_first_char)

In [None]:
df.head()

# Preprocess

In [None]:
def xss_check(input_string):
    """Flag XSS-looking tokens in the last "/"-separated segment of a query.

    Only the text after the final "/" is inspected; the match is
    case-sensitive.

    Returns:
        int: 1 when the segment contains one of the XSS markers, otherwise 0.
    """
    last_segment = input_string.rsplit("/", 1)[-1]
    hit = re.search(
        r'(<|>|&lt;|&gt;|script|alert|document\.|onload\=|onerror\=|eval\(|expression\(|prompt\(|confirm\()',
        last_segment,
    )
    return int(hit is not None)

In [None]:
df["is_xss"] = df["query"].apply(xss_check)

In [None]:
df[df["is_xss"] == 1].head()

In [None]:
def lfi_check(input_string):
    """Flag local-file-inclusion / path-traversal markers in a query.

    The markers are "file://", "../" and "..\\". When the query contains an
    "=", everything after the first "=" is searched (the parameter value and
    whatever follows it); otherwise the whole query is searched.

    The search deliberately does NOT restrict itself to the last
    "/"-separated segment: that segment can never contain a "/", so the
    "../" and "file://" markers could never match there.

    Args:
        input_string: The query string to inspect.

    Returns:
        int: 1 when a marker is found, otherwise 0.
    """
    lfi_pattern = re.compile(r'(file\:\/\/|(\.\.\/)|(\.\.\\))')
    if "=" in input_string:
        candidate = input_string.split("=", 1)[1]
    else:
        candidate = input_string
    return 1 if lfi_pattern.search(candidate) else 0

In [None]:
"scripts/misc/audio.php?recording=../version.in".split("=", 1)

In [None]:
df["is_lfi"] = df["query"].apply(lfi_check)

In [None]:
df[df["is_lfi"] == 1].sample(5)

In [None]:
def command_injection_check(input_string):
    """Flag shell metacharacters anywhere in the query.

    The markers are ";", "|", a backtick, "$(" and "${".

    Returns:
        int: 1 when at least one marker is present, otherwise 0.
    """
    found = re.search(r'(;|\||`|\$\(|\$\{)', input_string)
    return int(found is not None)

In [None]:
df["is_oci"] = df["query"].apply(command_injection_check)

In [None]:
df[df["is_oci"] == 1].sample(5)

In [None]:
def sql_injection_check(input_string):
    """Flag upper-case SQL keywords that appear as whole words in the query.

    The match is case-sensitive, so lower-case keywords are not detected.

    Returns:
        int: 1 when a keyword is found, otherwise 0.
    """
    keyword_hit = re.search(
        r'(\b(SELECT|INSERT|UPDATE|DELETE|FROM|WHERE|AND|OR|UNION|ALL|EXEC|EXECUTE|DECLARE|CAST)\b)',
        input_string,
    )
    return int(keyword_hit is not None)

In [None]:
df["is_sqli"] = df["query"].apply(sql_injection_check)

In [None]:
df[df["is_sqli"] == 1].sample(5)

In [None]:
def urllen(url):
    """Return the number of characters in ``url``."""
    length = len(url)
    return length

In [None]:
df["url_len"] = df["query"].apply(urllen)

In [None]:
df.head()

In [None]:
def delimitercount(url):
    """Count the delimiter characters ";", "_", "?", "=", "&" and "|" in ``url``."""
    delimiters = ";_?=&|"
    return sum(url.count(symbol) for symbol in delimiters)

In [None]:
df["delim_count"] = df["query"].apply(delimitercount)

In [None]:
df.head()

In [None]:
def dotcount(url):
    """Return how many "." characters ``url`` contains."""
    return sum(1 for char in url if char == ".")

In [None]:
df["dot_count"] = df["query"].apply(dotcount)

In [None]:
df.head()

In [None]:
def atcount(url):
    """Return how many "@" characters ``url`` contains."""
    return sum(1 for char in url if char == "@")

In [None]:
df["at_count"] = df["query"].apply(atcount)

In [None]:
df[df["at_count"] == 1].head()

In [None]:
df.sample(5)

In [None]:
def subdircount(url):
    """Return how many "/" separators ``url`` contains."""
    # Splitting on "/" yields one more piece than there are separators.
    return len(url.split("/")) - 1

In [None]:
df["subdir_count"] = df["query"].apply(subdircount)

In [None]:
df.head()

In [None]:
def query_count(query):
    """Return the length of the text between the first and second "=".

    When ``query`` holds a single "=", that is everything after it; when it
    holds no "=" at all the result is 0.
    """
    pieces = query.split("=")
    if len(pieces) < 2:
        return 0
    return len(pieces[1])

In [None]:
df["query_len"] = df["query"].apply(query_count)

In [None]:
df[df["query_len"] != 0].head()

In [None]:
def total_digits_in_url(url):
    """Return how many of the characters in ``url`` are ASCII digits 0-9."""
    ascii_digits = "0123456789"
    return sum(1 for char in url if char in ascii_digits)

In [None]:
df["total_digits_url"] = df["query"].apply(total_digits_in_url)

In [None]:
df.sample(5)

In [None]:
def total_letter_in_url(url):
    """Count the characters of ``url`` that are neither ASCII digits nor punctuation.

    "Punctuation" means ``string.punctuation``; every other character
    (letters, but also whitespace) is counted.
    """
    excluded = "0123456789" + string.punctuation
    return sum(1 for char in url if char not in excluded)

In [None]:
df["total_letter_url"] = df["query"].apply(total_letter_in_url)

In [None]:
string.punctuation

In [None]:
text = "adas!!1231.@"
total_letter_in_url(text)

In [None]:
url = "/text/shell.php?acas"
url.split("/")[-1]

In [None]:
tokenizer = RegexpTokenizer(r"[A-Za-z]+")

In [None]:
df["url_tokenized"] = df["query"].apply(lambda x: tokenizer.tokenize(x))

In [None]:
df.sample(5)

In [None]:
stemmer = SnowballStemmer("english")

In [None]:
def stem_url(column):
    """Stem every token in ``column`` and join the stems with single spaces.

    Uses the notebook-level ``stemmer`` object.
    """
    return " ".join(map(stemmer.stem, column))

In [None]:
df["url_stemmed"] = df["url_tokenized"].apply(stem_url)

In [None]:
df.sample(5)

In [None]:
# Tally stemmed tokens of at least four characters, separately for
# malicious (label == 1) and benign queries, then keep the 30 most frequent.
cn_phishing = Counter()
cn_safe = Counter()

for text, phishing in zip(df.url_stemmed.values, df.label.values):
    long_words = [word for word in text.split() if len(word) >= 4]
    bucket = cn_phishing if phishing == 1 else cn_safe
    bucket.update(long_words)

phishing_most_common_words = cn_phishing.most_common(30)
safe_most_common_words = cn_safe.most_common(30)

In [None]:
def word_freq(freq_top, LABEL):
    """Draw a bar chart of ``(word, count)`` pairs and hand the pairs back.

    The title reports the number of bars actually drawn instead of a fixed
    "20", so it stays correct whatever ``most_common(n)`` the caller used.

    Args:
        freq_top: Sequence of ``(word, count)`` pairs, e.g. the result of
            ``Counter.most_common``.
        LABEL: 0 titles the chart "BENIGN"; any other value "MALICIOUS".

    Returns:
        The ``freq_top`` argument, unchanged.
    """
    words = [word for word, _ in freq_top]
    counts = [count for _, count in freq_top]

    plt.figure(figsize=(20, 5))
    sns.barplot(x=words, y=counts)
    plt.title(f"TOP {len(words)} WORDS IN - {'BENIGN' if LABEL == 0 else 'MALICIOUS'}")
    plt.ylabel("Frequency")
    plt.xlabel("Words")
    plt.show()

    return freq_top

In [None]:
benign_top = word_freq(safe_most_common_words, 0)

In [None]:
malicious_top = word_freq(phishing_most_common_words, 1)

In [None]:
def print_wordcloud(dict_top, LABEL, mask):
    """Render a word cloud from word frequencies, shaped by a mask image.

    Args:
        dict_top: Mapping, or iterable of ``(word, count)`` pairs, with the
            frequencies to draw.
        LABEL: 0 titles the figure "BENIGN"; any other value "MALICIOUS".
        mask: Path of the image file used as the cloud mask.
    """
    dict_top = dict(dict_top)

    # Read the mask inside a with-block so the image file handle is closed
    # as soon as the pixel data has been copied into the array.
    with Image.open(mask) as mask_image:
        mask_array = np.array(mask_image)

    wordcloud = WordCloud(width=350, height=350, mask=mask_array, background_color="white").generate_from_frequencies(dict_top)

    plt.figure()
    plt.imshow(wordcloud)
    plt.axis("off")
    # The title reports the real number of words instead of a fixed "20".
    plt.title(f"TOP {len(dict_top)} WORDS IN - {'BENIGN' if LABEL == 0 else 'MALICIOUS'}")
    plt.tight_layout(pad=0)
    plt.show()

In [None]:
print_wordcloud(benign_top, 0, mask="../masks-wordclouds/user.png")

In [None]:
print_wordcloud(malicious_top, 1, mask="../masks-wordclouds/star.png")

In [None]:
df.sample(10)

In [None]:
def total_digits_domain(url):
    """Count the digits in everything up to and including the last "/".

    The prefix is taken by position. Removing the last segment with
    ``str.replace`` would also delete every earlier occurrence of the same
    text (e.g. "123/x/123") and under-count.

    Args:
        url: The query string.

    Returns:
        int: Digit count of the prefix, or 0 when ``url`` contains no "/".
    """
    if "/" not in url:
        return 0
    prefix = url[: url.rfind("/") + 1]
    return total_digits_in_url(prefix)

In [None]:
df["total_digits_domain"] = df["query"].apply(total_digits_domain)

In [None]:
def total_letter_domain(url):
    """Apply ``total_letter_in_url`` to everything up to and including the last "/".

    The prefix is taken by position. Removing the last segment with
    ``str.replace`` would also delete every earlier occurrence of the same
    text (e.g. "abc/x/abc") and under-count.

    Args:
        url: The query string.

    Returns:
        int: The helper's count for the prefix, or 0 when ``url`` contains
        no "/".
    """
    if "/" not in url:
        return 0
    prefix = url[: url.rfind("/") + 1]
    return total_letter_in_url(prefix)

In [None]:
df["total_letter_domain"] = df["query"].apply(total_letter_domain)

In [None]:
def total_digits_path(url):
    """Count the digits in the text after the last "/" of ``url``.

    When ``url`` has no "/", the whole string is counted.
    """
    last_segment = url.rsplit("/", 1)[-1]
    return total_digits_in_url(last_segment)

In [None]:
df["total_digits_path"] = df["query"].apply(total_digits_path)

In [None]:
def total_letter_path(url):
    """Apply ``total_letter_in_url`` to the text after the last "/" of ``url``.

    When ``url`` has no "/", the whole string is passed to the helper.
    """
    last_segment = url.rsplit("/", 1)[-1]
    return total_letter_in_url(last_segment)

In [None]:
df["total_letter_path"] = df["query"].apply(total_letter_path)

In [None]:
df.sample(10)

In [None]:
def get_histplot_central_tendency(df, field, target):
    """Plot a histogram (with KDE) of one column and mark mean, median and mode.

    Args:
        df: DataFrame that contains ``field``.
        field: Name of the column to plot.
        target: Text appended to the figure title (e.g. "Good URLs").
    """
    values = df[field]
    mean = values.mean()
    median = values.median()
    modes = values.mode()

    plt.figure()
    sns.histplot(values.values, kde=True)

    plt.axvline(mean, color="r", linestyle="--", label="Mean")
    plt.axvline(median, color="g", linestyle="-", label="Median")
    # Series.mode() returns every mode; draw the first one. It is empty
    # when the column has no values, in which case no line is drawn.
    if not modes.empty:
        plt.axvline(modes.iloc[0], color="b", linestyle=":", label="Mode")

    # The feature values run along x; the bar heights are counts.
    plt.xlabel(f"{field}", fontsize=13, color="#333F4B")
    plt.ylabel("count", fontsize=13, color="#333F4B")
    plt.legend()
    plt.grid(False)
    plt.title(f"Representation Histogram for {field} - {target}", fontsize=18)
    plt.show()

In [None]:
def has_extension(column):
    """Return 1 when the text after the last "/" contains a ".", else 0.

    The whole last segment is checked, including any query string in it.
    """
    last_segment = column.rsplit("/", 1)[-1]
    return int("." in last_segment)

In [None]:
df["has_extension"] = df["query"].apply(has_extension)

In [None]:
df[df["has_extension"] == 1].sample(5)

In [None]:
def find_extension(column):
    """Return the file extension of the last "/"-separated segment.

    Any query string (everything from the first "?") is discarded first, so
    a dot inside a parameter value is not mistaken for an extension. The
    extension is everything after the first "." of the remaining file name
    ("archive.tar.gz" -> "tar.gz").

    Args:
        column: The query string.

    Returns:
        str: The extension without its leading dot, or "" when the file
        name contains no ".".
    """
    last_segment = column.split("/")[-1]
    filename = last_segment.split("?", 1)[0]
    if "." not in filename:
        return ""
    return filename.split(".", 1)[1]

In [None]:
df["extension"] = df["query"].apply(find_extension)

In [None]:
df[df["has_extension"] == 1].sample(5)

In [None]:
# Tally extensions separately for malicious (label == 1) and benign queries,
# then keep the 30 most frequent of each.
cn_phishing_extension = Counter()
cn_safe_extension = Counter()

for text, phishing in zip(df.extension.values, df.label.values):
    # str.split() never yields empty strings, so rows without an extension
    # contribute nothing and no explicit emptiness check is needed.
    bucket = cn_phishing_extension if phishing == 1 else cn_safe_extension
    bucket.update(text.split())

phishing_most_common_extensions = cn_phishing_extension.most_common(30)
safe_most_common_extensions = cn_safe_extension.most_common(30)

In [None]:
phishing_extension_top = word_freq(phishing_most_common_extensions, 1)

In [None]:
safe_extension_top = word_freq(safe_most_common_extensions, 0)

In [None]:
def has_parameter(column):
    """Return 1 when the text after the last "/" contains a "?", else 0."""
    last_segment = column.rsplit("/", 1)[-1]
    return int("?" in last_segment)

In [None]:
df["has_parameter"] = df["query"].apply(has_parameter)

In [None]:
df[df["has_parameter"] == 1].sample(5)

In [None]:
df[df["has_parameter"] == 1].sample(1).T

In [None]:
df.columns

In [None]:
features = ['is_xss', 'is_lfi', 'is_oci', 'is_sqli', 'url_len',
       'delim_count', 'dot_count', 'at_count', 'subdir_count', 'query_len',
       'total_digits_url', 'total_letter_url', 'total_digits_domain', 
       'total_letter_domain', 'total_digits_path',
       'total_letter_path', 'has_extension', 'has_parameter']

In [None]:
for feature in features:
    get_histplot_central_tendency(df[df["label"] == 0], feature, "Good URLs")

In [None]:
for feature in features:
    get_histplot_central_tendency(df[df["label"] == 1], feature, "Bad URLs")

In [None]:
df.head()

# Model

In [None]:
X = df.drop(["query", "label", "url_tokenized", "url_stemmed", "extension"], axis=1)
y = df["label"]

In [None]:
ss = StandardScaler()
X_scaled = ss.fit_transform(X)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

In [None]:
logreg = LogisticRegression()
logreg.fit(X_train, y_train)

In [None]:
pred_test = logreg.predict(X_test)

In [None]:
pred_train = logreg.predict(X_train)

In [None]:
train_score = accuracy_score(y_train, pred_train)
print("Train Score: ", train_score)

In [None]:
test_score = accuracy_score(y_test, pred_test)
print("Test Score:", test_score)

In [None]:
# Persist the fitted model and its scaler. The with-blocks make sure each
# file is flushed and closed even if pickling raises.
with open("waf/website/models/logreg.pkl", "wb") as model_file:
    pickle.dump(logreg, model_file)
with open("waf/website/models/logreg_ss.pkl", "wb") as scaler_file:
    pickle.dump(ss, scaler_file)

In [None]:
# confusion_matrix takes (y_true, y_pred): rows are the actual labels and
# columns the predicted ones, which is what the axis labels below state.
cm = confusion_matrix(y_test, pred_test)
plt.figure()
sns.heatmap(cm, annot=True, cmap="Pastel1", fmt=".0f")
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.show()

In [None]:
df.columns

# Test

In [None]:
columns = ['query', 'is_xss', 'is_lfi', 'is_oci', 'is_sqli', 'url_len',
       'delim_count', 'dot_count', 'at_count', 'subdir_count', 'query_len',
       'total_digits_url', 'total_letter_url', 'url_tokenized', 'url_stemmed',
       'total_digits_domain', 'total_letter_domain', 'total_digits_path',
       'total_letter_path', 'has_extension', 'extension', 'has_parameter']

In [None]:
test1 = 'hrttz9fj.dll?<script>document.cookie="testtbjy=7334;"</script>' # 1
test2 = 'index.php?option=com_mailto&tmpl=component&link=aHR0cDovL2FkdmVudHVyZ' # 1
test3 = 'nba/player/_/id/3457/brandon-rush' # 0
test4 = '?q=anthony-hamilton-soulife' # 0
test5 = 'site/relationship_detail.php?name=Martin-Brodeur&celebid=12150&relid=11371' # 0

In [None]:
test_df = pd.DataFrame(columns=columns)

In [None]:
test_df.loc[0, "query"] = test1
test_df.loc[1, "query"] = test2
test_df.loc[2, "query"] = test3
test_df.loc[3, "query"] = test4
test_df.loc[4, "query"] = test5

In [None]:
test_df.head()

In [None]:
stemmer = SnowballStemmer("english")
tokenizer = RegexpTokenizer(r"[A-Za-z]+")

In [None]:
def preprocess(test_df):
    """Add every engineered feature column to a frame that has a "query" column.

    The frame is modified in place and also returned. Columns are written in
    the same order as in the training cells; "url_stemmed" is derived from
    "url_tokenized", every other column from "query".

    NOTE(review): the training queries had one leading "/" removed with
    ``clear_first_char`` before their features were computed; this function
    does not do that, so callers presumably need to normalise "query" the
    same way first - confirm.

    Args:
        test_df: DataFrame with a "query" column of strings.

    Returns:
        The same DataFrame object, with the feature columns filled in.
    """
    tokenizer = RegexpTokenizer(r"[A-Za-z]+")

    # (target column, source column, extractor), applied top to bottom.
    steps = [
        ("is_xss", "query", xss_check),
        ("is_lfi", "query", lfi_check),
        ("is_oci", "query", command_injection_check),
        ("is_sqli", "query", sql_injection_check),
        ("url_len", "query", urllen),
        ("delim_count", "query", delimitercount),
        ("dot_count", "query", dotcount),
        ("at_count", "query", atcount),
        ("subdir_count", "query", subdircount),
        ("query_len", "query", query_count),
        ("total_digits_url", "query", total_digits_in_url),
        ("total_letter_url", "query", total_letter_in_url),
        ("url_tokenized", "query", tokenizer.tokenize),
        # stem_url relies on the notebook-level ``stemmer`` object.
        ("url_stemmed", "url_tokenized", stem_url),
        ("total_digits_domain", "query", total_digits_domain),
        ("total_letter_domain", "query", total_letter_domain),
        ("total_digits_path", "query", total_digits_path),
        ("total_letter_path", "query", total_letter_path),
        ("has_extension", "query", has_extension),
        ("extension", "query", find_extension),
        ("has_parameter", "query", has_parameter),
    ]
    for target_column, source_column, extractor in steps:
        test_df[target_column] = test_df[source_column].apply(extractor)
    return test_df

In [None]:
test_df = preprocess(test_df)

In [None]:
test_df.head()

In [None]:
dropped = test_df.drop(["query", "url_tokenized", "url_stemmed", "extension"], axis=1)

In [None]:
dropped_scaled = ss.transform(dropped)

In [None]:
logreg.predict(dropped_scaled)

In [None]:
log = pd.