# 1.Характеристика логов: количетво записей, структура, поля

In [39]:
import os, re, zipfile, glob
from datetime import datetime
from typing import Optional, Dict, Iterator
import pandas as pd
import numpy as np

ZIP_CANDIDATES = [
    "HDFS_v1 (1).zip", "HDFS_v1.zip",
    "/mnt/data/HDFS_v1 (1).zip", "/mnt/data/HDFS_v1.zip"
]
EXTRACT_DIR = "HDFS_v1_extracted"
RAW_LIMIT = None

#распаковка
def unzip_hdfs(zip_candidates, target_dir) -> str:
    zip_path = next((z for z in zip_candidates if os.path.exists(z)), None)
    if not zip_path and os.path.isdir(target_dir):
        print("Используем распакованное:", os.path.abspath(target_dir))
        return os.path.abspath(target_dir)
    if not zip_path:
        raise FileNotFoundError("HDFS_v1.zip не найден")
    os.makedirs (target_dir, exist_ok =True)
    with zipfile.ZipFile(zip_path, "r") as zf:
        zf.extractall(target_dir)
    print("Распаковано:", os.path.abspath(target_dir))
    return os.path.abspath(target_dir)

root = unzip_hdfs(ZIP_CANDIDATES, EXTRACT_DIR)

#поиск нужных файлов
def find_one(root: str, name: str) -> Optional[str]:
    hits = glob.glob(os.path.join(root, "**", name), recursive =True)
    return hits[0] if hits else None

p_log  = find_one(root,"HDFS.log")
p_feat = find_one(root, "Event_occurrence_matrix.csv")  
p_lab  = find_one(root, "anomaly_label.csv")

print("Найдено:")
print("HDFS.log:", p_log or '-')
print("Event_occurrence_matrix.csv:", p_feat or '-')
print("anomaly_label.csv:", p_lab or "-")

#паттерны для парсинга строк HDFS
#паттерн: YYMMDD PID LEVEL COMPONENT: MESSAGE
RE_MAIN = re.compile(
    r"""^(?P<date>\d{6})\s(?P<time>\d{6})\s+
        (?P<pid>\d+)\s+
        (?P<level>[A-Z]+)\s+
        (?P<component>[^:]+):\s?
        (?P<message>.*)$
    """, re.VERBOSE
)

#Фоллбек (без pid)
RE_FALLBACK = re.compile(
    r"""^(?P<date>\d{6})\s(?P<time>\d{6})\s+
        (?P<level>[A-Z]+)\s+
        (?P<component>[^:]+):\s?
        (?P<message>.*)$
    """, re.VERBOSE
)
# Паттерн для поиска BlockId в тексте сообщения (используется внутри parse_line)
RE_BLOCK = re.compile(r"\bblk_[\-\d]+")

def parse_line(line: str) -> Optional[Dict]:
    s = line.rstrip("\n")
    m = RE_MAIN.match(s) or RE_FALLBACK.match(s)
    if not m: 
        return None 
    g = m.groupdict()
    try: 
        ts = datetime.strptime(g["date"] + " " + g["time"], "%y%m%d %H%M%S")
    except Exception:
        ts = None
    msg = g.get("message", "")
    bid = None
    m_blk = RE_BLOCK.search(msg)
    if m_blk:
        bid = m_blk.group(0)
    return {
    "timestamp": ts, 
    "level": g.get("level"),
    "component": (g.get("component")or "").strip(),
    "message": msg, 
    "BlockId": bid, 
    "pid": g.get("pid")
    }
    
def stream_parse(path: str, limit:Optional[int]) -> Iterator[Dict]:
    with open(path, "r", errors="ignore") as f:
        for i, line in enumerate(f):
            rec = parse_line(line)
            if rec:
                yield rec
            if limit is not None and (i+1) >= limit:
                break

# Если найден HDFS.log — извлекаем из него реальные поля (timestamp, level, component, message, BlockId, pid)
# и формируем структурированный DataFrame
if p_log and os.path.exists(p_log):
    df = pd.DataFrame(stream_parse(p_log, RAW_LIMIT),
                      columns=["timestamp","level","component","message","BlockId","pid"])
    if df.empty:
        raise RuntimeError("Парсер не распознал ни одной строки. Пришли 3–5 строк из HDFS.log для подстройки шаблона.")
        
    # Приводим уровень логов (level) к верхнему регистру и проверяем корректность значений (INFO/WARN/ERROR)
    df["level"] = df["level"].astype(str).str.upper()
    bad_levels = df.loc[~df["level"].str.fullmatch(r"[A-Z]+", na=False), "level"].unique()
    if len(bad_levels):
        print("Подозрительные значения level:", bad_levels[:10])

    df["component"] = df["component"].astype("string")
    df["message"] = df["message"].astype("string")
    df["BlockId"] = df["BlockId"].astype("string")
    print(f"\n Собран RAW DataFrame: {df.shape}")
    display(df.head(5))
    
    # Характеристика логов (структура после парсинга)
    print("\n Характеристика логов (raw)")
    print("Строк:" f"{len(df):,}")
    print("Колонки:", list(df.columns))
    display(pd.DataFrame(df.dtypes, columns=["dtype"]))
    if df["timestamp"].notna().any():
        print("Диапозон дат:", df["timestamp"].min(), "->", df["timestamp"].max())
    if df["level"].notna().any():
        print("Уровни:", df["level"].value_counts().to_dict())
    if df["component"].notna().any():
        print("Компоненты (топ-10):")
        display(df["component"].value_counts().head(10).to_frame("count"))
    if df["BlockId"].notna().any():
        print("Уникальных BlockId (в сэмпле):", df["BlockId"].nunique())
    print("Примеры сообщений:")
    display(df["message"].head(5))

