Common URL API: https://ncbi.github.io/blast-cloud/dev/api.html  
Biopython BLAST code: https://github.com/biopython/biopython/blob/171697883aca6894f8367f8f20f1463ce7784d0c/Bio/Blast/NCBIWWW.py  
Useful comments: https://github.com/biopython/biopython/blob/a338d04074585afe7e1f8f418c9107c05dd5e38d/Doc/Tutorial/chapter_blast.tex  
Example use: http://fenyolab.org/presentations/Introduction_Biostatistics_Bioinformatics_2014/tutorials/week3/BLAST%20with%20BioPython.pdf

___

In [1]:
%load_ext blackcellmagic

___

In [69]:
import time
import logging
# Add time stamp to logging messages
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%d %b %Y %H:%M:%S")
import sys

from urllib.request import urlopen
from urllib.parse import urlencode
from urllib.request import Request

import re
from bs4 import BeautifulSoup
from bs4 import Comment


def blast(
    program,
    database,
    sequence,
    ncbi_gi=False,
    descriptions=500,
    alignments=500,
    hitlist_size=50,
    expect=10.0,
    low_comp_filt=False,
    megablast=True,
    verbose=True,
):
    """
    BLAST search using NCBI's QBLAST server.
    Parameters:
     - program        blastn, blastp, blastx, tblastn, or tblastx
     - database       nt, nr, refseq_rna, refseq_protein, swissprot, pdbaa, or pdbnt
                      (More info: https://ncbi.github.io/blast-cloud/blastdb/available-blastdbs.html)
     - sequence       The sequence to search.
     - ncbi_gi        True/False whether to return NCBI GI identifiers. Default False.
     - descriptions   int or None. Limit number of descriptions to show. Default 500.
     - alignments     int or None. Limit number of alignments to show. Default 500.
     - hitlist_size   int or None. Limit number of hits to return. Default 50.
     - expect         int or None. An expect value cutoff. Default 10.0.
     - low_comp_filt  True/False whether to low complexity filter. Default False.
     - megablast      True/False whether to use the MEGABLAST algorithm (blastn only). Default True.
     - verbose        True/False whether to print progress information. Default True.

    Please note this NCBI server rule:
    Run scripts weekends or between 9 pm and 5 am Eastern time
    on weekdays if more than 50 searches will be submitted.

    Please note that NCBI uses the new Common URL API for BLAST searches
    on the internet (http://ncbi.github.io/blast-cloud/dev/api.html). Thus,
    some of the parameters used by this function are not (or are no longer)
    officially supported by NCBI. Although they are still functioning, this
    may change in the future.

    This function does not check the validity of the parameters
    and passes the values to the server as is. More help is available at:
    https://ncbi.github.io/blast-cloud/dev/api.html

    Code partly adapted and modified from the Biopython BLAST NCBIWWW project written
    by Jeffrey Chang (Copyright 1999), Brad Chapman, and Chris Wroe distributed under the
    Biopython License Agreement and BSD 3-Clause License
    https://github.com/biopython/biopython/blob/171697883aca6894f8367f8f20f1463ce7784d0c/LICENSE.rst
    """
    # Server rules:
    # 1. Do not contact the server more often than once every 10 seconds.
    # 2. Do not poll for any single RID more often than once a minute.
    # 3. Use the URL parameter email and tool, so that the NCBI
    #    can contact you if there is a problem.
    # 4. Run scripts weekends or between 9 pm and 5 am Eastern time
    #    on weekdays if more than 50 searches will be submitted.
    # Reference: https://blast.ncbi.nlm.nih.gov/Blast.cgi?CMD=Web&PAGE_TYPE=BlastDocs&DOC_TYPE=DeveloperInfo

    # Define server URL and client
    url = "https://blast.ncbi.nlm.nih.gov/Blast.cgi"
    client = "ggetClient"

    ## Clean up parameters
    # If the path to a fasta file was provided instead of a nucleotide sequence,
    # read the file and extract the sequence
    if ".fa" in sequence:
        from Bio import SeqIO

        sequence = SeqIO.read(sequence, format="fasta").seq

    # Convert program to lower case
    program = program.lower()
    # Check if programs was defined as expected
    programs = ["blastn", "blastp", "blastx", "tblastn", "tblastx"]
    if program not in programs:
        raise ValueError(
            "Program specified is %s. Expected one of %s"
            % (program, ", ".join(programs))
        )

    # Translate filter and ncbi_gi parameters
    if low_comp_filt == False:
        low_comp_filt = None
    else:
        low_comp_filt = "T"

    if ncbi_gi == False:
        ncbi_gi = None
    else:
        ncbi_gi = "T"

    if megablast == False:
        megablast = None
    else:
        megablast = "on"

    ## Submit search
    # Parameters for the PUT command
    put_parameters = [
        ("PROGRAM", program),
        ("DATABASE", database),
        ("QUERY", sequence),
        ("NCBI_GI", ncbi_gi),
        ("DESCRIPTIONS", descriptions),
        ("ALIGNMENTS", alignments),
        ("HITLIST_SIZE", hitlist_size),
        ("EXPECT", expect),
        ("FILTER", low_comp_filt),
        ("MEGABLAST", megablast),
        ("CMD", "Put"),
    ]

    # Define query
    put_query = [x for x in put_parameters if x[1] is not None]
    put_message = urlencode(put_query).encode()

    # Submit search to server
    request = Request(url, put_message, {"User-Agent": client})
    handle = urlopen(request)

    ## Fetch Request ID (RID) and estimated time to completion (RTOE)
    RID, RTOE = _parse_qblast_ref_page(handle)
        
    # Wait for search to complete
    # (At least 11 seconds to comply with server rule 1)
    if RTOE < 11:
        time.sleep(11)
        # Communicate RTOE
        if verbose == True:
            logging.warning(f"BLAST initiated. Estimated time to completion: 11 seconds.")
    else:
        time.sleep(int(RTOE))
        # Communicate RTOE
        if verbose == True:
            logging.warning(f"BLAST initiated. Estimated time to completion: {RTOE} seconds.")

    ## Poll server for status and fetch search results
    # Parameters for the GET command
    get_parameters = [
        ("RID", RID),
        ("ALIGNMENTS", alignments),
        ("DESCRIPTIONS", descriptions),
        ("HITLIST_SIZE", hitlist_size),
        ("FORMAT_TYPE", "TEXT"),
        ("NCBI_GI", ncbi_gi),
        ("CMD", "Get"),
    ]
    get_query = [x for x in get_parameters if x[1] is not None]
    get_message = urlencode(get_query).encode()

    ## Poll NCBI until the results are ready
    searching = True
    i = 0
    while searching:
        if i > 0:
            # Sleep for 61 seconds if first fetch was not succesful
            # to comply with server rules
            time.sleep(61)

        # Query for search status
        request = Request(url, get_message, {"User-Agent": client})
        handle = urlopen(request)
        results = handle.read().decode()
        
        # Fetch search status
        i = results.index("Status=")
        j = results.index("\n", i)
        status = results[i + len("Status=") : j].strip()

        if status == "WAITING":
            if verbose == True:
                logging.warning("BLASTING...")
            i += 1
            continue

        if status == "FAILED":
            raise ValueError(f"Search {RID} failed; please report to blast-help@ncbi.nlm.nih.gov.")

        if status == "UNKNOWN":
            raise ValueError(f"NCBI status {status}. Search {RID} expired.")

        if status == "READY":
            if verbose == True:
                logging.warning("Retrieving results...")
            # Stop search
            searching = False

            ## Return results
            soup = BeautifulSoup(results, "html.parser")

            if verbose == True:
                logging.warning("BLAST complete...")
    
            return soup.find("pre")

        else:
            raise ValueError(f"""
            Something unexpected happened. \n
            Search {RID} possibly failed; please report to blast-help@ncbi.nlm.nih.gov\n
            or post an issue on Github: https://github.com/lauraluebbert/gget\n
            """
            )

