In [112]:
import yaml
import sys
import warnings
import json
import flatdict
from neo4j_tools import neo4j_connect,  results_2_dict_list

# Load the Neo4j connection settings and the schema flavour from the YAML config.
# safe_load is used because the file is plain configuration data; calling
# yaml.load without an explicit Loader is deprecated since PyYAML 5.1 and is a
# TypeError from PyYAML 6.0 on.  The `with` block closes the file handle.
with open('schema/neo_schema.yml') as config_file:
    config = yaml.safe_load(config_file)
nc = neo4j_connect(config['url'], config['usr'], config['pwd'])
# Passed to Test(...) and from there to schema_map() to pick the base MATCH pattern.
schema = config['schema']

In [113]:
class Test:
    """One schema-conformance check for a single dataset.

    Two Cypher queries are built, both returning ``ind_count`` (distinct
    individuals) and ``ind_list`` for the dataset labelled ``ds``:

    * ``base_query`` matches every individual linked to the dataset;
    * ``comparison_query`` matches the same pattern extended with an extra
      path fragment (set through one of the ``set_*_comparison_query``
      methods; it defaults to ``base_query``).

    ``test()`` hands both queries to ``compare``.
    """

    def __init__(self, ds, schema):
        """Build the base queries.

        ds: dataset label used in the ``WHERE ds.label = '...'`` clause.
        schema: value forwarded to ``schema_map`` to select the base MATCH pattern.
        """
        self.ds = ds
        # Defined up front so test() cannot fail with AttributeError when
        # set_description() was never called.
        self.description = ""
        self.base_match = schema_map(schema)
        self.base_match_extended = self.base_match + "<-[:Related { short_form: 'depicts' }]-(j:Individual)"

        # The label is embedded in a single-quoted Cypher string literal, so
        # backslashes and single quotes have to be escaped; otherwise a label
        # containing a quote yields a malformed query.
        ds_literal = str(ds).replace("\\", "\\\\").replace("'", "\\'")
        self.base_final = " WHERE ds.label = '%s' RETURN COUNT (DISTINCT i) as ind_count" \
                    ", COLLECT(i.short_form) as ind_list" % ds_literal

        self.base_query = self.base_match + self.base_final
        self.base_query_extended = self.base_match_extended + self.base_final
        self.comparison_query = self.base_query

    def set_extended_base_comparison_query(self,extended_match):
        """Use the 'depicts'-extended match plus ``extended_match`` as the comparison query."""
        self.comparison_query = self.base_match_extended + extended_match + self.base_final

    def set_base_comparison_query(self,extended_match):
        """Use the plain base match plus ``extended_match`` as the comparison query."""
        self.comparison_query = self.base_match + extended_match + self.base_final

    def set_description(self,description):
        """Set the human-readable assertion text passed to ``compare``."""
        self.description = description

    def test(self):
        """Run ``compare`` on the base and comparison queries and return its result."""
        return compare(dataset=self.ds,
                       description=self.description,
                       query1=self.base_query,
                       query2=self.comparison_query)


In [114]:
def schema_map(x):
    """Return the base MATCH clause linking a DataSet to its Individuals.

    'new' selects the Annotation-edge pattern; 'old', and any value that is
    not a known key, selects the has_source pattern.
    """
    legacy_pattern = "MATCH (ds:DataSet)<-[:has_source]-(i:Individual)"
    patterns = {
        'old': legacy_pattern,
        'new': "MATCH (ds:DataSet)<-[:Annotation { short_form: 'source'}]-(i:Individual)",
    }
    if x in patterns:
        return patterns[x]
    return legacy_pattern
    
def write_returncode(test_failed):
    """Record the overall outcome in ``schemaconformance.txt``.

    The file (in the current working directory) is overwritten with a single
    line: '1' when ``test_failed`` is truthy, '0' otherwise.
    """
    status = '1' if test_failed else '0'
    with open("schemaconformance.txt", "w") as handle:
        handle.write(status + "\n")

