In [1]:
from pprint import pprint
import requests
import pandas as pd
# Base URL of the PDBe search endpoint; make_request_post() POSTs its query parameters here.
search_url = 'https://www.ebi.ac.uk/pdbe/search/pdb/select?'

In [2]:
def make_request_post(search_dict, number_of_rows=10):
    """
    Makes a POST request to the PDBe search API and returns the decoded JSON.

    The request is built from a copy of search_dict, so the 'rows' and 'wt'
    entries added here never leak back into the caller's dictionary.

    :param dict search_dict: the terms used to search
    :param int number_of_rows: number of rows to return when search_dict has
        no 'rows' entry of its own - initially limited to 10
    :return dict: response JSON, or an empty dict when the status code is not 200
    """
    # work on a copy so the caller's dictionary is left untouched
    payload = dict(search_dict)
    # make sure we get the number of rows we need (an explicit 'rows' entry wins)
    payload.setdefault('rows', number_of_rows)
    # set the return type to JSON
    payload['wt'] = 'json'

    # do the query
    response = requests.post(search_url, data=payload)

    if response.status_code == 200:
        return response.json()

    # report the failure and fall back to an empty result
    print("[No data retrieved - %s] %s" % (response.status_code, response.text))
    return {}
def format_sequence_search_terms(sequence, filter_terms=None):
    """
    Format parameters for a sequence search

    :param str sequence: one letter sequence
    :param lst filter_terms: Terms to filter the results by; sent as the 'fl'
        parameter. The list passed in is not modified.
    :return dict: search parameters, ready to be passed to make_request_post
    """
    # first we set the parameters which we will pass to PDBe's search
    params = {
        'json.nl': 'map',
        'start': '0',
        'sort': 'fasta(e_value) asc',
        'xjoin_fasta': 'true',
        'bf': 'fasta(percentIdentity)',
        'xjoin_fasta.external.expupperlim': '0.1',
        'xjoin_fasta.external.sequence': sequence,
        'q': '*:*',
        'fq': '{!xjoin}xjoin_fasta'
    }
    # we make sure that we add required filter terms if they aren't present
    if filter_terms:
        required_terms = ['pdb_id', 'entity_id', 'entry_entity', 'chain_id']
        # dict.fromkeys drops duplicates while keeping first-seen order, so the
        # 'fl' value is the same on every run and the caller's list is not touched
        unique_terms = dict.fromkeys(list(filter_terms) + required_terms)
        params['fl'] = ','.join(unique_terms)

    # returns the parameter dictionary
    return params
def _index_fasta_hits(raw_fasta_results):
    """Map '<pdb_id>_<chain_id>' to the E value, percentage identity and sequence of each FASTA hit."""
    fasta_results = {}
    for fasta_row in raw_fasta_results or []:
        fasta_doc = fasta_row.get('doc', {})
        pdb_id_chain = fasta_doc.get('pdb_id_chain')
        if not pdb_id_chain:
            # without an ID there is nothing to join this hit on
            continue
        id_parts = pdb_id_chain.split('_')
        join_id = '{}_{}'.format(id_parts[0].lower(), id_parts[-1])
        fasta_results[join_id] = {
            'e_value': fasta_doc.get('e_value'),
            'percentage_identity': fasta_doc.get('percent_identity'),
            # NOTE(review): the sequence is read from the outer row, not from
            # 'doc', exactly as before - confirm against a real API response
            'return_sequence': fasta_row.get('return_sequence_string'),
        }
    return fasta_results


def _attach_fasta_hits(results, fasta_results):
    """Return one copy of each main result per chain that has a FASTA hit, with the FASTA values added."""
    ret = []
    for row in results:
        pdb_id = (row.get('pdb_id') or '').lower()
        for chain_id in row.get('chain_id') or []:
            entry_fasta_results = fasta_results.get('{}_{}'.format(pdb_id, chain_id))
            # we will only keep results that match the search ID
            if not entry_fasta_results:
                continue
            # copy the row: appending the same dict for several chains would
            # leave every copy holding the values of the last chain only
            hit = dict(row)
            hit['e_value'] = entry_fasta_results.get('e_value')
            hit['percentage_identity'] = entry_fasta_results.get('percentage_identity')
            # stored under 'return_sequence' by _index_fasta_hits
            hit['result_sequence'] = entry_fasta_results.get('return_sequence')
            ret.append(hit)
    return ret


