In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Używane urządzenie: {DEVICE}")

# Parameters
N_SAMPLES = 1000          # number of synthetic training curves to generate
SEQ_LENGTH = 128          # number of points per curve
X_MIN, X_MAX = -5, 5      # endpoints of the x grid the curves are evaluated on
T_STEPS = 100             # number of diffusion timesteps
BETA_START = 0.0001       # first value of the linear beta schedule
BETA_END = 0.02           # last value of the linear beta schedule
BATCH_SIZE = 64           # mini-batch size of the training DataLoader
LR = 1e-3                 # Adam learning rate, shared by both models
EPOCHS = 2000             # number of passes over the training set

# ==========================================
# 1. PRZYGOTOWANIE DANYCH
# ==========================================

def generate_random_curves(n_samples, seq_length):
    """Generate random, standardized sinusoidal curves.

    Every curve has the form
    ``amp1 * sin(freq1 * x + phase) + amp2 * cos(freq2 * x) + slope * x``
    evaluated on ``seq_length`` evenly spaced points between ``X_MIN`` and
    ``X_MAX``, and is then shifted and scaled to zero mean and (up to a
    ``1e-8`` stabiliser) unit standard deviation.

    Args:
        n_samples: Number of curves to draw.
        seq_length: Number of grid points per curve.

    Returns:
        A tuple ``(curves, x)`` where ``curves`` is a float32 array holding
        one curve per row and ``x`` is the shared grid from ``np.linspace``.
    """
    grid = np.linspace(X_MIN, X_MAX, seq_length)

    def _draw_curve():
        # The order of the random draws is kept fixed so that results are
        # reproducible under a given NumPy seed.
        freq_sin = np.random.uniform(0.5, 3.0)
        freq_cos = np.random.uniform(0.5, 3.0)
        phase_shift = np.random.uniform(0, 2 * np.pi)
        amp_sin = np.random.uniform(0.5, 1.5)
        amp_cos = np.random.uniform(0.1, 0.5)
        slope = np.random.uniform(-0.1, 0.1)

        raw = (
            amp_sin * np.sin(freq_sin * grid + phase_shift)
            + amp_cos * np.cos(freq_cos * grid)
            + slope * grid
        )
        # Standardize each curve individually.
        return (raw - raw.mean()) / (raw.std() + 1e-8)

    curves = [_draw_curve() for _ in range(n_samples)]
    return np.array(curves, dtype=np.float32), grid

# Build the training curves once; x_axis is the shared grid, reused later as the plot's x axis.
curves_data, x_axis = generate_random_curves(N_SAMPLES, SEQ_LENGTH)

class SignalDataset(Dataset):
    """Dataset wrapper around a NumPy array of signals.

    A new axis is inserted at position 1 so that each item carries a
    channel dimension, e.g. an ``(N, L)`` array yields items of shape
    ``(1, L)``.
    """

    def __init__(self, data):
        # from_numpy shares memory with the input array (no copy is made).
        signals = torch.from_numpy(data)
        self.data = signals[:, None]

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, idx):
        sample = self.data[idx]
        return sample

# Shuffled mini-batches; SignalDataset adds the channel axis, so each batch is (batch, 1, SEQ_LENGTH).
dataloader = DataLoader(SignalDataset(curves_data), batch_size=BATCH_SIZE, shuffle=True)

# Diffusion schedule
# Linear beta schedule with one value per timestep, kept on DEVICE so it can be
# indexed with timestep tensors living on the same device.
betas = torch.linspace(BETA_START, BETA_END, T_STEPS).to(DEVICE)
alphas = 1.0 - betas
# Running product of alphas (alpha-bar); it is derived from `betas`, so it is
# already on DEVICE and the trailing .to(DEVICE) does not move it.
alphas_cumprod = torch.cumprod(alphas, dim=0).to(DEVICE)

# ==========================================
# 2. MODELS
# ==========================================

