<a href="https://colab.research.google.com/github/segfault610/hello-world/blob/main/volume_analyzer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
from scipy import ndimage
from sklearn.cluster import KMeans
from dataclasses import dataclass
from typing import List, Tuple, Dict

@dataclass
class ImageFeatures:
    waterways: np.ndarray
    roads: np.ndarray
    elevation: np.ndarray
    buildings: np.ndarray
    vegetation: np.ndarray

@dataclass
class SoilData:
    permeability: np.ndarray
    composition: np.ndarray
    drainage: np.ndarray
    stability: np.ndarray

@dataclass
class CanalPlan:
    canal_routes: List[List[Tuple[int, int]]]
    container_locations: List[Tuple[int, int, int]]
    storage_volume: float
    canal_length: float
    efficiency_score: float
    cost_estimate: float

class ImageFusionProcessor:
    def __init__(self, pixel_to_meter_ratio: float = 2.0):
        self.pixel_to_meter = pixel_to_meter_ratio
        self.fusion_weights = {'geographic': 0.7, 'soil': 0.3}

    def load_and_validate_inputs(self, map_features, soil_composition):
        # Validate or simulate loading images
        if not isinstance(map_features, np.ndarray):
            map_features = self._create_sample_map_features()
        if not isinstance(soil_composition, np.ndarray):
            soil_composition = self._create_sample_soil_composition()
        map_features, soil_composition = self._ensure_compatible_dimensions(map_features, soil_composition)
        map_features = self._normalize_image(map_features, "map_features")
        soil_composition = self._normalize_image(soil_composition, "soil_composition")
        return map_features, soil_composition

    def rgb_to_ihs(self, rgb_image):
        rgb = rgb_image.astype(np.float32) / 255.0
        R, G, B = rgb[:,:,0], rgb[:,:,1], rgb[:,:,2]
        I = (R+G+B)/3
        numerator = 0.5 * ((R-G) + (R-B))
        denominator = np.sqrt((R-G)**2 + (R-B)*(G-B)) + 1e-10
        theta = np.arccos(np.clip(numerator/denominator,-1,1))
        H = np.where(B<=G, theta, 2*np.pi - theta)
        H /= 2*np.pi
        min_rgb = np.minimum(np.minimum(R,G),B)
        S = 1 - 3*min_rgb/(R+G+B+1e-10)
        S = np.clip(S,0,1)
        return I,H,S

    def ihs_to_rgb(self,I,H,S):
        H = H*2*np.pi
        R = np.zeros_like(I)
        G = np.zeros_like(I)
        B = np.zeros_like(I)
        mask1 = (H >= 0) & (H < 2*np.pi/3)
        B[mask1] = I[mask1]*(1-S[mask1])
        R[mask1] = I[mask1]*(1 + S[mask1]*np.cos(H[mask1])/np.cos(np.pi/3 - H[mask1]))
        G[mask1] = 3*I[mask1] - (R[mask1] + B[mask1])
        mask2 = (H >= 2*np.pi/3) & (H < 4*np.pi/3)
        H2 = H[mask2]-2*np.pi/3
        R[mask2] = I[mask2]*(1-S[mask2])
        G[mask2] = I[mask2]*(1 + S[mask2]*np.cos(H2)/np.cos(np.pi/3 - H2))
        B[mask2] = 3*I[mask2] - (R[mask2] + G[mask2])
        mask3 = (H >= 4*np.pi/3) & (H < 2*np.pi)
        H3 = H[mask3]-4*np.pi/3
        G[mask3] = I[mask3]*(1-S[mask3])
        B[mask3] = I[mask3]*(1 + S[mask3]*np.cos(H3)/np.cos(np.pi/3 - H3))
        R[mask3] = 3*I[mask3] - (G[mask3] + B[mask3])
        rgb = np.stack([np.clip(R,0,1),np.clip(G,0,1),np.clip(B,0,1)], axis=2)
        return (rgb*255).astype(np.uint8)

    def apply_guided_filter(self, image, guide, radius=8, epsilon=0.01):
        I = image.astype(np.float32)/255.0
        p = guide.astype(np.float32)/255.0
        mean_I = ndimage.uniform_filter(I,size=2*radius+1)
        mean_p = ndimage.uniform_filter(p,size=2*radius+1)
        mean_Ip = ndimage.uniform_filter(I*p,size=2*radius+1)
        cov_Ip = mean_Ip - mean_I*mean_p
        mean_II = ndimage.uniform_filter(I*I,size=2*radius+1)
        var_I = mean_II - mean_I*mean_I
        a = cov_Ip/(var_I + epsilon)
        b = mean_p - a*mean_I
        mean_a = ndimage.uniform_filter(a,size=2*radius+1)
        mean_b = ndimage.uniform_filter(b,size=2*radius+1)
        q = mean_a*I + mean_b
        return (np.clip(q,0,1)*255).astype(np.uint8)

    def fuse_images(self, map_features, soil_composition, fusion_method='ihs_weighted'):
        map_img, soil_img = self.load_and_validate_inputs(map_features, soil_composition)
        if fusion_method == 'ihs_weighted':
            return self._ihs_weighted_fusion(map_img, soil_img)
        else:
            raise NotImplementedError("Only ihs_weighted fusion implemented here.")

    def _ihs_weighted_fusion(self, map_img, soil_img):
        if len(map_img.shape)==2:
            map_rgb = np.stack([map_img]*3,axis=2)
        else:
            map_rgb=map_img
        I,H,S = self.rgb_to_ihs(map_rgb)
        soil_intensity = soil_img.astype(np.float32)/255.0
        soil_intensity = ndimage.gaussian_filter(soil_intensity,sigma=2)
        geo_wt = self.fusion_weights['geographic']
        soil_wt = self.fusion_weights['soil']
        I_fused = geo_wt*I + soil_wt*soil_intensity
        var_map = ndimage.generic_filter(I,np.var,size=9)
        enh_factor = 1 + 0.3*(var_map/np.max(var_map))
        I_fused *= enh_factor
        I_fused = np.clip(I_fused,0,1)
        fused_rgb = self.ihs_to_rgb(I_fused,H,S)
        guide = np.mean(map_rgb,axis=2).astype(np.uint8)
        for c in range(fused_rgb.shape[2]):
            fused_rgb[:,:,c] = self.apply_guided_filter(fused_rgb[:,:,c], guide)
        return fused_rgb

    def _normalize_image(self,image,name):
        image = np.clip(image,0,255).astype(np.uint8)
        return image

    def _ensure_compatible_dimensions(self, map_img, soil_img):
        if map_img.shape[:2] != soil_img.shape[:2]:
            from scipy.ndimage import zoom
            target_shape = map_img.shape[:2]
            if len(soil_img.shape)==2:
                soil_img = zoom(soil_img,(target_shape[0]/soil_img.shape[0],target_shape[1]/soil_img.shape[1]))
            else:
                soil_img = zoom(soil_img,(target_shape[0]/soil_img.shape[0],target_shape[1]/soil_img.shape[1],1))
            soil_img = np.clip(soil_img,0,255).astype(np.uint8)
        return map_img, soil_img

