Module that downloads a ZIP file from a URL, extracts the DICOM files it contains, and converts them to an OpenCV-compatible image format.

In [1]:
!pip install requests pydicom numpy opencv-python




[notice] A new release of pip is available: 24.3.1 -> 25.1.1
[notice] To update, run: C:\Users\Sunidhi\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip





In [None]:
import os
import requests
import io
import zipfile
import pydicom
import numpy as np
import cv2
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from typing import Generator, Tuple
import hashlib

class StreamedDicomProcessor:
    """Download a ZIP archive of DICOM files and convert each one to an image.

    The archive is cached in ``save_dir`` (keyed by a hash of the URL) so a
    second run with the same URL does not download it again.  Converted
    images are written to the same directory, one file per DICOM member.
    """

    # Archive members with any other extension are ignored.
    _DICOM_EXTENSIONS = ('.dcm', '.dicom')
    _DOWNLOAD_CHUNK_SIZE = 8192

    def __init__(self, save_dir: str):
        """Remember the output directory and create it if it is missing."""
        self.save_dir = save_dir
        os.makedirs(self.save_dir, exist_ok=True)

    def _get_zip_filename(self, url: str) -> str:
        """Return the cache path for ``url``; the same URL always maps to the same file."""
        # MD5 is used only to derive a stable file name, not for security.
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return os.path.join(self.save_dir, f"zip_cache_{url_hash}.zip")

    def process_from_url(
        self,
        url: str,
        target_format: str = "png",
        timeout: float = 30.0,
    ) -> Generator[Tuple[str, str], None, None]:
        """Convert every DICOM file in the ZIP at ``url``, one at a time.

        This is a generator: nothing is downloaded or converted until it is
        iterated.

        Args:
            url: Location of the ZIP archive.
            target_format: Image file extension passed to OpenCV (e.g. "png").
            timeout: Seconds ``requests`` waits for the connection and for
                each chunk of data before giving up.

        Yields:
            ``(original_filename, saved_path)`` for each converted file.

        Raises:
            requests.RequestException: If the download fails.
            zipfile.BadZipFile: If the downloaded file is not a ZIP archive.
        """
        zip_file_path = self._get_zip_filename(url)

        # is_zipfile() is False for a missing file and for a corrupt cache
        # entry (e.g. left by an interrupted run); both trigger a download.
        if zipfile.is_zipfile(zip_file_path):
            print(f"Using existing ZIP file: {zip_file_path}")
        else:
            print(f"Downloading ZIP file from: {url}")
            self._download_zip(url, zip_file_path, timeout)
            print(f"ZIP file saved to: {zip_file_path}")

        with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
            yield from self._process_zip_file(zip_file, target_format)

    def _download_zip(self, url: str, destination: str, timeout: float) -> None:
        """Stream ``url`` to ``destination`` without ever leaving a partial file there."""
        # Download next to the final path and rename only on success, so an
        # interrupted transfer can never be mistaken for a valid cache entry.
        partial_path = destination + ".part"
        try:
            with requests.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()
                with open(partial_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=self._DOWNLOAD_CHUNK_SIZE):
                        if chunk:
                            f.write(chunk)
            os.replace(partial_path, destination)
        finally:
            # After a successful rename the partial file no longer exists.
            if os.path.exists(partial_path):
                os.remove(partial_path)

    def _process_zip_file(self, zip_file: zipfile.ZipFile, target_format: str) -> Generator[Tuple[str, str], None, None]:
        """Convert the DICOM members of an opened ZIP file.

        Members that cannot be read or converted are reported and skipped so
        one bad file does not abort the whole archive.

        Yields:
            ``(member_name, saved_path)`` for each converted member.
        """
        used_stems = set()
        for file_info in zip_file.infolist():
            member_name = file_info.filename
            if not member_name.lower().endswith(self._DICOM_EXTENSIONS):
                continue

            try:
                # Only one member is decoded at a time to keep memory bounded.
                with zip_file.open(file_info) as dcm_file:
                    dicom = pydicom.dcmread(dcm_file, force=True)
                    img = self._process_dicom(dicom)

                saved_path = self._save_image(
                    img,
                    self._unique_output_name(member_name, used_stems),
                    target_format,
                )
            except Exception as e:
                print(f"Skipped {member_name}: {e}")
                continue

            # Yield outside the try block so an exception thrown into the
            # generator by the consumer is not swallowed as a "skipped" file.
            yield (member_name, saved_path)

    @staticmethod
    def _unique_output_name(member_name: str, used_stems: set) -> str:
        """Return a file name for ``member_name`` whose stem is not in ``used_stems``.

        The first member with a given base name keeps it unchanged.  A later
        member with the same base name (typically from another folder of the
        archive) gets its folder path flattened into the name, with a numeric
        suffix as a last resort.  The chosen stem is added to ``used_stems``.
        Comparison is case-insensitive because some file systems are.
        """
        stem, ext = os.path.splitext(os.path.basename(member_name))
        candidate = stem
        if candidate.lower() in used_stems:
            parts = [p for p in member_name.replace("\\", "/").split("/") if p]
            flat = "_".join(parts[:-1] + [stem])
            candidate = flat
            counter = 1
            while candidate.lower() in used_stems:
                counter += 1
                candidate = f"{flat}_{counter}"
        used_stems.add(candidate.lower())
        return candidate + ext

    def _process_dicom(self, dicom: "pydicom.Dataset") -> np.ndarray:
        """Convert one DICOM dataset to an 8-bit image OpenCV can write.

        Grayscale data is returned as 3-channel BGR.
        """
        if 'WindowCenter' in dicom and 'WindowWidth' in dicom:
            img = self._apply_windowing(dicom)
        else:
            img = dicom.pixel_array
            if img.dtype != np.uint8:
                img = self._normalize_to_8bit(img)

        photometric = str(getattr(dicom, 'PhotometricInterpretation', ''))

        # MONOCHROME1 stores "low value = white"; invert so the saved image
        # looks the way the data is meant to be displayed.
        if photometric == 'MONOCHROME1':
            img = 255 - img

        if img.ndim == 2:
            return cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        # pydicom returns RGB channel order, while OpenCV writes BGR.
        if img.ndim == 3 and img.shape[-1] == 3 and photometric == 'RGB':
            return cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        return img

    def _normalize_to_8bit(self, array: np.ndarray) -> np.ndarray:
        """Linearly map ``array`` so its minimum becomes 0 and its maximum 255.

        A constant array maps to all zeros; an empty array is returned empty.
        """
        # Work in float64: subtracting in the input dtype overflows for signed
        # integers whose range exceeds the dtype's positive maximum.
        values = np.asarray(array, dtype=np.float64)
        if values.size == 0:
            return values.astype(np.uint8)

        low = values.min()
        span = values.max() - low
        if span == 0:
            return np.zeros(values.shape, dtype=np.uint8)
        return ((values - low) / span * 255).astype(np.uint8)

    @staticmethod
    def _first_value(value, default=None):
        """Return ``value`` (or its first item, if it is multi-valued) as a float.

        ``default`` is returned when the value is missing, empty or not numeric.
        """
        if value is None:
            return default
        if not isinstance(value, (str, bytes)) and hasattr(value, '__len__'):
            if len(value) == 0:
                return default
            value = value[0]
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _apply_windowing(self, dicom: "pydicom.Dataset") -> np.ndarray:
        """Apply the dataset's window center/width and return an 8-bit image.

        Stored values are first converted with RescaleSlope/RescaleIntercept
        (when present), because the window is defined on the rescaled values.
        The window's lower bound maps to 0 and its upper bound to 255.  If the
        window is missing or has a non-positive width, the image is min-max
        normalized instead.
        """
        pixel_array = dicom.pixel_array.astype(np.float32)

        slope = self._first_value(getattr(dicom, 'RescaleSlope', None), default=1.0)
        intercept = self._first_value(getattr(dicom, 'RescaleIntercept', None), default=0.0)
        pixel_array = pixel_array * slope + intercept

        # When several windows are listed, the first one is used.
        wc = self._first_value(dicom.WindowCenter)
        ww = self._first_value(dicom.WindowWidth)
        if wc is None or ww is None or ww <= 0:
            return self._normalize_to_8bit(pixel_array)

        min_val = wc - ww / 2
        max_val = wc + ww / 2
        # Scale against the window bounds, not the data's own min/max;
        # otherwise narrow-range data would be stretched past the window.
        scaled = (np.clip(pixel_array, min_val, max_val) - min_val) / (max_val - min_val)
        return (scaled * 255).astype(np.uint8)

    def _save_image(self, img: np.ndarray, original_name: str, fmt: str) -> str:
        """Write ``img`` to ``save_dir`` and return the path.

        The file is named after the base name of ``original_name`` with the
        extension replaced by ``fmt``; an existing file of that name is
        overwritten.

        Raises:
            OSError: If OpenCV reports that the image could not be written.
        """
        base_name = os.path.splitext(os.path.basename(original_name))[0]
        # Accept both "png" and ".png".
        save_path = os.path.join(self.save_dir, f"{base_name}.{fmt.lstrip('.')}")
        # imwrite signals most failures by returning False rather than raising.
        if not cv2.imwrite(save_path, img):
            raise OSError(f"Could not write image to {save_path}")
        return save_path

    def visualize_bboxes(self, image, bbox_df):
        """Show ``image`` with one rectangle per row of ``bbox_df``.

        Args:
            image: Anything ``Axes.imshow`` accepts.
                NOTE(review): images produced by this class are BGR, which
                matplotlib would show with red and blue swapped; confirm what
                callers pass in.
            bbox_df: Table iterated with ``iterrows()``; each row must provide
                ``x``, ``y``, ``width``, ``height`` and ``Target``.  Rows with
                ``Target == 1`` are drawn in red, all others in green.
        """
        _, ax = plt.subplots(1, figsize=(8, 8))
        ax.imshow(image)
        for _, row in bbox_df.iterrows():
            rect = patches.Rectangle(
                (row['x'], row['y']), row['width'], row['height'],
                linewidth=2, edgecolor='r' if row['Target'] == 1 else 'g', facecolor='none'
            )
            ax.add_patch(rect)

        ax.set_title("Bounding Box Visualisation")
        ax.axis('off')
        plt.show()