In [44]:
import requests
import pandas as pd

HEADERS = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'}

def print_error(self, error):
    print('[!] '+ str(error.response.status_code))
    print('[!]  '+ error.response.text)

class Server:
    """
    A remote SNOMED server.
    """
    def __init__(self, uri):
        self.server_uri = uri
        self.branch_uri = None
        # future: allow branch to be passed as a parameter and automatically filled if known
        # if no branch specified, print warning message + advice on getting branches

    def get_response_return_object(self, response, return_type):
        if return_type=='json':
            return response.json()
        elif return_type=='df':
            return pd.DataFrame( response.json()['items'] )
        else:
            print("Error: return_type should be either 'json' (for raw JSON) or 'df' (for pandas DataFrame)")


    def get_snomed_editions(self, return_type='df'):
        """
        Get the SNOMED editions available at the server.
        """
        try:
            r = requests.get(self.server_uri+'codesystems', headers=HEADERS)
            r.raise_for_status()

        except requests.exceptions.HTTPError as e:
            print_error(e)

        if return_type=='json':
            return r.json()
        elif return_type=='df':
            return pd.DataFrame( r.json()['items'] )
        else:
            print("Error: return_type should be either 'json' (for raw JSON) or 'df' (for pandas DataFrame)")

    def get_snomed_versions(self, edition, return_type='df'):
        """
        Get available SNOMED versions for a given edition
        """
        try:
            r = requests.get(self.server_uri+'codesystems/'+edition+'/versions', headers=HEADERS)
            r.raise_for_status()

        except requests.exceptions.HTTPError as e:
            print_error(e)

        return self.get_response_return_object(r, return_type)
    
    def set_branch(self, branch):
        self.branch_uri = self.server_uri + branch


    def run_ecl_query(self, id, max, uri, searchafter=None):
        try:

            request_string = self.branch_uri+'/concepts?ecl='+id+'&limit='+str(max)
            if searchafter is not None:
                request_string = request_string +'&searchAfter=' + str(searchafter)

            r = requests.get(request_string, headers=HEADERS)

            r.raise_for_status()

            return r.json()
        except requests.exceptions.HTTPError as e:
            print('[!] '+ str(e.response.status_code))
            print('[!]  '+ e.response.text)


    def query(self, query_text, max_concepts = 10000):
        # Run the query against the SNOMED server

        # Server can't return any more than 10000 concepts at a time;
        # so if there are more than 10000 result concepts, we repeat the query until they're all retrieved.

        total_concepts = 0 
        concepts_retrieved = 0 # Number of concepts retrieved
        query_results_sets = []

        complete = False
        searchafter = None

        while complete==False:

            # Run the query
            query_results = self.run_ecl_query(query_text, max_concepts, self.branch_uri, searchafter)
            print(query_results)

            total_concepts = query_results['total'] # Get the total number of returned concepts
            concepts_retrieved = concepts_retrieved + max_concepts # Update counter
            query_results_sets.append(query_results) # Added retrieved concepts to output set

            print('Retrieved '+ str(len( query_results['items'] )) +' concepts')

            # If number of retrieved concepts < total number of possible concepts, loop again
            if concepts_retrieved >= total_concepts:
                complete=True
            else:
                searchafter = query_results['searchAfter']

        query_results_df = pd.concat( [pd.DataFrame(i['items']) for i in query_results_sets] )
        return query_results_df



        
# parses results as Pandas dataframes


In [152]:
class Query:
    def __init__(self, query_text=None):
        if query_text is not None:
            self.query_text = query_text

class QueryBuilder:
    def __init__(self):
        pass

class expressionConstraint:
    def getString(self, bracket=True):
        if bracket:
            return '( ' + self.string + ' )'
        else:
            return self.string
    
    # Resolve a mixed list of strings and expressionConstraints into a single list of strings
    def argsToStrings(self, args):
        arg_strings = []
        for a in args:
            if isinstance(a, str):
                arg_strings = arg_strings + [a]
            elif isinstance(a, expressionConstraint):
                arg_strings = arg_strings + [a.getString()]

        return arg_strings

class refinedExpressionConstraint( expressionConstraint ):
    # subExpressionConstraint ws ":" ws eclRefinement
    pass
class compoundExpressionConstraint( expressionConstraint ):
    pass
class dottedExpressionConstraint( expressionConstraint ):
    # subExpressionConstraint 1*(ws dottedExpressionAttribute)
    pass
class subExpressionConstraint( expressionConstraint ):
    pass




class conjunctionExpressionConstraint( compoundExpressionConstraint ):
    # subExpressionConstraint 1*(ws conjunction ws subExpressionConstraint)
    def __init__(self, *args):
        # Convert arguments to strings
        arg_strings = self.argsToStrings(args)
        self.string = (' '+CONJUCTION+' ').join(arg_strings)
                
            

class disjunctionExpressionConstraint( compoundExpressionConstraint ):
    # subExpressionConstraint 1*(ws disjunction ws subExpressionConstraint)
    def __init__(self, *args):
        # Convert arguments to strings
        arg_strings = self.argsToStrings(args)
        self.string = (' '+DISJUNCTION+' ').join(arg_strings)

