# Feature creation pipeline to operationalize data quality

# Aspirational TODO list
- How many different versions of the standard are we using
- Build a flag for invalid reporting-org indications, add to features dict
- Are there any other description contents to calculate ("Wow, there's a lot of txn info here!") - Do this on server side

### Initial Setup

In [54]:
from __future__ import division,print_function

import os
import sys
import json
import re
import time
import pickle
import requests
import six
from datetime import datetime
from collections import Counter
from operator import add

import pandas as pd
import pymongo

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from pprint import PrettyPrinter
p=PrettyPrinter()

In [2]:
conn=pymongo.MongoClient()

# Check database names to test connection and see what's here
conn.database_names()

[u'db_followers', u'db_tweets', u'iati', u'irs', u'local', u'MIDS1-w205']

In [16]:
# Make iati database
db = conn.iati

# Create collection objects for easy reference later
activities=db.activities
metadata=db.metadata

#Verify contents of database
db.collection_names()

[u'activities',
 u'activities_archive',
 u'cleaned_orgs',
 u'cleaned_orgs_full',
 u'first_pass_activities',
 u'system.indexes',
 u'xml_docs']

In [None]:
#Archive old activities collection in case we screw something up
#activities.rename('activities_archive')

In [None]:
#restore activities
!mongorestore ./dump

In [4]:
print(activities.count())

587828


In [5]:
#Load a test record into memory so we have it on hand for later
activity=activities.find_one({'iati-identifier':'XM-DAC-41114-PROJECT-00047321'})

## Define activity-level feature creation functions

We're going to establish three different types of features
- Completeness (does the activity have the data needed to be useful)
- Compliance (does the numeric data in activity appear to be naturally generated)
- Utility (is the activity likely to be practically useful to a human reader)

### Completeness features

In [6]:
#Simple check for missing data

def check_missing_fields(activity,field,features):
    """Generic function for checking whether or not a given field has data"""
    try:
        if activity[field]:
            features['missing_'+field]=0
        else:
            features['missing_'+field]=1
    except KeyError:
        features['missing_'+field]=1

In [7]:
def get_val(val):
    """
    Many IATI schema complex types are simple content (no embedded tags) with attributes.
    xmltodict represents these as string values if no attributes are present, or as dicts with
    the content as the '#text' member otherwise.
    """
    if isinstance(val, dict):
        return val.get('#text')
    return val

