# Environment config

In [30]:
import io
import re
import gzip
import json
import requests
import numpy as np
import pandas as pd
import pydocumentdb.document_client as ddb

# External data

In [2]:
metadataUrl = 'https://chstone.blob.core.windows.net/public/congress.json.gz'
topicProbsUrl = 'https://chstone.blob.core.windows.net/public/CongressionalDocTopicProbs.npy.gz'
idsUrl = 'https://chstone.blob.core.windows.net/public/bill-ids.csv.gz'
textUrl = 'https://chstone.blob.core.windows.net/public/BillSummaries.tsv.gz'

In [3]:
def readJsonGz(url, **kwargs):
    return pd.read_json(gzip.open(requests.get(url, stream=True).raw, mode='rt'), **kwargs)

def readNumpyGz(url, **kwargs):
    return pd.DataFrame(np.load(gzip.open(io.BytesIO(requests.get(url, stream=True).content)), **kwargs))

def readCsvGz(url, **kwargs):
    return pd.read_csv(gzip.open(requests.get(url, stream=True).raw, mode='rt'), **kwargs)

def sparseDict(d):
    minVal = min(d.values())
    return {str(k):v for k,v in d.items() if v > minVal}

# Download raw data

* `metadata` contains information about the bill: sponsor, history, date
* `topicProbs` is a matrix of document/topic values
* `ids` maps the row index of of `topicProbs` to a string identifier

In [4]:
metadata = readJsonGz(metadataUrl).set_index(['id'])
topicProbs = readNumpyGz(topicProbsUrl)
ids = readCsvGz(idsUrl)
text = readCsvGz(textUrl, sep='\t').set_index(['ID'])

* `topics` is a joined dataframe between `ids` and `topicProbs`
* `metas` is a joined dataframe between `ids` and `metadata`

In [5]:
topics = ids.join(topicProbs, how='inner').set_index('ID')
metas = ids.set_index('ID').join(metadata, how='inner')
metas['type'] = list(map(lambda x: re.match('^([a-z]+)', x).group(1), metas.index))
metas['house'] = list(map(lambda x: x[0], metas.index))

# DocumentDB

The `getOrCreate` functions ensure that our database objects exist and expose otherwise opaque "self links" of each object

The `q` function allows us to write query arguments in a Pythonesque way

In [6]:
def getOrCreateDatabase(client, databaseId):
    query = q('SELECT * FROM r WHERE r.id=@id', id=databaseId)
    result = list(client.QueryDatabases(query))
    return result[0] if len(result) else client.CreateDatabase({'id':databaseId})

def getOrCreateCollection(client, databaseLink, collectionId):
    query = q('SELECT * FROM r WHERE r.id=@id', id=collectionId)
    result = list(client.QueryCollections(databaseLink, query))
    return result[0] if len(result) else client.CreateCollection(databaseLink, {'id':collectionId})

def getOrCreateStoredProcedure(client, collectionLink, sprocId, sprocBody):
    query = q('SELECT * FROM r WHERE r.id=@id', id=sprocId)
    result = list(client.QueryStoredProcedures(collectionLink, query))
    return result[0] if len(result) else client.CreateStoredProcedure(collectionLink, {'id':sprocId, 'body':sprocBody})

def getOrCreateUserDefinedFunction(client, collectionLink, udfId, udfBody):
    query = q('SELECT * FROM r WHERE r.id=@id', id=udfId)
    result = list(client.QueryUserDefinedFunctions(collectionLink, query))
    return result[0] if len(result) else client.CreateUserDefinedFunction(collectionLink, {'id':udfId, 'body':udfBody})
    
def q(statement, **kwargs):
    return { 'query': statement, 'parameters': [{'name':'@'+k, 'value':str(v)} for (k, v) in kwargs.items()]}

## Bulk import stored procedure (JavaScript)
To reduce network overhead, define a stored procedure to handle bulk document creation on the server. Stored procedures are halted after a few seconds, so expect to call this function repeatedly until the returned count is equal to the total number of documents.

In [7]:
bulkImportSproc = '''
function bulkImport(docs) {
  var self = __.getSelfLink()
    , offset = 0
    , fn = 'upsertDocument' || 'createDocument';

  create();

  function create() {
    if (!hasMore() || !__[fn](self, docs[offset], onCreate)) {
      complete();
    }
  }

  function hasMore() {
    return docs.length && offset < docs.length;
  }

  function onCreate(err, res, opt) {
    if (err) throw err;
    offset += 1;
    create();
  }

  function complete() {
    __.response.setBody({count:offset});
  }
}
'''

## Counting aggregation stored procedure  (JavaScript)
DocumentDB does not currently support aggregation. To work around this limitation, define a stored procedure that performs aggregation via a table scan. Stored procedures are halted after a few seconds, so expect to call this function multiple times using a continuation token.

