In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.datasets import ImageFolder
import torchvision.transforms.functional as TF
from torchvision.utils import save_image
import os
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from decimal import Decimal, getcontext
import time
import csv

Arithmetic Coding 

In [None]:
getcontext().prec = 100  # Increase precision

def arithmetic_compress(data):
    freq = {}
    for symbol in data:
        freq[symbol] = freq.get(symbol, 0) + 1
    total = sum(freq.values())

    low = {}
    high = {}
    cumulative = Decimal(0)
    for symbol, count in sorted(freq.items()):
        low[symbol] = cumulative
        high[symbol] = cumulative + Decimal(count) / Decimal(total)
        cumulative = high[symbol]

    l = Decimal(0)
    h = Decimal(1)
    for symbol in data:
        range_ = h - l
        h = l + range_ * high[symbol]
        l = l + range_ * low[symbol]

    code = (l + h) / 2
    return code, low, high



Convolutional Autoencoder


In [None]:
class ECG_CAE(nn.Module):
    def __init__(self):
        super(ECG_CAE, self).__init__()

        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=5, stride=2, padding=2),
            nn.ReLU(),
            nn.Conv2d(32, 16, kernel_size=5, stride=2, padding=2),
            nn.ReLU()
        )
        
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(16, 32, kernel_size=5, stride=2, padding=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=5, stride=2, padding=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16, 1, kernel_size=5, stride=2, padding=2, output_padding=1),
            nn.Tanh()
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        _, _, h, w = x.size()
        decoded = decoded[:, :, :h, :w]
        return encoded, decoded



transforms

In [None]:
transform = transforms.Compose([
    transforms.Resize((296, 1024)),
    transforms.Grayscale(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

def get_dataloader(data_dir, batch_size=16, shuffle=True):
    dataset = ImageFolder(data_dir, transform=transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, pin_memory=True)



Training 

In [None]:
def train_model(model, dataloader, device, num_epochs=10, lr=1e-3, save_path="ecg_cae.pth"):
    model.train()
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        total_loss = 0.0
        for images, _ in dataloader:
            images = images.to(device, non_blocking=True)
            _, outputs = model(images)
            loss = criterion(outputs, images)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.6f}")

    torch.save(model.state_dict(), save_path)
    print(f"Model saved to {save_path}")



In [None]:
durations = []
def process_folder(model, folder_path, device, output_folder="cae_arithmetirc"):
    model.eval()
    os.makedirs(output_folder, exist_ok=True)

    image_files = [f for f in os.listdir(folder_path) if f.endswith(('.png', '.jpg', '.jpeg'))]

    total_original_size = 0
    total_compressed_size = 0

    for filename in image_files:
        start_time = time.time()
        img_path = os.path.join(folder_path, filename)
        image = Image.open(img_path).convert("L")
        resized = transform(image).unsqueeze(0).to(device)

        with torch.no_grad():
            encoded, decoded = model(resized)

        # Save reconstructed image
        output_img = decoded.cpu().squeeze(0).squeeze(0).numpy()
        output_img = (output_img * 0.5) + 0.5
        output_img = np.clip(output_img, 0, 1)
        output_pil = Image.fromarray((output_img * 255).astype(np.uint8))
        output_pil.save(os.path.join(output_folder, filename))

        # Compress encoded features
        encoded_flat = encoded.cpu().detach().numpy().flatten()
        encoded_flat = (encoded_flat * 255).astype(np.uint8)
        encoded_list = list(encoded_flat)

        code, low, high = arithmetic_compress(encoded_list)
        compressed_size_bits = code.adjusted() + 1
        compressed_size_bytes = compressed_size_bits / 8

        print(f"{filename} done")
        end_time = time.time()
        duration = end_time - start_time
        durations.append((filename, duration))
        
    csv_path = 'cae_arithmetic_durations.csv'
    with open(csv_path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['Image Name', 'Duration (seconds)'])
        writer.writerows(durations)
    
    print(f"✅ Duration tracking complete. Saved to: {csv_path}")



In [None]:
if __name__ == '__main__':
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    model = ECG_CAE().to(device)

    if not os.path.exists("ecg_cae.pth"):
        data_dir = "org_train"
        dataloader = get_dataloader(data_dir, batch_size=16, shuffle=True)
        train_model(model, dataloader, device, num_epochs=40)

    if os.path.exists("ecg_cae.pth"):
        model.load_state_dict(torch.load("ecg_cae.pth", map_location=device))
        print("Model loaded.")

    # Process the entire folder
    process_folder(model, folder_path="org_ecg_10sec_resize", device=device)
