In [None]:
from datetime import date
import itertools
import logging
import pandas as pd
import numpy as np
import datetime
import glob
import os
import pickle
import ete3
import geopy
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

In [None]:
# Version tag of this validation code.
# NOTE(review): presumably the value passed as `validation_version` to
# write_sts_manifest - confirm against the calling notebook.
VALIDATION_VERSION = '0.1.0'
# Manifest versions; the numbers match the "V4.0" / "V2.0" tags in the
# manifest file names configured further down in this notebook.
ANOSPP_VERSION = '4.0'
BIOSCAN_VERSION = '2.0'

In [None]:
logging.getLogger().setLevel(logging.INFO)
# logging.getLogger().setFormat('[%(levelname)s] %(message)s')

def setup_logging(verbose=False):
    """(Re)configure the root logger with a '[LEVEL] message' format.

    Parameters
    ----------
    verbose : bool
        When True the root logger emits INFO and above, otherwise WARNING
        and above.
    """
    root = logging.getLogger()
    # basicConfig() does nothing while the root logger still has handlers
    # (e.g. after a previous call in the same kernel), so drop them first
    for handler in list(root.handlers):
        root.removeHandler(handler)
    level = logging.INFO if verbose else logging.WARNING
    logging.basicConfig(level=level, format='[%(levelname)s] %(message)s')
setup_logging(verbose=True)
logging.info('test')

In [None]:
# Relative paths to the ANOSPP and BIOSCAN manifest workbooks (.xlsx).
# NOTE(review): presumably the blank templates used as reference for
# check_columns / get_valid_dict - confirm against the calling notebook.
anospp_fn = '../data/Anopheles_Metadata_Manifest_V4.0_20221220.xlsx'
biosc_fn = '../data/BIOSCAN_Manifest_V2.0_20221017.xlsx'

In [None]:
# download and install taxonomy
# (module-level ete3 NCBITaxa handle; validate_taxonomy takes such an object
# as its `ncbi` argument)
ncbi = ete3.NCBITaxa()
# run update if needed
# ncbi.update_taxonomy_database()

In [None]:
def get_data(fn, sheet='TAB 2 Metadata Entry'):
    """Read one sheet of a manifest workbook as an all-string DataFrame.

    Parameters
    ----------
    fn : str
        Path of the Excel workbook.
    sheet : str
        Name of the sheet to read.

    Returns
    -------
    pandas.DataFrame
        Sheet contents read with ``dtype=str``; the first column becomes the
        index and empty cells stay empty strings (``keep_default_na=False``).
    """
    logging.info(f'reading data from {fn!r} sheet {sheet!r}')

    read_options = dict(
        dtype=str,
        index_col=0,
        keep_default_na=False,
        sheet_name=sheet,
    )
    manifest = pd.read_excel(fn, **read_options)

    return manifest

# df = get_data(anospp_fn, sheet='TAB 3 TEST Metadata Entry')

In [None]:
def validate_series(df):
    """Check the SERIES index and drop rows whose SERIES is not numeric.

    Logs an error for duplicated index values, for non-numeric index values
    (those rows are removed) and when the remaining values are not exactly
    1..n for n remaining rows.

    Returns
    -------
    pandas.DataFrame
        ``df`` restricted to rows with a numeric SERIES.
    """
    # series should be 1,2, ..., nsamples
    logging.info('validating SERIES')

    dup_mask = df.index.duplicated()
    if dup_mask.any():
        logging.error('duplicate SERIES: {}'.format(df.index[dup_mask].to_list()))

    # non-numeric SERIES cannot be expressed as ranges - exclude them
    is_numeric = df.index.astype(str).str.isnumeric()
    if not is_numeric.all():
        logging.error(f'Found and excluded non-numeric SERIES: {df.index[~is_numeric].to_list()}')
        df = df.loc[is_numeric]

    # whatever is left must be the continuous run 1..n
    n_rows = df.shape[0]
    expected = {str(number) for number in range(1, n_rows + 1)}
    observed = set(df.index.astype(str))
    if observed != expected:
        missing = sorted(expected - observed)
        unexpected = sorted(observed - expected)
        logging.error(f'In SERIES, {missing} are missing, '
                      f'{unexpected} are unexpected')

    return df
        
# df = validate_series(df)

In [None]:
def index_ranges(df):
    """Summarise the index of ``df`` as a compact string of ranges.

    Consecutive index values are collapsed, e.g. an index of 1, 2, 3, 5 gives
    ``'1-3, 5'``. Works for anything with an ``.index`` (DataFrame or Series).

    Index labels may be numbers or numeric strings (manifests are read with
    ``dtype=str``, so SERIES labels can be strings such as ``'12'``); strings
    are compared numerically but printed unchanged. Labels that are not
    numeric are listed individually. An empty index gives ``''``.

    Based on https://stackoverflow.com/questions/4628333/converting-a-list-of-integers-into-range-in-python
    """
    def _as_number(label):
        # numeric value used to detect consecutive labels, None if not numeric
        if isinstance(label, str):
            try:
                return int(label)
            except ValueError:
                return None
        if isinstance(label, (int, float)):
            return label
        return None

    # each run is [first label, last label, numeric value of last label]
    runs = []
    for label in df.index.to_list():
        value = _as_number(label)
        previous = runs[-1][2] if runs else None
        if value is not None and previous is not None and value - previous == 1:
            # extends the current run by one
            runs[-1][1:] = [label, value]
        else:
            runs.append([label, label, value])

    return ', '.join(
        f'{first}' if first == last else f'{first}-{last}'
        for first, last, _ in runs
    )

# index_ranges(df)

In [None]:
def remove_trailing_spaces(df):
    """Strip surrounding whitespace from every column that needs it.

    A column is processed when any of its values starts or ends with a space;
    a warning naming the column and the affected SERIES is logged and the
    whole column is then passed through ``str.strip()``. ``df`` is modified in
    place and also returned.
    """
    for col in df.columns:
        values = df[col]
        padded = values.str.startswith(' ') | values.str.endswith(' ')
        if not padded.any():
            continue
        affected = index_ranges(df.loc[padded])
        logging.warning(
            'trailing spaces found in column {!r}, SERIES {}. Removing for validation'.format(col, affected))
        df[col] = values.str.strip()

    return df

