In [1]:
import sys
from pathlib import Path
import pandas as pd
import time
from datetime import datetime

In [2]:
# =============================================================================
# PROJECT CONFIGURATION
# =============================================================================

# Resolve the project root: when the kernel was started inside `notebooks/`
# the root is the parent folder, otherwise the working directory is the root.
current_dir = Path.cwd()
project_root = current_dir.parent if current_dir.name == 'notebooks' else current_dir
src_path = project_root / 'src'

# Make `src/` importable. Guarded so that re-running this cell does not keep
# prepending duplicate entries to sys.path.
if str(src_path) not in sys.path:
    sys.path.insert(0, str(src_path))

# Import Fair-Price modules (these resolve through the sys.path entry above,
# which is why they cannot live in the first import cell).
from config.settings import get_config
from utils.logger import get_extraction_logger, get_standardization_logger
from extraction.extractors import OpenDataSUSExtractor
from standardization.cleaners import clean_dataframe
from standardization.validators import validate_dataframe, get_validation_summary
from consolidation.consolidators import HealthDataConsolidator

print("FAIR-PRICE BRAZILIAN HEALTH DATA PIPELINE")
print("=" * 60)
print(f"Project root: {project_root}")
print("Status: Production pipeline initialized")

FAIR-PRICE BRAZILIAN HEALTH DATA PIPELINE
Project root: /home/victor-jose/Documents/projetos/DGU/DGU45/fair-price
Status: Production pipeline initialized


In [3]:
# Initialize configuration and loggers
config = get_config()
extraction_logger = get_extraction_logger()
standardization_logger = get_standardization_logger()

# "\n" (not "\\n"): the doubled backslash printed a literal backslash-n
# instead of a blank line.
print("\nData directories configured:")
print(f"  Raw data: {config.raw_data_dir}")
print(f"  Processed data: {config.processed_data_dir}")
print(f"  Output data: {config.output_data_dir}")

# Ensure the directories this notebook writes to exist
for data_dir in (config.processed_data_dir, config.output_data_dir):
    data_dir.mkdir(parents=True, exist_ok=True)

\nData directories configured:
  Raw data: /home/victor-jose/Documents/projetos/DGU/DGU45/fair-price/data/raw
  Processed data: /home/victor-jose/Documents/projetos/DGU/DGU45/fair-price/data/processed
  Output data: /home/victor-jose/Documents/projetos/DGU/DGU45/fair-price/data/output


In [4]:
# =============================================================================
# PHASE 1: DATA EXTRACTION
# =============================================================================

print("\n" + "=" * 60)
print("PHASE 1: DATA EXTRACTION FROM OPENDATASUS")
print("=" * 60)

extraction_start_time = time.time()
extraction_report = None
files_extracted = 0

try:
    # Initialize and run extractor
    extractor = OpenDataSUSExtractor()
    extraction_report = extractor.extract_all_years()
    files_extracted = len(extraction_report)
    extraction_succeeded = True
except Exception as e:
    # Best-effort: a failed download must not stop the notebook, because
    # previously downloaded raw files can still be standardized.
    extraction_logger.error(f"Extraction failed: {str(e)}")
    print(f"❌ Extraction failed: {str(e)}")
    extraction_succeeded = False

extraction_time = time.time() - extraction_start_time

# Inventory of raw files. Computed on BOTH the success and the failure path
# so that `csv_files` and `total_size_mb` are always defined for later cells
# (the summary cell reads `total_size_mb`).
csv_files = sorted(config.raw_data_dir.glob("*.csv"))
file_sizes_mb = [(csv_file.name, csv_file.stat().st_size / (1024 * 1024)) for csv_file in csv_files]
total_size_mb = sum(size_mb for _, size_mb in file_sizes_mb)

if extraction_succeeded:
    print(f"✅ Extraction completed successfully ({extraction_time:.2f}s)")
    print(f"   Files extracted: {files_extracted}")

    # List available files
    print(f"\nExtracted files ({len(csv_files)}):")
    for file_name, size_mb in file_sizes_mb:
        print(f"  {file_name}: {size_mb:.1f} MB")
    print(f"Total extracted data: {total_size_mb:.1f} MB")
