Importujemy biblioteki i wczytujemy pliki CSV

In [1]:
from datetime import datetime

from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import re
import pickle

DIRECTORY = 'data'
# DIRECTORY = 'generated_data'

spark = SparkSession.builder.appName("Preprocess data").getOrCreate()
ddos_tf_df = spark.read.format("csv").option("header", "true").load(DIRECTORY + "/ddos-tcp-syn-flood.csv")
normal_tf_df = spark.read.format("csv").option("header", "true").load(DIRECTORY + "/normal-traffic.csv")
port_scan_tf_df = spark.read.format("csv").option("header", "true").load(DIRECTORY+ "/port-scanning.csv")

data_frames = {
    "normal-traffic": normal_tf_df,
    "port-scanning": port_scan_tf_df,
    "ddos-tcp-syn-flood": ddos_tf_df,
}

24/06/04 00:03:28 WARN Utils: Your hostname, HP-Envy-Debian resolves to a loopback address: 127.0.1.1; using 172.17.0.1 instead (on interface docker0)
24/06/04 00:03:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/04 00:03:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Wybieramy sobie kolumny zawierające istotne iformacje. Można dodać więcej ale wtedy trzeba pamiętać o noramlizacji w kolejnej komórce.

In [2]:
selected_columns = [
    "frame-time",
    "arp-opcode",
    "arp-hw-size",
    "ip-src_host",
    "ip-dst_host",
    "tcp-ack",
    "tcp-ack_raw",
    "tcp-connection-fin",
    "tcp-connection-rst",
    "tcp-connection-syn",
    "tcp-connection-synack",
    "tcp-dstport",
    "tcp-flags_index",
    "tcp-flags-ack",
    "tcp-len",
    "tcp-seq",
    "tcp-srcport",
    "udp-port",
    "udp-stream",
    "udp-time_delta",
    "dns-qry-name",
    "dns-qry-name-len_index",
    "dns-qry-qu_index",
    "dns-qry-type",
    "dns-retransmission",
    "dns-retransmit_request",
    "dns-retransmit_request_in",
    "mqtt-conack-flags_index",
    "mqtt-conflag-cleansess",
    "mqtt-conflags_index",
    "mqtt-hdrflags_index",
    "mqtt-len",
    "mqtt-msg_index",
    "mqtt-msgtype",
    "mqtt-proto_len",
    "mqtt-protoname_index",
    "mqtt-topic_index",
    "mqtt-topic_len",
    "mqtt-ver",
    "Attack_type"
]

Iterujemy po wczytanych ramkach, zamieniamy nazwy kolumn na takie bez kropek i normalizujemy/kodujemy nieliczbowe kolumny (oprócz timestampów, ta kolumna jest modyfikowana później). Odchudzone dane zapisujemy do katalogu `preprocessed_data`

In [3]:
def timestamp_to_epoch(timestamp):
    dt = datetime.fromisoformat(timestamp)
    return dt.timestamp()

