<h1> Neural Net </h1>

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import cv2
from PIL import Image
import matplotlib.pyplot as plt
import subprocess
import time
import os
import glob
from torch.utils.data import Dataset, DataLoader
from lucam import Lucam

# --- 기본 설정 (이전과 동일) ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
N = 600
total_cam_roi = (450, -500, 450, -500) # Top Bottom Left Right
first_order_cam_roi = (500, -450, 450, -500) # Top Bottom Left Right

LEARNING_RATE_MODEL = 1e-3
LEARNING_RATE_PHASE = 1e-2

# --- 🧠 중간 깊이의 뉴럴 네트워크 모델 정의 ---
class MediumUNetPropagation(nn.Module):
    def __init__(self, in_channels=2, out_channels=1):
        super(MediumUNetPropagation, self).__init__()

        # --- 인코더 (Contracting Path) ---
        # Level 1
        self.enc1 = self.conv_block(in_channels, 64)
        # Level 2
        self.enc2 = self.conv_block(64, 128)
        # Level 3 (추가된 깊이)
        self.enc3 = self.conv_block(128, 256)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # --- 병목 구간 (Bottleneck) ---
        self.bottleneck = self.conv_block(256, 512)

        # --- 디코더 (Expanding Path) ---
        # Level 3
        self.upconv3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec3 = self.conv_block(256 + 256, 256) # Skip connection 포함
        # Level 2
        self.upconv2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec2 = self.conv_block(128 + 128, 128) # Skip connection 포함
        # Level 1
        self.upconv1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec1 = self.conv_block(64 + 64, 64)   # Skip connection 포함

        # --- 최종 출력 레이어 ---
        self.out_conv = nn.Conv2d(64, out_channels, kernel_size=1)

    def conv_block(self, in_c, out_c):
        """두 개의 3x3 Conv와 ReLU, BatchNorm으로 구성된 기본 블록"""
        return nn.Sequential(
            nn.Conv2d(in_c, out_c, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_c),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_c, out_c, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_c),
            nn.ReLU(inplace=True)
        )

    def forward(self, phase_map):
        # ‼️ 입력 변환: φ -> [cos(φ), sin(φ)]
        if phase_map.dim() == 3: # (B, H, W) -> (B, 1, H, W)
            phase_map = phase_map.unsqueeze(1)

        x_cos = torch.cos(phase_map)
        x_sin = torch.sin(phase_map)
        x = torch.cat([x_cos, x_sin], dim=1) # (B, 2, N, N)

        # --- 인코더 ---
        e1 = self.enc1(x)
        e2 = self.enc2(self.pool(e1))
        e3 = self.enc3(self.pool(e2))

        # --- 병목 ---
        b = self.bottleneck(self.pool(e3))

        # --- 디코더 + Skip Connections ---
        d3 = self.upconv3(b)
        d3 = torch.cat([d3, e3], dim=1)
        d3 = self.dec3(d3)

        d2 = self.upconv2(d3)
        d2 = torch.cat([d2, e2], dim=1)
        d2 = self.dec2(d2)

        d1 = self.upconv1(d2)
        d1 = torch.cat([d1, e1], dim=1)
        d1 = self.dec1(d1)

        # --- 출력 ---
        out = self.out_conv(d1)
        return out.squeeze(1) # (B, N, N)

# --- 헬퍼 함수 정의 (이전과 동일) ---
def save_phase_as_image(phase_tensor, filename):
    phase_normalized = (phase_tensor.detach() + torch.pi) / (2 * torch.pi)
    phase_uint8 = (phase_normalized * 255).byte().cpu().numpy()
    Image.fromarray(phase_uint8).save(filename)

def load_and_preprocess_image(path, size=(N, N)):
    img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    if img is None: raise FileNotFoundError(f"'{path}' 파일을 찾을 수 없습니다.")
    img = cv2.resize(img, dsize=size)
    img_float = img.astype(np.float32) / np.max(img) # 0~1 사이로 정규화
    return torch.from_numpy(img_float).to('cpu')