# df = remove_trailing_spaces(df)

In [None]:
def remove_nonbreaking_spaces(df):
    """Replace non-breaking spaces (U+00A0) with ordinary spaces.

    For every column containing U+00A0 a warning naming the column and the
    affected SERIES is logged, then the character is replaced by a regular
    space in that column. ``df`` is modified in place and also returned.
    """
    nbsp = u"\u00A0"
    for col in df.columns:
        has_nbsp = df[col].str.contains(nbsp)
        if not has_nbsp.any():
            continue
        affected = index_ranges(df.loc[has_nbsp])
        logging.warning(
            'non-breaking spaces found in column {!r}, SERIES {}. Removing for validation'.format(col, affected))
        df[col] = df[col].str.replace(nbsp, " ")

    return df

In [None]:
def check_columns(df, template_df):
    """Compare the column names of a filled manifest with its template.

    Logs a warning for columns present only in ``df`` and an error for
    template columns missing from ``df``. Returns nothing.
    """
    logging.info('checking manifest columns against template')

    manifest_cols = set(df.columns)
    expected_cols = set(template_df.columns)

    extra = manifest_cols - expected_cols
    missing = expected_cols - manifest_cols

    if extra:
        logging.warning('extra columns in filled manifest compared to template: {}'.format(extra))
    if missing:
        logging.error('template columns missing from filled manifest: {}'.format(missing))

# check_columns(df, template_df)

In [None]:
def get_valid_dict(fn, validation_sheet='Data Validation - do not edit'):
    """Collect the allowed values per column from the data validation sheet.

    Parameters
    ----------
    fn : str
        Path of the Excel workbook.
    validation_sheet : str
        Name of the sheet listing the allowed values, one column per field.

    Returns
    -------
    dict
        Maps each column name of the sheet to the list of its non-missing
        values.
    """
    # pick up validation values from data validation sheet
    logging.info('extracting value validation data from {!r}'.format(fn))
    allowed = pd.read_excel(fn, dtype=str, sheet_name=validation_sheet)

    return {name: allowed[name].dropna().to_list() for name in allowed.columns}

# valid_dict = get_valid_dict(anospp_fn, validation_sheet='TAB 5 Data Validation - do not ')

In [None]:
def exclude_missing(series, na_values=[]):
    """Drop the entries of ``series`` that are accepted missing-data markers.

    Parameters
    ----------
    series : pandas.Series
    na_values : list
        Values that count as valid "no data"; with an empty list the series
        is returned untouched.

    Returns
    -------
    pandas.Series
        ``series`` without the entries found in ``na_values``.
    """
    if len(na_values) == 0:
        return series

    # valid missing data
    is_missing = series.isin(na_values)
    logging.info('excluding {} {!r} samples without data in {!r}'.format(
        is_missing.sum(), na_values, series.name))
    return series[~is_missing]
    
# exclude_missing(df['TIME_OF_COLLECTION'], na_values=['NOT_COLLECTED',''])

In [None]:
def validate_contributors(fn, contrib_sheet='TAB 1 Contributors'):
    """Read and validate the contributors sheet of a manifest workbook.

    Checks (all problems are logged, none raise):
    column names, absence of the ';' and '|' delimiters used later to pack
    contributors into one field, leftover template contributor, duplicated
    names, e-mail syntax and that at least one contributor has
    CONFIRMATION == 'YES'.

    Parameters
    ----------
    fn : str
        Path of the Excel workbook.
    contrib_sheet : str
        Name of the contributors sheet.

    Returns
    -------
    pandas.DataFrame
        The contributors table with two added columns: FULL_NAME
        ("FIRST_NAME SURNAME") and PARTNER_CODE (first two letters of
        SURNAME followed by first two letters of FIRST_NAME, upper-cased).
    """
    logging.info(f'validating contributors in {fn}')

    df = pd.read_excel(fn, dtype=str, keep_default_na=False,
                       sheet_name=contrib_sheet)

    df = remove_trailing_spaces(df)

    expected_columns = ['SURNAME','FIRST_NAME','PRIMARY_AFFILIATION','EMAIL ADDRESS','CONTRIBUTION','CONFIRMATION']

    if set(df.columns) != set(expected_columns):
        ec = set(df.columns) - set(expected_columns)
        mc = set(expected_columns) - set(df.columns)
        logging.error(f'mismatch in contributor columns: extra {ec}, missing {mc}')

    # these characters are used as field/record separators in add_sts_cols
    for delim_char in (';','|'):
        for col in df.columns:
            if df[col].str.contains(delim_char, regex=False).any():
                logging.error(f'contributor column {col} contains delimiter "{delim_char}"')

    df['FULL_NAME'] = df['FIRST_NAME'] + ' ' + df['SURNAME']
    df['PARTNER_CODE'] = (df['SURNAME'].str.slice(0,2) + df['FIRST_NAME'].str.slice(0,2)).str.upper()
    # FULL_NAME is built first-name-first, so the surname-first spelling that
    # was checked originally can hardly ever match; accept both orders
    # NOTE(review): assumes the template row is "Charles R." / "Darwin" - confirm
    is_template_name = df['FULL_NAME'].isin(['Darwin Charles R.', 'Charles R. Darwin'])
    if is_template_name.any():
        logging.warning('suspect template contributor was not removed')

    is_dup_name = df['FULL_NAME'].duplicated()
    if is_dup_name.any():
        logging.error('duplicated names {}'.format(
                df.loc[is_dup_name, 'FULL_NAME'].to_list()))

    # TODO update template with underscore in email column
    # explicit A-Za-z: the range A-z also covers the punctuation [ \ ] ^ _ `
    email_regex = r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'
    is_valid_email = df['EMAIL ADDRESS'].str.match(email_regex)
    if not is_valid_email.all():
        logging.error('invalid email addresses {}'.format(
                df.loc[~is_valid_email, 'EMAIL ADDRESS'].to_list()))

    # only complains when nobody at all has confirmed
    is_confirmed = (df['CONFIRMATION'] == 'YES')
    if not is_confirmed.any():
        logging.error('confirmation lacking for any contributors')

    return df
        
