# KBDatalakeApps Pipeline Step-by-Step Testing

This notebook is a **test harness** for `KBDataLakeUtils` from
`lib/KBDatalakeApps/KBDatalakeUtils.py`. It mirrors how the KBase SDK impl file
works: the notebook environment reads token/config from files, then creates
a separate `KBDataLakeUtils` instance with those credentials injected.

**Architecture:**
- `util.py` (NotebookUtil) = test harness that reads token/config from files
- `KBDataLakeUtils` = the code under test, receives token/config as arguments
- Each cell creates a fresh `KBDataLakeUtils` instance (state lives on filesystem)
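The injection pattern above can be sketched in isolation. The class and helper names below (`InjectedCredentialsUtils`, `create_utils_from_harness`) are hypothetical stand-ins, not the actual `KBDataLakeUtils` or `util.py` code. The point is that the object under test only sees credentials passed in by the harness and has no file fallback of its own.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class InjectedCredentialsUtils:
    """Hypothetical stand-in for the code under test: it never reads
    credential files; it only sees what the harness passes in."""
    directory: str
    tokens: Dict[str, str] = field(default_factory=dict)
    config: Dict[str, object] = field(default_factory=dict)

    def get_token(self, namespace: str) -> Optional[str]:
        # Only injected tokens are visible; there is no fallback to ~/.tokens.
        return self.tokens.get(namespace)


def create_utils_from_harness(harness_tokens, harness_config, directory):
    """Hypothetical harness helper: copy the credentials the harness loaded
    from files into a fresh instance."""
    return InjectedCredentialsUtils(
        directory=directory,
        tokens=dict(harness_tokens),   # copy so later harness edits don't leak in
        config=dict(harness_config),
    )
```

Because each cell builds a new instance this way, any state that must survive between cells has to live on the filesystem rather than on the object.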

**Pipeline steps tested:**
1. Process input arguments into user genome table
2. Download genome assemblies
3. Download genome genes & annotations
4. Run SKANI analysis
5. Annotate genomes with RAST
6. Build metabolic models (single + parallel)
7. Run phenotype simulations
8. Build SQLite database
9. Save annotated genomes to KBase
10. Save models to KBase
11. Generate KBase report

## Step 0: Setup & Configuration

Configure the pipeline parameters and save them to datacache. Every subsequent
cell reloads this config and creates a fresh `KBDataLakeUtils` instance.

The notebook's `util` reads token/config from your standard files
(`~/.tokens`, `~/.kbase/token`, `~/.kbutillib/config.yaml`). The
`create_pipeline_utils()` helper then injects those into `KBDataLakeUtils`
while blocking it from reading files itself.

- **Edit**: `workspace_name`, `input_refs`, and optionally `pipeline_dir`
- **Output**: `pipeline_config` saved to datacache
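The save/reload cycle between cells works roughly like the minimal sketch below. It assumes a JSON file per cached name. The real `util.save` / `util.load` storage format is not shown in this notebook, and `save_config` / `load_config` are hypothetical names.

```python
import json
import os


def save_config(cache_dir: str, name: str, data: dict) -> str:
    """Write `data` to <cache_dir>/<name>.json and return the path (hypothetical layout)."""
    os.makedirs(cache_dir, exist_ok=True)
    path = os.path.join(cache_dir, f"{name}.json")
    with open(path, "w") as fh:
        json.dump(data, fh, indent=2)
    return path


def load_config(cache_dir: str, name: str) -> dict:
    """Read back what save_config wrote; raises FileNotFoundError if Step 0 was skipped."""
    path = os.path.join(cache_dir, f"{name}.json")
    with open(path) as fh:
        return json.load(fh)
```

Under this assumption, everything stored in `pipeline_config` must be JSON-serializable (strings, numbers, lists, dicts), which the Step 0 values are.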

In [None]:
%run util.py
import os

# ---- EDIT THESE PARAMETERS FOR YOUR TEST ----
workspace_name = 'chenry:narrative_1234567890'  # Your KBase workspace
input_refs = [
    # Add genome or genome set references here, e.g.:
    # '12345/6/7',
    # '12345/8/1',
]
worker_count = 4
pipeline_dir = os.path.join(util.data_dir, 'pipeline_test')
# ---- END PARAMETERS ----

parameters = {
    'input_refs': input_refs,
    'workspace_name': workspace_name,
    'suffix': '.datalake',
}

# Verify the notebook util has a token loaded
token = util.get_token('kbase')
print(f'KBase token loaded: {"yes" if token else "NO - check ~/.kbase/token or ~/.tokens"}')
print(f'Config keys: {list(util._config_hash.keys()) if util._config_hash else "none"}')

# Save configuration so other cells can reload it
util.save('pipeline_config', {
    'workspace_name': workspace_name,
    'input_refs': input_refs,
    'parameters': parameters,
    'pipeline_dir': pipeline_dir,
    'worker_count': worker_count,
})

# Quick smoke test: create a pipeline utils instance
os.makedirs(pipeline_dir, exist_ok=True)
pipeline = util.create_pipeline_utils(
    directory=pipeline_dir,
    workspace_name=workspace_name,
    parameters=parameters,
    worker_count=worker_count,
)
print(f'Pipeline directory: {pipeline.directory}')
print(f'Pipeline workspace: {pipeline.workspace_name}')
print(f'Pipeline token loaded: {"yes" if pipeline.get_token("kbase") else "NO"}')
print('\nSetup complete. Ready to test pipeline steps.')

## Step 1: Process Arguments into User Genome Table

Tests `pipeline_process_arguments_into_user_genome_table()`.

Translates the input reference list (genomes or genome sets) into a structured
metadata table saved as `user_genomes.tsv`.

- **Input**: `input_refs` from KBase workspace
- **Output**: `<pipeline_dir>/user_genomes.tsv`
- **Columns**: genome_id, species_name, taxonomy, genome_ref, assembly_ref,
  genome_type, genome_source_id, genome_source_name, num_contigs, num_proteins,
  num_noncoding_genes
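A quick schema check on `user_genomes.tsv` can catch a malformed Step 1 output before later steps consume it. The sketch below uses the column list from the description above. The helper name `missing_columns` is hypothetical, and it only inspects the header row.

```python
import csv
import io

# Column list copied from the Step 1 description above.
EXPECTED_USER_GENOME_COLUMNS = [
    "genome_id", "species_name", "taxonomy", "genome_ref", "assembly_ref",
    "genome_type", "genome_source_id", "genome_source_name", "num_contigs",
    "num_proteins", "num_noncoding_genes",
]


def missing_columns(tsv_text: str, expected=EXPECTED_USER_GENOME_COLUMNS):
    """Return expected columns absent from the TSV header (empty list means OK)."""
    reader = csv.reader(io.StringIO(tsv_text), delimiter="\t")
    header = next(reader, [])  # an empty file yields an empty header
    return [col for col in expected if col not in header]
```

Usage: `with open(output_path) as fh: print(missing_columns(fh.read()))`.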

In [None]:
%run util.py
import os
import pandas as pd

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    workspace_name=config['workspace_name'],
    parameters=config['parameters'],
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_process_arguments_into_user_genome_table()

# Inspect the output
output_path = os.path.join(config['pipeline_dir'], 'user_genomes.tsv')
if os.path.exists(output_path):
    df = pd.read_csv(output_path, sep='\t')
    print(f'\nGenomes table: {len(df)} rows, {len(df.columns)} columns')
    display(df)
else:
    print(f'Output file not created: {output_path}')

## Step 2: Download Genome Assemblies

Tests `pipeline_download_user_genome_assmemblies()`.

Downloads FASTA assembly files for all genomes listed in `user_genomes.tsv`.

- **Input**: `user_genomes.tsv` (assembly_ref column)
- **Output**: FASTA files in `<pipeline_dir>/assemblies/`
- **Requires**: KBase workspace access
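To sanity-check a downloaded assembly, you can count contigs and bases and compare the contig count against `num_contigs` in `user_genomes.tsv`. This minimal sketch assumes plain, uncompressed FASTA. If the files turn out to be gzipped, open them with `gzip.open(path, "rt")` instead. `summarize_fasta` is a hypothetical helper.

```python
def summarize_fasta(lines):
    """Return (contig_count, total_bases) for an iterable of FASTA lines."""
    contigs = 0
    bases = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        if line.startswith(">"):
            contigs += 1          # each header line starts a new contig
        else:
            bases += len(line)    # sequence lines may wrap; sum their lengths
    return contigs, bases
```

Usage: `with open(os.path.join(assemblies_dir, f)) as fh: print(summarize_fasta(fh))`.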

In [None]:
%run util.py
import os

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    workspace_name=config['workspace_name'],
    parameters=config['parameters'],
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_download_user_genome_assmemblies()

# Inspect output
assemblies_dir = os.path.join(config['pipeline_dir'], 'assemblies')
if os.path.exists(assemblies_dir):
    files = os.listdir(assemblies_dir)
    print(f'\nAssembly files ({len(files)}):')
    for f in sorted(files):
        size_kb = os.path.getsize(os.path.join(assemblies_dir, f)) / 1024
        print(f'  {f}: {size_kb:.1f} KB')
else:
    print('No assemblies directory created')

## Step 3: Download Genome Genes & Annotations

Tests `pipeline_download_user_genome_genes()`.

Downloads genes, features, and existing annotations for each genome into
per-genome TSV files.

- **Input**: `user_genomes.tsv` (genome_ref column)
- **Output**: `<pipeline_dir>/genomes/<genome_id>.tsv` per genome
- **Columns**: gene_id, aliases, contig, start, end, strand, type, functions,
  protein_translation, dna_sequence, ontology_terms
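As a rough cross-check against `num_proteins` and `num_noncoding_genes` in `user_genomes.tsv`, you can split each per-genome TSV into rows with and without a `protein_translation`. This assumes those two counts are derived the same way, which this notebook does not confirm. Treat a mismatch as a hint to investigate, not as a failure. `protein_vs_noncoding` is a hypothetical helper.

```python
import csv
import io


def protein_vs_noncoding(tsv_text: str):
    """Return (rows_with_protein, rows_without_protein) for a gene TSV."""
    with_protein = 0
    without_protein = 0
    for row in csv.DictReader(io.StringIO(tsv_text), delimiter="\t"):
        # Treat a missing or blank translation as a non-coding feature (assumption).
        if (row.get("protein_translation") or "").strip():
            with_protein += 1
        else:
            without_protein += 1
    return with_protein, without_protein
```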

In [None]:
%run util.py
import os
import pandas as pd

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    workspace_name=config['workspace_name'],
    parameters=config['parameters'],
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_download_user_genome_genes()

# Inspect output
genomes_dir = os.path.join(config['pipeline_dir'], 'genomes')
if os.path.exists(genomes_dir):
    files = [f for f in os.listdir(genomes_dir) if f.endswith('.tsv')]
    print(f'\nGenome gene files ({len(files)}):')
    for f in sorted(files):
        df = pd.read_csv(os.path.join(genomes_dir, f), sep='\t')
        print(f'  {f}: {len(df)} features')
    # Show sample from first genome
    if files:
        sample = pd.read_csv(os.path.join(genomes_dir, files[0]), sep='\t')
        print(f'\nSample from {files[0]}:')
        display(sample.head())
else:
    print('No genomes directory created')

## Step 4: Run SKANI Analysis

Tests `pipeline_run_skani_analysis()`.

Runs SKANI (fast genomic distance estimation) against three sketch databases:
pangenome, fitness, and phenotype.

- **Input**: FASTA files in `<pipeline_dir>/assemblies/`
- **Output**: TSV files in `<pipeline_dir>/skani/` (one per database)
- **Columns**: genome_id, reference_genome, ani_percentage
- **Requires**: SKANI sketch databases configured in kbutillib config
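A common next step with these results is to keep only the closest reference per user genome. The sketch below assumes the column names listed above and an `ani_percentage` on a 0 to 100 scale. `best_hits` is a hypothetical helper.

```python
import csv
import io


def best_hits(tsv_text: str) -> dict:
    """Map genome_id -> (reference_genome, ani_percentage) for its highest-ANI row."""
    best = {}
    for row in csv.DictReader(io.StringIO(tsv_text), delimiter="\t"):
        ani = float(row["ani_percentage"])
        current = best.get(row["genome_id"])
        if current is None or ani > current[1]:
            best[row["genome_id"]] = (row["reference_genome"], ani)
    return best
```

Run it once per database file in `skani/` to compare the closest pangenome, fitness, and phenotype references side by side.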

In [None]:
%run util.py
import os
import pandas as pd

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    workspace_name=config['workspace_name'],
    parameters=config['parameters'],
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_run_skani_analysis()

# Inspect output
skani_dir = os.path.join(config['pipeline_dir'], 'skani')
if os.path.exists(skani_dir):
    files = [f for f in os.listdir(skani_dir) if f.endswith('.tsv')]
    print(f'\nSKANI result files ({len(files)}):')
    for f in sorted(files):
        df = pd.read_csv(os.path.join(skani_dir, f), sep='\t')
        print(f'  {f}: {len(df)} hits')
        display(df.head())
else:
    print('No skani directory created')

## Step 5: Annotate Genomes with RAST

Tests `pipeline_annotate_user_genome_with_rast()`.

Submits protein sequences from each genome to RAST for functional annotation.
Translates RAST functions to SSO (Subsystem Ontology) terms and populates the
`Annotation:SSO` column in each genome TSV file. Skips genomes that already
have `Annotation:SSO` data (e.g., from the original KBase genome object).

- **Input**: Genome TSV files in `<pipeline_dir>/genomes/`
- **Output**: Updated genome TSV files with `Annotation:SSO` column
- **Format**: `SSO:nnnnn:description|rxn1,rxn2` entries separated by `;`
- **Requires**: RAST SDK service access
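The `Annotation:SSO` format above can be parsed as follows. This sketch follows the same splitting rules as the Step 6a cell (`;` between entries, `|` before the reaction list, `:` inside the term). It additionally returns the reaction IDs, which Step 6a ignores. Like the notebook code, it assumes descriptions contain no `;` or `|`. `parse_sso_annotations` is a hypothetical helper.

```python
def parse_sso_annotations(cell):
    """Parse an Annotation:SSO cell into (sso_id, description, reactions) tuples.

    Assumed format: SSO:nnnnn:description|rxn1,rxn2 with entries joined by ';'.
    """
    results = []
    for entry in (cell or "").split(";"):
        entry = entry.strip()
        if not entry:
            continue
        term_part, _, rxn_part = entry.partition("|")
        parts = term_part.split(":")
        if len(parts) < 2 or parts[0] != "SSO":
            continue  # skip anything that is not an SSO term
        sso_id = f"SSO:{parts[1]}"
        description = ":".join(parts[2:])  # descriptions may themselves contain ':'
        reactions = [r.strip() for r in rxn_part.split(",") if r.strip()]
        results.append((sso_id, description, reactions))
    return results
```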

In [None]:
%run util.py
import os
import pandas as pd

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    parameters=config['parameters'],
    kb_version=config.get('kb_version', 'dev'),
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_annotate_user_genome_with_rast()

# Inspect output - check that Annotation:SSO column was added/populated
genomes_dir = os.path.join(config['pipeline_dir'], 'genomes')
if os.path.exists(genomes_dir):
    files = [f for f in os.listdir(genomes_dir) if f.endswith('.tsv')]
    for f in sorted(files):
        df = pd.read_csv(os.path.join(genomes_dir, f), sep='\t')
        has_sso = 'Annotation:SSO' in df.columns
        annotated = df['Annotation:SSO'].fillna('').astype(str).str.strip().ne('').sum() if has_sso else 0
        print(f'{f}: {len(df)} features, Annotation:SSO={has_sso}, annotated={annotated}')
    # Show sample
    if files:
        sample = pd.read_csv(os.path.join(genomes_dir, files[0]), sep='\t')
        if 'Annotation:SSO' in sample.columns:
            print(f'\nSample annotations from {files[0]}:')
            display(sample[['gene_id', 'functions', 'Annotation:SSO']].head(10))

## Step 6a: Build Single Metabolic Model (Debug Mode)

Tests model building for **one** genome in the current process (no parallelism).
Instead of calling `pipeline_run_moddeling_analysis()`, this cell replicates its
per-genome worker logic inline (genome loading, model building, and gapfilling)
so you can step through it and inspect intermediate objects.

- **Input**: Genome TSV with `Annotation:SSO` column
- **Output**: COBRA JSON model in `<pipeline_dir>/models/<safe_id>_model.json`,
  where `safe_id` is the genome ID with `.` replaced by `_`
- **Note**: Edit `test_genome_id` to pick which genome to test
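After running this step (or Step 6b), you can list the genomes that still have no model file. The sketch below reuses the naming convention from the cell that follows (`.` replaced by `_`, plus a `_model.json` suffix). It assumes the parallel step writes files under the same names, which this notebook does not state explicitly. Both helper names are hypothetical.

```python
import os


def safe_model_filename(genome_id) -> str:
    # Same convention as the Step 6a cell: '.' -> '_' plus a '_model.json' suffix.
    return f"{str(genome_id).replace('.', '_')}_model.json"


def genomes_without_models(genome_ids, models_dir: str):
    """Return genome IDs whose expected model file is missing from models_dir."""
    present = set(os.listdir(models_dir)) if os.path.isdir(models_dir) else set()
    return [gid for gid in genome_ids if safe_model_filename(gid) not in present]
```

Usage: `genomes_without_models(user_genomes['genome_id'], models_dir)`.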

In [None]:
%run util.py
import os
import pandas as pd
import cobra
from modelseedpy.core.msgenome import MSGenome, MSFeature
from modelseedpy import MSModelUtil

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    parameters=config['parameters'],
    kb_version=config.get('kb_version', 'dev'),
    worker_count=config['worker_count'],
)

# Pick a genome to test
genomes_file = os.path.join(config['pipeline_dir'], 'user_genomes.tsv')
user_genomes = pd.read_csv(genomes_file, sep='\t')
test_genome_id = str(user_genomes.iloc[0]['genome_id'])  # change the index to pick another genome
print(f'Building model for: {test_genome_id}')
print('=' * 60)

# Load features from genome TSV
genome_tsv = os.path.join(config['pipeline_dir'], 'genomes', f'{test_genome_id}.tsv')
gene_df = pd.read_csv(genome_tsv, sep='\t')

safe_id = test_genome_id.replace('.', '_')
models_dir = os.path.join(config['pipeline_dir'], 'models')
os.makedirs(models_dir, exist_ok=True)

# Create MSGenome from features
genome = MSGenome()
genome.id = safe_id
genome.scientific_name = test_genome_id

ms_features = []
for _, gene in gene_df.iterrows():
    protein = gene.get('protein_translation', '')
    gene_id = gene.get('gene_id', '')
    if pd.notna(protein) and protein:
        feature = MSFeature(gene_id, str(protein))
        # Parse Annotation:SSO column
        # Format: SSO:nnnnn:description|rxn1,rxn2;SSO:mmmmm:desc2|rxn3
        sso_col = gene.get('Annotation:SSO', '')
        if pd.notna(sso_col) and sso_col:
            for entry in str(sso_col).split(';'):
                entry = entry.strip()
                if not entry:
                    continue
                term_part = entry.split('|')[0]
                parts = term_part.split(':')
                if len(parts) >= 2 and parts[0] == 'SSO':
                    sso_id = parts[0] + ':' + parts[1]
                    feature.add_ontology_term('SSO', sso_id)
                    # Extract description for classifier
                    if len(parts) >= 3:
                        description = ':'.join(parts[2:])
                        if description:
                            feature.add_ontology_term('RAST', description)
        ms_features.append(feature)

genome.add_features(ms_features)
print(f'MSGenome: {len(ms_features)} protein features')

# Build the model using pipeline's reconstruction utils
genome_classifier = pipeline.get_classifier()
build_output, mdlutl = pipeline.build_metabolic_model(
    genome=genome,
    genome_classifier=genome_classifier,
    model_id=safe_id,
    model_name=test_genome_id,
    gs_template='auto',
    atp_safe=True,
    load_default_medias=True,
    max_gapfilling=10,
    gapfilling_delta=0,
)

if mdlutl is not None:
    model = mdlutl.model
    print(f'\nModel built: {model.id}')
    print(f'  Reactions: {len(model.reactions)}')
    print(f'  Metabolites: {len(model.metabolites)}')
    print(f'  Genes: {len(model.genes)}')
    print(f'  Class: {build_output.get("Class", "N/A")}')
    print(f'  Core GF: {build_output.get("Core GF", 0)}')

    # Gapfill on Carbon-Pyruvic-Acid
    gapfill_media = pipeline.get_media('KBaseMedia/Carbon-Pyruvic-Acid')
    gf_output, _, _, _ = pipeline.gapfill_metabolic_model(
        mdlutl=mdlutl,
        genome=genome,
        media_objs=[gapfill_media],
        templates=[model.template],
        atp_safe=True,
        objective='bio1',
        minimum_objective=0.01,
        gapfilling_mode='Sequential',
    )
    print(f'  GS GF: {gf_output.get("GS GF", 0)}')
    print(f'  Growth: {gf_output.get("Growth", "N/A")}')

    # Save model
    model_path = os.path.join(models_dir, f'{safe_id}_model.json')
    cobra.io.save_json_model(model, model_path)
    print(f'  Saved: {model_path}')
else:
    print(f'\nModel build returned None: {build_output}')

## Step 6b: Build All Metabolic Models (Parallel)

Tests `pipeline_run_moddeling_analysis()`.

Builds metabolic models for **all** genomes using `ProcessPoolExecutor`,
matching the production pipeline behavior. Functional annotations come from
the `Annotation:SSO` column of each genome TSV file.

- **Input**: All genome TSV files in `<pipeline_dir>/genomes/` with `Annotation:SSO`
- **Output**: COBRA JSON models in `<pipeline_dir>/models/`
- **Uses**: `worker_count` parallel processes
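The per-genome fan-out can be sketched generically as below. This is not the pipeline's actual worker code. `run_per_genome` is a hypothetical helper that collects either a result or the raised exception for each genome, so one failing genome does not abort the batch. With `ProcessPoolExecutor`, the worker must be a picklable module-level function. Functions defined inside a notebook cell can fail to load in child processes under the `spawn` start method. Passing `ThreadPoolExecutor` is a convenient way to debug the same flow in-process.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, as_completed
from typing import Callable, Dict, Iterable, Type


def run_per_genome(worker: Callable[[str], object], genome_ids: Iterable[str],
                   max_workers: int = 4,
                   executor_cls: Type[Executor] = ProcessPoolExecutor) -> Dict[str, object]:
    """Run worker(genome_id) for each genome; map each ID to its result or exception."""
    results: Dict[str, object] = {}
    with executor_cls(max_workers=max_workers) as pool:
        futures = {pool.submit(worker, gid): gid for gid in genome_ids}
        for future in as_completed(futures):
            gid = futures[future]
            try:
                results[gid] = future.result()
            except Exception as exc:  # record the failure and keep going
                results[gid] = exc
    return results
```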

In [None]:
%run util.py
import os

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    workspace_name=config['workspace_name'],
    parameters=config['parameters'],
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_run_moddeling_analysis()

# Inspect output
import cobra
models_dir = os.path.join(config['pipeline_dir'], 'models')
if os.path.exists(models_dir):
    model_files = [f for f in os.listdir(models_dir) if f.endswith('_model.json')]
    print(f'\nModels built ({len(model_files)}):')
    for mf in sorted(model_files):
        model = cobra.io.load_json_model(os.path.join(models_dir, mf))
        print(f'  {mf}: {len(model.reactions)} rxns, {len(model.metabolites)} mets, {len(model.genes)} genes')
else:
    print('No models directory created')

## Step 7: Run Phenotype Simulations

Tests `pipeline_run_phenotype_simulations()`.

Runs phenotype simulations for all built models using ProcessPoolExecutor.
Also builds summary tables from the individual simulation results.

- **Input**: COBRA JSON models in `<pipeline_dir>/models/`
- **Output**: Per-model JSON in `<pipeline_dir>/phenotypes/` and summary TSV tables
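The layout of the per-model phenotype JSON files is not documented here, so this sketch starts from records you have already extracted, each shaped as `(model_id, phenotype_id, value)`. It pivots them into a model-by-phenotype table, which is the general shape of a summary table. The pipeline's actual summary TSVs may be organized differently. `pivot_phenotypes` is a hypothetical helper.

```python
from collections import defaultdict


def pivot_phenotypes(records):
    """Pivot (model_id, phenotype_id, value) records into
    ({model_id: {phenotype_id: value}}, sorted list of all phenotype IDs)."""
    table = defaultdict(dict)
    phenotypes = set()
    for model_id, phenotype_id, value in records:
        table[model_id][phenotype_id] = value
        phenotypes.add(phenotype_id)
    return dict(table), sorted(phenotypes)
```

Models missing a phenotype simply lack that key, which you can render as a blank cell when writing the table.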

In [None]:
%run util.py
import os
import pandas as pd

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    workspace_name=config['workspace_name'],
    parameters=config['parameters'],
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_run_phenotype_simulations()

# Inspect output
phenotypes_dir = os.path.join(config['pipeline_dir'], 'phenotypes')
if os.path.exists(phenotypes_dir):
    files = os.listdir(phenotypes_dir)
    json_count = len([f for f in files if f.endswith('.json')])
    tsv_count = len([f for f in files if f.endswith('.tsv')])
    print(f'\nPhenotype results: {json_count} simulation JSONs, {tsv_count} summary TSVs')
    for f in sorted(files):
        if f.endswith('.tsv'):
            df = pd.read_csv(os.path.join(phenotypes_dir, f), sep='\t')
            print(f'\n  {f}: {len(df)} rows')
            display(df.head())
else:
    print('No phenotypes directory created')

## Step 8: Build SQLite Database

Tests `pipeline_build_sqllite_db()`.

Compiles all output data into a single SQLite database.

- **Input**: All TSV output files from previous pipeline steps
- **Output**: `<pipeline_dir>/berdl_tables.db` with tables:
  genome, genome_ani, genome_features, genome_accuracy,
  genome_gene_phenotype_reactions, genome_phenotype_gaps, gapfilled_reactions
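Beyond listing tables, it helps to confirm that every table named above was actually created. The sketch below uses only the standard `sqlite3` module. `table_report` is a hypothetical helper, and `EXPECTED_TABLES` is copied from the list above.

```python
import sqlite3

# Table names copied from the Step 8 description above.
EXPECTED_TABLES = {
    "genome", "genome_ani", "genome_features", "genome_accuracy",
    "genome_gene_phenotype_reactions", "genome_phenotype_gaps", "gapfilled_reactions",
}


def table_report(conn: sqlite3.Connection):
    """Return ({table: row_count}, sorted list of expected tables that are missing)."""
    names = [row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table'")]
    counts = {}
    for name in names:
        # Identifiers cannot be bound as query parameters, so quote them explicitly.
        quoted = '"' + name.replace('"', '""') + '"'
        counts[name] = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]
    missing = sorted(EXPECTED_TABLES - set(names))
    return counts, missing
```

Usage: `with contextlib.closing(sqlite3.connect(db_path)) as conn: print(table_report(conn))`. A bare `with sqlite3.connect(...)` block manages transactions but does not close the connection.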

In [None]:
%run util.py
import os
import pandas as pd
import sqlite3

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    workspace_name=config['workspace_name'],
    parameters=config['parameters'],
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_build_sqllite_db()

# Inspect the database
db_path = os.path.join(config['pipeline_dir'], 'berdl_tables.db')
if os.path.exists(db_path):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = [row[0] for row in cursor.fetchall()]
    print(f'\nSQLite database: {db_path}')
    print(f'Tables ({len(tables)}):')
    for table in tables:
        cursor.execute(f'SELECT COUNT(*) FROM [{table}]')
        count = cursor.fetchone()[0]
        print(f'  {table}: {count} rows')
        sample = pd.read_sql_query(f'SELECT * FROM [{table}] LIMIT 3', conn)
        display(sample)
    conn.close()
else:
    print(f'Database not found: {db_path}')

## Step 9: Save Annotated Genomes to KBase

Tests `pipeline_save_annotated_genomes()`.

Saves RAST-annotated genomes back to the KBase workspace and creates a GenomeSet.

- **Input**: Genome TSV files with `Annotation:SSO`, user_genomes.tsv for refs
- **Output**: New genome objects + GenomeSet in KBase workspace
- **Warning**: This writes to the KBase workspace - use a test workspace
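Because Steps 9 to 11 write to KBase, you may want an explicit opt-in guard at the top of those cells, before the pipeline method is called. This is an optional, hypothetical safeguard, not part of `KBDataLakeUtils`. The environment variable `ALLOW_KBASE_WRITES` and the names below are illustrative.

```python
import os


class KBaseWriteBlocked(RuntimeError):
    """Raised when a notebook cell would write to KBase without explicit opt-in."""


def require_write_opt_in(workspace_name: str, allowed_workspaces,
                         env_var: str = "ALLOW_KBASE_WRITES") -> None:
    """Raise unless env_var is '1' and workspace_name is in the test-workspace allowlist."""
    if os.environ.get(env_var) != "1":
        raise KBaseWriteBlocked(f"set {env_var}=1 to enable workspace writes")
    if workspace_name not in set(allowed_workspaces):
        raise KBaseWriteBlocked(f"{workspace_name!r} is not in the test-workspace allowlist")
```

Usage: `require_write_opt_in(config['workspace_name'], ['my_test_workspace'])`, where the allowlist entry is a placeholder for your own test workspace.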

In [None]:
%run util.py

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    workspace_name=config['workspace_name'],
    parameters=config['parameters'],
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_save_annotated_genomes()

## Step 10: Save Models to KBase

Tests `pipeline_save_models_to_kbase()`.

Saves built COBRA metabolic models to the KBase workspace.

- **Input**: COBRA JSON models in `<pipeline_dir>/models/`, user_genomes.tsv
- **Output**: FBAModel objects in KBase workspace
- **Warning**: This writes to the KBase workspace - use a test workspace

In [None]:
%run util.py

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    workspace_name=config['workspace_name'],
    parameters=config['parameters'],
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_save_models_to_kbase()

## Step 11: Generate KBase Report

Tests `pipeline_save_kbase_report()`.

Generates an HTML report summarizing pipeline results and saves it to KBase.

- **Input**: All pipeline output data (genomes, models, SQLite DB)
- **Output**: KBase report object with HTML viewer and downloadable SQLite DB
- **Warning**: This writes to the KBase workspace - use a test workspace
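Before writing a report to KBase, a local HTML preview of headline counts can be useful. The sketch below is not the pipeline's report template. It only renders a `{label: count}` mapping, such as the table row counts from Step 8, into an escaped HTML table. `render_counts_table` is a hypothetical helper.

```python
import html


def render_counts_table(counts: dict) -> str:
    """Render {label: count} as a minimal HTML table, escaping labels and sorting by label."""
    rows = "\n".join(
        f"  <tr><td>{html.escape(str(label))}</td><td>{int(count)}</td></tr>"
        for label, count in sorted(counts.items())
    )
    return ("<table>\n  <tr><th>Item</th><th>Count</th></tr>\n"
            f"{rows}\n</table>")
```

In a notebook, `IPython.display.HTML(render_counts_table(counts))` shows the preview inline.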

In [None]:
%run util.py

config = util.load('pipeline_config')
pipeline = util.create_pipeline_utils(
    directory=config['pipeline_dir'],
    workspace_name=config['workspace_name'],
    parameters=config['parameters'],
    worker_count=config['worker_count'],
)

# Run the actual pipeline method
pipeline.pipeline_save_kbase_report()

report_name = getattr(pipeline, 'report_name', None)
if report_name:
    print(f'\nReport: {report_name} ({getattr(pipeline, "report_ref", "ref unknown")})')

## Inspection: Review All Pipeline Outputs

Comprehensive view of all outputs across all pipeline steps.
This cell does not create a `KBDataLakeUtils` instance; it only reads the filesystem.

In [None]:
%run util.py
import os
import pandas as pd
import sqlite3

config = util.load('pipeline_config')
pipeline_dir = config['pipeline_dir']

print('=' * 60)
print('PIPELINE OUTPUT SUMMARY')
print('=' * 60)

dirs_to_check = [
    ('user_genomes.tsv', 'User Genomes Table'),
    ('assemblies', 'Assemblies'),
    ('genomes', 'Genome Gene Tables'),
    ('skani', 'SKANI Results'),
    ('models', 'Metabolic Models'),
    ('phenotypes', 'Phenotype Simulations'),
    ('berdl_tables.db', 'SQLite Database'),
]

for item, label in dirs_to_check:
    full_path = os.path.join(pipeline_dir, item)
    if os.path.isfile(full_path):
        size_kb = os.path.getsize(full_path) / 1024
        print(f'\n{label}: {full_path} ({size_kb:.1f} KB)')
        if full_path.endswith('.tsv'):
            df = pd.read_csv(full_path, sep='\t')
            print(f'  Rows: {len(df)}, Columns: {list(df.columns)}')
    elif os.path.isdir(full_path):
        files = os.listdir(full_path)
        print(f'\n{label}: {full_path} ({len(files)} files)')
        for f in sorted(files):
            fp = os.path.join(full_path, f)
            size_kb = os.path.getsize(fp) / 1024
            if f.endswith('.tsv'):
                row_count = len(pd.read_csv(fp, sep='\t'))
                print(f'  {f}: {row_count} rows ({size_kb:.1f} KB)')
            else:
                print(f'  {f}: {size_kb:.1f} KB')
    else:
        print(f'\n{label}: NOT FOUND')

# SQLite summary
db_path = os.path.join(pipeline_dir, 'berdl_tables.db')
if os.path.exists(db_path):
    print(f'\n{"=" * 60}')
    print('SQLite Database Tables:')
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    for row in cursor.fetchall():
        cursor.execute(f'SELECT COUNT(*) FROM [{row[0]}]')
        print(f'  {row[0]}: {cursor.fetchone()[0]} rows')
    conn.close()