In [6]:
# PyTorch 라이브러리 임포트
import torch  # PyTorch 텐서 및 신경망 연산을 위한 핵심 라이브러리
import torch.nn as nn  # 신경망 모델을 정의할 때 필요한 클래스와 함수들
import torch.optim as optim  # 옵티마이저를 위한 라이브러리 (예: Adam, SGD)
import torch.nn.functional as F  # 신경망에서 자주 사용하는 함수들 (예: ReLU, CrossEntropyLoss)
from torch.utils.data import DataLoader, TensorDataset  # 데이터셋 처리 및 DataLoader 제공

# 데이터 처리 및 분석을 위한 라이브러리
import pandas as pd  # 데이터프레임을 다루는 데 사용되는 라이브러리 (CSV 파일 읽기 등)
import numpy as np  # 고성능 수치 연산을 위한 라이브러리 (배열 및 행렬 연산)

# 데이터 전처리를 위한 scikit-learn의 도구들
from sklearn.preprocessing import LabelEncoder, StandardScaler  # 범주형 데이터를 숫자로 변환 및 데이터 스케일링

# 모델과 전처리된 데이터를 파일로 저장하고 불러오기 위한 라이브러리
import joblib  # 객체를 파일로 직렬화하여 저장하거나 불러오는 데 사용



