
# Wind effect animation UI

Interactively compute SAGA wind effect rasters for a range of wind directions and build PNG/MP4 animations without leaving this notebook.



In [5]:
# --- Cell 1: imports & helpers ---
import os, shutil, subprocess
from pathlib import Path
import numpy as np
import rasterio
import matplotlib.pyplot as plt
import imageio.v2 as imageio
from IPython.display import display
import ipywidgets as widgets
try:
    from ipyfilechooser import FileChooser
    HAVE_FILECHOOSER = True
except Exception:
    HAVE_FILECHOOSER = False
# --- Default values shown in the UI controls ---
DEFAULT_MAXDIST_KM = 300.0      # passed to saga_cmd as -MAXDIST
DEFAULT_ACCEL = 1.5             # passed to saga_cmd as -ACCEL
DEFAULT_USE_PYRAMIDS = False    # passed to saga_cmd as -PYRAMIDS
DEFAULT_KEEP_SAGA = False       # keep the intermediate SAGA grids after conversion
DEFAULT_WIDTH_PX = 960          # PNG frame width in pixels
DEFAULT_HEIGHT_PX = 704         # PNG frame height in pixels
DEFAULT_DPI = 128               # matplotlib figure DPI used for the frames
DEFAULT_FPS = 8                 # MP4 frame rate

# OSGeo4W installation root, if the environment advertises one.
_OSGEO4W_ROOT = os.environ.get('OSGEO4W_ROOT', '').strip()

# Well-known saga_cmd locations, probed in order by _resolve_executable().
# The OSGeo4W entry is only added when OSGEO4W_ROOT is actually set: with an
# empty root the path would degrade to the *relative* 'apps/saga/saga_cmd.exe'
# and be probed against whatever the current working directory happens to be.
SAGA_CANDIDATES = [
    *([Path(_OSGEO4W_ROOT) / 'apps' / 'saga' / 'saga_cmd.exe'] if _OSGEO4W_ROOT else []),
    Path(r"C:\Users\usuario\Downloads\saga-9.9.1_x64\saga-9.9.1_x64\saga_cmd.exe"),
]

# Well-known gdal_translate locations, probed in order by _resolve_executable().
# The OSGeo4W entry is appended last so the pre-existing probe order is unchanged.
GDAL_CANDIDATES = [
    Path('C:/OSGeo4W64/bin/gdal_translate.exe'),
    Path('C:/Program Files/QGIS 3.34.0/bin/gdal_translate.exe'),
    Path('C:/Program Files/QGIS 3.28.10/bin/gdal_translate.exe'),
    *([Path(_OSGEO4W_ROOT) / 'bin' / 'gdal_translate.exe'] if _OSGEO4W_ROOT else []),
]

# File suffixes removed by _cleanup_saga_outputs() for every output base name.
SAGA_EXTENSIONS = ['.sdat', '.sgrd', '.prj', '.mld', '.mgrd']

# PNG frame paths produced by the most recent successful run.
_LAST_FRAMES: list[str] = []
def _clean_path(value: str) -> str:
    """Normalise a user-typed path string.

    Surrounding whitespace is stripped, then environment variables and
    finally a leading ``~`` are expanded. Falsy input (``None``, ``''``)
    yields an empty string.
    """
    text = value.strip() if value else ''
    with_vars = os.path.expandvars(text)
    return os.path.expanduser(with_vars)
def _resolve_executable(user_value: str, default_names: tuple[str, ...], candidates: list[Path]) -> str:
    """Locate an external executable and return its path, or ``''`` if none is found.

    Lookup order:
      1. ``user_value`` - either the executable itself, or a folder that
         contains one of ``default_names``.
      2. The first entry of ``candidates`` that exists on disk.
      3. The first of ``default_names`` that ``shutil.which`` finds on PATH.
    """
    # 1) Whatever the user typed.
    if user_value:
        typed = Path(_clean_path(user_value))
        if typed.is_file():
            return str(typed)
        if typed.is_dir():
            inside = next(
                (typed / name for name in default_names if (typed / name).exists()),
                None,
            )
            if inside is not None:
                return str(inside)

    # 2) Known installation locations.
    known = next((Path(entry) for entry in candidates if entry and Path(entry).exists()), None)
    if known is not None:
        return str(known)

    # 3) The system PATH.
    on_path = next(filter(None, map(shutil.which, default_names)), None)
    return on_path or ''
