# Pontos
가이아 학습 전 보안을 위한 시스템

In [334]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, datasets, models
from torch.utils.data import Dataset, DataLoader
import torchvision.models as models

import random
import os
import glob
import numpy as np
import matplotlib.pyplot as plt

from PIL import Image
import cv2

In [335]:
# Pick the GPU when CUDA is available, otherwise fall back to the CPU.
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda") if USE_CUDA else torch.device("cpu")

## 하이퍼 파라미터

In [1]:
# Number of training epochs run by the main loop.
EPOCHS     = 300
# Number of samples per DataLoader batch.
BATCH_SIZE = 2

# Image dimensions.
# NOTE(review): these two constants are not referenced by the visible code
# (CustomDataset hard-codes a 256x256 resize) -- confirm whether they should be.
IMG_HEIGHT = 256
IMG_WIDTH = 256

# Path of the saved model that CustomDataset hands to getPertubedImage, which
# passes it to torch.load.
# NOTE(review): empty here; it presumably has to point at a real model file
# before perturbed samples can be generated -- confirm.
MODEL_PATH = ""

## FGSM 어택

In [337]:
def fgsm_attack(image, epsilon, gradient):
    """Return an FGSM-perturbed version of ``image``.

    Every element of ``image`` is shifted by ``epsilon`` in the direction of
    the sign of the matching ``gradient`` element, and the result is clamped
    to the [0, 1] range. ``image`` itself is not modified.
    """
    # Only the direction (sign) of the gradient is used, not its magnitude.
    step = epsilon * gradient.sign()
    # Pull any value that left the [0, 1] range back inside it.
    return torch.clamp(image + step, 0, 1)

In [338]:
def getPertubedImage(img_tensor, original_image, epsilon, model_path):
    """Create an FGSM adversarial version of ``img_tensor``.

    The model stored at ``model_path`` is run on the input, the mean-squared
    error between its output and ``original_image`` is back-propagated to the
    input, and the input is then perturbed with ``fgsm_attack``.

    Args:
        img_tensor: input tensor fed to the model. It is not modified.
        original_image: target tensor the model output is compared against.
        epsilon: FGSM step size.
        model_path: path of a model saved with ``torch.save``.

    Returns:
        The perturbed tensor, clamped to [0, 1] and detached from autograd.

    NOTE(review): ``img_tensor`` is passed to the model as-is; if the model
    expects a leading batch dimension the caller has to add it -- confirm.
    """
    # NOTE(security): torch.load unpickles the file, which can execute
    # arbitrary code. Only load model files from a trusted source.
    model = torch.load(model_path)

    # Work on a private leaf copy so the caller's tensor is not switched to
    # requires_grad and its .grad does not accumulate across calls.
    adv_input = img_tensor.detach().clone().requires_grad_(True)
    output = model(adv_input)

    # Compute the loss. nn.MSELoss is a module: it has to be instantiated
    # first and then called with (prediction, target).
    loss = nn.MSELoss()(output, original_image)

    # Back-propagate to obtain the gradient of the loss w.r.t. the input.
    model.zero_grad()
    loss.backward()

    # Gradient of the loss with respect to the input image.
    gradient = adv_input.grad.detach()

    # Perturb a detached view so the result carries no autograd history.
    perturbed_data = fgsm_attack(adv_input.detach(), epsilon, gradient)
    return perturbed_data

## 데이터셋

In [339]:
def _list_png(pattern):
    """Return the glob matches of ``pattern`` ending in ``.png`` or ``.PNG``."""
    return [match for match in glob.glob(pattern)
            if match.endswith(('.png', '.PNG'))]


# PNG files of the training and test sets, looked up under the working directory.
trainlist = _list_png(os.getcwd()+"/DatasetTrain/*")
testlist = _list_png(os.getcwd()+"/DatasetTest/*")

print(trainlist)

['F:\\KaliNode\\programming\\Project-Gaea\\Project-Gaea\\Pontos/DatasetTrain\\001-Fighter01.png', 'F:\\KaliNode\\programming\\Project-Gaea\\Project-Gaea\\Pontos/DatasetTrain\\002-Fighter02.png', 'F:\\KaliNode\\programming\\Project-Gaea\\Project-Gaea\\Pontos/DatasetTrain\\003-Fighter03.png', 'F:\\KaliNode\\programming\\Project-Gaea\\Project-Gaea\\Pontos/DatasetTrain\\004-Fighter04.png']