# Если HDFS.log отсутствует — используем готовые предобработанные данные (features + labels)
else:
    assert p_feat and p_lab, "Нет HDFS.log и не был найден Event_occurrence_matrix.csv/anomaly_label.csv"
    #читаем features; если есть колонка Label в feautures - исключим ее
    feat_cols = pd.read_csv(p_feat, nrows=0).columns.tolist()
    usecols_feat = [c for c in feat_cols if c.lower() != 'label']
    df_feat = pd.read_csv(p_feat, usecols=usecols_feat)
    df_lab  = pd.read_csv(p_lab)

# Нормализация ключевого поля для корректного объединения данных
    df_feat.rename(columns={"block_id":"BlockId","blockid":"BlockId"}, inplace = True)
    df_lab.rename(columns={"block_id":"BlockId", "blockid":"BlockId"}, inplace = True)

    df = df_feat.merge(df_lab[["BlockId", "Label"]], on = "BlockId", how = "left")
#Нормализация меток
    norm = {"normal":"Success", "anomaly":"Fail", "success":"Success","fail":"Fail"}
    df['Label'] = df['Label'].map(lambda x: norm.get(str(x).lower(), x) if pd.notna(x) else x)
    df["y"] = pd.Series(np.where(df['Label'].eq("Fail"),1,0), dtype="Int8").where(df["Label"].notna(), other = pd.NA)

# Преобразуем признаки E# в компактный тип uint16 для экономии памяти
    e_cols = [c for c in df.columns if c.startswith("E") and c[1:].isdigit()]
    if e_cols:
        df[e_cols] = df[e_cols].fillna(0).astype('uint16')
    print(f"\n RAW логов нет. Собран FEATURES DataFrame: {df.shape}")
    display(df.head(5))

#Характеристика таблицы признаков
    print("\n ХАРАКТЕРИСТИКА ПРИЗНАКОВ (features+labels)")
    print("Строк:", f"{len(df):,}")
    print("Колонок:", df.shape[1])
    print("Колонки (первые 20):", df.columns.tolist()[:20], "…")
    display(pd.DataFrame(df.dtypes, columns=["dtype"]).head(20))
    print("Пропуски (топ-15):")
    display(df.isna().sum().sort_values(ascending=False).head(15).to_frame("nulls"))
    print("Примеры строк:")
    display(df.head(5))

df.to_parquet("hdfs_logs.parquet")

Распаковано: /Users/slvic/Applications/HDFS_v1_extracted
Найдено:
HDFS.log: /Users/slvic/Applications/HDFS_v1_extracted/HDFS.log
Event_occurrence_matrix.csv: /Users/slvic/Applications/HDFS_v1_extracted/preprocessed/Event_occurrence_matrix.csv
anomaly_label.csv: /Users/slvic/Applications/HDFS_v1_extracted/preprocessed/anomaly_label.csv

 Собран RAW DataFrame: (11175629, 6)


Unnamed: 0,timestamp,level,component,message,BlockId,pid
0,2008-11-09 20:35:18,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,blk_-1608999687919862906,143
1,2008-11-09 20:35:18,INFO,dfs.FSNamesystem,BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...,blk_-1608999687919862906,35
2,2008-11-09 20:35:19,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,blk_-1608999687919862906,143
3,2008-11-09 20:35:19,INFO,dfs.DataNode$DataXceiver,Receiving block blk_-1608999687919862906 src: ...,blk_-1608999687919862906,145
4,2008-11-09 20:35:19,INFO,dfs.DataNode$PacketResponder,PacketResponder 1 for block blk_-1608999687919...,blk_-1608999687919862906,145



 Характеристика логов (raw)
Строк:11,175,629
Колонки: ['timestamp', 'level', 'component', 'message', 'BlockId', 'pid']


Unnamed: 0,dtype
timestamp,datetime64[ns]
level,object
component,string[python]
message,string[python]
BlockId,string[python]
pid,object


Диапозон дат: 2008-11-09 20:35:18 -> 2008-11-11 11:16:28
Уровни: {'INFO': 10812836, 'WARN': 362793}
Компоненты (топ-10):


Unnamed: 0_level_0,count
component,Unnamed: 1_level_1
dfs.FSNamesystem,3700245
dfs.DataNode$PacketResponder,3413350
dfs.DataNode$DataXceiver,2518678
dfs.FSDataset,1407597
dfs.DataBlockScanner,120046
dfs.DataNode,7002
dfs.DataNode$DataTransfer,6946
dfs.DataNode$BlockReceiver,1718
dfs.PendingReplicationBlocks$PendingReplicationMonitor,47


Уникальных BlockId (в сэмпле): 575061
Примеры сообщений:


0    Receiving block blk_-1608999687919862906 src: ...
1    BLOCK* NameSystem.allocateBlock: /mnt/hadoop/m...
2    Receiving block blk_-1608999687919862906 src: ...
3    Receiving block blk_-1608999687919862906 src: ...
4    PacketResponder 1 for block blk_-1608999687919...
Name: message, dtype: string