def _run_command(cmd: list[str]) -> tuple[int, str]:
    """Run an external command and return ``(exit_code, combined_output)``.

    stderr is merged into stdout so the caller gets a single log text.
    The output is decoded with the platform default encoding; bytes that
    cannot be decoded are replaced instead of raising ``UnicodeDecodeError``,
    so odd characters in a tool's console output cannot abort a run.
    """
    proc = subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        errors='replace',
    )
    return proc.returncode, proc.stdout
def _build_angles(start: int, end: int, step: int, wrap: bool) -> list[int]:
    """Return the wind directions (degrees, 0-359) to process, in sweep order.

    ``start`` and ``end`` are reduced modulo 360 and ``step`` is clamped to
    at least 1. With ``wrap`` enabled, an ``end`` at or before ``start``
    means "continue through 360 degrees" (``start == end`` is a full circle).
    Without ``wrap``, ``end < start`` raises ``ValueError``. Duplicates are
    dropped while keeping first-seen order.
    """
    first = int(start) % 360
    last = int(end) % 360
    stride = max(1, int(step))

    if wrap:
        # An end at/behind the start is reached by going once around.
        stop = last + 360 if last <= first else last
    elif last < first:
        raise ValueError('End angle must be greater than or equal to start angle when wrap is disabled.')
    else:
        stop = last

    swept = (value % 360 for value in range(first, stop + 1, stride))
    unique = list(dict.fromkeys(swept))
    return unique or [first]
def _direction_to_saga(direction_from: int, mode: str) -> int:
    """Convert a UI angle into the angle handed to SAGA (0-359).

    In ``'from'`` mode the angle is rotated by 180 degrees; in any other
    mode it is only normalised into the 0-359 range.
    """
    rotation = 180 if mode == 'from' else 0
    return (int(direction_from) + rotation) % 360
def _make_label(direction_from: int, direction_to: int, mode: str) -> str:
    """Build the file-name label for one frame.

    ``'from'`` mode labels the frame with ``direction_from``; every other
    mode labels it with ``direction_to``. Angles are zero-padded to three
    digits.
    """
    return f"from_{direction_from:03d}deg" if mode == 'from' else f"to_{direction_to:03d}deg"
def _saga_wind_effect(saga_exe: str, dem_path: Path, out_base: Path, dir_to: int,
                      maxdist: float, accel: float, pyramids: bool) -> Path:
    """Run SAGA tool ``ta_morphometry 15`` for one constant wind direction.

    Args:
        saga_exe: Path of the ``saga_cmd`` executable.
        dem_path: Elevation raster passed as ``-DEM``.
        out_base: Output path without extension; the grid is requested at
            ``out_base.with_suffix('.sdat')``.
        dir_to: Wind direction passed as ``-DIR_CONST``.
        maxdist: Value passed as ``-MAXDIST``.
        accel: Value passed as ``-ACCEL``.
        pyramids: Whether ``-PYRAMIDS`` is switched on.

    Returns:
        Path of the ``.sdat`` file that was requested from SAGA.

    Raises:
        RuntimeError: If ``saga_cmd`` exits with a non-zero code, or exits
            with zero but the requested grid file is missing. The tool's
            console output is included in the message.
    """
    out_base = Path(out_base)
    sdat = out_base.with_suffix('.sdat')
    cmd = [
        saga_exe, 'ta_morphometry', '15',
        '-DEM', str(dem_path),
        '-EFFECT', str(sdat),
        # NOTE(review): unit selector for -DIR_CONST; 1 is presumably
        # "degrees" - confirm against the SAGA tool description.
        '-DIR_UNITS', '1',
        '-DIR_CONST', f'{dir_to:.6f}',
        '-MAXDIST', f'{maxdist:.6f}',
        '-ACCEL', f'{accel:.6f}',
        '-PYRAMIDS', '1' if pyramids else '0',
    ]
    code, out = _run_command(cmd)
    if code != 0:
        raise RuntimeError(f'saga_cmd failed (exit {code}):\n{out}')
    # Do not trust the exit code alone: without the grid the next pipeline
    # step would fail with a far less helpful message.
    if not sdat.exists():
        raise RuntimeError(f'saga_cmd reported success but did not write {sdat}:\n{out}')
    return sdat
