# Pipelines Tutorial

In [21]:
# Imports and notebook setup (the pipeline itself is created in the next cell)
from pathlib import Path

from beak.remote import MMseqsTaxonomy
from beak.remote import Pipeline

# line magic to auto-reload modules
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [7]:
# Pipeline object used by every pipeline cell below (search/taxonomy/align,
# execute, status, results, cleanup).
# NOTE(review): host, user and key path are hard-coded to one account --
# adjust them for your own setup before running.
pipe = Pipeline(
    host="shr-zion.stanford.edu",
    user="mbolivas",
    key_path="~/.ssh/shr-zion",
)

In [None]:
# Write a single-record FASTA query (hAcyP2) into the working directory.
test_fasta = Path("hAcyP2_expansive_query.fasta")
test_fasta.write_text(""">hAcyP2
MSTAQSLKSVDYEVFGRVQGVCFRMYTEDEARKIGVVGWVKNTSKGTVTGQVQGPEDKVNSMKSWLSKVGSPSSRIDRTNFSNEKTISKLEYSNFSIRY
""")

# Chain the three steps onto the pipeline. The input path comes from
# `test_fasta`, so the file written above and the file that is searched
# cannot drift apart if the name is ever changed.
# NOTE(review): if these calls append steps to `pipe`, re-running this cell
# would add the steps a second time -- confirm, or recreate `pipe` first.
(
    pipe.search(str(test_fasta), database="uniprotkb", threads=8)
    .taxonomy(database="uniprotkb")
    .align(threads=4)
)

# Show the configured input and steps.
print(pipe)

Pipeline:
  Input: acyp_test.fasta
  Steps (3):
    1. search (database=uniref90, threads=8)
    2. taxonomy (database=uniref90)
    3. align (threads=4)


In [9]:
# Execute the pipeline under the job name "acyp2_analysis" and keep the
# returned job id; the status, results and cleanup cells below all take it.
job_id = pipe.execute(job_name="acyp2_analysis")

Created remote directory: /home/mbolivas/beak_jobs/5639d466
Uploading input file...
GENERATED SCRIPT:
#!/bin/bash
set -e

# Pipeline execution script
echo "Pipeline started: $(date)" > /home/mbolivas/beak_jobs/5639d466/status.txt
echo 'RUNNING' >> /home/mbolivas/beak_jobs/5639d466/status.txt

# Initialize context
declare -A CONTEXT

# Step 1: search
mkdir -p /home/mbolivas/beak_jobs/5639d466/01_search
mmseqs easy-search \
  /home/mbolivas/beak_jobs/5639d466/input.fasta \
  /srv/protein_sequence_databases/UniRef90 \
  /home/mbolivas/beak_jobs/5639d466/01_search/results.m8 \
  /home/mbolivas/beak_jobs/5639d466/01_search/tmp \
  --threads 8

# Extract hit sequences
cut -f2 /home/mbolivas/beak_jobs/5639d466/01_search/results.m8 | sort -u > /home/mbolivas/beak_jobs/5639d466/01_search/acc_list.txt
CONTEXT[hit_count]=$(wc -l < /home/mbolivas/beak_jobs/5639d466/01_search/acc_list.txt)
echo "Found ${CONTEXT[hit_count]} hits"
if [ -f /srv/protein_sequence_databases/UniRef90.lookup ]; then
  grep

In [10]:
# Monitor: look up the job's current state; the returned value is displayed
# as the cell output.
pipe.status(job_id)

{'job_id': '5639d466',
 'name': 'acyp2_analysis',
 'status': 'RUNNING',
 'runtime': '0:00:06',
 'job_type': 'pipeline'}

In [13]:
# Pretty version: prints the overall job state followed by one line per
# pipeline step with that step's own state.
pipe.print_detailed_status(job_id)


Pipeline: acyp2_analysis (5639d466)
Status: RUNNING | Runtime: 0:02:50
  ⟳ Step 1: search (database=uniref90, threads=8) [RUNNING]
  ○ Step 2: taxonomy (database=uniref90) [PENDING]
  ○ Step 3: align (threads=4) [PENDING]



In [None]:
# Get results from a specific step (step_number=1 is the `search` step in the
# status listing above).
# NOTE(review): presumably the step has to be finished before its results can
# be fetched -- confirm, and check the status first if unsure.
results = pipe.get_step_results(job_id, step_number=1)

In [10]:
# Clean up the job referenced by `job_id`.
# NOTE(review): presumably this deletes the job's remote directory -- confirm,
# and fetch any results you still need before running it.
pipe.cleanup(job_id)

✓ Cleaned up job ada0e546


In [20]:
# Initialize the taxonomy client with the same host, user and SSH key as the
# pipeline object.
tax = MMseqsTaxonomy(
    host="shr-zion.stanford.edu",
    user="mbolivas",
    key_path="~/.ssh/shr-zion",
)

# List the known databases; the `has_taxonomy` column of the result shows
# which of them can be used for taxonomy assignment.
tax.list_databases()

Unnamed: 0,alias,database_name,path,exists,has_taxonomy,size
4,swissprot,swissprot,/srv/protein_sequence_databases/swissprot,True,True,200M
5,trembl,TrEMBL,/srv/protein_sequence_databases/TrEMBL,True,True,85G
3,uniprotkb,UniProtKB,/srv/protein_sequence_databases/UniProtKB,True,True,85G
1,uniref100,UniRef100,/srv/protein_sequence_databases/UniRef100,True,False,66G
2,uniref50,uniref50,/srv/protein_sequence_databases/uniref50,True,True,19G
0,uniref90,UniRef90,/srv/protein_sequence_databases/UniRef90,True,True,67G


In [None]:
# Use the same test file from before
job_id = tax.submit(
    query_file="acyp_test.fasta",
    job_name="test_taxonomy",
    tax_lineage=True
)

# Check status
tax.status(job_id)

Created remote directory: /home/mbolivas/beak_jobs/ac889d20
Uploading query file...
✓ Job submitted: test_taxonomy (ID: ac889d20)
  Database: uniref90 (/srv/protein_sequence_databases/UniRef90)
  PID: 201031


{'job_id': 'ac889d20',
 'name': 'test_taxonomy',
 'status': 'RUNNING',
 'runtime': '0:00:00',
 'job_type': 'taxonomy'}