<a href="https://colab.research.google.com/github/thisissamuca/GOES_16/blob/main/INDEX_NBR_GOES_16.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install rasterio



In [None]:
pip install xarray



In [None]:
pip install opencv-python



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Processamento com 2 bandas

In [None]:
import os
from collections import defaultdict
import rasterio
from typing import Dict, List, Tuple

def get_valid_band_files(directory: str) -> Dict[str, List[str]]:
    """Lista todos os arquivos de bandas válidos (M6C01-M6C16) no diretório.

    Args:
        directory: Caminho do diretório contendo os arquivos

    Returns:
        Dicionário com bandas como chaves e listas de arquivos como valores
    """
    band_prefix = 'M6C'
    valid_bands = [f'{band_prefix}{i:02d}' for i in range(1, 17)]
    file_list = os.listdir(directory)

    band_files = {}
    for band in valid_bands:
        band_files[band] = [f for f in file_list if band in f]

    return band_files

def select_bands(band_files: Dict[str, List[str]]) -> Tuple[str, str]:
    """Permite ao usuário selecionar duas bandas para operação.

    Args:
        band_files: Dicionário com arquivos disponíveis por banda

    Returns:
        Tupla com as duas bandas selecionadas
    """
    print("Bandas disponíveis com arquivos:")
    for band, files in band_files.items():
        if files:
            print(f"{band} ({len(files)} arquivos)")

    while True:
        try:
            band1 = input("Digite a primeira banda (ex: M6C03): ").strip().upper()
            if band1 not in band_files or not band_files[band1]:
                raise ValueError(f"Banda {band1} não encontrada ou sem arquivos")

            band2 = input("Digite a segunda banda (ex: M6C06): ").strip().upper()
            if band2 not in band_files or not band_files[band2]:
                raise ValueError(f"Banda {band2} não encontrada ou sem arquivos")

            return band1, band2

        except ValueError as e:
            print(f"Erro: {str(e)}. Tente novamente.")

def group_files_by_timestamp(band_files: Dict[str, List[str]], band1: str, band2: str) -> Dict[str, Dict[str, List[str]]]:
    """Agrupa arquivos das bandas selecionadas por timestamp.

    Args:
        band_files: Dicionário com arquivos por banda
        band1: Primeira banda selecionada
        band2: Segunda banda selecionada

    Returns:
        Dicionário com timestamps como chaves e dicionário de arquivos por banda como valores
    """
    grouped_files = defaultdict(lambda: defaultdict(list))

    for band in [band1, band2]:
        for filename in band_files[band]:
            # Extrai timestamp entre '_s' e '_e'
            start = filename.find('_s') + 2
            end = filename.find('_e', start)
            if start == -1 or end == -1:
                continue
            timestamp = filename[start:end]
            grouped_files[timestamp][band].append(filename)

    return grouped_files

def calculate_normalized_difference(tif_path1: str, tif_path2: str) -> Tuple:
    """Calcula a diferença normalizada entre duas imagens raster.

    Args:
        tif_path1: Caminho para o primeiro arquivo TIFF
        tif_path2: Caminho para o segundo arquivo TIFF

    Returns:
        Tupla contendo (resultado do cálculo, perfil do raster)
    """
    with rasterio.open(tif_path1) as src1, rasterio.open(tif_path2) as src2:
        image1 = src1.read(1)
        image2 = src2.read(1)
        profile = src1.profile.copy()

        # Evita divisão por zero
        #numerator = 2 * (image1 - image2)
        #denominator = (image1 + image2 + 1)
        #denominator[denominator == 0] = 1e-10

        result = (10*image1) - (9.8*image2) + 2 ### operação principal                            ----------------------
        return result, profile

def save_result(result, profile, output_path: str) -> None:
    """Salva o resultado raster com configurações otimizadas.

    Args:
        result: Array numpy com o resultado do cálculo
        profile: Perfil/metadados do raster
        output_path: Caminho para salvar o arquivo de saída
    """
    profile.update({
        'driver': 'GTiff',
        'count': 1,
        'dtype': result.dtype,
        'compress': 'lzw',
        'tiled': True
    })

    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    with rasterio.open(output_path, 'w', **profile) as dst:
        dst.write(result, 1)

def generate_band_pairs(grouped_files: Dict[str, Dict[str, List[str]]], band1: str, band2: str) -> List[Tuple[str, str]]:
    """Gera pares de arquivos das bandas selecionadas com o mesmo timestamp.

    Args:
        grouped_files: Dicionário com arquivos agrupados por timestamp e banda
        band1: Primeira banda selecionada
        band2: Segunda banda selecionada

    Returns:
        Lista de tuplas com pares (arquivo_banda1, arquivo_banda2) do mesmo timestamp
    """
    file_pairs = []

    for timestamp, bands in grouped_files.items():
        if band1 in bands and band2 in bands:
            # Para simplificar, pegamos o primeiro arquivo de cada banda
            if bands[band1] and bands[band2]:
                file_pairs.append((bands[band1][0], bands[band2][0]))

    return file_pairs

