# Open Data Toronto: Data Quality Score (DQS)
This is the code behind the DQS, so we can share how the score is calculated. The goal of the DQS is to geive users an idea of how good data is without having to view it and give publishers pointers for improving their data - and an incentive to create _better_ datasets, not just _more_ of them.

> High quality data enables high-quality impact

### Resources
Although a high-level overview of the DQS is provided throughout the notebook, it is highly recommended to read the articles where we share in detail why the DQS was created and how (beyond the code):

* <a href="https://medium.com/@careduz/towards-a-data-quality-score-in-open-data-part-1-525e59f729e9">Towards a Data Quality Score in open data (part 1) - Why Open Data Toronto created a score to assess data quality and what it measures</a>
* <a href="https://medium.com/@careduz/towards-a-data-quality-score-in-open-data-part-2-3f193eb9e21d">Towards a Data Quality Score in open data (part 2) - How we created the DQS: a walkthrough for organizations facing similar challenges</a>

### Notes
This is a working notebook - hence "TODO" comments throughout the code to serve as reminders for us.

In [1]:
import json
import math
import re

import ckanapi
import geopandas as gpd
import nltk
import numpy as np
import pandas as pd
import requests

from nltk.corpus import wordnet
from shapely.geometry import shape
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm

from datetime import datetime as dt

nltk.download('wordnet')

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

## 1. Scoring DQS dimensions
This version of the DQS is calculated from the scores of 5 underlying dimensions, each with a weight towards the final score, which are in turn scored based on equal-weighted metrics we use for measuring the dataset's performance in that dimension.

This section contains the functions for scoring the dimensions. Each function checks every data "resource" in an individual dataset (in CKAN these are called "packages", hence the terminology used in the code), and the dataset score is the average of the resource scores.

For example: if a dataset has 2 CSV files, each is scored across the dimensions and assigned a DQS, then the dataset DQS is the average of the resource scores.

### Usability
Accounts for 38% of the DQS. It is composed of 3 metrics.

#### 1. Meaningful column names: proportion of words in column name in English
To determine this, we first split column names by "\_", " ", "-" and camelCase. For example:

* bdngHeightMetres ==> ["bdng", "height", "metres"]
* bdng_height_metres ==> ["bdng", "height", "metres"]
* bdng height metres ==> ["bdng", "height", "metres"]

Then we look at the portion of English words in the resulting array. In this case, 2/3 words are English which means the name gets a "meaning score" of 66%.

If the meaning score is above a threshold, which we set to 80\% for now, then the column counts is said to have a "meaningful name".

This is done for all the columns in the dataset, so if 7 of 10 column are meaningful, the dataset scores 70\% for this metric.

#### 2. Valid geometries (geospatial only)

This is an automatic check for ensuring the geometry is well-formed across 2 dimensions. A common issue of invalid geometries is the inability to visualize them or work with them - often requires cleanup, thus making them more difficult to use.

#### 3. Proportion of columns with a single value (a constant)

Checks for columns with a single value, which adds cognitive load to the user - since it's a constant, this could be captured elsewhere (e.g. metadata).

In [2]:
def score_usability(columns, data):
    '''
        How easy is it to use the data given how it is organized/structured?
        
        TODO's: 
            * level of nested fields?
            * long vs. wide?
            * if ID columns given, are these ID's common across datasets?
    '''
    
    def parse_col_name(s):
        camel_to_snake = re.sub(
            '([a-z0-9])([A-Z])', 
            r'\1_\2', 
            re.sub('(.)([A-Z][a-z]+)', r'\1_\2', s)
        ).lower()

        return camel_to_snake == s, [x for x in re.split('-|_|\s', camel_to_snake) if len(x)]

    metrics = {
        'col_names': 0, # Are the column names easy to understand?
        'col_constant': 1 # Are there columns where all values are constant?
    }
    
    for f in columns:
        is_camel, words = parse_col_name(f['id'])
        eng_words = [ w for w in words if len(wordnet.synsets(w)) ]

        if len(eng_words) / len(words) > 0.8:
            metrics['col_names'] += (1 if not is_camel else 0.5) / len(columns)
        
        if not f['id'] == 'geometry' and data[f['id']].nunique() <= 1:
            metrics['col_constant'] -= 1 / len(columns)
    
    if isinstance(data, gpd.GeoDataFrame):
        counts = data['geometry'].is_valid.value_counts()
        
        metrics['geo_validity'] = 1 - (counts[False] / (len(data) * 0.05)) if False in counts else 1
    
    return np.mean(list(metrics.values()))