In [357]:
class imagedata:
    """Build an input/target image pair from a single image file.

    Attributes set by the constructor:
        outp: the file's image converted to RGBA and resized to ``(w, h)``.
        inp:  a ``(w, h)`` RGBA image made by taking the top-left
              ``(w/4, h/4)`` crop of ``outp`` and pasting it in a 4x4 grid.
    """

    def __init__(self, path, w=128, h=192):
        """Load ``path`` and build ``outp`` and ``inp`` at size ``(w, h)``."""
        # convert() returns an independent, fully loaded image, so the
        # underlying file can be closed as soon as the conversion is done.
        with Image.open(path) as source:
            self.outp = source.convert("RGBA")

        self.outp = self.outp.resize((w, h))

        # Top-left region, a quarter of the width and a quarter of the height.
        area = (0, 0, int(w/4), int(h/4))
        tile = self.outp.crop(area)

        # Repeat the tile four times horizontally to form one strip ...
        strip = Image.new("RGBA", (w, int(h/4)))
        for i in range(4):
            strip.paste(im=tile, box=(int(w/4*i), 0))

        # ... then repeat the strip four times vertically.
        self.inp = Image.new("RGBA", (w, h))
        for i in range(4):
            self.inp.paste(im=strip, box=(0, int(h/4*i)))

In [364]:
class CustomDataset(Dataset):
    """Dataset of clean or FGSM-perturbed images with a binary label.

    Each item is ``(x, y)`` where ``y`` is ``0`` for an untouched input image
    and ``1`` for an input perturbed by ``getPertubedImage``. Which of the two
    is returned is chosen at random on every access.
    """

    def __init__(self, paths, model_path):
        """Load every image in ``paths`` and convert it to tensors.

        Args:
            paths: iterable of image file paths handed to ``imagedata``.
            model_path: model file path forwarded to ``getPertubedImage``.
        """
        self.x_data = []
        self.y_data = None
        # Maps each stored input tensor to its target tensor. Tensors hash by
        # identity, so lookups must use the exact object kept in x_data.
        self.mix = {}
        self.model_path = model_path

        # The pipeline is the same for every image, so build it only once.
        img_transforms = transforms.Compose([
            transforms.Resize((256, 256), Image.BICUBIC),
            transforms.ToTensor(),
        ])

        for path in paths:
            imgd = imagedata(path)
            img_tensor = img_transforms(imgd.inp)
            imgp_tensor = img_transforms(imgd.outp)
            self.x_data.append(img_tensor)
            self.mix[img_tensor] = imgp_tensor

    def __len__(self):
        """Return the number of stored images."""
        return len(self.x_data)

    def __getitem__(self, idx):
        """Return ``(x, y)`` for index ``idx`` as tensors.

        With equal probability the sample is the stored input (label 0) or an
        adversarially perturbed version of it (label 1).
        """
        stored = self.x_data[idx]
        if random.randint(0, 1) == 0:
            # Not perturbed.
            x = stored
            y = torch.tensor(0)
        else:
            # Perturbed with a random FGSM step size.
            x = getPertubedImage(stored, self.mix[stored],
                                 random.uniform(0.01, 0.9), self.model_path)
            y = torch.tensor(1)
        # Samples handed to the DataLoader must not carry autograd state
        # (requires_grad or graph history) left over from the attack.
        return x.detach(), y

In [365]:
# Batched, shuffled loaders over the training and test image lists.
train_loader = DataLoader(
    dataset=CustomDataset(paths=trainlist, model_path=MODEL_PATH),
    batch_size=BATCH_SIZE,
    shuffle=True,
)
test_loader = DataLoader(
    dataset=CustomDataset(paths=testlist, model_path=MODEL_PATH),
    batch_size=BATCH_SIZE,
    shuffle=True,
)

In [366]:
# Print the labels of at most the first four training batches as a sanity check.
print("< DATA >")
i = 0
for i, (data, target) in enumerate(train_loader, start=1):
    print(target)
    if i == 4:
        break

< DATA >


PermissionError: [Errno 13] Permission denied: './models'

## ResNet

In [326]:
class BasicBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers plus a shortcut.

    The shortcut is the identity unless the block changes the spatial size
    (``stride != 1``) or the channel count (``in_planes != planes``); in that
    case it is a 1x1 convolution followed by batch normalisation.
    """

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_planes, planes,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(
            planes, planes,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(planes)

        shape_changes = stride != 1 or in_planes != planes
        if shape_changes:
            # Project the input so it can be added to the main branch.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes),
            )
        else:
            # An empty Sequential passes its input through unchanged.
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Apply the block to ``x`` and return the activated sum."""
        branch = self.conv1(x)
        branch = F.relu(self.bn1(branch))
        branch = self.bn2(self.conv2(branch))
        branch += self.shortcut(x)
        return F.relu(branch)


