# Test Data Ingest into Elastic Search - Part 1

## Purpose

This code is a test that pulls in a specific Machine Readable PUF link for Moda Health and fetches the Provider, Formulary and Plan data by following the links. It is specific to Moda Health and is not really generalizable. The purpose is to pull a small amount of data and put it into Elastic Search.

## Elastic Search

I put this together using a local instance of Elastic Search on my Mac, set up with Homebrew and running on port 9200. It could just as easily be a remote Elastic Search instance or an AWS ES instance.

## Python elasticsearch Module

You will need to install the `elasticsearch` module using `pip` or some other installation tool for Python. 

## Running the Sample

The code simply requests the data from Moda Health using the URL hardcoded into the HTTP request. The response is a JSON object that looks like this:

    {
        "provider_urls": [
            "https://www.modahealth.com/cms-data/providers-AK.json",
            "https://www.modahealth.com/cms-data/providers-OR.json"
        ],
        "formulary_urls": [
            "https://www.modahealth.com/cms-data/drugs-AK.json",
            "https://www.modahealth.com/cms-data/drugs-OR.json"
        ],
        "plan_urls": [
            "https://www.modahealth.com/cms-data/plans-AK.json",
            "https://www.modahealth.com/cms-data/plans-OR.json"
        ]
    }

The code then processes each of the `provider_urls`, `formulary_urls` and `plan_urls` into Elastic Search by requesting the indicated URL. Each response contains a JSON array with many documents. For example, the provider data contains many provider documents (records).

## JSON Structure

An excerpt of the `plans` JSON looks like this (only the first plan in the array is shown):

    [
      {
        "plan_id_type": "HIOS-PLAN-ID",
        "plan_id": "73836AK0750001",
        "marketing_name": "Be Prosperous (Providence)",
        "summary_url": "https://www.modahealth.com/pdfs/plans/individual/Moda_Providence_BeProsperous_2016_AK.pdf",
        "marketing_url": "",
        "plan_contact": "jessica.wagner@modahealth.com",
        "network": [
          {
            "network_tier": "PREFERRED"
          }
        ],
        "formulary": [
          {
            "drug_tier": "SELECT",
            "mail_order": true,
            "cost_sharing": [
              {
                "pharmacy_type": "3-MONTH-IN-MAIL",
                "copay_amount": 90.00,
                "copay_opt": "NO-CHARGE",
                "coinsurance_rate": 0.00,
                "coinsurance_opt": null
              },
              {
                "pharmacy_type": "3-MONTH-IN-RETAIL",
                "copay_amount": 30.00,
                "copay_opt": "BEFORE-DEDUCTIBLE",
                "coinsurance_rate": 0.00,
                "coinsurance_opt": null
              },
              {
                "pharmacy_type": "3-MONTH-OUT-RETAIL",
                "copay_amount": 30.00,
                "copay_opt": "BEFORE-DEDUCTIBLE",
                "coinsurance_rate": 0.00,
                "coinsurance_opt": null
              }
            ]
          },
          {
            "drug_tier": "PREFERRED",
            "mail_order": true,
            "cost_sharing": [
              {
                "pharmacy_type": "3-MONTH-IN-MAIL",
                "copay_amount": 0.00,
                "copay_opt": "NO-CHARGE",
                "coinsurance_rate": 0.00,
                "coinsurance_opt": null
              },
              {
                "pharmacy_type": "3-MONTH-IN-RETAIL",
                "copay_amount": 0.00,
                "copay_opt": "NO-CHARGE",
                "coinsurance_rate": 0.35,
                "coinsurance_opt": null
              },
              {
                "pharmacy_type": "3-MONTH-OUT-RETAIL",
                "copay_amount": 0.00,
                "copay_opt": "NO-CHARGE",
                "coinsurance_rate": 0.35,
                "coinsurance_opt": null
              }
            ]
          },
          {
            "drug_tier": "MEDICAL-SERVICE-DRUGS",
            "mail_order": true,
            "cost_sharing": [
              {
                "pharmacy_type": "3-MONTH-IN-RETAIL",
                "copay_amount": 0.00,
                "copay_opt": "NO-CHARGE",
                "coinsurance_rate": 0.15,
                "coinsurance_opt": "AFTER-DEDUCTIBLE"
              }
            ]
          }
        ],
        "last_updated_on": "2016-04-05"
      },

## Elastic Search index mapping

Elasticsearch creates a mapping for the `plans` example as:

    {
      "ak": {
        "mappings": {
          "plan": {
            "properties": {
              "formulary": {
                "properties": {
                  "cost_sharing": {
                    "properties": {
                      "coinsurance_opt": {
                        "type": "string"
                      },
                      "coinsurance_rate": {
                        "type": "double"
                      },
                      "copay_amount": {
                        "type": "double"
                      },
                      "copay_opt": {
                        "type": "string"
                      },
                      "pharmacy_type": {
                        "type": "string"
                      }
                    }
                  },
                  "drug_tier": {
                    "type": "string"
                  },
                  "mail_order": {
                    "type": "boolean"
                  }
                }
              },
              "last_updated_on": {
                "type": "date",
                "format": "strict_date_optional_time||epoch_millis"
              },
              "marketing_name": {
                "type": "string"
              },
              "marketing_url": {
                "type": "string"
              },
              "network": {
                "properties": {
                  "network_tier": {
                    "type": "string"
                  }
                }
              },
              "plan_contact": {
                "type": "string"
              },
              "plan_id": {
                "type": "string"
              },
              "plan_id_type": {
                "type": "string"
              },
              "summary_url": {
                "type": "string"
              }
            }
          }
        }
      }
    }
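
To make the relationship between the JSON and the mapping easier to see, here is a rough sketch of how field types like the ones above can be derived from a document. It is not how Elastic Search implements dynamic mapping; `infer_mapping` and `merge` are hypothetical helpers, the date check only recognises plain `YYYY-MM-DD` strings, and the sketch is written for Python 3 rather than the Python 2 used in the rest of this notebook.

```python
import re

# Assumption: only plain YYYY-MM-DD strings are treated as dates in this sketch.
DATE_RE = re.compile(r'^\d{4}-\d{2}-\d{2}$')


def merge(a, b):
    """Combine two inferred mappings, keeping every property seen in either."""
    if a is None:
        return b
    if b is None:
        return a
    if 'properties' in a and 'properties' in b:
        props = dict(a['properties'])
        for key, sub in b['properties'].items():
            props[key] = merge(props.get(key), sub)
        return {'properties': props}
    return a  # for leaf fields the first type seen wins


def infer_mapping(value):
    """Return a mapping fragment for a JSON value, or None for null."""
    if isinstance(value, bool):  # test bool first: bool is a subclass of int
        return {'type': 'boolean'}
    if isinstance(value, int):
        return {'type': 'long'}
    if isinstance(value, float):
        return {'type': 'double'}
    if isinstance(value, str):
        return {'type': 'date' if DATE_RE.match(value) else 'string'}
    if isinstance(value, list):
        merged = None
        for element in value:  # every element contributes to the same field
            merged = merge(merged, infer_mapping(element))
        return merged
    if isinstance(value, dict):
        props = {}
        for key, sub in value.items():
            fragment = infer_mapping(sub)
            if fragment is not None:
                props[key] = fragment
        return {'properties': props}
    return None  # null carries no type information


# A cut-down plan document using field names from the example above.
plan = {
    "plan_id": "73836AK0750001",
    "last_updated_on": "2016-04-05",
    "formulary": [
        {"mail_order": True,
         "cost_sharing": [
             {"copay_amount": 90.00, "coinsurance_opt": None},
             {"copay_amount": 0.00, "coinsurance_opt": "AFTER-DEDUCTIBLE"},
         ]},
    ],
}
mapping = infer_mapping(plan)
```

Note that `coinsurance_opt` only gets a type because one of the `cost_sharing` entries carries a non-null value; the null entries contribute nothing.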



A few things to note in the processing code, which are common to all three kinds of data:

- The state is derived from the URL, but this is specific to Moda Health
- Each record in a response is indexed as its own document, in an index named after the state (`ak` or `or`)
- Each indexed document is given a `doc_type` of `provider`, `formulary`, or `plan`

This is only a test index setup and may need to be refactored at some point.
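
The state derivation and the three near-identical loops described above can be sketched as a pair of small helpers. This is only an illustration: `state_from_url` and `iter_targets` are hypothetical names that do not appear in the notebook, and the URL layout they rely on is the Moda Health one shown earlier.

```python
import re

# Maps each key in the index document to the doc_type used when indexing.
URL_KEYS = (
    ('provider_urls', 'provider'),
    ('formulary_urls', 'formulary'),
    ('plan_urls', 'plan'),
)


def state_from_url(url):
    """Extract the lower-cased state code from a URL ending in '-XX.json'."""
    last_part = re.split('-', url)[-1]       # e.g. 'AK.json'
    return last_part.split('.')[0].lower()   # e.g. 'ak'


def iter_targets(index_doc):
    """Yield (doc_type, state, url) for every URL in the index document."""
    for key, doc_type in URL_KEYS:
        for url in index_doc.get(key, []):
            yield doc_type, state_from_url(url), url


sample_index = {
    "provider_urls": ["https://www.modahealth.com/cms-data/providers-AK.json"],
    "formulary_urls": ["https://www.modahealth.com/cms-data/drugs-OR.json"],
    "plan_urls": ["https://www.modahealth.com/cms-data/plans-AK.json"],
}
for doc_type, state, url in iter_targets(sample_index):
    print(doc_type, state, url)
```

Each tuple carries what the `es.index(index=state, doc_type=..., body=item)` calls in the cells below need, so the three loops could share one body.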

In [1]:
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient
import json
import requests
import re

In [2]:
es = Elasticsearch("http://localhost:9200")
ic = IndicesClient(es)


In [3]:
r = requests.get('http://localhost:9200')
if r.status_code == 200:
    # each insurer emits a different format so this will only work for modahealth
    r = requests.get('https://www.modahealth.com/cms-data-index.json')
    urls = json.loads(r.content)
#    provider
    for provider_url in urls['provider_urls']:
        print provider_url
        split_url = re.split('-',provider_url)
        state = (split_url[-1].split('.'))[0].lower()
        r = requests.get(provider_url)
        for item in json.loads(r.content):
            es.index(index=state, doc_type='provider', body=item)
#    formulary
    for formulary_url in urls['formulary_urls']:
        print formulary_url
        split_url = re.split('-',formulary_url)
        state = (split_url[-1].split('.'))[0].lower()
        r = requests.get(formulary_url)
        for item in json.loads(r.content):
            es.index(index=state, doc_type='formulary', body=item)
#    plan
    for plan_url in urls['plan_urls']:
        print plan_url
        split_url = re.split('-',plan_url)
        state = (split_url[-1].split('.'))[0].lower()
        r = requests.get(plan_url)
        for item in json.loads(r.content):
            es.index(index=state, doc_type='plan', body=item)

https://www.modahealth.com/cms-data/providers-AK.json
https://www.modahealth.com/cms-data/providers-OR.json
https://www.modahealth.com/cms-data/drugs-AK.json
https://www.modahealth.com/cms-data/drugs-OR.json
https://www.modahealth.com/cms-data/plans-AK.json
https://www.modahealth.com/cms-data/plans-OR.json


In [8]:
# This is for hacking around looking at JSON that we're loading
f = {}

r = requests.get('http://localhost:9200')
if r.status_code == 200:
    # each insurer emits a different format so this will only work for modahealth
    r = requests.get('https://www.modahealth.com/cms-data-index.json')
    urls = json.loads(r.content)

#    formulary
    for formulary_url in urls['formulary_urls']:
        print formulary_url
        split_url = re.split('-',formulary_url)
        state = (split_url[-1].split('.'))[0].lower()
        r = requests.get(formulary_url)
        f = json.loads(r.content)
        break


https://www.modahealth.com/cms-data/drugs-AK.json


In [14]:
f[1]

{u'drug_name': u'Melatonin 1 MG / pyridoxine 10 MG Oral Tablet',
 u'plans': [{u'drug_tier': u'BRAND',
   u'plan_id': u'73836AK0750002',
   u'plan_id_type': u'HIOS-PLAN-ID',
   u'prior_authorization': False,
   u'quantity_limit': False,
   u'step_therapy': False},
  {u'drug_tier': u'BRAND',
   u'plan_id': u'73836AK0750001',
   u'plan_id_type': u'HIOS-PLAN-ID',
   u'prior_authorization': False,
   u'quantity_limit': False,
   u'step_therapy': False},
  {u'drug_tier': u'BRAND',
   u'plan_id': u'73836AK0750003',
   u'plan_id_type': u'HIOS-PLAN-ID',
   u'prior_authorization': False,
   u'quantity_limit': False,
   u'step_therapy': False},
  {u'drug_tier': u'BRAND',
   u'plan_id': u'73836AK0750004',
   u'plan_id_type': u'HIOS-PLAN-ID',
   u'prior_authorization': False,
   u'quantity_limit': False,
   u'step_therapy': False},
  {u'drug_tier': u'BRAND',
   u'plan_id': u'73836AK0840001',
   u'plan_id_type': u'HIOS-PLAN-ID',
   u'prior_authorization': False,
   u'quantity_limit': False,
   u'ste

In [4]:
es.search(index="ak", body={"query": {"match": {'marketing_name':'Be Equipped'}}})

{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [{u'_id': u'AVUYxQmQPESeHxNEIPWE',
    u'_index': u'ak',
    u'_score': 6.8934927,
    u'_source': {u'formulary': [{u'cost_sharing': [{u'coinsurance_opt': None,
         u'coinsurance_rate': 0.0,
         u'copay_amount': 180.0,
         u'copay_opt': u'NO-CHARGE',
         u'pharmacy_type': u'3-MONTH-IN-MAIL'},
        {u'coinsurance_opt': None,
         u'coinsurance_rate': 0.0,
         u'copay_amount': 60.0,
         u'copay_opt': u'BEFORE-DEDUCTIBLE',
         u'pharmacy_type': u'3-MONTH-IN-RETAIL'},
        {u'coinsurance_opt': None,
         u'coinsurance_rate': 0.0,
         u'copay_amount': 60.0,
         u'copay_opt': u'BEFORE-DEDUCTIBLE',
         u'pharmacy_type': u'3-MONTH-OUT-RETAIL'}],
       u'drug_tier': u'SELECT',
       u'mail_order': True},
      {u'cost_sharing': [{u'coinsurance_opt': None,
         u'coinsurance_rate': 0.0,
         u'copay_amount': 0.0,
         u'copay_opt': u'NO-CHA

In [24]:
es.search(index="or", q='drug_name:"Melatonin"')

{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [{u'_id': u'AVUYxN3lPESeHxNEINTz',
    u'_index': u'or',
    u'_score': 4.3666186,
    u'_source': {u'drug_name': u'Melatonin 3 MG Oral Tablet',
     u'plans': [{u'drug_tier': u'SELECT',
       u'plan_id': u'39424OR1240001',
       u'plan_id_type': u'HIOS-PLAN-ID',
       u'prior_authorization': False,
       u'quantity_limit': False,
       u'step_therapy': False},
      {u'drug_tier': u'SELECT',
       u'plan_id': u'39424OR1260001',
       u'plan_id_type': u'HIOS-PLAN-ID',
       u'prior_authorization': False,
       u'quantity_limit': False,
       u'step_therapy': False},
      {u'drug_tier': u'SELECT',
       u'plan_id': u'39424OR1460001',
       u'plan_id_type': u'HIOS-PLAN-ID',
       u'prior_authorization': False,
       u'quantity_limit': False,
       u'step_therapy': False},
      {u'drug_tier': u'SELECT',
       u'plan_id': u'39424OR1460002',
       u'plan_id_type': u'HIOS-PLAN-ID',
       u'pri

In [25]:
es.search(index="or", q='drug_tier:"SELECT"')

{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [], u'max_score': None, u'total': 0},
 u'timed_out': False,
 u'took': 1}

In [29]:
matches = es.search('or', q="melatonin")
hits = matches['hits']['hits']
if not hits:
    print('No matches found')
else:
    for hit in hits:
        print hit['_source']['drug_name']


Melatonin 3 MG Oral Tablet
Melatonin 10 MG Sublingual Tablet
Melatonin 1 MG Oral Tablet
Melatonin 5 MG Oral Capsule
Melatonin 3 MG Extended Release Oral Tablet
Melatonin 10 MG Extended Release Oral Tablet
Melatonin 3 MG / Vitamin B6 1 MG Oral Tablet
Melatonin 0.5 MG Sublingual Tablet
Melatonin 10 MG Oral Capsule
Melatonin 5 MG Disintegrating Oral Tablet


In [32]:
matches = es.search('or', q="HIOS-PLAN-ID")
hits = matches['hits']['hits']
if not hits:
    print('No matches found')
else:
    for hit in hits:
        for plan in hit['_source']['plans']:
            print plan['plan_id']

39424OR1240001
39424OR1260001
39424OR1460001
39424OR1460002
39424OR1240002
39424OR1260002
39424OR1460003
39424OR1460004
39424OR1470001
39424OR1310001
39424OR1480001
39424OR1480002
39424OR1490001
39424OR1500001
39424OR1320001
39424OR1480003
39424OR1240001
39424OR1260001
39424OR1460001
39424OR1460002
39424OR1240002
39424OR1260002
39424OR1460003
39424OR1460004
39424OR1470001
39424OR1310001
39424OR1480001
39424OR1480002
39424OR1480003
39424OR1500001
39424OR1320001
39424OR1490001
39424OR1240001
39424OR1260001
39424OR1460001
39424OR1460002
39424OR1240002
39424OR1260002
39424OR1460003
39424OR1460004
39424OR1470001
39424OR1310001
39424OR1480001
39424OR1480002
39424OR1480003
39424OR1500001
39424OR1320001
39424OR1490001
39424OR1240001
39424OR1260001
39424OR1460001
39424OR1460002
39424OR1240002
39424OR1260002
39424OR1460003
39424OR1460004
39424OR1470001
39424OR1310001
39424OR1480001
39424OR1480002
39424OR1480003
39424OR1490001
39424OR1500001
39424OR1320001
39424OR1240001
39424OR1260001
39424OR146

# Test Data Ingest into Elastic Search - Part 2

## Reading the Machine Readable URL file

The machine readable PUF is a .zip compressed XLSX file. The OpenPyXL module is able to read an XLSX file. 

However, it's easier to export the XLSX file as a CSV file, so I've done that, using ASCII as the output encoding.

This code has evolved to change the Elastic Search index so that there is only one index holding all of the document types (`provider`, `facility`, `drug` and `plan` in the code below). The original scheme broke things up by state, but that is not a natural division of the data: only providers have state attributes. Plans are related to providers and formularies, so we leave the state as an attribute of the provider. The new index is called `data`.

### Large JSON files

While working through the data files I ran across JSON files that are very large (> 2 GB). It's not feasible to load the whole HTTP response into memory and then parse it into a JSON dict object to feed into Elastic Search. Instead the HTTP responses are streamed into the `ijson` module, which works like a SAX parser, emitting events that allow us to pick the objects one at a time out of the stream. This keeps memory use low, but processing is slower.
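
To illustrate the idea of picking objects out of a stream one at a time, here is a small standard-library stand-in. It is not `ijson` and not what the notebook uses; `iter_array_items` is a hypothetical helper built on `json.JSONDecoder.raw_decode`, written for Python 3, and it assumes the stream holds a single top-level JSON array, as the provider and formulary files do. It is not tuned for speed.

```python
import io
import json


def iter_array_items(stream, chunk_size=65536):
    """Yield the elements of a top-level JSON array one at a time,
    reading the text-mode file-like object `stream` in chunks."""
    decoder = json.JSONDecoder()

    # Read until the first non-whitespace text arrives.
    buf = ''
    while not buf:
        chunk = stream.read(chunk_size)
        if not chunk:
            return  # empty stream: nothing to yield
        buf = chunk.lstrip()
    if not buf.startswith('['):
        raise ValueError('expected a top-level JSON array')
    buf = buf[1:]

    while True:
        # Skip whitespace and the comma that separates items.
        buf = buf.lstrip(' \t\r\n,')
        if buf.startswith(']'):
            return
        try:
            obj, end = decoder.raw_decode(buf)
            rest = buf[end:].lstrip()
            # Only trust the value once its terminator has arrived too;
            # otherwise a number split across chunks could be cut short.
            complete = rest[:1] in (',', ']')
        except ValueError:
            complete = False
        if complete:
            yield obj
            buf = rest  # drop the consumed item so memory stays bounded
        else:
            chunk = stream.read(chunk_size)
            if not chunk:
                raise ValueError('stream ended inside the array')
            buf += chunk


data = '[{"npi": "1003003336"}, {"npi": "1003003633"}]'
# A tiny chunk size forces items to span several reads.
for provider in iter_array_items(io.StringIO(data), chunk_size=7):
    print(provider['npi'])
```

Only the item currently being assembled is held in memory, which is the property that matters for the multi-gigabyte files; `ijson` does the same job with a proper event-based parser.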

### Uniformity of the Data

I originally thought that the data would be difficult to deal with and would require customization per insurer. That's true if we stick to the model where each state is an index. Otherwise the processing is quite uniform and straightforward.

### Making the Ingest More Robust

As we learn about and handle different scenarios processing the data, it doesn't make sense to start over every time. Instead we should keep track of what we've processed and write it to a file. That would allow picking back up where we left off.
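
One way this could look is sketched below. The notebook does not implement it; `load_processed`, `mark_processed` and `process_pending` are hypothetical helpers, and the checkpoint format (one URL per line in a text file) is an assumption.

```python
import os
import tempfile


def load_processed(path):
    """Return the set of URLs recorded in the checkpoint file, if any."""
    if not os.path.exists(path):
        return set()
    with open(path) as f:
        return set(line.strip() for line in f if line.strip())


def mark_processed(path, url):
    """Append a URL to the checkpoint file once it has been handled."""
    with open(path, 'a') as f:
        f.write(url + '\n')


def process_pending(urls, path, handler):
    """Call handler(url) for each URL not yet in the checkpoint file.
    Returns the list of URLs handled during this run."""
    done = load_processed(path)
    handled = []
    for url in urls:
        if url in done:
            continue  # already processed in an earlier run
        handler(url)
        mark_processed(path, url)  # record only after the handler succeeds
        done.add(url)
        handled.append(url)
    return handled


# Usage with a throwaway checkpoint file and a handler that just records calls.
checkpoint = os.path.join(tempfile.mkdtemp(), 'processed_urls.txt')
urls = ['https://example.com/a.json', 'https://example.com/b.json']
seen = []
first_run = process_pending(urls, checkpoint, seen.append)
second_run = process_pending(urls, checkpoint, seen.append)
```

Because a URL is written to the file only after its handler returns, a run that fails partway through leaves that URL unrecorded and it is retried next time. Documents already indexed from the failed URL would be indexed again, so this is a starting point rather than a complete answer.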

In [46]:
import csv
import json
import requests
import re
from elasticsearch import Elasticsearch
from elasticsearch.client import IndicesClient
from elasticsearch_dsl import Search
from urllib2 import urlopen
from urlparse import urlparse
import ijson
from contextlib import closing

In [47]:
# process the provider url
def process_provider_url(provider_url, es):
    print "Processing {0}".format(provider_url)
    count = 0
    with closing(urlopen(provider_url)) as f:
        for provider in ijson.items(f, 'item'):
            if provider['type'] == 'INDIVIDUAL':
                es.index(index='data', doc_type='provider', body=provider)
            else:
                es.index(index='data', doc_type='facility', body=provider)
            count += 1
    return count

In [48]:
# process the formulary url
def process_formulary_url(formulary_url, es):
    print "Processing {0}".format(formulary_url)

    count = 0
    with closing(urlopen(formulary_url)) as f:
        # process each drug in the formulary
        for drug in ijson.items(f, 'item'):
            # check optional fields and add them if they are missing
            for plan in drug['plans']:
                if 'prior_authorization' not in plan:
                    plan['prior_authorization'] = 'false'
                if 'step_therapy' not in plan:
                    plan['step_therapy'] = 'false'
                if 'quantity_limit' not in plan:
                    plan['quantity_limit'] = 'false'
            es.index(index='data', doc_type='drug', body=drug)
            count += 1
    return count
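
The optional-field check in `process_formulary_url` could also be written with `dict.setdefault`, which keeps the field names in one place. This is a sketch only: `fill_plan_defaults` and `OPTIONAL_PLAN_FLAGS` are hypothetical names, and it uses the boolean `False` rather than the string `'false'` from the cell above, on the assumption that matching the boolean values seen in the source data is preferable.

```python
# The three optional per-plan flags handled in process_formulary_url.
OPTIONAL_PLAN_FLAGS = ('prior_authorization', 'step_therapy', 'quantity_limit')


def fill_plan_defaults(drug):
    """Give every plan entry of a drug record the optional flags it lacks."""
    for plan in drug.get('plans', []):
        for flag in OPTIONAL_PLAN_FLAGS:
            # setdefault leaves an existing value alone and adds False otherwise
            plan.setdefault(flag, False)
    return drug


drug = {
    'drug_name': 'Melatonin 3 MG Oral Tablet',
    'plans': [
        {'plan_id': '39424OR1240001', 'drug_tier': 'SELECT', 'step_therapy': True},
    ],
}
fill_plan_defaults(drug)
print(sorted(drug['plans'][0].keys()))
```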

In [49]:
# process the plan url
def process_plan_url(plan_url, es):
    print "Processing {0}".format(plan_url)

    count = 0
    r = requests.get(plan_url)
    # process each plan        
    for plan in json.loads(r.content):
        es.index(index='data', doc_type='plan', body=plan)
        count += 1
    return count

In [50]:
def process_puf_url(puf_url, es):
    print "Processing {0}...".format(puf_url)
    response = requests.get(puf_url)

    links = json.loads(response.content)
    print ("\nProvider URLS:")
    print ("==================================")
    for provider_url in links['provider_urls']:
        print "Processed {0} providers".format(process_provider_url(provider_url, es))
        
    print ("\nFormulary URLS:")
    print ("==================================")    
    for formulary_url in links['formulary_urls']:
        print "Processed {0} drugs".format(process_formulary_url(formulary_url, es))
        
    print ("\nPlan URLS:")
    print ("==================================")        
    for plan_url in links['plan_urls']:
        print "Processed {0} plans".format(process_plan_url(plan_url, es))
    
    

In [51]:
def process_machine_readable_puf(csv_filename):
    urls_processed = []
    es = Elasticsearch("http://localhost:9200")
    ic = IndicesClient(es)
    with open(csv_filename, 'r') as urlfile:
        urls = csv.DictReader(urlfile)
        for row in urls:
            _url = row['URL Submitted']
            url_parseresult = urlparse(_url)
            
            # minimal check to make sure the url begins with scheme:// and is not empty
            if url_parseresult.scheme:
                if _url not in urls_processed:
                    process_puf_url(_url, es)
                    urls_processed.append(_url)

In [None]:
# check to see if the es instance is up
r = requests.get('http://localhost:9200')
if r.status_code == 200:
    process_machine_readable_puf('machine-readable-url-puf.csv')

It turns out that some of these URLs link to very large data sets, so it would be best to stream them to a file. Here I'm experimenting with the `requests` library to see how to do that. 

After experimenting, I could not get the streaming mode of the `requests` module to give `ijson` the file-like interface it needs to handle the stream and emit events, which is what lets us pull out the JSON objects without loading everything into memory. So we end up using the `urllib2` module instead.

This way we can stream directly into the objects and then put the objects as documents into Elastic Search.

In [21]:
from urllib2 import urlopen
import ijson
from contextlib import closing

with closing(urlopen('https://www.bestlife.com/exchange/providers_wNPI.json')) as f:
    count = 0
    for obj in ijson.items(f, 'item'):
        print obj['npi']
        count += 1
        if count > 100:
            break


1003003336
1003003633
1003008418
1003008723
1003010372
1003011123
1003013301
1003014333
1003015181
1003017138
1003017807
1003018037
1003018789
1003019050
1003019910
1003020512
1003021460
1003021775
1003022880
1003025792
1003026972
1003027848
1003028036
1003028184
1003028440
1003028838
1003028861
1003030248
1003030420
1003030792
1003031386
1003031675
1003031766
1003031972
1003033770
1003034984
1003035437
1003037292
1003039298
1003039579
1003039595
1003042649
1003043175
1003044082
1003044603
1003045493
1003047341
1003047929
1003052051
1003055377
1003056409
1003065285
1003070798
1003076589
1003085978
1003088576
1003091232
1003093048
1003093352
1003093469
1003093717
1003098476
1003098740
1003098757
1003103342
1003104399
1003104746
1003104977
1003108820
1003109851
1003114000
1003120627
1003126343
1003126350
1003126392
1003129255
1003130816
1003136649
1003140146
1003141342
1003147208
1003150384
1003151432
1003159864
1003161191
1003162934
1003165051
1003167958
1003170135
1003171182
1003176819

In [13]:
import ijson
import json
file_name="test.json"

with open(file_name) as f:
    for obj in ijson.items(f, 'item'):
        print obj['npi']
            


1003003336
1003003633
1003008418
1003008723
1003010372
1003011123


In [40]:
from urlparse import urlparse
u = urlparse('NOT SUBMITTED')

In [41]:
print u

ParseResult(scheme='', netloc='', path='NOT SUBMITTED', params='', query='', fragment='')


In [44]:
if not u.scheme:
    print 'hello'

hello


# Yelp API

This is a test of the Yelp API to understand how it works for looking up businesses. The OAuth keys and identifiers are pulled from environment variables in order to keep them out of the source code. The OAuth setup is performed via the Yelp Developer web site.

In [9]:
from yelp.client import Client
from yelp.oauth1_authenticator import Oauth1Authenticator
import os
import json

In [17]:

auth = Oauth1Authenticator(
    consumer_key=os.environ['YELP_CONSUMER_KEY'],
    consumer_secret=os.environ['YELP_CONSUMER_SECRET'],
    token=os.environ['YELP_TOKEN'],
    token_secret=os.environ['YELP_TOKEN_SECRET']
)

client = Client(auth)

In [21]:
params = {
    'term': 'Zenner M.D.',
    'lang': 'en'
}

a=client.search('Virginia Beach', **params)

In [19]:
for item in a.businesses:
    print item.name, item.review_count

Coastal Grill 76
Blue Pearl Veterinary Hospital 21
Tautog's Restaurant 481
Onelife Fitness - Greenbrier 42
Bodega 89
Carrabba's Italian Grill 36
Cheddar's Scratch Kitchen 192
Berrets Seafood Restaurant and Taphouse Grill 199