In [4]:
for df_name, df in data_frames.items():

    for col_name in df.columns:
        new_col_name = re.sub(r'\.', '-', col_name)
        df = df.withColumnRenamed(col_name, new_col_name)
    
    tcp_flags_indexer = StringIndexer(inputCol="tcp-flags", outputCol="tcp-flags_index")
    indexed_df = tcp_flags_indexer.fit(df).transform(df)

    dns_qry_name_len_indexer = StringIndexer(inputCol="dns-qry-name-len", outputCol="dns-qry-name-len_index")
    indexed_df = dns_qry_name_len_indexer.fit(indexed_df).transform(indexed_df)
    dns_qry_qu_indexer = StringIndexer(inputCol="dns-qry-qu", outputCol="dns-qry-qu_index")
    indexed_df = dns_qry_qu_indexer.fit(indexed_df).transform(indexed_df)

    mqtt_conack_flags_indexer = StringIndexer(inputCol="mqtt-conack-flags", outputCol="mqtt-conack-flags_index")
    indexed_df = mqtt_conack_flags_indexer.fit(indexed_df).transform(indexed_df)
    mqtt_conflags = StringIndexer(inputCol="mqtt-conflags", outputCol="mqtt-conflags_index")
    indexed_df = mqtt_conflags.fit(indexed_df).transform(indexed_df)

    mqtt_hdrflags = StringIndexer(inputCol="mqtt-hdrflags", outputCol="mqtt-hdrflags_index")
    indexed_df = mqtt_hdrflags.fit(indexed_df).transform(indexed_df)
    mqtt_msg = StringIndexer(inputCol="mqtt-msg", outputCol="mqtt-msg_index")
    indexed_df = mqtt_msg.fit(indexed_df).transform(indexed_df)

    mqtt_protoname = StringIndexer(inputCol="mqtt-protoname", outputCol="mqtt-protoname_index")
    indexed_df = mqtt_protoname.fit(indexed_df).transform(indexed_df)

    mqtt_topic = StringIndexer(inputCol="mqtt-topic", outputCol="mqtt-topic_index")
    indexed_df = mqtt_topic.fit(indexed_df).transform(indexed_df)

    pandas_df = indexed_df.select(selected_columns).toPandas()

    all_ips = pd.concat([pandas_df["ip-src_host"], pandas_df["ip-dst_host"]]).unique()
    label_encoder = LabelEncoder()
    label_encoder.fit(all_ips)
    pandas_df["ip-src_host"] = label_encoder.transform(pandas_df["ip-src_host"])
    pandas_df["ip-dst_host"] = label_encoder.transform(pandas_df["ip-dst_host"])
    pandas_df['frame-time'] = pandas_df['frame-time'].apply(timestamp_to_epoch)

    pandas_df.to_csv(f'{DIRECTORY}/{df_name}-preprocessed.csv', index=False)

24/06/04 00:03:45 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/06/04 00:03:59 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Wczytujemy zapisane pliki csv i tworzymy próbki z danymi, gdzie jedna próbka X to lista zawierająca kolejne 32 logi gdzie od każdego timestampa został odjęty timestamp pierwszego loga z listy (w ten sposób timestampy są niewielkimi wartościami liczbowymi a jednocześnie przechowują informację o odległości pomiędzy kolejnymi logami), a próbka Y to pojedynczy numer określający typ ataku/ruchu normalnego dla zagregowanych logów.

In [5]:
def logs_to_series(df, logs_per_bucket):
    del df['Attack_type']
    buckets = []

    for i in range(0, df.shape[0], logs_per_bucket):
        if df.shape[0] >= i + logs_per_bucket:
            bucket = df.iloc[i:i + logs_per_bucket]
            bucket['frame-time'] = bucket['frame-time'] - bucket['frame-time'].iloc[0]
            buckets.append(bucket)

    return buckets

In [6]:
encoded_attacks = {
    "normal-traffic": 0,
    "port-scanning": 1,
    "ddos-tcp-syn-flood": 2
}
x_data = []
y_data = []
DIRECTORY = 'generated_data'
for df_name in data_frames.keys():
    df = pd.read_csv(f'{DIRECTORY}/{df_name}-preprocessed.csv', parse_dates=['frame-time'])
    log_series = logs_to_series(df, 32)
    x_data.extend(log_series)
    y_data.extend([encoded_attacks.get(df_name)] * len(log_series))

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  bucket['frame-time'] = bucket['frame-time'] - bucket['frame-time'].iloc[0]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  bucket['frame-time'] = bucket['frame-time'] - bucket['frame-time'].iloc[0]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  bucket['frame-time'] = bucket['frame-time'] - bucket['f

Przed treningem modeli należy jeszcze pomieszać próbki z danymi oraz podzielić na zbiory treningowe i testowe. W sumie dobrze by też było dodać jakiś padding dla przypadków gdzie jednak próbka nie ma 32 logów.

In [7]:
output_path = f'{DIRECTORY}/processed_data.pkl'
print("Writing log series into ", output_path)
with open(output_path, 'wb') as f:
    pickle.dump((x_data, y_data), f)

Writing log series into  generated_data/processed_data.pkl