def _gdal_translate(gdal_exe: str, grid_path: Path, tif_path: Path) -> Path:
    """Convert a grid to a tiled, DEFLATE-compressed Float32 GeoTIFF.

    Any existing file at ``tif_path`` is removed first.

    The nodata value is deliberately NOT overridden with ``-a_nodata``:
    that option only relabels the nodata tag and leaves pixel values
    untouched, so forcing a value different from the one the source grid
    declares would turn its nodata cells into valid data. Without the
    option gdal_translate carries the source's own nodata value over.

    Args:
        gdal_exe: Path of the ``gdal_translate`` executable.
        grid_path: Source raster.
        tif_path: Destination GeoTIFF.

    Returns:
        ``tif_path`` as a ``Path``.

    Raises:
        RuntimeError: If ``gdal_translate`` exits with a non-zero code; the
            tool's console output is included in the message.
    """
    tif_path = Path(tif_path)
    tif_path.unlink(missing_ok=True)
    cmd = [
        gdal_exe,
        '-of', 'GTiff',
        '-ot', 'Float32',
        '-co', 'COMPRESS=DEFLATE',
        '-co', 'TILED=YES',
        str(grid_path),
        str(tif_path),
    ]
    code, out = _run_command(cmd)
    if code != 0:
        raise RuntimeError(f'gdal_translate failed (exit {code}):\n{out}')
    return tif_path
def _render_png(raster_path: Path, png_path: Path, title: str,
                width_px: int, height_px: int, dpi: int) -> Path:
    """Render band 1 of a raster as a PNG quicklook with a colour bar.

    Cells equal to the raster's nodata value are masked to NaN. The colour
    range is stretched between the 2nd and 98th percentile of the finite
    cells (0..1 when there are none).

    Args:
        raster_path: Raster to read with rasterio.
        png_path: Destination image file.
        title: Text drawn above the map.
        width_px: Figure width in pixels.
        height_px: Figure height in pixels.
        dpi: Figure resolution; the figure size in inches is pixels / dpi.

    Returns:
        ``png_path`` unchanged.
    """
    with rasterio.open(raster_path) as src:
        arr = src.read(1).astype('float32')
        nodata = src.nodata
    if nodata is not None:
        arr = np.where(arr == nodata, np.nan, arr)

    finite = np.isfinite(arr)
    if finite.any():
        vmin, vmax = np.nanpercentile(arr[finite], [2, 98])
    else:
        vmin, vmax = 0.0, 1.0

    fig = plt.figure(figsize=(width_px / dpi, height_px / dpi), dpi=dpi)
    try:
        ax = fig.add_subplot(111)
        im = ax.imshow(arr, vmin=vmin, vmax=vmax)
        ax.set_axis_off()
        ax.set_title(title)
        # Bind the colour bar to this figure/axes explicitly instead of
        # relying on pyplot's notion of the "current" figure.
        cbar = fig.colorbar(im, ax=ax, shrink=0.75, pad=0.02)
        cbar.set_label('Wind effect (dimensionless)')
        fig.tight_layout()
        fig.savefig(png_path)
    finally:
        # Always release the figure: the caller keeps looping after a failed
        # frame, so unclosed figures would otherwise pile up in memory.
        plt.close(fig)
    return png_path
def _cleanup_saga_outputs(base_path: Path):
    """Delete the intermediate SAGA files that belong to ``base_path``.

    For every suffix in ``SAGA_EXTENSIONS`` the file
    ``base_path.with_suffix(suffix)`` is removed if present. Cleanup is
    best effort: a file that cannot be removed is left in place and the
    remaining suffixes are still processed.
    """
    base_path = Path(base_path)
    for ext in SAGA_EXTENSIONS:
        try:
            # missing_ok avoids a separate exists() check (and its race).
            base_path.with_suffix(ext).unlink(missing_ok=True)
        except OSError:
            # Deliberately ignored: a leftover temp file must not abort the
            # run. Only filesystem errors are swallowed, not programming bugs.
            pass


