In [1]:
%load_ext autotime

In [2]:
import boto3
import datetime
import logging
import json
import pickle
import pytest
import sys
import time

from etltools import s3

from lambda_client import (
    ClaimsClient,
    BenefitsClient,
    CalculatorClient,
    run_batch_on_schedule,
)
from lambda_client.config_info import ConfigInfo

reload(logging)  # get around notebook problem

<module 'logging' from '/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/logging/__init__.pyc'>

time: 529 ms


In [3]:
# Configure the root logger: INFO level, with timestamp, source file and line in each record.
logging.basicConfig(
    level=logging.INFO, 
    format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
# NOTE(review): basicConfig() only accepts `handlers` on Python 3.3+, and this kernel is
# Python 2.7 -- presumably why the block below is disabled; confirm before re-enabling.
#     handlers=[
#         logging.FileHandler(filename='mylog.log', mode='w'),
#         logging.StreamHandler(sys.stdout),
#     ]
)

time: 1.43 ms


In [4]:
# Test whether logging works:
logger = logging.getLogger()
logger.info('TEST INFO')

[2018-01-19 07:23:19,727] {<ipython-input-4-efd2929c73d3>:3} INFO - TEST INFO


time: 2.16 ms


In [5]:
# Keyword arguments for the AWS session; later expanded as boto3.Session(**aws_info) and
# passed to every client constructor in this notebook.
aws_info = {
    'profile_name': 'sandbox',
}

# Sample UIDs read from S3, and two plan IDs (they appear as `picwell_id` in the
# breakdown responses); both are reused by the cells below.
uids = s3.read_json('s3://picwell.sandbox.medicare/samples/philadelphia-2015-1k-sample')
pids = ['2820028008119', '2820088001036']

time: 237 ms


# Test ConfigInfo

In [6]:
configs = ConfigInfo('lambda_client/lambda.cfg')

print configs.claims_bucket
print configs.claims_path
print
print configs.benefits_bucket
print configs.benefits_path
print
print configs.claims_table

picwell.sandbox.analytics
junghoon/lambda_calculator

picwell.sandbox.analytics
junghoon/lambda_calculator_benefits

ma_claims
time: 2.87 ms


In [7]:
all_states = configs.all_states

print '{} states'.format(len(all_states))
print all_states

51 states
['01', '04', '05', '06', '08', '09', '10', '11', '12', '13', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '44', '45', '46', '47', '48', '49', '50', '51', '53', '54', '55', '56', '72']
time: 1.19 ms


# Test ClaimsClient

In [8]:
# Test S3:
client = ClaimsClient(aws_info, 
                      s3_bucket=configs.claims_bucket,
                      s3_path=configs.claims_path)

people = client.get(uids[:1])
print 'claims of {} people retrieved'.format(len(people))

claims of 1 people retrieved
time: 324 ms


In [9]:
person = people[0]
print person.keys()
{
    'uid': person['uid'],
    'medical_claims': person['medical_claims'][:5]
}

['medical_claims', 'uid']


{'medical_claims': [{'admitted': '2014-04-03',
   'benefit_category': 16,
   'cost': 148.0,
   'discharged': '2014-04-03',
   'length_of_stay': 1},
  {'admitted': '2014-05-02',
   'benefit_category': 16,
   'cost': 74.55,
   'discharged': '2014-05-02',
   'length_of_stay': 1},
  {'admitted': '2014-05-05',
   'benefit_category': 16,
   'cost': 104.39,
   'discharged': '2014-05-05',
   'length_of_stay': 1},
  {'admitted': '2014-05-13',
   'benefit_category': 16,
   'cost': 210.12,
   'discharged': '2014-05-13',
   'length_of_stay': 1},
  {'admitted': '2014-05-19',
   'benefit_category': 11,
   'cost': 442.2,
   'discharged': '2014-05-19',
   'length_of_stay': 1}],
 'uid': '1302895801'}