### Metadata
Accounts for 25% of total weight. Ideally would measure whether the metadata is provided _and_ also how well described it is - this MVP version of the DQS focuses only on the former, whether the metadata exists, because the latter requires much more involvement... namely NLP and a training set. Hopefully that will follow in the future.

#### Metadata fields checked
Not all metadata fields are checked because many are required for publication, meaning they will always be present. Fields checked are:
* Collection method
* Limitations
* Topics
* Owner Email - _opendata@toronto.ca_ does not count because it's the default. Ideally, publishers will provide an email address (personal or distribution list) that gives users a direct connection to those who know the data.
* Column Descriptions

Fields get a value of 1 if there is metadata, and 0 if there isn't. The metric score is the mean across all fields - for instance: if 5 of 10 fields are filled, the dataset gets a 50% metadata score.

In [3]:
METADATA_FIELDS = ['collection_method', 'limitations', 'topics', 'owner_email']

def score_metadata(package, columns):
    '''
        How easy is it to understand the context of the data?
        
        TODO's: 
            * Measure the quality of the metadata as well
    '''
    
    metrics = {
        'desc_dataset': 0, # Does the metadata describe the dataset well?
        'desc_columns': 0 # Does the metadata describe the data well?
    }
    
    for field in METADATA_FIELDS:
        if field in package and package[field] and not (field == 'owner_email' and 'opendata' in package[field]):
            metrics['desc_dataset'] += 1 / len(METADATA_FIELDS)
            
    for f in columns:
        if 'info' in f and len(f['info']['notes']) and f['info']['notes'].strip() != f['id']:
            metrics['desc_columns'] += 1 / len(columns)

    return np.mean(list(metrics.values()))

### Freshness
Accounts for 18% of total weight. It is composed of 2 metrics, with equal weights towards the Freshness dimension score.

**Note**: If dataset is real-time, automatically receives 100%... doesn't get any more fresh than that.

#### 1. Duration between published refresh rate and when data was last refreshed
Say a dataset is supposed to be refreshed _daily_ but has not been updated for 7 days. In this case, the dataset is 7 periods behind.

Dataset scores 0 for this metric if more than 2 periods have elapsed since last refresh. Otherwise, the score drops gradually between 1 and periods missed.

#### 2. Duration between today and date last refreshed
We decided on this based on helpful feedback from the Data Quality Working Group and the community. It simply considers the time between the dataset was last refreshed and today - regardless of the periods. Metric starts decreasing after 6 months, and reaches 0 after 3 years. 

We accept this will put datasets with lengthy refresh rates, yearly or above (e.g. census), or datasets not refreshed at a disadvantage because the goal is to encourage more timely datasets.

In [4]:
TIME_MAP = {
    'daily': 1,
    'weekly': 7,
    'monthly': 30,
    'quarterly': 52 * 7 / 4,
    'semi-annually': 52 * 7 / 2,
    'annually': 365
}

def score_freshness(package):
    '''
        How up to date is the data?
    '''
    
    metrics = {}
    
    rr = package['refresh_rate'].lower()
    
    if rr == 'real-time':
        return 1
    elif rr in TIME_MAP and 'last_refreshed' in package and package['last_refreshed']: 
        days = (dt.utcnow() - dt.strptime(package['last_refreshed'], '%Y-%m-%dT%H:%M:%S.%f')).days
        
        # Greater than 2 periods have a score of 0
        metrics['elapse_periods'] = max(0, 1 - math.floor(days / TIME_MAP[rr]) / 2)
        
        # Decrease the score starting from ~0.5 years to ~3 years
        metrics['elapse_days'] = 1 - (1 / (1 + np.exp(4 * (2.25 - days/365))))
        
        return np.mean(list(metrics.values()))
    
    return 0

### Completeness
Accounts for 12% of total weight. This is a simple one: % of cells with a value (1 - proportion of empty cells).

In [5]:
def score_completeness(data):
    '''
        How much of the data is missing?
    '''
    return 1 - (np.sum(len(data) - data.count()) / np.prod(data.shape))

### Accessibility
Accounts for 7% of total weight. Although all datasets in scope are available via API (since they are all in the datastore), they are differentiated in this metric:

1. Datasets received as flat CSV files (which are pushed to the datastore automatically) get 50%
2. Datasets automatically pushed to the datastore on a schedule (which is captured in "extract_job" resource field) get 100%

This is to encourage publishers to increasingly adopt automated refresh methods while, at the same time, recognizing those who provide data in an open, machine-readable format.

In [6]:
def score_accessibility(resource):
    '''
        Is data available via APIs?
    '''
    return 1 if 'extract_job' in resource and resource['extract_job'] else 0.5