else:
    print(f"Using existing files: {len(csv_files)} CSV files found")


PHASE 1: DATA EXTRACTION FROM OPENDATASUS
2025-06-29 22:51:12 - fair_price.extraction - INFO - 🏥 OpenDataSUS Extractor initialized
2025-06-29 22:51:12 - fair_price.extraction - INFO - 📁 Output directory: /home/victor-jose/Documents/projetos/DGU/DGU45/fair-price/data/raw
2025-06-29 22:51:12 - fair_price.extraction - INFO - 🎯 Target years: [2020, 2021, 2022, 2023, 2024]
2025-06-29 22:51:12 - fair_price.extraction - INFO - 🚀 Starting Complete multi-year extraction
2025-06-29 22:51:12 - fair_price.extraction - INFO - 🚀 Starting extraction for years: [2020, 2021, 2022, 2023, 2024]
2025-06-29 22:51:12 - fair_price.extraction - INFO - 🔍 Validating connection to OpenDataSUS...
2025-06-29 22:51:12 - fair_price.extraction - INFO - ✅ Connection to OpenDataSUS validated successfully
2025-06-29 22:51:12 - fair_price.extraction - INFO - 📋 Existing files status:
2025-06-29 22:51:12 - fair_price.extraction - INFO -    ❌ 2020: Not found
2025-06-29 22:51:12 - fair_price.extraction - INFO -    ❌ 2021: No

In [6]:
# =============================================================================
# PHASE 2: DATA STANDARDIZATION
# =============================================================================

# "\n" (not "\\n") so a blank line is emitted instead of a literal backslash-n.
print("\n" + "=" * 60)
print("PHASE 2: DATA STANDARDIZATION")
print("=" * 60)

class ProductionStandardizationProcessor:
    """Production processor using modular Fair-Price functions.

    Loads one raw CSV, runs it through the project's `clean_dataframe` and
    `validate_dataframe` functions and writes the result as a UTF-8,
    comma-separated CSV.
    """

    # (encoding, separator) combinations tried in order; the first entry is
    # the primary format, the rest are fallbacks.
    # NOTE(review): latin-1 can decode any byte sequence and a wrong separator
    # normally yields a one-column frame rather than an error, so the
    # fallbacks are only reached for hard read failures -- confirm this is the
    # intended detection strategy.
    _READ_ATTEMPTS = (
        ('latin-1', ';'),
        ('utf-8', ';'),
        ('latin-1', ','),
        ('utf-8', ','),
    )

    def __init__(self, config, logger):
        """Store the pipeline configuration and the logger used for progress messages."""
        self.config = config
        self.logger = logger

    def load_csv_with_encoding(self, file_path: Path) -> pd.DataFrame:
        """Load CSV with Brazilian encoding handling.

        Args:
            file_path: CSV file to read.

        Returns:
            The frame produced by the first encoding/separator combination
            that `pd.read_csv` accepts.

        Raises:
            Exception: if every combination fails. The last underlying error
                is chained as `__cause__` and included in the message so the
                real reason (missing file, parser error, ...) is not lost.
        """
        last_error = None
        for attempt, (encoding, sep) in enumerate(self._READ_ATTEMPTS):
            try:
                df = pd.read_csv(file_path, encoding=encoding, sep=sep)
            except Exception as exc:
                last_error = exc
                continue

            if attempt == 0:
                self.logger.info(f"Loaded {file_path.name}: {len(df):,} rows")
            else:
                self.logger.info(f"Loaded {file_path.name} with {encoding}/{sep}")
            return df

        raise Exception(f"Failed to load {file_path.name}: {last_error}") from last_error

    def process_single_file(self, input_path: Path, output_path: Path) -> dict:
        """Process one CSV file through complete standardization pipeline.

        Args:
            input_path: raw CSV file. Its stem is used as the year (e.g.
                `2023.csv`) only when the data has no `ano` column.
            output_path: destination CSV; parent directories are created.

        Returns:
            A report dict with the file name, processing time, input/output
            row and column counts and the quality figures taken from
            `get_validation_summary`.

        Raises:
            ValueError: if the `ano` column is missing and the file name is
                not a year.
            Exception: propagated from loading, cleaning, validation or saving.
        """
        start_time = time.time()
        file_name = input_path.name

        self.logger.info(f"Processing {file_name}...")

        # Load raw data
        df_raw = self.load_csv_with_encoding(input_path)

        # Add year column if missing (extract from filename). The file name is
        # parsed only when it is actually needed, so files that already carry
        # an 'ano' column are not rejected because of their name. str() keeps
        # the check safe for non-string column labels.
        existing_columns = {str(col).lower() for col in df_raw.columns}
        if 'ano' not in existing_columns:
            try:
                year = int(input_path.stem)
            except ValueError as exc:
                raise ValueError(
                    f"Cannot infer year for {file_name}: no 'ano' column and "
                    f"file name '{input_path.stem}' is not a year"
                ) from exc
            df_raw['ano'] = year
            self.logger.info(f"Added 'ano' column with value {year}")

        # Apply cleaning using Fair-Price cleaners module
        df_cleaned = clean_dataframe(df_raw, self.config)

        # Apply validation using Fair-Price validators module
        validation_rules = {'required_fields': ['ano', 'uf']}
        df_validated = validate_dataframe(df_cleaned, validation_rules)

        # Generate quality summary
        quality_summary = get_validation_summary(df_validated)

        # Save business data (remove validation columns)
        business_columns = [col for col in df_validated.columns
                            if not col.startswith(('quality_', 'has_valid_'))]
        df_output = df_validated[business_columns]

        output_path.parent.mkdir(parents=True, exist_ok=True)
        df_output.to_csv(output_path, index=False, encoding='utf-8', sep=',')

        processing_time = time.time() - start_time

        return {
            'file_name': file_name,
            'processing_time': processing_time,
            'rows_input': len(df_raw),
            'rows_output': len(df_output),
            'columns_input': len(df_raw.columns),
            'columns_output': len(df_output.columns),
            'quality_score': quality_summary['avg_quality_score'],
            'high_quality_rows': quality_summary['high_quality_rows']
        }
        