def query(query):
    """Run one Cypher statement and return its rows as a list of dicts.

    The statement is committed through the module-level ``nc`` connection and
    converted with ``results_2_dict_list``.  Returns ``False`` when the commit
    result is falsy or the converted list is empty.
    """
    raw = nc.commit_list([query])
    # Skip the conversion entirely when the commit produced nothing.
    rows = results_2_dict_list(raw) if raw else None
    return rows if rows else False

def query_ind_count(query):
    """Run one Cypher statement and return the ``ind_count`` of its first row.

    Returns ``False`` when the commit result is falsy, when no rows come back,
    or (after emitting a warning) when the first row has no ``ind_count`` key.
    """
    raw = nc.commit_list([query])
    rows = results_2_dict_list(raw) if raw else None
    if not rows:
        return False

    first_row = rows[0]
    if 'ind_count' in first_row.keys():
        return first_row['ind_count']

    warnings.warn("Query has no ind_count")
    return False

def runtest(ds, description, expanded_base_comparison_query, extend_base, out):
    """Run one conformance check against every dataset and record the outcomes.

    ds: iterable of result rows, each indexable by 'ds.label' (the dataset label).
    description: assertion text; also the key under which each outcome is stored.
    expanded_base_comparison_query: Cypher path fragment appended to the match
        pattern of the comparison query.
    extend_base: selects which pattern the fragment is appended to.  Note the
        naming reads inverted: a truthy value appends to the *plain* base match
        (``set_base_comparison_query``); a falsy value appends to the
        'depicts'-extended match (``set_extended_base_comparison_query``).
    out: dict mapping dataset label -> {description: outcome}; updated in place.

    The schema flavour is read from the module-level ``schema``.
    """
    # Iterate the rows that were passed in rather than a module-level global,
    # so the function works on whatever the caller hands over.
    for row in ds:
        label = row['ds.label']
        check = Test(label, schema)
        check.set_description(description)
        if extend_base:
            check.set_base_comparison_query(expanded_base_comparison_query)
        else:
            check.set_extended_base_comparison_query(expanded_base_comparison_query)

        # setdefault: tolerate labels that were not pre-seeded in `out`.
        out.setdefault(label, {})[description] = check.test()
    
def compare(dataset, description, query1, query2, verbose = False, write_reports = False):
    """Check that two queries count the same number of individuals.

    dataset: dataset label; used as the stem of the optional report file name.
    description: assertion text, used in verbose output and warnings.
    query1: reference query (all individuals in the dataset).
    query2: comparison query (individuals matching the required pattern).
    verbose: print the comparison query and the outcome.
    write_reports: on a mismatch, write the ``ind_list`` entries present for
        query1 but missing for query2 as JSON to ``<dataset>.report``.

    Both queries must return ``ind_count``; ``ind_list`` is only read when a
    report is written.  Returns True when the counts agree, False otherwise.
    A query that yields no usable result is reported with a warning and counts
    as a failed check.
    """
    rows1 = query(query1)
    rows2 = query(query2)
    if not rows1 or not rows2:
        # query() returns False for a failed/empty result; indexing that would
        # raise an opaque TypeError, so report it and fail the check instead.
        warnings.warn("Query returned no results while testing: " + description)
        return False

    r1 = rows1[0]
    r2 = rows2[0]
    if r1['ind_count'] == r2['ind_count']:
        if verbose:
            print(query2)
            print("Testing assertion:" + description)
            print("Result: True")
        return True

    if verbose:
        print("Testing assertion:" + description)
        print(query2)
        print("Result: inds_in_datset: %d ; Compliant with pattern: %d" % (r1['ind_count'],  r2['ind_count']))
    # Should probably turn this into a report
    if write_reports:
        bad_inds = list(set(r1['ind_list']) - set(r2['ind_list']))
        # Context manager guarantees the report is flushed and closed.
        with open(dataset + ".report", 'w') as report_file:
            report_file.write(json.dumps(bad_inds))
    return False