In [None]:
# --- Cell 2: interactive UI ---
# Scrollable log area; on_run_clicked() prints its progress and errors into it.
out_log = widgets.Output(layout=widgets.Layout(border='1px solid #ddd', max_height='220px', overflow='auto'))
# One-line status indicator (Idle / Running / Failed / Processed).
status = widgets.HTML('<span style="color:#555;">Idle.</span>')
# Executable locations, pre-filled with whatever _resolve_executable() finds
# from the candidate lists / PATH when the cell is run.
w_saga = widgets.Text(
    value=_resolve_executable('', ('saga_cmd', 'saga_cmd.exe'), SAGA_CANDIDATES),
    description='saga_cmd:',
    placeholder='Path to saga_cmd or folder containing it',
    layout=widgets.Layout(width='70%')
)
w_gdal = widgets.Text(
    value=_resolve_executable('', ('gdal_translate', 'gdal_translate.exe'), GDAL_CANDIDATES),
    description='gdal_translate:',
    placeholder='Path to gdal_translate or folder containing it',
    layout=widgets.Layout(width='70%')
)
# Whether the entered angles are "from" or "to" directions; the selected
# value ('from' / 'to') is passed to _direction_to_saga() and _make_label().
w_mode = widgets.ToggleButtons(
    options=[('FROM (met)', 'from'), ('TO (SAGA)', 'to')],
    value='from',
    description='Angles:'
)
# Angle sweep definition, consumed by _build_angles().
w_angle_start = widgets.BoundedIntText(value=0, min=0, max=359, step=1, description='Start (°):')
w_angle_end   = widgets.BoundedIntText(value=350, min=0, max=359, step=1, description='End (°):')
w_angle_step  = widgets.BoundedIntText(value=10, min=1, max=180, step=1, description='Step (°):')
w_angle_wrap  = widgets.Checkbox(value=True, description='Wrap if end < start (full sweep)')
# Live feedback (number of directions or an error), filled by _update_angle_info().
w_angle_info  = widgets.HTML('')
# DEM source: either a typed path or an uploaded file (an upload takes
# precedence in on_run_clicked()).
w_dem_path = widgets.Text(
    value='',
    description='DEM path:',
    placeholder='e.g., /data/dem.tif',
    layout=widgets.Layout(width='70%')
)
w_dem_upload = widgets.FileUpload(
    accept='.tif,.tiff,.sgrd,.sdat,.sg-grd,.sg-grd-z',
    multiple=False,
    description='Upload DEM'
)
# Output-directory picker: the ipyfilechooser widget when it could be
# imported, otherwise a text box plus a "Browse…" button backed by tkinter.
if HAVE_FILECHOOSER:
    w_out_dir = FileChooser(str(Path.cwd()))
    w_out_dir.title = 'Output directory'
    w_out_dir.show_only_dirs = True
    w_out_dir.use_dir_icons = True
    out_dir_ui = w_out_dir
else:
    w_out_dir_text = widgets.Text(
        value='',
        description='Output dir:',
        placeholder='If empty → DEM folder',
        layout=widgets.Layout(width='70%')
    )
    w_out_dir_btn = widgets.Button(description='Browse…', icon='folder-open')
    out_dir_ui = widgets.HBox([w_out_dir_text, w_out_dir_btn])

    def _browse_out_dir(_):
        """Open a tkinter folder dialog and copy the selection into the text box.

        Any failure (tkinter missing, no display, …) is reported in the log
        widget instead of being raised.
        """
        try:
            import tkinter as tk
            from tkinter import filedialog
            root = tk.Tk()
            try:
                root.withdraw()  # hide the empty root window, show only the dialog
                sel = filedialog.askdirectory(initialdir=str(Path.cwd()))
            finally:
                # Tear the hidden root window down even if the dialog raised,
                # so no orphaned Tk instance is left behind.
                root.destroy()
            if sel:
                w_out_dir_text.value = sel
        except Exception as e:
            with out_log:
                print(f'(Folder dialog unavailable) {e}')

    w_out_dir_btn.on_click(_browse_out_dir)
def _get_out_dir_path() -> Path | None:
    """Return the output directory chosen in the UI, or ``None`` if none is set.

    With the file chooser available, its ``selected_path`` is used and
    ``default_path`` serves as fallback. Otherwise the text box content is
    read and passed through ``_clean_path``.
    """
    if not HAVE_FILECHOOSER:
        typed = (getattr(w_out_dir_text, 'value', '') or '').strip()
        if not typed:
            return None
        return Path(_clean_path(typed))

    chosen = getattr(w_out_dir, 'selected_path', None)
    if not chosen:
        chosen = getattr(w_out_dir, 'default_path', '')
    chosen = (chosen or '').strip()
    if not chosen:
        return None
    return Path(chosen)