# CNN 모델 정의
class CNN(nn.Module):
    def __init__(self, input_size, num_classes):
        super(CNN, self).__init__()
        # 1D Convolutional Layer 정의
        self.conv1 = nn.Conv1d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, padding=1)
        # Max Pooling Layer 정의
        self.pool = nn.MaxPool1d(2)
        # Fully Connected Layer 정의
        self.fc1 = nn.Linear(64 * (input_size // 2), 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        # Conv1D 입력을 위한 채널 추가
        x = x.unsqueeze(1)  
        x = F.relu(self.conv1(x))  # 첫 번째 Convolutional Layer
        x = self.pool(F.relu(self.conv2(x)))  # 두 번째 Convolutional Layer + Max Pooling
        x = x.view(x.size(0), -1)  # Flatten
        x = F.relu(self.fc1(x))  # Fully Connected Layer 1
        x = self.fc2(x)  # 출력 레이어
        return x

# 데이터 준비 함수
def prepare_training_data(data_path, is_training=True):
    data = pd.read_csv(data_path)
    
    # 데이터에서 'id' 컬럼을 삭제
    if 'id' in data.columns:
        data = data.drop('id', axis=1)
    
    # 공격 카테고리 정의 및 매핑
    attack_categories = ['Normal', 'Fuzzers', 'Analysis', 'Backdoor', 'DoS', 'Exploits', 
                         'Generic', 'Reconnaissance', 'Shellcode', 'Worms']
    attack_cat_map = {cat: idx for idx, cat in enumerate(attack_categories)}
    data['attack_cat'] = data['attack_cat'].map(attack_cat_map)

    # Null 값이 있는 행을 삭제
    if data['attack_cat'].isnull().any():
        data = data.dropna(subset=['attack_cat'])

    # 훈련일 경우, 'attack_cat'과 'label'을 제외한 데이터를 X로 설정하고 'attack_cat'을 y로 설정
    if is_training:
        y = data['attack_cat'].values
        X = data.drop(['attack_cat', 'label'], axis=1)
    else:
        X = data.drop(['attack_cat', 'label'], axis=1)

    # 범주형 컬럼에 대해 라벨 인코딩
    categorical_cols = ['proto', 'service', 'state', 'is_ftp_login', 'ct_ftp_cmd', 'ct_flw_http_mthd']
    label_encoders = {}
    for col in categorical_cols:
        label_encoders[col] = LabelEncoder()
        X[col] = label_encoders[col].fit_transform(X[col].astype(str))

    # 데이터 스케일링
    scaler = StandardScaler()
    X = scaler.fit_transform(X)

    # numpy float32로 형 변환
    X = X.astype(np.float32)

    # 인코더와 스케일러 저장
    joblib.dump(label_encoders, 'label_encoders.pkl')
    joblib.dump(scaler, 'scaler.pkl')
    
    return X, y, label_encoders, scaler

def prepare_test_data(data_path):
    # 데이터 로드: CSV 파일에서 데이터 읽기
    data = pd.read_csv(data_path)
    
    # 'id' 컬럼이 있으면 제거
    if 'id' in data.columns:
        data = data.drop('id', axis=1)
    
    # 공격 카테고리 목록 정의
    attack_categories = ['Normal', 'Fuzzers', 'Analysis', 'Backdoor', 'DoS', 'Exploits', 
                         'Generic', 'Reconnaissance', 'Shellcode', 'Worms']
    # 공격 카테고리별 인덱스 매핑
    attack_cat_map = {cat: idx for idx, cat in enumerate(attack_categories)}

    # 'attack_cat' 컬럼의 문자열 값을 정수로 변환
    data['attack_cat'] = data['attack_cat'].map(attack_cat_map)

    # 'attack_cat' 값이 NaN인 행 제거
    if data['attack_cat'].isnull().any():
        data = data.dropna(subset=['attack_cat'])

    # 'attack_cat'과 'label' 컬럼을 제외한 모든 컬럼을 X로 설정
    X = data.drop(['attack_cat', 'label'], axis=1)

    # 학습된 레이블 인코더와 스케일러 불러오기
    label_encoders = joblib.load('label_encoders.pkl')
    scaler = joblib.load('scaler.pkl')

    # 범주형 컬럼을 정수로 변환
    categorical_cols = ['proto', 'service', 'state', 'is_ftp_login', 'ct_ftp_cmd', 'ct_flw_http_mthd']
    for col in categorical_cols:
        # 각 범주형 컬럼에 대해 인코딩 적용
        X[col] = label_encoders[col].transform(X[col].astype(str))

    # 스케일러를 사용하여 특성 값 정규화
    X = scaler.transform(X)

    # X를 float32 타입으로 변환 (모델에 맞는 데이터 타입)
    X = X.astype(np.float32)

    return X
    
# 훈련 함수
def train(device, model, train_loader, criterion, optimizer):
    model.train()
    total_loss = 0
    # 미니배치로 훈련
    for x, y in train_loader:
        x, y = x.to(device), y.to(device)  # 데이터와 레이블을 device로 이동
        optimizer.zero_grad()  # 이전 그래디언트 초기화
        output = model(x)  # 모델을 통해 예측
        loss = criterion(output, y)  # 손실 계산
        loss.backward()  # 역전파
        optimizer.step()  # 옵티마이저 단계 진행
        total_loss += loss.item()
    return total_loss / len(train_loader)  # 평균 손실 반환

# 정확도 계산 함수
def accuracy(y_true, y_pred):
    return accuracy_score(y_true, y_pred)

# 검증 함수
def validate(device, model, test_loader, criterion):
    model.eval()
    total_loss = 0
    correct = 0
    # 평가 모드로 변환 후, 기울기 계산 안함
    with torch.no_grad():
        for x, y in test_loader:
            x, y = x.to(device), y.to(device)  # 데이터와 레이블을 device로 이동
            output = model(x)  # 모델을 통해 예측
            loss = criterion(output, y)  # 손실 계산
            total_loss += loss.item()
            pred = output.argmax(dim=1)  # 예측값이 가장 큰 인덱스 선택
            correct += pred.eq(y).sum().item()  # 정확도 계산
    accuracy = correct / len(test_loader.dataset)
    return total_loss / len(test_loader), accuracy  # 평균 손실과 정확도 반환


# 테스트 함수
def test(model, test_loader, device):
    model.eval()
    predictions = []
    
    with torch.no_grad():
        for X_batch in test_loader:
            X_batch = X_batch[0].to(device)  # 배치 데이터를 device로 이동
            outputs = model(X_batch)  # 모델 예측
            _, predicted = torch.max(outputs, 1)  # 가장 큰 값의 인덱스를 예측값으로 사용
            predictions.extend(predicted.cpu().numpy())  # 예측값 저장

    return predictions

# 공격 카테고리 개수 출력 함수
def check_attack_category_counts(data_path):
    data = pd.read_csv(data_path)
    
    # 공격 카테고리 목록
    attack_categories = ['Normal', 'Fuzzers', 'Analysis', 'Backdoor', 'DoS', 'Exploits', 
                         'Generic', 'Reconnaissance', 'Shellcode', 'Worms']
    
    # 각 공격 카테고리의 샘플 개수 출력
    attack_counts = data['attack_cat'].value_counts()
    print("Attack category counts in test data:")
    for category in attack_categories:
        count = attack_counts.get(category, 0)
        print(f"{category}: {count} samples")

# 메인 실행 코드
if __name__ == '__main__':
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # GPU 또는 CPU 설정
    filepath = './training-set.csv'
    
    # 훈련 데이터 준비
    X_train, y_train, label_encoders, scaler = prepare_training_data('./training-set.csv', is_training=True)
    
    from sklearn.model_selection import train_test_split
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

    # 데이터셋 및 DataLoader 생성
    train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.long))
    val_dataset = TensorDataset(torch.tensor(X_val, dtype=torch.float32), torch.tensor(y_val, dtype=torch.long))
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

    # 모델, 손실 함수, 옵티마이저 설정
    model = CNN(input_size=X_train.shape[1], num_classes=len(np.unique(y_train))).to(device)
    print("num_classes:", len(np.unique(y_train)))
    criterion = nn.CrossEntropyLoss()  # CrossEntropyLoss는 다중 클래스 분류에서 자주 사용됨
    optimizer = optim.Adam(model.parameters(), lr=1e-4)  # Adam 옵티마이저 사용
    
    # 학습 루프 설정
    epochs = 100
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)  # 학습률 스케줄러 설정
    for epoch in range(epochs):
        # 훈련 및 검증
        train_loss = train(device, model, train_loader, criterion, optimizer)
        val_loss, val_acc = validate(device, model, val_loader, criterion)
        scheduler.step()  # 학습률 스케줄러 업데이트
        # 에폭, 훈련 손실, 검증 손실, 검증 정확도 출력
        print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc*100:.2f}%')

    # 모델 저장
    model_path = './model.pth'
    torch.save(model.state_dict(), model_path)  # 모델 상태 저장
    print(f'Model saved to {model_path}')

    # 테스트 데이터 준비
    X_test = prepare_test_data(filepath)
    test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32))
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
    
    # 테스트
    predictions = test(model, test_loader, device)
    
    # 결과 출력
    print("Test predictions:", predictions)