def batch_process(input_dir: str, output_dir: str, file_pairs: List[Tuple[str, str]], band1: str, band2: str) -> None:
    """Processa pares de arquivos das bandas selecionadas em lote.

    Args:
        input_dir: Diretório contendo os arquivos de entrada
        output_dir: Diretório para salvar os resultados
        file_pairs: Lista de tuplas com pares de arquivos para processar
        band1: Primeira banda selecionada
        band2: Segunda banda selecionada
    """
    for file1, file2 in file_pairs:
        try:
            # Construir caminhos completos
            tif_path1 = os.path.join(input_dir, file1)
            tif_path2 = os.path.join(input_dir, file2)

            # Nome do arquivo de saída baseado no timestamp
            timestamp = file1[file1.find('_s')+2:file1.find('_e')]
            output_filename = f"MIRBI_{timestamp}.tif" ### MUDAR NOME                           ----------------------
            output_path = os.path.join(output_dir, output_filename)

            # Processamento
            result, profile = calculate_normalized_difference(tif_path1, tif_path2)
            save_result(result, profile, output_path)

            print(f"Processado: {file1} + {file2} -> {output_filename}")

        except Exception as e:
            print(f"Erro ao processar {file1} e {file2}: {str(e)}")

def main():
    """Fluxo principal de execução do programa."""
    # Configurações
    input_dir = '/content/drive/MyDrive/NBR/MASK_CMIPF'
    output_dir = '/content/drive/MyDrive/NBR/MIRBI' ### MUDAR NOMES                               ----------------------

    # 1. Listar arquivos por banda disponíveis
    band_files = get_valid_band_files(input_dir)

    # 2. Usuário seleciona as bandas
    print("\nSelecione duas bandas para operação (M6C01 a M6C16):")
    band1, band2 = select_bands(band_files)

    # 3. Agrupar arquivos por timestamp para as bandas selecionadas
    grouped_files = group_files_by_timestamp(band_files, band1, band2)

    # 4. Gerar pares com mesmo timestamp
    file_pairs = generate_band_pairs(grouped_files, band1, band2)

    if not file_pairs:
        print(f"\nNenhum par {band1}/{band2} com timestamps coincidentes encontrado.")
    else:
        # 5. Processar em lote
        print(f"\nIniciando processamento de {len(file_pairs)} pares {band1}/{band2}...")
        batch_process(input_dir, output_dir, file_pairs, band1, band2)
        print(f"\nProcessamento concluído. {len(file_pairs)} pares processados.")

if __name__ == "__main__":
    main()


Selecione duas bandas para operação (M6C01 a M6C16):
Bandas disponíveis com arquivos:
M6C01 (309 arquivos)
M6C02 (309 arquivos)
M6C03 (309 arquivos)
M6C04 (309 arquivos)
M6C05 (309 arquivos)
M6C06 (309 arquivos)
M6C07 (309 arquivos)
M6C08 (309 arquivos)
M6C09 (310 arquivos)
M6C10 (310 arquivos)
M6C11 (309 arquivos)
M6C12 (309 arquivos)
M6C13 (309 arquivos)
M6C14 (309 arquivos)
M6C15 (309 arquivos)
M6C16 (309 arquivos)
Digite a primeira banda (ex: M6C03): M6C03
Digite a segunda banda (ex: M6C06): M6C02
Digite a segunda banda (ex: M6C01): M6C01

Iniciando processamento de 300 pares M6C03/M6C02...
Erro ao processar ABI-L2-CMIPF-M6C03_G16_s20211991600230_e20211991609538_c20211991610008.tif e ABI-L2-CMIPF-M6C02_G16_s20211991600230_e20211991609538_c20211991610009.tif e ABI-L2-CMIPF-M6C01_G16_s20211991600230_e20211991609538_c20211991610009.tif: name 'src3' is not defined
Erro ao processar ABI-L2-CMIPF-M6C03_G16_s20212001600231_e20212001609539_c20212001610008.tif e ABI-L2-CMIPF-M6C02_G16_s202

KeyboardInterrupt: 

# Processamento com 3 bandas

In [None]:
import os
from collections import defaultdict
import rasterio
from typing import Dict, List, Tuple

