# Untersuchung der Datensätze

In [2]:
import os
import pandas as pd
import numpy as np
import logging
import multiprocessing as mp
import dask.dataframe as dd
from dask.distributed import Client

In [None]:
# Directories containing the CSV files
verzeichnis_ids17 = '../01_Datensaetze/improved_cic-ids-2017'
verzeichnis_ids18 = '../01_Datensaetze/improved_cse-cic-ids-2018'

# Display every row and every column of a DataFrame (None removes the limit)
pd.options.display.max_rows = None  # rows
pd.options.display.max_columns = None  # columns

# Logging parameters
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
    #filename='20_investigation.log',
)

# Start the Dask client; the bare name on the last line renders its summary
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 7
Total threads: 28,Total memory: 39.17 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:36115,Workers: 7
Dashboard: http://127.0.0.1:8787/status,Total threads: 28
Started: Just now,Total memory: 39.17 GiB

0,1
Comm: tcp://127.0.0.1:43335,Total threads: 4
Dashboard: http://127.0.0.1:35113/status,Memory: 5.60 GiB
Nanny: tcp://127.0.0.1:41551,
Local directory: /tmp/dask-scratch-space/worker-ct2tu4y1,Local directory: /tmp/dask-scratch-space/worker-ct2tu4y1

0,1
Comm: tcp://127.0.0.1:43039,Total threads: 4
Dashboard: http://127.0.0.1:40085/status,Memory: 5.60 GiB
Nanny: tcp://127.0.0.1:34845,
Local directory: /tmp/dask-scratch-space/worker-5qawb05p,Local directory: /tmp/dask-scratch-space/worker-5qawb05p

0,1
Comm: tcp://127.0.0.1:40561,Total threads: 4
Dashboard: http://127.0.0.1:45491/status,Memory: 5.60 GiB
Nanny: tcp://127.0.0.1:43075,
Local directory: /tmp/dask-scratch-space/worker-wp_do3z3,Local directory: /tmp/dask-scratch-space/worker-wp_do3z3

0,1
Comm: tcp://127.0.0.1:41335,Total threads: 4
Dashboard: http://127.0.0.1:45545/status,Memory: 5.60 GiB
Nanny: tcp://127.0.0.1:36657,
Local directory: /tmp/dask-scratch-space/worker-s72b5aqp,Local directory: /tmp/dask-scratch-space/worker-s72b5aqp

0,1
Comm: tcp://127.0.0.1:42955,Total threads: 4
Dashboard: http://127.0.0.1:45595/status,Memory: 5.60 GiB
Nanny: tcp://127.0.0.1:43037,
Local directory: /tmp/dask-scratch-space/worker-wt199cnf,Local directory: /tmp/dask-scratch-space/worker-wt199cnf

0,1
Comm: tcp://127.0.0.1:43245,Total threads: 4
Dashboard: http://127.0.0.1:44505/status,Memory: 5.60 GiB
Nanny: tcp://127.0.0.1:40207,
Local directory: /tmp/dask-scratch-space/worker-90php22c,Local directory: /tmp/dask-scratch-space/worker-90php22c

0,1
Comm: tcp://127.0.0.1:39479,Total threads: 4
Dashboard: http://127.0.0.1:33379/status,Memory: 5.60 GiB
Nanny: tcp://127.0.0.1:36481,
Local directory: /tmp/dask-scratch-space/worker-25f0kgow,Local directory: /tmp/dask-scratch-space/worker-25f0kgow


## Entpacken der Datensätze