def _parse_qblast_ref_page(handle):
    """
    Extract a tuple of RID, RTOE from the NCBI 'please wait' page.
    RTOE = 'Estimated time fo completion.'
    RID = 'Request ID'.

    Code adapted from the Biopython BLAST NCBIWWW project written
    by Jeffrey Chang (Copyright 1999), Brad Chapman, and Chris Wroe distributed under the
    Biopython License Agreement and BSD 3-Clause License
    https://github.com/biopython/biopython/blob/171697883aca6894f8367f8f20f1463ce7784d0c/LICENSE.rst
    """
    s = handle.read().decode()
    i = s.find("RID =")
    if i == -1:
        rid = None
    else:
        j = s.find("\n", i)
        rid = s[i + len("RID =") : j].strip()

    i = s.find("RTOE =")
    if i == -1:
        rtoe = None
    else:
        j = s.find("\n", i)
        rtoe = s[i + len("RTOE =") : j].strip()

    if not rid and not rtoe:
        # Can we reliably extract the error message from the HTML page?
        # e.g.  "Message ID#24 Error: Failed to read the Blast query:
        #       Nucleotide FASTA provided for protein sequence"
        # or    "Message ID#32 Error: Query contains no data: Query
        #       contains no sequence data"
        #
        # This used to occur inside a <div class="error msInf"> entry:
        i = s.find('<div class="error msInf">')
        if i != -1:
            msg = s[i + len('<div class="error msInf">') :].strip()
            msg = msg.split("</div>", 1)[0].split("\n", 1)[0].strip()
            if msg:
                raise ValueError("Error message from NCBI: %s" % msg)
        # In spring 2010 the markup was like this:
        i = s.find('<p class="error">')
        if i != -1:
            msg = s[i + len('<p class="error">') :].strip()
            msg = msg.split("</p>", 1)[0].split("\n", 1)[0].strip()
            if msg:
                raise ValueError("Error message from NCBI: %s" % msg)
        # Generic search based on the way the error messages start:
        i = s.find("Message ID#")
        if i != -1:
            # Break the message at the first HTML tag
            msg = s[i:].split("<", 1)[0].split("\n", 1)[0].strip()
            raise ValueError("Error message from NCBI: %s" % msg)
        # If we cannot recognise the error layout:
        raise ValueError(
            "No request ID and no estimated time to completion found in the NCBI 'please wait' page, "
            "there was probably an error in your request but we "
            "could not extract a helpful error message."
        )
    elif not rid:
        # Can this happen?
        raise ValueError(
            "No request ID found in the 'please wait' page. (Although estimated time to completion = %r)"
            % rtoe
        )
    elif not rtoe:
        # Can this happen?
        raise ValueError(
            "No estimated time to completion found in the 'please wait' page. (Although request ID = %r)"
            % rid
        )

    try:
        return rid, int(rtoe)
    except ValueError:
        raise ValueError(
            "A non-integer estimated time to completion found in the 'please wait' page, %r"
            % rtoe
        ) from None

