### Conversión de dataset de tipo de HDFS de JSON a CSV

In [24]:
import pandas as pd
import re
import csv
from io import StringIO
import json

In [25]:
# Corregir posibles errores en la línea
def repair_json_line(line):
    # Asegurar que todas las claves estén entre comillas dobles
    line = re.sub(r'(?<=,|\{)\s*([^"{\s][^:]*?)\s*:', r' "\1":', line.replace("'", '"'))
    # Añadir comas necesarias si no es el final del archivo y no termina en coma o cierre
    if line.strip() and not line.strip().endswith(('}', ']', ',')):
        line += ','
    return line

In [26]:
# Limpiar y cargar un archivo CSV
def load_and_clean_json(filename):
    try:
        with open(filename, 'r') as file:
            data = json.load(file)
        return pd.DataFrame(data)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        # Devolver un DataFrame vacío en caso de error
        return pd.DataFrame()  


In [27]:
# Obtener el contenido de una línea de un fichero
def get_line_content(filename, line_number):
    try:
        with open(filename, 'r') as file:
            for current_line, content in enumerate(file, start=1):
                if current_line == line_number:
                    return content
    except Exception as e:
        return f"Error reading line: {str(e)}"

In [28]:
# Cargar y limpiar el archivo JSON
df = load_and_clean_json('0X_nombre_dataset.json')
if df is not None and not df.empty:
    print(df.head())
else:
    print("Fallo al cargar el archivo JSON.")

             timestamp                   hostname  \
0  2008-11-09T20:35:18                      block   
1  2008-11-09T20:35:18  NameSystem.allocateBlock:   
2  2008-11-09T20:35:19                      block   
3  2008-11-09T20:35:19                      block   
4  2008-11-09T20:35:19                          1   

                                             service  \
0                                -160899968791986290   
1  /hadoop/mapred/system/job_200811092030_0001/jo...   
2                                -160899968791986290   
3                                -160899968791986290   
4                                                      

                                             message  
0  src: /10.250.19.102:54106 dest: /10.250.19.102...  
1                           blk_-1608999687919862906  
2   src: /10.250.10.6:40524 dest: /10.250.10.6:50010  
3  src: /10.250.14.224:42420 dest: /10.250.14.224...  
4         block blk_-1608999687919862906 terminating  


In [29]:
# Suponiendo que df ya está cargado con la data del JSON
try:
    df['time'] = pd.to_datetime(df['timestamp'], errors='coerce', infer_datetime_format=True)
    df['YYYY'] = df['time'].dt.year.apply(lambda x: f'{x:4}')
    df['MM'] = df['time'].dt.month.apply(lambda x: f'{x:02}')
    df['DD'] = df['time'].dt.day.apply(lambda x: f'{x:02}')
    df['hh:mm'] = df['time'].dt.strftime('%H:%M')

    # Muestra los resultados de la conversión y las nuevas columnas
    print(df[['time', 'YYYY', 'MM', 'DD', 'hh:mm']])
except Exception as e:
    print(f"Error processing dates: {e}")

  df['time'] = pd.to_datetime(df['timestamp'], errors='coerce', infer_datetime_format=True)


                      time  YYYY  MM  DD  hh:mm
0      2008-11-09 20:35:18  2008  11  09  20:35
1      2008-11-09 20:35:18  2008  11  09  20:35
2      2008-11-09 20:35:19  2008  11  09  20:35
3      2008-11-09 20:35:19  2008  11  09  20:35
4      2008-11-09 20:35:19  2008  11  09  20:35
...                    ...   ...  ..  ..    ...
276148 2008-11-09 21:11:11  2008  11  09  21:11
276149 2008-11-09 21:11:11  2008  11  09  21:11
276150 2008-11-09 21:11:11  2008  11  09  21:11
276151 2008-11-09 21:11:11  2008  11  09  21:11
276152 2008-11-09 21:11:11  2008  11  09  21:11

[276153 rows x 5 columns]


In [30]:
# Funciones para extraer las distintas columnas del campo principal 'message'
def extract_ip(message):
    match = re.search(r'\b(?:\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b', message)
    return match.group(0) if match else None

df['IP'] = df['message'].apply(extract_ip)

def extract_port(message):
    common_ports = {
        '21': 'FTP', '22': 'SSH', '23': 'Telnet', '25': 'SMTP', '53': 'DNS',
        '80': 'HTTP', '110': 'POP3', '143': 'IMAP', '443': 'HTTPS', '587': 'SMTP',
        '3306': 'MySQL', '5432': 'PostgreSQL', '8080': 'HTTP-alt'
    }
    matches = re.findall(r'\b\d{2,5}\b', message)
    ports = [port for port in matches if port in common_ports]
    return ports[0] if ports else None

