# Sentinel-2 수생식물·조류 탐지 APA 스크립트

[Sentinel-2 수생식물·조류 탐지 APA 스크립트](https://custom-scripts.sentinel-hub.com/custom-scripts/sentinel-2/apa_script/)

In [1]:
import numpy as np
import rasterio
import matplotlib.pyplot as plt

from rasterio.warp import reproject, Resampling
from pathlib import Path

In [2]:
# 파일 경로 정의
base_path = "sentinel2_b02.tif"  # 기준 밴드 (10m)
align_paths = [
    "sentinel2_b03.tif",  # 10m
    "sentinel2_b04.tif",  # 10m
    "sentinel2_b05.tif",  # 20m
    "sentinel2_b08.tif",  # 10m
    "sentinel2_b11.tif",  # 20m
]

In [3]:
def load_and_align_bands(base_path, align_paths, scale=0.0001,
                         resampling=Resampling.bilinear):

    # 기준 밴드
    with rasterio.open(base_path) as src:
        base = src.read(1).astype(np.float32)
        nodata = src.nodata
        if nodata is not None:
            base = np.where(base == nodata, np.nan, base)
        base *= scale  # 스케일 적용
        base_profile = src.profile

    aligned = {}

    for p in align_paths:
        stem = Path(p).stem

        with rasterio.open(p) as src:
            arr = src.read(1).astype(np.float32)
            prof = src.profile
            nodata = src.nodata

        # nodata 처리
        if nodata is not None:
            arr = np.where(arr == nodata, np.nan, arr)

        # 스케일 적용
        arr *= scale

        # 리샘플링
        if (prof["transform"] != base_profile["transform"] or
            prof["crs"] != base_profile["crs"] or
            prof["height"] != base_profile["height"] or
            prof["width"] != base_profile["width"]):

            dst = np.empty((base_profile["height"], base_profile["width"]),
                           dtype=np.float32)

            reproject(
                source=arr,
                destination=dst,
                src_transform=prof["transform"],
                src_crs=prof["crs"],
                dst_transform=base_profile["transform"],
                dst_crs=base_profile["crs"],
                resampling=resampling
            )

            arr = dst

        aligned[stem] = arr

    return base, aligned, base_profile

In [4]:
B02, bands, base_profile = load_and_align_bands(base_path, align_paths)
B03 = bands["sentinel2_b03"]
B04 = bands["sentinel2_b04"]
B05 = bands["sentinel2_b05"]
B08 = bands["sentinel2_b08"]
B11 = bands["sentinel2_b11"]

In [5]:
def compute_aquatic_indices_rgb(B02, B03, B04, B05, B08, B11):
    """
    수생식물·조류 탐지용 분광지수(NDWI, water_plants, FAI)를 계산
    aquatic RGB를 생성하는 함수
    """

    # 안전한 나눗셈
    def safe_index(num, den):
        num = num.astype(np.float32)
        den = den.astype(np.float32)
        with np.errstate(divide="ignore", invalid="ignore"):
            out = num / den
        mask = (den == 0) | np.isnan(num) | np.isnan(den)
        out[mask] = np.nan
        return out.astype(np.float32)

    # 모든 밴드 float32로 변환
    B02 = B02.astype(np.float32)
    B03 = B03.astype(np.float32)
    B04 = B04.astype(np.float32)
    B05 = B05.astype(np.float32)
    B08 = B08.astype(np.float32)
    B11 = B11.astype(np.float32)

    # 분광지수 계산
    NDWI = safe_index(B03 - B08, B03 + B08)          # 물 탐지 지수
    water_plants = safe_index(B05 - B04, B05 + B04)  # 수생 식물 지수

    # Floating Algae Index (FAI)
    NIR2 = B04 + (B11 - B04) * ((832.8 - 664.6) / (1613.7 - 664.6))
    FAI = B08 - NIR2                                 # 부유 조류 지수

    # aquatic RGB 합성
    rgb = np.stack([
        8.5 * FAI,
        5.5 * water_plants,
        1.0 * NDWI,
    ], axis=0).astype(np.float32)

    return rgb, NDWI, FAI, water_plants

In [6]:
rgb, NDWI, FAI, water_plants = compute_aquatic_indices_rgb(
    B02, B03, B04, B05, B08, B11
)

In [7]:
def save_single_band(arr, profile, path, nodata_value=-9999.0):
    """
    단일 밴드를 GeoTIFF로 저장
    NaN은 nodata_value로 기록
    """
    arr_to_save = np.where(np.isnan(arr), nodata_value, arr).astype(np.float32)

    prof = profile.copy()
    prof.update(
        dtype=rasterio.float32,
        count=1,
        compress="lzw",
        nodata=nodata_value,
    )

    with rasterio.open(path, "w", **prof) as dst:
        dst.write(arr_to_save, 1)

    print("Saved:", path)

In [8]:
# 단일 지수 저장
save_single_band(NDWI, base_profile, "ndwi.tif")
save_single_band(FAI, base_profile, "fai.tif")
save_single_band(water_plants, base_profile, "water_plants.tif")

Saved: ndwi.tif
Saved: fai.tif
Saved: water_plants.tif


In [9]:
def save_rgb(
    rgb,
    profile,
    path="aquatic_rgb.tif",
    nodata_value=-9999.0,
    band_names=["FAI", "WATER_PLANTS", "NDWI"]
):
    """
    (3, H, W) rgb 배열을 3밴드 GeoTIFF로 저장.
    NaN은 nodata_value로 기록하며, 밴드 이름도 저장.
    """
    rgb_to_save = np.where(np.isnan(rgb), nodata_value, rgb).astype(np.float32)

    prof = profile.copy()
    prof.update(
        dtype=rasterio.float32,
        count=3,
        compress="lzw",
        nodata=nodata_value,
    )

    with rasterio.open(path, "w", **prof) as dst:
        # 데이터 쓰기
        for i in range(3):
            dst.write(rgb_to_save[i], i + 1)

        # 밴드 설명 추가
        for i, name in enumerate(band_names):
            dst.set_band_description(i + 1, name)

    print("Saved:", path)

In [10]:
# RGB 저장
save_rgb(rgb, base_profile, "aquatic_rgb.tif")

Saved: aquatic_rgb.tif


In [11]:
def apply_scl_mask(rgb_path, scl_path, out_path,
                   exclude_classes=[0, 1, 3, 8, 9, 10],
                   nodata_value=-9999):

    # rgb 읽기
    with rasterio.open(rgb_path) as src_rgb:
        rgb = src_rgb.read().astype(np.float32)   # shape: (C, H, W)
        profile = src_rgb.profile
        rgb_transform = src_rgb.transform
        rgb_crs = src_rgb.crs
        height, width = src_rgb.height, src_rgb.width

        # 밴드 설명(이름) 가져오기
        band_descriptions = src_rgb.descriptions

    # scl 읽기
    with rasterio.open(scl_path) as src_scl:
        scl = src_scl.read(1).astype(np.uint8)
        scl_transform = src_scl.transform
        scl_crs = src_scl.crs
        scl_height, scl_width = src_scl.height, src_scl.width

    # scl을 rgb 격자로 변환 (최근접, 코드값 유지)
    if (scl_transform != rgb_transform or
        scl_crs != rgb_crs or
        scl_height != height or
        scl_width != width):

        scl_reproj = np.empty((height, width), dtype=np.uint8)

        reproject(
            source=scl,
            destination=scl_reproj,
            src_transform=scl_transform,
            src_crs=scl_crs,
            dst_transform=rgb_transform,
            dst_crs=rgb_crs,
            resampling=Resampling.nearest
        )

        scl = scl_reproj

    # 마스크 생성
    mask = np.isin(scl, exclude_classes)

    # nodata 설정
    profile.update(
        dtype="float32",
        nodata=nodata_value
    )

    # 마스크 적용
    rgb[:, mask] = nodata_value

    # 저장 + 밴드 이름 복원
    with rasterio.open(out_path, "w", **profile) as dst:
        dst.write(rgb)
        for i, desc in enumerate(band_descriptions, start=1):
            if desc:
                dst.set_band_description(i, desc)

    print("Saved:", out_path)

In [12]:
apply_scl_mask(
    rgb_path="aquatic_rgb.tif",
    scl_path="sentinel2_scl.tif",
    out_path="aquatic_rgb_masked.tif"
)

Saved: aquatic_rgb_masked.tif