## 2. Calculating dimension weights
Dimension weights are calculated based on rank weighting methods - after trying several, we decided on "SR" (Sum Reciprocal). This function takes the array of dimensions and returns the weights.

To learn more about the weighting method, refer to the resources articles shared at the beginning of this notebook (Part 2)

In [7]:
def calculate_weights(dimensions, method='sr'):
    N = len(dimensions)
    
    if method == 'sr':
        denom = np.array([ ((1 / (i + 1)) + ((N + 1 - (i + 1)) / N)) for i, x in enumerate(dimensions) ]).sum()
        weights = [ ((1 / (i + 1)) + ((N + 1 - (i + 1)) / N)) / denom for i, x in enumerate(dimensions) ]
    elif method == 'rs':
        denom = np.array([ (N + 1 - (i + 1)) for i, x in enumerate(dimensions)]).sum()
        weights = [ (N + 1 - (i + 1)) / denom for i, x in enumerate(dimensions) ]
    elif method == 'rr':
        denom = np.array([ 1 / (i + 1) for i, x in enumerate(dimensions) ]).sum()
        weights = [ (1 / (i + 1)) / denom for i, x in enumerate(dimensions) ]
    elif method == 're':
        exp = 0.2
        denom = np.array([ (N + 1 - (i + 1)) ** exp for i, x in enumerate(dimensions) ]).sum()
        weights = [ (N + 1 - (i + 1)) ** exp / denom for i, x in enumerate(dimensions) ]
    else:
        raise Exception('Invalid weighting method provided')
    
    return weights

## 3. Utility functions for scoring in CKAN


#### Reading resource from CKAN datastore

In [8]:
def read_datastore(ckan, rid, rows=10000):
    records = []
    
    is_geospatial = False
    
    has_more = True
    while has_more:
        result = ckan.action.datastore_search(id=rid, limit=rows, offset=len(records))
        
        records += result['records']
        has_more = len(records) < result['total']
    
    df = pd.DataFrame(records).drop('_id', axis=1)
    
    if 'geometry' in df.columns:
        df['geometry'] = df['geometry'].apply(lambda x: shape(json.loads(x)))
        
        df = gpd.GeoDataFrame(df, crs={'init': 'epsg:4326'})
    
    return df, [x for x in result['fields'] if x['id'] != '_id']

#### Retrieving DQS framework from Open Data Toronto CKAN
This contains the package, and resources, for the DQS in our portal. We will need this to write the results 

In [9]:
PACKAGE_DQS = 'catalogue-quality-scores'

def get_framework(ckan, pid=PACKAGE_DQS):
    try:
        framework = ckan.action.package_show(id=pid)
    except ckanapi.NotAuthorized:
        raise Exception('Permission required to search for the framework package')
    except ckanapi.NotFound:
        raise Exception('Framework package not found')
    
    return {
        r['name']: r for r in framework.pop('resources')
    }

### Building an HTTP Response (placeholder - not yet in use)
Will use this when moving the function to a serverless execution platform such as AWS Lambda or GCP Cloud Functions.

In [10]:
def build_response(code, message=''):
    response = {
        'statusCode': code,
        'headers': {
            'Access-Control-Allow-Origin': '*',
        }
    }

    if message:
        response['body'] = message

    return response

## 4. Bringing it all together: scoring the catalogue

In [11]:
RESOURCE_MODEL = 'scoring-models'
MODEL_VERSION = 'v0.1.0'

RESOURCE_SCORES = 'catalogue-scorecard'

DIMENSIONS = ['usability', 'metadata', 'freshness', 'completeness', 'accessibility'] # Ranked in order

BINS = {
    'Bronze': 0.6,
    'Silver': 0.8,
    'Gold': 1,
}

