## Normalizing hospital MRF schemas

### Big picture method -- CSV
We want to read in a file and approximate its schema. We'll start with CSV files since they're the easiest to work with.

Steps:
1. Download the first chunk of a CSV file. We'll start with just a handful of rows.
2. Take a guess at its delimiter and quote character
3. Remove any junk information that may be at the top of the file
4. Figure out what the header is and what the rows are
5. Try to normalize the header columns

In [None]:
import polars as pl
import requests
import csv
import chardet

from io          import StringIO
from collections import Counter

In [700]:
# Browser-like request headers sent with every download.
#
# The conditional-request headers ('If-Modified-Since', 'If-None-Match')
# are deliberately left out: they were captured from a single browser
# request, and sending them to other URLs lets a server answer
# "304 Not Modified" with an empty body instead of the file contents.
headers = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:107.0) Gecko/20100101 Firefox/107.0',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
    'Sec-Fetch-Dest': 'document',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-Site': 'cross-site',
    'Sec-GPC': '1',
}

In [265]:
# URLs from the hospital list whose 'cdm_url' points at a CSV file.
# infer_schema_length = 0 makes polars read every column as a string.
csvs = (pl
        .read_csv(
            file = './paylesshealth/hospitals.csv', 
            infer_schema_length = 0)
        .filter(
            # str.contains interprets its pattern as a regex, so the dot
            # must be escaped to match a literal '.csv' rather than
            # "any character followed by csv"
            pl.col('cdm_url').str.contains(r'\.csv')
        )['cdm_url'].to_list())

In [819]:
# Convenience alias -- will update to be true JSON eventually.
# For now it is simply the builtin dict, used as a return annotation
# (e.g. by csv_scan).
JSON = dict

In [820]:
def csv_scan(url: str, n_rows: int = 10) -> JSON:
    """Download the start of a CSV file and split it into header and rows.

    Streams at most ``n_rows`` lines from ``url``, guesses the text
    encoding with ``chardet``, sniffs the delimiter and quote character,
    parses the sample with ``csv.reader`` and asks ``find_header_idx``
    which parsed row is the header. Rows before the header are dropped.

    Parameters
    ----------
    url : str
        Location of the CSV file. Requested with the notebook-level
        ``headers`` dict.
    n_rows : int
        Maximum number of lines to download. Shorter files are fine.

    Returns
    -------
    dict
        ``{'header': [...], 'rows': [[...], ...]}``. Both lists are
        empty when the response contains no lines.

    Raises
    ------
    UnicodeDecodeError
        If the sample cannot be decoded with the detected encoding.
    csv.Error
        If ``csv.Sniffer`` cannot determine a dialect.
    """

    # Read up to n_rows lines as bytes. zip() stops quietly when the
    # response runs out of lines, where a bare next() would raise
    # StopIteration for short files.
    with requests.get(url, stream=True, headers=headers) as r:
        lines = [line for _, line in zip(range(n_rows), r.iter_lines())]

    if not lines:
        return {'header': [], 'rows': []}

    data = b''.join(line + b'\n' for line in lines)

    # Detect the file encoding. chardet reports None when it cannot
    # decide, so fall back to UTF-8 rather than calling decode(None).
    enc = chardet.detect(data)['encoding'] or 'utf-8'

    # Detect the CSV dialect from the last line that was read.
    # NOTE(review): presumably the last line is used because it is the
    # least likely to be junk from the top of the file -- confirm.
    last_line = lines[-1].decode(enc)
    dialect = csv.Sniffer().sniff(last_line)

    # Parse the whole sample with the sniffed dialect
    content = data.decode(enc)
    reader = csv.reader(
        StringIO(content),
        delimiter=dialect.delimiter,
        quotechar=dialect.quotechar,
    )
    rows = list(reader)

    # Identify which of these rows is the header and strip off the
    # junk that precedes it
    header_idx = find_header_idx(rows)

    return {'header': rows[header_idx],
            'rows': rows[header_idx + 1:]}

In [1]:
def find_header_idx(rows: list) -> int:
    """Return the index of the row that looks most like a header.

    Each row's cells are concatenated and lower-cased, and the row is
    scored by how many of the known header keywords occur in that text.
    The first row with the highest score wins. When no row matches any
    keyword (or ``rows`` is empty) the result is 0.
    """

    keywords = (
        'cpt', 'drg', 'price', 'desc', 'name', 'payer', 'charge',
        'gross', 'discounted', 'procedure', 'revenue', 'billing',
        'allowable', 'negotiated', 'max', 'inpatient', 'outpatient',
    )

    def keyword_hits(row):
        text = ''.join(row).lower()
        return sum(1 for kw in keywords if kw in text)

    scores = [keyword_hits(row) for row in rows]
    if not scores:
        return 0

    # list.index returns the first occurrence, so ties go to the
    # earliest row
    return scores.index(max(scores))