class SinusoidalPosEmb(nn.Module):
    """Sinusoidal embedding of diffusion timesteps.

    Maps a 1-D tensor of timesteps ``t`` with shape ``(batch,)`` to a
    ``(batch, dim)`` tensor whose first ``dim // 2`` columns are sines and
    next ``dim // 2`` columns are cosines of ``t`` scaled by geometrically
    spaced frequencies between 1 and 1/10000.

    Args:
        dim: Width of the embedding (expected to be at least 2).
    """

    def __init__(self, dim):
        super().__init__()
        self.dim = dim

    def forward(self, t):
        """Return the ``(batch, dim)`` embedding of the timesteps ``t``."""
        device = t.device
        half_dim = self.dim // 2
        # With a single frequency (dim 2 or 3) ``half_dim - 1`` is zero, which
        # used to turn the whole embedding into NaN; clamp the divisor so the
        # lone frequency is simply 1.
        denom = max(half_dim - 1, 1)
        log_step = torch.log(torch.tensor(10000.0)) / denom
        freqs = torch.exp(torch.arange(half_dim, device=device) * -log_step)
        angles = t[:, None].float() * freqs[None, :]
        emb = torch.cat((torch.sin(angles), torch.cos(angles)), dim=1)
        if self.dim % 2 == 1:
            # sin/cos halves only cover ``dim - 1`` columns for odd ``dim``;
            # append one zero column so the output width matches ``dim``.
            emb = nn.functional.pad(emb, (0, 1))
        return emb

class DiffusionModel(nn.Module):
    """Small 1-D CNN that predicts the noise contained in a noisy signal.

    The timestep is embedded with ``SinusoidalPosEmb``, passed through a
    linear layer + ReLU, projected to 64 channels and added to the
    64-channel feature map after the second convolution. All convolutions
    use kernel size 5 with padding 2, so the signal length is preserved.

    Args:
        t_emb_dim: Width of the timestep embedding.
    """

    def __init__(self, t_emb_dim=32):
        super().__init__()
        # Timestep branch: embedding -> linear -> ReLU -> projection to the
        # channel count of the feature map it is added to.
        self.time_mlp = nn.Sequential(
            SinusoidalPosEmb(t_emb_dim),
            nn.Linear(t_emb_dim, t_emb_dim),
            nn.ReLU(),
        )
        self.time_emb_proj = nn.Linear(t_emb_dim, 64)

        # Signal branch: 1 -> 32 -> 64 -> 32 -> 1 channels.
        self.conv1 = nn.Conv1d(1, 32, kernel_size=5, padding=2)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=5, padding=2)
        self.conv3 = nn.Conv1d(64, 32, kernel_size=5, padding=2)
        self.conv4 = nn.Conv1d(32, 1, kernel_size=5, padding=2)
        self.relu = nn.ReLU()

    def forward(self, x, t):
        """Predict the noise in ``x`` (single-channel signal batch) at timesteps ``t``."""
        # (batch, 64) -> (batch, 64, 1) so it broadcasts along the length axis.
        time_bias = self.time_emb_proj(self.time_mlp(t)).unsqueeze(-1)

        feat = self.relu(self.conv1(x))
        feat = self.relu(self.conv2(feat)) + time_bias
        feat = self.relu(self.conv3(feat))
        return self.conv4(feat)

