In [None]:
import os
import shutil
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
from skimage.transform import resize
import random

In [None]:
# Maps the integer code stored in an ENVI header's "data type" field to the
# NumPy scalar type used to decode the raw binary file.
ENVI_TO_NP = {
    1: np.uint8,        # 8-bit unsigned integer
    2: np.int16,        # 16-bit signed integer
    3: np.int32,        # 32-bit signed integer
    4: np.float32,      # 32-bit float
    5: np.float64,      # 64-bit float
    6: np.complex64,    # complex, 2 x 32-bit float
    9: np.complex128,   # complex, 2 x 64-bit float
    12: np.uint16,      # 16-bit unsigned integer
    13: np.uint32,      # 32-bit unsigned integer
    14: np.int64,       # 64-bit signed integer
    15: np.uint64,      # 64-bit unsigned integer
}

# Cell 2: Utility Functions
def parse_envi_header(hdr_path):
    """Parse an ENVI header file into a dictionary.

    Every ``key = value`` line becomes one entry. Keys are stripped and
    lower-cased; values are returned as stripped strings (no type
    conversion, braces are kept).

    A value that opens a ``{`` block without closing it on the same line
    is continued over the following lines until the closing ``}``; the
    pieces are joined with single spaces. Lines inside such a block are
    never interpreted as separate ``key = value`` fields, even if they
    contain ``=``.

    Args:
        hdr_path: Path to the ``.hdr`` text file.

    Returns:
        dict mapping lower-cased field names to their string values.
    """
    header = {}
    with open(hdr_path, 'r') as f:
        lines = iter(f)
        for line in lines:
            line = line.strip()
            if '=' not in line:
                continue

            key, value = line.split('=', 1)
            key = key.strip().lower()
            value = value.strip()

            # Multi-line brace block: keep consuming lines from the same
            # iterator so the outer loop resumes after the closing brace.
            if value.startswith('{') and '}' not in value:
                parts = [value]
                for continuation in lines:
                    continuation = continuation.strip()
                    if continuation:
                        parts.append(continuation)
                    if '}' in continuation:
                        break
                value = ' '.join(parts)

            header[key] = value
    return header

def get_file_list(input_root):
    """Collect the paths of all ``.bin`` files below ``input_root``.

    The tree is walked recursively with ``os.walk``; each returned path is
    the walked directory joined with the file name.
    """
    found = []
    for folder, _subdirs, names in os.walk(input_root):
        found.extend(
            os.path.join(folder, name)
            for name in names
            if name.endswith('.bin')
        )
    return found