## Normalizing the headers

In [809]:
def normalize_cols(cols: list) -> list:
    """Return the normalized name for every header column in ``cols``.

    Each entry is passed through ``normalize_value``; the output list
    has the same length and order as the input.
    """

    return list(map(normalize_value, cols))

In [2]:
def strfilter(s: str, yes: list, no: list = None) -> bool:
    """Return True if any 'yes' value is in s AND no 'no' value is in s.

    Matching is a case-insensitive substring test. This is a heuristic
    for matching strings to their normalized versions.

    E.g. 'Cash Price' -> 'cash'
    E.g. 'Cash price calculation method' -> 'unknown'

    Parameters
    ----------
    s : str
        The string to test.
    yes : list of str, or a single str
        Substrings of which at least one must occur in ``s``.
    no : list of str, a single str, or None
        Substrings of which none may occur in ``s``. Defaults to none.

    A bare string is treated as ONE pattern. Iterating over it would
    test its individual characters instead, which is almost never what
    the caller means.
    """

    def as_patterns(values):
        if values is None:
            return []
        if isinstance(values, str):
            return [values.lower()]
        return [v.lower() for v in values]

    text = s.lower()
    wanted = as_patterns(yes)
    banned = as_patterns(no)

    return (
        any(w in text for w in wanted)
        and not any(b in text for b in banned)
    )

assert strfilter('Alec', 'lec')
assert strfilter('Alec', 'lec', 'B')
assert strfilter('Cash_Calculation_Method', 'cash')
assert not strfilter('Cash_Calculation_Method', 'cash', 'method')
# a bare string is one pattern, not a bag of characters
assert not strfilter('Alec', 'cel')
assert strfilter('Cash Price', ['cash'], ['method'])

In [811]:
def normalize_value(s: str) -> str:
    """Map a raw column name to its normalized name.

    The rules are tried in order; each is a normalized name plus the
    'yes' and 'no' substring lists handed to ``strfilter``. The first
    rule that matches wins. Returns None when no rule matches.
    """

    # (normalized name, substrings to look for, substrings that veto)
    rules = (
        ('gross',       ['gross', 'price'],      ['cash', 'min', 'max', 'cpt', 'drg']),
        ('cash_price',  ['cash'],                ['method', 'cpt']),
        ('code',        ['cpt', 'drg', 'code'],  ['desc', 'price']),
        ('payer',       ['payer'],               ['description', 'cash', 'code', 'cpt']),
        ('description', ['desc'],                ['cash', 'min', 'max']),
    )

    for normalized, include, exclude in rules:
        if strfilter(s, include, exclude):
            return normalized

    return None

In [822]:
def validate_normalization(header: list) -> bool:
    """Check that no normalized column name is used more than once.

    We should only have one value for each column. If not, we made a
    mistake. ``None`` entries (columns that could not be normalized)
    are ignored.

    Returns True only when at least one non-None name is present and
    every non-None name occurs exactly once; otherwise False.
    """
    counts = Counter(header)

    # Drop the unmatched columns. The default argument makes this a
    # no-op when there are none, so no try/except is needed.
    counts.pop(None, None)

    return bool(counts) and max(counts.values()) == 1

In [823]:
def pipeline(urls: list, n_rows: int) -> list:
    """Main data pipeline.

    For every URL, scan the first ``n_rows`` lines with ``csv_scan``,
    normalize the header with ``normalize_cols`` and check the result
    with ``validate_normalization``.

    Parameters
    ----------
    urls : list of str
        CSV locations to scan.
    n_rows : int
        Number of lines to download from each file.

    Returns
    -------
    list of dict
        One dict per successfully decoded file with the keys 'header',
        'rows', 'url', 'header_normalized' and 'validated'.
    """

    data = []

    for url in urls:

        try:
            j = csv_scan(url, n_rows)
        except UnicodeDecodeError:
            # The sample could not be decoded: skip this file
            continue

        j['url'] = url
        j['header_normalized'] = normalize_cols(j['header'])
        # Validate the NORMALIZED names: uniqueness of the raw header
        # says nothing about whether the normalization mapped two
        # columns onto the same name.
        j['validated'] = validate_normalization(j['header_normalized'])
        data.append(j)

    return data

In [817]:
# Trial run: scan the CSV URLs at positions 4-9, five lines from each
data = pipeline(urls = csvs[4:10], n_rows = 5)