class exclusionExpressionConstraint( compoundExpressionConstraint ):
    # subExpressionConstraint ws exclusion ws subExpressionConstraint
    def __init__(self, x, y):
        if (isinstance(x, expressionConstraint) ):
            x = x.getString()
        if (isinstance(y, expressionConstraint) ):
            y = y.getString()

        self.string = x + ' ' + EXCLUSION + ' ' + y

    pass

EXCLUSION = 'MINUS'
CONJUCTION = 'AND'
DISJUNCTION = 'OR'

ecl_and = conjunctionExpressionConstraint
ecl_or = disjunctionExpressionConstraint

In [154]:
exclusionExpressionConstraint('a', ecl_and('a','b')).getString()

'( a MINUS ( a AND b ) )'

In [50]:
q = Query('test')

In [45]:
snomed_server = Server('https://browser.ihtsdotools.org/snowstorm/snomed-ct/')
snomed_server.set_branch('MAIN/2024-09-01')

In [46]:
snomed_server.query('<< 74400008 | Appendicitis (disorder) |')

{'items': [{'conceptId': '1255222000', 'active': True, 'definitionStatus': 'FULLY_DEFINED', 'moduleId': '900000000000207008', 'effectiveTime': '20221031', 'fsn': {'term': 'Chronic appendicitis with perforation of appendix (disorder)', 'lang': 'en'}, 'pt': {'term': 'Chronic appendicitis with perforation of appendix', 'lang': 'en'}, 'id': '1255222000', 'idAndFsnTerm': '1255222000 | Chronic appendicitis with perforation of appendix (disorder) |'}, {'conceptId': '1145117000', 'active': True, 'definitionStatus': 'FULLY_DEFINED', 'moduleId': '900000000000207008', 'effectiveTime': '20210731', 'fsn': {'term': 'Gangrenous appendicitis (disorder)', 'lang': 'en'}, 'pt': {'term': 'Gangrenous appendicitis', 'lang': 'en'}, 'id': '1145117000', 'idAndFsnTerm': '1145117000 | Gangrenous appendicitis (disorder) |'}, {'conceptId': '735591005', 'active': True, 'definitionStatus': 'FULLY_DEFINED', 'moduleId': '900000000000207008', 'effectiveTime': '20180131', 'fsn': {'term': 'Acute phlegmonous appendicitis 

Unnamed: 0,conceptId,active,definitionStatus,moduleId,effectiveTime,fsn,pt,id,idAndFsnTerm
0,1255222000,True,FULLY_DEFINED,900000000000207008,20221031,{'term': 'Chronic appendicitis with perforatio...,{'term': 'Chronic appendicitis with perforatio...,1255222000,1255222000 | Chronic appendicitis with perfora...
1,1145117000,True,FULLY_DEFINED,900000000000207008,20210731,"{'term': 'Gangrenous appendicitis (disorder)',...","{'term': 'Gangrenous appendicitis', 'lang': 'en'}",1145117000,1145117000 | Gangrenous appendicitis (disorder) |
2,735591005,True,FULLY_DEFINED,900000000000207008,20180131,{'term': 'Acute phlegmonous appendicitis (diso...,"{'term': 'Acute phlegmonous appendicitis', 'la...",735591005,735591005 | Acute phlegmonous appendicitis (di...
3,733157003,True,FULLY_DEFINED,900000000000207008,20170731,{'term': 'Crohn disease of appendix (disorder)...,"{'term': 'Crohn disease of appendix', 'lang': ...",733157003,733157003 | Crohn disease of appendix (disorde...
4,698294004,True,FULLY_DEFINED,900000000000207008,20140131,{'term': 'Acute appendicitis with localized pe...,{'term': 'Acute appendicitis with localized pe...,698294004,698294004 | Acute appendicitis with localized ...
5,418171008,True,FULLY_DEFINED,900000000000207008,20200131,{'term': 'Complicated appendicitis (disorder)'...,"{'term': 'Complicated appendicitis', 'lang': '...",418171008,418171008 | Complicated appendicitis (disorder) |
6,286967008,True,FULLY_DEFINED,900000000000207008,20020131,{'term': 'Acute perforated appendicitis (disor...,"{'term': 'Acute perforated appendicitis', 'lan...",286967008,286967008 | Acute perforated appendicitis (dis...
7,266439004,True,FULLY_DEFINED,900000000000207008,20020131,{'term': 'Acute appendicitis with appendix abs...,{'term': 'Acute appendicitis with appendix abs...,266439004,266439004 | Acute appendicitis with appendix a...
8,235770006,True,FULLY_DEFINED,900000000000207008,20020131,{'term': 'Acute suppurative appendicitis (diso...,"{'term': 'Acute suppurative appendicitis', 'la...",235770006,235770006 | Acute suppurative appendicitis (di...
9,235769005,True,FULLY_DEFINED,900000000000207008,20040731,{'term': 'Acute focal appendicitis (disorder)'...,"{'term': 'Acute focal appendicitis', 'lang': '...",235769005,235769005 | Acute focal appendicitis (disorder) |