num_classes: 10
Epoch 1/100, Train Loss: 0.7735, Val Loss: 0.6535, Val Acc: 76.37%
Epoch 2/100, Train Loss: 0.6247, Val Loss: 0.6110, Val Acc: 76.97%
Epoch 3/100, Train Loss: 0.5878, Val Loss: 0.5780, Val Acc: 78.27%
Epoch 4/100, Train Loss: 0.5645, Val Loss: 0.5590, Val Acc: 78.71%
Epoch 5/100, Train Loss: 0.5503, Val Loss: 0.5465, Val Acc: 79.18%
Epoch 6/100, Train Loss: 0.5391, Val Loss: 0.5324, Val Acc: 79.63%
Epoch 7/100, Train Loss: 0.5312, Val Loss: 0.5300, Val Acc: 79.64%
Epoch 8/100, Train Loss: 0.5249, Val Loss: 0.5261, Val Acc: 79.97%
Epoch 9/100, Train Loss: 0.5189, Val Loss: 0.5153, Val Acc: 79.69%
Epoch 10/100, Train Loss: 0.5137, Val Loss: 0.5183, Val Acc: 79.71%
Epoch 11/100, Train Loss: 0.5092, Val Loss: 0.5130, Val Acc: 80.12%
Epoch 12/100, Train Loss: 0.5050, Val Loss: 0.5095, Val Acc: 79.92%
Epoch 13/100, Train Loss: 0.5009, Val Loss: 0.5020, Val Acc: 80.36%
Epoch 14/100, Train Loss: 0.4972, Val Loss: 0.5071, Val Acc: 79.97%
Epoch 15/100, Train Loss: 0.4944, Val Los