# contrib_df = validate_contributors(anospp_fn)

In [None]:
def validate_plates_wells(df, contrib_df, plate_col='RACK_OR_PLATE_ID', well_col='TUBE_OR_WELL_ID', bioscan=False):
    """Validate plate IDs and well IDs; only complete 96-well plates are expected.

    Rows with an empty plate or well ID are dropped (with a warning). Plate
    names must look like ``ABCD_001`` (``ABCD-001`` when ``bioscan``), and the
    four-letter prefix must be a known partner code: taken from
    ``contrib_df['PARTNER_CODE']``, or from ``../data/bioscan_partners.tsv``
    when ``bioscan``. Each plate must contain every well A1..H12 exactly once.

    For BIOSCAN, a plate described by a single row with well ``PLATE_ONLY`` is
    expanded to 96 rows copying that row; the last well (H12) is turned into a
    blank: ORGANISM_PART becomes ``NOT_APPLICABLE`` and all other columns are
    emptied except the plate/well IDs, CATCH_LOT and PRESERVATIVE_SOLUTION.
    If any such plate exists the returned frame is rebuilt from the per-plate
    frames and gets a fresh 0-based index named SERIES.

    Parameters
    ----------
    df : pandas.DataFrame
        Manifest (string columns).
    contrib_df : pandas.DataFrame
        Contributors table with a PARTNER_CODE column; not used when
        ``bioscan`` is True.
    plate_col, well_col : str
        Names of the plate ID and well ID columns.
    bioscan : bool
        Switches plate-name pattern, partner source and PLATE_ONLY handling.

    Returns
    -------
    pandas.DataFrame
        The manifest without empty rows, with PLATE_ONLY plates expanded.
    """
    # expect only complete 96-well plates
    logging.info(f'validating {plate_col} and {well_col}')

    empty_rows = (df[plate_col] == '') | (df[well_col] == '')
    if empty_rows.any():
        logging.warning(f'Found and excluded {empty_rows.sum()} empty rows based on {plate_col} and {well_col}')
        df = df.loc[~empty_rows]

    # plate names validation
    plates = df[plate_col].drop_duplicates()
    plate_name_template = ('^[A-Z]{4}-[0-9]{3}$' if bioscan else '^[A-Z]{4}_[0-9]{3}$')
    # na=False: a missing plate ID counts as not matching (and keeps the mask boolean)
    wrong_plate_names = plates[~plates.str.match(plate_name_template, na=False)]
    if len(wrong_plate_names) > 0:
        logging.error(f'plate names {wrong_plate_names.to_list()} do not match template "{plate_name_template}"')
    # check plate name prefixes
    plate_prefixes = plates.str.slice(0,4)
    if bioscan:
        # todo read partners table to feed GAL
        partners_df = pd.read_csv('../data/bioscan_partners.tsv', sep='\t', dtype=str)
        unknown_prefixes = (~plate_prefixes.isin(partners_df['partner_code']))
    else:
        unknown_prefixes = (~plate_prefixes.isin(contrib_df['PARTNER_CODE']))
    if unknown_prefixes.any():
        logging.error(f'plate ID prefixes not recognised for {plates[unknown_prefixes].to_list()}')

    # add 96-well plate well IDs to validation
    row_id = list('ABCDEFGH')
    col_id = range(1,13)
    expected_wells = [r + str(c) for (r,c) in itertools.product(row_id, col_id)]
    expected_wells_set = set(expected_wells)

    # columns that keep their value in the blank H12 well of an expanded plate
    kept_in_blank = {'SERIES','CATCH_LOT','RACK_OR_PLATE_ID',
                     'TUBE_OR_WELL_ID','ORGANISM_PART','PRESERVATIVE_SOLUTION',
                     plate_col, well_col}

    has_plate_only = bool(bioscan and (df[well_col] == 'PLATE_ONLY').any())

    pdfs = []

    for plate, pdf in df.groupby(plate_col):
        # If plate level only metadata is being entered
        # put "PLATE_ONLY" in well
        # and use only one row to capture the metadata for the whole plate
        if bioscan and (pdf[well_col] == 'PLATE_ONLY').any():
            logging.warning(f'found PLATE_ONLY plate {plate}, expanding to well-level')
            if pdf.shape[0] > 1:
                logging.error(f'too many rows in PLATE_ONLY plate {plate}, expected one')
            # expand to 96 rows, all copies of the first row
            pdf = pdf.iloc[[0] * len(expected_wells)].copy()
            pdf[well_col] = expected_wells
            # H12 blank; positional .iloc on the frame itself, because chained
            # assignment (pdf[col].iloc[-1] = ...) may write to a temporary copy
            for pos, col in enumerate(pdf.columns):
                if col == 'ORGANISM_PART':
                    pdf.iloc[-1, pos] = 'NOT_APPLICABLE'
                elif col not in kept_in_blank:
                    pdf.iloc[-1, pos] = ''
        # check for well duplicates
        dup_wells = pdf[well_col].duplicated()
        if dup_wells.any():
            logging.error(f'duplicate {well_col} for plate {plate}: {pdf.loc[dup_wells, well_col].unique()}')
        # check for non A1...H12 wells
        observed_wells_set = set(pdf[well_col])
        if observed_wells_set != expected_wells_set:
            msg = f'in {well_col} for plate {plate}, '
            if len(expected_wells_set - observed_wells_set) > 0:
                msg += f'wells {expected_wells_set - observed_wells_set} are missing'
            if len(observed_wells_set - expected_wells_set) > 0:
                if msg.endswith('missing'):
                    msg += ', '
                msg += f'wells {observed_wells_set - expected_wells_set} are excessive'
            logging.error(msg)
        pdfs.append(pdf)

    if has_plate_only:
        # rebuild from the (partly expanded) per-plate frames
        df = pd.concat(pdfs).reset_index(drop=True)
        df.index.name = 'SERIES'
    # warning level so that the summary is always visible
    logging.warning(f'found {df.shape[0]} samples across {df[plate_col].nunique()} plates')

    return df
        