### Compliance features
These are based on [Benford's law](https://en.wikipedia.org/wiki/Benford%27s_law), which defines certain patterns for the frequency of digits appearing in naturally generated data.  Datasets that don't comply with Benford's law are more likely to be made up (for example, by an analyst guessing at an approximate value rather than calculating it) or even fraudulent.  We aren't making any claims about the veracity of any activities, but rather simply trying to enable the user to get a sense for which numbers have a higher likelihood of a real-world meaning, since this is what's ultimately going to make a dataset useful for understanding an aid intervention

In [18]:
#Benford's Law Analysis

def calculate_digit_distribution(list_of_numbers):
    """Returns a distribution of digits given a list of numbers (type agnostic)"""
    counts=[0]*10
    for num in list_of_numbers:
        try:
            num=filter(six.text_type.isdigit, six.text_type(num))
            digit=int(next(iter(num)))
            counts[digit]+=1
        except StopIteration:
            pass
    total=sum(counts)
    if total:
        return [round(i/total,4) for i in counts]
    return counts

def benfords_law(activity,field,features):
    """
    Tests whether or not a given field on an activity obeys Benford's law
    
    For our purposes, a distribution complies with Benford's law if the 1
    represents at least 30% of the data.  This is a simplistic definition,
    but it'll work for the prototype and we can make it more nuanced later
    
    """
    distribution=None
    compliance=None
    values=activity.get(field)
    if values:
        #Make sure types are consistent
        if type(values)!=list:
            values=[values]
        raw_vals=[]
        for value in values:
            contents=get_val(value.get('value'))
            if contents:
                raw_vals.append(contents)
        if raw_vals:
            distribution=calculate_digit_distribution(raw_vals)
            if distribution[1]>0.3:
                compliance=1
            else:
                compliance=0            
    features['benford_distribution_'+field]=distribution
    features['benford_compliance_'+field]=compliance

### Utility Features

Here, we're trying to understand how practically useful a record is likely to be to a lay user.  Useful records are likely to be managable in size and contain a diverse mix of information.  If a record consists entirely of one type of data (usually long lists of transactions or budget items), it's liable to obscure important content about its programs in the same way that meaningful data is overwhelmed by useless packets sent during a DDOS attack.

We also examine the similarity of the title and description of each activity, since these are typically among the easiest fields for end-users to understand and they're among the most complete.  When these fields are too similar, it may indicate lazy data entry (copy/pasting from one field to the other).  When they're too dissimilar, it may indicate a lack of programmatic alignment.

In [9]:
#Extract and compare similarity of title and description

def find_text_in_dict(field):
    """Structure of certain fields varies, so we need to unwrap the nested dicts when this happens"""
    if type(field)==dict:
        try:
            field=field['narrative']
            if type(field)==dict:
                field=field['text']
        except KeyError:
            field=field['#text']
            if type(field)==dict:
                field=field['text']
    return field

def find_text(field):
    """
    If there are more than one dict in a list, extract and concat all text content together
    """
    if type(field)==list:
        output=''
        for i in field:
            try:
                output+=find_text_in_dict(i)
            except TypeError:
                pass
    else:
        output=find_text_in_dict(field)
    return output

def field_similarity(field1,field2):
    """Calculate cosine similarity between two TFIDF vectorized strings"""
    tfidf_vectorizer=TfidfVectorizer(min_df=0)
    tfidf_matrix=tfidf_vectorizer.fit_transform([field1,field2])
    cs=cosine_similarity(tfidf_matrix[0],tfidf_matrix[1])
    return cs[0][0]

def compare_title_description(activity,features):
    """
    Cosine similarity between title and description
    
    Very high values (close to 1) indicate that title and description are redundant
    Very low value (close to 0) indicate that title and description have nothing to do with each other
    """
    try:
        title=find_text(activity['title'])
        description=find_text(activity['description'])
        cs=field_similarity(title,description)
        features['title_description_similarity']=cs
    #KeyError - fields don't exist
    #ValueError - fields only contain stopwords
    #AttributeError - don't remember
    except (KeyError,AttributeError,ValueError):
        features['title_description_similarity']=None

In [10]:
def get_relative_sizes(activity,exist_fields,features):
    """
    Use the serialized storage space for different fields as a proxy for their relative size within a given activity
    """
    sizes={ field:0 for field in exist_fields}
    #Included in here is the features themselves, but those will be small relative to everything else
    total=sys.getsizeof(pickle.dumps(activity))
    for field in sizes:
        try:
            field_size=sys.getsizeof(pickle.dumps(activity[field]))
            sizes[field]=field_size/total
        except KeyError:
            pass
    features[u'total_size']=total
    features[u'relative_sizes']=sizes


### Other features

There are certain features, like the IATI organization ID, that we'll want to have easily accessible later on, so we'll pull them out and add them to our feature vector.  Other incomplete feature implementations are included below as well

In [11]:
def find_reporting_org(field):
    """
    Extract IATI ID of reporting org
    
    If there are more than one dict in a list, extract and concat all text content together
    
    This methodology is NOT perfect, but it gets about 210 distinct orgs in the dataset
    We'll need to make it more responsive to the varying IATI versions, but it's close enough
    for now.
    """
    if type(field)==list:
        output=field[1]['@ref']
    elif type(field)==dict:
        output=field['@ref']
    else:
        output=None
    return output

In [12]:
#Testing whether URLS are valid - not currently in pipeline

def test_links(list_of_links):
    """
    Find percentage of links in a list that seem to work
    
    The process of checking these can be tedious, and requires a lot of bandwidth,
    so it's probably best to implement this in the cloud if we want to scale across all activities
    """
    valid_links=0
    for link in activity[list_of_links]:
        url=link['@url']
        response=requests.get(url)
        if response.status_code==200:
            valid_links+=1
    ratio=valid_links/len(list_of_links)
    return ratio


In [61]:
# Initial implementation of activity date validation - INCOMPLETE
from collections import OrderedDict
date_dict=OrderedDict({'start-planned':None,'start-actual':None,'end-actual':None,'end-actual':None})
dates=activity['activity-date']
for date in dates:
    try:
        date_dict[date['@type']]=date['@iso-date']
    except KeyError:
        pass

### Generate Features

Having defined the functions needed to generate each feature, we can combine them to create a master pipeline function that we can run iteratively on the whole dataset. 

In [13]:
#Define master feature pipeline function

#lists of fields to check for existence
exist_fields=['description','budget','title','result','transaction','document-link','participating-org','activity-date','@default-currency','@xml:lang']
benford_fields=['budget','transaction']

def generate_features(activity):
    """Generate all features"""
    
    #Clear old features/make space if it doesn't already exist
    features={
        '_id': activity['_id']
    }
    
    #Find sizes of stuff
    get_relative_sizes(activity,exist_fields, features) #do this early on so it's less affected by the creation of other data
    
    #Look for missing data
    for field in exist_fields:
        check_missing_fields(activity,field, features)
        
    #Data quality validations
    compare_title_description(activity, features)
    
    #Benford's Law
    for field in benford_fields:
        benfords_law(activity,field, features)
        
    #Extract certain pieces of text to make it easier to parse later
    features['organization']=find_reporting_org(activity['reporting-org'])
    return features

In [14]:
#Test feature generation on one record
features=generate_features(activity)
p.pprint(features)

{'_id': ObjectId('575cce4d370d9309002fab43'),
 'benford_compliance_budget': None,
 'benford_compliance_transaction': None,
 'benford_distribution_budget': None,
 'benford_distribution_transaction': None,
 'missing_@default-currency': 0,
 'missing_@xml:lang': 0,
 'missing_activity-date': 0,
 'missing_budget': 1,
 'missing_description': 0,
 'missing_document-link': 0,
 'missing_participating-org': 0,
 'missing_result': 1,
 'missing_title': 0,
 'missing_transaction': 1,
 'organization': u'XM-DAC-41114',
 u'relative_sizes': {'@default-currency': 0.0028872709013306555,
                     '@xml:lang': 0.0028245041426060756,
                     'activity-date': 0.013055485814712529,
                     'budget': 0,
                     'description': 0.012490584986191313,
                     'document-link': 0.8017198091890535,
                     'participating-org': 0.025232237007280944,
                     'result': 0,
                     'title': 0.007218177253326638,
            

In [19]:
#GENERATE ALL FEATURES for all activities!
db.drop_collection('quality')
activities_count=activities.count()
print(datetime.now(), 'Started processing')
qual_recs=[]
for num, activity in enumerate(activities.find()):
    if num > 0 and num % 25000 == 0:
        print(datetime.now(), 'Processed', num, 'of', activities_count)
        db.quality.insert_many(qual_recs)
        qual_recs=[]
    #Do all the things!
    qual_recs.append(generate_features(activity))
if qual_recs == 0:
    db.quality.insert_many(qual_recs)
print(datetime.now(), 'Finished processing')

2016-08-07 22:51:51.129010 Started processing
2016-08-07 22:53:21.852721 Processed 25000 of 587828
2016-08-07 22:55:40.062204 Processed 50000 of 587828
2016-08-07 22:58:37.675166 Processed 75000 of 587828
2016-08-07 23:00:40.244372 Processed 100000 of 587828
2016-08-07 23:03:08.883897 Processed 125000 of 587828
2016-08-07 23:05:28.640606 Processed 150000 of 587828
2016-08-07 23:07:42.706206 Processed 175000 of 587828
2016-08-07 23:10:29.974419 Processed 200000 of 587828
2016-08-07 23:12:15.264503 Processed 225000 of 587828
2016-08-07 23:14:09.195195 Processed 250000 of 587828
2016-08-07 23:16:04.765043 Processed 275000 of 587828
2016-08-07 23:18:54.584844 Processed 300000 of 587828
2016-08-07 23:23:38.319558 Processed 325000 of 587828
2016-08-07 23:25:53.831625 Processed 350000 of 587828
2016-08-07 23:28:39.056037 Processed 375000 of 587828
2016-08-07 23:30:57.609548 Processed 400000 of 587828
2016-08-07 23:34:14.510184 Processed 425000 of 587828
2016-08-07 23:36:46.393922 Processed 45

In [20]:
#Sanity check - how many results have non-null results for the title-description similarity
db.quality.find({ 'title_description_similarity' : {'$ne' : None} }).count()

373300

In [21]:
db.quality.create_index('organization')

u'organization_1'

### Exploration of activity-level results

In [22]:
#EDA to figure out how much data is missing
grp={'_id': None}
for field in exist_fields:
    grp['missing_'+field]={'$sum': '$missing_'+field}
    grp['average_'+field]={'$avg': '$relative_sizes.'+field}
for field in benford_fields:
    grp['benford_'+field]={'$sum': '$benford_compliance_'+field}
grp['total']={'$sum': 1}

# Run the agg pipeline
results=next(db.quality.aggregate([{'$group': grp}]))

# Print the results
print('MISSING DATA')
p.pprint({k:v for k,v in results.items() if k.startswith('missing_')})
print("")
print("RELATIVE SIZES OF KEY FIELDS")
p.pprint({k:v for k,v in results.items() if k.startswith('average_')})
print("")
print("BENFORD COMPLIANCE (AMONG ACTIVITIES WITH DATA)")
p.pprint({k:v for k,v in results.items() if k.startswith('benford_')})
print("")
print("TOTAL ACTIVITIES")
print(results['total'])

MISSING DATA
{u'missing_@default-currency': 45932,
 u'missing_@xml:lang': 48588,
 u'missing_activity-date': 8279,
 u'missing_budget': 375860,
 u'missing_description': 43936,
 u'missing_document-link': 486121,
 u'missing_participating-org': 10014,
 u'missing_result': 529490,
 u'missing_title': 2136,
 u'missing_transaction': 33171}

RELATIVE SIZES OF KEY FIELDS
{u'average_@default-currency': 0.007377565125319809,
 u'average_@xml:lang': 0.00728123629187669,
 u'average_activity-date': 0.03261745793047522,
 u'average_budget': 0.030091984130244255,
 u'average_description': 0.05376616478223761,
 u'average_document-link': 0.02197509009883787,
 u'average_participating-org': 0.06411876690142511,
 u'average_result': 0.01235349445010078,
 u'average_title': 0.02097687662627871,
 u'average_transaction': 0.18001654781952756}

BENFORD COMPLIANCE (AMONG ACTIVITIES WITH DATA)
{u'benford_budget': 49230, u'benford_transaction': 205671}

TOTAL ACTIVITIES
575000


In [23]:
#Number of records by language
pipeline=[{"$group": {"_id": "$@xml:lang", "count": {"$sum": 1}}}]
list(activities.aggregate(pipeline))

[{u'_id': u'pt', u'count': 160},
 {u'_id': u'FR', u'count': 8749},
 {u'_id': u'de', u'count': 8813},
 {u'_id': u'es', u'count': 1699},
 {u'_id': u'fr', u'count': 1390},
 {u'_id': u'nl', u'count': 6026},
 {u'_id': u'EN', u'count': 1327},
 {u'_id': None, u'count': 49890},
 {u'_id': u'en', u'count': 509774}]

In [24]:
#Number of unique organizations
len(list(db.quality.aggregate([{"$group": {"_id": "$organization", "count": {"$sum": 1}}}])))

210

In [99]:
#Use this to lookup how a particular org shows up 
test_q=db.quality.find_one({'organization':'Netherlands Enterprise Agency'})
test=activities.find_one({'_id': test_q['_id']})
test['reporting-org']

{'@ref': 'Netherlands Enterprise Agency',
 '@secondary-reporter': 'false',
 '@type': '10',
 'narrative': {'#text': 'Netherlands Enterprise Agency', '@xml:lang': 'en'}}

## Aggregating activity features to the organization level

Now that we've generated features for each activity, we need to aggregate them together to display them to the end user.  We'll focus on aggregating results at the organization level, but leave the option open to recalculate the aggregation for any of the other fields (country, theme, etc.)

In [27]:
#Load pickles into memory for quickly lookups later
with open("lookup_by_ref.pickle",'rb') as f:
    lookup=pickle.load(f)
    
with open("lookup_by_name.pickle",'rb') as f:
    lookup_name=pickle.load(f)

In [28]:
def get_benford_distribution(orgid, field):
    """Calculate Benford distribution for an organization across all its activity records"""
    qdata=db.quality.find({'organization':orgid})
    count=0
    running_distribution=[0,0,0,0,0,0,0,0,0,0]
    for qrec in qdata:
        try:
            running_distribution=map(add,running_distribution,qrec['benford_distribution_'+field])
            count+=1
        except TypeError:
            #print i['features']['benford_distribution_'+field]
            continue
    if count>0:
        final_distribution=[100*i/count for i in running_distribution]
    else:
        final_distribution=running_distribution
    final_dict={str(i):final_distribution[i] for i in range(0,10)}
    return final_dict

In [48]:
#Extract aggregated results for a given grouping (we'll start at the org level for now)

exist_fields=['description','budget','title','result','transaction','document-link','participating-org','activity-date','@default-currency','@xml:lang']
size_exclude_fields=['activity-date','@default-currency','@xml:lang']
benford_fields=['budget','transaction']

def query_orgs(group_field, group_val=None):
    """
    Collect aggregated results across our feature space for a given field.
    Note that most of these aggregations are simple averages across all activities
    for this prototype.
    
    This implementation isn't particularly fast because it makes repeated calls to the mongo collection.
    There are certainly opportunities for optimization by gathering a subset of results 
    into memory and aggregating from there.
    
    """
    pipeline=[]
    # Do we have a single org filter?
    if group_val:
        pipeline.append({"$match": {group_field: group_val}}) #Filter for a given subset of records
    # get the fields, grouping by org ID (or whatever is passed in)
    pipeline.append({'$group': {'_id': '$'+group_field,
                                    #CONTACT INFO
                                    'contact-info': {'$first': '$contact-info'},
                                    #TOTAL
                                    'total': {'$sum': 1},
                                    #SIMILARITY
                                    'title_description_similarity': {'$avg': '$title_description_similarity'},
                                    #DOC SIZE
                                    'avg': {'$avg': '$total_size'},
                                    'min': {'$min': '$total_size'},
                                    'max': {'$max': '$total_size'},
                                    #Doesn't exist in Mongo 3.0.4 (which is what I'm using)
                                    #'sd': {'$stdDevPop': '$total_size'},
                                   }})
    #MISSING DATA
    for field in exist_fields:
        pipeline[-1]['$group']['missing_'+field]={'$avg': '$missing_'+field}
        
        #FIELD SIZE DATA
        if field not in size_exclude_fields:
            pipeline[-1]['$group']['size_'+field]={'$avg': '$relative_sizes.'+field}
    
    #BENFORD COMPLIANCE (AMONG ACTIVITIES WITH DATA)
    for field in benford_fields:
        pipeline[-1]['$group']['benford_'+field]={'$avg': '$benford_compliance_'+field}

    # Run the pipeline and yield results as they are available
    for results in db.quality.aggregate(pipeline):
        orgid=results['_id']
        if not orgid: # discard blank and null
            continue
        # figure out the name
        orgnames=lookup[orgid]
        if orgnames and isinstance(orgnames, set):
            orgname=next(iter(orgnames))
        elif orgnames and isinstance(orgnames, six.string_types):
            orgname=orgnames
        else:
            continue
        # Collect Missing Data
        exist_results={}
        size_results={}
        for field in exist_fields:
            exist_results[field]=results['missing_'+field]
            #FIELD SIZE DATA
            if field not in size_exclude_fields:
                size_results[field]=results['size_'+field]
        # Collect Benford Fields
        benford_results={}
        for field in benford_fields:
            benford_results[field]=results['benford_'+field]
            benford_results[field+'_distribution']=get_benford_distribution(orgid,field)
            
        #Output result as JSON for easy parsing/storing in Mongo
        yield {'organization_id': orgid,
                'organization_name': orgname,
                'contact_info': results['contact-info'] or {},
                'doc_size': {
                    'avg': results['avg'],
                    'min': results['min'],
                    'max': results['max']
                },
                'records': results['total'],
                'missing_data':exist_results,
                'title_description_similarity':results['title_description_similarity'],
                'relative_size':size_results,
                'benford_compliance':benford_results
               }

In [55]:
len(list(db.quality.aggregate([{'$group': {'_id': '$organization'}}])))

210

In [56]:
#Test aggregation function
output=query_orgs('organization', 'GB-COH-04154075')
p.pprint(next(iter(output)))

{'benford_compliance': {'budget': 0.2,
                        'budget_distribution': {'0': 0.0,
                                                '1': 16.666,
                                                '2': 10.0,
                                                '3': 10.0,
                                                '4': 10.0,
                                                '5': 10.0,
                                                '6': 20.0,
                                                '7': 10.0,
                                                '8': 13.334,
                                                '9': 0.0},
                        'transaction': 0.4,
                        'transaction_distribution': {'0': 0.0,
                                                     '1': 29.798000000000002,
                                                     '2': 14.431999999999999,
                                                     '3': 8.212,
                                        

In [50]:
%%time
db.drop_collection('cleaned_orgs_full')
orgs=db.cleaned_orgs_full
batch=[]
for num, results in enumerate(query_orgs('organization')):
    batch.append(results)
    if(len(batch)==100):
        print(datetime.now(), "- inserting {} orgs".format(len(batch)))
        orgs.insert_many(batch)
        batch=[]
if batch:
    print(datetime.now(), "- inserting {} orgs".format(len(batch)))
    orgs.insert_many(batch)

2016-08-09 00:06:31.913856 - inserting 100 orgs
2016-08-09 00:06:45.681419 - inserting 100 orgs
2016-08-09 00:06:48.040088 - inserting 8 orgs
CPU times: user 25.7 s, sys: 3.06 s, total: 28.8 s
Wall time: 37.4 s


In [51]:
orgs.create_index('organization_id')

u'organization_id_1'

In [52]:
#Verify that this matches the manual test from above
p.pprint(orgs.find_one({'organization_id':'GB-COH-04154075'}))
orgs.count()

{u'_id': ObjectId('57a956c7370d933677eeb855'),
 u'benford_compliance': {u'budget': 0.2,
                         u'budget_distribution': {u'0': 0.0,
                                                  u'1': 16.666,
                                                  u'2': 10.0,
                                                  u'3': 10.0,
                                                  u'4': 10.0,
                                                  u'5': 10.0,
                                                  u'6': 20.0,
                                                  u'7': 10.0,
                                                  u'8': 13.334,
                                                  u'9': 0.0},
                         u'transaction': 0.4,
                         u'transaction_distribution': {u'0': 0.0,
                                                       u'1': 29.798000000000002,
                                                       u'2': 14.431999999999999,
               

208