def get_valid_band_files(directory: str) -> Dict[str, List[str]]:
    """Lista todos os arquivos de bandas válidos (M6C01-M6C16) no diretório.

    Args:
        directory: Caminho do diretório contendo os arquivos

    Returns:
        Dicionário com bandas como chaves e listas de arquivos como valores
    """
    band_prefix = 'M6C'
    valid_bands = [f'{band_prefix}{i:02d}' for i in range(1, 17)]
    file_list = os.listdir(directory)

    band_files = {}
    for band in valid_bands:
        band_files[band] = [f for f in file_list if band in f]

    return band_files

def select_bands(band_files: Dict[str, List[str]]) -> Tuple[str, str, str]:
    """Permite ao usuário selecionar três bandas para operação.

    Args:
        band_files: Dicionário com arquivos disponíveis por banda

    Returns:
        Tupla com as três bandas selecionadas
    """
    print("Bandas disponíveis com arquivos:")
    for band, files in band_files.items():
        if files:
            print(f"{band} ({len(files)} arquivos)")

    while True:
        try:
            band1 = input("Digite a primeira banda (ex: M6C03): ").strip().upper()
            if band1 not in band_files or not band_files[band1]:
                raise ValueError(f"Banda {band1} não encontrada ou sem arquivos")

            band2 = input("Digite a segunda banda (ex: M6C06): ").strip().upper()
            if band2 not in band_files or not band_files[band2]:
                raise ValueError(f"Banda {band2} não encontrada ou sem arquivos")

            band3 = input("Digite a terceira banda (ex: M6C01): ").strip().upper()
            if band3 not in band_files or not band_files[band3]:
                raise ValueError(f"Banda {band3} não encontrada ou sem arquivos")

            return band1, band2, band3

        except ValueError as e:
            print(f"Erro: {str(e)}. Tente novamente.")

def group_files_by_timestamp(band_files: Dict[str, List[str]], band1: str, band2: str, band3: str) -> Dict[str, Dict[str, List[str]]]:
    """Agrupa arquivos das bandas selecionadas por timestamp.

    Args:
        band_files: Dicionário com arquivos por banda
        band1: Primeira banda selecionada
        band2: Segunda banda selecionada
        band3: Terceira banda selecionada

    Returns:
        Dicionário com timestamps como chaves e dicionário de arquivos por banda como valores
    """
    grouped_files = defaultdict(lambda: defaultdict(list))

    for band in [band1, band2, band3]:
        for filename in band_files[band]:
            # Extrai timestamp entre '_s' e '_e'
            start = filename.find('_s') + 2
            end = filename.find('_e', start)
            if start == -1 or end == -1:
                continue
            timestamp = filename[start:end]
            grouped_files[timestamp][band].append(filename)

    return grouped_files

def calculate_normalized_difference(tif_path1: str, tif_path2: str, tif_path3: str) -> Tuple:
    """Calcula a diferença normalizada entre três imagens raster.

    Args:
        tif_path1: Caminho para o primeiro arquivo TIFF
        tif_path2: Caminho para o segundo arquivo TIFF
        tif_path3: Caminho para o terceiro arquivo TIFF

    Returns:
        Tupla contendo (resultado do cálculo, perfil do raster)
    """
    with rasterio.open(tif_path1) as src1, rasterio.open(tif_path2) as src2, rasterio.open(tif_path3) as src3:
        image1 = src1.read(1)
        image2 = src2.read(1)
        image3 = src3.read(1)

        profile = src1.profile.copy()

        # Evita divisão por zero
        numerator = (image1 - image2)
        denominator = (image1 + (6*image2) - (7.5*image3) + 1)
        #denominator[denominator == 0] = 1e-10

        result = numerator / denominator + 2  # operação principal
        return result, profile

def save_result(result, profile, output_path: str) -> None:
    """Salva o resultado raster com configurações otimizadas.

    Args:
        result: Array numpy com o resultado do cálculo
        profile: Perfil/metadados do raster
        output_path: Caminho para salvar o arquivo de saída
    """
    profile.update({
        'driver': 'GTiff',
        'count': 1,
        'dtype': result.dtype,
        'compress': 'lzw',
        'tiled': True
    })

    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    with rasterio.open(output_path, 'w', **profile) as dst:
        dst.write(result, 1)