# Initialize and run standardization
standardization_start_time = time.time()
processor = ProductionStandardizationProcessor(config, standardization_logger)

# One report dict per successfully processed file
processing_reports = []

if not csv_files:
    print("No CSV files found for standardization")
else:
    print(f"Standardizing {len(csv_files)} CSV files...")

    for csv_file in sorted(csv_files):
        try:
            output_file = config.processed_data_dir / csv_file.name
            report = processor.process_single_file(csv_file, output_file)
            processing_reports.append(report)

            print(f"  ✅ {report['file_name']}: {report['quality_score']:.1f}% quality ({report['processing_time']:.2f}s)")

        except Exception as e:
            # Best-effort per file: one bad file must not abort the others.
            standardization_logger.error(f"Failed to process {csv_file.name}: {str(e)}")
            print(f"  ❌ {csv_file.name}: Failed - {str(e)}")

standardization_time = time.time() - standardization_start_time

# Standardization summary
if processing_reports:
    total_input_rows = sum(r['rows_input'] for r in processing_reports)
    total_output_rows = sum(r['rows_output'] for r in processing_reports)
    avg_quality_score = sum(r['quality_score'] for r in processing_reports) / len(processing_reports)
    total_high_quality = sum(r['high_quality_rows'] for r in processing_reports)
    # Guard against files that loaded with zero rows (avoids ZeroDivisionError)
    preservation_pct = (total_output_rows / total_input_rows) * 100 if total_input_rows else 0.0
    failed_count = len(csv_files) - len(processing_reports)

    print(f"\n✅ Standardization completed successfully ({standardization_time:.2f}s)")
    print(f"   Files processed: {len(processing_reports)}/{len(csv_files)}")
    if failed_count > 0:
        # Make partial failures visible instead of hiding them behind the success line
        print(f"   ⚠️ Files failed: {failed_count}")
    print(f"   Total records: {total_input_rows:,} → {total_output_rows:,}")
    print(f"   Data preservation: {preservation_pct:.1f}%")
    print(f"   Average quality score: {avg_quality_score:.1f}%")
    print(f"   High quality rows: {total_high_quality:,}")

    # Show processed files (everything currently in the processed directory,
    # which may include files written by earlier runs)
    processed_files = list(config.processed_data_dir.glob("*.csv"))
    print(f"\nStandardized files ({len(processed_files)}):")
    for pfile in sorted(processed_files):
        size_mb = pfile.stat().st_size / (1024 * 1024)
        print(f"  {pfile.name}: {size_mb:.1f} MB")