# --- 🌟 데이터셋 클래스 정의 🌟 ---
class HolographyDataset(Dataset):
    def __init__(self, image_dir):
        # 이미지 파일 경로 리스트 가져오기
        self.image_paths = glob.glob(os.path.join(image_dir, '*.jpg')) + \
                           glob.glob(os.path.join(image_dir, '*.png'))
                           
        lam = 0.532e-6
        dx = 12.5e-6
        z = 100e-3

        lam = torch.tensor(lam).cuda()
        dx = torch.tensor(dx).cuda()
        z = torch.tensor(z).cuda()

        # 각 이미지에 대한 위상 텐서를 저장할 딕셔너리
        self.phase_tensors = {}
        for path in self.image_paths:
            # 초기 위상은 랜덤으로 생성
            phase = (torch.pi * torch.ones(N,N)).requires_grad_(True)
            self.phase_tensors[path] = phase
            
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        path = self.image_paths[idx]
        target_intensity = load_and_preprocess_image(path)
        target_intensity = target_intensity / torch.max(target_intensity).item()
        target_amplitude = torch.sqrt(target_intensity)
        phase_tensor = self.phase_tensors[path]
        return target_amplitude.to('cuda'), phase_tensor.to('cuda'), path # 경로도 함께 반환하여 추적

Using device: cuda


In [21]:
for i in range(44, 100):
    pattern = np.random.rand(600, 600) * 255
    cv2.imwrite('test.png', pattern)

    slm_process = subprocess.Popen(['python', 'test.py'])
    time.sleep(2)
    
    camera = Lucam()
    capture = camera.TakeSnapshot()
    capture = cv2.resize(capture, dsize=(600, 600))

    slm_process.terminate()
    slm_process.wait()
    
    cv2.imwrite(f'dataset/trainset/patterns/{i}.png', pattern)
    cv2.imwrite(f'dataset/trainset/captures/{i}.png', capture)

In [3]:
# --- 🌟 데이터셋 클래스 정의 🌟 ---
class SpeckleDataset(Dataset):
    def __init__(self, pattern_dir, capture_dir):
        # 이미지 파일 경로 리스트 가져오기
        self.pattern_paths = sorted(glob.glob(os.path.join(pattern_dir, '*.png')))
        self.target_paths = sorted(glob.glob(os.path.join(capture_dir, '*.png')))

    def __len__(self):
        return len(self.pattern_paths)
    
    def __getitem__(self, idx):
        pattern_path = self.pattern_paths[idx]
        # '패턴 이미지'는 0-255 grayscale로 저장된 위상 정보를 나타낸다고 가정
        pattern_img = cv2.imread(pattern_path, cv2.IMREAD_GRAYSCALE)
        pattern_img = cv2.resize(pattern_img, dsize=(N, N))
        
        # 0~255 값을 0~2π 범위의 위상(phase)으로 변환
        phase_map = (pattern_img.astype(np.float32) / 255.0) * 2 * np.pi
        phase_map_tensor = torch.from_numpy(phase_map)

        target_path = self.target_paths[idx]
        target_intensity = load_and_preprocess_image(target_path)
        target_intensity = target_intensity / torch.max(target_intensity).item()
        target_amplitude = torch.sqrt(target_intensity)

        return phase_map_tensor.to('cuda'), target_amplitude.to('cuda')

# --- 변수 및 모델 초기화 ---
model = MediumUNetPropagation().to(device)

# 🌟 데이터셋 및 데이터로더 생성
# 'images' 폴더에 학습용 이미지를 넣어주세요.
train_dataset = SpeckleDataset(pattern_dir='./dataset/train/patterns',
                               capture_dir='./dataset/train/captures')
test_dataset = SpeckleDataset(pattern_dir='./dataset/test/patterns',
                               capture_dir='./dataset/test/captures')

# 미니배치 크기. GPU 메모리에 따라 조절.
BATCH_SIZE = 1
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# s1, s2 스케일 팩터. 이제 이미지마다 필요할 수 있으나, 우선은 공유
s = torch.tensor(1.0, device=device, requires_grad=True)

# ‼️ 옵티마이저 정의. 이제 위상 텐서는 데이터셋 안에 있으므로, 별도로 최적화
optimizer_model = optim.Adam(list(model.parameters()) + [s], lr=LEARNING_RATE_MODEL)
loss_fn = torch.nn.MSELoss()

In [None]:
from pytorch_msssim import ssim, ms_ssim # Multi-Scale SSIM이 더 성능이 좋을 수 있습니다.
import torch.nn.functional as F

NUM_EPOCHS = 2 # 전체 데이터셋을 몇 번 반복할지

for epoch in range(NUM_EPOCHS):
    print(f"\n{'='*20} Epoch {epoch + 1}/{NUM_EPOCHS} {'='*20}")
    
    # data_loader에서 미니배치 단위로 데이터를 가져옴
    model = model.train()
    for i, (pattern_amplitudes, target_amplitudes) in enumerate(train_loader):
        
        # --- 단계 1: 위상 업데이트 -
        optimizer_model.zero_grad()
        
        # U-Net 모델은 배치 입력을 처리할 수 있도록 수정됨
        prediction = model(pattern_amplitudes**2)

        loss_train = loss_fn(s * prediction, target_amplitudes**2)
        loss_train.backward()
        optimizer_model.step()
        
        if i%10==0:
            print(f"Epoch {epoch+1}, Batch {i+1} 모델 업데이트 완료. Train Loss : {loss_train.item():.6f}")