In [13]:
def score_catalogue(event={}, context={}):
    ckan = ckanapi.RemoteCKAN(**{
        'address': '', # CKAN URL here
        # 'apikey' here to write back to CKAN
    })
    
    weights = calculate_weights(DIMENSIONS)
    fw = {
        'aggregation_methods': {
            'metrics_to_dimension': 'avg',
            'dimensions_to_score': 'sum_and_reciprocal'
        },
        'dimensions': [
            {
                'name': dim,
                'rank': i + 1,
                'weights': wgt,
            } for i, (dim, wgt) in enumerate(zip(DIMENSIONS, weights))
        ],
        'bins': BINS
    }
    
    packages = ckan.action.current_package_list_with_resources(limit=500)

    storage = get_framework(ckan)

    data = []
    for p in tqdm(packages, 'Datasets Scored'):
        for r in p['resources']:
            if not 'datastore_active' in r or not r['datastore_active']:
                continue

            content, fields = read_datastore(ckan, r['id'])

            data.append({
                'package': p['name'],
                'resource': r['name'],
                'usability': score_usability(fields, content),
                'metadata': score_metadata(p, fields),
                'freshness': score_freshness(p),
                'completeness': score_completeness(content),
                'accessibility': score_accessibility(r)
            })
    
    # create JSON resource to capture scoring model details, if not already in storage
    if not RESOURCE_MODEL in storage and ckan.apikey:
        r = requests.post(
            '{0}/api/3/action/resource_create'.format(ckan.address),
            data={
                'package_id': PACKAGE_DQS,
                'name': RESOURCE_MODEL,
                'format': 'json',
                'is_preview': False,
                'is_zipped': False
            },
            headers={
                'Authorization': ckan.apikey
            },
            files={
                'upload': ('{0}.json'.format(RESOURCE_MODEL), json.dumps({}))
            }
        )
        
        storage[RESOURCE_MODEL] = json.loads(r.content)['result']

    r = requests.get(
        storage[RESOURCE_MODEL]['url'],
        headers={
            'Authorization': ckan.apikey
        }
    )

    scoring_methods = json.loads(r.content)
    scoring_methods[MODEL_VERSION] = fw

    r = requests.post(
        '{0}/api/3/action/resource_patch'.format(ckan.address),
        data={
            'id': storage[RESOURCE_MODEL]['id']
        },
        headers={
            'Authorization': ckan.apikey
        },
        files={
            'upload': ('{0}.json'.format(RESOURCE_MODEL), json.dumps(scoring_methods))
        }
    )
    
    df = pd.DataFrame(data).set_index(['package', 'resource'])

    scores = pd.DataFrame([weights] * len(df.index))
    scores.index = df.index
    scores.columns = DIMENSIONS

    scores = df.multiply(scores)

    df['score'] = scores.sum(axis=1)
    df['score_norm'] = MinMaxScaler().fit_transform(df[['score']])

    df = df.groupby('package').mean()

    
    labels = list(BINS.keys())
    
    bins = [-1]
    bins.extend(BINS.values())
    
    df['grade'] = pd.cut(df['score_norm'], bins=bins, labels=labels)
    df['grade_norm'] = pd.cut(df['score_norm'], bins=bins, labels=labels)

    df['recorded_at'] = dt.now().strftime('%Y-%m-%dT%H:%M:%SZ')
    df['version'] = MODEL_VERSION

    df = df.reset_index()
    df = df.round(2)
    
    if not ckan.apikey:
        return df
    
    if not RESOURCE_SCORES in storage:
        storage[RESOURCE_SCORES] = ckan.action.datastore_create(
            resource={
                'package_id': PACKAGE_DQS,
                'name': RESOURCE_SCORES,
                'format': 'csv',
                'is_preview': True,
                'is_zipped': True
            },
            fields=[ { 'id': x} for x in df.columns.values ],
            records=df.to_dict(orient='row')
        )
    else:
        ckan.action.datastore_upsert(
            method='insert',
            resource_id=storage[RESOURCE_SCORES]['id'],
            records=df.to_dict(orient='row')
        )
    
    # build_response(200, message='')
    return df

In [14]:
scores = score_catalogue()

Datasets Scored: 100%|██████████| 326/326 [03:27<00:00,  1.57it/s]  


In [15]:
scores.head(5)

Unnamed: 0,package,usability,metadata,freshness,completeness,accessibility,score,score_norm,grade,grade_norm,recorded_at,version
0,air-conditioned-and-cool-spaces-heat-relief-ne...,0.86,0.84,0.5,0.69,1.0,0.78,0.74,Silver,Silver,2020-02-10T15:48:39Z,v0.1.0
1,air-conditioned-public-places-cooling-centres,0.85,0.25,0.0,1.0,0.5,0.54,0.3,Bronze,Bronze,2020-02-10T15:48:39Z,v0.1.0
2,annual-summary-of-reportable-communicable-dise...,0.69,0.25,1.0,0.98,0.5,0.66,0.51,Bronze,Bronze,2020-02-10T15:48:39Z,v0.1.0
3,apartment-building-evaluation,0.94,0.75,1.0,0.96,1.0,0.91,0.97,Gold,Gold,2020-02-10T15:48:39Z,v0.1.0
4,apartment-building-registration,0.59,0.69,1.0,0.49,1.0,0.71,0.6,Silver,Silver,2020-02-10T15:48:39Z,v0.1.0
