In [1]:
#加载辅助函数、表
sql_dbname_words = [
    'schema', 'user', 'usr', 'pass', 'pwd', 
    'pswd', 'table', 'column', 'chr', 'char', 
    'dbms', 'type'
]

sql_query_words = [
    'select', 'from', 'where', 'all', 'exists',
    'and', 'or', 'not', 'join', 'using', 'asc', 'order',
    'null', 'between',  'except', 'delay', 'all'
    'union', 'create', 'insert', 'delete', 'update', 'alter', 
    'truncate', 'set', 'add', 'waitfor', 'then', 'else',
    'derp', 'herp'
]

sql_function_words = [
    'avg', 'min', 'max', 'sum', 'count', 'distinct', 'version',
    'group', 'having', 'sleep'
    'binary', 'raw', 'hex', 'quote'
]


# 特征提取函数
import numpy as np

def length_of(payload):
    return len(payload)


def sql_dbname_score(payload):
    payload = payload.lower()
    score = 0
    step = 0
    
    for word in sql_dbname_words:
        if word in payload:
            score += np.exp2(step)
            step += 1
    return score


def sql_query_score(payload):
    payload = payload.lower()
    score = 0
    step = 0
    
    for word in sql_query_words:
        if word in payload:
            score += np.exp2(step)
#             score += 1
            step += 1
    return score
    

def sql_function_score(payload):
    payload = payload.lower()
    score = 0
    step = 0
    
    for word in sql_function_words:
        if word in payload:
            score += np.exp2(step)
            step += 1
    return score


# 是否被注释符截断
def is_truncated(payload):
    from urllib import parse
    end = payload.lower()[-10:] # 切末尾十个字符检查切片
    for char in end:
        if parse.quote(char) in ('%00', '%23', parse.quote('--+'), parse.quote('--')):
            return 1
    return 0

def count_capital_case(payload):
    count = 0
    for char in payload:
        if char >= 'A' and char <= 'Z':
            count += 1
    return count
    
    
    
def get_feature_vec(payload):
    feature_vec = []
    feature_vec.append(length_of(payload))
    feature_vec.append(sql_dbname_score(payload))
    feature_vec.append(sql_query_score(payload))
    feature_vec.append(sql_function_score(payload))
    feature_vec.append(is_truncated(payload))
    feature_vec.append(count_capital_case(payload))
    return feature_vec

In [2]:
# 数据载入及清洗
import pandas as pd
from sklearn.utils import shuffle
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from urllib import parse

normal_df = pd.read_csv('normal.csv', header=None, names=['payload'], dtype=str)
normal_df['label'] = np.zeros(len(normal_df['payload']), dtype=int)

sqli_df = pd.read_csv('sqli.csv', header=None, names=['payload'], dtype=str)
sqli_df['label'] = np.ones(len(sqli_df['payload']), dtype=int)

payloads_df = shuffle(pd.concat([normal_df, sqli_df]))
payloads_df = payloads_df.dropna(axis=0)
# payloads_df.drop(payloads_df[np.isnan(payloads_df['payload'])].index, inplace=True)
# print(payloads_df)

payloads = np.array(payloads_df['payload'].values)
labels = np.array(payloads_df['label'].values)
# print(payloads)
# print(labels)

features = []
for payload in payloads:
    if type(payload) is not str:
        continue
#     print(parse.unquote(payload))
    features.append(get_feature_vec(parse.unquote(payload)))

features = np.array(features)
# print(features[:10])
# print(labels[:10])


In [3]:
# 训测数据集划分及数据预处理
X_train, X_test, y_train, y_test = train_test_split(
    features, labels, random_state=None, test_size = 0.3, stratify=labels)
scaler = StandardScaler()
X_train_std = scaler.fit_transform(X_train)
X_test_std = scaler.transform(X_test)
print(X_test_std[:5])

[[-0.40922527 -0.47184717 -0.24133629 -0.28233137 -0.09552562 -0.43146416]
 [ 0.30832515 -0.47184717 -0.24133629 -0.28233137 -0.09552562 -0.65270789]
 [-0.96369604 -0.47184717 -0.49476121 -0.28233137 -0.09552562 -0.70801882]
 [-0.83323233 -0.47184717 -0.41028624 -0.28233137 -0.09552562 -0.45911963]
 [-0.72723056 -0.47184717 -0.49476121 -0.28233137 -0.09552562 -0.68036335]]


In [18]:
# 模型训练
from sklearn.tree import DecisionTreeClassifier
from sklearn import svm

dtree = DecisionTreeClassifier(random_state=None).fit(X_train_std, y_train)
print('训练完成')

训练完成


In [19]:
# 模型评估报告
from sklearn import metrics
predict_target = dtree.predict(X_test_std)
print(metrics.classification_report(y_test, predict_target, 
                                    target_names=['Normal payload', 'SQLi payload']))


                precision    recall  f1-score   support

Normal payload       0.96      0.98      0.97      1500
  SQLi payload       0.98      0.96      0.97      1487

      accuracy                           0.97      2987
     macro avg       0.97      0.97      0.97      2987
  weighted avg       0.97      0.97      0.97      2987



## 下面采用管道的形式对模型进行训练以及评估，使用了K折交叉验证

In [20]:
from sklearn import preprocessing
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import make_pipeline
from sklearn.tree import DecisionTreeClassifier
from sklearn import metrics

clf_pipline = make_pipeline(preprocessing.StandardScaler(),
                           DecisionTreeClassifier(random_state=None))
scores_pipeline_cv = cross_val_score(clf_pipline,X_train_std, y_train, 
                                     cv=10)
for ACC in scores_pipeline_cv:
    print('Accuracy: %.2f' % ACC)
print("Mean Accuracy: %0.2f (+/- %0.2f)" % (scores_pipeline_cv.mean(), 
                                       scores_pipeline_cv.std() * 2))


Accuracy: 0.98
Accuracy: 0.97
Accuracy: 0.97
Accuracy: 0.96
Accuracy: 0.97
Accuracy: 0.97
Accuracy: 0.97
Accuracy: 0.97
Accuracy: 0.96
Accuracy: 0.98
Mean Accuracy: 0.97 (+/- 0.01)


In [27]:
# 测试用例
payload1 = 'id=196112 union select TABLE_NAME from INFORMATION_SCHEMA.TABLES where TABLE_SCHEMA like "security" '
payload2 = 'name=admin&&pass=p@sSw0RD'
feature_vec = [get_feature_vec(payload2)]
conclusion = dtree.predict(feature_vec)[0]
print('SQLi payload') if conclusion == 1 else print('Normal payload')

Normal payload