In [8]:
countBySproc = '''
function countBy(query, key, counts, ct) {
  var state = {}
    , self = __.getSelfLink()
    , maxItems = 1000;
    
  state.continuation = ct;
  state.counts = counts || {};
  
  next();

  function next() {
    if (!__.queryDocuments(self, query, {pageSize:maxItems,continuation:state.continuation}, onResults)) {
      complete()
    }
  }
  
  function onResults(err, res, opt) {
    if (err) throw err;
      
    state.continuation = opt.continuation;
    res.forEach(function(result) {
      state.counts[result[key]] = state.counts[result[key]] || 0;
      state.counts[result[key]] += 1;
    });
    
    if (opt.continuation) {
      next(opt.continuation);
    } else {
      complete();
    }
  }
  
  function complete() {
    __.response.setBody(state);
  }
}
'''

In [9]:
sumBySproc = '''
function sumBy(query, key, sumField, counts, ct) {
  var state = {}
    , self = __.getSelfLink()
    , maxItems = 1000;
    
  state.continuation = ct;
  state.counts = counts || {};
  
  next();

  function next() {
    if (!__.queryDocuments(self, query, {pageSize:maxItems,continuation:state.continuation}, onResults)) {
      complete()
    }
  }
  
  function onResults(err, res, opt) {
    if (err) throw err;
      
    state.continuation = opt.continuation;
    res.forEach(function(result) {
      state.counts[result[key]] = state.counts[result[key]] || 0;
      state.counts[result[key]] += result[sumField];
    });
    
    if (opt.continuation) {
      next(opt.continuation);
    } else {
      complete();
    }
  }
  
  function complete() {
    __.response.setBody(state);
  }
}
'''

## Date quarter function  (JavaScript)
Our queries will perform binning based on quarter-of-year. Define a User Defined Function (UDF) to perform this calculation directly on the resultset.

In [10]:
roundDateToQuarterUdf = '''
function roundDateToQuarter(value) {
  var date = new Date(value)
  if (!value) return value;
  if (isNaN(date.getTime())) return value;
  
  date.setUTCDate(1)
  date.setUTCMonth(Math.floor(date.getUTCMonth() / 3) * 3)
  date.setUTCHours(0)
  date.setUTCMinutes(0)
  date.setUTCSeconds(0)
  date.setUTCMilliseconds(0)
  return date.toISOString()
}
'''

## Connect to the database

In [11]:
host = 'https://kbdocs.documents.azure.com:443/'
key = 'sefM46ZJn1WPnSc6wrVuDeOYaUrXQ9aZGvT8kttpX4w7r2UXq67maLn5H3HW0mPuFuOQQu96HFvK9AMy0uxeGg=='
client = ddb.DocumentClient(host, {'masterKey':key})
databaseName = 'congress'
collectionName = 'data-s3'

In [12]:
db = getOrCreateDatabase(client, databaseName)
collection = getOrCreateCollection(client, db['_self'], collectionName)
bulkImport = getOrCreateStoredProcedure(client, collection['_self'], 'bulkImport', bulkImportSproc)
roundDateToQuarter = getOrCreateUserDefinedFunction(client, collection['_self'], 'roundDateToQuarter', roundDateToQuarterUdf)
countBy = getOrCreateStoredProcedure(client, collection['_self'], 'countBy', countBySproc)
sumBy = getOrCreateStoredProcedure(client, collection['_self'], 'sumBy', sumBySproc)

## Upload documents to the database
Documents should be uploaded in batches to avoid unnecessary network overhead between us and the server. However, the batch size must be small enough to avoid `Request too Large` errors from our DocumentDB collection.

Each stored procedure in DocumentDB is allocated only a few seconds of running time, so our stored procedure may "bail out" before processing a single batch. This is expected, and we handle this condition by tracking the count of how many documents were actually processed (opposed to how many were sent). However, try to avoid sending batches whose size greatly exceeds the number of documents that the stored procedure can process in a single pass. Any documents that do not get processed in the first pass will have to be sent over the network a second time, increasing the total processing time.

As we merge our topics dict with the rest of the metadata, it's important to note that we are only uploading a sparse representation of the topics (see `sparseDict()`). We can safely exclude any topic probabilities with a very low probability `< 0.001` due to the nature of our topic model. This greatly reduces the size of each database document, and thus increases the performance of our upload process.

In [14]:
offset = 0
batchSize = 350
metas = metas[0:5000]
while offset < len(metas):
    (start, end) = (offset, offset+batchSize)
    batch = metas[start:end].reset_index().rename(columns={'index': 'id'})
    batch['topics'] = batch['id'].apply(lambda x: sparseDict(topics.loc[x].to_dict()))
    args = batch.to_dict('records')
    print('{start}-{end} of {total}'.format(start=start, end=start+len(args), total=len(metas)))
    offset += client.ExecuteStoredProcedure(bulkImport['_self'], [args])['count']