___

Get a sequence to BLAST:

In [3]:
from gget import *

In [4]:
search("swiss cheese", "drosophila")

Fetching results from database: drosophila_melanogaster_core_105_9
Query time: 2.74 seconds
Genes fetched: 1


Unnamed: 0,Ensembl_ID,Gene_name,Ensembl_description,Ext_ref_description,Biotype,URL
0,FBgn0003656,sws,swiss cheese,,protein_coding,https://uswest.ensembl.org/drosophila_melanoga...


In [5]:
fastsa = seq("FBgn0003656", save=True)

___

BLAST sequence:

In [70]:
program = "blastn"
db = "nt"

# seq = "seq_results.fa"
seq = "TTTGTAGTTACATAGCAAAATGCGCGTTTATTTCGGCTCAGTAAATTAAGAACATTTTCGTTACACGTTGCCACGCCCCCCCATCCAACGGCAAACCACCCCCCGCCTCCGCCTTCAAGACAGAGAGAGAGATAGAATAAAACAGAGAGAGAGCAACAACAACAATATATAACCAAAACGAAAGATTTCCGCAAGAAACCACAAAAAAAAACAACAATTGCTTTGGTTCTGATGATTCTCGTAACAGCAAAAACAACAAGATTTATAATCTAAATCAAAAGAAACTGTGCTCTCCGCTGTGCGTGGCTGTGAGTGCGTTTGTGCGGTACGGTGCGTGCATTATAACAATTATTGCACAAAAGGCATGACAAACGTGGGGCGGGGGTGTCTGTAAATGTGAGTGCGATAGAGCGCGACATATTCAAACAAGCTAGGGACGGAGTAGGTGAGCGAGGGAGAGAGAGAACGCGACGGTGGCAACAACAAAGGGCTGTGATTTTACTTTGCCCCCTCCACCCTGTACCCCGTCCCATCCATACACTTGCGTATTTACCAAATAAACGGAACATACATAAGCCAAAATAGGCAGAACAACAACAACATTAAGAGTCAACTAAACAGCAAATCGAATAGTTTCGTGGGAGAAAGGACAGCGTAGCAGAGTCATTGGAAAACTGGCCGCGTAAGGGAGCGCAATGGATGTCCTGGAAATGCTGCGCGCCAGCGCCAGCGGCAGCTACAACACAATATTCTCGGACGCGTGGTGCCAATATGTCTCCAAGCAGATCACAGCAACGGTAAGTGCACTTCACCCTTTCTTTGTCTTTTTATTTGCTTTGACTTTGGCTTGCTGTTTGTTTGCAATATGAGTGAGTCAGTTGGAAGACAATATGTGCGAGAGCGAGAGGA"

In [71]:
blast_result = blast(program, db, seq)

27 Mar 2022 12:31:15 PM BLAST initiated. Estimated time to completion: 30 seconds.
27 Mar 2022 12:31:47 PM Retrieving results...
27 Mar 2022 12:31:47 PM BLAST complete...


In [68]:
blast_result

<pre>
BLASTN 2.13.0+
Reference: Zheng Zhang, Scott Schwartz, Lukas Wagner, and
Webb Miller (2000), "A greedy algorithm for aligning DNA
sequences", J Comput Biol 2000; 7(1-2):203-14.


Reference for database indexing: Aleksandr Morgulis, George
Coulouris, Yan Raytselis, Thomas L. Madden, Richa Agarwala,
Alejandro A. Schaffer (2008), "Database Indexing for
Production MegaBLAST Searches", Bioinformatics 24:1757-1764.


RID: 41ESJX1K013


Database: Nucleotide collection (nt)
           80,647,477 sequences; 709,742,327,574 total letters
Query= 
Length=909


                                                                   Score     E     Max
Sequences producing significant alignments:                       (Bits)  Value  Ident

CP023335.1 Drosophila melanogaster strain rover (forR) chromos...  1679    0.0    100%        
CP023329.1 Drosophila melanogaster strain sitter (fors) chromo...  1679    0.0    100%        
AE014298.5 Drosophila melanogaster chromosome X                    1679   