# MGL870 - TP2 - Utilisation de l’apprentissage machine pour la détection des anomalies
## Pierre Joseph, Jonathan Mésidor, Mohamed Fehd Soufi
## Automne 2024


## Requirements

`pip install jupyter logparser3 drain3`

## Import required libraries

In [None]:
import os
import pandas as pd
import sys
sys.path.append('../../')
from logparser.Drain import LogParser
import re


## HDFS_V1

### Drain 3 parser on HDFS.log

In [None]:
input_dir = './input/HDFS_v1/'  
output_dir = './results' 
log_file = 'HDFS.log' 

log_format = '<date> <Time> <Pid> <Level> <Component>: <Content>'
regex      = [
    r'blk_(|-)[0-9]+' , # block id
    r'(/|)([0-9]+\.){3}[0-9]+(:[0-9]+|)(:|)', # IP
    r'(?<=[^A-Za-z0-9])(\-?\+?\d+)(?=[^A-Za-z0-9])|[0-9]+$', # Numbers
]

st = 0.5 
depth = 4  

parser = LogParser(log_format, indir=input_dir, outdir=output_dir, depth=depth, st=st, rex=regex)
parser.parse(log_file)

Parsing file: ./input/HDFS_v1/HDFS.log
Total lines:  11175629
Processed 0.0% of log lines.
Processed 0.0% of log lines.
Processed 0.0% of log lines.
Processed 0.0% of log lines.
Processed 0.0% of log lines.
Processed 0.1% of log lines.
Processed 0.1% of log lines.
Processed 0.1% of log lines.
Processed 0.1% of log lines.
Processed 0.1% of log lines.
Processed 0.1% of log lines.
Processed 0.1% of log lines.
Processed 0.1% of log lines.
Processed 0.1% of log lines.
Processed 0.1% of log lines.
Processed 0.1% of log lines.
Processed 0.2% of log lines.
Processed 0.2% of log lines.
Processed 0.2% of log lines.
Processed 0.2% of log lines.
Processed 0.2% of log lines.
Processed 0.2% of log lines.
Processed 0.2% of log lines.
Processed 0.2% of log lines.
Processed 0.2% of log lines.
Processed 0.2% of log lines.
Processed 0.2% of log lines.
Processed 0.3% of log lines.
Processed 0.3% of log lines.
Processed 0.3% of log lines.
Processed 0.3% of log lines.
Processed 0.3% of log lines.
Processed 

### Mapping
We created the EventId mapping to have a clearer matrix of events. First we sort the EventId by order of magnitude then we associate a value E(x) = {E0, E1, E2 .... } to each one

In [None]:
def mapping():
    log_temp = pd.read_csv(log_templates_file).sort_values(by="Occurrences", ascending=False)
    log_temp_dict = {event: f"E{idx + 1}" for idx, event in enumerate(log_temp["EventId"])}
    
    # Sauvegarde du dictionnaire de mappage
    output_path = os.path.join(output_dir, "hdfs_log_templates.json")
    with open(output_path, "w") as f:
        json.dump(log_temp_dict, f)
    print("Mapping completed and saved to", output_path)
    return log_temp_dict

### Convert the structure of HDFS to have Blk
the code adds the columns BlockId, Label and Change the EventId to E1, E2, E3 ... so that we can have a complete structure.

In [None]:
input_dir = './HDFS_v1/'
output_dir = './HDFS_v1/output/'
csv_directory = './result'
json_file_path = os.path.join(output_dir, 'hdfs_log_templates.json')  # Chemin vers le fichier JSON contenant le mapping des EventId
anomaly_label_path = os.path.join(input_dir, "preprocessed/anomaly_label.csv")
# Charger le fichier CSV structuré
structured_log_path = os.path.join(csv_directory, 'HDFS.log_structured.csv')
df_structured = pd.read_csv(structured_log_path)

# Charger le fichier JSON contenant le mapping des EventId vers des labels
with open(json_file_path, 'r') as json_file:
    event_mapping = json.load(json_file)

# Charger le fichier anomaly_label.csv
df_labels = pd.read_csv(anomaly_label_path)
df_labels['Label'] = df_labels['Label'].replace({'Normal': 'Success', 'Anomaly': 'Fail'})

# Ajouter la colonne BlockId en extrayant les identifiants des blocs
df_structured['BlockId'] = df_structured['Content'].apply(lambda x: re.search(r'blk_(|-)[0-9]+', x).group(0) if re.search(r'blk_(|-)[0-9]+', x) else None)

# Supprimer les lignes où BlockId est NaN
df_structured = df_structured.dropna(subset=['BlockId'])

# Remplacer les EventId par les valeurs correspondantes dans le fichier JSON
df_structured['EventId'] = df_structured['EventId'].apply(lambda x: event_mapping.get(x, x))

# Fusionner les DataFrames pour ajouter la colonne Label
df_structured = pd.merge(df_structured, df_labels, on='BlockId', how='left')

# Réorganiser les colonnes pour que BlockId et Label soient les premières
columns = ['BlockId', 'Label'] + [col for col in df_structured.columns if col not in ['BlockId', 'Label']]
df_structured = df_structured[columns]

# Sauvegarder le nouveau fichier structuré
structured_log_path_with_blockid = os.path.join(csv_directory, 'HDFS.log_structured_blk.csv')
df_structured.to_csv(structured_log_path_with_blockid, index=False)