class ResNet(nn.Module):
    """Small three-stage ResNet classifier for 4-channel images.

    The final linear layer takes 4096 features. With the strides used here
    (1, 2, 2) and the 8x8 average pooling, that matches a 256x256 input:
    64 channels x 8 x 8.
    """

    def __init__(self, num_classes=2):
        super().__init__()
        # Channel count fed to the next block; updated by _make_layer.
        self.in_planes = 16

        self.conv1 = nn.Conv2d(
            4, 16, kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(16, 2, stride=1)
        self.layer2 = self._make_layer(32, 2, stride=2)
        self.layer3 = self._make_layer(64, 2, stride=2)
        self.linear = nn.Linear(4096, num_classes)

    def _make_layer(self, planes, num_blocks, stride):
        """Stack BasicBlocks; only the first one uses ``stride``."""
        blocks = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            blocks.append(BasicBlock(self.in_planes, planes, block_stride))
            self.in_planes = planes
        return nn.Sequential(*blocks)

    def forward(self, x):
        """Return the class logits for the batch ``x``."""
        features = F.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3):
            features = stage(features)
        pooled = F.avg_pool2d(features, 8)
        # Flatten everything except the batch dimension.
        flat = pooled.view(pooled.size(0), -1)
        return self.linear(flat)

## Train

In [327]:
# Model, SGD optimizer and a step schedule that multiplies the learning rate
# by 0.1 every 50 scheduler steps.
# NOTE(review): the test losses logged further down in this notebook are
# extremely large; lr=0.1 may be too high for this setup -- confirm.
model = ResNet().to(DEVICE)
optimizer = optim.SGD(
    params=model.parameters(),
    lr=0.1,
    momentum=0.9,
    weight_decay=0.0005,
)
scheduler = optim.lr_scheduler.StepLR(
    optimizer=optimizer,
    step_size=50,
    gamma=0.1,
)

def train(model, train_loader, optimizer, epoch):
    """Run one pass over ``train_loader`` and update ``model``.

    For every batch the cross-entropy loss between the model output and the
    target is back-propagated and one optimizer step is taken.
    ``epoch`` is accepted but not used by the body.
    """
    model.train()
    for data, target in train_loader:
        data = data.to(DEVICE)
        target = target.to(DEVICE)

        optimizer.zero_grad()
        loss = F.cross_entropy(model(data), target)
        loss.backward()
        optimizer.step()
        
        
def evaluate(model, test_loader):
    """Return ``(mean loss, accuracy in percent)`` of ``model`` on ``test_loader``.

    Both values are averaged over ``len(test_loader.dataset)``.
    """
    model.eval()
    total_loss = 0
    n_correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            logits = model(data)

            # Accumulate the summed (not averaged) loss of this batch.
            total_loss += F.cross_entropy(logits, target,
                                          reduction='sum').item()

            # The index of the largest logit is the predicted class.
            _, pred = logits.max(1, keepdim=True)
            n_correct += pred.eq(target.view_as(pred)).sum().item()

    n_samples = len(test_loader.dataset)
    return total_loss / n_samples, 100. * n_correct / n_samples

## 실행

In [328]:
# Train for EPOCHS epochs, evaluating on the test set after each one.
for epoch in range(1, EPOCHS + 1):
    train(model, train_loader, optimizer, epoch)
    # Advance the learning-rate schedule only after this epoch's optimizer
    # steps. Stepping first shifts the schedule by one epoch and triggers a
    # PyTorch warning.
    scheduler.step()
    test_loss, test_accuracy = evaluate(model, test_loader)

    print('[{}] Test Loss: {:.4f}, Accuracy: {:.2f}%'.format(
          epoch, test_loss, test_accuracy))

[1] Test Loss: 746259349504.0000, Accuracy: 50.00%
[2] Test Loss: 221191840.0000, Accuracy: 50.00%
[3] Test Loss: 26300920.0000, Accuracy: 50.00%
[4] Test Loss: 1438159.8750, Accuracy: 50.00%
[5] Test Loss: 404183.8125, Accuracy: 50.00%
[6] Test Loss: 130880.3438, Accuracy: 50.00%
[7] Test Loss: 54138.9727, Accuracy: 0.00%
[8] Test Loss: 4288.1641, Accuracy: 50.00%
[9] Test Loss: 101859.1875, Accuracy: 50.00%
[10] Test Loss: 594.7425, Accuracy: 0.00%
[11] Test Loss: 0.0000, Accuracy: 100.00%
[12] Test Loss: 0.0000, Accuracy: 100.00%
[13] Test Loss: 35353.3379, Accuracy: 0.00%
[14] Test Loss: 140329.3438, Accuracy: 50.00%
[15] Test Loss: 2672880384.0000, Accuracy: 50.00%
[16] Test Loss: 0.0000, Accuracy: 100.00%
[17] Test Loss: 526058176.0000, Accuracy: 0.00%
[18] Test Loss: 23592894.5312, Accuracy: 0.00%
[19] Test Loss: 3024832.5000, Accuracy: 50.00%
[20] Test Loss: 5349598.0000, Accuracy: 50.00%
[21] Test Loss: 2125763.1875, Accuracy: 0.00%


KeyboardInterrupt: 