# Cell 3: Processing Functions
def process_hsi_file(bin_path, output_root, input_root=None):
    """Convert one ENVI ``.bin`` cube to ``.npy`` and return its statistics.

    The sibling ``.hdr`` file supplies the dimensions, data type, byte
    order, interleave and (optionally) the header offset. The cube is
    rearranged to ``(height, width, bands)`` and saved below
    ``output_root`` at the same relative location it has below
    ``input_root``.

    Args:
        bin_path: Path to the raw binary file.
        output_root: Root directory that receives the ``.npy`` file.
        input_root: Directory that ``bin_path`` is made relative to when
            building the output path. When omitted, the directory that
            contains ``bin_path`` is used, so the output lands directly
            in ``output_root``.

    Returns:
        dict with ``height``, ``width``, ``bands``, ``pixel_min``,
        ``pixel_max``, ``pixel_sum`` and ``pixel_count``.

    Raises:
        FileNotFoundError: If there is no ``.hdr`` next to ``bin_path``.
        ValueError: For an unknown data type code, an unsupported
            interleave, or a file holding fewer values than the header
            declares.
        KeyError: If a required header field is missing.
    """
    hdr_path = os.path.splitext(bin_path)[0] + '.hdr'
    if not os.path.exists(hdr_path):
        raise FileNotFoundError(f"No header file found for {bin_path}")

    header = parse_envi_header(hdr_path)

    height = int(header['lines'])
    width = int(header['samples'])
    bands = int(header['bands'])

    type_code = int(header['data type'])
    try:
        base_dtype = ENVI_TO_NP[type_code]
    except KeyError:
        raise ValueError(f"Unsupported ENVI data type: {type_code}") from None

    interleave = header.get('interleave', 'bil').lower()
    byte_order = '<' if int(header.get('byte order', 0)) == 0 else '>'
    dtype = np.dtype(base_dtype).newbyteorder(byte_order)
    # Number of bytes of embedded header to skip before the pixel data.
    header_offset = int(header.get('header offset', 0))

    expected_count = height * width * bands
    with open(bin_path, 'rb') as f:
        f.seek(header_offset)
        # Read exactly the declared number of values so trailing bytes do
        # not break the reshape below.
        data = np.fromfile(f, dtype=dtype, count=expected_count)

    if data.size != expected_count:
        raise ValueError(
            f"{bin_path} holds {data.size} values but the header declares "
            f"{height} x {width} x {bands} = {expected_count}"
        )

    # Bring every interleave to a common (height, width, bands) layout.
    if interleave == 'bil':
        data = data.reshape((height, bands, width)).transpose(0, 2, 1)
    elif interleave == 'bip':
        data = data.reshape((height, width, bands))
    elif interleave == 'bsq':
        data = data.reshape((bands, height, width)).transpose(1, 2, 0)
    else:
        raise ValueError(f"Unsupported interleave format: {interleave}")

    # Save to npy, mirroring the input directory layout.
    base_dir = input_root if input_root is not None else os.path.dirname(bin_path)
    rel_path = os.path.splitext(os.path.relpath(bin_path, base_dir))[0]
    output_path = os.path.join(output_root, rel_path + '.npy')
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    np.save(output_path, data)

    # Accumulate floating-point sums in float64; a float32 accumulator
    # loses precision over millions of pixels.
    sum_dtype = np.float64 if data.dtype.kind == 'f' else None

    return {
        'height': height,
        'width': width,
        'bands': bands,
        'pixel_min': np.min(data),
        'pixel_max': np.max(data),
        'pixel_sum': np.sum(data, dtype=sum_dtype),
        'pixel_count': data.size
    }

# Cell 4: Main Processing and Statistics
def process_all_files(input_root, output_root):
    """Convert every ``.bin`` file under ``input_root`` and gather statistics.

    Files that fail to convert are reported on stdout and skipped; they do
    not contribute to any statistic.

    Args:
        input_root: Directory tree searched for ``.bin`` files.
        output_root: Directory tree that receives the ``.npy`` files.

    Returns:
        ``(global_stats, all_stats)`` where ``all_stats`` is the list of
        per-file dictionaries returned by ``process_hsi_file`` and
        ``global_stats`` holds the running pixel sum/count, the global
        pixel min/max (``None`` if nothing was processed) and, under
        ``dim_stats``, the list of heights, widths and band counts.
    """
    file_list = get_file_list(input_root)
    total_files = len(file_list)
    all_stats = []
    global_stats = {
        'total_pixel_sum': 0,
        'total_pixel_count': 0,
        'global_pixel_min': None,
        'global_pixel_max': None,
        'dim_stats': defaultdict(list)
    }

    for i, bin_path in enumerate(file_list, start=1):
        try:
            print(f"Processing {i}/{total_files}: {bin_path}")
            # Pass the root explicitly instead of relying on a global.
            stats = process_hsi_file(bin_path, output_root, input_root)
            all_stats.append(stats)

            # Update global statistics
            global_stats['total_pixel_sum'] += stats['pixel_sum']
            global_stats['total_pixel_count'] += stats['pixel_count']

            if (global_stats['global_pixel_min'] is None or
                    stats['pixel_min'] < global_stats['global_pixel_min']):
                global_stats['global_pixel_min'] = stats['pixel_min']

            if (global_stats['global_pixel_max'] is None or
                    stats['pixel_max'] > global_stats['global_pixel_max']):
                global_stats['global_pixel_max'] = stats['pixel_max']

            # Update dimension stats
            for dim in ['height', 'width', 'bands']:
                global_stats['dim_stats'][dim].append(stats[dim])

        except Exception as e:
            # Best effort: report the failure and move on to the next file.
            print(f"Error processing {bin_path}: {str(e)}")
            continue

    return global_stats, all_stats