# df = validate_plates_wells(df, contrib_df, 'RACK_OR_PLATE_ID', 'TUBE_OR_WELL_ID')

In [None]:
## TODO - which columns require NA, do not remove blanks to be able to get taxids for all
def check_blanks(df):
    """Locate blank samples (ORGANISM_PART == 'NOT_APPLICABLE').

    Logs an error when a well H12 is not a blank and a warning summarising
    the number of blanks, plates and remaining samples. Nothing is removed
    from ``df``.

    Returns
    -------
    pandas.Series
        Boolean mask over ``df``, True for blank samples.
    """
    logging.info('Checking and excluding blank samples')

    # blank criterion
    is_blank = df['ORGANISM_PART'] == 'NOT_APPLICABLE'
    samples = df[~is_blank]

    # last well of plate expected to be blank
    filled_h12 = samples[samples['TUBE_OR_WELL_ID'] == 'H12']
    if len(filled_h12) > 0:
        logging.error(f'last well H12 is not blank at SERIES {index_ranges(filled_h12)}: '
                      f'in ORGANISM_PART, expected "NOT_APPLICABLE", '
                      f'found {filled_h12.ORGANISM_PART.to_list()}, '
                      f'these samples will be included in further analysis')

    # raise to warning for sanity check
    n_plates = df.RACK_OR_PLATE_ID.nunique()
    logging.warning(f'{is_blank.sum()} blanks located across {n_plates} plates, '
                    f'{samples.shape[0]} samples of {df.shape[0]} left for downstream analysis')

    return is_blank

# print(df.shape)
# is_blank = check_blanks(df)
# print(df[~is_blank].shape)

In [None]:
def validate_values(col, df, valid_dict, sep=None, na_values=[], level='e'):
    """Check that a column only holds values listed in the validation sheet.

    Parameters
    ----------
    col : str
        Column to validate; must be present in ``df`` and in ``valid_dict``
        (otherwise an error is logged and nothing else happens).
    df : pandas.DataFrame
        Manifest (string columns).
    valid_dict : dict
        Allowed values per column name.
    sep : str or None
        When given, each cell is split on ``sep`` and every stripped part is
        validated on its own.
    na_values : list
        Values treated as valid missing data and skipped.
    level : {'i', 'w', 'e'}
        Logging level (info/warning/error) used to report invalid values.

    Raises
    ------
    AssertionError
        If ``level`` is not one of 'i', 'w', 'e'.
    """
    logging.info('validating values in column {!r}'.format(col))

    if col not in df.columns:
        logging.error('{!r} column not found in manifest'.format(col))
        return
    if col not in valid_dict.keys():
        logging.error('{!r} column not found in validation sheet'.format(col))
        return
    assert level in ('i','w','e'), '{!r} invalid logging level for validate_values'.format(level)

    series = exclude_missing(df[col], na_values)
    valid_values = set(valid_dict[col])

    def _parts(value):
        # individual entries of one cell
        if sep:
            return [x.strip() for x in value.split(sep)]
        return [value]

    invalid_values = set()
    for value in series.unique():
        invalid_values.update(part for part in _parts(value) if part not in valid_values)

    if len(invalid_values) > 0:
        # exact per-entry membership test; building a regex out of the invalid
        # values would fail on metacharacters and also match substrings of
        # valid entries
        has_invalid = series.apply(
            lambda value: any(part in invalid_values for part in _parts(value)))
        invalid_value_series = index_ranges(series[has_invalid])
        msg = f'invalid values in {col}, SERIES {invalid_value_series}: {invalid_values}'
        if level == 'i':
            logging.info(msg)
        elif level == 'w':
            logging.warning(msg)
        elif level == 'e':
            logging.error(msg)
#     else:
#         logging.info('all values valid in {!r}'.format(col))
            
# validate_values('ORGANISM_PART', df, valid_dict, sep=" | ", na_values=[], level='e')

In [None]:
def validate_date(col, df, na_values=[]):
    """Validate a column of ``YYYY-MM-DD`` dates.

    Logs an error for values that do not parse, for dates later than now and
    for dates before 1900-01-01. Values listed in ``na_values`` are skipped.

    Returns
    -------
    pandas.Series or None
        The successfully parsed dates (future and pre-1900 dates included), or
        None when ``col`` is not a column of ``df``.
    """
    logging.info('validating date column {!r}'.format(col))

    if col not in df.columns:
        logging.error('{!r} column not found in manifest'.format(col))
        return
    series = exclude_missing(df[col], na_values)

    # invalid date formats
    # empty string converted to NaT
    date_series = pd.to_datetime(series, format='%Y-%m-%d', errors='coerce')
    unparsed = date_series.isna()
    if unparsed.any():
        logging.error(f'invalid dates in {col}, SERIES {index_ranges(date_series[unparsed])}: '
                      f'{series[unparsed].unique()}')
    valid_date_series = date_series[~unparsed]

    # dates in future
    in_future = valid_date_series > datetime.datetime.today()
    if in_future.any():
        logging.error(f'future dates in {col}, SERIES {index_ranges(valid_date_series[in_future])}: '
                      f'{valid_date_series[in_future].to_list()}')

    # dates too old
    earliest_allowed = datetime.datetime.strptime('1900-01-01', '%Y-%m-%d')
    too_old = valid_date_series < earliest_allowed
    if too_old.any():
        logging.error(f'pre-1900 dates in {col}, SERIES {index_ranges(valid_date_series[too_old])}: '
                      f'{valid_date_series[too_old].to_list()}')

    return valid_date_series

# validate_date('DATE_OF_COLLECTION', df, na_values=[])