# Prefix for every output file name; on_run_clicked() falls back to
# "<DEM stem>_WindEffect" when this is left empty.
w_prefix = widgets.Text(
    value='',
    description='Output prefix:',
    placeholder='If empty → <DEM>_WindEffect',
    layout=widgets.Layout(width='60%')
)
# SAGA tool parameters (forwarded to _saga_wind_effect()).
w_maxdist = widgets.FloatText(value=DEFAULT_MAXDIST_KM, description='MAXDIST (km):', layout=widgets.Layout(width='32%'))
w_accel   = widgets.FloatText(value=DEFAULT_ACCEL,     description='ACCEL:',         layout=widgets.Layout(width='32%'))
w_pyramids= widgets.Checkbox(value=DEFAULT_USE_PYRAMIDS, description='PYRAMIDS')
# When unchecked, the intermediate SAGA grids are deleted after each frame.
w_keep_saga = widgets.Checkbox(value=DEFAULT_KEEP_SAGA, description='Keep SAGA grids')
# PNG frame geometry (forwarded to _render_png()).
w_width_px  = widgets.IntText(value=DEFAULT_WIDTH_PX,  description='PNG width:',  layout=widgets.Layout(width='28%'))
w_height_px = widgets.IntText(value=DEFAULT_HEIGHT_PX, description='PNG height:', layout=widgets.Layout(width='28%'))
w_dpi       = widgets.IntText(value=DEFAULT_DPI,       description='DPI:',        layout=widgets.Layout(width='20%'))
# MP4 animation options.
w_make_mp4 = widgets.Checkbox(value=True, description='Create MP4 animation')
w_mp4_name = widgets.Text(value='wind_effect_animation.mp4', description='MP4 name:', layout=widgets.Layout(width='60%'))
w_fps      = widgets.BoundedIntText(value=DEFAULT_FPS, min=1, max=60, step=1, description='FPS:')
# Button that starts the whole pipeline (handler attached further below).
btn_run = widgets.Button(description='Generate frames / animation', icon='film', button_style='primary')
w_dem_upload.style.button_color = '#4caf50'
# Layout containers grouping the controls defined above.
angle_box = widgets.VBox([
    widgets.HBox([w_mode, w_angle_start, w_angle_end, w_angle_step]),
    widgets.HBox([w_angle_wrap, w_angle_info])
])
advanced_box = widgets.VBox([
    widgets.HBox([w_maxdist, w_accel]),
    widgets.HBox([w_pyramids, w_keep_saga])
])
# Collapsible section holding the SAGA parameters.
advanced = widgets.Accordion(children=[advanced_box])
advanced.set_title(0, 'SAGA parameters')
png_box = widgets.HBox([w_width_px, w_height_px, w_dpi])
mp4_box = widgets.HBox([w_make_mp4, w_fps, w_mp4_name])
# Static usage instructions displayed at the bottom of the UI.
help_html = widgets.HTML('''
<p><b>Workflow</b></p>
<ol>
<li>Select <code>saga_cmd</code>, <code>gdal_translate</code>, and the DEM you want to process.</li>
<li>Define the wind direction range (start, end, step) and whether the values represent <i>FROM</i> or <i>TO</i> angles.</li>
<li>Pick an output directory and optional prefix, tweak advanced parameters if needed.</li>
<li>Click <b>Generate frames / animation</b> to create GeoTIFFs, PNG quicklooks, and (optionally) an MP4 animation.</li>
</ol>
''')
def _update_angle_info(*_):
    """Refresh the hint next to the angle controls.

    Shows, in green, how many directions the current start/end/step/wrap
    settings produce, or, in red, the error raised while computing them.
    Accepts and ignores any arguments so it can serve as a widget observer.
    """
    try:
        angles = _build_angles(
            w_angle_start.value,
            w_angle_end.value,
            w_angle_step.value,
            bool(w_angle_wrap.value),
        )
        markup = f'<span style="color:#2b8a3e;">{len(angles)} directions</span>'
    except Exception as exc:
        markup = f'<span style="color:#c00;">{exc}</span>'
    w_angle_info.value = markup
# Recompute the direction count whenever one of the angle controls changes …
for ctrl in (w_angle_start, w_angle_end, w_angle_step, w_angle_wrap):
    ctrl.observe(_update_angle_info, names='value')