# Cell 5: Visualization and Reporting
def print_statistics(global_stats):
    """Print dimension and pixel-value statistics to stdout.

    Args:
        global_stats: Dictionary with the keys ``dim_stats`` (mapping
            ``'height'``, ``'width'`` and ``'bands'`` to lists of values),
            ``global_pixel_min``, ``global_pixel_max``,
            ``total_pixel_sum`` and ``total_pixel_count``.

    A dimension whose list is empty (no file was processed) is reported as
    "No data" instead of raising. The global mean is printed only when at
    least one pixel was counted.
    """
    print("Dimension Statistics:")
    for dim in ['height', 'width', 'bands']:
        values = global_stats['dim_stats'][dim]
        print(f"{dim.capitalize()}:")
        if not values:
            # min()/max() and the average are undefined for an empty list.
            print("  No data")
            continue
        print(f"  Min: {min(values)}")
        print(f"  Max: {max(values)}")
        print(f"  Avg: {sum(values)/len(values):.2f}")

    print("\nPixel Value Statistics:")
    print(f"Global Minimum: {global_stats['global_pixel_min']}")
    print(f"Global Maximum: {global_stats['global_pixel_max']}")
    if global_stats['total_pixel_count'] > 0:
        print(f"Global Mean: {global_stats['total_pixel_sum'] / global_stats['total_pixel_count']:.4f}")

def plot_dimension_distributions(global_stats):
    """Show one histogram each for the height, width and band counts.

    The values are read from ``global_stats['dim_stats']``; the three
    histograms are drawn side by side in a single figure and displayed.
    """
    dimension_names = ('height', 'width', 'bands')
    fig, axs = plt.subplots(1, 3, figsize=(15, 4))
    for panel, name in zip(axs, dimension_names):
        panel.hist(global_stats['dim_stats'][name], bins=20, alpha=0.7)
        panel.set_title(f'{name.capitalize()} Distribution')
        panel.set_xlabel(name)
        panel.set_ylabel('Count')
    plt.tight_layout()
    plt.show()

In [None]:
# Cell 6: Execution Cell (Edit paths here before running)
# Driver for the ENVI -> .npy conversion. Runs only when this file/cell is
# executed as the main program. The two path assignments below create
# module-level names; keep their names unchanged when editing the paths.
if __name__ == "__main__":
    
    # Set your paths here (the values below are placeholders)
    input_root = r'<path_to_input_root>/Ripeness'
    output_root = r'<path_to_output_root>/Ripeness_npy'
    
    # Process files and get statistics
    global_stats, all_stats = process_all_files(input_root, output_root)
    
    # Print statistics
    print_statistics(global_stats)
    
    # Show visualizations
    plot_dimension_distributions(global_stats)

In [None]:
def compute_stats(values):
    """Return ``(minimum, maximum, mean)`` of ``values``.

    An empty (falsy) input yields ``(None, None, None)``.
    """
    if values:
        lowest = min(values)
        highest = max(values)
        mean = sum(values) / len(values)
        return lowest, highest, mean
    return None, None, None