In [None]:
def validate_time(col, df, na_values=[]):
    """Validate a column of ``HH:MM:SS`` times.

    Logs an error for values that do not parse. Values listed in
    ``na_values`` are skipped.

    Returns
    -------
    pandas.Series or None
        The successfully parsed times (as returned by ``pd.to_datetime``), or
        None when ``col`` is not a column of ``df``.
    """
    logging.info('validating time column {!r}'.format(col))

    if col not in df.columns:
        logging.error('{!r} column not found in manifest'.format(col))
        return
    series = exclude_missing(df[col], na_values)

    # invalid time formats
    # NB empty string converted to NaT
    time_series = pd.to_datetime(series, format='%H:%M:%S', errors='coerce')
    unparsed = time_series.isna()
    if unparsed.any():
        affected = index_ranges(time_series[unparsed])
        logging.error(
            f'invalid times in {col}, SERIES {affected}: '
            f'{series[unparsed].unique()}'
        )

    return time_series[~unparsed]

# validate_time('TIME_OF_COLLECTION', df, na_values=[''])

In [None]:
def validate_time_period(col, df, na_values=[]):
    """Validate a column of ISO 8601 durations such as ``PT1H`` or ``P1DT2H30M``.

    Values are padded to the full ``P<d>DT<h>H<m>M<s>S`` form and parsed with
    ``pd.Timedelta``; values that are not strings, do not start with ``P``,
    lack the ``T`` separator or fail to parse are reported with an error.
    Durations given in weeks or months are not supported. Values listed in
    ``na_values`` are skipped.

    Returns
    -------
    pandas.Series or None
        The successfully parsed durations, or None when ``col`` is not a
        column of ``df``.
    """
    logging.info('validating time period column {!r}'.format(col))

    if col not in df.columns:
        logging.error('{!r} column not found in manifest'.format(col))
        return
    series = df[col]
    series = exclude_missing(series, na_values)

    # conversion with modifications for proper parsing
    # by pd.Timedelta (does not accept missing data, e.g. 'PT1H')
    # note - will not work for weeks and months
    def convert_iso_duration(s):
        # NaN (or any other non-string) cannot be a duration; note that
        # `s == np.nan` is always False, so test the type instead
        if not isinstance(s, str):
            return np.nan
        if not s.startswith('P') or 'T' not in s:
            return np.nan
        # add days
        if s.startswith('PT'):
            s = s.replace('PT','P0DT')
        # add trailing minutes and seconds
        if s.endswith('H'):
            s += '0M0S'
        elif s.endswith('M'):
            s += '0S'
        try:
            return pd.Timedelta(s)
        except (ValueError, TypeError):
            return np.nan
    time_period_series = series.apply(convert_iso_duration)
    unparsed = time_period_series.isna()
    if unparsed.any():
        logging.error(
            f'invalid times in {col}, SERIES {index_ranges(time_period_series[unparsed])}: '
            f'{series[unparsed].unique()}'
        )
    valid_time_period_series = time_period_series[~unparsed]
    return valid_time_period_series

# df.loc[1,'DURATION_OF_COLLECTION'] = 'PVT1H'
# validate_time_period('DURATION_OF_COLLECTION', df, na_values=['']);
# df['DURATION_OF_COLLECTION']

In [None]:
# TODO include tests
def validate_country_and_coordinates(df, fn, na_values=[], bioscan=False):
    """Check that the stated country of collection agrees with the coordinates.

    Every unique "latitude, longitude" pair is reverse-geocoded with
    Nominatim (OpenStreetMap), rate limited to one query per second. The raw
    results are cached in ``fn + '_loc.pkl'``; when that file exists it is
    loaded and no query is made. Note that the cache is not refreshed when the
    coordinates in the manifest change - delete the file to force new queries.

    Problems are logged: unparseable or out-of-range coordinates, several
    stated countries for one coordinate pair, coordinates without a located
    country (warning) and country mismatches (error).

    Parameters
    ----------
    df : pandas.DataFrame
        Manifest with string columns COUNTRY_OF_COLLECTION, DECIMAL_LATITUDE
        and DECIMAL_LONGITUDE.
    fn : str
        Manifest file name; used as prefix of the cache file.
    na_values : list
        Rows in which all three columns hold one of these values are skipped.
    bioscan : bool
        Currently unused.

    Returns
    -------
    pandas.DataFrame or None
        The three location columns of the analysed rows plus ``coord``,
        ``partner_country`` (upper-cased stated country) and ``coord_country``
        (upper-cased located country, 'UNKNOWN' if not located). None when a
        required column is missing.
    """
    logging.info('validating country with coordinates')

    country_col, lat_col, lon_col = 'COUNTRY_OF_COLLECTION', 'DECIMAL_LATITUDE', 'DECIMAL_LONGITUDE'

    try:
        loc_df_complete = df[[country_col, lat_col, lon_col]].copy()
    except KeyError:
        logging.error('One of {!r} {!r} {!r} columns not found in manifest'.format(country_col, lat_col, lon_col))
        return
    loc_df_isna = (loc_df_complete.isin(na_values)).all(axis=1)
    if loc_df_isna.any():
        logging.info('removing {} {!r} samples with missing data from coordinate analysis'.format(
                loc_df_isna.sum(), na_values))
    loc_df_complete = loc_df_complete[~loc_df_isna].copy()

    # coordinates in geopy format; plain string concatenation also works when
    # no rows are left (row-wise apply on an empty frame returns a frame)
    loc_df_complete['coord'] = (loc_df_complete[lat_col].astype(str) + ', '
                                + loc_df_complete[lon_col].astype(str))

    # get location data for coordinates
    # use local copy of web query results for re-runs
    loc_fn = fn+'_loc.pkl'
    if os.path.isfile(loc_fn):
        # NOTE: unpickling can execute arbitrary code - only use cache files
        # written by this function
        with open(loc_fn, "rb") as cache_file:
            locations = pickle.load(cache_file)
    else:
        # web map server - openstreetmaps
        logging.info('querying coordinates')
        locator = Nominatim(user_agent='myGeocoder')
        rgeocode = RateLimiter(locator.reverse, min_delay_seconds=1)

        locations = dict()
        for c in loc_df_complete.coord.unique():
            # pre-fill with unknown country
            locations[c] = {'address':{'country':'UNKNOWN'}}
            # rows carrying exactly this coordinate pair, for SERIES reporting
            coord_rows = loc_df_complete[loc_df_complete.coord == c]
            # check coordinate correctness
            try:
                lat, lon = (float(part) for part in c.split(', '))
            except ValueError:
                logging.error(
                    f'problem parsing coordinates {c} at SERIES {index_ranges(coord_rows)}'
                )
                continue
            # chained comparison is also False for NaN
            if not -90 <= lat <= 90:
                logging.error(
                    f'invalid latitude {lat} at SERIES {index_ranges(coord_rows)}'
                    f', should be in [-90,90]')
                continue
            if not -180 <= lon <= 180:
                logging.error(
                    f'invalid longitude {lon} at SERIES {index_ranges(coord_rows)}'
                    f', should be in [-180,180]')
                continue
            # web query
            location = rgeocode(c, language='en-gb')
            # rgeocode returns empty location outside of countries and in some other situations
            if location is not None:
                locations[c] = location.raw

        # save locations to file
        with open(loc_fn, "wb") as cache_file:
            pickle.dump(locations, cache_file)

    # parse country from partner input
    loc_df_complete['partner_country'] = loc_df_complete[country_col].str.strip().str.upper()

    # extract countries from location data
    loc_countries = dict()
    for coord in locations.keys():

        coord_rows = loc_df_complete[loc_df_complete.coord == coord]
        coord_series = index_ranges(coord_rows)

        # a raw result may lack the address or its country
        coord_country = locations[coord].get('address', {}).get('country', 'UNKNOWN').upper()
        loc_countries[coord] = coord_country

        partner_countries = coord_rows['partner_country']
        if partner_countries.nunique() > 1:
            logging.error(
                f'multiple partner countries for coordinates {coord}, SERIES {coord_series}: '
                f'{partner_countries.unique()}, skipping coordinate validation'
            )
            continue
        if partner_countries.shape[0] == 0:
            # e.g. a stale cache entry for coordinates no longer in the manifest
            logging.error(f'no partner location found for coordinates {coord}, SERIES {coord_series}')
            continue
        partner_country = partner_countries.iloc[0]
        if coord_country == 'UNKNOWN':
            logging.warning(f'could not locate country for coordinates {coord}, '
                            f'partner country {partner_country}, SERIES {coord_series}')
        elif partner_country != coord_country:
            logging.error(f'country mismatch for coordinates {coord}, partner country {partner_country}, '
                          f'coordinate country {coord_country}, SERIES {coord_series}')

    # countries based on coordinates
    loc_df_complete['coord_country'] = loc_df_complete['coord'].replace(loc_countries)

    # location data can be re-used, e.g. as an additional field
    return loc_df_complete
