In [35]:
# -*- coding: utf-8 -*-
"""
SCRIPT OTIMIZADO: EXTRA√á√ÉO EM LOTE POR PIXEL DE IMAGENS EMIT
==================================================================
Vers√£o 2.0 - Refatorado para efici√™ncia e resili√™ncia

‚Ä¢ Implementa processamento em lote (batch) para minimizar requisi√ß√µes ao GEE.
‚Ä¢ Move a l√≥gica de extra√ß√£o para o lado do servidor com .map().
‚Ä¢ Salva os resultados incrementalmente a cada lote processado.
‚Ä¢ Adiciona uma etapa de diagn√≥stico para inspecionar as propriedades do Asset.

Autor: Sistema Otimizado de Sensoriamento Remoto (Refatorado por Gemini)
Data: Agosto 2025
"""

import ee
import pandas as pd
import os
import time
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Tenta importar do Colab, se n√£o, ignora (para uso local)
try:
    from google.colab import drive
except ImportError:
    drive = None

print("üöÄ SCRIPT OTIMIZADO: EXTRA√á√ÉO EM LOTE POR PIXEL")
print("="*70)

# =============================================================================
# 1. CONFIGURA√á√ïES
# =============================================================================

class OptimizedConfig:
    """Configura√ß√µes centralizadas para o script otimizado."""
    def __init__(self):
        # Arquivos e pastas
        self.paths = {
            'input_sheet_asset_id': 'projects/ee-vladimircambiental/assets/pontos_amostrados',
            'study_area_asset_id': 'projects/ee-crypto/assets/Vegetacao_CE_Shapefile',
            'output_folder': '/content/drive/My Drive/GEE_Exports_Pixel_Batch',
            'output_prefix': 'emit_batch_extracted'
        }
        # Configura√ß√µes temporais
        self.temporal_config = {
            'start_date': '2022-08-01',
            'end_date': datetime.now().strftime('%Y-%m-%d'),
        }
        # M√°scara de qualidade
        self.quality_masks = {
            'aggregate_flag_band': 'aggregate_flag',
            'cloud_bit': 10,
            'aerosol_bit': 7,
        }
        # Propriedades a serem mantidas do Asset original
        self.output_properties = ['UniFitoMor', 'mapbiomas_class']
        # Configura√ß√£o do processamento em lote
        self.batch_config = {
            'batch_size': 15  # N√∫mero de imagens a processar por requisi√ß√£o ao GEE
        }
        # Configura√ß√£o de sele√ß√£o de classes
        self.selection_config = {
            'enabled': True,
            # ‚ö†Ô∏è ATEN√á√ÉO: Verifique os nomes exatos no diagn√≥stico antes de rodar!
            'selected_classes': ['Forma√ß√£o Florestal', 'Forma√ß√£o Sav√¢nica', 'Mangue', 'Restinga Arborizada'],
            'filter_by_property': 'mapbiomas_class'
        }

config = OptimizedConfig()

# =============================================================================
# 2. FUN√á√ïES AUXILIARES E DE DIAGN√ìSTICO
# =============================================================================

def initialize_gee():
    """Inicializa e autentica o Google Earth Engine."""
    try:
        ee.Initialize(project='ee-vladimircambiental')
    except Exception as e:
        print("Autenticando GEE...")
        ee.Authenticate()
        ee.Initialize(project='ee-vladimircambiental')
    print("‚úÖ Google Earth Engine inicializado")

def mount_drive_colab():
    """Monta o Google Drive se estiver no ambiente Colab."""
    if drive:
        drive.mount('/content/drive', force_remount=True)
        print("‚úÖ Google Drive montado")

def inspect_asset_properties(asset_id, num_features=5):
    """Carrega algumas fei√ß√µes de um Asset e exibe suas propriedades para diagn√≥stico."""
    print(f"\nüïµÔ∏è INSPECIONANDO PROPRIEDADES DO ASSET: {asset_id}")
    try:
        fc = ee.FeatureCollection(asset_id).limit(num_features)
        features = fc.getInfo()['features']
        if not features:
            print("‚ùå O Asset parece estar vazio ou inacess√≠vel.")
            return
        print(f"   Propriedades encontradas na primeira fei√ß√£o: {list(features[0]['properties'].keys())}")
        print(f"   Exemplos de valores para '{config.selection_config['filter_by_property']}':")
        for f in features:
            prop_value = f['properties'].get(config.selection_config['filter_by_property'], 'N√ÉO ENCONTRADO')
            print(f"     - '{prop_value}'")
    except Exception as e:
        print(f"‚ùå Erro ao inspecionar o Asset: {e}")

# =============================================================================
# 3. FUN√á√ïES PRINCIPAIS (L√ìGICA GEE SERVER-SIDE)
# =============================================================================

def apply_pixel_cloud_mask(image):
    """Fun√ß√£o Server-Side para aplicar m√°scara de nuvem."""
    flag_band = image.select(config.quality_masks['aggregate_flag_band'])
    qa_int = flag_band.cast({'aggregate_flag': 'int'})
    cloud_mask = qa_int.bitwiseAnd(1 << config.quality_masks['cloud_bit']).eq(0)
    aerosol_mask = qa_int.bitwiseAnd(1 << config.quality_masks['aerosol_bit']).eq(0)
    clean_mask = cloud_mask.And(aerosol_mask)
    return image.updateMask(clean_mask)

