In [1]:
# Amazon Braket SDK 설치
!pip install amazon-braket-sdk pandas matplotlib

# 필요한 라이브러리 임포트
import boto3
from braket.circuits import Circuit
from braket.aws import AwsQuantumTask
from braket.devices import LocalSimulator
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt



In [2]:
# Colab에서 Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

# 필요한 라이브러리 설치
!pip install --upgrade pip
!pip install tensorflow==2.7
!pip install sympy
import pandas as pd
from sklearn.utils import resample
from sklearn.model_selection import train_test_split
import numpy as np

# 데이터 로드
file_path = "/content/drive/MyDrive/QNN/archive/NF-UNSW-NB15.csv"
df = pd.read_csv(file_path)

# 특성 선택 및 전처리
selected_features = [
    'PROTOCOL', 'L7_PROTO', 'IN_BYTES', 'OUT_BYTES',
    'IN_PKTS', 'OUT_PKTS', 'TCP_FLAGS', 'FLOW_DURATION_MILLISECONDS', 'Label'
]
df_selected = df[selected_features]

# 불균형 데이터 처리
benign = df_selected[df_selected['Label'] == 0]
malicious = df_selected[df_selected['Label'] == 1]
benign_downsampled = resample(benign, replace=False, n_samples=len(malicious), random_state=123)
df_balanced = pd.concat([benign_downsampled, malicious])

# 데이터 분할
train, test = train_test_split(df_balanced, test_size=0.15, random_state=1)

# 특성 인코딩
def encode_features(df, features):
    df_encoded = df.copy()
    for feature in features:
        if feature in df_encoded.columns:
            max_value = df_encoded[feature].max()
            df_encoded[feature] = df_encoded[feature] / max_value * np.pi
            df_encoded[feature] = np.round(df_encoded[feature] / 0.25) * 0.25  # 양자화
    return df_encoded

features_to_encode = ['PROTOCOL', 'L7_PROTO', 'IN_BYTES', 'OUT_BYTES',
                      'IN_PKTS', 'OUT_PKTS', 'TCP_FLAGS', 'FLOW_DURATION_MILLISECONDS']

train_encoded = encode_features(train, features_to_encode)
test_encoded = encode_features(test, features_to_encode)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
[31mERROR: Could not find a version that satisfies the requirement tensorflow==2.7 (from versions: 2.8.0rc0, 2.8.0rc1, 2.8.0, 2.8.1, 2.8.2, 2.8.3, 2.8.4, 2.9.0rc0, 2.9.0rc1, 2.9.0rc2, 2.9.0, 2.9.1, 2.9.2, 2.9.3, 2.10.0rc0, 2.10.0rc1, 2.10.0rc2, 2.10.0rc3, 2.10.0, 2.10.1, 2.11.0rc0, 2.11.0rc1, 2.11.0rc2, 2.11.0, 2.11.1, 2.12.0rc0, 2.12.0rc1, 2.12.0, 2.12.1, 2.13.0rc0, 2.13.0rc1, 2.13.0rc2, 2.13.0, 2.13.1, 2.14.0rc0, 2.14.0rc1, 2.14.0, 2.14.1, 2.15.0rc0, 2.15.0rc1, 2.15.0, 2.15.0.post1, 2.15.1, 2.16.0rc0, 2.16.1, 2.16.2, 2.17.0rc0, 2.17.0rc1, 2.17.0)[0m[31m
[0m[31mERROR: No matching distribution found for tensorflow==2.7[0m[31m


In [3]:
# QNN 모델을 위한 양자 회로 생성 함수
def create_quantum_model(num_qubits, num_layers):
    circuit = Circuit()

    # 각 특성값을 양자 회로에 인코딩
    for i in range(num_qubits):
        circuit.rx(i, 0)  # 큐비트 i에 rx 게이트 적용

    # QNN 레이어 추가
    for _ in range(num_layers):
        for i in range(num_qubits - 1):
            circuit.cnot(i, i + 1)
            circuit.rz(i, np.pi/4)
            circuit.cnot(i, i + 1)

    return circuit

# 회로 실행 및 결과 가져오기
def run_circuit(circuit, shots=1000):
    device = LocalSimulator()  # 로컬 시뮬레이터 사용
    result = device.run(circuit, shots=shots).result()
    counts = result.measurement_counts
    return counts

# 결과 정규화 함수
def normalize_counts(counts):
    total = sum(counts.values())
    normalized_counts = {k: v / total for k, v in counts.items()}
    return normalized_counts

# 데이터를 양자 회로로 변환
def convert_to_circuits(data, num_qubits, num_layers):
    circuits = []
    for sample in data:
        circuit = create_quantum_model(num_qubits, num_layers)
        for i, x in enumerate(sample):
            circuit.rx(i, x)
        circuits.append(circuit)
    return circuits

num_qubits = len(features_to_encode)
num_layers = 2  # Simple QNN의 레이어 수

# 학습 및 테스트 데이터 변환
train_circuits = convert_to_circuits(train_encoded.drop(columns=['Label']).values.tolist(), num_qubits, num_layers)
test_circuits = convert_to_circuits(test_encoded.drop(columns=['Label']).values.tolist(), num_qubits, num_layers)

In [None]:
# 각 회로를 실행하여 결과를 가져옴
def execute_circuits(circuits):
    results = []
    for circuit in circuits:
        counts = run_circuit(circuit)
        normalized_counts = normalize_counts(counts)
        results.append(normalized_counts)
    return results

# 학습 및 테스트 데이터 실행
train_results = execute_circuits(train_circuits)
test_results = execute_circuits(test_circuits)

# 간단한 예시로, '000' 상태가 많이 측정된 경우를 정상(0), 그렇지 않은 경우를 악성(1)으로 분류
def classify(result):
    return 0 if result.get('000', 0) > 0.5 else 1

# 학습 데이터 예측
y_train_pred = [classify(result) for result in train_results]
y_test_pred = [classify(result) for result in test_results]

# 정확도 계산
train_accuracy = np.mean(train_encoded['Label'].values == y_train_pred)
test_accuracy = np.mean(test_encoded['Label'].values == y_test_pred)

print(f"Train Accuracy: {train_accuracy}")
print(f"Test Accuracy: {test_accuracy}")

Train Accuracy: 0.5009342757332034
Test Accuracy: 0.49470582819261577