def generate_band_triplets(grouped_files: Dict[str, Dict[str, List[str]]], band1: str, band2: str, band3: str) -> List[Tuple[str, str, str]]:
    """Gera trios de arquivos das bandas selecionadas com o mesmo timestamp.

    Args:
        grouped_files: Dicionário com arquivos agrupados por timestamp e banda
        band1: Primeira banda selecionada
        band2: Segunda banda selecionada
        band3: Terceira banda selecionada

    Returns:
        Lista de tuplas com trios (arquivo_banda1, arquivo_banda2, arquivo_banda3) do mesmo timestamp
    """
    file_triplets = []

    for timestamp, bands in grouped_files.items():
        if band1 in bands and band2 in bands and band3 in bands:
            # Para simplificar, pegamos o primeiro arquivo de cada banda
            if bands[band1] and bands[band2] and bands[band3]:
                file_triplets.append((bands[band1][0], bands[band2][0], bands[band3][0]))

    return file_triplets

def batch_process(input_dir: str, output_dir: str, file_triplets: List[Tuple[str, str, str]], band1: str, band2: str, band3: str) -> None:
    """Processa trios de arquivos das bandas selecionadas em lote.

    Args:
        input_dir: Diretório contendo os arquivos de entrada
        output_dir: Diretório para salvar os resultados
        file_triplets: Lista de tuplas com trios de arquivos para processar
        band1: Primeira banda selecionada
        band2: Segunda banda selecionada
        band3: Terceira banda selecionada
    """
    for file1, file2, file3 in file_triplets:
        try:
            # Construir caminhos completos
            tif_path1 = os.path.join(input_dir, file1)
            tif_path2 = os.path.join(input_dir, file2)
            tif_path3 = os.path.join(input_dir, file3)

            # Nome do arquivo de saída baseado no timestamp
            timestamp = file1[file1.find('_s')+2:file1.find('_e')]
            output_filename = f"EVI_{timestamp}.tif"
            output_path = os.path.join(output_dir, output_filename)

            # Processamento
            result, profile = calculate_normalized_difference(tif_path1, tif_path2, tif_path3)
            save_result(result, profile, output_path)

            print(f"Processado: {file1} + {file2} + {file3} -> {output_filename}")

        except Exception as e:
            print(f"Erro ao processar {file1}, {file2} e {file3}: {str(e)}")


def main():
    """Fluxo principal de execução do programa."""
    # Configurações
    input_dir = '/content/drive/MyDrive/NBR/MASK_CMIPF'
    output_dir = '/content/drive/MyDrive/NBR/EVI' ### MUDAR NOMES                               ----------------------

    # 1. Listar arquivos por banda disponíveis
    band_files = get_valid_band_files(input_dir)

    # 2. Usuário seleciona as bandas
    print("\nSelecione duas bandas para operação (M6C01 a M6C16):")
    band1, band2, band3 = select_bands(band_files)

    # 3. Agrupar arquivos por timestamp para as bandas selecionadas
    grouped_files = group_files_by_timestamp(band_files, band1, band2, band3)

    # 4. Gerar pares com mesmo timestamp
    file_pairs = generate_band_pairs(grouped_files, band1, band2, band3)

    if not file_pairs:
        print(f"\nNenhum par {band1}/{band2} com timestamps coincidentes encontrado.")
    else:
        # 5. Processar em lote
        print(f"\nIniciando processamento de {len(file_pairs)} pares {band1}/{band2}...")
        batch_process(input_dir, output_dir, file_pairs, band1, band2, band3)
        print(f"\nProcessamento concluído. {len(file_pairs)} pares processados.")

if __name__ == "__main__":
    main()


Selecione duas bandas para operação (M6C01 a M6C16):
Bandas disponíveis com arquivos:
M6C01 (309 arquivos)
M6C02 (309 arquivos)
M6C03 (309 arquivos)
M6C04 (309 arquivos)
M6C05 (309 arquivos)
M6C06 (309 arquivos)
M6C07 (309 arquivos)
M6C08 (309 arquivos)
M6C09 (310 arquivos)
M6C10 (310 arquivos)
M6C11 (309 arquivos)
M6C12 (309 arquivos)
M6C13 (309 arquivos)
M6C14 (309 arquivos)
M6C15 (309 arquivos)
M6C16 (309 arquivos)
Digite a primeira banda (ex: M6C03): M6C03
Digite a segunda banda (ex: M6C06): M6C02
Digite a terceira banda (ex: M6C01): M6C01

Iniciando processamento de 300 pares M6C03/M6C02...
Processado: ABI-L2-CMIPF-M6C03_G16_s20211991600230_e20211991609538_c20211991610008.tif + ABI-L2-CMIPF-M6C02_G16_s20211991600230_e20211991609538_c20211991610009.tif + ABI-L2-CMIPF-M6C01_G16_s20211991600230_e20211991609538_c20211991610009.tif -> EVI_20211991600230.tif
Processado: ABI-L2-CMIPF-M6C03_G16_s20212001600231_e20212001609539_c20212001610008.tif + ABI-L2-CMIPF-M6C02_G16_s20212001600231_e