time: 5.5 ms


In [10]:
# Let's try something larger:
people = client.get(uids)
print 'claims of {} people retrieved'.format(len(people))

claims of 1000 people retrieved
time: 2min 30s


In [11]:
# Test DynamoDB:
client = ClaimsClient(aws_info,
                      table_name=configs.claims_table)

people = client.get(uids[:1])
print 'claims of {} people retrieved'.format(len(people))

claims of 1 people retrieved
time: 540 ms


In [12]:
person = people[0]
print person.keys()
{
    'uid': person['uid'],
    'medical_claims': person['medical_claims'][:5]
}

['medical_claims', 'uid']


{'medical_claims': [{'admitted': '2014-04-03',
   'benefit_category': 16,
   'cost': 148.0,
   'discharged': '2014-04-03',
   'length_of_stay': 1},
  {'admitted': '2014-05-02',
   'benefit_category': 16,
   'cost': 74.55,
   'discharged': '2014-05-02',
   'length_of_stay': 1},
  {'admitted': '2014-05-05',
   'benefit_category': 16,
   'cost': 104.39,
   'discharged': '2014-05-05',
   'length_of_stay': 1},
  {'admitted': '2014-05-13',
   'benefit_category': 16,
   'cost': 210.12,
   'discharged': '2014-05-13',
   'length_of_stay': 1},
  {'admitted': '2014-05-19',
   'benefit_category': 11,
   'cost': 442.2,
   'discharged': '2014-05-19',
   'length_of_stay': 1}],
 'uid': '1302895801'}

time: 4.84 ms


In [13]:
# Let's try something larger:
people = client.get(uids)
print 'claims of {} people retrieved'.format(len(people))

claims of 1000 people retrieved
time: 8.76 s


In [14]:
# Test configuration file and retrieving multiple people:
client = ClaimsClient(aws_info)

people = client.get(uids[:5])
print 'claims of {} people retrieved'.format(len(people))

claims of 5 people retrieved
time: 707 ms


In [15]:
# The object should not be pickled.
# NOTE: `match` is applied as a regular-expression search against the exception text, so the
# trailing '.' matches any character rather than a literal period.
with pytest.raises(Exception, match='ClaimsClient object cannot be pickled.'):
    pickle.dumps(client)

time: 1.23 ms


# Test BenefitsClient

In [16]:
client = BenefitsClient(aws_info)

print client.all_states

['01', '04', '05', '06', '08', '09', '10', '11', '12', '13', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '44', '45', '46', '47', '48', '49', '50', '51', '53', '54', '55', '56', '72']
time: 1.39 ms


In [17]:
plans = client._get_one_state('01')
print '{} plans read for state 01'.format(len(plans))

plans = client._get_one_state('04')
print '{} plans read for state 04'.format(len(plans))

47 plans read for state 01
75 plans read for state 04
time: 778 ms


In [18]:
plans = client.get_by_state(['01', '04'])
print '{} plans read'.format(len(plans))

122 plans read
time: 583 ms


In [19]:
plans = client.get_all()
print '{} plans read'.format(len(plans))

3558 plans read
time: 10 s


In [20]:
# Compare the timing against reading the entire file:
from lambda_client.shared_utils import _read_json

session = boto3.Session(**aws_info)
resource = session.resource('s3')

time: 61.8 ms


In [21]:
all_plans = _read_json('picwell.sandbox.medicare', 'ma_benefits/cms_2018_pbps_20171005.json', resource)

print '{} plans read'.format(len(plans))

3558 plans read
time: 5.43 s


In [22]:
# Ensure that the same plans are read. Both lists are ordered by picwell_id before
# comparing, presumably because the two reads need not return plans in the same order.
def sort_key(plan):
    """Return the plan's picwell_id, used as the ordering key."""
    return plan['picwell_id']

assert sorted(all_plans, key=sort_key) == sorted(plans, key=sort_key)