def run_sequence_search(sequence, filter_terms=None, number_of_rows=100):
    """
    Runs a sequence search and returns the results

    The main search results are joined with the FASTA results on PDB ID and
    chain ID; a result is returned once for every chain that has a FASTA hit,
    with 'e_value', 'percentage_identity' and 'result_sequence' added.

    :param str sequence: sequence in one letter code
    :param lst filter_terms: terms to filter the results by
    :param int number_of_rows: number of results to return
    :return lst: List of results (empty when the request returned no data)
    """
    search_dict = format_sequence_search_terms(sequence=sequence, filter_terms=filter_terms)
    response = make_request_post(search_dict=search_dict, number_of_rows=number_of_rows)
    results = response.get('response', {}).get('docs', [])
    print('Number of results {}'.format(len(results)))

    # a failed request gives an empty response, so every level may be missing
    raw_fasta_results = (response.get('xjoin_fasta') or {}).get('external') or []

    # we now have to go through the FASTA results and join them with the main results
    fasta_results = _index_fasta_hits(raw_fasta_results)
    return _attach_fasta_hits(results, fasta_results)

In [None]:
import os

# Directory holding the input file (ha.txt is opened relative to it further down).
# Set the PDBE_DATA_DIR environment variable to run the notebook on another
# machine; the original author's path is kept as the default.
DATA_DIR = os.environ.get('PDBE_DATA_DIR', '/Users/avani_mahadik/Documents/avani/Data')
os.chdir(DATA_DIR)

In [4]:
results_list = []

# terms requested for every search; built once because it is the same for every sequence
filter_list = ['pfam_accession', 'pdb_id', 'molecule_name',
               'uniprot_accession_best', 'resolution', 'mutation', 'has_modified_residues']

with open('ha.txt', 'r') as fh:
    for line in fh:
        sequence_to_search = line.strip()
        if not sequence_to_search:
            # blank lines (e.g. a trailing newline at the end of the file) are not sequences
            continue

        # pass a copy so the shared list can never be altered by the callee
        first_results = run_sequence_search(sequence_to_search, filter_terms=list(filter_list))
        # each entry is [query sequence, result, result, ...]
        first_results = [sequence_to_search] + first_results
        results_list.append(first_results)

Number of results 90
Number of results 0
Number of results 0
Number of results 2
Number of results 90
Number of results 80
Number of results 80
Number of results 100
Number of results 100
Number of results 100
Number of results 100
Number of results 100
Number of results 100
Number of results 100
Number of results 100


In [5]:
# one entry per sequence searched (each entry is the query sequence followed by its results)
len(results_list)

15

In [16]:
def change_lists_to_strings(results):
    """
    Flattens list values into comma-separated strings for loading into Pandas

    Every value that is a list is replaced, in place, by a string holding its
    distinct items (converted with str) in sorted order and joined with ','.
    All other values are left as they are.

    :param lst results: result dictionaries to process (modified in place)
    :return lst: the same results object that was passed in
    """
    for record in results:
        for field, value in list(record.items()):
            if type(value) is not list:
                continue
            # stringify first so numbers can be joined, then unique and sort
            distinct_items = {str(item) for item in value}
            record[field] = ','.join(sorted(distinct_items))

    return results

def pandas_dataset(list_of_results):
    """
    Builds one pandas DataFrame per query sequence

    :param lst list_of_results: one list per query - the query sequence first,
        followed by its result dictionaries
    :return dict: DataFrames keyed by query sequence
    """
    # list values are turned into strings before each DataFrame is built
    return {
        entry[0]: pd.DataFrame(change_lists_to_strings(entry[1:]))
        for entry in list_of_results
    }

In [18]:
# one DataFrame per query sequence, keyed by the sequence itself
df_dict = pandas_dataset(results_list)

In [39]:
def apply_filter(df_dict, query):
    """
    Applies a pandas query expression to every non-empty DataFrame

    :param dict df_dict: DataFrames keyed by query sequence
    :param str query: expression handed to DataFrame.query
    :return dict: the filtered DataFrames; entries whose DataFrame was empty are left out
    """
    filtered = {}
    for seq, frame in df_dict.items():
        if frame.empty:
            continue
        filtered[seq] = frame.query(query)
    return filtered

In [40]:
# keep only the rows whose percentage_identity is above 50
df_dict2 = apply_filter(df_dict, 'percentage_identity>50')
# The cells below work on a single table called df2, which was never defined.
# Stack the per-sequence frames into one table, labelling every row with the
# query sequence it belongs to; fall back to an empty frame when nothing is left.
df2 = pd.concat(df_dict2, names=['query_sequence']) if df_dict2 else pd.DataFrame()

In [None]:
# number of rows in df2 (expects df2 to be a DataFrame created in an earlier cell)
len(df2)

In [None]:
# preview the first rows of df2
df2.head()

In [None]:
# save df2 as Search_results.csv, relative to the current working directory
df2.to_csv("Search_results.csv")