In [1]:
# --- Imports -----------------------------------------------------------------
# All of this cell's imports are grouped here (instead of being interleaved
# with the setup calls) so the dependencies are visible up front and a missing
# module fails before any setup step has run.
# NOTE(review): boto3 and yaml are not referenced in the visible cells; they
# are kept in case something outside this view relies on them.
import boto3
import yaml
from src.logger import setup_logger
from src.minio_uploader import upload_to_minio
from config.config_loader import ConfigLoader
from src.spark_session import start_spark_session

# --- Logger ------------------------------------------------------------------
# These three values are passed to setup_logger and appear as the "pipeline",
# "table" and "schema" fields of every log record emitted below.
pipeline_name = "pangenome"
target_table = "genome"
schema = "pangenome"

logger = setup_logger(
    pipeline_name=pipeline_name,
    target_table=target_table,
    schema=schema,
)
logger.info("Notebook started")

# --- Pipeline configuration --------------------------------------------------
# Identifier handed to ConfigLoader, not a full path: in the recorded run the
# loader logs that it resolved "genome" to
# s3a://cdm-lake/config-json/genome.json.
config_path = "genome"

loader = ConfigLoader(config_path, logger)
loader.load_and_validate()

# Echo the target table reported by the loader as a quick sanity check.
print(loader.get_target_table())

# --- Spark -------------------------------------------------------------------
# `spark`, `loader` and `logger` are used by the cells further down.
spark = start_spark_session(logger=logger)