else:
    print("❌ No files were successfully standardized")

PHASE 2: DATA STANDARDIZATION
Standardizing 5 CSV files...
2025-06-29 22:52:05 - fair_price.standardization - INFO - Processing 2020.csv...
2025-06-29 22:52:05 - fair_price.standardization - INFO - Loaded 2020.csv: 71,227 rows
  ✅ 2020.csv: 99.8% quality (3.36s)
2025-06-29 22:52:08 - fair_price.standardization - INFO - Processing 2021.csv...
2025-06-29 22:52:08 - fair_price.standardization - INFO - Loaded 2021.csv: 70,893 rows
  ✅ 2021.csv: 100.0% quality (4.60s)
2025-06-29 22:52:13 - fair_price.standardization - INFO - Processing 2022.csv...
2025-06-29 22:52:13 - fair_price.standardization - INFO - Loaded 2022.csv: 69,028 rows
  ✅ 2022.csv: 99.6% quality (2.85s)
2025-06-29 22:52:15 - fair_price.standardization - INFO - Processing 2023.csv...
2025-06-29 22:52:16 - fair_price.standardization - INFO - Loaded 2023.csv: 37,522 rows
2025-06-29 22:52:16 - fair_price.standardization - INFO - Added 'ano' column with value 2023
  ✅ 2023.csv: 98.9% quality (2.19s)
2025-06-29 22:52:18 - fair_pric

In [7]:
# =============================================================================
# PHASE 3: DATA CONSOLIDATION
# =============================================================================

print("\n" + "=" * 60)
print("PHASE 3: DATA CONSOLIDATION")
print("=" * 60)

consolidation_start_time = time.time()

# Defined on every path so that a skipped or failed phase can never leave a
# stale report/timing from an earlier run of this cell in the kernel.
consolidation_report = None
consolidation_time = 0.0

# Check for standardized files
standardized_files = list(config.processed_data_dir.glob("*.csv"))

if not standardized_files:
    print("❌ No standardized files found for consolidation")
else:
    print(f"Consolidating {len(standardized_files)} standardized files...")

    # Show input files
    total_input_size = 0
    for sfile in sorted(standardized_files):
        size_mb = sfile.stat().st_size / (1024 * 1024)
        total_input_size += size_mb
        print(f"  {sfile.name}: {size_mb:.1f} MB")
    print(f"Total input data: {total_input_size:.1f} MB")

    try:
        # Initialize and run consolidator (it logs through the
        # standardization logger; no dedicated consolidation logger is set up
        # in this notebook)
        consolidator = HealthDataConsolidator(config, standardization_logger)
        consolidation_report = consolidator.consolidate_all_data(
            input_dir=config.processed_data_dir,
            output_dir=config.output_data_dir,
            duplicate_strategy="keep_latest"
        )

        consolidation_time = time.time() - consolidation_start_time

        print(f"\n✅ Consolidation completed successfully ({consolidation_time:.2f}s)")

    except Exception as e:
        standardization_logger.error(f"Consolidation failed: {str(e)}")
        print(f"❌ Consolidation failed: {str(e)}")
        consolidation_report = None
        consolidation_time = time.time() - consolidation_start_time

PHASE 3: DATA CONSOLIDATION
Consolidating 5 standardized files...
  2020.csv: 22.8 MB
  2021.csv: 25.2 MB
  2022.csv: 22.3 MB
  2023.csv: 12.2 MB
  2024.csv: 7.7 MB