def analyze_hsi_dataset(root_dir):
    """Print min/max/average height, width and band count of ``.npy`` cubes.

    ``root_dir`` is walked recursively. Every ``.npy`` file holding a 3D
    array is treated as ``(height, width, bands)``; files with another
    number of dimensions are reported and skipped, as are files that
    cannot be opened.

    If no 3D array is found, a single notice is printed and no statistics
    are shown.

    Args:
        root_dir: Directory tree to scan.
    """
    # Initialize lists to store dimensions
    heights = []
    widths = []
    bands_list = []

    # Traverse directory structure
    for root, dirs, files in os.walk(root_dir):
        for file in files:
            if not file.endswith('.npy'):
                continue

            file_path = os.path.join(root, file)

            try:
                # Memory-map the file: only the shape is needed, so there
                # is no reason to read the whole cube into memory.
                shape = np.load(file_path, mmap_mode='r').shape
            except Exception as e:
                print(f"Error processing {file_path}: {str(e)}")
                continue

            if len(shape) != 3:
                print(f"Skipping {file_path}: Expected 3D array, got {len(shape)}D")
                continue

            # Assuming shape is (height, width, bands)
            h, w, b = shape  # Adjust indices if your data uses a different order
            heights.append(h)
            widths.append(w)
            bands_list.append(b)

    if not heights:
        # Without data the averages are None and cannot be formatted.
        print(f"No 3D .npy files found under {root_dir}")
        return

    # Compute statistics
    h_min, h_max, h_avg = compute_stats(heights)
    w_min, w_max, w_avg = compute_stats(widths)
    b_min, b_max, b_avg = compute_stats(bands_list)

    # Print results
    print("Height Statistics:")
    print(f"  Min: {h_min}, Max: {h_max}, Average: {h_avg:.2f}")
    print("\nWidth Statistics:")
    print(f"  Min: {w_min}, Max: {w_max}, Average: {w_avg:.2f}")
    print("\nBands Statistics:")
    print(f"  Min: {b_min}, Max: {b_max}, Average: {b_avg:.2f}")

# Driver: print dimension statistics for the .npy files below the VIS
# directory. Runs only when this file/cell is executed as the main program.
if __name__ == "__main__":
    
    # Placeholder path; point it at the directory to scan.
    root_directory = r'<path_to_directory>/VIS'
    analyze_hsi_dataset(root_directory)

In [None]:
def resize_hsi(arr, target_h, target_w, preserve_range=True):
    """Resize a hyperspectral cube spatially, one band at a time.

    Args:
        arr: 3D array indexed as ``[row, column, band]``.
        target_h: Output number of rows.
        target_w: Output number of columns.
        preserve_range: Forwarded to ``skimage.transform.resize``.

    Returns:
        Array of shape ``(target_h, target_w, arr.shape[2])``. The dtype
        is ``arr.dtype``, except when ``preserve_range`` is false and the
        input is not floating point: the rescaled result would not
        survive a cast back to the input dtype, so ``float64`` is
        returned instead.
    """
    is_float = np.issubdtype(arr.dtype, np.floating)
    is_integer = np.issubdtype(arr.dtype, np.integer)
    out_dtype = arr.dtype if (preserve_range or is_float) else np.float64

    # Initialize resized array
    resized = np.zeros((target_h, target_w, arr.shape[2]), dtype=out_dtype)

    # Resize each band individually
    for band in range(arr.shape[2]):
        resized_band = resize(
            arr[:, :, band],
            (target_h, target_w),
            preserve_range=preserve_range,
            anti_aliasing=True  # Reduces artifacts
        )
        if preserve_range and is_integer:
            # resize() returns floats; assigning them into an integer
            # array truncates toward zero, so round to nearest first.
            resized_band = np.rint(resized_band)
        resized[:, :, band] = resized_band

    return resized