# Denoising Autoencoder (DAE)
class DenoisingAutoencoder(nn.Module):
    """Convolutional autoencoder that maps a noisy signal to a clean one.

    The encoder halves the length twice with max pooling (1 -> 32 -> 64
    channels); the decoder doubles it twice with upsampling (64 -> 32 -> 1
    channels). For input lengths divisible by 4 the output has the same
    shape as the input.
    """

    def __init__(self):
        super().__init__()
        encoder_layers = [
            nn.Conv1d(1, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
        ]
        self.encoder = nn.Sequential(*encoder_layers)

        decoder_layers = [
            nn.Upsample(scale_factor=2),
            nn.Conv1d(64, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Upsample(scale_factor=2),
            nn.Conv1d(32, 1, kernel_size=3, padding=1),
        ]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` to the compressed representation and decode it back."""
        latent = self.encoder(x)
        return self.decoder(latent)

# ==========================================
# 3. FUNKCJE SAMPLOWANIA
# ==========================================

def forward_diffusion_sample(x0, t):
    """Noise clean signals ``x0`` to the diffusion timesteps ``t``.

    Computes ``sqrt(alpha_bar_t) * x0 + sqrt(1 - alpha_bar_t) * eps`` with
    ``eps`` drawn from a standard normal, where ``alpha_bar_t`` is looked up
    in the module-level ``alphas_cumprod``.

    Args:
        x0: Clean signals; the scale factors are reshaped to ``(-1, 1, 1)``,
            so a 3-D ``(batch, channels, length)`` tensor is expected.
        t: Integer timestep indices, one per batch element.

    Returns:
        A tuple ``(x_t, eps)``: the noised signals and the noise that was added.
    """
    alpha_bar_t = alphas_cumprod[t]
    signal_scale = torch.sqrt(alpha_bar_t).view(-1, 1, 1)
    noise_scale = torch.sqrt(1 - alpha_bar_t).view(-1, 1, 1)

    eps = torch.randn_like(x0)
    x_t = signal_scale * x0 + noise_scale * eps
    return x_t, eps

@torch.no_grad()
def sample_reverse_process(model, x_noisy, t_start=None):
    """Denoise ``x_noisy`` by running the reverse diffusion process.

    Starting from a copy of ``x_noisy``, repeatedly predicts the noise with
    ``model`` and applies the reverse update using the module-level
    ``alphas``, ``alphas_cumprod`` and ``betas`` schedules. Fresh Gaussian
    noise scaled by ``sqrt(beta_t)`` is injected at every step except the
    last one (timestep index 0).

    Args:
        model: Noise-prediction network called as ``model(x, t)``. It is
            switched to eval mode and left that way.
        x_noisy: Batch of noisy signals; it is not modified.
        t_start: Number of reverse steps to run. The input is treated as the
            state at timestep index ``t_start - 1``. ``None`` (default) runs
            all ``T_STEPS`` steps, which matches the previous behaviour.

    Returns:
        The tensor obtained after the last reverse step.

    Raises:
        ValueError: If ``t_start`` is outside ``[0, T_STEPS]``.
    """
    if t_start is None:
        t_start = T_STEPS
    if not 0 <= t_start <= T_STEPS:
        raise ValueError(f"t_start must be in [0, {T_STEPS}], got {t_start}")

    model.eval()
    x = x_noisy.clone()
    for t_inv in reversed(range(t_start)):
        # Build the timestep batch on the input's device rather than the
        # global DEVICE so the model receives x and t on the same device.
        t = torch.full((x.shape[0],), t_inv, device=x.device, dtype=torch.long)
        pred_noise = model(x, t)

        alpha = alphas[t_inv]
        alpha_hat = alphas_cumprod[t_inv]
        beta = betas[t_inv]

        # Mean of the reverse step: remove the predicted noise contribution.
        x = (1 / torch.sqrt(alpha)) * (
            x - ((1 - alpha) / torch.sqrt(1 - alpha_hat)) * pred_noise
        )
        # No noise is added on the final step so the result is deterministic
        # given the state at index 0.
        if t_inv > 0:
            x = x + torch.sqrt(beta) * torch.randn_like(x)
    return x

# ==========================================
# 4. TRENING 
# ==========================================

# The noise-prediction network defined in this file is DiffusionModel; the
# name DenoiseCNN_withSinusoidalPosEmb used here before is not defined in it.
diff_model = DiffusionModel().to(DEVICE)
dae_model = DenoisingAutoencoder().to(DEVICE)

# Each model gets its own optimizer; both are trained with an MSE objective.
opt_diff = optim.Adam(diff_model.parameters(), lr=LR)
opt_dae = optim.Adam(dae_model.parameters(), lr=LR)
criterion = nn.MSELoss()

print("\n--- Rozpoczynanie treningu (Dyfuzja + Autoencoder) ---")

for epoch in range(EPOCHS):
    # Running sums of the per-batch losses for this epoch.
    loss_d, loss_a = 0, 0
    for x0 in dataloader:
        x0 = x0.to(DEVICE)

        # 1. Train the diffusion model: noise each sample to a random
        #    timestep and regress the noise that was added.
        t = torch.randint(0, T_STEPS, (x0.shape[0],), device=DEVICE).long()
        xt, noise = forward_diffusion_sample(x0, t)

        opt_diff.zero_grad()
        pred_noise = diff_model(xt, t)
        l_diff = criterion(pred_noise, noise)
        l_diff.backward()
        opt_diff.step()
        loss_d += l_diff.item()

        # 2. Train the autoencoder: its input is corrupted with a fixed
        #    noise level (std 0.3) and it is asked to reconstruct x0.
        dae_noise = torch.randn_like(x0) * 0.3
        x_noisy_dae = x0 + dae_noise

        opt_dae.zero_grad()
        x_rec = dae_model(x_noisy_dae)
        l_dae = criterion(x_rec, x0)
        l_dae.backward()
        opt_dae.step()
        loss_a += l_dae.item()

    # Report the mean per-batch losses every 10 epochs.
    if (epoch+1) % 10 == 0:
        print(f"Epoch {epoch+1} | Diff Loss: {loss_d/len(dataloader):.4f} | DAE Loss: {loss_a/len(dataloader):.4f}")

# ==========================================
# 5. PORÓWNANIE I WYNIKI
# ==========================================

print("\n--- Generowanie wyników porównawczych ---")

# Test data: one freshly generated curve with a channel axis added.
test_y, _ = generate_random_curves(1, SEQ_LENGTH)
x_test = torch.tensor(test_y).unsqueeze(1).to(DEVICE)

# Corrupt the test curve with additive Gaussian noise.
noise_level = 0.3
input_noise = torch.randn_like(x_test) * noise_level
x_noisy = x_test + input_noise

# NumPy copies for the classical method, the metrics and the plots.
y_clean = x_test.squeeze().cpu().numpy()
y_noisy = x_noisy.squeeze().cpu().numpy()

# Method 1: Savitzky-Golay filter (classical baseline).
start = time.perf_counter()
y_sg = savgol_filter(y_noisy, window_length=15, polyorder=3)
time_sg = time.perf_counter() - start

# Method 2: denoising autoencoder (deep-learning baseline).
dae_model.eval()
start = time.perf_counter()
with torch.no_grad():
    y_dae = dae_model(x_noisy).squeeze().cpu().numpy()
time_dae = time.perf_counter() - start

# Method 3: diffusion model.
diff_model.eval()
start = time.perf_counter()

# x_noisy is used as the starting point (x_T) of the reverse process.
# NOTE(review): x_noisy is clean + 0.3 * N(0, 1), not a sample produced by
# forward_diffusion_sample at the last timestep, and the reverse process runs
# all T_STEPS steps from it - confirm this mismatch is intended.
x_diff_out = sample_reverse_process(diff_model, x_noisy)
# Copy the result to the host before stopping the timer: CUDA work is
# asynchronous, so the transfer is what guarantees the computation finished.
# This also makes the measurement comparable with the DAE timing above.
y_diff = x_diff_out.squeeze().cpu().numpy()
time_diff = time.perf_counter() - start

# Compute the metrics.
results = {
    "Metoda": ["Zaszumiony (Brak)", "Savitzky-Golay", "Autoencoder (DAE)", "Twój Model (Dyfuzja)"],
    "MSE": [
        mean_squared_error(y_clean, y_noisy),
        mean_squared_error(y_clean, y_sg),
        mean_squared_error(y_clean, y_dae),
        mean_squared_error(y_clean, y_diff)
    ],
    "Czas [s]": [0, time_sg, time_dae, time_diff]
}

df = pd.DataFrame(results)
print("\nTABELA WYNIKÓW:")
print(df)

# Plot the clean curve, the noisy input and the three reconstructions.
plt.figure(figsize=(14, 8))
plt.plot(x_axis, y_clean, 'k-', linewidth=3, label="Oryginał (Ground Truth)", alpha=0.5)
plt.plot(x_axis, y_noisy, 'k.', label="Zaszumiony", alpha=0.3)

plt.plot(x_axis, y_sg, 'g--', linewidth=2, label=f"Savitzky-Golay (MSE={df.iloc[1]['MSE']:.4f})")
plt.plot(x_axis, y_dae, 'b--', linewidth=2, label=f"Autoencoder (MSE={df.iloc[2]['MSE']:.4f})")
plt.plot(x_axis, y_diff, 'r-', linewidth=2, label=f"Dyfuzja (MSE={df.iloc[3]['MSE']:.4f})")

plt.title("Porównanie modelu dyfuzyjnego Denoise z istniejącymi narzędziami")
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()