In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.model_selection import train_test_split
import sqlite3
import pandas as pd
from datetime import datetime

In [2]:
def load_logs_from_database(db_path):
    conn = sqlite3.connect(db_path)
    
    # SQL 쿼리 실행
    query = """
    SELECT
        [Event_Date/Time_-_UTC_(yyyy-mm-dd)] AS time,
        Current_File_Name AS file_name,
        File_Operation AS operation
    FROM
        LogFile_Analysis
    WHERE
        File_Operation IN ('Create', 'Modify', 'Delete')
    ORDER BY
        Current_File_Name, [Event_Date/Time_-_UTC_(yyyy-mm-dd)];
    """
    df = pd.read_sql(query, conn)
    
    conn.close()

    # None 값이 있는 경우 처리
    df['time'] = df['time'].fillna('')  # inplace 사용 안 함
    df['file_name'] = df['file_name'].fillna('unknown')  # inplace 사용 안 함

    logs = list(df.itertuples(index=False, name=None))
    
    return logs


In [3]:
# 날짜 변환 함수
def parse_datetime(dt_str):
    # dt_str이 None이거나 빈 문자열인 경우 기본값을 설정
    if dt_str is None or dt_str == '':
        return 0  # 기본값으로 0을 반환

    try:
        # '%Y-%m-%d %H:%M:%S.%f' 형식으로 시간을 파싱
        dt = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S.%f')
        return int(dt.timestamp())  # 유닉스 타임스탬프로 변환
    except ValueError:
        print(f"Invalid datetime format: {dt_str}")
        return 0  # 변환에 실패할 경우 기본값 0을 반환

In [4]:
# 데이터셋 생성 함수
def create_dataset(logs, time_threshold=300, min_modifications=3):
    sequences = []
    labels = []

    current_sequence = []
    modification_count = 0
    for i in range(1, len(logs)):
        log_time = parse_datetime(logs[i][0])
        prev_log_time = parse_datetime(logs[i-1][0])

        if log_time is None or prev_log_time is None:
            continue  # 시간 값이 None인 경우 해당 로그 건너뜀

        file_name = logs[i][1]
        operation = logs[i][2]
        prev_file_name = logs[i-1][1]
        prev_operation = logs[i-1][2]

        if file_name == prev_file_name and operation == 'Modify':
            modification_count += 1
            current_sequence.append(logs[i])

            # 시간 차이가 짧고 수정 횟수가 임계치를 넘었을 경우 비정상으로 처리
            if (log_time - prev_log_time) <= time_threshold and modification_count >= min_modifications:
                sequences.append(current_sequence)
                labels.append(1)  # 비정상 행위로 라벨링
                current_sequence = []
                modification_count = 0
        else:
            if len(current_sequence) > 0:
                sequences.append(current_sequence)
                labels.append(0)  # 정상 행위로 라벨링
                current_sequence = []
            modification_count = 0

    return sequences, labels

In [16]:
# 로그 시퀀스에서 파일 이름과 같은 문자열 데이터를 숫자로 변환하는 함수
def encode_sequence(sequence):
    encoded = []
    for log in sequence:
        log_time, file_name, operation = log
        
        # 시간 값을 float 또는 int로 변환 (log_time이 이미 숫자여야 함)
        if isinstance(log_time, str):
            log_time = parse_datetime(log_time)  # 혹은 적절히 변환

        # 파일 이름을 숫자로 변환
        file_enc = hash(file_name) % 1000 if file_name else 0  # 'unknown' 파일도 숫자로 변환
        
        # 작업 유형을 숫자로 인코딩
        op_enc = 1 if operation == 'Modify' else 0
        
        encoded.append([log_time, file_enc, op_enc])
    return encoded

In [17]:
# LSTM 모델 정의
class LogLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(LogLSTM, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])  # 마지막 타임스텝의 출력만 사용
        return torch.sigmoid(out)

In [18]:
# 데이터 로드 및 전처리
db_path = r"C:\Users\addy0\OneDrive\바탕 화면\normalization.db"
logs = load_logs_from_database(db_path)

In [19]:
# 시퀀스와 라벨을 생성한 후 출력해 확인
sequences, labels = create_dataset(logs)

In [None]:
print(f"Sequences: {sequences}")
print(f"Labels: {labels}")

In [20]:
# 시퀀스를 텐서로 변환
encoded_sequences = [encode_sequence(seq) for seq in sequences]

# 패딩하여 시퀀스 길이를 10으로 맞추기
encoded_sequences = np.array([np.pad(seq, ((0, 10 - len(seq)), (0, 0)), 'constant', constant_values=0) for seq in encoded_sequences])

# 라벨 배열도 numpy로 변환
labels = np.array(labels)

# 데이터셋 분할 (train/test)
X_train, X_test, y_train, y_test = train_test_split(encoded_sequences, labels, test_size=0.2)

In [21]:
# 텐서로 변환 (모든 데이터가 float 또는 int여야 함)
X_train = torch.tensor(X_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)  # 라벨도 텐서로 변환
y_test = torch.tensor(y_test, dtype=torch.float32).unsqueeze(1)

In [23]:
# 모델 학습 설정
input_size = 3  # time, file_name, operation 3개의 피처
hidden_size = 64
output_size = 1  # 이진 분류
num_layers = 1
model = LogLSTM(input_size, hidden_size, output_size, num_layers)

In [24]:
criterion = nn.BCELoss()  # 이진 교차 엔트로피 손실 함수
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [25]:
# 모델 학습
epochs = 10
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()

    outputs = model(X_train)
    loss = criterion(outputs, y_train)
    loss.backward()
    optimizer.step()

    print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item()}")

Epoch [1/10], Loss: 0.675111711025238
Epoch [2/10], Loss: 0.6711539030075073
Epoch [3/10], Loss: 0.6671518087387085
Epoch [4/10], Loss: 0.6630805134773254
Epoch [5/10], Loss: 0.6589135527610779
Epoch [6/10], Loss: 0.6546230912208557
Epoch [7/10], Loss: 0.6501808762550354
Epoch [8/10], Loss: 0.6455557346343994
Epoch [9/10], Loss: 0.6407127976417542
Epoch [10/10], Loss: 0.6356114745140076


In [26]:
# 모델 평가
model.eval()
with torch.no_grad():
    predictions = model(X_test)
    predictions = (predictions > 0.5).float()
    accuracy = (predictions == y_test).float().mean()
    print(f"Test Accuracy: {accuracy.item() * 100:.2f}%")

Test Accuracy: 76.85%
