In [30]:
import cv2
import numpy as np
from scipy import ndimage
import os
import uuid
import json

In [35]:
class CubeSatData:
    def __init__(self, image_size = (2048, 2048)):
        self.image_size = image_size
    def generate_base_terrain(self):
        x = np.linspace(0, 100, self.image_size[0])
        y = np.linspace(0, 100, self.image_size[1])
        terrain = (np.sin(x/10) + np.cos(y/8) + np.sin((x+y)/15) + np.random.normal(0, 0.1, x.shape))
        terrain = ((terrain - terrain.min()) * 255 / (terrain.max() - terrain.min())).astype(np.uint8)
        terrain = np.resize(terrain, self.image_size)

        return terrain
    def add_radiometric_errors(self, image):
        dark_current = np.random.normal(10, 2, image.shape).astype(np.uint8)
        center_y, center_x = self.image_size[0]//2, self.image_size[1]//2
        y, x = np.ogrid[:self.image_size[0], :self.image_size[1]]
        dist_from_center = np.sqrt((x - center_x)**2 + (y - center_y)**2)
        flat_field = 1 - (dist_from_center / (np.sqrt(center_x**2 + center_y**2)))
        flat_field = (flat_field * 255).astype(np.uint8)
        flat_field = flat_field[:image.shape[0], :image.shape[1]]
        print(f"Image shape: {image.shape}")
        print(f"Image shape: {image.shape}")
        print(f"Dark current shape: {dark_current.shape}")
        degraded = image.astype(float)
        degraded += dark_current
        degraded *= (flat_field / 255.0)
        degraded = np.clip(degraded, 0, 255).astype(np.uint8)
        return degraded, dark_current, flat_field
    def add_geometric_distortion(self, image):
        rows, cols = image.shape
        src_points = np.float32([[0,0], [cols-1,0], [0,rows-1], [cols-1,rows-1]])
        dst_points = src_points + np.random.normal(0, 50, src_points.shape).astype(np.float32)
        print(f"src_points shape: {src_points.shape}, dtype: {src_points.dtype}")
        print(f"dst_points shape: {dst_points.shape}, dtype: {dst_points.dtype}")
        matrix = cv2.getPerspectiveTransform(src_points, dst_points)
        distorted = cv2.warpPerspective(image, matrix, (cols, rows))
        gcp1 = (100, 100)
        gcp2 = (cols-100, 100)
        return distorted, [gcp1, gcp2]
    def add_spectral_errors(self, image, band):
        attenuation = {
            'R': 0.8,
            'G': 0.85,
            'B': 0.75,
            'NIR': 0.9
        }
        factor = attenuation.get(band, 0.8)
        atmospheric_effect = image * factor
        noise_level = np.random.normal(0, 10, image.shape)
        degraded = atmospheric_effect + noise_level
        degraded = np.clip(degraded, 0, 255).astype(np.uint8)
        reference_values = {
            'mean': np.mean(image),
            'attenuation': factor
        }
        return degraded, reference_values
    def generate_test_dataset(self, output_dir = 'test_dataset', num_images_per_band=25):
        os.makedirs(output_dir, exist_ok=True)
        base_image = self.generate_base_terrain()
        reference_data = {}
        for band in ['R', 'G', 'B', 'NIR']: 
            
            reference_data[band] = []
            for i in range(num_images_per_band):
                band_image = self.generate_base_terrain()
                if band == 'NIR':
                    band_image = cv2.add(band_image, 30)
            
            rad_degraded, dark_current, flat_field = self.add_radiometric_errors(base_image)
            
           
            geo_degraded, control_points = self.add_geometric_distortion(rad_degraded)
            
                
          
            final_degraded, spec_ref = self.add_spectral_errors(geo_degraded, band)

            
            original_filename = f'original_{band}_{i+1}.png'
            degraded_filename = f'degraded_{band}_{i+1}.png'
            dark_current_filename = f'dark_current_{band}_{i+1}.png'
            flat_field_filename = f'flat_field_{band}_{i+1}.png'

           
        
            cv2.imwrite(os.path.join(output_dir, original_filename), band_image)
            cv2.imwrite(os.path.join(output_dir, degraded_filename), final_degraded)
            cv2.imwrite(os.path.join(output_dir, dark_current_filename), dark_current)
            cv2.imwrite(os.path.join(output_dir, flat_field_filename), flat_field)
        if band not in reference_data:
            reference_data[band] = []

        reference_data[band].append({
                'control_points': control_points,
                'spectral_reference': spec_ref,
                'dark_current_path': dark_current_filename,
                'flat_field_path': flat_field_filename,
                'original_path': original_filename,
                'degraded_path': degraded_filename
            })
        

        reference_data_path = os.path.join(output_dir, 'reference_data.json')
        with open(reference_data_path, 'w') as f:
            json.dump(reference_data, f, indent=4)
        print(f"Dataset with {num_images_per_band * 4} images generated successfully!")

if __name__=="__main__":
    generator = CubeSatData()
    reference_data = generator.generate_test_dataset()
    

Image shape: (2048, 2048)
Image shape: (2048, 2048)
Dark current shape: (2048, 2048)
src_points shape: (4, 2), dtype: float32
dst_points shape: (4, 2), dtype: float32
Image shape: (2048, 2048)
Image shape: (2048, 2048)
Dark current shape: (2048, 2048)
src_points shape: (4, 2), dtype: float32
dst_points shape: (4, 2), dtype: float32
Image shape: (2048, 2048)
Image shape: (2048, 2048)
Dark current shape: (2048, 2048)
src_points shape: (4, 2), dtype: float32
dst_points shape: (4, 2), dtype: float32
Image shape: (2048, 2048)
Image shape: (2048, 2048)
Dark current shape: (2048, 2048)
src_points shape: (4, 2), dtype: float32
dst_points shape: (4, 2), dtype: float32
Dataset with 100 images generated successfully!
