# Pipeline: DDSO Discharge Facilities

This notebook accomplishes the following:

1. Load the data from the `gov.nys.serviceproviders.xlsx#ddso_discharge_facilities` worksheet into memory using `pandas`.

2. Tidy the fields to prepare them for the schema described in the [`settings.toml`](./../../settings.toml) file.

3. Ingest the data to the `providers` database, underneath the `services` collection.


In [1]:
# Import system path.
import sys, os
import pandas as pd
import numpy as np

# Register a directory on the kernel's import path, skipping directories already listed.
def add_module_path(module_path):
    """Append ``module_path`` to ``sys.path`` unless it is already present."""
    already_listed = module_path in sys.path
    if already_listed:
        return
    sys.path.append(module_path)
                
# Add directories to MODULE_PATH.
# The paths are assembled with os.path so the project root resolves on every
# operating system; a literal '..\\..\\' is only a parent-directory reference
# on Windows and is treated as a plain file name elsewhere.
root = os.path.abspath(os.path.join(os.pardir, os.pardir))
add_module_path(root)
for package_dir in ("data", "utils", "models", "connection"):
    add_module_path(os.path.join(root, package_dir))

In [2]:
# Import utilities.
from bunch import Bunch, bunchify, unbunchify
from box import Box
from utils.finder import *
from pandas import DataFrame, Series
from matplotlib import pyplot as plt
from IPython.display import display
from typing import *

# Import configuration settings.
from config import settings

In [3]:
# Helper methods for extracting named child arrays from Box object parents:

def extract(obj, key, keyMapper = lambda x: x[0].name, valueMapper = lambda x: x[0]):
    """Build a Box from the sequence stored at ``obj[key]``.

    Every element is paired with its position as ``(item, index)``.
    ``valueMapper`` turns that pair into the stored value and ``keyMapper``
    turns it into the key. By default the key is the item's ``name`` attribute
    and the value is the item itself. When two items map to the same key the
    later one wins.
    """
    collected = {}
    for position, entry in enumerate(obj[key]):
        pair = (entry, position)
        mapped_value = valueMapper(pair)
        collected[keyMapper(pair)] = mapped_value
    return Box(collected)

def get_databases(root):
    """Map every database under ``root['databases']`` (keyed by name) to its collections."""
    def to_collections(pair):
        database = pair[0]
        return get_collection(database)
    return extract(root, 'databases', valueMapper = to_collections)

def get_collection(db):
    """Map every collection of ``db`` (keyed by name) to a Box holding its ``schema`` and ``sources``."""
    def describe(pair):
        collection = pair[0]
        return Box(schema=get_schema(collection), sources=get_sources(collection))
    return extract(db, 'collections', valueMapper = describe)

def get_sources(collection):
    """Index the sources of ``collection`` by their position.

    Each source is summarised as a dict with ``path`` (the source's ``name``),
    ``type`` and, when ``get_worksheets`` provides one, a ``worksheets`` entry.
    """
    def by_position(pair):
        return pair[1]

    def describe(pair):
        source = pair[0]
        return { 'path': source.name, 'type': source.type, **get_worksheets(source) }

    return extract(collection, 'sources', keyMapper = by_position, valueMapper = describe)

def get_worksheets(source):
    """Return a Box holding ``source.worksheets`` for Excel sources, or an empty dict otherwise."""
    is_excel = source['type'] == "excel"
    if not is_excel:
        return {}
    return Box(worksheets=source.worksheets)

def get_schema(collection):
    """Index the schema entries of ``collection`` by each entry's ``'field'`` value."""
    def field_of(pair):
        entry = pair[0]
        return entry['field']
    return extract(collection, 'schema', keyMapper = field_of)

def get_fields(schema):
    """Return the field names described by ``schema`` as a list.

    ``schema`` may be a sequence of entries exposing a ``field`` attribute, or
    a mapping keyed by field name. For a mapping, iterating yields keys that
    have no ``field`` attribute, so the key list is returned instead.

    Only the errors that signal "not a sequence of entries" trigger the
    fallback; anything else (including KeyboardInterrupt) propagates.
    """
    try:
        return [entry.field for entry in schema]
    except (AttributeError, TypeError):
        return [*schema.keys()]