0-350 of 5000
350-700 of 5000
700-1050 of 5000
1050-1400 of 5000
1400-1750 of 5000
1750-2100 of 5000
2100-2450 of 5000
2450-2800 of 5000
2800-3150 of 5000
3150-3500 of 5000
3500-3850 of 5000
3850-4200 of 5000
4200-4550 of 5000
4550-4900 of 5000
4900-5000 of 5000


## Retrieve counts-per quarter on a given topic
After uploading the documents, we can use a second stored procedure to perform `count` aggregation from a given query and `grouping` key.

Each stored procedure in DocumentDB is allocated only a few seconds of running time, so our stored procedure may return an incomplete count along with a token to continue processing. Repeatedly call the stored procedure (along with the current `count` mapping) until no token is returned to retrieve the full aggregation.

In [21]:
topic = 1
key = 'date'
query = 'SELECT udf.roundDateToQuarter(r.dateIntroduced) AS {key}, r.topics["{topic}"] AS TOPIC from r WHERE r.topics["{topic}"] > 0 AND r.type="s"'.format(topic=topic, key=key)
more = True
counts = None
ct = None

while more:
    print(query)
    res = client.ExecuteStoredProcedure(sumBy['_self'], [query, key, '', counts, ct])
    counts = res['counts']
    ct = res['continuation'] if 'continuation' in res else None
    more = ct is not None

counts

SELECT udf.roundDateToQuarter(r.dateIntroduced) AS date, r.topics["1"] AS TOPIC from r WHERE r.topics["1"] > 0 AND r.type="s"