print(f"Le fichier structuré avec BlockId et les EventId remplacés est généré et sauvegardé dans {structured_log_path_with_blockid}")

Le fichier structuré avec BlockId est généré et sauvegardé dans ./results/HDFS.log_structured_blk.csv


### Sample

In [3]:
import os
import re
import pandas as pd
from collections import defaultdict
from tqdm import tqdm

def hdfs_sampling(log_file, output_path="result/HDFS_sequence.csv", window='session', window_size=0):
    assert window == 'session', "Only window=session is supported for HDFS dataset."
    print("Loading", log_file)
    
    # Charger le fichier structuré en mémoire avec des types optimisés pour les grands fichiers
    struct_log = pd.read_csv(log_file, engine='c', na_filter=False, memory_map=True, dtype={'Time': str})
    struct_log['Time'] = struct_log['Time'].str.zfill(6)  # Ajouter des zéros pour standardiser le format HHMMSS

    # Extraire BlockId pour chaque ligne et compter le nombre de `Fail` directement
    struct_log['BlockId'] = struct_log['Content'].str.extract(r'(blk_-?\d+)')
    struct_log['EventId'] = struct_log['EventId'].fillna('')
    struct_log['Label'] = struct_log['Label'].apply(lambda x: 1 if x == 'Fail' else 0)

    # Initialiser les dictionnaires pour stocker les résultats
    data_dict = defaultdict(list)
    time_dict = defaultdict(list)
    type_count = defaultdict(int)

    # Grouper par BlockId pour des opérations en bloc
    grouped = struct_log.groupby('BlockId')
    for block_id, group in tqdm(grouped, total=len(grouped)):
        data_dict[block_id] = group['EventId'].tolist()
        time_dict[block_id] = pd.to_datetime(group['Time'], format='%H%M%S', errors='coerce').dropna()
        type_count[block_id] = group['Label'].sum()  # Compter les occurrences de "Fail"

    # Construire le DataFrame final
    rows = []
    for block_id, events in tqdm(data_dict.items(), total=len(data_dict)):
        features = [event for event in events if event]
        
        times = time_dict[block_id]
        if len(times) > 1:
            time_intervals = [(times.iloc[i] - times.iloc[i - 1]).total_seconds() for i in range(1, len(times))]
            latency = (times.iloc[-1] - times.iloc[0]).total_seconds()
        else:
            time_intervals = []
            latency = 0

        label = 'Fail' if type_count[block_id] > 0 else 'Success'

        rows.append({
            "BlockId": block_id,
            "Label": label,
            "Type": type_count[block_id],
            "Features": str(features),
            "TimeInterval": str(time_intervals),
            "Latency": latency
        })

    data_df = pd.DataFrame(rows, columns=['BlockId', 'Label', 'Type', 'Features', 'TimeInterval', 'Latency'])
    data_df.to_csv(output_path, index=False)
    print(f"HDFS sampling completed. Output saved to {output_path}")

# Appeler la fonction avec le fichier approprié
hdfs_sampling('result/HDFS.log_structured_blk.csv')


Loading result/HDFS.log_structured_blk.csv


100%|██████████| 433357/433357 [07:51<00:00, 918.85it/s] 
100%|██████████| 433357/433357 [10:10<00:00, 710.33it/s]


HDFS sampling completed. Output saved to result/HDFS_sequence.csv


### Event Occurence Matrix

In [1]:
import os
import pandas as pd
import re
input_dir = './HDFS_v1/'
event_traces_file = os.path.join(input_dir, "preprocessed/Event_traces.csv")
anomaly_label_file = os.path.join(input_dir, "preprocessed/anomaly_label.csv")
# Charger les fichiers
output_dir = './HDFS_v1/output/'
output_file = os.path.join(output_dir, "Event_occurence_matrix.csv")

# Charger le fichier anomaly_label.csv et mapper les labels
anomaly_labels = pd.read_csv(anomaly_label_file)
anomaly_labels['Label'] = anomaly_labels['Label'].apply(lambda x: 'Fail' if x == 'Anomaly' else 'Success')
label_dict = anomaly_labels.set_index('BlockId')['Label'].to_dict()

# Charger le fichier Event_traces.csv
event_traces = pd.read_csv(event_traces_file)

# Initialiser les colonnes d'événements (E1 à E29)
event_columns = [f"E{i}" for i in range(1, 30)]
occurrence_matrix = []

# Itérer sur chaque ligne pour construire la matrice d'occurrence
for _, row in event_traces.iterrows():
    block_id = row['BlockId']
    label = label_dict.get(block_id, 'Unknown')
    features = row['Features']
    event_list = re.findall(r"E\d+", features)

    # Compter les occurrences des événements
    event_counts = {event: event_list.count(event) for event in event_columns}
    
    # Ajouter le résultat à la matrice
    occurrence_matrix.append({
        "BlockId": block_id,
        "Label": label,
        "Type": row['Type'] if pd.notna(row['Type']) else '',
        **event_counts
    })

# Convertir en DataFrame et sauvegarder
occurrence_matrix_df = pd.DataFrame(occurrence_matrix)
occurrence_matrix_df = occurrence_matrix_df[['BlockId', 'Label', 'Type'] + event_columns]
occurrence_matrix_df.to_csv(output_file, index=False)
print(f"Event occurrence matrix saved to {output_file}")


Event occurrence matrix saved to ./HDFS_v1/output/Event_occurence_matrix.csv