In [4]:
def format_source(source, idx = None):
    """Render one source as text: an optional ``[idx]:`` prefix, the quoted path,
    and, when the source has a ``worksheets`` entry, that entry on its own line.

    Only a missing ``worksheets`` entry selects the short form; other errors
    are no longer swallowed.
    """
    prefix = '' if idx is None else f'[{str(idx)}]:'
    header = f"{prefix}\"{source.path}\"\n"
    try:
        worksheets = source.worksheets
    except (AttributeError, KeyError):
        # Sources without worksheets are rendered as the path only.
        return header
    return f"{header}{worksheets}\n"
    
def format_sources(sources):
    """Render every ``(key, source)`` pair with ``format_source`` and join the pieces with commas."""
    rendered = []
    for key, source in sources.items():
        rendered.append(format_source(source, key))
    return ','.join(rendered)

## Read in the Data

Load the data from the `gov.nys.serviceproviders.xlsx#ddso_discharge_facilities` worksheet into memory using `pandas`.

1. Grab the `database`, `collection`, and `source` information from the `settings.toml` file. This is already loaded in via the settings object from the `from config import settings` line.

2. Load the data from the file into a `pandas.DataFrame` object.


In [5]:
# Read in the metadata.
# `providers` is the database this pipeline targets and `services` is the
# collection the records are ingested into.
db = get_databases(settings.db).providers
services = db.services
sources = services.sources
fields = get_fields(services.schema)
# Display the schema field names alongside the configured sources.
fields, sources