# … and once right away so the hint reflects the initial values.
_update_angle_info()
def _store_uploaded_dem(upload_value, target_dir: Path) -> Path:
    """Write the first file of a FileUpload payload into ``target_dir``.

    Two payload shapes are understood: a dict keyed by file name whose
    entries carry ``'metadata'`` / ``'content'``, and a non-empty list/tuple
    whose first item exposes ``name`` / ``content`` as attributes or keys.
    (NOTE(review): presumably the older and newer ipywidgets formats -
    confirm against the installed version.)

    Returns:
        Path of the file that was written.

    Raises:
        ValueError: If the payload shape is unknown or carries no data.
    """
    if isinstance(upload_value, dict):
        key, item = next(iter(upload_value.items()))
        name = item.get('metadata', {}).get('name', key)
        data = item.get('content')
    elif isinstance(upload_value, (list, tuple)) and upload_value:
        item = upload_value[0]
        name = getattr(item, 'name', None)
        data = getattr(item, 'content', None)
        if isinstance(item, dict):
            # Plain dict entries expose no attributes: fall back to the keys
            # for the name as well as for the content.
            name = name or item.get('name')
            if data is None:
                data = item.get('content')
        name = name or 'uploaded_dem'
    else:
        raise ValueError('Unsupported upload payload')
    if data is None:
        raise ValueError('No data received from upload widget')
    # Keep only the last path component so the reported file name cannot
    # point outside the target directory.
    safe_name = Path(str(name)).name or 'uploaded_dem'
    target_dir.mkdir(parents=True, exist_ok=True)
    dem_path = target_dir / safe_name
    with open(dem_path, 'wb') as fh:
        fh.write(data)
    return dem_path


def _run_wind_effect_job(fail) -> bool:
    """Run one complete job from the current widget values.

    Steps: resolve the executables, build the angle list, locate/store the
    DEM, then for every direction run SAGA, convert to GeoTIFF, render a
    PNG and (unless requested otherwise) delete the SAGA grids; finally
    write the optional MP4. Progress is printed to the current output.

    Args:
        fail: Callable ``fail(tag, message)`` that reports a failure and
            returns ``False``.

    Returns:
        ``True`` when the job ran to the end, ``False`` after a reported failure.
    """
    global _LAST_FRAMES

    saga_exe = _resolve_executable(w_saga.value, ('saga_cmd', 'saga_cmd.exe'), SAGA_CANDIDATES)
    if not saga_exe:
        return fail('saga_cmd', 'ERROR: saga_cmd not found. Please provide the executable or its folder.')
    gdal_exe = _resolve_executable(w_gdal.value, ('gdal_translate', 'gdal_translate.exe'), GDAL_CANDIDATES)
    if not gdal_exe:
        return fail('gdal', 'ERROR: gdal_translate not found. Please provide the executable or its folder.')

    try:
        angles = _build_angles(w_angle_start.value, w_angle_end.value, w_angle_step.value, bool(w_angle_wrap.value))
    except Exception as exc:
        return fail('angles', f'ERROR: {exc}')
    if not angles:
        return fail('angles', 'ERROR: No wind directions computed. Adjust the start/end/step values.')

    out_dir = _get_out_dir_path()
    dem_path: Path | None = None
    if w_dem_upload.value:
        # An uploaded file wins over the typed path.
        try:
            dem_path = _store_uploaded_dem(w_dem_upload.value, out_dir or Path('.'))
        except Exception as exc:
            return fail('upload', f'ERROR: Could not read uploaded file: {exc}')
    else:
        typed = _clean_path(w_dem_path.value)
        if typed:
            dem_path = Path(typed)
    if dem_path is None or not dem_path.exists():
        return fail('DEM', f'ERROR: DEM not found: {dem_path}')

    if out_dir is None:
        out_dir = dem_path.parent
    out_dir.mkdir(parents=True, exist_ok=True)

    prefix = w_prefix.value.strip() or f"{dem_path.stem}_WindEffect"
    if '.' in prefix:
        # All per-frame files are derived with Path.with_suffix(), which cuts
        # the name at its LAST dot. A dot inside the base name would collapse
        # every frame onto the same .sdat/.tif/.png, so keep dots out of it.
        prefix = prefix.replace('.', '_')
        print(f"NOTE: '.' in the output prefix was replaced by '_': {prefix}")

    width_px = max(64, int(w_width_px.value))
    height_px = max(64, int(w_height_px.value))
    dpi = max(10, int(w_dpi.value))
    fps = max(1, int(w_fps.value))
    print(f'Using output directory: {out_dir}')
    print(f'Frames to generate: {len(angles)}')

    frames: list[str] = []
    for idx, direction_from in enumerate(angles, 1):
        direction_to = _direction_to_saga(direction_from, w_mode.value)
        label = _make_label(direction_from, direction_to, w_mode.value)
        base = out_dir / f"{prefix}_{label}"
        print(f"[{idx}/{len(angles)}] Wind effect → {label.replace('_', ' ')} (TO {direction_to:03d}°)")
        try:
            sdat_path = _saga_wind_effect(
                saga_exe,
                dem_path,
                base,
                direction_to,
                float(w_maxdist.value),
                float(w_accel.value),
                bool(w_pyramids.value)
            )
        except Exception as exc:
            return fail('saga_cmd', exc)
        try:
            tif_path = _gdal_translate(gdal_exe, sdat_path, base.with_suffix('.tif'))
        except Exception as exc:
            return fail('gdal', exc)
        try:
            png_path = _render_png(
                tif_path,
                base.with_suffix('.png'),
                f"Wind effect {label.replace('_', ' ')} (TO {direction_to:03d}°)",
                width_px,
                height_px,
                dpi
            )
            frames.append(str(png_path))
        except Exception as exc:
            # A missing quicklook is not fatal: the GeoTIFF exists, keep going.
            print(f'WARNING: Could not render PNG for {tif_path}: {exc}')
        if not bool(w_keep_saga.value):
            _cleanup_saga_outputs(base)
    _LAST_FRAMES = frames

    if bool(w_make_mp4.value) and frames:
        mp4_name = w_mp4_name.value.strip() or f"{prefix}_animation.mp4"
        mp4_path = out_dir / mp4_name
        if mp4_path.suffix.lower() != '.mp4':
            mp4_path = mp4_path.with_suffix('.mp4')
        try:
            with imageio.get_writer(
                str(mp4_path),
                fps=fps,
                codec='libx264',
                quality=8,
                macro_block_size=16,
                ffmpeg_params=['-pix_fmt', 'yuv420p']
            ) as writer:
                for frame in frames:
                    writer.append_data(imageio.imread(frame))
            print(f'MP4 written: {mp4_path}')
        except Exception as exc:
            print(f'WARNING: Could not create MP4: {exc}')
    print(f'Frames saved: {len(frames)} in {out_dir}')
    return True