{"time": "2025-07-14 22:04:31,480", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "2161914337", "msg": "Notebook started"}
{"time": "2025-07-14 22:04:31,492", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "config_loader", "msg": "ConfigLoader initialized for target table: genome"}
{"time": "2025-07-14 22:04:31,492", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "config_loader", "msg": "Resolved config path: s3a://cdm-lake/config-json/genome.json"}
{"time": "2025-07-14 22:04:31,493", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "config_loader", "msg": "Loading config from MinIO: bucket=cdm-lake, key=config-json/genome.json"}
{"time": "2025-07-14 22:04:31,656", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "config_loader", "msg": "Config loaded succes

In [2]:

# Validation steps for the table described by `loader`.
# Requires `spark`, `loader` and `logger` from the setup cell.
#
# The imports are grouped at the top of the cell so that a missing module is
# reported before any validation step has started, and the previously
# commented-out experiments (a duplicate input-file check and an importlib
# reload of the Great Expectations module) have been removed.
from src.input_file_validator import validate_input_files
from src.validate_schema import validate_schema_against_file
from src.run_validations import run_validations_from_config
from src.run_referential_integrity_checks import run_referential_integrity_checks

# 1. Validate input files
validate_input_files(loader, logger)

# 2. Check schema headers
validate_schema_against_file(loader, logger)

# 3. Run validations
run_validations_from_config(loader, logger)

# 4. Referential integrity
# Deliberately left as the final bare expression so the notebook displays the
# call's return value as the cell result (the recorded run shows `False`).
# NOTE(review): nothing in this notebook acts on that value, so later cells run
# even when the checks fail -- confirm that this is intended.
run_referential_integrity_checks(spark, loader, logger)




{"time": "2025-07-14 22:04:37,841", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "DEBUG", "module": "config_loader", "msg": "Input files: [{'source': 'GTDB', 'file_path': 's3a://cdm-lake/bronze/gtdb/sp_clusters_r214.tsv', 'file_type': 'tsv', 'delimiter': '\t', 'ignore_first_line': 'no'}, {'source': 'NCBI', 'file_path': 's3a://cdm-lake/bronze/ncbi/assembly_summary_genbank.txt', 'file_type': 'tsv', 'delimiter': '\t', 'ignore_first_line': 'yes'}]"}
{"time": "2025-07-14 22:04:37,842", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "input_file_validator", "msg": "Starting input file validation..."}
Validating input files...

{"time": "2025-07-14 22:04:37,848", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "input_file_validator", "msg": "Checking file: s3a://cdm-lake/bronze/gtdb/sp_clusters_r214.tsv"}
🔍 Checking: s3a://cdm-lake/bronze/gtdb/sp_clusters_r214.tsv
{"time"

{"time": "2025-07-14 22:04:49,780", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "DEBUG", "module": "run_validations", "msg": "Delimiter: 	, Ignore first line: False"}
{"time": "2025-07-14 22:04:49,780", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "DEBUG", "module": "config_loader", "msg": "Validations: [{'column': 'genome_id', 'validation_type': 'not_null', 'error_message': 'Missing genome_id'}, {'column': 'genome_id', 'validation_type': 'regex_match', 'pattern': '^[A-Za-z0-9_.-]+$', 'error_message': 'Invalid genome_id format'}, {'column': 'gtdb_taxonomy_id', 'validation_type': 'not_null', 'error_message': 'Missing taxonomy ID'}, {'column': 'gtdb_species_clade_id', 'validation_type': 'not_null', 'error_message': 'Missing species clade ID'}]"}
{"time": "2025-07-14 22:04:49,781", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "DEBUG", "module": "run_validations", "msg": "Validation rules: [{'colu

{"time": "2025-07-14 22:05:12,215", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "ERROR", "module": "run_referential_integrity_checks", "msg": "Referential integrity violations written to Delta table: pangenome.genome_errors at s3a://cdm-lake/logs/errors/genome"}

🚨 All violations logged to: pangenome.genome_errors

❌ Some referential integrity checks failed.


False

In [3]:
# Great Expectations validation followed by the log upload.
# Requires `spark`, `loader`, `logger` and `upload_to_minio` from the setup cell.
from src.run_great_expectations_validations import run_great_expectations_validation

try:
    # Great Expectations
    run_great_expectations_validation(spark, loader, logger)
finally:
    # Upload log file to MinIO.
    # Done in `finally` so the run log is still shipped when the validation
    # call raises; any exception from the validation propagates afterwards.
    upload_to_minio(logger.log_file_path)

{"time": "2025-07-14 22:06:56,905", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "DEBUG", "module": "config_loader", "msg": "Target table: pangenome.genome"}
{"time": "2025-07-14 22:06:56,906", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Starting Great Expectations validation for table: pangenome.genome with suite: default_suite"}
{"time": "2025-07-14 22:06:56,963", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Loaded Spark table: pangenome.genome"}
{"time": "2025-07-14 22:06:57,109", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Great Expectations context initialized."}
{"time": "2025-07-14 22:06:57,129", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "lev

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{"time": "2025-07-14 22:06:58,120", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Applied GE expectation: expect_column_values_to_not_be_null with args {'column': 'genome_id'} → result: True"}


Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

{"time": "2025-07-14 22:06:58,459", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Applied GE expectation: expect_column_values_to_match_regex with args {'column': 'genome_id', 'regex': '^[A-Za-z0-9_.-]+$'} → result: True"}


Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{"time": "2025-07-14 22:06:58,668", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Applied GE expectation: expect_column_values_to_not_be_null with args {'column': 'gtdb_species_clade_id'} → result: True"}


Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{"time": "2025-07-14 22:06:58,854", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Applied GE expectation: expect_column_values_to_not_be_null with args {'column': 'gtdb_taxonomy_id'} → result: True"}


Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

{"time": "2025-07-14 22:06:58,865", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Applied GE expectation: expect_column_to_exist with args {'column': 'ncbi_biosample_id'} → result: True"}


Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

{"time": "2025-07-14 22:06:58,877", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Applied GE expectation: expect_column_to_exist with args {'column': 'sample_id'} → result: False"}
{"time": "2025-07-14 22:06:58,905", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Expectations saved to suite."}


Calculating Metrics:   0%|          | 0/20 [00:00<?, ?it/s]

{"time": "2025-07-14 22:06:59,365", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Validation run completed."}
{"time": "2025-07-14 22:06:59,369", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "DEBUG", "module": "run_great_expectations_validations", "msg": "Validation summary: {'success': False, 'successful_expectations': 5, 'unsuccessful_expectations': 1, 'success_percent': 83.33333333333334}"}
{"time": "2025-07-14 22:07:01,393", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Data Docs built after validation."}


Calculating Metrics:   0%|          | 0/28 [00:00<?, ?it/s]

{"time": "2025-07-14 22:07:03,244", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Checkpoint executed."}
{"time": "2025-07-14 22:07:05,259", "pipeline": "pangenome", "schema": "pangenome", "table": "genome", "level": "INFO", "module": "run_great_expectations_validations", "msg": "Data Docs rebuilt with checkpoint results."}
✅ GE validation and checkpoint complete. Data Docs generated.
✅ Uploaded log to MinIO at: s3://cdm-lake/logs/pangenome/pipeline_run_20250714_220431.log


In [None]:
# Display the Delta data stored at the error-log location. In the recorded run
# the referential-integrity step logged that it wrote its violations to this
# location.
delta_reader = spark.read.format("delta")
df = delta_reader.load("s3a://cdm-lake/logs/errors/genome")

# truncate=False prints full column values instead of shortening long strings.
df.show(truncate=False)


In [None]:
# Count the rows stored at the error-log Delta location, addressing the data
# directly by path rather than through a catalog table name.
error_count_query = """
SELECT count(1) 
FROM delta.`s3a://cdm-lake/logs/errors/genome`
"""
spark.sql(error_count_query).show(truncate=False)

In [None]:
# List the tables the Spark catalog reports for the "pangenome" database.
# Left as a bare expression so the notebook displays the returned list.
spark.catalog.listTables(dbName="pangenome")

In [None]:
# Print the storage location of every table the catalog lists for "pangenome".
tables = spark.catalog.listTables("pangenome")

for table in tables:
    table_name = table.name
    print(f"\n📄 Table: {table_name}")
    try:
        # Ask for the table's details and keep only the "location" column of
        # the first returned row.
        detail_df = spark.sql(f"DESCRIBE DETAIL pangenome.{table_name}")
        detail_rows = detail_df.select("location").collect()
        location = detail_rows[0]["location"]
        print(f"   📍 Location: {location}")
    except Exception as exc:
        # Best effort: report the failure for this table and continue with the
        # next one instead of aborting the whole listing.
        print(f"   ⚠️ Could not get location: {exc}")