Epoch 1, Batch 1 모델 업데이트 완료. Train Loss : 0.245718
Epoch 1, Batch 11 모델 업데이트 완료. Train Loss : 0.051116
Epoch 1, Batch 21 모델 업데이트 완료. Train Loss : 0.040201
Epoch 1, Batch 31 모델 업데이트 완료. Train Loss : 0.034389
Epoch 1, Batch 41 모델 업데이트 완료. Train Loss : 0.032959
Epoch 1, Batch 51 모델 업데이트 완료. Train Loss : 0.032424
Epoch 1, Batch 61 모델 업데이트 완료. Train Loss : 0.031423
Epoch 1, Batch 71 모델 업데이트 완료. Train Loss : 0.031982

Epoch 2, Batch 1 모델 업데이트 완료. Train Loss : 0.031591
Epoch 2, Batch 11 모델 업데이트 완료. Train Loss : 0.031814
Epoch 2, Batch 21 모델 업데이트 완료. Train Loss : 0.031957
Epoch 2, Batch 31 모델 업데이트 완료. Train Loss : 0.031799


KeyboardInterrupt: 

In [12]:
# data_loader에서 미니배치 단위로 데이터를 가져옴
model = model.eval()
with torch.no_grad():
    losses = []
    for i, (pattern_amplitudes, target_amplitudes) in enumerate(test_loader):
            
        # U-Net 모델은 배치 입력을 처리할 수 있도록 수정됨
        prediction = model(pattern_amplitudes**2)

        loss_test = loss_fn(s * prediction, target_amplitudes**2)

        print(f"Batch {i+1} Test Loss : {loss_test:.6f}")

Batch 1 Test Loss : 0.031475
Batch 2 Test Loss : 0.031648
Batch 3 Test Loss : 0.031562
Batch 4 Test Loss : 0.031512
Batch 5 Test Loss : 0.032123
Batch 6 Test Loss : 0.031546
Batch 7 Test Loss : 0.031795
Batch 8 Test Loss : 0.031342
Batch 9 Test Loss : 0.032039
Batch 10 Test Loss : 0.032106
Batch 11 Test Loss : 0.031734
Batch 12 Test Loss : 0.031614
Batch 13 Test Loss : 0.031771
Batch 14 Test Loss : 0.031430
Batch 15 Test Loss : 0.031634
Batch 16 Test Loss : 0.031535
Batch 17 Test Loss : 0.031200
Batch 18 Test Loss : 0.032145
Batch 19 Test Loss : 0.031499
Batch 20 Test Loss : 0.031671


In [49]:
torch.save(model.state_dict(), 'speckle_model.pth')

In [4]:
# --- 변수 및 모델 초기화 ---
model = MediumUNetPropagation().to(device)
model.load_state_dict(torch.load('speckle_model.pth'))

<All keys matched successfully>

In [5]:
target = Image.open('apple.png').convert('L')
target = np.array(target)
target = cv2.resize(target, dsize=(600,600))
target = target / np.max(target)
target = np.sqrt(target)
target = torch.from_numpy(target).cuda()
target = target.type(torch.float32)

In [6]:
phase = torch.ones(600,600) * torch.pi
phase = phase.type(torch.float32)
phase = phase.requires_grad_()
optimizer = torch.optim.Adam([phase], lr=1e-2)
phase = phase.cuda()

In [7]:
model = model.eval()
loss_fn = torch.nn.MSELoss()
for i in range(100):
    optimizer.zero_grad()
    prediction = model(phase.unsqueeze(0))
    loss = loss_fn(prediction, target**2)
    loss.backward()
    optimizer.step()
    if i%10 == 0:
        print(f'Epoch {i}, Loss : {loss.item():.6f}')

  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 0, Loss : 0.367850
Epoch 10, Loss : 0.367850
Epoch 20, Loss : 0.367850
Epoch 30, Loss : 0.367850
Epoch 40, Loss : 0.367850
Epoch 50, Loss : 0.367850
Epoch 60, Loss : 0.367850
Epoch 70, Loss : 0.367850
Epoch 80, Loss : 0.367850
Epoch 90, Loss : 0.367850


In [8]:
phase = phase.detach().cpu().numpy()

In [10]:
phase = phase / np.max(phase) * 255
Image.fromarray(phase.astype('uint8')).save('test.png')