time: 148 ms


In [23]:
# The object should not be pickled.
# NOTE: `match` is applied as a regular-expression search against the exception text, so the
# trailing '.' matches any character rather than a literal period.
with pytest.raises(Exception, match='BenefitsClient object cannot be pickled.'):
    pickle.dumps(client)

time: 1.42 ms


# Test Cost Breakdown

In [24]:
client = CalculatorClient(aws_info)

time: 701 µs


In [25]:
responses = client.get_breakdown(uids[:1], pids, verbose=True)

print '{} responses returned'.format(len(responses))
responses[0]

START RequestId: f2b84cf1-fd13-11e7-9775-3b9855aebe52 Version: $LATEST
[INFO]	2018-01-19T12:26:17.700Z	f2b84cf1-fd13-11e7-9775-3b9855aebe52	Clock started at 2018-01-19 12:26:17.700797.
[INFO]	2018-01-19T12:26:17.772Z	f2b84cf1-fd13-11e7-9775-3b9855aebe52	Thread initialization took 0.071103 seconds.
[INFO]	2018-01-19T12:26:17.834Z	f2b84cf1-fd13-11e7-9775-3b9855aebe52	Found credentials in environment variables.
[INFO]	2018-01-19T12:26:19.72Z	f2b84cf1-fd13-11e7-9775-3b9855aebe52	Starting new HTTPS connection (1): s3.amazonaws.com
[INFO]	2018-01-19T12:26:19.208Z	f2b84cf1-fd13-11e7-9775-3b9855aebe52	Joining all threads took 1.435514 seconds.
[INFO]	2018-01-19T12:26:19.208Z	f2b84cf1-fd13-11e7-9775-3b9855aebe52	Combining all results in order took 2.9e-05 seconds.
[INFO]	2018-01-19T12:26:19.208Z	f2b84cf1-fd13-11e7-9775-3b9855aebe52	Claim retrieval for [u'1302895801'] took 1.507642 seconds.
[INFO]	2018-01-19T12:26:19.272Z	f2b84cf1-fd13-11e7-9775-3b9855aebe52	Thread initialization took 0.062931 s