def process_hsi_dataset(input_root, output_root, target_h, target_w):
    """Resize every 3D ``.npy`` array under ``input_root`` into ``output_root``.

    The directory layout below ``input_root`` is reproduced below
    ``output_root`` for each directory that contains a ``.npy`` file.
    Arrays that are not 3D are reported and skipped; any error while
    handling a file is printed and the walk continues.
    """
    for current_dir, _subdirs, filenames in os.walk(input_root):
        for filename in filenames:
            if not filename.endswith('.npy'):
                continue

            # Mirror the source folder below the output root.
            relative_dir = os.path.relpath(current_dir, input_root)
            destination_dir = os.path.join(output_root, relative_dir)
            os.makedirs(destination_dir, exist_ok=True)

            input_path = os.path.join(current_dir, filename)
            output_path = os.path.join(destination_dir, filename)

            try:
                cube = np.load(input_path)
                if cube.ndim == 3:
                    np.save(output_path, resize_hsi(cube, target_h, target_w))
                    print(f"Processed: {input_path} -> {output_path}")
                else:
                    print(f"Skipping {input_path}: Not a 3D array")
            except Exception as err:
                print(f"Error processing {input_path}: {str(err)}")


# Driver: resize every cube below the VIS directory to a fixed spatial size
# and write the results to VIS_resized. Runs only when this file/cell is
# executed as the main program.
if __name__ == "__main__":
    # Configuration (placeholder paths; edit before running)
    input_directory = r'<path_to_input_directory>/VIS'
    output_directory = r'<path_to_output_directory>/VIS_resized'

    # Spatial size, in pixels, that every cube is resized to.
    target_height = 250  
    target_width = 200   
    
    # Run processing
    process_hsi_dataset(
        input_root=input_directory,
        output_root=output_directory,
        target_h=target_height,
        target_w=target_width
    )

In [None]:
def visualize_random_hsi(directory, num_images=4, figsize=(15, 10)):
    """Display band-averaged previews of randomly chosen ``.npy`` cubes.

    Args:
        directory: Directory tree searched recursively for ``.npy`` files.
        num_images: Maximum number of cubes to show; fewer are shown when
            the directory holds fewer files.
        figsize: Figure size passed to ``plt.subplots``.

    Returns:
        None. Prints a notice and returns early when no ``.npy`` file is
        found.

    Raises:
        ValueError: If ``num_images`` is smaller than 1.
    """
    # Find all .npy files in directory
    hsi_files = [
        os.path.join(root, file)
        for root, _, files in os.walk(directory)
        for file in files
        if file.endswith('.npy')
    ]

    if not hsi_files:
        print("No .npy files found in directory!")
        return

    if num_images < 1:
        # A zero-sized grid would otherwise fail with a division by zero.
        raise ValueError("num_images must be at least 1")

    # Select random files
    selected_files = random.sample(hsi_files, min(num_images, len(hsi_files)))

    # Size the grid for the files actually selected. squeeze=False always
    # yields a 2D array of axes, so the 1x1 case needs no special handling.
    panel_count = len(selected_files)
    rows = int(np.sqrt(panel_count))
    cols = int(np.ceil(panel_count / rows))
    fig, axes = plt.subplots(rows, cols, figsize=figsize, squeeze=False)
    axes = axes.ravel()

    # Hide every frame up front so that unused, skipped and failed panels
    # all stay blank.
    for ax in axes:
        ax.axis('off')

    for ax, file_path in zip(axes, selected_files):
        try:
            # Load HSI cube
            hsi = np.load(file_path)

            if hsi.ndim != 3:
                print(f"Skipping {file_path}: Not a 3D array")
                continue

            # Create pseudo-color image (mean across bands)
            img = np.mean(hsi, axis=2)  # Change axis if different dimension order

            # Plot with Viridis colormap
            ax.imshow(img, cmap='viridis')
            ax.set_title(os.path.basename(file_path))

        except Exception as e:
            print(f"Error visualizing {file_path}: {str(e)}")

    plt.tight_layout()
    plt.show()

