**BLAST Search on Google Batch**

Run a simple BLAST search against PDB SEQRES on Google Batch

Set basic parameters (project, region, bucketname, etc)


In [None]:
project_id = "<project_id>"
region  = "us-central1"
bucket_name = "<bucket_name>"
query_seq_filename = "query.fasta"
machine_type = "e2-standard-4"
provisioning_model = "SPOT"


Install required python modules

In [None]:
!pip install google-cloud-storage
!pip install google-cloud-batch
!pip install gcsfs

In [None]:
from google.cloud import storage
from google.cloud import batch_v1
from google.cloud.batch_v1.types import JobStatus
import time
import datetime
import gcsfs

In [None]:
sc = storage.Client()
b = storage.Bucket(sc,name=bucket_name, user_project=project_id)
if(not b.exists()):
  print(f"creating bucket: {bucket_name}")
  b.create()

Define our query sequence and write to GCS bucket

In [None]:
query_seq = """
>2PE4_1|Chain A|Hyaluronidase-1|Homo sapiens (9606)
RSFRGPLLPNRPFTTVWNANTQWCLERHGVDVDVSVFDVVANPGQTFRGPDMTIFYSSQLGTYPYYTPTGEPVFGGLPQNASLIAHLARTFQDILAAIPAPDFSGLAVIDWEAWRPRWAFNWDTKDIYRQRSRALVQAQHPDWPAPQVEAVAQDQFQGAARAWMAGTLQLGRALRPRGLWGFYGFPDCYNYDFLSPNYTGQCPSGIRAQNDQLGWLWGQSRALYPSIYMPAVLEGTGKSQMYVQHRVAEAFRVAVAAGDPNLPVLPYVQIFYDTTNHFLPLDELEHSLGESAAQGAAGVVLWVSWENTRTKESCQAIKEYMDTTLGPFILNVTSGALLCSQALCSGHGRCVRRTSHPKALLLLNPASFSIQLTPGGGPLSLRGALSLEDQAQMAVEFKCRCYPGWQAPWCERKSMWTGHHHHHH
"""

In [None]:
fs = gcsfs.GCSFileSystem(project=project_id)
with fs.open(f"gs://{bucket_name}/{query_seq_filename}", "w") as f:
  f.write(query_seq)

In [None]:
#download PDB SEQRES records (if necessary)
if storage.blob.Blob(bucket=b, name="pdb_seqres.txt.gz").exists():
  print(f"pdb_seqres.txt.gz already exists")
else:
  print("downloading pdb_seqres.txt.gz")
  !wget -nv https://files.wwpdb.org/pub/pdb/derived_data/pdb_seqres.txt.gz
  !gsutil cp pdb_seqres.txt.gz gs://{bucket_name}

In [None]:
#download BLAST binaries (if necessary
if storage.blob.Blob(bucket=b, name="ncbi-blast-2.15.0+-x64-linux.tar.gz").exists():
  print(f"ncbi-blast-2.15.0+-x64-linux.tar.gz already exists")
else:
  print("downloading ncbi-blast-2.15.0+-x64-linux.tar.gz")
  !wget -nv https://ftp.ncbi.nlm.nih.gov/blast/executables/blast+/LATEST/ncbi-blast-2.15.0+-x64-linux.tar.gz
  !gsutil cp ncbi-blast-2.15.0+-x64-linux.tar.gz gs://{bucket_name}

The batch script for running Blast

In [None]:
blast_batch_script = f"""
    echo "Run blast"
    date
    cd /tmp

    echo "INSTALL BLAST"
    gsutil cp gs://{bucket_name}/ncbi-blast-2.15.0+-x64-linux.tar.gz .
    tar zxvf ncbi-blast-2.15.0+-x64-linux.tar.gz
    mv ncbi-blast-2.15.0+/ blast

    echo "COPY pdb_seqres"
    gsutil cp gs://{bucket_name}/pdb_seqres.txt.gz .

    echo "DECOMPRESS"
    gzip -d pdb_seqres.txt.gz

    echo "RENAME"
    mv pdb_seqres.txt pdb_seqres
    echo "CREATE BLAST DB"
    /tmp/blast/bin/makeblastdb -in pdb_seqres -out pdb_seqres -title pdb_seqres -dbtype prot -parse_seqids

    echo "RETRIEVE/CREATE QUERY SEQUENCE"
    #/tmp/blast/bin/blastdbcmd -db pdb_seqres -dbtype prot -entry 1mbd_A -out 1mbd_A.fasta
    gsutil cp gs://{bucket_name}/{query_seq_filename} .
    ls -l
    cat {query_seq_filename}

    echo "RUN BLAST SEARCH"
    #remove any previous search results
    gsutil rm gs://{bucket_name}/search.out
    /tmp/blast/bin/blastp -db pdb_seqres -query {query_seq_filename} -out search.out -outfmt 10

    echo "DISPLAY AND SAVE RESULTS"
    head search.out
    gsutil cp search.out gs://{bucket_name}
    gsutil ls gs://{bucket_name}
    date
    echo "===FINISH==="
    """

Inspect expanded batch script if necessary

In [None]:
print(blast_batch_script)

In [None]:
#adapted from https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/batch/create/create_with_script_no_mounting.py

def create_script_job(project_id: str, region: str, job_name: str, script: str) -> batch_v1.Job:
    client = batch_v1.BatchServiceClient()

    # Define what will be done as part of the job.
    task = batch_v1.TaskSpec()
    runnable = batch_v1.Runnable()
    runnable.script = batch_v1.Runnable.Script()
    runnable.display_name = "blast_search"
    runnable.script.text = script

    task.runnables = [runnable]

    #Specify what resources are requested by each task.
    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 4000
    resources.memory_mib = 16000
    task.compute_resource = resources

    task.max_retry_count = 1
    task.max_run_duration = "3600s"

    group = batch_v1.TaskGroup()
    group.task_count = 1
    group.task_spec = task

    allocation_policy = batch_v1.AllocationPolicy()
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = machine_type
    policy.provisioning_model = provisioning_model

    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    # We use Cloud Logging as it's an out of the box available option
    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    create_request = batch_v1.CreateJobRequest()
    create_request.job = job
    create_request.job_id = job_name
    # The job's parent is the region in which the job will run
    create_request.parent = f"projects/hardtack-1998/locations/us-central1"

    return client.create_job(create_request)

Block until batch job has completed and display progress

In [None]:
ts = int(time.time())
jobname = "pyjob-{}".format(ts)
print("jobname: {}".format(jobname))
job = create_script_job(project_id, region, jobname, blast_batch_script )
print(f"job.name: {job.name}")
client = batch_v1.BatchServiceClient()
while True:
  request = batch_v1.GetJobRequest(
    name = job.name
  )
  response = client.get_job(request=request)
  now = datetime.datetime.now()
  match response.status.state:
    case JobStatus.State.QUEUED:
      print(f"job is QUEUED     @ {now}")
    case JobStatus.State.SCHEDULED:
      print(f"job is SCHEDULED  @ {now}")
    case JobStatus.State.RUNNING:
      print(f"job is RUNNING    @ {now}")
    case JobStatus.State.SUCCEEDED:
      print(f"job has SUCCEEDED @ {now}")
      break
    case JobStatus.State.FAILED:
      print(f"job has FAILED    @ {now}")
      break
  time.sleep(15)

In [None]:
!gsutil cp gs://{bucket_name}/search.out .

In [None]:
!head search.out