{u'allowed': 26376.640000000003,
 u'covered_breakdown': {u'categories': {u'0': 0.0,
   u'11': 186.33999999999997,
   u'13': 13.23,
   u'15': 30.79,
   u'16': 0.0,
   u'19': 80.0,
   u'25': 1750.0,
   u'30': 719.1700000000001,
   u'31': 300.0,
   u'44': 0.0,
   u'49': 118.48800000000001,
   u'7': 479.558},
  u'composite': 3677.5759999999996,
  u'in_network': 3677.5759999999996,
  u'out_network': 0.0},
 u'deductible_breakdown': {u'categories': {u'0': 0.0,
   u'11': 0.0,
   u'13': 0.0,
   u'15': 0.0,
   u'16': 0.0,
   u'19': 0.0,
   u'25': 0.0,
   u'30': 0.0,
   u'31': 0.0,
   u'44': 0.0,
   u'49': 0.0,
   u'7': 0.0},
  u'composite': 0.0,
  u'in_network': 0.0,
  u'out_network': 0.0},
 u'oop': 3677.5759999999996,
 u'picwell_id': u'2820028008119',
 u'uid': u'1302895801',
 u'uncovered': 0.0,
 u'uncovered_breakdown': {u'categories': {u'0': 0.0,
   u'11': 0.0,
   u'13': 0.0,
   u'15': 0.0,
   u'16': 0.0,
   u'19': 0.0,
   u'25': 0.0,
   u'30': 0.0,
   u'31': 0.0,
   u'44': 0.0,
   u'49': 0.0,


time: 5.18 s


In [26]:
responses = client.get_breakdown(uids[:1], pids, use_s3_for_claims=False, verbose=True)

print '{} responses returned'.format(len(responses))

START RequestId: f5bb53e9-fd13-11e7-a20c-7507f01538b7 Version: $LATEST
[INFO]	2018-01-19T12:26:22.713Z	f5bb53e9-fd13-11e7-a20c-7507f01538b7	Clock started at 2018-01-19 12:26:22.713818.
[INFO]	2018-01-19T12:26:22.732Z	f5bb53e9-fd13-11e7-a20c-7507f01538b7	Thread initialization took 0.018263 seconds.
[INFO]	2018-01-19T12:26:22.772Z	f5bb53e9-fd13-11e7-a20c-7507f01538b7	Found credentials in environment variables.
[INFO]	2018-01-19T12:26:23.52Z	f5bb53e9-fd13-11e7-a20c-7507f01538b7	Starting new HTTPS connection (1): dynamodb.us-east-1.amazonaws.com
[INFO]	2018-01-19T12:26:23.215Z	f5bb53e9-fd13-11e7-a20c-7507f01538b7	Joining all threads took 0.482862 seconds.
[INFO]	2018-01-19T12:26:23.215Z	f5bb53e9-fd13-11e7-a20c-7507f01538b7	Combining all results in order took 2.7e-05 seconds.
[INFO]	2018-01-19T12:26:23.232Z	f5bb53e9-fd13-11e7-a20c-7507f01538b7	Claim retrieval for [u'1302895801'] took 0.518392 seconds.
[INFO]	2018-01-19T12:26:23.432Z	f5bb53e9-fd13-11e7-a20c-7507f01538b7	Thread initialization

In [27]:
# Test recursive call:
responses = client.get_breakdown(uids[:10], pids, max_calculated_uids=10)

print '{} responses returned'.format(len(responses))

20 responses returned
time: 15.4 s


In [28]:
responses = client.get_breakdown(uids[:10], pids, max_lambda_calls=2)

print '{} responses returned'.format(len(responses))

20 responses returned
time: 7.3 s


In [29]:
responses = client.get_breakdown(uids[:10], pids)

print '{} responses returned'.format(len(responses))

20 responses returned
time: 7.01 s


In [30]:
# Check whether DynamoDB reduces latency:
responses = client.get_breakdown(uids[:10], pids, use_s3_for_claims=False, max_calculated_uids=10)

print '{} responses returned'.format(len(responses))

20 responses returned
time: 5.83 s


In [31]:
# Let's try something larger:
responses = client.get_breakdown(uids, pids)

print '{} responses returned'.format(len(responses))

2000 responses returned
time: 21.6 s


In [32]:
responses = client.get_breakdown(uids, pids, use_s3_for_claims=False)

print '{} responses returned'.format(len(responses))

2000 responses returned
time: 24.9 s


In [33]:
# Runs into memory issues if all 1000 people are calculated at once:
# (presumably max_calculated_uids / max_lambda_calls bound the work done per Lambda
# invocation -- TODO confirm against CalculatorClient.get_breakdown)
responses = client.get_breakdown(uids, pids, use_s3_for_claims=False, max_calculated_uids=100, max_lambda_calls=10)

print '{} responses returned'.format(len(responses))

2000 responses returned
time: 17.8 s


In [34]:
from lambda_package.calc.calculator import calculate_oop

def run_locally(people, plans, oop_only):
    """Compute costs in-process for every (person, plan) combination.

    Args:
        people: iterable of mappings, each with 'uid' and 'medical_claims' keys.
        plans: iterable of mappings, each with a 'picwell_id' key; every plan is
            passed as-is to calculate_oop.
        oop_only: when true, keep only the 'oop' entry of each calculate_oop
            result; otherwise keep the whole result.

    Returns:
        A list with one dict per (person, plan) pair, person-major and in input
        order. Each dict carries the cost fields plus 'uid' and 'picwell_id'
        (the latter converted with str()).
    """
    results = []

    for individual in people:
        medical_claims = individual['medical_claims']

        for benefit_plan in plans:
            breakdown = calculate_oop(medical_claims, benefit_plan)

            # Either a fresh one-key dict or the calculator's own result object.
            record = {'oop': breakdown['oop']} if oop_only else breakdown

            # Tag the record so it can be traced back to its person and plan.
            record.update({
                'uid': individual['uid'],
                'picwell_id': str(benefit_plan['picwell_id']),
            })

            results.append(record)

    return results

time: 12.3 ms


In [35]:
# Run calculations locally for comparison:
# Alternative S3-backed claims client (disabled; the table-backed client below is used):
# claims_client = ClaimsClient(aws_info, 
#                              s3_bucket=configs.claims_bucket,
#                              s3_path=configs.claims_path)
claims_client = ClaimsClient(aws_info, 
                             table_name=configs.claims_table)
people = claims_client.get(uids)

# Fetch only the two plans in `pids`, then compute the full breakdown (oop_only=False)
# for every person/plan pair in this process.
benefits_client = BenefitsClient(aws_info)
plans = benefits_client.get_by_pid(pids)

costs = run_locally(people, plans, False)

print '{} costs calculated'.format(len(costs))

2000 costs calculated
time: 11.3 s


In [36]:
# benefits_client = BenefitsClient()
benefits_client = BenefitsClient(aws_info)
plans_CA = benefits_client.get_by_state(['06'])
pids_CA = [plan['picwell_id'] for plan in plans_CA]

print '{} plans identified'.format(len(pids_CA))

268 plans identified
time: 432 ms


In [37]:
# Try a sample size more relevant to commercial:
responses = client.get_oop(uids[:300], pids_CA)

print '{} responses returned'.format(len(responses))

80400 responses returned
time: 19.4 s


In [38]:
responses = client.get_oop(uids[:300], pids_CA, use_s3_for_claims=False)

print '{} responses returned'.format(len(responses))

80400 responses returned
time: 15.8 s


In [39]:
# Increasing the amount of computation at the terminal nodes increases the time. This is probably
# because there are many plans.
responses = client.get_oop(uids[:300], pids_CA, use_s3_for_claims=False, max_calculated_uids=3)

print '{} responses returned'.format(len(responses))

80400 responses returned
time: 19.7 s


In [40]:
claims_client = ClaimsClient(aws_info, 
                             table_name=configs.claims_table)
people = claims_client.get(uids[:300])

benefits_client = BenefitsClient(aws_info)
plans = benefits_client.get_by_pid(pids_CA)

costs = run_locally(people, plans, True)

print '{} costs calculated'.format(len(costs))

80400 costs calculated
time: 1min 2s


# Test Batch Calculation

In [41]:
# Rebinds `uids` from the 1k sample to the larger philadelphia-2015 list.
# NOTE(review): this read uses the s3n:// scheme, while the 1k sample earlier in the
# notebook is read via s3:// -- confirm the difference is intentional.
uids = s3.read_json('s3n://picwell.sandbox.medicare/samples/philadelphia-2015')

print '{} uids read'.format(len(uids))

1352473 uids read
time: 5.9 s


In [42]:
configs = ConfigInfo('lambda_client/lambda.cfg')
all_states = configs.all_states

print '{} states'.format(len(all_states))

51 states
time: 2.31 ms


In [43]:
client = CalculatorClient(aws_info)

time: 820 µs


In [44]:
response = client.run_batch(uids[:1], months=['01', '02', '03'], states=['01', '06'], verbose=True)

print response

START RequestId: 847ae8aa-fd14-11e7-b48a-4daabd806c86 Version: $LATEST
[INFO]	2018-01-19T12:30:22.267Z	847ae8aa-fd14-11e7-b48a-4daabd806c86	Clock started at 2018-01-19 12:30:22.267031.
[INFO]	2018-01-19T12:30:22.286Z	847ae8aa-fd14-11e7-b48a-4daabd806c86	Thread initialization took 0.018485 seconds.
[INFO]	2018-01-19T12:30:22.449Z	847ae8aa-fd14-11e7-b48a-4daabd806c86	Found credentials in environment variables.
[INFO]	2018-01-19T12:30:23.6Z	847ae8aa-fd14-11e7-b48a-4daabd806c86	Starting new HTTPS connection (1): s3.amazonaws.com
[INFO]	2018-01-19T12:30:23.184Z	847ae8aa-fd14-11e7-b48a-4daabd806c86	Joining all threads took 0.898056 seconds.
[INFO]	2018-01-19T12:30:23.184Z	847ae8aa-fd14-11e7-b48a-4daabd806c86	Combining all results in order took 2.6e-05 seconds.
[INFO]	2018-01-19T12:30:23.184Z	847ae8aa-fd14-11e7-b48a-4daabd806c86	Claim retrieval for [u'31970436701'] took 0.917279 seconds.
[INFO]	2018-01-19T12:30:23.185Z	847ae8aa-fd14-11e7-b48a-4daabd806c86	Processing state 01:
[INFO]	2018-01-1

In [45]:
response = client.run_batch(uids[:2], states=['01', '06', '36'], max_calculated_uids=2)

print response

[[u'31970436701', 36], [u'1674435601', 36]]
time: 1min 57s


In [46]:
# Test recursive call:
response = client.run_batch(uids[:2], states=['01', '06', '36'])

print response

[[u'31970436701', 36], [u'1674435601', 36]]
time: 42.3 s


In [47]:
# This only runs for large enough Lambda, e.g. 512 MB, and fails for 256 MB Lambda. 
# Memory determines how fast one can run.
response = client.run_batch(uids[:20], months=['01'], states=all_states[:5])

print response

[[u'31970436701', 5], [u'1784327003', 5], [u'1674435601', 5], [u'246540301', 5], [u'3158762201', 5], [u'31917048601', 5], [u'204451602', 5], [u'3154052701', 5], [u'595989402', 5], [u'617921001', 5], [u'3170818001', 5], [u'894536502', 5], [u'30498125901', 5], [u'3182854701', 5], [u'3173810801', 5], [u'2225890201', 5], [u'1789308302', 5], [u'946101001', 5], [u'986282601', 5], [u'763956001', 5]]
time: 17.6 s


In [48]:
# Not sure why an adjustment factor is needed:
# TODO: `factor` is an unexplained magic number; it scales both min_writes and max_writes below.
factor = 6

# The lambda's `uids` parameter shadows the notebook-level `uids`; it receives whatever
# run_batch_on_schedule passes in (presumably successive slices of uids[:10000] -- confirm).
# num_writes_per_uid=5 lines up with the 5 states requested per UID via all_states[:5].
responses = run_batch_on_schedule(lambda uids: client.run_batch(uids, months=['01'], states=all_states[:5]),
                                  uids[:10000], num_writes_per_uid=5, mean_runtime=30, 
                                  min_writes=100*factor, max_writes=10000*factor, verbose=True)

5e-06: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9996 remaining)
1.00899: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9992 remaining)
2.023712: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9988 remaining)
3.034808: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9984 remaining)
4.048788: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9980 remaining)
5.059888: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9976 remaining)
6.069375: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9972 remaining)
7.082176: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9968 remaining)
8.095552: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9964 remaining)
9.108687: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9960 remaining)
10.120677: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9956 remaining)
11.131586: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9952 remaining)
12.14171: issuing 4 UIDs for every 1.0 seconds (0 intervals, 9948 remaining)
1

In [49]:
print 'number of responses: {}'.format(len(responses))

writes = 0
for response in responses:
    writes += response[1]
print 'number of writes: {}'.format(writes)

number of responses: 10000
number of writes: 50000
time: 6.18 ms