# Driver: preview a few randomly chosen cubes from the VIS_resized
# directory. Runs only when this file/cell is executed as the main program.
if __name__ == "__main__":
    # Placeholder path; edit before running.
    output_directory = r'<path_to_output_directory>/VIS_resized'

    
    visualize_random_hsi(
        
        directory=output_directory,
        num_images=4,        # Number of images to display
        figsize=(15, 10)     # Figure size
    )

In [None]:
def split_dataset(source_dir, output_dir, train_ratio=0.75, val_ratio=0.125, test_ratio=0.125, seed=None):
    """Copy a dataset into ``train``/``val``/``test`` folders, per directory.

    Each directory below ``source_dir`` that contains files is split on
    its own, and its relative path is recreated inside all three subset
    folders of ``output_dir``. Files are copied with ``shutil.copy2``;
    the source is left untouched. The train and val counts are truncated
    to whole files and the remainder goes to test.

    Args:
        source_dir: Root of the dataset to split.
        output_dir: Root that receives the ``train``, ``val`` and ``test``
            folders.
        train_ratio: Fraction of each directory's files copied to train.
        val_ratio: Fraction copied to val.
        test_ratio: Fraction nominally assigned to test.
        seed: Optional seed for a reproducible shuffle. When ``None`` the
            global ``random`` state is used.

    Returns:
        None. Prints a notice and returns early if ``source_dir`` does not
        exist.

    Raises:
        ValueError: If the three ratios do not sum to 1.
    """
    if not os.path.exists(source_dir):
        print(f"Source directory '{source_dir}' does not exist.")
        return

    # Compare with a tolerance: e.g. 0.7 + 0.2 + 0.1 is not exactly 1.0 in
    # floating point. Raise instead of assert so the check survives -O.
    if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-9:
        raise ValueError("Ratios must sum up to 1")

    rng = random if seed is None else random.Random(seed)

    # Define output subdirectories
    train_dir = os.path.join(output_dir, "train")
    val_dir = os.path.join(output_dir, "val")
    test_dir = os.path.join(output_dir, "test")

    # Create output directories if they don't exist
    for folder in [train_dir, val_dir, test_dir]:
        os.makedirs(folder, exist_ok=True)

    output_abs = os.path.abspath(output_dir)

    # Walk through dataset and split files
    for root, dirs, files in os.walk(source_dir):
        # Never descend into the output folder if it lives inside the
        # source tree; otherwise freshly copied files would be split again.
        dirs[:] = [
            d for d in dirs
            if os.path.abspath(os.path.join(root, d)) != output_abs
        ]

        if not files:
            continue  # Skip empty directories

        # Relative path from source directory
        rel_path = os.path.relpath(root, source_dir)

        # Create corresponding folders in train, val, test
        for subset_dir in [train_dir, val_dir, test_dir]:
            os.makedirs(os.path.join(subset_dir, rel_path), exist_ok=True)

        # Sort first so the shuffle does not depend on directory listing
        # order, then shuffle and split.
        files = sorted(files)
        rng.shuffle(files)
        total_files = len(files)
        train_split = int(total_files * train_ratio)
        val_split = int(total_files * val_ratio)

        assignments = [
            (train_dir, files[:train_split]),
            (val_dir, files[train_split:train_split + val_split]),
            (test_dir, files[train_split + val_split:]),
        ]

        # Copy files to respective directories
        for subset_dir, subset_files in assignments:
            for file in subset_files:
                shutil.copy2(os.path.join(root, file), os.path.join(subset_dir, rel_path, file))

    print(f"Dataset split complete. Check '{output_dir}' for the split dataset.")

# Driver: split the NIR_resized dataset into train/val/test folders.
# Guarded like the other driver cells so that importing this file does not
# start copying files.
if __name__ == "__main__":
    # Placeholder paths; edit before running.
    source_dataset = r'<path_to_source_dataset>/NIR_resized'
    split_output = r'<path_to_split_output>/NIR_resized_Split'

    split_dataset(source_dataset, split_output)