Total input data: 90.2 MB
2025-06-29 22:52:47 - fair_price.standardization - INFO - 🚀 Starting complete data consolidation process...
2025-06-29 22:52:47 - fair_price.standardization - INFO - 📂 Loading standardized files from /home/victor-jose/Documents/projetos/DGU/DGU45/fair-price/data/processed
2025-06-29 22:52:47 - fair_price.standardization - INFO -    ✅ 2020: 71,227 rows, 20 columns
2025-06-29 22:52:47 - fair_price.standardization - INFO -    ✅ 2021: 70,893 rows, 20 columns
2025-06-29 22:52:47 - fair_price.standardization - INFO -    ✅ 2022: 69,028 rows, 20 columns
2025-06-29 22:52:48 - fair_price.standardization - INFO -    ✅ 2023: 37,522 rows, 20 columns
2025-06-29 22:52:48 - fair_price.standardization - INFO -    ✅ 2024: 24,635 rows, 20 columns
2025-06-29 22:52:48 - fair_price.standardization - INFO - 📊 Total loade

  combined_df = pd.concat(combined_data, ignore_index=True)


2025-06-29 22:52:48 - fair_price.standardization - INFO -    📊 Total duplicates: 28,213 (10.3%)
2025-06-29 22:52:48 - fair_price.standardization - INFO -    🔀 Cross-year duplicates: 0
2025-06-29 22:52:48 - fair_price.standardization - INFO -    📅 Same-year duplicates: 0
2025-06-29 22:52:48 - fair_price.standardization - INFO - 🔧 Consolidating data with 'keep_latest' duplicate strategy...


  consolidated_df = pd.concat(all_dataframes, ignore_index=True)


2025-06-29 22:52:49 - fair_price.standardization - INFO -    📊 Consolidation complete:
2025-06-29 22:52:49 - fair_price.standardization - INFO -       Input records: 273,305
2025-06-29 22:52:49 - fair_price.standardization - INFO -       Output records: 250,789
2025-06-29 22:52:49 - fair_price.standardization - INFO -       Reduction: 8.2%
2025-06-29 22:52:49 - fair_price.standardization - INFO - ✅ Validating consolidated dataset...
2025-06-29 22:52:49 - fair_price.standardization - INFO -    📊 Final dataset: 250,789 rows × 21 columns
2025-06-29 22:52:49 - fair_price.standardization - INFO -    💾 Memory usage: 257.4 MB
2025-06-29 22:52:49 - fair_price.standardization - INFO -    ✅ Overall completeness: 78.0%
2025-06-29 22:52:49 - fair_price.standardization - INFO -    📅 Year distribution:
2025-06-29 22:52:49 - fair_price.standardization - INFO -       2020: 71,216 records
2025-06-29 22:52:49 - fair_price.standardization - INFO -       2021: 70,814 records
2025-06-29 22:52:49 - fair_pri

In [8]:
# =============================================================================
# PIPELINE RESULTS SUMMARY
# =============================================================================

print("\n" + "=" * 60)
print("PIPELINE EXECUTION SUMMARY")
print("=" * 60)

# Phase 3 timing only exists once the consolidation cell has been run
phase3_time = globals().get('consolidation_time')

# Calculate total pipeline time
total_pipeline_time = extraction_time + standardization_time + (phase3_time or 0)

print("Execution times:")
print(f"  Phase 1 (Extraction): {extraction_time:.2f}s")
print(f"  Phase 2 (Standardization): {standardization_time:.2f}s")
if phase3_time is not None:
    print(f"  Phase 3 (Consolidation): {phase3_time:.2f}s")
print(f"  Total pipeline time: {total_pipeline_time:.2f}s")

# Phase-by-phase status
print("\nPhase results:")

# The raw-data size is measured here from `csv_files` instead of relying on
# `total_size_mb`, which the extraction cell does not define when extraction
# fails and falls back to existing files.
raw_files = [raw_file for raw_file in (csv_files or []) if raw_file.exists()]
raw_size_mb = sum(raw_file.stat().st_size for raw_file in raw_files) / (1024 * 1024)
extraction_icon = "✅" if csv_files else "❌"
print(f"  {extraction_icon} Extraction: {len(csv_files) if csv_files else 0} files ({raw_size_mb:.1f} MB)")

if processing_reports:
    print(f"  ✅ Standardization: {total_output_rows:,} records ({avg_quality_score:.1f}% quality)")
else:
    print("  ❌ Standardization: Failed")

if consolidation_report:
    summary = consolidation_report['consolidation_summary']
    final_records = summary['output_records']
    quality_score = summary['data_quality_score']
    print(f"  ✅ Consolidation: {final_records:,} final records ({quality_score:.1f}% quality)")