In [115]:
# Fetch the label of every DataSet node; each converted row is keyed by 'ds.label'.
datasets = nc.commit_list(["MATCH (ds:DataSet) RETURN ds.label"])
dc = results_2_dict_list(datasets)

return_state = True
# One (initially empty) outcome table per dataset label; runtest() fills these in.
test_stats = {row['ds.label']: {} for row in dc}

# All anatomical individuals in dataset have matching channel individuals.

In [116]:
# extend_base=False: the (empty) fragment is appended to the 'depicts'-extended match.
runtest(ds=dc,
        description="All anatomical individuals in dataset have matching channel individuals.",
        expanded_base_comparison_query="",
        extend_base=False,
        out=test_stats)

# All anatomical individuals in dataset have matching registered channel individuals.

In [117]:
# extend_base=False: the fragment is appended to the 'depicts'-extended match.
runtest(ds=dc,
        description="All anatomical individuals in dataset have matching registered channel individuals.",
        expanded_base_comparison_query="-[{ iri: 'http://purl.obolibrary.org/obo/RO_0002026' }]->(k:Individual)",
        extend_base=False,
        out=test_stats)

# All anatomical individuals in dataset have matching channel individuals with imaging method.

In [118]:
# extend_base=False: the fragment is appended to the 'depicts'-extended match.
runtest(ds=dc,
        description="All anatomical individuals in dataset have matching channel individuals with imaging method.",
        expanded_base_comparison_query="-[:Related { label: 'is_specified_output_of'} ]->(:Class)",
        extend_base=False,
        out=test_stats)

# All anatomical individuals in dataset have matching channel, typed individuals.

In [119]:
# extend_base=False: the fragment is appended to the 'depicts'-extended match.
runtest(ds=dc,
        description="All anatomical individuals in dataset have matching channel, typed individuals.",
        expanded_base_comparison_query="-[:INSTANCEOF]->(c:Class { label: 'channel'})",
        extend_base=False,
        out=test_stats)

# All anatomical individuals in dataset are typed.

In [120]:
# extend_base=True: the fragment is appended to the plain dataset->individual match.
runtest(ds=dc,
        description="All anatomical individuals in dataset are typed.",
        expanded_base_comparison_query="-[:INSTANCEOF]->(c:Class)",
        extend_base=True,
        out=test_stats)

# Results

In [121]:
# Pretty-print the collected outcomes as indented JSON with sorted keys.
print(json.dumps(test_stats, sort_keys=True, indent=4))

{
    "Aso2014": {
        "All anatomical individuals in dataset are typed.": true,
        "All anatomical individuals in dataset have matching channel individuals with imaging method.": true,
        "All anatomical individuals in dataset have matching channel individuals.": true,
        "All anatomical individuals in dataset have matching channel, typed individuals.": true,
        "All anatomical individuals in dataset have matching registered channel individuals.": true
    },
    "BrainName_Ito_half_brain": {
        "All anatomical individuals in dataset are typed.": true,
        "All anatomical individuals in dataset have matching channel individuals with imaging method.": true,
        "All anatomical individuals in dataset have matching channel individuals.": true,
        "All anatomical individuals in dataset have matching channel, typed individuals.": true,
        "All anatomical individuals in dataset have matching registered channel individuals.": true
    },
    "Ca

# Conclusion

In [122]:
# Collapse the nested {dataset: {description: outcome}} mapping; the flattened
# values are expected to be the individual outcomes.
failures = flatdict.FlatDict(test_stats).values()
fail = False in failures
if not fail:
    print("All tests passed!")
else:
    failed = failures.count(False)
    print(f"{failed} out of {len(failures)} tests failed.")

# Persist the overall outcome to schemaconformance.txt.
write_returncode(fail)

All tests passed!