{'1983-01-01T00:00:00.000Z': None,
 '1983-04-01T00:00:00.000Z': None,
 '1983-07-01T00:00:00.000Z': None,
 '1983-10-01T00:00:00.000Z': None,
 '1984-01-01T00:00:00.000Z': None,
 '1984-04-01T00:00:00.000Z': None,
 '1984-07-01T00:00:00.000Z': None,
 '1984-10-01T00:00:00.000Z': None,
 '1985-01-01T00:00:00.000Z': None,
 '1985-04-01T00:00:00.000Z': None,
 '1985-07-01T00:00:00.000Z': None,
 '1985-10-01T00:00:00.000Z': None,
 '1986-01-01T00:00:00.000Z': None,
 '1986-04-01T00:00:00.000Z': None,
 '1986-07-01T00:00:00.000Z': None,
 '1986-10-01T00:00:00.000Z': None,
 '1987-01-01T00:00:00.000Z': None,
 '1987-04-01T00:00:00.000Z': None,
 '1987-07-01T00:00:00.000Z': None,
 '1987-10-01T00:00:00.000Z': None,
 '1988-01-01T00:00:00.000Z': None,
 '1988-04-01T00:00:00.000Z': None,
 '1988-07-01T00:00:00.000Z': None,
 '1988-10-01T00:00:00.000Z': None,
 '1989-01-01T00:00:00.000Z': None,
 '1989-04-01T00:00:00.000Z': None,
 '1989-07-01T00:00:00.000Z': None,
 '1989-10-01T00:00:00.000Z': None,
 '1990-01-01T00:00:0

# Azure Search

In [22]:
searchService = 'chstone2'
searchKey = '6835C3B19F3E8A5BAF46A0AC8B22A81B'
searchSession = requests.Session()

In [23]:
def searchRequest(expectStatus = [200], **kwargs):
    kwargs['url'] = 'https://{service}.search.windows.net/{path}'.format(service=searchService, path=kwargs['url'])
    kwargs.setdefault('method', 'get')
    kwargs.setdefault('headers', {})['api-key'] = searchKey
    kwargs.setdefault('params', {})['api-version'] = '2015-02-28-preview'
    res = searchSession.request(**kwargs)
    if (res.status_code not in expectStatus):
        raise Exception({'status_code':res.status_code, 'content':res.content})
    return res
def ensureIndex(schema):
    if searchRequest(url='indexes/{name}'.format(name=schema['name']), params={'$select':'name'}).status_code == 404:
        res = searchRequest(method='post', url='indexes', json=schema, expectStatus=range(200,210))

In [25]:
congressSchema = json.loads('''
{
  "name": "congress",
  "fields": [
    {
      "name": "id",
      "type": "Edm.String",
      "key": true,
      "searchable": false,
      "filterable": true,
      "facetable": false,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "title",
      "type": "Edm.String",
      "key": false,
      "searchable": true,
      "filterable": false,
      "facetable": false,
      "sortable": false,
      "retrievable": true,
      "analyzer": "en.microsoft",
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "content",
      "type": "Edm.String",
      "key": false,
      "searchable": true,
      "filterable": false,
      "facetable": false,
      "sortable": false,
      "retrievable": false,
      "analyzer": "en.microsoft",
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "dateIntroduced",
      "type": "Edm.DateTimeOffset",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": true,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "dateEnacted",
      "type": "Edm.DateTimeOffset",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": true,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "vetoed",
      "type": "Edm.Boolean",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "enacted",
      "type": "Edm.Boolean",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "awaitingSignature",
      "type": "Edm.Boolean",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "housePassageResult",
      "type": "Edm.String",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "senatePassageResult",
      "type": "Edm.String",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "topics",
      "type": "Edm.String",
      "key": false,
      "searchable": false,
      "filterable": false,
      "facetable": false,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "topicTags",
      "type": "Collection(Edm.String)",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": false,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "status",
      "type": "Edm.String",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "sponsorName",
      "type": "Edm.String",
      "key": false,
      "searchable": true,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "sponsorGender",
      "type": "Edm.String",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "sponsorState",
      "type": "Edm.String",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "sponsorParty",
      "type": "Edm.String",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "url",
      "type": "Edm.String",
      "key": false,
      "searchable": false,
      "filterable": false,
      "facetable": false,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "house",
      "type": "Edm.String",
      "key": false,
      "searchable": false,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    },
    {
      "name": "type",
      "type": "Edm.String",
      "key": false,
      "searchable": true,
      "filterable": true,
      "facetable": true,
      "sortable": false,
      "retrievable": true,
      "analyzer": null,
      "indexAnalyzer": null,
      "searchAnalyzer": null,
      "synonymMaps": []
    }
  ],
  "suggesters": [],
  "scoringProfiles": [],
  "defaultScoringProfile": null,
  "analyzers": [],
  "charFilters": [],
  "tokenizers": [],
  "tokenFilters": []
}
''')

In [26]:
ensureIndex(congressSchema)

In [27]:
def mapSearchRecord(record):
    docId = record.get('id')
    docTopics = sparseDict(topics.loc[docId].to_dict())
    history = record.get('history')
    sponsor = record.get('primarySponsor')
    searchRecord = {
        '@search.action': 'upload',
        'content': text.at[docId, 'Text'],
        'topics': json.dumps(docTopics),
        'topicTags': list(docTopics.keys()) }
    for k in ['id', 'title', 'dateIntroduced', 'url', 'status', 'house', 'type']:
        searchRecord[k]=record.get(k)
    for k in ['dateEnacted', 'vetoed', 'awaitingSignature', 'housePassageResult', 'senatePassageResult']:
        searchRecord[k]=history.get(k)
    for k in ['name', 'gender', 'state', 'party']:
        searchRecord['sponsor{0}{1}'.format(k[0].upper(), k[1:])]=sponsor.get(k)
    return searchRecord

In [None]:
offset = 0
batchSize = 1000
while offset < len(metas):
    (start, end) = (offset, offset+batchSize)
    batch = list(map(mapSearchRecord, metas[start:end].reset_index().rename(columns={'index':'id'}).to_dict('records')))
    print('{start}-{end} of {total}'.format(start=start, end=end, total=len(metas)))
    searchRequest(method='post', url='indexes/{0}/docs/index'.format(congressSchema['name']), json={'value':batch})
    offset += len(batch)

In [28]:
searchRequest(
    method='get', 
    url='/indexes/{0}/docs'.format(congressSchema['name']), 
    params={
        'search':'"software exports"~3', 
        '$filter':"type eq 's'", 
        'facet':'dateIntroduced,interval:quarter',
        'queryType':'full', 
        '$count':'true'}).json()

{'@odata.context': "https://chstone2.search.windows.net/indexes('congress')/$metadata#docs(id,title,dateIntroduced,dateEnacted,vetoed,enacted,awaitingSignature,housePassageResult,senatePassageResult,topics,status,sponsorName,sponsorGender,sponsorState,sponsorParty,url,house,type)",
 '@odata.count': 5,
 '@search.facets': {'dateIntroduced': [{'count': 1,
    'value': '1996-01-01T00:00:00Z'},
   {'count': 1, 'value': '1996-04-01T00:00:00Z'},
   {'count': 2, 'value': '1997-01-01T00:00:00Z'},
   {'count': 1, 'value': '1997-04-01T00:00:00Z'}],
  'dateIntroduced@odata.type': '#Collection(Microsoft.Azure.Search.V2015_02_28_Preview.QueryResultFacet)'},
 'value': [{'@search.score': 3.6256244,
   'awaitingSignature': False,
   'dateEnacted': None,
   'dateIntroduced': '1997-03-03T00:00:00Z',
   'enacted': None,
   'house': 's',
   'housePassageResult': None,
   'id': 's387-105',
   'senatePassageResult': None,
   'sponsorGender': 'M',
   'sponsorName': 'Hatch, Orrin G.',
   'sponsorParty': 'Repub