else:
    print("  ❌ Consolidation: Failed or skipped")

# Final output files
if consolidation_report and 'saved_files' in consolidation_report:
    print("\nFinal output files:")
    for format_name, file_path in consolidation_report['saved_files'].items():
        file_path_obj = Path(file_path)
        if file_path_obj.exists():
            size_mb = file_path_obj.stat().st_size / (1024 * 1024)
            print(f"  {format_name.upper()}: {file_path_obj.name} ({size_mb:.1f} MB)")

PIPELINE EXECUTION SUMMARY
Execution times:
  Phase 1 (Extraction): 7.02s
  Phase 2 (Standardization): 14.48s
  Phase 3 (Consolidation): 3.45s
  Total pipeline time: 24.96s
\nPhase results:
  ✅ Extraction: 5 files (142.2 MB)
  ✅ Standardization: 273,305 records (99.2% quality)
  ✅ Consolidation: 250,789 final records (78.0% quality)
\nFinal output files:
  CSV: consolidated_health_data_20250629_225249.csv (73.5 MB)


In [9]:
# =============================================================================
# DATA QUALITY ASSESSMENT
# =============================================================================

if consolidation_report:
    print("\n" + "=" * 60)
    print("DATA QUALITY ASSESSMENT")
    print("=" * 60)

    final_validation = consolidation_report['final_validation']
    duplicate_detection = consolidation_report['duplicate_detection']

    print("Dataset characteristics:")
    print(f"  Total records: {final_validation['total_rows']:,}")
    print(f"  Total columns: {final_validation['total_columns']}")
    print(f"  Memory usage: {final_validation['memory_usage_mb']:.1f} MB")
    print(f"  Overall completeness: {final_validation['overall_completeness']:.1f}%")
    # This figure comes from the duplicate *detection* report. The consolidator
    # log reports the detected share (10.3%) and the actual record reduction
    # (8.2%) as different numbers, so it is labelled as detection.
    print(f"  Duplicates detected: {duplicate_detection['duplicate_percentage']:.1f}%")

    # Year distribution
    if 'year_distribution' in final_validation:
        print("\nYear distribution:")
        year_dist = final_validation['year_distribution']
        total_records = sum(year_dist.values())
        for year in sorted(year_dist.keys()):
            count = year_dist[year]
            # Guard against an empty / all-zero distribution
            percentage = (count / total_records) * 100 if total_records else 0.0
            print(f"  {year}: {count:,} records ({percentage:.1f}%)")

    # Column completeness analysis (skipped when the report has no
    # per-column figures)
    completeness = final_validation.get('completeness_by_column', {})

    if completeness:
        # Most complete columns
        print("\nMost complete columns (top 5):")
        top_complete = sorted(completeness.items(), key=lambda x: x[1], reverse=True)[:5]
        for col, comp in top_complete:
            print(f"  {col}: {comp:.1f}%")

        # Columns needing attention: the header shows the total count, the
        # list shows only the five least complete
        incomplete_columns = [(col, comp) for col, comp in completeness.items() if comp < 90]
        if incomplete_columns:
            print(f"\nColumns with <90% completeness ({len(incomplete_columns)}):")
            bottom_complete = sorted(incomplete_columns, key=lambda x: x[1])[:5]
            for col, comp in bottom_complete:
                print(f"  {col}: {comp:.1f}%")

DATA QUALITY ASSESSMENT
Dataset characteristics:
  Total records: 250,789
  Total columns: 21
  Memory usage: 257.4 MB
  Overall completeness: 78.0%
  Duplicate reduction: 10.3%
\nYear distribution:
  2020: 71,216 records (28.4%)
  2021: 70,814 records (28.2%)
  2022: 68,965 records (27.5%)
  2023: 27,815 records (11.1%)
  2024: 11,979 records (4.8%)
\nMost complete columns (top 5):
  ano: 100.0%
  codigo_br: 100.0%
  descricao_catmat: 100.0%
  generico: 100.0%
  compra: 100.0%
\nColumns with <90% completeness (7):
  modalidade_compra: 0.0%
  unidade_fornecimento: 15.8%
  preco_total: 15.9%
  cnpj_fabricante: 43.6%
  cnpj_instituicao: 44.1%