def on_run_clicked(_):
    """Button handler: run the pipeline and keep the UI state consistent.

    The button is disabled while the job runs and is always re-enabled
    afterwards. The status line ends up as "Processed" on success or
    "Failed (<step>)" otherwise - including for unexpected errors, which
    are printed with their traceback into the log widget.
    """
    def fail(tag, message) -> bool:
        # Report a failure in the log and on the status line.
        print(message)
        status.value = f'<span style="color:#c00;">Failed ({tag})</span>'
        return False

    btn_run.disabled = True
    status.value = '<span style="color:#1c7ed6;">Running…</span>'
    out_log.clear_output()
    try:
        with out_log:
            try:
                if _run_wind_effect_job(fail):
                    status.value = '<span style="color:#2b8a3e;">Processed ✔</span>'
            except Exception as exc:
                # Anything unforeseen (permissions, invalid numeric input, …):
                # never leave the status stuck on "Running…".
                import traceback
                traceback.print_exc()
                fail('unexpected error', f'ERROR: {exc}')
    finally:
        btn_run.disabled = False
# Attach the pipeline handler to the run button.
btn_run.on_click(on_run_clicked)
# Assemble all controls top-to-bottom in the order they are displayed.
ui = widgets.VBox([
    status,
    w_saga,
    w_gdal,
    angle_box,
    widgets.HBox([w_dem_path, w_dem_upload]),
    out_dir_ui,
    w_prefix,
    advanced,
    png_box,
    mp4_box,
    btn_run,
    out_log,
    help_html,
])
# Render the UI as this cell's output.
display(ui)
print('Ready.')


VBox(children=(HTML(value='<span style="color:#555;">Idle.</span>'), Text(value='C:\\Users\\usuario\\Downloads…

Ready.