def extract_data_from_image(image, points_fc):
    """
    Fun√ß√£o Server-Side que processa UMA imagem: mascara, seleciona bandas
    e extrai dados para a cole√ß√£o de pontos.
    """
    image = ee.Image(image) # Cast para garantir que √© um ee.Image
    image_id = image.id()
    image_timestamp = image.get('system:time_start')

    image_masked = apply_pixel_cloud_mask(image)

    # Extrai valores para os pontos desta imagem
    sampled_features = image_masked.sampleRegions(
        collection=points_fc,
        scale=60,
        geometries=True,
        properties=config.output_properties
    )

    # Adiciona metadados da imagem a cada ponto extra√≠do
    def add_metadata(feature):
        return feature.set({
            'image_id': image_id,
            'image_timestamp': image_timestamp
        })

    # Retorna uma FeatureCollection com dados e metadados, ou vazia se nada foi extra√≠do
    return sampled_features.map(add_metadata)

# =============================================================================
# 4. PIPELINE PRINCIPAL DE PROCESSAMENTO EM LOTE
# =============================================================================

def main_batch_pipeline(config):
    """Pipeline otimizado que processa imagens em lotes e salva incrementalmente."""
    start_time = time.time()
    print("\nüöÄ EXECUTANDO PIPELINE DE EXTRA√á√ÉO EM LOTE")
    print("="*60)

    # 1. Inicializa√ß√£o e configura√ß√£o de pastas
    initialize_gee()
    mount_drive_colab()
    os.makedirs(config.paths['output_folder'], exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_path = os.path.join(
        config.paths['output_folder'],
        f"{config.paths['output_prefix']}_{timestamp}.csv"
    )
    print(f"üíæ Arquivo de sa√≠da ser√° salvo incrementalmente em: {output_path}")


    # 2. Diagn√≥stico e carregamento dos pontos
    inspect_asset_properties(config.paths['input_sheet_asset_id'])
    points_fc = ee.FeatureCollection(config.paths['input_sheet_asset_id'])

    if config.selection_config['enabled']:
        print(f"\nüéØ FILTRANDO PONTOS pela propriedade '{config.selection_config['filter_by_property']}'...")
        class_filter = ee.Filter.inList(
            config.selection_config['filter_by_property'],
            config.selection_config['selected_classes']
        )
        points_fc = points_fc.filter(class_filter)
        total_points = points_fc.size().getInfo()
        print(f"‚úÖ Total de pontos ap√≥s filtro: {total_points:,}")
        if total_points == 0:
            print("‚ùå O filtro resultou em 0 pontos. Verifique os nomes em 'selected_classes' com base na inspe√ß√£o acima. Abortando.")
            return

    # 3. Carregamento da √°rea de estudo e da cole√ß√£o de imagens
    study_area = ee.FeatureCollection(config.paths['study_area_asset_id']).geometry().dissolve()
    emit_collection = (ee.ImageCollection('NASA/EMIT/L2A/RFL')
                       .filterDate(config.temporal_config['start_date'], config.temporal_config['end_date'])
                       .filterBounds(study_area))

    image_ids = emit_collection.aggregate_array('system:id').getInfo()
    total_images = len(image_ids)
    if total_images == 0:
        print("‚ùå Nenhuma imagem EMIT encontrada para a √°rea e per√≠odo. Abortando.")
        return
    print(f"\nüõ∞Ô∏è {total_images} imagens EMIT encontradas. Iniciando processamento em lotes de {config.batch_config['batch_size']}.")

    # 4. Loop de processamento em lote
    total_extracted_points = 0
    for i in range(0, total_images, config.batch_config['batch_size']):
        batch_ids = image_ids[i:i + config.batch_config['batch_size']]
        batch_num = (i // config.batch_config['batch_size']) + 1
        num_batches = (total_images + config.batch_config['batch_size'] - 1) // config.batch_config['batch_size']

        print(f"\n--- üîÑ Processando Lote {batch_num}/{num_batches} ({len(batch_ids)} imagens) ---")

        # Filtra a cole√ß√£o para o lote atual de imagens
        batch_collection = ee.ImageCollection(batch_ids)

        # Mapeia a fun√ß√£o de extra√ß√£o em toda a cole√ß√£o do lote (SERVER-SIDE)
        extracted_fc = batch_collection.map(lambda image: extract_data_from_image(image, points_fc)).flatten()

        # Baixa os resultados APENAS para este lote
        try:
            batch_data = extracted_fc.getInfo()['features']
            if not batch_data:
                print("   ‚ö†Ô∏è Nenhum ponto v√°lido extra√≠do neste lote.")
                continue

            # Converte os resultados para um DataFrame
            props_list = [f['properties'] for f in batch_data]
            df_batch = pd.DataFrame(props_list)

            # Adiciona coordenadas a partir da geometria
            geometries = [f['geometry']['coordinates'] for f in batch_data if f.get('geometry')]
            if geometries:
                df_coords = pd.DataFrame(geometries, columns=['longitude', 'latitude'], index=df_batch.index)
                df_batch = pd.concat([df_batch, df_coords], axis=1)

            # Salva o lote no arquivo CSV (anexa se o arquivo j√° existe)
            is_first_batch = not os.path.exists(output_path)
            df_batch.to_csv(output_path, mode='a', header=is_first_batch, index=False)

            num_extracted = len(df_batch)
            total_extracted_points += num_extracted
            print(f"   ‚úÖ Lote conclu√≠do. {num_extracted:,} pontos extra√≠dos e salvos.")

        except Exception as e:
            print(f"   ‚ùå Erro ao processar ou baixar o lote {batch_num}: {e}")
            print("   Pulando para o pr√≥ximo lote...")

    # 5. Relat√≥rio Final
    end_time = time.time()
    print("\n" + "="*60)
    print("üéâ PIPELINE CONCLU√çDO!")
    print(f"   - Tempo total: {((end_time - start_time) / 60):.2f} minutos")
    print(f"   - Total de pontos extra√≠dos: {total_extracted_points:,}")
    print(f"   - Resultados salvos em: {output_path}")
    print("="*60)


# =============================================================================
# 5. EXECU√á√ÉO
# =============================================================================

# Para executar, chame a fun√ß√£o principal com o objeto de configura√ß√£o.
# main_batch_pipeline(config)
print("‚ö° PRONTO PARA EXECUTAR O SCRIPT OTIMIZADO")
print("Execute: main_batch_pipeline(config)")

üöÄ SCRIPT OTIMIZADO: EXTRA√á√ÉO EM LOTE POR PIXEL
‚ö° PRONTO PARA EXECUTAR O SCRIPT OTIMIZADO
Execute: main_batch_pipeline(config)


In [36]:
# -*- coding: utf-8 -*-
"""
SISTEMA DE EXTRA√á√ÉO DE DADOS EMIT COM ARQUITETURA DE PIPELINE
==================================================================
Vers√£o 3.0 - Refatorado para Programa√ß√£o Orientada a Dados (ETL)

‚Ä¢ Estrutura baseada no padr√£o Extra√ß√£o-Transforma√ß√£o-Carga (ETL).
‚Ä¢ Fun√ß√µes com responsabilidades √∫nicas e claras.
‚Ä¢ Uso de geradores (generators) para processamento em fluxo e efici√™ncia de mem√≥ria.
‚Ä¢ Configura√ß√£o centralizada e documenta√ß√£o aprimorada com type hints.

Autor: Sistema Otimizado de Sensoriamento Remoto (Refatorado por Gemini)
Data: Agosto 2025
"""

import ee
import pandas as pd
import os
import time
from datetime import datetime
from typing import List, Dict, Any, Generator, Tuple

# Tenta importar do Colab, se n√£o, ignora.
try:
    from google.colab import drive
except ImportError:
    drive = None

print("üöÄ ARQUITETURA DE PIPELINE DE DADOS EMIT (ETL)")
print("="*70)

# =============================================================================
# 1. CONFIGURA√á√ÉO (O "PROJETO" DO PIPELINE)
# =============================================================================

class PipelineConfig:
    """
    Estrutura de dados que cont√©m todas as configura√ß√µes do pipeline.
    Funciona como o "contrato" que define o que o pipeline far√°.
    """
    def __init__(self):
        self.gee_project = 'ee-vladimircambiental'
        self.paths = {
            'input_points_asset': 'projects/ee-vladimircambiental/assets/pontos_amostrados',
            'study_area_asset': 'projects/ee-crypto/assets/Vegetacao_CE_Shapefile',
            'output_folder': '/content/drive/My Drive/GEE_Data_Pipelines/EMIT_Extraction',
            'output_prefix': 'emit_extraction_results'
        }
        self.temporal = {'start_date': '2022-08-01', 'end_date': datetime.now().strftime('%Y-%m-%d')}
        self.processing = {'batch_size': 20}
        self.selection = {
            'enabled': True,
            'filter_property': 'mapbiomas_class',
            # ‚ö†Ô∏è ATEN√á√ÉO: Use os IDs corretos (como strings) aqui!
            'selected_class_ids': ['3', '4', '5', '49']
        }
        self.schema = {
            'properties_to_keep': ['UniFitoMor', 'mapbiomas_class']
        }

# =============================================================================
# 2. FUN√á√ïES DO PIPELINE (L√ìGICA ESTRUTURADA)
# =============================================================================

# --- FASE 1: EXTRACT (Extra√ß√£o) ---

def _fetch_gee_data(config: PipelineConfig) -> Tuple[ee.FeatureCollection, ee.ImageCollection]:
    """
    Busca os dados brutos e aplica a amostragem estratificada.
    """
    print("\n--- [E] FASE DE EXTRA√á√ÉO INICIADA ---")
    try:
        ee.Initialize(project=config.gee_project)
    except Exception:
        ee.Authenticate()
        ee.Initialize(project=config.gee_project)
    print("‚úÖ GEE inicializado.")

    # 1. Carrega todos os pontos do Asset
    print(f"   - Carregando pontos de: {config.paths['input_points_asset']}")
    all_points_fc = ee.FeatureCollection(config.paths['input_points_asset'])

    # 2. Aplica a amostragem estratificada
    if config.selection['enabled']:
        points_fc = _perform_stratified_sample(
            points_fc=all_points_fc,
            class_property=config.selection['filter_property'],
            class_ids=config.selection['selected_class_ids'],
            points_per_class=150  # O n√∫mero de pontos que voc√™ solicitou por classe
        )
    else:
        # Se a sele√ß√£o n√£o estiver habilitada, usa todos os pontos
        points_fc = all_points_fc

    point_count = points_fc.size().getInfo()
    print(f"   - {point_count:,} pontos selecionados para processamento (AP√ìS AMOSTRAGEM ESTRATIFICADA).")
    if point_count == 0:
        raise ValueError("A amostragem resultou em 0 pontos. Verifique se as classes existem no Asset.")

    # 3. Carrega a √°rea de estudo e a cole√ß√£o de imagens (como antes)
    study_area = ee.FeatureCollection(config.paths['study_area_asset']).geometry().dissolve()
    print("   - Carregando cole√ß√£o de imagens EMIT...")
    image_collection = (ee.ImageCollection('NASA/EMIT/L2A/RFL')
                        .filterDate(config.temporal['start_date'], config.temporal['end_date'])
                        .filterBounds(study_area))
    image_count = image_collection.size().getInfo()
    print(f"   - {image_count} imagens encontradas.")
    if image_count == 0:
        raise ValueError("Nenhuma imagem EMIT encontrada para a √°rea e per√≠odo.")

    print("--- [E] FASE DE EXTRA√á√ÉO CONCLU√çDA ---")
    return points_fc, image_collection

# --- FASE 2: TRANSFORM (Transforma√ß√£o) ---
# =============================================================================
# FUN√á√ÉO DE TRANSFORMA√á√ÉO CORRIGIDA
# =============================================================================

def _process_batches_generator(points_fc: ee.FeatureCollection, image_collection: ee.ImageCollection, config: PipelineConfig) -> Generator[pd.DataFrame, None, None]:
    """
    Processa os dados em lotes. √â um GERADOR: processa um lote, entrega (yields)
    o resultado e libera a mem√≥ria antes de processar o pr√≥ximo.
    """
    print("\n--- [T] FASE DE TRANSFORMA√á√ÉO INICIADA (Processamento em Lote) ---")
    image_ids = image_collection.aggregate_array('system:id').getInfo()
    total_images = len(image_ids)
    batch_size = config.processing['batch_size']
    num_batches = (total_images + batch_size - 1) // batch_size

    # Fun√ß√£o server-side para mascarar e extrair dados de uma imagem
    def extract_from_image(image):
        image = ee.Image(image)

        # --- IN√çCIO DA CORRE√á√ÉO ---
        # Seleciona a banda de flag e a CONVERTE para inteiro antes das opera√ß√µes de bit
        flag_band = image.select('aggregate_flag').toInt()
        # --- FIM DA CORRE√á√ÉO ---

        # O resto da l√≥gica da m√°scara agora funciona com a banda inteira
        clean_mask = flag_band.bitwiseAnd(1 << 10).eq(0).And(flag_band.bitwiseAnd(1 << 7).eq(0))

        sampled = image.updateMask(clean_mask).sampleRegions(
            collection=points_fc, scale=60, geometries=True, properties=config.schema['properties_to_keep']
        )
        return sampled.map(lambda f: f.set({'image_id': image.id(), 'image_timestamp': image.get('system:time_start')}))

    for i in range(0, total_images, batch_size):
        batch_ids = image_ids[i:i + batch_size]
        print(f"   - Processando lote {i//batch_size + 1}/{num_batches}...")

        batch_collection = ee.ImageCollection(batch_ids)

        extracted_fc = batch_collection.map(extract_from_image).flatten()

        try:
            batch_data = extracted_fc.getInfo()['features']
            if not batch_data:
                print("     - Nenhum ponto v√°lido extra√≠do neste lote (provavelmente devido √† m√°scara de nuvem).")
                continue

            props_list = [f['properties'] for f in batch_data]
            df_batch = pd.DataFrame(props_list)
            geometries = [f.get('geometry', {}).get('coordinates') for f in batch_data]
            coords = [c for c in geometries if c]
            if coords:
                df_coords = pd.DataFrame(coords, columns=['longitude', 'latitude'], index=df_batch.index)
                df_batch = pd.concat([df_batch, df_coords], axis=1)

            print(f"     - {len(df_batch)} pontos processados.")
            yield df_batch

        except Exception as e:
            # Imprime uma mensagem de erro mais detalhada do GEE
            detailed_error = str(e)
            if "Image.bitwiseAnd" in detailed_error:
                 print(f"     - ‚ùå Erro de tipo de dado (bitwiseAnd): {detailed_error}. Pulando.")
            else:
                 print(f"     - ‚ùå Erro ao processar o lote: {detailed_error}. Pulando.")


    print("--- [T] FASE DE TRANSFORMA√á√ÉO CONCLU√çDA ---")

# --- FASE 3: LOAD (Carga/Salvamento) ---

def _save_results_incrementally(processed_data_generator: Generator[pd.DataFrame, None, None], config: PipelineConfig) -> Tuple[str, int]:
    """
    Recebe o gerador de dados processados e salva cada lote em um arquivo CSV.
    √â respons√°vel pela persist√™ncia dos dados.
    """
    print("\n--- [L] FASE DE CARGA INICIADA (Salvamento Incremental) ---")
    os.makedirs(config.paths['output_folder'], exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_path = os.path.join(
        config.paths['output_folder'],
        f"{config.paths['output_prefix']}_{timestamp}.csv"
    )
    print(f"   - Arquivo de sa√≠da: {output_path}")

    total_saved_points = 0
    is_first_batch = True
    for batch_df in processed_data_generator:
        batch_df.to_csv(output_path, mode='a', header=is_first_batch, index=False)
        total_saved_points += len(batch_df)
        print(f"   - Lote salvo. Total de pontos no arquivo: {total_saved_points:,}")
        if is_first_batch:
            is_first_batch = False

    print("--- [L] FASE DE CARGA CONCLU√çDA ---")
    return output_path, total_saved_points

# =============================================================================
# 4. ORQUESTRADOR DO PIPELINE
# =============================================================================

def run_pipeline(config: PipelineConfig):
    """
    Orquestra a execu√ß√£o do pipeline ETL.
    √â o ponto de entrada principal, com uma l√≥gica limpa e leg√≠vel.
    """
    start_time = time.time()
    print("\n" + "="*70)
    print("üöÄ ORQUESTRANDO EXECU√á√ÉO DO PIPELINE DE DADOS EMIT")
    print("="*70)

    try:
        # 1. Extrair
        points_fc, image_collection = _fetch_gee_data(config)

        # 2. Transformar (obter o gerador)
        processed_data_generator = _process_batches_generator(points_fc, image_collection, config)

        # 3. Carregar (salvar os resultados do gerador)
        output_file, total_points = _save_results_incrementally(processed_data_generator, config)

        # Relat√≥rio Final
        end_time = time.time()
        print("\n" + "="*70)
        print("üéâ PIPELINE EXECUTADO COM SUCESSO!")
        print(f"   - Tempo total: {((end_time - start_time) / 60):.2f} minutos")
        print(f"   - Total de pontos extra√≠dos e salvos: {total_points:,}")
        print(f"   - Resultados em: {output_file}")
        print("="*70)

    except Exception as e:
        print("\n" + "!"*70)
        print(f"üí• ERRO CR√çTICO DURANTE A EXECU√á√ÉO DO PIPELINE: {e}")
        print("!"*70)

# =============================================================================
# 5. EXECU√á√ÉO
# =============================================================================

# 1. Crie uma inst√¢ncia da sua configura√ß√£o
config = PipelineConfig()

# 2. (OPCIONAL) Ajuste qualquer par√¢metro antes de executar
# Exemplo:
# config.selection['selected_class_ids'] = ['3'] # Para testar com uma √∫nica classe
config.processing['batch_size'] = 10           # Para lotes menores durante o teste

# 3. Execute o pipeline
run_pipeline(config)

üöÄ ARQUITETURA DE PIPELINE DE DADOS EMIT (ETL)

üöÄ ORQUESTRANDO EXECU√á√ÉO DO PIPELINE DE DADOS EMIT

--- [E] FASE DE EXTRA√á√ÉO INICIADA ---
‚úÖ GEE inicializado.
   - Carregando pontos de: projects/ee-vladimircambiental/assets/pontos_amostrados

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
üí• ERRO CR√çTICO DURANTE A EXECU√á√ÉO DO PIPELINE: name '_perform_stratified_sample' is not defined
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!


In [37]:
def _perform_stratified_sample(
    points_fc: ee.FeatureCollection,
    class_property: str,
    class_ids: List[int],
    points_per_class: int
) -> ee.FeatureCollection:
    """
    Executa uma amostragem estratificada em uma FeatureCollection.

    Args:
        points_fc: A cole√ß√£o de pontos de entrada.
        class_property: A propriedade usada para definir os estratos (ex: 'mapbiomas_class').
        class_ids: Uma lista de IDs de classe para amostrar.
        points_per_class: O n√∫mero de pontos a serem amostrados de cada classe.

    Returns:
        Uma FeatureCollection contendo a amostra estratificada.
    """
    print(f"   - Iniciando amostragem estratificada para {len(class_ids)} classes...")

    sampled_collections = []
    for class_id in class_ids:
        # Filtra a cole√ß√£o para a classe atual
        class_subset = points_fc.filter(ee.Filter.eq(class_property, class_id))

        # Adiciona uma coluna aleat√≥ria, ordena e limita para obter a amostra
        sampled_subset = class_subset.randomColumn().sort('random').limit(points_per_class)

        # Armazena a cole√ß√£o amostrada (um objeto GEE) na lista
        sampled_collections.append(sampled_subset)

    # Une todas as cole√ß√µes amostradas em uma s√≥
    # ee.FeatureCollection() aceita uma lista de cole√ß√µes e as une.
    # .flatten() garante que o resultado seja uma √∫nica FeatureCollection e n√£o uma cole√ß√£o de cole√ß√µes.
    final_sample = ee.FeatureCollection(sampled_collections).flatten()

    return final_sample

In [38]:
def _fetch_gee_data(config: PipelineConfig) -> Tuple[ee.FeatureCollection, ee.ImageCollection]:
    """
    Busca os dados brutos e aplica a amostragem estratificada.
    """
    print("\n--- [E] FASE DE EXTRA√á√ÉO INICIADA ---")
    try:
        ee.Initialize(project=config.gee_project)
    except Exception:
        ee.Authenticate()
        ee.Initialize(project=config.gee_project)
    print("‚úÖ GEE inicializado.")

    # 1. Carrega todos os pontos do Asset
    print(f"   - Carregando pontos de: {config.paths['input_points_asset']}")
    all_points_fc = ee.FeatureCollection(config.paths['input_points_asset'])

    # 2. Aplica a amostragem estratificada
    if config.selection['enabled']:
        points_fc = _perform_stratified_sample(
            points_fc=all_points_fc,
            class_property=config.selection['filter_property'],
            class_ids=config.selection['selected_class_ids'],
            points_per_class=100  # O n√∫mero de pontos que voc√™ solicitou por classe
        )
    else:
        # Se a sele√ß√£o n√£o estiver habilitada, usa todos os pontos
        points_fc = all_points_fc

    point_count = points_fc.size().getInfo()
    print(f"   - {point_count:,} pontos selecionados para processamento (AP√ìS AMOSTRAGEM ESTRATIFICADA).")
    if point_count == 0:
        raise ValueError("A amostragem resultou em 0 pontos. Verifique se as classes existem no Asset.")

    # 3. Carrega a √°rea de estudo e a cole√ß√£o de imagens (como antes)
    study_area = ee.FeatureCollection(config.paths['study_area_asset']).geometry().dissolve()
    print("   - Carregando cole√ß√£o de imagens EMIT...")
    image_collection = (ee.ImageCollection('NASA/EMIT/L2A/RFL')
                        .filterDate(config.temporal['start_date'], config.temporal['end_date'])
                        .filterBounds(study_area))
    image_count = image_collection.size().getInfo()
    print(f"   - {image_count} imagens encontradas.")
    if image_count == 0:
        raise ValueError("Nenhuma imagem EMIT encontrada para a √°rea e per√≠odo.")

    print("--- [E] FASE DE EXTRA√á√ÉO CONCLU√çDA ---")
    return points_fc, image_collection

In [39]:
# ===================================================================
# PASSO FINAL (CORRIGIDO): CONFIGURAR COM O TIPO DE DADO CORRETO E EXECUTAR
# ===================================================================

# 1. Crie uma inst√¢ncia da configura√ß√£o
config = PipelineConfig()

# 2. Corrija a lista de classes para usar N√öMEROS INTEIROS, conforme o Asset
config.selection['enabled'] = True
config.selection['selected_class_ids'] = [4] # <-- CORRE√á√ÉO APLICADA AQUI
config.selection['filter_property'] = 'mapbiomas_class'

print(f"‚úÖ Configura√ß√£o atualizada para usar os IDs num√©ricos (Integers): {config.selection['selected_class_ids']}")
print("---")

# 3. Execute o pipeline principal com a configura√ß√£o correta.
run_pipeline(config)

‚úÖ Configura√ß√£o atualizada para usar os IDs num√©ricos (Integers): [4]
---

üöÄ ORQUESTRANDO EXECU√á√ÉO DO PIPELINE DE DADOS EMIT

--- [E] FASE DE EXTRA√á√ÉO INICIADA ---
‚úÖ GEE inicializado.
   - Carregando pontos de: projects/ee-vladimircambiental/assets/pontos_amostrados
   - Iniciando amostragem estratificada para 1 classes...
   - 100 pontos selecionados para processamento (AP√ìS AMOSTRAGEM ESTRATIFICADA).
   - Carregando cole√ß√£o de imagens EMIT...


KeyboardInterrupt: 

# **ETL 3.0**:



---

ESTRATIFICA√á√ÉO MULTIVARIADA

In [47]:
# -*- coding: utf-8 -*-
"""
SISTEMA DE EXTRA√á√ÉO DE DADOS EMIT COM ARQUITETURA DE PIPELINE
==================================================================
Vers√£o 3.0 - Refatorado para Programa√ß√£o Orientada a Dados (ETL)

‚Ä¢ Estrutura baseada no padr√£o Extra√ß√£o-Transforma√ß√£o-Carga (ETL).
‚Ä¢ Fun√ß√µes com responsabilidades √∫nicas e claras.
‚Ä¢ Uso de geradores (generators) para processamento em fluxo e efici√™ncia de mem√≥ria.
‚Ä¢ Configura√ß√£o centralizada e documenta√ß√£o aprimorada com type hints.

Autor: Sistema Otimizado de Sensoriamento Remoto (Refatorado por Gemini)
Data: Agosto 2025
"""

import ee
import pandas as pd
import os
import time
from datetime import datetime
from typing import List, Dict, Any, Generator, Tuple

# Tenta importar do Colab, se n√£o, ignora.
try:
    from google.colab import drive
except ImportError:
    drive = None

print("üöÄ ARQUITETURA DE PIPELINE DE DADOS EMIT (ETL)")
print("="*70)

# =============================================================================
# NOVA FUN√á√ÉO AUXILIAR: AMOSTRAGEM ESTRATIFICADA SIMPLES
# =============================================================================
def _perform_single_stratified_sample(
    points_fc: ee.FeatureCollection,
    strata_property: str,
    points_per_stratum: int
) -> ee.FeatureCollection:
    """
    Executa amostragem estratificada em uma √∫nica propriedade.

    Args:
        points_fc: A cole√ß√£o de fei√ß√µes a ser amostrada.
        strata_property: A propriedade usada para definir os estratos (ex: 'mapbiomas_class').
        points_per_stratum: O n√∫mero de pontos a ser em sorteados de cada estrato.

    Returns:
        Uma FeatureCollection contendo os pontos amostrados.
    """
    print(f"   - Iniciando amostragem estratificada simples por '{strata_property}'...")

    # 1. Obt√©m a lista de estratos √∫nicos (classes) do GEE
    unique_strata = points_fc.aggregate_array(strata_property).distinct().getInfo()
    print(f"   - Encontrados {len(unique_strata)} estratos de '{strata_property}': {unique_strata}")

    if not unique_strata:
        return ee.FeatureCollection([])

    # 2. Itera sobre cada estrato, filtra, amostra e coleta os resultados
    sampled_collections = []
    for stratum in unique_strata:
        # Filtra a cole√ß√£o para incluir apenas os pontos do estrato atual
        stratum_subset = points_fc.filter(ee.Filter.eq(strata_property, stratum))

        # Realiza a amostragem aleat√≥ria dentro do estrato
        # .randomColumn() adiciona uma coluna 'random' com valores de 0 a 1
        # .sort('random') embaralha os pontos
        # .limit() pega o n√∫mero desejado de pontos
        sampled_subset = stratum_subset.randomColumn().sort('random').limit(points_per_stratum)

        sampled_collections.append(sampled_subset)

    # 3. Une todas as cole√ß√µes amostradas em uma s√≥
    final_sample = ee.FeatureCollection(sampled_collections).flatten()

    return final_sample


# =============================================================================
# 1. CONFIGURA√á√ÉO (ATUALIZADA)
# =============================================================================
class PipelineConfig:
    """
    Estrutura de dados que cont√©m todas as configura√ß√µes do pipeline.
    """
    def __init__(self):
        self.gee_project = 'ee-vladimircambiental'
        self.paths = {
            'input_points_asset': 'projects/ee-vladimircambiental/assets/pontos_amostrados',
            'study_area_asset': 'projects/ee-crypto/assets/APA_chapada_araripe',
            'output_folder': '/content/drive/My Drive/GEE_Data_Pipelines/EMIT_Preconfigured_Filter',
            'output_prefix': 'emit_extract'
        }
        self.temporal = {'start_date': '2022-08-01', 'end_date': datetime.now().strftime('%Y-%m-%d')}
        self.processing = {'batch_size': 20}
        self.schema = {
            'properties_to_keep': ['UniFitoMor', 'mapbiomas_class']
        }

        # Modo Interativo (desabilitado para este caso)
        self.interactive_mode = {
            'enabled': False,
            'filter_column': 'UniFitoMor',
            'stratify_column': 'mapbiomas_class',
            'points_per_stratum': 15
        }

        # --- NOVA SE√á√ÉO PARA FILTRO PR√â-CONFIGURADO ---
        # Usado quando o modo interativo est√° desabilitado.
        self.preconfigured_filter = {
            'enabled': True,  # Ativa o filtro est√°tico
            'filter_column': 'UniFitoMor', # A coluna pela qual filtrar
            'selected_values': ['Carrasco'] # O(s) valor(es) a ser(em) extra√≠do(s). Use strings!
        }

        # --- Adicionado a se√ß√£o 'selection' que estava faltando ---
        self.selection = {
            'enabled': True,
            'filter_property_1': 'UniFitoMor', # Primeira propriedade para estratifica√ß√£o
            'filter_property_2': 'mapbiomas_class', # Segunda propriedade para estratifica√ß√£o
            'points_per_class': 10 # N√∫mero de pontos por estrato combinado (UniFitoMor + mapbiomas_class)
        }
# =============================================================================
# 5. EXECU√á√ÉO (ATUALIZADA)
# =============================================================================

# 1. Crie uma inst√¢ncia da sua configura√ß√£o
config = PipelineConfig()

# 2. (OPCIONAL) Ajuste qualquer par√¢metro antes de executar
config.selection['points_per_class'] = 1 # Ex: Reduzir pontos por estrato para um teste r√°pido
config.processing['batch_size'] = 1

print("‚úÖ Configura√ß√£o de amostragem estratificada dupla pronta.")
print(f"   - Estratificando por: '{config.selection['filter_property_1']}' e '{config.selection['filter_property_2']}'")
print(f"   - Pontos por estrato combinado: {config.selection['points_per_class']}")
print("---")


# 3. Execute o pipeline com a nova configura√ß√£o
run_pipeline(config)
# =============================================================================
# 2. FUN√á√ïES DO PIPELINE (L√ìGICA ESTRUTURADA)
# =============================================================================

# --- FASE 1: EXTRACT (Extra√ß√£o) ---
# SUBSTITUA ESTA FUN√á√ÉO
def _fetch_gee_data(config: PipelineConfig) -> Tuple[ee.FeatureCollection, ee.ImageCollection]:
    """
    Busca os dados brutos e aplica a amostragem estratificada dupla sem usar .getInfo() para contar.
    """
    print("\\n--- [E] FASE DE EXTRA√á√ÉO INICIADA ---")
    try:
        ee.Initialize(project=config.gee_project)
    except Exception:
        ee.Authenticate()
        ee.Initialize(project=config.gee_project)
    print("‚úÖ GEE inicializado.")

    print(f"   - Carregando pontos de: {config.paths['input_points_asset']}")
    all_points_fc = ee.FeatureCollection(config.paths['input_points_asset'])

    if config.selection['enabled']:
        # --- IN√çCIO DA MUDAN√áA ---
        points_fc, unique_strata_list = _perform_double_stratified_sample(
            points_fc=all_points_fc,
            property1=config.selection['filter_property_1'],
            property2=config.selection['filter_property_2'],
            points_per_stratum=config.selection['points_per_class']
        )

        # A nova verifica√ß√£o √© feita na lista de estratos, que j√° est√° na mem√≥ria local
        if not unique_strata_list:
            raise ValueError("A amostragem resultou em 0 estratos. Verifique as propriedades no Asset.")

        # Imprime uma contagem te√≥rica, sem for√ßar computa√ß√£o no GEE
        theoretical_max = len(unique_strata_list) * config.selection['points_per_class']
        print(f"   - Amostragem definida. M√°ximo te√≥rico de {theoretical_max:,} pontos a serem processados.")
        # --- FIM DA MUDAN√áA ---
    else:
        points_fc = all_points_fc

    # O resto da fun√ß√£o permanece igual
    study_area = ee.FeatureCollection(config.paths['study_area_asset']).geometry().dissolve()
    print("   - Carregando cole√ß√£o de imagens EMIT...")
    image_collection = (ee.ImageCollection('NASA/EMIT/L2A/RFL')
                        .filterDate(config.temporal['start_date'], config.temporal['end_date'])
                        .filterBounds(study_area))
    image_count = image_collection.size().getInfo()
    print(f"   - {image_count} imagens encontradas.")
    if image_count == 0:
        raise ValueError("Nenhuma imagem EMIT encontrada para a √°rea e per√≠odo.")

    print("--- [E] FASE DE EXTRA√á√ÉO CONCLU√çDA ---")
    return points_fc, image_collection

# --- FASE 2: TRANSFORM (Transforma√ß√£o) ---
# =============================================================================
# FUN√á√ÉO DE TRANSFORMA√á√ÉO CORRIGIDA
# =============================================================================

def _process_batches_generator(points_fc: ee.FeatureCollection, image_collection: ee.ImageCollection, config: PipelineConfig) -> Generator[pd.DataFrame, None, None]:
    """
    Processa os dados em lotes. √â um GERADOR: processa um lote, entrega (yields)
    o resultado e libera a mem√≥ria antes de processar o pr√≥ximo.
    """
    print("\n--- [T] FASE DE TRANSFORMA√á√ÉO INICIADA (Processamento em Lote) ---")
    image_ids = image_collection.aggregate_array('system:id').getInfo()
    total_images = len(image_ids)
    batch_size = config.processing['batch_size']
    num_batches = (total_images + batch_size - 1) // batch_size

    # Fun√ß√£o server-side para mascarar e extrair dados de uma imagem
    def extract_from_image(image):
        image = ee.Image(image)

        # --- IN√çCIO DA CORRE√á√ÉO ---
        # Seleciona a banda de flag e a CONVERTE para inteiro antes das opera√ß√µes de bit
        flag_band = image.select('aggregate_flag').toInt()
        # --- FIM DA CORRE√á√ÉO ---

        # O resto da l√≥gica da m√°scara agora funciona com a banda inteira
        clean_mask = flag_band.bitwiseAnd(1 << 10).eq(0).And(flag_band.bitwiseAnd(1 << 7).eq(0))

        sampled = image.updateMask(clean_mask).sampleRegions(
            collection=points_fc, scale=60, geometries=True, properties=config.schema['properties_to_keep']
        )
        return sampled.map(lambda f: f.set({'image_id': image.id(), 'image_timestamp': image.get('system:time_start')}))

    for i in range(0, total_images, batch_size):
        batch_ids = image_ids[i:i + batch_size]
        print(f"   - Processando lote {i//batch_size + 1}/{num_batches}...")

        batch_collection = ee.ImageCollection(batch_ids)

        extracted_fc = batch_collection.map(extract_from_image).flatten()

        try:
            batch_data = extracted_fc.getInfo()['features']
            if not batch_data:
                print("     - Nenhum ponto v√°lido extra√≠do neste lote (provavelmente devido √† m√°scara de nuvem).")
                continue

            props_list = [f['properties'] for f in batch_data]
            df_batch = pd.DataFrame(props_list)
            geometries = [f.get('geometry', {}).get('coordinates') for f in batch_data]
            coords = [c for c in geometries if c]
            if coords:
                df_coords = pd.DataFrame(coords, columns=['longitude', 'latitude'], index=df_batch.index)
                df_batch = pd.concat([df_batch, df_coords], axis=1)

            print(f"     - {len(df_batch)} pontos processados.")
            yield df_batch

        except Exception as e:
            # Imprime uma mensagem de erro mais detalhada do GEE
            detailed_error = str(e)
            if "Image.bitwiseAnd" in detailed_error:
                 print(f"     - ‚ùå Erro de tipo de dado (bitwiseAnd): {detailed_error}. Pulando.")
            else:
                 print(f"     - ‚ùå Erro ao processar o lote: {detailed_error}. Pulando.")


    print("--- [T] FASE DE TRANSFORMA√á√ÉO CONCLU√çDA ---")

# --- FASE 3: LOAD (Carga/Salvamento) ---

def _save_results_incrementally(processed_data_generator: Generator[pd.DataFrame, None, None], config: PipelineConfig) -> Tuple[str, int]:
    """
    Recebe o gerador de dados processados e salva cada lote em um arquivo CSV.
    √â respons√°vel pela persist√™ncia dos dados.
    """
    print("\n--- [L] FASE DE CARGA INICIADA (Salvamento Incremental) ---")
    os.makedirs(config.paths['output_folder'], exist_ok=True)
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_path = os.path.join(
        config.paths['output_folder'],
        f"{config.paths['output_prefix']}_{timestamp}.csv"
    )
    print(f"   - Arquivo de sa√≠da: {output_path}")

    total_saved_points = 0
    is_first_batch = True
    for batch_df in processed_data_generator:
        batch_df.to_csv(output_path, mode='a', header=is_first_batch, index=False)
        total_saved_points += len(batch_df)
        print(f"   - Lote salvo. Total de pontos no arquivo: {total_saved_points:,}")
        if is_first_batch:
            is_first_batch = False

    print("--- [L] FASE DE CARGA CONCLU√çDA ---")
    return output_path, total_saved_points

# =============================================================================
# 4. ORQUESTRADOR DO PIPELINE
# =============================================================================

def run_pipeline(config: PipelineConfig):
    """
    Orquestra a execu√ß√£o do pipeline ETL.
    √â o ponto de entrada principal, com uma l√≥gica limpa e leg√≠vel.
    """
    start_time = time.time()
    print("\n" + "="*70)
    print("üöÄ ORQUESTRANDO EXECU√á√ÉO DO PIPELINE DE DADOS EMIT")
    print("="*70)

    try:
        # 1. Extrair
        points_fc, image_collection = _fetch_gee_data(config)

        # 2. Transformar (obter o gerador)
        processed_data_generator = _process_batches_generator(points_fc, image_collection, config)

        # 3. Carregar (salvar os resultados do gerador)
        output_file, total_points = _save_results_incrementally(processed_data_generator, config)

        # Relat√≥rio Final
        end_time = time.time()
        print("\n" + "="*70)
        print("üéâ PIPELINE EXECUTADO COM SUCESSO!")
        print(f"   - Tempo total: {((end_time - start_time) / 60):.2f} minutos")
        print(f"   - Total de pontos extra√≠dos e salvos: {total_points:,}")
        print(f"   - Resultados em: {output_file}")
        print("="*70)

    except Exception as e:
        print("\n" + "!"*70)
        print(f"üí• ERRO CR√çTICO DURANTE A EXECU√á√ÉO DO PIPELINE: {e}")
        print("!"*70)



# 3. Execute o pipeline
run_pipeline(config)

üöÄ ARQUITETURA DE PIPELINE DE DADOS EMIT (ETL)
‚úÖ Configura√ß√£o de amostragem estratificada dupla pronta.
   - Estratificando por: 'UniFitoMor' e 'mapbiomas_class'
   - Pontos por estrato combinado: 1
---

üöÄ ORQUESTRANDO EXECU√á√ÉO DO PIPELINE DE DADOS EMIT
\n--- [E] FASE DE EXTRA√á√ÉO INICIADA ---
‚úÖ GEE inicializado.
   - Carregando pontos de: projects/ee-vladimircambiental/assets/pontos_amostrados
   - Iniciando amostragem estratificada dupla por 'UniFitoMor' e 'mapbiomas_class'...
   - Encontrados 119 estratos √∫nicos combinados.
   - Amostragem definida. M√°ximo te√≥rico de 119 pontos a serem processados.
   - Carregando cole√ß√£o de imagens EMIT...
   - 21 imagens encontradas.
--- [E] FASE DE EXTRA√á√ÉO CONCLU√çDA ---

--- [L] FASE DE CARGA INICIADA (Salvamento Incremental) ---
   - Arquivo de sa√≠da: /content/drive/My Drive/GEE_Data_Pipelines/EMIT_Preconfigured_Filter/emit_extract_20250901_192931.csv

--- [T] FASE DE TRANSFORMA√á√ÉO INICIADA (Processamento em Lote) ---
 



     - ‚ùå Erro ao processar o lote: Computation timed out.. Pulando.
   - Processando lote 2/21...




KeyboardInterrupt: 