# Anomaly Detection: Using Deep Learning Methods to Monitor the Health of Automated Datasets  

I created this proposal and proof of concept for a hackweek project at Tyler Technologies in early 2023. The datasets discussed here are tabular data assets housed in Tyler customers' SaaS platforms. In the use case described below, datasets are published in a given customer environment and have accompanying ETL. On a given cadence, a scheduled process pulls data from the customer's system of record, which is a separate database hosted elsewhere. The scheduled process is itself another Tyler product that connects source systems to the Tyler environment so that data can be visualized and shared.


### Problem

Can we build infrastructure/services that monitor the health of a scheduled dataset?

### Use Case

Our customers often automate datasets that are connected to their own source databases. These datasets are updated automatically on a specified cadence. When an update fails, the cause is typically one of two things. The first is a schema mismatch, where the update data's schema doesn't match the existing dataset's. The second is a connectivity issue with the source system. Both kinds of failure are caught and surfaced by our established error-handling infrastructure.

There are, however, cases where an update is faulty in terms of the *content* being published (fewer rows than expected, erroneous values in a column, etc.), and there is currently no tracking or error handling to monitor for these types of issues. In other words, an update can run successfully and the dataset can publish as expected, but the content of the data is inaccurate. Downstream metrics, vizzes, and reports will be inaccurate as well.

Today, in order to validate the content of dataset updates, customers have to query their datasets themselves, and then proceed to compare the data to a previous update, or to data from the source system (or some other, likely manual, QA process).

This method of issue remediation is costly. Customers must monitor their updates proactively by establishing their own manual, labor-intensive QA process. Internally, the Data Integrations (DI) team spends considerable time troubleshooting data issues. These "silent failures" often originate in the customer's source system, so DI engineers must investigate the database architecture remotely, with little to no knowledge of or access to the database itself.

Can we instead provide an opt-in service that automatically checks a dataset each time an update is published in the platform? The service would assess the dataset for completeness, obvious anomalies, or other inconsistencies that we prioritize. If health checks don't pass, the customer can be alerted by the service that their dataset may not have updated properly and take action.
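
To make "completeness" concrete, here is a minimal rule-based sketch of the kind of check the service could run on each update. It assumes each update can be summarized by its created, updated, and deleted row counts (the metadata collected later in this notebook) plus the dataset's total row count. `UpdateSummary`, `check_update`, and the 10% threshold are hypothetical. They are not an existing Tyler API.

```python
from dataclasses import dataclass


@dataclass
class UpdateSummary:
    """Row-level metadata for one dataset update (hypothetical structure)."""
    rows_created: int
    rows_updated: int
    rows_deleted: int
    total_rows: int


def check_update(current: UpdateSummary, previous: UpdateSummary,
                 max_drop_fraction: float = 0.1) -> list[str]:
    """Return human-readable failures; an empty list means the checks passed.

    The rules and the 10% default threshold are illustrative assumptions.
    """
    failures = []
    # An update that touches no rows may mean the source query returned nothing.
    if current.rows_created + current.rows_updated + current.rows_deleted == 0:
        failures.append("update touched no rows")
    # Flag a large drop in total row count relative to the previous update.
    if previous.total_rows > 0:
        drop = (previous.total_rows - current.total_rows) / previous.total_rows
        if drop > max_drop_fraction:
            failures.append(f"total rows fell by {drop:.0%}")
    return failures
```

An empty list means the update passed. Any returned messages could feed the notification step.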

### Building a Proof of Concept: Tasks for the Build

The testing process and development process for a net-new monitoring service are linked, in that we will need to build an MVP of the service before we can fully understand the requirements to build in production. The approach laid out here is expected to be iterative and change along the way, but at a high level, the testing and development process can be broken out into phases:

1.  **Collect test data**
    1.  In order to understand whether we *can* implement automated dataset monitoring, we need to know which specific events to monitor for, and how to obtain comprehensive data on these events.
        1.  Tasks:
            1.  Identify monitoring criteria.
            2.  Collect a sample of the data. The sample should represent the dataset in its healthy state as well as in an error state, and the proportion of healthy to erroneous observations should be as realistic as possible. If the anomaly event is rare, the sample should contain only a small number of errors and a large number of "valid" records.

2.  **Create test environment**
    1.  What IT resources are needed for testing? Can this be done with a demo domain and a local dev environment?
        1.  Tasks:
            1.  Identify and set up testing framework

3.  **Create archive dataset** (first time only)
    1.  In order to monitor a dataset, we need to begin capturing metadata on the dataset each time an update happens. Dataset metadata will be captured in an archive dataset which lives on the same domain as the dataset to be monitored.
        1.  Tasks:
            1.  Define the metadata that should be captured on the dataset.
            2.  Create a script that generates an archive dataset for the dataset being monitored, where each column is a specific piece of metadata.

4.  **Capture updates**
    1.  Each time the monitored dataset undergoes an update, the archive repository dataset should capture metadata on the updates.
        1.  Tasks:
            1.  Create a script that runs whenever an update is made to the monitored dataset. The script should capture any of the defined pieces of metadata and append them to the archive repository dataset.

5.  **Analyze updates**
    1.  With each update of the monitored dataset, analyze the archive data and determine the health of the dataset.
        1.  Tasks:
            1.  Set up model competition and analyze performance
                1.  PCA
                2.  Some kind of random forest?
                3.  Some kind of neural network?
                4.  Naive methods?
            2.  Write script to assess the latest dataset update for health, using the best modeling method/s from the model competition. The archive history records are used as training data, and the most recent archive record is the test.
            3.  Write the data from model assessment to a dataset? Maybe back to the archive dataset?
                1.  Could be interesting to create a viz from the results

6.  **If anomaly is detected, notify dataset owners**

    1.  The notification method is dependent on customer needs, and also on the implementation path; do we need input from the customer in order to start this service on their domain, or can we just "turn on" the service?
        1.  This service is meant to empower dataset owners: the expectation is that customers will use this tool to monitor their own datasets. Internally, we take on the responsibility of monitoring the monitoring service, but we do not monitor datasets on the customer's behalf.
        2.  Considering this, I strongly recommend that we hold a requirements-gathering exchange with the customer as a prerequisite for turning on the service. This would act as a safeguard, ensuring agreement that the customer owns the dataset. It would also let us designate a real person who will receive, and be custodian of, any notifications.

7.  **Operationalize**
    1.  Scripts can run in Airflow, with the DAG cadence set to one of the following:
        1.  A brief time after the dataset is regularly scheduled to update
            1.  pros:
                1.  The job only runs as often as necessary
            2.  cons:
                1.  If a dataset takes too long to finish updating, the DAG could kick off before the update is complete, thereby missing the newest data
                2.  Manual data pulls would not be caught until the next scheduled runtime
        2.  Very frequently
            1.  pros:
                1.  We catch more of the updates, and catch them sooner (including manual data pulls)
            2.  cons:
                1.  If any of the scripts takes a long time or heavy resources to execute, the DAG could fail more often
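
For the "Naive methods?" entry in step 5, one simple baseline is a z-score. The archive history for one metric (e.g. `rows_created`) serves as the training data, and the most recent archive record is scored against it. This is a swapped-in illustration, not a chosen method. The function name and the 3.0 cutoff are assumptions. A baseline like this gives the PCA, tree-based, and neural network candidates something to beat in the model competition.

```python
import math
import statistics


def zscore_check(history: list[float], latest: float,
                 threshold: float = 3.0) -> tuple[float, bool]:
    """Score the latest archive value against earlier archive values.

    Returns (z_score, is_anomalous). The 3.0 threshold is an assumption.
    """
    if len(history) < 2:
        raise ValueError("need at least two historical records")
    mean = statistics.fmean(history)
    stdev = statistics.stdev(history)  # sample standard deviation
    if stdev == 0:
        # Constant history: treat any change at all as anomalous.
        z = 0.0 if latest == mean else math.copysign(math.inf, latest - mean)
    else:
        z = (latest - mean) / stdev
    return z, abs(z) > threshold
```

The sampled `rows_created` values range from 489 to 24847 within a single week. A log transform may therefore be worth trying before relying on a check like this.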

### Collecting Test Data for Anomaly Detection Monitoring

_Project docs (Confluence): [Anomaly Detection Modeling](https://socrata.atlassian.net/wiki/spaces/~5d02b4deaf3a8f0c58e401e8/pages/2450227276/Anomaly+Detection+Modeling)_

**Objective:** Collect a sample of the data. The sample should represent the dataset in its healthy state as well as in an error state, and the proportion of healthy to erroneous observations should be as realistic as possible. If the anomaly event is rare, the sample should contain only a small number of errors and a large number of “valid” records.

**Test Case:** Filings (3 Years w/ rollups)

**Socrata dataset:** https://internal-aoic.data.socrata.com/dataset/Filings-3-Years-w-rollups-/rt4a-u7gh

**Data Source:** At this stage, we aim to collect metadata on the number of rows added, updated, and deleted with each update. The data dictionary below describes the schema of the data we'll use for modeling:

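| Column       | Type   | Description |
| :----------  | :----- | :---------- |
| timestamp    | String | Time the update summary was logged (`YYYY-MM-DD HH:MM:SS`) |
| rows_deleted | String | Number of rows deleted by the update |
| rows_updated | String | Number of existing rows updated |
| rows_created | String | Number of new rows created |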

These metadata are not readily available from the Socrata APIs, so we pull this info from the Sumo logs. _Further down the road, we should see whether this info can be captured directly, assuming we have our plugins write out these details to the agent logs._ At the end of this notebook is a sample of info we could collect from the `revisions` endpoint, if need be; its upsert log entries do include row counts.
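
Every field in the data dictionary is stored as a String, so the values need casting before they can be modeled. Below is a minimal sketch. It assumes the `timestamp` format produced by the parsing step below (`YYYY-MM-DD HH:MM:SS`). `to_typed_record` and the derived `net_row_change` feature are hypothetical additions. The feature rests on the assumption that updated rows change in place and so do not affect the row count.

```python
from datetime import datetime


def to_typed_record(raw: dict[str, str]) -> dict:
    """Cast one archive record (all String fields) to typed values."""
    record = {
        "timestamp": datetime.strptime(raw["timestamp"], "%Y-%m-%d %H:%M:%S"),
        "rows_deleted": int(raw["rows_deleted"]),
        "rows_updated": int(raw["rows_updated"]),
        "rows_created": int(raw["rows_created"]),
    }
    # Assumption: updated rows are modified in place, so only created and
    # deleted rows change the dataset's row count.
    record["net_row_change"] = record["rows_created"] - record["rows_deleted"]
    return record
```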

```python
import os
import re

import pandas as pd
import requests

from Sumo_util import Sumo  # <- this utility is just a local copy of dia-integrations-server/Utils/Sumo.py

domain = 'https://internal-aoic.data.socrata.com'
dataset = 'rt4a-u7gh'
username = os.environ['MY_SOCRATA_USERNAME']
password = os.environ['MY_SOCRATA_PASSWORD']
```

```python
sumo = Sumo()

# Since the dataset was created on June 27, 2022, we need to pull all the updates for the lifetime of the dataset
response = sumo.search(
    query="rows update summary AND rt4a-u7gh",
    fromDate="2022-06-26T00:00:00",
    toDate="2023-01-11T00:00:00",
    timezone="GMT")
```

The logged request below shows that this output was captured from a narrower run (2022-12-25 to 2022-12-31), not the full date range in the code above.

```
Request Sumo search: {'query': 'rows update summary AND rt4a-u7gh', 'from': '2022-12-25T00:00:00', 'to': '2022-12-31T00:00:00', 'timeZone': 'GMT'}
Request status: GATHERING RESULTS...
Found 6 messages...
```
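
The run captured above covers only one week, while the dataset's lifetime spans more than six months. If a single search over the full range proves slow, one option is to split it into consecutive windows and combine the results. Below is a minimal sketch. It assumes weekly windows (an arbitrary choice) and that `sumo.search` returns a list of message strings, as its use in this notebook suggests. `date_windows` is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import Iterator


def date_windows(start: datetime, end: datetime,
                 step: timedelta = timedelta(days=7)) -> Iterator[tuple[str, str]]:
    """Yield consecutive (fromDate, toDate) ISO-8601 strings that together cover start..end."""
    current = start
    while current < end:
        window_end = min(current + step, end)
        yield current.isoformat(), window_end.isoformat()
        current = window_end
```

Each `(fromDate, toDate)` pair can be passed to `sumo.search(...)` with the same `query` and `timezone` arguments, and the returned messages concatenated. Adjacent windows share a boundary, so deduplicate on timestamp when combining.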


```python
for message in response:
    print(message)
```

```
2022-12-30 09:00:54,044 INFO  [mq2tl677un0aoa38ekg0tpaqh1hobapq] [59931 / internal-aoic.data.socrata.com id/rt4a-u7gh.json] resources.RowsService - rows update summary DatasetWriter nbe rt4a-u7gh {Rows Deleted=0, Rows Updated=9122, Rows Created=24847}
2022-12-29 09:02:49,228 INFO  [74ngis0f7b4ojebbtefa78thqq0n54ts] [52177 / internal-aoic.data.socrata.com id/rt4a-u7gh.json] resources.RowsService - rows update summary DatasetWriter nbe rt4a-u7gh {Rows Deleted=0, Rows Updated=9721, Rows Created=24611}
2022-12-28 09:02:37,914 INFO  [ncfsmtna580le89s1sqoso2qbnl52n9v] [46199 / internal-aoic.data.socrata.com id/rt4a-u7gh.json] resources.RowsService - rows update summary DatasetWriter nbe rt4a-u7gh {Rows Deleted=0, Rows Updated=17968, Rows Created=21595}
2022-12-27 09:04:30,798 INFO  [0cmr8mn4l50kbq6a6kq9qb00iidfrtd9] [36270 / internal-aoic.data.socrata.com id/rt4a-u7gh.json] resources.RowsService - rows update summary DatasetWriter nbe rt4a-u7gh {Rows Deleted=0, Rows Updated=1156, Rows Create
```

```python
# Each message ends with a summary like "{Rows Deleted=0, Rows Updated=9122, Rows Created=24847}"
summary_pattern = re.compile(r'\{(.*)\}')
column_names = {'Rows Deleted': 'rows_deleted',
                'Rows Updated': 'rows_updated',
                'Rows Created': 'rows_created'}

records = []
for message in response:
    match = summary_pattern.search(message)
    if match is None:
        continue  # skip messages without an update summary
    # The first 19 characters are the log timestamp, e.g. "2022-12-30 09:00:54"
    record = {'timestamp': message[:19]}
    # Parse "key=value" pairs by name rather than relying on their order
    for pair in match[1].split(', '):
        key, value = pair.split('=', 1)
        record[column_names.get(key, key)] = value
    records.append(record)

df = pd.DataFrame(records, columns=['timestamp', 'rows_deleted', 'rows_updated', 'rows_created'])
df
```

|   | timestamp           | rows_deleted | rows_updated | rows_created |
| :- | :------------------ | -----------: | -----------: | -----------: |
| 0 | 2022-12-30 09:00:54 | 0  | 9122  | 24847 |
| 1 | 2022-12-29 09:02:49 | 0  | 9721  | 24611 |
| 2 | 2022-12-28 09:02:37 | 0  | 17968 | 21595 |
| 3 | 2022-12-27 09:04:30 | 0  | 1156  | 2361  |
| 4 | 2022-12-26 09:03:02 | 0  | 4     | 705   |
| 5 | 2022-12-25 09:00:21 | 55 | 295   | 489   |
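
Steps 3 and 4 of the task list call for an archive dataset that accumulates one record per update. Below is a minimal sketch that uses a local CSV file as a stand-in for the Socrata archive dataset. It appends parsed records and skips timestamps already archived, so re-running an overlapping Sumo search doesn't create duplicates. The file-based storage and the `append_to_archive` name are assumptions for illustration.

```python
import csv
from pathlib import Path

ARCHIVE_FIELDS = ["timestamp", "rows_deleted", "rows_updated", "rows_created"]


def append_to_archive(archive_path: Path, records: list[dict]) -> int:
    """Append update records to a CSV archive, skipping timestamps already present.

    Returns the number of records written. Extra keys in each record are ignored.
    """
    seen = set()
    if archive_path.exists():
        with archive_path.open(newline="") as f:
            seen = {row["timestamp"] for row in csv.DictReader(f)}

    new_records = []
    for record in records:
        if record["timestamp"] not in seen:
            seen.add(record["timestamp"])  # also de-duplicates within this batch
            new_records.append({field: record[field] for field in ARCHIVE_FIELDS})

    write_header = not archive_path.exists()
    with archive_path.open("a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=ARCHIVE_FIELDS)
        if write_header:
            writer.writeheader()
        writer.writerows(new_records)
    return len(new_records)
```

With the `df` built above, `append_to_archive(Path('rt4a-u7gh_archive.csv'), df.to_dict('records'))` would archive the sampled updates. The file name is hypothetical.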

What metadata can we get from the `revisions` endpoint?

```python
dataset_id = 'rt4a-u7gh'

response = requests.get(f'{domain}/api/publishing/v1/revision/{dataset_id}',
                        auth=(username, password))
response.raise_for_status()  # fail loudly on authentication or permission errors
revisions = response.json()
```

```python
f'{domain}/api/publishing/v1/revision/{dataset_id}'
```

```
'https://internal-aoic.data.socrata.com/api/publishing/v1/revision/rt4a-u7gh'
```

```python
revisions[0].keys()
```

```
dict_keys(['resource', 'links'])
```

```python
rev_1_resource = revisions[0]['resource']
rev_2_resource = revisions[1]['resource']
rev_3_resource = revisions[2]['resource']
rev_4_resource = revisions[3]['resource']
rev_5_resource = revisions[4]['resource']
```

```python
revisions[19]
```

```
{'resource': {'updated_at': '2022-12-19T06:02:15.968664Z',
  'task_sets': [{'updated_at': '2022-12-19T06:02:13.730281Z',
    'status': 'successful',
    'request_id': '1l01l06pgjjgjilhe0ud30c1ip5o8iba',
    'output_schema_id': 63891532,
    'log': [{'time': '2022-12-19T06:01:45',
      'stage': 'upsert_task',
      'details': {'truth_data_version': 530,
       'total_rows': 1563,
       'error_count': 0,
       'Rows Updated': 644,
       'Rows Deleted': 0,
       'Rows Created': 918,
       'Errors': 1}},
     {'time': '2022-12-19T06:01:34',
      'stage': 'apply_metadata',
      'details': {'diff': {'old': {'sorts': [],
         'name': 'e-File AOIC Prod Filings',
         'metadata': {'rowIdentifier': 554463289,
          'availableDisplayTypes': ['table', 'fatrow', 'page']},
         'displayType': 'table',
         'attachments': []},
        'new': {'sorts': [],
         'privateMetadata': {'custom_fields': {}, 'contactEmail': None},
         'name': 'e-File AOIC Prod Filings',
```

```python
rev_1_resource.keys()
```

```
dict_keys(['updated_at', 'task_sets', 'revision_seq', 'permissions', 'output_schema_id', 'notes', 'metadata', 'id', 'href', 'fourfour', 'domain_id', 'creation_source', 'created_by', 'created_at', 'closed_at', 'blob_id', 'attachments', 'archive', 'action'])
```

```python
rev_1_resource['task_sets'][0].keys()
```

```
dict_keys(['updated_at', 'status', 'request_id', 'output_schema_id', 'log', 'job_uuid', 'is_edit', 'id', 'finished_at', 'created_by', 'created_at'])
```

```python
rev_1_resource['permissions']
```

```
{}
```

```python
# notes is None for this revision, so calling .keys() on it raises AttributeError
print(rev_1_resource['notes'])
```

```
None
```

```python
rev_1_resource['metadata'].keys()
```

```
dict_keys(['tags', 'queryString', 'privateMetadata', 'name', 'metadata', 'measure', 'licenseId', 'license', 'inputs', 'displayType', 'displayFormat', 'description', 'columns', 'clientContext', 'category', 'attributionLink', 'attribution'])
```
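
In `revisions[19]` (shown earlier), the `upsert_task` entry in the `log` records the same kinds of counts we parse from the Sumo logs (`Rows Created`, `Rows Updated`, `Rows Deleted`), plus `total_rows`. If the revisions endpoint proves reliable, it could replace the Sumo search. Below is a minimal sketch for extracting those counts. It assumes every revision follows the shape seen in that one example. `upsert_row_counts` is a hypothetical helper.

```python
def upsert_row_counts(revision: dict) -> list[dict]:
    """Collect row counts from the 'upsert_task' log entries of one revision.

    Assumes the shape seen in revisions[19]:
    revision['resource']['task_sets'][*]['log'][*] with 'stage' and 'details' keys.
    """
    counts = []
    for task_set in revision["resource"].get("task_sets") or []:
        for entry in task_set.get("log") or []:
            if entry.get("stage") != "upsert_task":
                continue  # skip apply_metadata and other non-upsert stages
            details = entry.get("details") or {}
            counts.append({
                "timestamp": entry.get("time"),
                "rows_created": details.get("Rows Created"),
                "rows_updated": details.get("Rows Updated"),
                "rows_deleted": details.get("Rows Deleted"),
                "total_rows": details.get("total_rows"),
            })
    return counts
```

`[c for rev in revisions for c in upsert_row_counts(rev)]` would gather counts across every revision returned by the request above.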

Column metadata:

```python
rev_1_resource['metadata']['columns'][0]
```

```
{'tableColumnId': 139408245,
 'renderTypeName': 'number',
 'position': 1,
 'name': 'RowKey',
 'id': 552437347,
 'format': {},
 'fieldName': 'rowkey',
 'description': '',
 'dataTypeName': 'number',
 'cachedContents': {'top': [{'item': '207433', 'count': '1'},
   {'item': '410489', 'count': '1'},
   {'item': '683251', 'count': '1'},
   {'item': '2553566', 'count': '1'},
   {'item': '2625181', 'count': '1'},
   {'item': '2693764', 'count': '1'},
   {'item': '2757192', 'count': '1'},
   {'item': '3358189', 'count': '1'},
   {'item': '4529425', 'count': '1'},
   {'item': '4529437', 'count': '1'},
   {'item': '4529454', 'count': '1'},
   {'item': '4813603', 'count': '1'},
   {'item': '4813650', 'count': '1'},
   {'item': '4813690', 'count': '1'},
   {'item': '4813782', 'count': '1'},
   {'item': '5626865', 'count': '1'},
   {'item': '5626888', 'count': '1'},
   {'item': '7066596', 'count': '1'},
   {'item': '7066654', 'count': '1'},
   {'item': '7066712', 'count': '1'}],
  'smallest': '4474'
```

```python
# Print row-count stats for the first column of each of the first five revisions
rev_resources = [rev_1_resource, rev_2_resource, rev_3_resource, rev_4_resource, rev_5_resource]
for i, resource in enumerate(rev_resources, start=1):
    column = resource['metadata']['columns'][0]
    stats = column['cachedContents']
    print(f"Revision {i} Metadata")
    print(f"Column: {column['name']}")
    print(f"Count: {stats['count']}")
    print(f"Null: {stats['null']}")
    print(f"Non-null: {stats['non_null']}")
    print()
```

```
Revision 1 Metadata
Column: RowKey
Count: 6618943
Null: 0
Non-null: 6618943

Revision 2 Metadata
Column: RowKey
Count: 6618943
Null: 0
Non-null: 6618943

Revision 3 Metadata
Column: RowKey
Count: 6618943
Null: 0
Non-null: 6618943

Revision 4 Metadata
Column: RowKey
Count: 6618943
Null: 0
Non-null: 6618943

Revision 5 Metadata
Column: RowKey
Count: 6618943
Null: 0
Non-null: 6618943
```
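
Null counts like these could support the "erroneous values in a column" case from the use case above. A jump in a column's share of nulls between revisions is a cheap signal that something upstream changed. The five revisions printed above report identical `RowKey` stats. Below is a minimal sketch. It assumes `cachedContents` carries `null` and `non_null` counts as strings (as printed above) and skips columns that lack them. `null_fractions` is a hypothetical helper.

```python
def null_fractions(columns: list[dict]) -> dict[str, float]:
    """Map each column name to its fraction of null values, using cachedContents stats."""
    fractions = {}
    for column in columns:
        stats = column.get("cachedContents") or {}
        if "null" not in stats or "non_null" not in stats:
            continue  # no null statistics cached for this column
        nulls = int(stats["null"])
        non_nulls = int(stats["non_null"])
        total = nulls + non_nulls
        fractions[column["name"]] = nulls / total if total else 0.0
    return fractions
```

To find columns whose null share moved, compare `null_fractions(rev_1_resource['metadata']['columns'])` with the same call on `rev_2_resource`.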


What about the `metadata` endpoint?

```python
response = requests.get(f'{domain}/api/views/metadata/v1/{dataset_id}',
                        auth=(username, password))
response.raise_for_status()
metadata = response.json()
```

```python
metadata
```

```
{'id': '7pzs-jtti',
 'name': 'e-File AOIC Prod Filings',
 'attribution': None,
 'attributionLink': None,
 'category': None,
 'createdAt': '2021-08-26T15:55:08+0000',
 'dataUpdatedAt': '2023-01-06T06:02:13+0000',
 'dataUri': 'https://internal-aoic.data.socrata.com/resource/7pzs-jtti',
 'description': None,
 'domain': 'internal-aoic.data.socrata.com',
 'externalId': None,
 'hideFromCatalog': False,
 'hideFromDataJson': False,
 'license': None,
 'metadataUpdatedAt': '2023-01-06T06:01:54+0000',
 'provenance': 'OFFICIAL',
 'updatedAt': '2023-01-06T06:01:54+0000',
 'webUri': 'https://internal-aoic.data.socrata.com/d/7pzs-jtti',
 'approvals': [{'state': 'submittable',
   'submissionObject': 'public_audience_request',
   'submissionOutcome': 'change_audience',
   'workflowId': 3104,
   'submissionDetails': {'permissionType': 'READ'}}],
 'customFields': None,
 'tags': None}
```
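
The `dataUpdatedAt` field may help with the scheduling trade-off in step 7 of the task list. Rather than assuming an update has finished by a fixed time, the monitoring job could run frequently and only analyze a dataset whose `dataUpdatedAt` is newer than its last check. It could also raise a separate alert when no update has landed within the expected cadence. Below is a minimal sketch. It assumes the timestamp format shown above, and it assumes `dataUpdatedAt` only advances once an update has finished publishing, which is not verified here. The function names and the one-hour grace period are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def parse_socrata_timestamp(value: str) -> datetime:
    """Parse timestamps such as '2023-01-06T06:02:13+0000' into timezone-aware datetimes."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S%z")


def updated_since(metadata: dict, last_checked: datetime) -> bool:
    """True if the data changed after the monitor last ran (last_checked must be timezone-aware)."""
    return parse_socrata_timestamp(metadata["dataUpdatedAt"]) > last_checked


def is_stale(metadata: dict, cadence: timedelta,
             now: Optional[datetime] = None,
             grace: timedelta = timedelta(hours=1)) -> bool:
    """True if no data update has landed within one scheduled cadence plus a grace period."""
    now = now or datetime.now(timezone.utc)
    return now - parse_socrata_timestamp(metadata["dataUpdatedAt"]) > cadence + grace
```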