# df.loc[2,'DECIMAL_LATITUDE'] = '65'
# loc_test = validate_country_and_coordinates(df, anospp_fn)
# loc_test

In [None]:
# TODO hierarchy tests in BIOSCAN
def validate_taxonomy(df, ncbi, na_values = [], anospp=False, add_taxids=False):
    """Validate the predicted taxonomy columns against NCBI Taxonomy.

    For each of PREDICTED_ORDER_OR_GROUP, PREDICTED_FAMILY, PREDICTED_GENUS
    and PREDICTED_SCIENTIFIC_NAME the names are capitalised ("Xxxx yyyy",
    deviations are logged as errors), looked up with
    ``ncbi.get_name_translator`` and the rank of the match is compared with
    the expected rank. Afterwards each distinct order/family/genus/species
    combination is checked for consistency using ``ncbi.get_lineage``.

    Parameters
    ----------
    df : pandas.DataFrame
        Manifest (string columns); modified in place when ``anospp`` or
        ``add_taxids`` is set.
    ncbi : object
        Taxonomy handle providing ``get_name_translator``, ``get_rank`` and
        ``get_lineage`` (an ``ete3.NCBITaxa`` instance).
    na_values : list
        Names treated as valid missing data.
    anospp : bool
        Fill order/family/genus with Diptera/Culicidae/Anopheles and accept
        species listed in ``../data/harbach_spp_201910.txt`` (warning instead
        of error when they are unknown to NCBI).
    add_taxids : bool
        Add a ``<column>_TAXID`` column per taxonomy column.

    Returns
    -------
    pandas.DataFrame
        ``df``; returned unchanged (after logging an error) when one of the
        taxonomy columns is missing.

    Notes
    -----
    NCBI lookups use the case-corrected spelling, whereas the consistency
    check and ``add_taxids`` use the spelling found in the manifest, so names
    with unexpected case are skipped there.
    NOTE(review): confirm whether those should be resolved via the corrected
    spelling as well.
    """
    logging.info('validating taxonomy against NCBI')

    if anospp:
        df['PREDICTED_ORDER_OR_GROUP'] = 'Diptera'
        df['PREDICTED_FAMILY'] = 'Culicidae'
        df['PREDICTED_GENUS'] = 'Anopheles'

        with open('../data/harbach_spp_201910.txt') as f:
            harbach_spp = {'Anopheles ' + line.strip() for line in f}

    tax_levels = {
        'PREDICTED_ORDER_OR_GROUP':'order',
        'PREDICTED_FAMILY':'family',
        'PREDICTED_GENUS':'genus',
        'PREDICTED_SCIENTIFIC_NAME':'species'
    }

    # all four columns are needed to build the hierarchies, so check up front
    missing_cols = [tax_col for tax_col in tax_levels if tax_col not in df.columns]
    if missing_cols:
        for tax_col in missing_cols:
            logging.error(f'{tax_col} column not found in manifest')
        return df

    def series_of(tax_col, names):
        # SERIES of the rows holding any of `names` in `tax_col`; a boolean
        # mask is safe for names containing quotes or spaces
        return index_ranges(df[df[tax_col].isin(names)])

    hierarchies = df[list(tax_levels.keys())].drop_duplicates().copy()

    hierarchies.columns = list(tax_levels.values())

    tax_info = dict()

    for tax_col, tax_level in tax_levels.items():

        logging.info(f'validating {tax_col} against NCBI')

        raw_names = [name for name in hierarchies[tax_level].unique() if name not in na_values]

        tax_names = []
        # corrected spelling -> spellings found in the manifest
        spellings = dict()
        for tax_name in raw_names:
            if len(tax_name) == 0:
                corr_tax_name = tax_name
            else:
                corr_tax_name = tax_name[0].upper() + tax_name[1:].lower()
                if corr_tax_name != tax_name and tax_name != 'blank sample':
                    s = series_of(tax_col, [tax_name])
                    logging.error(f'{tax_level}, SERIES {s}'
                                  f': unexpected case for "{tax_name}", '
                                  f'changing to "{corr_tax_name}" for validation')
            tax_names.append(corr_tax_name)
            spellings.setdefault(corr_tax_name, []).append(tax_name)

        tax_info[tax_level] = ncbi.get_name_translator(tax_names)

        unmatched_names = set(tax_names) - set(tax_info[tax_level].keys())
        if len(unmatched_names) > 0:
            if tax_level == 'species' and anospp:
                for sp in unmatched_names:
                    s = series_of(tax_col, spellings[sp])
                    if sp in harbach_spp:
                        logging.warning(f'{tax_level}, SERIES {s}:'
                                        f' "{sp}" found in Harbach list, but not in NCBI Taxonomy')
                    else:
                        logging.error(f'{tax_level}, SERIES {s}'
                                      f': "{sp}" not found in both Harbach list and NCBI Taxonomy')
            else:
                logging.error(f'{tax_level}: {unmatched_names} not found in NCBI Taxonomy')

        expected_rank = tax_level

        # reduce each name's list of taxids to a single taxid
        for tname, tids in tax_info[tax_level].items():

            ranks = ncbi.get_rank(tids)

            upd_tid = tids[0]

            if len(tids) == 1:
                if ranks[upd_tid] != expected_rank:
                    # TODO warning->info for ORDER
                    logging.warning(f'{tax_level}: found unexpected rank for {tname} (taxid {upd_tid}): {ranks[upd_tid]}')
            if len(tids) > 1:
                for tid, r in ranks.items():
                    if r == expected_rank:
                        logging.info(f'{tax_level}: using only first matching rank for {tname} (taxid {tid}): {r}')
                        upd_tid = tid
                        break
                else:
                    logging.warning(f'{tax_level}: could not find matching rank for {tname}, '
                                    f'using (taxid {upd_tid}): {ranks[upd_tid]}')

            tax_info[tax_level][tname] = upd_tid

    # check consistency of taxonomy
    for _, r in hierarchies.iterrows():

        order, family, genus, species = r['order'], r['family'], r['genus'], r['species']

        if order in na_values:
            continue
        if order not in tax_info['order']:
            logging.info(f'cannot validate PREDICTED_ORDER_OR_GROUP for "{order}", skipping taxonomy consistency check')
            continue
        order_id = tax_info['order'][order]

        # a missing family ends the check for this combination
        if family in na_values:
            continue
        if family not in tax_info['family']:
            logging.info(f'cannot validate PREDICTED_FAMILY for "{family}", skipping taxonomy consistency check')
            continue
        family_id = tax_info['family'][family]
        family_lineage = ncbi.get_lineage(family_id)
        s = series_of('PREDICTED_FAMILY', [family])
        if order_id not in family_lineage:
            logging.error(f'SERIES {s}: '
                          f'family {family} (taxid {family_id}) does not belong to {order} (taxid {order_id})')

        if genus in na_values:
            continue
        if genus not in tax_info['genus']:
            logging.info(f'cannot validate PREDICTED_GENUS for "{genus}", skipping taxonomy consistency check')
            continue
        genus_id = tax_info['genus'][genus]
        genus_lineage = ncbi.get_lineage(genus_id)
        s = series_of('PREDICTED_GENUS', [genus])
        if order_id not in genus_lineage:
            logging.error(
                f'SERIES {s}: '
                f'genus {genus} (taxid {genus_id}) does not belong to {order} (taxid {order_id})')
        if family_id not in genus_lineage:
            logging.error(
                f'SERIES {s}: '
                f'genus {genus} (taxid {genus_id}) does not belong to {family} (taxid {family_id})')

        if species in na_values:
            continue
        if species not in tax_info['species']:
            logging.info(f'cannot validate PREDICTED_SCIENTIFIC_NAME for "{species}", skipping taxonomy consistency check')
            continue
        species_id = tax_info['species'][species]
        species_lineage = ncbi.get_lineage(species_id)
        s = series_of('PREDICTED_SCIENTIFIC_NAME', [species])
        if order_id not in species_lineage:
            logging.error(
                f'SERIES {s}: '
                f'species {species} (taxid {species_id}) does not belong to {order} (taxid {order_id})')
        if family_id not in species_lineage:
            logging.error(
                f'SERIES {s}: '
                f'species {species} (taxid {species_id}) does not belong to {family} (taxid {family_id})')
        if genus_id not in species_lineage:
            logging.error(
                f'SERIES {s}: '
                f'species {species} (taxid {species_id}) does not belong to {genus} (taxid {genus_id})')

    if add_taxids:
        for tc in tax_levels.keys():
            df[f'{tc}_TAXID'] = df[tc].replace(tax_info[tax_levels[tc]])

    return df
        
                
