## Prepare the Labelled Log Data

In [6]:
import pandas as pd
import re

## Read the Hadoop (HDFS) Log File
##### Each log line consists of the date (`YYMMDD`), time (`HHMMSS`), PID, level and event

In [9]:
# Load the raw HDFS log: one DataFrame row per line, trailing newline kept.
with open('HDFS_5k.log', 'r') as log_file:
    # Iterating a text file yields exactly the lines readlines() would return.
    log_lines = list(log_file)

log_df = pd.DataFrame({'Log': log_lines})

In [11]:
# (rows, columns) of the raw log frame; a bare last expression lets Jupyter render it.
log_df.shape

(4519, 1)


In [13]:
# Show full log lines (no column-width truncation) in the preview.
pd.options.display.max_colwidth = None
log_df.head()

Unnamed: 0,Log
0,081109 203518 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.19.102:54106 dest: /10.250.19.102:50010\n
1,081109 203518 35 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar. blk_-1608999687919862906\n
2,081109 203519 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.10.6:40524 dest: /10.250.10.6:50010\n
3,081109 203519 145 INFO dfs.DataNode$DataXceiver: Receiving block blk_-1608999687919862906 src: /10.250.14.224:42420 dest: /10.250.14.224:50010\n
4,081109 203519 145 INFO dfs.DataNode$PacketResponder: PacketResponder 1 for block blk_-1608999687919862906 terminating\n


## Read Labels
##### Each BlockId is labelled Normal or Anomaly

In [16]:
# Load the per-block ground truth; the CSV provides 'BlockId' and 'Label' columns.
anomaly_labels_df = pd.read_csv('./anomaly_label.csv')

# BlockId -> Label mapping for fast lookups while tagging log lines
# (if a BlockId repeats, the last row wins, as with dict(zip(...))).
anomaly_labels_data = anomaly_labels_df.set_index('BlockId')['Label'].to_dict()

# Accumulator for the per-line (Log, BlockId, Label) records.
combined_data = []

In [18]:
# First five labelled blocks, as a quick sanity check of the label file.
anomaly_labels_df.iloc[:5]

Unnamed: 0,BlockId,Label
0,blk_-1608999687919862906,Normal
1,blk_7503483334202473044,Normal
2,blk_-3544583377289625738,Anomaly
3,blk_-9073992586687739851,Normal
4,blk_7854771516489510256,Normal


## Associate Event Log and Labels
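##### Each BlockId is associated with a set of events. Tag each event (log line) with its block's label, Normal or Anomaly. Lines that mention no block ID are skipped; blocks missing from the label file are tagged `Unknown`.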

In [30]:
# Tag every log line with the label of each block ID it mentions.
# A line mentioning several block IDs yields one record per ID; lines with no
# block ID are skipped; IDs absent from the label file are tagged 'Unknown'.

# Reset here (not in an earlier cell) so re-running this cell does not append
# duplicate rows to records left over from a previous run.
combined_data = []

for line in log_df['Log']:
    # Same block-ID pattern clean_log uses, so extraction and removal agree.
    for block_id in re.findall(r'blk_-?\d+', line):
        label = anomaly_labels_data.get(block_id, 'Unknown')
        combined_data.append({'Log': line.strip(), 'BlockId': block_id, 'Label': label})

# Explicit columns keep the schema stable even if no line matched.
combined_df = pd.DataFrame(combined_data, columns=['Log', 'BlockId', 'Label'])

# Truncate the long 'Log' column for the preview; bare expression for rich display.
pd.set_option('display.max_colwidth', 35)
combined_df

                                      Log                   BlockId    Label
0      081109 203518 143 INFO dfs.Data...  blk_-1608999687919862906   Normal
1      081109 203518 35 INFO dfs.FSNam...  blk_-1608999687919862906   Normal
2      081109 203519 143 INFO dfs.Data...  blk_-1608999687919862906   Normal
3      081109 203519 145 INFO dfs.Data...  blk_-1608999687919862906   Normal
4      081109 203519 145 INFO dfs.Data...  blk_-1608999687919862906   Normal
...                                   ...                       ...      ...
19407  081110 105225 19 INFO dfs.FSDat...    blk_872694497849122755  Anomaly
19408  081110 105400 19 INFO dfs.FSDat...   blk_3947106522258141922  Anomaly
19409  081110 105400 19 INFO dfs.FSDat...   blk_3947106522258141922  Anomaly
19410  081110 105446 19 INFO dfs.FSDat...   blk_-774246298521956028   Normal
19411  081110 105446 19 INFO dfs.FSDat...   blk_-774246298521956028   Normal

[19412 rows x 3 columns]


## Clean the Data
##### Remove the date/time prefix and the block IDs from each log message, and drop the `BlockId` column

In [33]:
# The block ID was only needed to look up labels; drop the column.
# errors='ignore' makes the cell idempotent: re-running it after the column is
# already gone no longer raises KeyError.
combined_df = combined_df.drop(columns=['BlockId'], errors='ignore')

In [35]:
# Function to normalise a single log message
def clean_log(log):
    """Strip the timestamp and block IDs from one HDFS log line.

    Args:
        log: A raw log line such as
            '081109 203518 143 INFO dfs.DataNode$DataXceiver: Receiving block blk_-16... src: ...'.

    Returns:
        The message with the leading 'YYMMDD HHMMSS ' prefix removed, every
        'blk_<id>' / 'blk_-<id>' token removed, runs of whitespace collapsed to a
        single space, and leading/trailing whitespace stripped. The remaining
        fields (PID, level, event text) are kept.
    """
    # Leading date and time, format 'YYMMDD HHMMSS' (e.g. '081109 203518').
    log = re.sub(r'^\d{6} \d{6} ', '', log)
    # Remove all block IDs, with or without a leading minus sign.
    log = re.sub(r'blk_-?\d+', '', log)
    # Removing a mid-line ID leaves a doubled space ("block  src:"); collapse it.
    log = re.sub(r'\s+', ' ', log)
    return log.strip()

# Replace every message in the 'Log' column with its cleaned form.
combined_df['Log'] = combined_df['Log'].map(clean_log)

In [39]:
# Lift column-width truncation so the cleaned messages are shown in full.
pd.options.display.max_colwidth = None
combined_df.head(n=5)

Unnamed: 0,Log,Label
0,143 INFO dfs.DataNode$DataXceiver: Receiving block src: /10.250.19.102:54106 dest: /10.250.19.102:50010,Normal
1,35 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /mnt/hadoop/mapred/system/job_200811092030_0001/job.jar.,Normal
2,143 INFO dfs.DataNode$DataXceiver: Receiving block src: /10.250.10.6:40524 dest: /10.250.10.6:50010,Normal
3,145 INFO dfs.DataNode$DataXceiver: Receiving block src: /10.250.14.224:42420 dest: /10.250.14.224:50010,Normal
4,145 INFO dfs.DataNode$PacketResponder: PacketResponder 1 for block terminating,Normal


In [41]:
# Persist the cleaned, labelled log lines (without the index column).
combined_df.to_csv(path_or_buf='combined_logs_with_labels.csv', index=False)