(['_id',
  'facility',
  'keywords',
  'category.disability',
  'category.service',
  'info.phone',
  'info.fax',
  'info.website.url',
  'info.website.subdomain',
  'info.website.hostname',
  'info.website.domain',
  'info.addressee',
  'address.location',
  'address.street.line1',
  'address.street.line2',
  'address.coordinates.latitude',
  'address.coordinates.longitude',
  'address.city',
  'address.county',
  'address.state',
  'address.zipcode'],
 <Box: {0: {'path': 'gov.ca.serviceproviders.xlsx', 'type': 'excel', 'worksheets': [{'name': 'dss_service_providers', 'fields': 'A:N'}]}, 1: {'path': 'gov.ca.serviceproviders.xlsx', 'type': 'excel', 'worksheets': [{'name': 'dss_service_providers', 'fields': 'A:N'}]}, 2: {'path': 'gov.nys.serviceproviders.xlsx', 'type': 'excel', 'worksheets': [{'name': 'ddso_service_providers', 'fields': 'A:V'}, {'name': 'ddso_discharge_facilities', 'fields': 'A:F'}, {'name': 'ofa_service_providers', 'fields': 'A:J'}]}, 3: {'path': 'org.handson.ohio.serv

In [6]:
# List every configured source with its index so the workbook that holds the
# ddso_discharge_facilities worksheet can be selected by position.
print(format_sources(sources))

[0]:"gov.ca.serviceproviders.xlsx"
[{'name': 'dss_service_providers', 'fields': 'A:N'}]
,[1]:"gov.ca.serviceproviders.xlsx"
[{'name': 'dss_service_providers', 'fields': 'A:N'}]
,[2]:"gov.nys.serviceproviders.xlsx"
[{'name': 'ddso_service_providers', 'fields': 'A:V'}, {'name': 'ddso_discharge_facilities', 'fields': 'A:F'}, {'name': 'ofa_service_providers', 'fields': 'A:J'}]
,[3]:"org.handson.ohio.serviceproviders.json"
,[4]:"irlu-CA-results.html"
,[5]:"irlu-NY-results.html"
,[6]:"irlu-OH-results.html"



In [7]:
# Load one worksheet of an Excel source into a dataframe.
def load_excel(source, worksheet, dirpath='', names=None, **kwargs):
    """Read ``worksheet`` from the workbook described by ``source``.

    Parameters
    ----------
    source : object with a ``path`` attribute
        File name of the workbook.
    worksheet : object with ``name`` and ``fields`` attributes
        Sheet name and the column range passed to ``usecols``.
    dirpath : str
        Directory containing the workbook. It is joined to the file name with
        ``os.path.join``, so a trailing separator is optional.
    names : sequence of str, optional
        Replacement column names; the worksheet's own names are kept when
        omitted or empty.
    **kwargs
        Forwarded to ``pandas.read_excel``.

    Returns
    -------
    tuple
        ``(dataframe, original_column_names)``; the names are those read from
        the worksheet before any renaming.
    """
    path = os.path.join(dirpath, source.path)
    df = pd.read_excel(path, sheet_name=worksheet.name, usecols=worksheet.fields, **kwargs)
    oldnames = list(df.columns.values)
    # Explicit emptiness check: truth-testing an array-like `names` is ambiguous.
    if names is not None and len(names) > 0:
        df.columns = names
    return df, oldnames

In [8]:
# Pick the New York State workbook (index 2 in the listing printed above) and
# its second worksheet, `ddso_discharge_facilities`.
source = sources[2]
worksheet = source.worksheets[1]
# Display the selection to confirm the right sheet was picked.
source.path, worksheet

('gov.nys.serviceproviders.xlsx',
 <Box: {'name': 'ddso_discharge_facilities', 'fields': 'A:F'}>)

In [22]:
# Load the worksheet and rename its six columns to the schema field names.
# `columns` receives the header names as they appear in the worksheet.
df, columns = load_excel(source, worksheet, dirpath="./../../" + settings.datadir, names=['facility', 'address.county', 'info.addressee', 'address.location', 'info.phone', 'info.fax'])
df

Unnamed: 0,facility,address.county,info.addressee,address.location,info.phone,info.fax
0,NYS OPWDD\nCentral Office,Albany,Martha Schunk,Policy Analysis\n44 Holland Avenue\nAlbany NY ...,(518) 473-5436,(518) 473-0054
1,Bernard Fineson DDSO,Queens,"Shiny Sajan, R.N.","Bernard Fineson Dev. CenterHillside Complex, B...",(718) 217-5767,(718) 217-5022
2,Brooklyn DDSO,Kings,"Ellen Marshall, R.N.","888 Fountain Avenue\nBldg. 1, Wing 121\nBrookl...",(718) 642-8644,(718) 642-8683
3,Broome DDSO,Tioga,Constance Alford,Training & Leadership Development\nBroome Deve...,"(607) 770-0402, ext. 419",(607) 770-0392
4,Capital District DDSO,Washington,Kevin Utz-Meagher\n(Back-up: Gerard Ferrara),3 Care Lane\nSaratoga Springs NY 12866,(518) 743-0158 ext. 113,(518) 583-1308
...,...,...,...,...,...,...
69,Broome DDSO,Broome,Constance Alford,Training & Leadership Development\nBroome Deve...,"(607) 770-0402, ext. 419",(607) 770-0392
70,Broome DDSO,Chenango,Constance Alford,Training & Leadership Development\nBroome Deve...,"(607) 770-0402, ext. 419",(607) 770-0392
71,Broome DDSO,Delaware,Constance Alford,Training & Leadership Development\nBroome Deve...,"(607) 770-0402, ext. 419",(607) 770-0392
72,Broome DDSO,Otsego,Constance Alford,Training & Leadership Development\nBroome Deve...,"(607) 770-0402, ext. 419",(607) 770-0392


In [17]:
# Describe each column by position and name. Iterating over df.columns instead
# of a fixed count keeps this in step with however many columns were loaded.
columns = [ { 'id': i, 'name': name } for i, name in enumerate(df.columns) ]
columns

[{'id': 0, 'name': 'facility'},
 {'id': 1, 'name': 'counties'},
 {'id': 2, 'name': 'info.addressee'},
 {'id': 3, 'name': 'address.location'},
 {'id': 4, 'name': 'info.phone'},
 {'id': 5, 'name': 'info.fax'}]

## Tidy the Data

This is where we wrangle the data. We need to:

1. Impute missing data.
2. Extract additional features.
3. Reduce into final schema format.

In [23]:
# Inspect column dtypes and non-null counts to see how much data is missing.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 74 entries, 0 to 73
Data columns (total 6 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   facility          63 non-null     object
 1   address.county    63 non-null     object
 2   info.addressee    63 non-null     object
 3   address.location  63 non-null     object
 4   info.phone        63 non-null     object
 5   info.fax          63 non-null     object
dtypes: object(6)
memory usage: 3.6+ KB


In [24]:
# Import the geocoder service wrapper.  (https://geopy.readthedocs.io/en/stable/#accessing-geocoders)
from tqdm.notebook import trange, tqdm
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from geopy.location import Location
from geopy.point import Point
from functools import partial

# Get the application name.
def application_name():
    """Return the ``<app>@<version>`` string built from ``settings.app``.

    NOTE(review): the whole ``settings.app`` object is interpolated before the
    '@' - confirm that it renders as the application name and not as the full
    settings table.
    """
    app = settings.app
    return "{}@{}".format(app, app.version)

# Initialize the geocoder instance.
def get_geocoder_instance():
    """Return the shared Nominatim geocoder, creating and caching it on first use.

    The instance is stored on ``settings.geocoder.instance``. A new one is
    built, with ``application_name()`` as its user agent, when that entry is
    missing or falsy.
    """
    try:
        gc = settings.geocoder.instance
    except (AttributeError, KeyError):
        # Nothing has been cached yet.
        gc = None
    if not gc:
        settings.geocoder.instance = Nominatim(user_agent=application_name())
        gc = settings.geocoder.instance
    return gc
        
def get_reverse():
    """Return a rate-limited wrapper around the geocoder's ``reverse`` method.

    Calls are spaced at least one second apart.
    """
    reverse_lookup = partial(get_geocoder_instance().reverse)
    return RateLimiter(reverse_lookup, min_delay_seconds=1)
        
def get_geocode():
    """Return a rate-limited wrapper around the geocoder's ``geocode`` method.

    Address details are always requested, and the minimum delay between calls
    comes from ``settings.geocoder.min_delay_seconds``.
    """
    forward_lookup = partial(get_geocoder_instance().geocode, addressdetails=True)
    delay = settings.geocoder.min_delay_seconds
    return RateLimiter(forward_lookup, min_delay_seconds=delay)

# Encode the query key.
def encode_query(query):
    """Return a string key for ``query``.

    ``query.to_json()`` is used when it is available and succeeds; otherwise
    the key is ``str(query)``. The fallback is a deliberate best effort, but it
    only covers ordinary exceptions, so interrupts still stop the notebook.
    """
    try:
        return query.to_json()
    except Exception:
        return str(query)

In [25]:
# Get key.
def get_key(query):
    """Return the cache key for ``query``.

    A query carrying a truthy ``'point'`` entry is keyed by that point alone;
    any other query is keyed by the encoding of all of its entries.
    """
    has_point = query.get('point', None)
    if not has_point:
        return encode_query(Box(**query))
    return encode_query(query['point'])

# Request a specific location.
def request_location(query, geocode, otherKeys=None, tr=None, dryrun=None):
    """Resolve ``query`` to a location, consulting the cache in ``settings.locations`` first.

    Parameters
    ----------
    query : dict
        Geocoder search terms, or ``{'point': ...}``. A falsy query returns None.
    geocode : callable
        Called once with ``query`` when no cached location exists.
    otherKeys : set, optional
        Cache keys of earlier attempts made for the same record. The key of
        ``query`` is added to this set in place, and a located result is
        stored under every key in it. A fresh set is used when omitted.
    tr : optional
        Progress-bar handle supplied by the caller; not used here.
    dryrun : optional
        When truthy the geocoder is never called; only the cache is consulted.

    Returns
    -------
    The cached or newly found location, or None when nothing was found.
    """
    # A default of `set()` in the signature would be shared (and grown) across calls.
    keys = otherKeys if otherKeys is not None else set()

    # Make sure the cache exists; it lives under the 'locations' settings key.
    if not settings.get('locations', None):
        settings.locations = Box({})

    # Inner logger.
    def log(msg):
        if msg and settings.verbose:
            print(msg)

    # Store `loc` under every key collected for this record.
    def remember(loc):
        for key in keys:
            settings.locations[key] = loc
        return loc

    # If no query, return.
    if not query:
        log("No query provided. Cannot search for location.")
        return None

    # Add the key of this query to the collection (duplicates are ignored).
    newkey = get_key(query)
    keys.add(newkey)

    # Check the cache for every key before contacting the service, so the
    # outcome does not depend on the iteration order of the set.
    for key in keys:
        loc = settings.locations.get(key, None)
        if loc:
            log(f"Cached location found for query... {key} -> {loc}")
            return remember(loc)

    if dryrun:
        log(f"DRY RUN: No location found due to dryrun setting. {newkey}")
        return None

    # Nothing cached: ask the geocoder exactly once for this query.
    try:
        log(f"Calling geocoder service for new, unique query... {newkey}")
        loc = geocode(query)
    except (RuntimeError, TypeError, NameError) as e:
        log(f"ERROR: No location for query found. {e}")
        return None

    if loc:
        log(f"SUCCESS: Found location for query! {newkey} -> {loc}")
        return remember(loc)

    log(f"ERROR: No location for query found. {newkey}")
    return None

# https://geopy.readthedocs.io/en/v1/#usage-with-pandas
def request_locations(queries, geocode, dryrun=None):
    """Resolve every query in ``queries`` to a location, with progressive fallbacks.

    For each query the full query is tried first, then, one at a time, its
    ``city``, ``postalcode``, ``county`` and ``state`` entries. Fields that the
    query does not carry are skipped. If nothing is found, the default point
    (stored under ``settings['defaultLocation']``) is reverse-geocoded; this
    last step ignores ``dryrun``.

    Parameters
    ----------
    queries : sequence of dict
        Queries indexable by position.
    geocode : callable
        Forward geocoder passed to ``request_location``.
    dryrun : optional
        Forwarded to ``request_location`` for the forward lookups.

    Returns
    -------
    list or None
        One result per query, or None when ``queries`` is empty or None.
    """
    if queries is None or len(queries) == 0:
        return None

    fallback_fields = ('city', 'postalcode', 'county', 'state')
    results = []
    with trange(len(queries)) as tr:
        for i in tr:

            # Keys used for this record; shared by all of its attempts.
            keys = set()

            # Try full query.
            query = queries[i]
            attempt = request_location(query, geocode, otherKeys=keys, tr=tr, dryrun=dryrun)

            # Try progressively coarser single-field queries.
            for field in fallback_fields:
                if attempt:
                    break
                value = query.get(field) if query else None
                if value is None:
                    continue  # The record has no such field to fall back on.
                attempt = request_location({ field: value }, geocode, otherKeys=keys, tr=tr, dryrun=dryrun)

            # Attempt 'default' (New York).
            if not attempt:
                point = settings.get('defaultLocation', Point(latitude=43.1561681, longitude=-75.8449946, altitude=0))
                settings['defaultLocation'] = point
                reverse = get_reverse()
                # request_location hands the whole query dict to its callable,
                # while the reverse geocoder expects the point itself.
                attempt = request_location({ 'point': point }, lambda q: reverse(q['point']), otherKeys=keys, tr=tr, dryrun=False)

            results.append(attempt)
    return results
    
# Example call of request_locations, kept as an inert string literal so that
# running this cell does not execute it.
"""
request_locations([{
    'street': "3147 Tiemann Avenue",
    "city": "Bronx",
    "state": "NY"
}]*10, get_geocode())
"""

# Build a query dict from a row: one entry per index label.
def make_query(row):
    """Return a dict mapping every label in ``row.index`` to ``row[label]``."""
    query = {}
    for field in row.index:
        query[field] = row[field]
    return query

In [48]:
# Extract the location newlines.
def extract_components(row):
    """Split a multi-line address into at most three trimmed parts.

    The text is split on newlines from the right, so the last two lines are
    always separate parts and any earlier lines stay together in the first
    part, e.g. ``['Policy Analysis', '44 Holland Avenue', 'Albany NY 12229']``.
    Empty parts are dropped.

    Values that are not strings (such as the NaN pandas uses for an empty
    worksheet cell) yield an empty list.
    """
    if not isinstance(row, str):
        return []
    parts = row.rsplit('\n', 2)
    return [part.strip() for part in parts if part]

# Extract the location information.
def extract_location(loc):
    """Parse a multi-line address into ``(street, city, state, zipcode)``.

    The last line is read as ``"<city> <state> <zipcode>"``, split on the final
    two spaces. With three components the street is the second component
    followed by the first; with two components the first one is the street.

    Returns None when ``loc`` has no components or does not fit this layout
    (a single line, or a last line with fewer than three space-separated
    tokens).
    """
    components = extract_components(loc)
    if not components:
        return None

    if len(components) == 3:
        line2, line1, city_state_zipcode = components
        street = line1 + ' ' + line2
    elif len(components) == 2:
        street, city_state_zipcode = components
    else:
        # A single line cannot be separated into street and city/state/zip.
        return None

    city_state_and_zip = city_state_zipcode.rsplit(' ', 1)
    if len(city_state_and_zip) != 2:
        return None
    city_state, zipcode = city_state_and_zip

    city_and_state = city_state.rsplit(' ', 1)
    if len(city_and_state) != 2:
        return None
    city, state = city_and_state

    return (street, city, state, zipcode)

# Extract the latitude.
def extract_latitude(coords):
    """Return ``Box(latitude=<float>)`` parsed from a ``"lat,lon"`` string.

    The latitude is NaN when ``coords`` is not a string of exactly two
    comma-separated parts or the first part is not a number.
    """
    try:
        lat, _ = coords.split(',')
        lat = float(lat.strip())
    except (AttributeError, TypeError, ValueError):
        # The failure value belongs to `lat`; assigning it to another name
        # would leave None or the unparsed text in the result.
        lat = float("nan")
    return Box(latitude=lat)

# Extract the longitude.
def extract_longitude(coords):
    """Return ``Box(longitude=<float>)`` parsed from a ``"lat,lon"`` string.

    The longitude is NaN when ``coords`` is not a string of exactly two
    comma-separated parts or the second part is not a number. Only those
    parsing failures are handled; other errors propagate.
    """
    try:
        _, lon = coords.split(',')
        lon = float(lon.strip())
    except (AttributeError, TypeError, ValueError):
        lon = float("nan")
    return Box(longitude=lon)

# Extract the coordinates.
def extract_coordinates(loc):
    # Parses `loc` with extract_location, then reads a coordinate string from
    # the fifth component and splits it into latitude and longitude.
    # NOTE(review): extract_location returns a 4-tuple (street, city, state,
    # zipcode) or None, so components[4] cannot succeed; the bare `except`
    # below hides that error, and whenever extract_location returns, the
    # result here is an empty Box. Confirm where the coordinates are meant to
    # come from.
    # NOTE(review): `re` is not imported in any visible cell - confirm it is in
    # scope (for example through the star import from utils.finder).
    components = extract_location(loc)
    try:
        # Drop the parentheses/braces wrapped around the coordinate pair.
        coords = re.sub('[({})]', '', components[4])
        lat = extract_latitude(coords)
        lon = extract_longitude(coords)
        # Merge the two single-entry boxes into one {latitude, longitude} Box.
        coords = Box(**lat, **lon)
        return coords
    except:
        return Box()

In [33]:
# Preview the raw multi-line address strings.
df['address.location'].head()

0    Policy Analysis\n44 Holland Avenue\nAlbany NY ...
1    Bernard Fineson Dev. CenterHillside Complex, B...
2    888 Fountain Avenue\nBldg. 1, Wing 121\nBrookl...
3    Training & Leadership Development\nBroome Deve...
4               3 Care Lane\nSaratoga Springs NY 12866
Name: address.location, dtype: object

In [50]:
# Select the raw address column as the input for address parsing.
# NOTE(review): no visible cell applies extract_location or make_query to this
# column yet, so the address components and queries are not actually built.
locations = df['address.location']
locations

0     Policy Analysis\n44 Holland Avenue\nAlbany NY ...
1     Bernard Fineson Dev. CenterHillside Complex, B...
2     888 Fountain Avenue\nBldg. 1, Wing 121\nBrookl...
3     Training & Leadership Development\nBroome Deve...
4                3 Care Lane\nSaratoga Springs NY 12866
                            ...                        
69    Training & Leadership Development\nBroome Deve...
70    Training & Leadership Development\nBroome Deve...
71    Training & Leadership Development\nBroome Deve...
72    Training & Leadership Development\nBroome Deve...
73    Training & Leadership Development\nBroome Deve...
Name: address.location, Length: 74, dtype: object

## Ingestion

Ingest the data to the `providers` database, underneath the `services` collection.

First we need to convert our table into a properly structured dictionary object.


In [None]:
# Columns copied from each row into the record that is ingested.
# Commented-out entries are schema fields this pipeline does not copy.
# NOTE(review): 'address.state' and 'address.zipcode' are not among the six
# columns loaded from the worksheet, and no visible cell derives them - confirm
# they are added to `df` before this selection runs.
record_cols = [
    'facility', 
    # 'keywords',
    # 'category.disability',
    # 'category.service',
    'info.phone',
    # 'info.fax',
    # 'info.website.url',
    # 'info.website.subdomain',
    # 'info.website.hostname',
    # 'info.website.domain',
    # 'info.addressee',
    # 'address.street.line1',
    # 'address.street.line2',
    # 'address.coordinates.longitude',
    # 'address.coordinates.latitude',
    # 'address.city',
    'address.county',
    'address.state',
    'address.zipcode',
]

# Convert to a dictionary object.
def boxed(row):
    """Return the ``record_cols`` entries of ``row`` as a Box with dotted-key access.

    A label equal to ``7`` is renamed to ``'info.phone'`` before conversion.
    """
    relabel = lambda label: label if label != 7 else 'info.phone'
    selected = row[record_cols].rename(relabel)
    return Box(selected.to_dict(), box_dots=True)

def to_record(row):
    """Convert one dataframe row into the nested document stored in the database.

    The row is first reduced with ``boxed``. ``facility`` is required; every
    other field is optional and stored as None when the reduced row does not
    carry it, so the document keeps the same shape whichever columns
    ``record_cols`` selects.

    Returns
    -------
    dict
        ``{'facility', 'keywords', 'info': {...}, 'address': {...}}``.
    """
    b = boxed(row)

    def field(name):
        # Look up an optional field; a column left out of the row yields None.
        try:
            return b[name]
        except (KeyError, AttributeError):
            return None

    payload = Box(box_dots=True)

    # Set the facility and keywords.
    payload.facility = b.facility
    payload.keywords = field('keywords')

    # 'category.disability',
    # 'category.service',

    # Set the info.
    payload['info'] = {
        'phone': field('info.phone'),
        'website': {
            'url': field('info.website.url'),
            'subdomain': field('info.website.subdomain'),
            'hostname': field('info.website.hostname'),
            'domain': field('info.website.domain'),
        },
    }

    # Set the address.
    payload['address'] = {
        'street': {
            'line1': field('address.street.line1'),
            'line2': field('address.street.line2')
        },
        'coordinates': {
            'latitude': field('address.coordinates.latitude'),
            'longitude': field('address.coordinates.longitude'),
        },
        'city': field('address.city'),
        'county': field('address.county'),
        'state': field('address.state'),
        'zipcode': field('address.zipcode'),
    }

    return payload.to_dict()

# Build one record per row, ready for ingestion.
all_columns = df.columns[:]
df_out = df[all_columns].apply(to_record, axis=1)
df_out

In [None]:
# Get the connector.
from connection.database import Database

# Initialize connection.
# Host, port and credentials are read from settings.db.auth rather than being
# written into the notebook.
Database.initialize(Database.make_options(
    hostname=settings.db.auth.hostname,
    username=settings.db.auth.username,
    password=settings.db.auth.password,
    port=settings.db.auth.port,
))

In [None]:
# Select the database to write to.
# NOTE(review): this selects 'providers_test', while the notebook text names
# `providers` as the destination - confirm which one is intended.
Database.use('providers_test')

In [None]:
# Use the first collection listed under the providers database as the target.
# NOTE(review): this depends on key order; the notebook text names `services`
# as the target collection - confirm the first key is that collection.
destination = list(db.keys())[0]
# Plain list of record dicts, as prepared in the conversion cell.
data = df_out.tolist()
# Show the first record as a spot check.
data[0]

In [None]:
# The insert is intentionally left disabled; uncomment it to write the records.
try:
    # Database.insert_many(destination, data)
    pass  # A `try` body needs a statement; a comment alone is a syntax error.
except Exception as e:
    # Report ordinary failures but let interrupts and exits through.
    print(e)

- Extract the coordinates from the location field (if present).
- Split coordinates into 'longitude' and 'latitude' features. 


- Determine disability category for row based on values.
- Determine service category based on rows.