# validate_taxonomy(df, ncbi, anospp=True)

In [None]:
def validate_specimen_id_risk(df):
    """Check SPECIMEN_IDENTITY_RISK against the presence of a species name.

    Logs an error for samples that have an empty PREDICTED_SCIENTIFIC_NAME
    but SPECIMEN_IDENTITY_RISK set to 'N'. Returns nothing.
    """
    logging.info('validating SPECIMEN_IDENTITY_RISK')

    if 'SPECIMEN_IDENTITY_RISK' not in df.columns:
        logging.error('SPECIMEN_IDENTITY_RISK column not found in manifest')
        return

    # missing species name, but no identity risk
    no_species = df['PREDICTED_SCIENTIFIC_NAME'] == ''
    no_risk = df['SPECIMEN_IDENTITY_RISK'] == 'N'
    invalid_risk = no_species & no_risk

    if invalid_risk.any():
        flagged = index_ranges(df.loc[invalid_risk])
        logging.error(f'SPECIMEN_IDENTITY_RISK should be Y in SERIES {flagged}')

# validate_specimen_id_risk(df)

In [None]:
def validate_float(col, df, na_values=[]):
    """Check that every value of a column can be converted with ``float()``.

    Each distinct non-convertible value is reported with an error naming the
    affected SERIES. Values listed in ``na_values`` are skipped. Returns
    nothing.
    """
    logging.info(f'validating numeric format in {col}')

    if col not in df.columns:
        logging.error(f'{col} column not found in manifest')
        return
    series = exclude_missing(df[col], na_values)

    for val in series.unique():
        try:
            float(val)
        except (TypeError, ValueError):
            # boolean mask instead of df.query: the offending value may
            # contain quotes or other characters that break a query string
            s = index_ranges(series[series == val])
            logging.error(
                f'SERIES {s}: '
                f'found non-numeric value in {col}: "{val}"')
            
            