In [None]:
# ids17
!rm -r ../01_Datensaetze/improved_cic-ids-2017/*.csv
!unzip -o ../01_Datensaetze/improved_cic-ids-2017/CICIDS2017_improved.zip -d ../01_Datensaetze/improved_cic-ids-2017/

In [None]:
# ids18
!rm -r ../01_Datensaetze/improved_cse-cic-ids-2018/*.csv
!unzip -o ../01_Datensaetze/improved_cse-cic-ids-2018/CSECICIDS2018_improved.zip -d ../01_Datensaetze/improved_cse-cic-ids-2018/

In [1]:
# Parquet Verzeichnisse leeren
!rm -r ../01_Datensaetze/improved_cic-ids-2017/ids17_parquet
!rm -r ../01_Datensaetze/improved_cse-cic-ids-2018/ids18_parquet

rm: cannot remove '../01_Datensaetze/improved_cic-ids-2017/ids17_parquet_prep_1': No such file or directory
rm: cannot remove '../01_Datensaetze/improved_cse-cic-ids-2018/ids18_parquet_prep_1': No such file or directory


In [2]:
!rm -r ../01_Datensaetze/improved_cic-ids-2017/ids17_parquet_prep_0
!rm -r ../01_Datensaetze/improved_cse-cic-ids-2018/ids18_parquet_prep_0

## Prüfen der Spaltenanzahl beider Datensätze

In [6]:
def count_columns(file):
    """Return ``{file: number_of_columns}`` for a single CSV file.

    Only the header and at most one data row are parsed, so the call stays
    cheap even for very large files.
    """
    preview = pd.read_csv(file, nrows=1)
    _, n_columns = preview.shape
    return {file: n_columns}

def count_columns_in_directory(directory, processes=5):
    """Count the columns of every CSV file located directly in ``directory``.

    Args:
        directory: Directory to scan; sub-directories are not visited.
        processes: Upper bound for the number of worker processes
            (defaults to 5, the previously hard-coded value).

    Returns:
        A list with one ``{path: column_count}`` dict per CSV file, ordered by
        path. An empty list is returned when the directory holds no CSV files
        or when any error occurs (the error is logged with its traceback).
    """
    try:
        logging.info(f"Start counting columns in directory {directory}")
        # os.listdir() returns entries in arbitrary order; sorting makes the
        # result reproducible between runs.
        csv_files = sorted(
            os.path.join(directory, datei)
            for datei in os.listdir(directory)
            if datei.endswith('.csv')
        )
        if not csv_files:
            # Nothing to do, so do not pay for starting a process pool.
            logging.warning(f"No CSV files found in directory {directory}")
            return []

        # Never start more workers than there are files to read.
        with mp.Pool(processes=min(processes, len(csv_files))) as pool:
            result = pool.map(count_columns, csv_files)

        logging.info(f"Successfully counted columns in directory {directory}")
        return result
    except Exception as e:
        # Best-effort helper: log (including the traceback) and return an empty result.
        logging.exception(f"Error counting columns in directory {directory}: {e}")
        return []

In [None]:
# Column count per CSV file in the ids17 directory; the returned list is the cell output
count_columns_in_directory(verzeichnis_ids17)

In [None]:
# Column count per CSV file in the ids18 directory; the returned list is the cell output
count_columns_in_directory(verzeichnis_ids18)

Im Gegensatz zu den originalen Datensätzen ist hier die Spaltenanzahl in beiden Datensätzen gleich.

## Prüfen der Spaltenbezeichnungen beider Datensätze

In [None]:
# ids17
# Fingerprint the header of every CSV file so that the headers can be compared.
ids17_col_hash = []
for datei in os.listdir(verzeichnis_ids17):
    if datei.endswith('.csv'):
        pfad_zur_datei = os.path.join(verzeichnis_ids17, datei)
        # nrows=2 keeps this cheap: only the header and two data rows are parsed
        df = pd.read_csv(pfad_zur_datei, nrows=2, skipinitialspace=True)
        ids17_col_hash.append(hash(tuple(df.columns)))

# An empty directory would otherwise "pass" (all() of nothing is True) and the
# cell would then fail with a NameError on the undefined df.
if not ids17_col_hash:
    raise FileNotFoundError(f'No CSV files found in {verzeichnis_ids17}')

# Check whether every file produced the same header fingerprint
all_equal = len(set(ids17_col_hash)) == 1
print('Alle Elemente und somit alle Header in ids17 sind gleich:', all_equal)

if all_equal:
    ids17_columns = df.columns
else:
    # ids17_columns is deliberately not (re)assigned; make that visible instead
    # of letting a later cell fail or silently use a stale value.
    logging.warning('Headers of the ids17 CSV files differ; ids17_columns was not set')

del df

In [None]:
# ids18
# Fingerprint the header of every CSV file so that the headers can be compared.
ids18_col_hash = []
for datei in os.listdir(verzeichnis_ids18):
    if datei.endswith('.csv'):
        pfad_zur_datei = os.path.join(verzeichnis_ids18, datei)
        # Same parser settings as for ids17 (skipinitialspace) so that both
        # headers are read consistently before they are compared.
        df = pd.read_csv(pfad_zur_datei, nrows=2, skipinitialspace=True)
        ids18_col_hash.append(hash(tuple(df.columns)))

# An empty directory would otherwise "pass" (all() of nothing is True) and the
# cell would then fail with a NameError on the undefined df.
if not ids18_col_hash:
    raise FileNotFoundError(f'No CSV files found in {verzeichnis_ids18}')

# Check whether every file produced the same header fingerprint.
# This must inspect the ids18 fingerprints; the previous version re-checked
# the ids17 list and therefore said nothing about ids18.
all_equal = len(set(ids18_col_hash)) == 1
print('Alle Elemente und somit alle Header in ids18 sind gleich:', all_equal)

if all_equal:
    ids18_columns = df.columns
else:
    # ids18_columns is deliberately not (re)assigned; make that visible instead
    # of letting a later cell fail or silently use a stale value.
    logging.warning('Headers of the ids18 CSV files differ; ids18_columns was not set')

del df

Prüfen, welche Spaltennamen in beiden Datensätzen (un)gleich sind

In [None]:
# Element für Element Vergleich
for col17, col18 in zip(ids17_columns, ids18_columns):
    if col17 == col18:
        print(f"Die Spalten sind gleich:".ljust(40), f'{col17}')
    else:
        print(f"Die Spalten sind unterschiedlich:".ljust(40), f"{col17} != {col18}")

Da sich die Bezeichnungen der Spalten zwischen ids17 und ids18 – anders als in den originalen Datensätzen – nicht unterscheiden, muss für die verbesserten Datensätze hier keine Anpassung erfolgen.

## CSV Dateien in dask.dataframe laden

In [4]:
def load_csvs_to_dask_dataframe(directory):
    """Build one Dask DataFrame from all ``*.csv`` files in ``directory``.

    The files are read with ``assume_missing=True`` and a block size of 64MB.
    Any exception is logged and turned into a ``None`` return value, so
    callers have to check the result before using it.
    """
    try:
        logging.info(f"Lade alle CSV-Dateien aus dem Verzeichnis {directory} in ein Dask DataFrame")
        csv_pattern = os.path.join(directory, '*.csv')
        ddf = dd.read_csv(csv_pattern, assume_missing=True, blocksize='64MB')
    except Exception as e:
        logging.error(f"Fehler beim Laden der CSV-Dateien aus dem Verzeichnis {directory}: {e}")
        return None
    else:
        logging.info("Dask DataFrame erfolgreich erstellt.")
        return ddf
    
def get_shape(df):
    """Return ``(rows, columns)`` of a lazy DataFrame.

    The row count is obtained by calling ``.compute()`` on ``df.shape[0]``;
    the column count is read from ``df.columns``.
    """
    row_count = df.shape[0].compute()
    return row_count, len(df.columns)

In [5]:
# Dask DataFrame over all ids17 CSV files (None if loading failed)
ddf_ids17 = load_csvs_to_dask_dataframe(verzeichnis_ids17)

In [6]:
# Dask DataFrame over all ids18 CSV files (None if loading failed)
ddf_ids18 = load_csvs_to_dask_dataframe(verzeichnis_ids18)

In [7]:
# Print (rows, columns) of both datasets; each call computes the row count
print(get_shape(ddf_ids17))
print(get_shape(ddf_ids18))

(2099976, 91)
(63195145, 91)


## Auf NaN und Inf Werte prüfen

In [31]:
def check_nan_and_inf_in_dask_dataframe(df):
    """Log, per column, how many NaN and how many +/-Inf values ``df`` holds.

    Two separate ``.compute()`` calls are issued (one for each count) and the
    results are written to the root logger at INFO level. Nothing is returned.
    """
    # Per-column number of NaN values
    missing_mask = df.isna()
    nan_count = missing_mask.sum().compute()
    logging.info("NaN values in each column:")
    logging.info(nan_count)

    # Per-column number of infinite values, positive as well as negative
    is_pos_inf = df == np.inf
    is_neg_inf = df == -np.inf
    inf_count = (is_pos_inf | is_neg_inf).sum().compute()
    logging.info("Inf values in each column:")
    logging.info(inf_count)

In [32]:
# Log a heading, then the per-column NaN and Inf counts of ids17
logging.info("Prüfe NaN und Inf Werte in ids17")
check_nan_and_inf_in_dask_dataframe(ddf_ids17)

In [14]:
# Log a heading, then the per-column NaN and Inf counts of ids18
logging.info("Prüfe NaN und Inf Werte in ids18")
check_nan_and_inf_in_dask_dataframe(ddf_ids18)

## Werte der Spalten untersuchen

In [15]:
def inspect_column_values(df):
    """Print the first unique values, the minimum and the maximum of every column.

    Each statistic is obtained with its own ``.compute()`` call. A column that
    raises is reported (logged and printed) and the loop moves on to the next one.
    """
    logging.info("Inspecting column values...")
    for column in df.columns:
        try:
            unique_values = df[column].unique().compute()
            lowest = df[column].min().compute()
            highest = df[column].max().compute()

            # Show at most five unique values and mark the cut with an ellipsis
            preview = unique_values[:5]
            suffix = '...' if len(unique_values) > 5 else ''
            report_lines = [
                f"Spalte: {column}",
                f"  Einzigartige Werte: {preview}{suffix}",
                f"  Min-Wert: {lowest}",
                f"  Max-Wert: {highest}",
            ]
            print("\n".join(report_lines))
        except Exception as e:
            logging.error(f"Error inspecting column '{column}': {e}")
            print(f"  Fehler bei der Inspektion von Spalte '{column}': {e}")


In [None]:
# Print unique values, minimum and maximum for every ids17 column
inspect_column_values(ddf_ids17)

## Korrelation berechnen

In [None]:
# Import the plotting packages before the expensive computation, so that a
# missing package is noticed immediately rather than after the compute.
import matplotlib.pyplot as plt
import seaborn as sns

# Computation: restrict to numeric columns, because a correlation is undefined
# for text columns. NOTE(review): assumes the installed Dask version supports
# the numeric_only argument of corr() - confirm.
correlation_matrix = ddf_ids17.corr(numeric_only=True).compute()
print(correlation_matrix)

# Visualisation on an explicit figure/axes pair
fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(correlation_matrix, annot=False, cmap='coolwarm', ax=ax)
ax.set_title('Correlation Heatmap')
plt.show()

In [None]:
# Import the plotting packages before the expensive computation, so that a
# missing package is noticed immediately rather than after the compute.
import matplotlib.pyplot as plt
import seaborn as sns

# Computation: restrict to numeric columns, because a correlation is undefined
# for text columns. NOTE(review): assumes the installed Dask version supports
# the numeric_only argument of corr() - confirm.
correlation_matrix = ddf_ids18.corr(numeric_only=True).compute()
print(correlation_matrix)

# Visualisation on an explicit figure/axes pair
fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(correlation_matrix, annot=False, cmap='coolwarm', ax=ax)
ax.set_title('Correlation Heatmap')
plt.show()

## Speichern der Datensätze als Parquet

In [8]:
def save_dask_dataframe(df, file_path):
    """Write ``df`` to ``file_path`` in Parquet format and log the target path.

    The index is not written (``write_index=False``). Nothing is returned.
    """
    parquet_options = {'write_index': False}
    df.to_parquet(file_path, **parquet_options)
    logging.info(f"DataFrame successfully saved to {file_path}")

In [None]:
# Write ids17 as Parquet into the ids17_parquet sub-directory of its dataset directory
save_dask_dataframe(ddf_ids17, verzeichnis_ids17 + '/ids17_parquet')

In [12]:
# Write ids18 as Parquet into the ids18_parquet sub-directory of its dataset directory
save_dask_dataframe(ddf_ids18, verzeichnis_ids18 + '/ids18_parquet')

## Dask Client beenden

In [13]:
# Close the Dask client that was started in the configuration cell
client.close()