class AdvancedCanalOptimizer:
    def __init__(self, features: ImageFeatures, soil_data: SoilData, pixel_to_meter: float = 2.0):
        self.features = features
        self.soil_data = soil_data
        self.pixel_to_meter = pixel_to_meter
        self.height, self.width = features.elevation.shape
        self.containers = {
            'small': {'capacity': 1000, 'cost': 800, 'footprint': 4},
            'medium': {'capacity': 2000, 'cost': 1200, 'footprint': 6},
            'large': {'capacity': 5000, 'cost': 2500, 'footprint': 10}
        }

    def calculate_runoff_map(self, rainfall_mm: float=100):
        soil_runoff_coeff = 1.0 - (self.soil_data.permeability / 255.0) * 0.6
        soil_runoff_coeff = np.clip(soil_runoff_coeff, 0.1, 0.95)
        runoff_coeff = soil_runoff_coeff.copy()
        road_mask = self.features.roads > 0
        building_mask = self.features.buildings>0
        runoff_coeff[road_mask] = np.minimum(runoff_coeff[road_mask] + 0.3, 0.95)
        runoff_coeff[building_mask] = np.minimum(runoff_coeff[building_mask] + 0.4, 0.95)
        veg_mask = self.features.vegetation > 0
        runoff_coeff[veg_mask] = np.maximum(runoff_coeff[veg_mask] - 0.2, 0.1)
        pixel_area_m2 = self.pixel_to_meter ** 2
        rainfall_m = rainfall_mm / 1000.0
        runoff_volume = runoff_coeff * rainfall_m * pixel_area_m2 * 1000
        return runoff_volume

    def find_source_points(self, runoff_map: np.ndarray, num_sources: int=6):
        runoff_smooth = ndimage.gaussian_filter(runoff_map, sigma=15)
        local_max = (runoff_smooth == ndimage.maximum_filter(runoff_smooth, size=50))
        max_coords = np.where(local_max)
        max_values = runoff_smooth[max_coords]
        sorted_indices = np.argsort(max_values)[::-1]
        source_points = []
        min_distance = 80
        for idx in sorted_indices:
            y,x = max_coords[0][idx], max_coords[1][idx]
            if not source_points or all(np.sqrt((x-sx)**2 + (y-sy)**2) > min_distance for sy,sx in source_points):
                source_points.append((y,x))
                if len(source_points) >= num_sources:
                    break
        return source_points

    def find_sink_points(self):
        waterway_coords = np.where(self.features.waterways > 0)
        if len(waterway_coords[0]) > 0:
            coords = np.column_stack((waterway_coords[0], waterway_coords[1]))
            if len(coords)>100:
                num_clusters = min(4,len(coords)//100)
                kmeans = KMeans(n_clusters=num_clusters, random_state=42, n_init=10)
                clusters = kmeans.fit_predict(coords)
                sink_points = [(int(c[0]), int(c[1])) for c in kmeans.cluster_centers_]
            else:
                sink_points=[(waterway_coords[0][0], waterway_coords[1][0])]
        else:
            sink_points=[(self.height-1, self.width//2),(self.height//2,self.width-1)]
        return sink_points

    def plan_canal_route(self, source:Tuple[int,int], sink:Tuple[int,int])->List[Tuple[int,int]]:
        def get_cost(y,x):
            if not(0<=y<self.height and 0<=x<self.width): return float('inf')
            elevation_cost = self.features.elevation[y,x]/255.0
            stability_cost = 1.0 - (self.soil_data.stability[y,x]/255.0)
            if self.features.buildings[y,x]>0: return float('inf')
            if self.features.roads[y,x]>0:
                stability_cost+=0.5
            return elevation_cost + stability_cost
        current = source
        path = [current]
        max_steps = abs(sink[0]-source[0])+abs(sink[1]-source[1])+100
        for _ in range(max_steps):
            if current == sink: break
            y,x = current
            directions = [(-1,0),(1,0),(0,-1),(0,1),(-1,-1),(-1,1),(1,-1),(1,1)]
            best_next = None
            best_score = float('inf')
            for dy,dx in directions:
                ny,nx = y+dy,x+dx
                if 0<=ny<self.height and 0<=nx<self.width:
                    dist_to_sink = abs(ny-sink[0]) + abs(nx-sink[1])
                    terrain_c = get_cost(ny,nx)
                    score = dist_to_sink + terrain_c*10
                    if score<best_score:
                        best_score=score
                        best_next=(ny,nx)
            if best_next and best_next not in path:
                current = best_next
                path.append(current)
            else: break
        if current != sink:
            path.append(sink)
        return path

    def optimize_containers(self, canal_routes: List[List[Tuple[int,int]]], runoff_map: np.ndarray, target_efficiency=0.3) -> CanalPlan:
        all_containers = []
        total_storage=0
        total_cost=0
        total_canal_length=0
        for route in canal_routes:
            if len(route)<2: continue
            route_len_px=len(route)
            route_len_m = route_len_px*self.pixel_to_meter
            total_canal_length += route_len_m
            corridor_radius=25
            route_runoff=0
            for y,x in route:
                y0,y1=max(0,y-corridor_radius), min(self.height,y+corridor_radius+1)
                x0,x1=max(0,x-corridor_radius), min(self.width,x+corridor_radius+1)
                route_runoff += np.sum(runoff_map[y0:y1,x0:x1])
            target_storage = route_runoff*target_efficiency
            best_config = self._select_containers(route_len_m,target_storage)
            containers = self._place_route_containers(route,best_config)
            all_containers.extend(containers)
            total_storage += sum(self.containers[c['type']]['capacity'] for c in containers)
            total_cost += sum(self.containers[c['type']]['cost'] for c in containers)
        total_cost += total_canal_length*75  # canal cost per meter
        total_runoff = np.sum(runoff_map)
        efficiency_score = min(1.0,total_storage/total_runoff) if total_runoff>0 else 0
        return CanalPlan(canal_routes=canal_routes, container_locations=[(c['x'],c['y'],self.containers[c['type']]['capacity']) for c in all_containers], storage_volume=total_storage, canal_length=total_canal_length, efficiency_score=efficiency_score, cost_estimate=total_cost)

    def _select_containers(self, route_length, target_storage):
        best_config=None
        best_score=-1
        for cont_type,specs in self.containers.items():
            max_containers = int(route_length // (specs['footprint'] * self.pixel_to_meter))
            for count in range(1, min(max_containers+1,50)):
                storage = count * specs['capacity']
                cost = count * specs['cost']
                storage_score = min(1.0, storage/target_storage) if target_storage>0 else 0
                cost_efficiency = storage/cost if cost>0 else 0
                score = storage_score*0.7 + (cost_efficiency/5)*0.3
                if score > best_score:
                    best_score = score
                    best_config = {'type':cont_type,'count':count,'storage':storage,'cost':cost}
        return best_config or {'type':'medium','count':1,'storage':2000,'cost':1200}

    def _place_route_containers(self, route, config):
        containers = []
        route_len = len(route)
        if config['count'] <= 0 or route_len==0:
            return containers
        if config['count'] ==1:
            positions = [route_len//2]
        else:
            positions = [int(i*route_len/config['count']) for i in range(config['count'])]
        for pos in positions:
            pos = min(pos, route_len-1)
            y,x = route[pos]
            containers.append({'x':x,'y':y,'type':config['type']})
        return containers


# ----- Example usage -----
def run_full_pipeline(geographic_image=None, soil_image=None):
    # Step 1: Image Fusion
    fusion_processor = ImageFusionProcessor(pixel_to_meter_ratio=2.0)
    map_features, soil_composition = fusion_processor.load_and_validate_inputs(geographic_image, soil_image)
    fused_image = fusion_processor.fuse_images(map_features, soil_composition, fusion_method='ihs_weighted')

    # Assuming features and soil_data extract from fused_image or separately
    # Here we simulate feature extraction for demo purposes
    features = ImageFeatures(
        waterways = (fused_image[:,:,2] > 150).astype(np.uint8)*255,
        roads = (fused_image[:,:,0] > 120).astype(np.uint8)*255,
        elevation = (np.mean(fused_image, axis=2)).astype(np.uint8),
        buildings = (fused_image[:,:,1] > 180).astype(np.uint8)*255,
        vegetation = (fused_image[:,:,1] > 150).astype(np.uint8)*255
    )
    soil_data = SoilData(
        permeability = soil_composition,
        composition = np.zeros_like(soil_composition),
        drainage = np.zeros_like(soil_composition),
        stability = np.full_like(soil_composition, 200)
    )

    # Step 2: Runoff Map Estimation
    optimizer = AdvancedCanalOptimizer(features, soil_data, pixel_to_meter=2.0)
    runoff_map = optimizer.calculate_runoff_map(rainfall_mm=125)

    # Step 3: Source and Sink Point Detection
    sources = optimizer.find_source_points(runoff_map, num_sources=7)
    sinks = optimizer.find_sink_points()

    # Step 4: Canal Route Planning
    canal_routes = []
    for source in sources:
        distances = [np.sqrt((source[0]-sink[0])**2+(source[1]-sink[1])**2) for sink in sinks]
        closest_sink = sinks[np.argmin(distances)]
        canal_routes.append(optimizer.plan_canal_route(source, closest_sink))

    # Step 5: Container Optimization
    optimal_plan = optimizer.optimize_containers(canal_routes, runoff_map, target_efficiency=0.35)

    # Output results summary
    print(f"Containers deployed: {len(optimal_plan.container_locations)}")
    print(f"Total storage volume: {optimal_plan.storage_volume:.0f} liters")
    print(f"Canal network length: {optimal_plan.canal_length:.1f} meters")
    print(f"Water capture efficiency: {optimal_plan.efficiency_score*100:.2f}%")
    print(f"Total system cost estimated: ${optimal_plan.cost_estimate:.2f}")

    return fused_image, optimal_plan

# To run with teammate outputs loaded as numpy arrays:
# fused_img, plan = run_full_pipeline(geographic_image=your_map_image_array, soil_image=your_soil_array)