# validate_float('ELEVATION', df, na_values=[''])

In [None]:
def validate_freetext(col, df, na_values=['']):
    """Report free-text values containing non-standard characters.

    A value is standard when it consists only of ASCII letters, digits, dot,
    comma, hyphen, underscore and space. Offending SERIES are reported at
    info level. Values listed in ``na_values`` are skipped. Returns nothing.
    """
    logging.info(f'validating freetext chars in {col}')

    if col not in df.columns:
        logging.error(f'{col} column not found in manifest')
        return
    series = exclude_missing(df[col], na_values)

    # explicit A-Za-z: the range A-z would also accept [ \ ] ^ and `
    regex = r'^[A-Za-z0-9.,\-_ ]+$'

    is_valid_freetext = series.str.match(regex)
    if not is_valid_freetext.all():
        logging.info('found non-standard characters in column {}, SERIES {}. Regex: "{}"'.format(
        col, index_ranges(series.loc[~is_valid_freetext]), regex))

        
# validate_freetext('IDENTIFIED_HOW', df)

In [None]:
# bd = validate_date('DATE_OF_COLLECTION', df)
# ad = validate_date('DATE_OF_PRESERVATION', df)

In [None]:
def compare_dates(before, after):
    """Check that the dates in ``before`` are not later than those in ``after``.

    Both arguments are named date Series (e.g. as returned by
    validate_date); they are aligned on their index and an error listing the
    SERIES is logged where ``before`` is later than ``after``. Returns
    nothing.
    """
    logging.info(f'checking that {before.name} are earlier than {after.name}')

    paired = pd.concat([before, after], axis=1)
    out_of_order = paired[before.name] > paired[after.name]

    if not out_of_order.any():
        return

    logging.error(f'{before.name} values are later than {after.name} for SERIES'
                  f' {index_ranges(paired[out_of_order])}')

        
# compare_dates(bd, ad)

In [None]:
def add_sts_cols(df, contrib_df, gal, bioscan=True):
    """Add the columns required for the STS manifest to ``df``.

    Adds, in this order: SPECIMEN_ID (plate ID + '_' + well ID),
    SCIENTIFIC_NAME and TAXON_ID ('unidentified' / '32644', or
    'blank sample' / '2582415' for blanks, i.e. ORGANISM_PART ==
    'NOT_APPLICABLE'), GAL, SYMBIONT, REGULATORY_COMPLIANCE, HAZARD_GROUP and
    CONTRIBUTORS. ``df`` is modified in place and also returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Manifest (string columns).
    contrib_df : pandas.DataFrame
        Contributors table with FULL_NAME, PRIMARY_AFFILIATION,
        EMAIL ADDRESS and CONTRIBUTION.
    gal : str
        Value written to the GAL column.
    bioscan : bool
        Currently unused.
    """
    blank_rows = df['ORGANISM_PART'] == 'NOT_APPLICABLE'

    df['SPECIMEN_ID'] = df['RACK_OR_PLATE_ID'] + '_' + df['TUBE_OR_WELL_ID']
    repeated_ids = df['SPECIMEN_ID'].duplicated()
    if repeated_ids.any():
        logging.error('duplicate SPECIMEN_ID: {}'.format(df.loc[repeated_ids, 'SPECIMEN_ID']))

    # defaults for real samples, overridden for blanks
    df['SCIENTIFIC_NAME'] = 'unidentified'
    df.loc[blank_rows, 'SCIENTIFIC_NAME'] = 'blank sample'
    df['TAXON_ID'] = '32644'
    df.loc[blank_rows, 'TAXON_ID'] = '2582415'

    df['GAL'] = gal
    for column, constant in (('SYMBIONT', 'TARGET'),
                             ('REGULATORY_COMPLIANCE', 'Y'),
                             ('HAZARD_GROUP', 'HG1')):
        df[column] = constant

    # add contributors - delimiters checked in validate_contributors
    contributor_fields = ['FULL_NAME', 'PRIMARY_AFFILIATION', 'EMAIL ADDRESS', 'CONTRIBUTION']
    contributor_entries = [
        ';'.join(fields)
        for fields in zip(*(contrib_df[name] for name in contributor_fields))
    ]
    df['CONTRIBUTORS'] = '|'.join(contributor_entries)

    return df

# add_sts_cols(df, contrib_df, gal='Sanger Institute');

In [None]:
def write_sts_manifest(df, input_fn, validation_version):
    """Write the validated manifest to an Excel file next to the input.

    The output name is the input name without its ``.xlsx`` extension,
    followed by ``_<validation_version>_for_sts.xlsx``; the data goes to a
    sheet called 'Metadata Entry'.

    Parameters
    ----------
    df : pandas.DataFrame
        Manifest to write.
    input_fn : str
        Name of the manifest file that was validated.
    validation_version : str
        Version tag included in the output file name.
    """
    suffix = '.xlsx'
    # remove the extension as a suffix; str.rstrip('.xlsx') would strip any
    # trailing '.', 'x', 'l' and 's' characters (e.g. 'plates.xlsx' -> 'plate')
    base_fn = input_fn[:-len(suffix)] if input_fn.endswith(suffix) else input_fn
    output_fn = base_fn + '_' + validation_version + '_for_sts.xlsx'

    logging.info(f'writing STS manifest to {output_fn}')

    df.to_excel(output_fn, sheet_name='Metadata Entry')
    