df['Port'] = df['message'].apply(extract_port)

def extract_keyword(message):
    keywords = ['error', 'fatal', 'failure', 'exception', 'warning', 'critical', 'denied', 'unreachable', 'timeout', 'failed']
    pattern = re.compile('|'.join(keywords), re.IGNORECASE)
    match = pattern.search(message)
    return match.group(0) if match else None

def extract_action(message):
    actions = ['restarted', 'stopped', 'started', 'deployed']
    pattern = r'\b(' + '|'.join(actions) + r')\b'
    match = re.search(pattern, message, re.IGNORECASE)
    return match.group(0) if match else None

df['Action'] = df['message'].apply(extract_action)

df['Keyword'] = df['message'].apply(extract_keyword)

df['User'] = df['message'].apply(lambda x: re.findall(r'user\s+(\w+)', x)[0] if re.findall(r'user\s+(\w+)', x) else None)

def extract_interface(message):
    pattern = r'\b(eth0|eth[12]|wlan0|lo)\b'
    match = re.search(pattern, message)
    return match.group(0) if match else None

df['Interface'] = df['message'].apply(extract_interface)


df['UID'] = df['message'].apply(lambda x: re.findall(r'uid=(\d+)', x)[0] if re.findall(r'uid=(\d+)', x) else None)

df['Protocol'] = df['message'].apply(lambda x: re.findall(r'\b(TCP|UDP)\b', x, re.IGNORECASE)[0] if re.findall(r'\b(TCP|UDP)\b', x, re.IGNORECASE) else None)

df['Component'] = df['message'].apply(lambda x: re.findall(r'\b(RAS)\b', x)[0] if re.findall(r'\b(RAS)\b', x) else None)

df['Severity'] = df['message'].apply(lambda x: re.findall(r'\b(KERNEL|DISCOVERY)\b', x)[0] if re.findall(r'\b(KERNEL|DISCOVERY)\b', x) else None)

df['Type'] = df['message'].apply(lambda x: re.findall(r'\b(INFO|FATAL|SEVERE|WARNING|ERROR)\b', x, re.IGNORECASE)[0] if re.findall(r'\b(INFO|FATAL|SEVERE|WARNING|ERROR)\b', x, re.IGNORECASE) else None)

df['Thread ID'] = df['message'].apply(lambda x: re.findall(r'\bthread\s+(\d+)\b', x, re.IGNORECASE)[0] if re.findall(r'\bthread\s+(\d+)\b', x, re.IGNORECASE) else None)

df['message'] = df['message'].apply(lambda x: x.replace('\t', ' ').replace('"', '""'))

df = df[['YYYY', 'MM', 'DD', 'hh:mm', 'hostname', 'service', 'User', 'IP', 'Port', 'Keyword', 'Interface', 'UID', 'Action', 'Protocol', 'Component', 'Severity', 'Type', 'Thread ID', 'message']]

output = StringIO()
df.to_csv(output, sep='\t', index=False, header=False)
output.seek(0)

lines = output.readlines()
formatted_lines = []

# Definición de las columnas del dataset en el orden correcto
headers = ['YYYY', 'MM', 'DD', 'hh:mm', 'Hostname', 'Service', 'User', 'IP', 'Port', 'Keyword', 'Interface', 'UID', 'Action', 'Protocol', 'Component', 'Severity', 'Type', 'Thread ID', 'Message']
max_widths = [max(max(len(row[i]) for row in (line.split('\t') for line in lines)), len(header)) for i, header in enumerate(headers)]

# Preparación del formato de las columnas
header_line = "".join(f'{header:<{max_widths[i]}}\t' for i, header in enumerate(headers))
formatted_lines.append(header_line.strip() + '\n')

In [31]:
# Formateo de las líneas del dataset
for line in lines:
    row = line.split('\t')
    # Verificación del número de columnas
    if len(row) == len(max_widths):
        formatted_line = "".join(f'{item.strip():<{max_widths[i]}}\t' for i, item in enumerate(row))
        formatted_lines.append(formatted_line.strip() + '\n')
    else:
        print("Error: Row has incorrect number of columns", row)

In [32]:
# Escritura de las líneas formateadas en un archivo CSV
with open('0X_nombre_dataset.csv', 'w') as file:
    file.writelines(formatted_lines)