# SQuaSH InfluxDB data model

In [DM-18103](https://jira.lsstcorp.org/browse/DM-18103) we revisit the SQuaSH InfluxDB data model. You can use this notebook to try different strategies for mapping `lsst.verify` data to InfluxDB. You can also use it to "manually" synchronize the SQuaSH production database with an InfluxDB instance. For a quick introduction to InfluxDB concepts see [this notebook](https://github.com/lsst-sqre/influx-demo).

In [None]:
# No trailing slash: endpoint paths such as "/jobs" are appended to this URL
SQUASH_API_URL = "https://squash-restful-api.lsst.codes"
INFLUXDB_API_URL = "https://influxdb-demo.lsst.codes"

The following cells grab SQuaSH data and write it in the format used by InfluxDB, the so-called [line protocol](https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial/):


```<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]```

A few InfluxDB concepts are important here. A measurement is roughly equivalent to an SQL table. Tags are annotations used to query the data and are therefore indexed. Fields hold the actual values (metric values in this case) and are not indexed. InfluxDB is optimized for time-series data, which are indexed and sharded by timestamp.
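
To make these concepts concrete, the following minimal sketch assembles a single line from a measurement name, a tag set, a field set and a timestamp. The measurement, tag and field names are made up for illustration and are not taken from SQuaSH.

```python
# Illustrative values only: these names are assumptions, not real SQuaSH data.
measurement = "example_package"
tags = {"dataset": "example_dataset", "filter": "r"}  # indexed, used in queries
fields = {"example_metric": 1.5}                       # not indexed, actual values
timestamp = 1549911992000000000                        # nanoseconds since the Unix epoch

tag_set = ",".join("{}={}".format(k, v) for k, v in tags.items())
field_set = ",".join("{}={}".format(k, v) for k, v in fields.items())

line = "{},{} {} {}".format(measurement, tag_set, field_set, timestamp)
print(line)
```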


See also [InfluxDB schema design and data layout](https://docs.influxdata.com/influxdb/v1.7/concepts/schema_and_data_layout/#general-recommendations) recommendations.


## Mapping SQuaSH metadata to InfluxDB
`lsst.verify` adds metadata to verification jobs, and this metadata is uploaded to SQuaSH. How each metadata key is written to InfluxDB, as a tag or as a field, is defined by the mapping below, which follows these rules:

1. By default, if the SQuaSH metadata key is not found in the mapping, it will be written as a tag, and the original key will be preserved. 
2. You can use the mapping to rename the metadata key when appropriate. 
3. Finally, if `schema` is set to `None` in the mapping, then the corresponding metadata won't be written to InfluxDB.

In [None]:
MAPPING = [{'squash': 'ci_id', 
            'influxdb': 'ci_id', 
            'schema': 'field'
           },
           {'squash': 'id', 
            'influxdb': 'squash_id',
            'schema': 'field'
           },
           {'squash': 'date',
            'influxdb': None,
            'schema': None
           },
           {'squash': 'timestamp',
            'influxdb': 'timestamp',
            'schema': 'field'
           },
           {'squash': 'ci_url',
            'influxdb': None,
            'schema': None
           },
           {'squash': 'packages',
            'influxdb': None,
            'schema': None
           },
           {'squash': 'filter_name',
            'influxdb': 'filter',
            'schema': 'tag'
           }]

The rationale for this mapping is the following:
1. `ci_id` and `squash_id` are sequential numbers. If they were mapped to InfluxDB tags, they would [increase InfluxDB series cardinality](https://docs.influxdata.com/influxdb/v1.7/concepts/schema_and_data_layout/#discouraged-schema-design) (DM-18342).
2. It is not possible to do [math operations with InfluxDB timestamps](https://community.influxdata.com/t/math-operations-on-field-value-and-time/6323/4), so it is useful to add the `timestamp` explicitly as a field. There is also a `date` key, added as environment metadata, that we don't need in InfluxDB (DM-17049).
3. `ci_url` will be used to connect Chronograf to CI. It is a different URL for each CI run, so the rationale for adding `ci_url` as an InfluxDB field rather than a tag is the same as in 1 (DM-18342). In the mapping above its `schema` is currently `None`, so it is not written to InfluxDB.
4. We skip `packages` metadata for now. We plan to add to InfluxDB only the packages that changed between two consecutive CI runs (DM-18343).
5. `lsst.verify` metadata uses `filter_name`. We decided to rename it to `filter`, which is the dataID key commonly used in DM.
6. Other metadata are automatically added as InfluxDB tags.
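
The cardinality argument in item 1 can be illustrated with a small count. Series cardinality is, roughly, the number of distinct combinations of measurement and tag set. The sketch below uses made-up points (the `dataset` tag and the helper name `series_cardinality` are assumptions for illustration) to compare writing `ci_id` as a tag with writing it as a field.

```python
def series_cardinality(points):
    """Count distinct (measurement, tag set) combinations in a list of points."""
    return len({(p["measurement"], tuple(sorted(p["tags"].items())))
                for p in points})

# 100 hypothetical CI runs for the same measurement and dataset
ci_ids = range(100)

as_tag = [{"measurement": "example_package",
           "tags": {"dataset": "example_dataset", "ci_id": str(i)},
           "fields": {"example_metric": 1.0}} for i in ci_ids]

as_field = [{"measurement": "example_package",
             "tags": {"dataset": "example_dataset"},
             "fields": {"example_metric": 1.0, "ci_id": i}} for i in ci_ids]

print(series_cardinality(as_tag))    # one series per CI run
print(series_cardinality(as_field))  # a single series
```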


We start by creating a new InfluxDB database. Note that if the database already exists, a status code 200 (OK) is returned and the existing data is preserved. If you want to overwrite an existing database, you have to delete it first using the Chronograf admin interface.

In [None]:
import requests
import json

INFLUXDB_DATABASE = "squash-demo"

params={'q': 'CREATE DATABASE "{}"'.format(INFLUXDB_DATABASE)}
r = requests.post(url=INFLUXDB_API_URL + "/query", params=params)
r.status_code
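
Because `CREATE DATABASE` succeeds whether or not the database exists, it can be useful to check beforehand. The following is a minimal sketch, assuming the InfluxDB 1.x `/query` endpoint returns the usual `results`/`series`/`values` JSON for `SHOW DATABASES`. The helper name `database_names` and the sample response are illustrative, not captured output.

```python
def database_names(response_json):
    """Extract database names from a SHOW DATABASES response body."""
    names = []
    for result in response_json.get("results", []):
        for series in result.get("series", []):
            # Each row is a one-element list holding the database name
            names.extend(row[0] for row in series.get("values", []))
    return names

# Sample body shaped like an InfluxDB 1.x response (assumed, not captured output)
sample = {"results": [{"statement_id": 0,
                       "series": [{"name": "databases",
                                   "columns": ["name"],
                                   "values": [["_internal"], ["squash-demo"]]}]}]}

print("squash-demo" in database_names(sample))
```

With a live instance, the input would come from something like `requests.get(INFLUXDB_API_URL + "/query", params={'q': 'SHOW DATABASES'}).json()`.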

In [None]:
from pytz import UTC
from datetime import datetime
from dateutil.parser import parse

def format_timestamp(date):
    """ Format a timestamp string to be used in the InfluxDB line protocol.

        Parameters
        ----------
        date: `<str>`
            Timestamp string, e.g. 2019-02-11T19:06:32Z

        Returns
        -------
        timestamp: `<int>`
            Timestamp in nanosecond-precision Unix time.
            See https://docs.influxdata.com/influxdb/v1.6/write_protocols/
    """

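    # Build the epoch directly; datetime.utcfromtimestamp() is deprecated
    # in recent Python versions
    epoch = UTC.localize(datetime(1970, 1, 1))

    # Integer arithmetic avoids floating-point rounding at nanosecond scale
    delta = parse(date) - epoch
    timestamp = ((delta.days * 86400 + delta.seconds) * 10**9
                 + delta.microseconds * 1000)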

    return timestamp
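
As a quick check of the conversion, the example timestamp from the docstring can be converted with the standard library alone. This is a sketch that assumes the fixed `YYYY-MM-DDTHH:MM:SSZ` format; `to_nanoseconds` is a hypothetical name and does not replace `format_timestamp`, which accepts any string that `dateutil` can parse.

```python
from datetime import datetime, timezone

def to_nanoseconds(date):
    """Convert a UTC timestamp string like 2019-02-11T19:06:32Z to Unix nanoseconds."""
    parsed = datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    delta = parsed - epoch
    # Integer arithmetic keeps full precision at the nanosecond scale
    return (delta.days * 86400 + delta.seconds) * 10**9

print(to_nanoseconds("2019-02-11T19:06:32Z"))  # 1549911992000000000
```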


In [None]:
def mapping(key):
    """ Perform the mapping between SQuaSH metadata and InfluxDB 
        given a MAPPING.
    
        Parameters
        ---------- 
        key: `str`
            The key to look for in the MAPPING.
        
        Returns
        -------
        mapped_key: `str` or `None`       
            Returns the `mapped_key` if the key is found in the MAPPING or the 
            original key if the key does not match.
       
        schema: `str` or `None`
            The InfluxDB schema to write, or `None` if the key should not 
            be added to InfluxDB. 
        
        
    """ 
    mapped_key = key
    schema = 'tag'
    
    for m in MAPPING:
        if m['squash'] == key:
            mapped_key = m['influxdb']
            schema = m['schema']
            break
                
    return mapped_key, schema
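
The three mapping rules can be seen with a reduced table. The sketch below repeats the lookup logic with a two-entry mapping so that it runs on its own; `lookup` and `EXAMPLE_MAPPING` are illustrative names, and `some_other_key` is a made-up metadata key.

```python
EXAMPLE_MAPPING = [
    {'squash': 'filter_name', 'influxdb': 'filter', 'schema': 'tag'},
    {'squash': 'packages', 'influxdb': None, 'schema': None},
]

def lookup(key, table=EXAMPLE_MAPPING):
    """Return (mapped_key, schema); unknown keys default to a tag with the same name."""
    for entry in table:
        if entry['squash'] == key:
            return entry['influxdb'], entry['schema']
    return key, 'tag'

print(lookup('filter_name'))     # ('filter', 'tag'): renamed
print(lookup('packages'))        # (None, None): skipped
print(lookup('some_other_key'))  # ('some_other_key', 'tag'): default
```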

In [None]:
def sanitize(obj):
    """ Return a valid string representing a tag key, a tag value or a field key.
        
        See https://docs.influxdata.com/influxdb/v0.13/write_protocols/
        write_syntax/#escaping-characters
    
        Parameters
        ----------
        obj: `<obj>`
            An object for the tag key, tag value or field key.
        
        Returns
        -------
        string: `str`
            A valid string for the tag key, tag value or field key.
    """
    string = str(obj)
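    # Use explicit double backslashes; "\ ", "\," and "\=" are invalid
    # escape sequences in Python string literals
    string = string.replace(" ", "\\ ")
    string = string.replace(",", "\\,")
    string = string.replace("=", "\\=")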
            
    return string
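
For example, a value containing spaces, commas and equals signs is escaped as follows. The sketch restates the function with explicit double backslashes so that it runs on its own; `escape` is an illustrative name and the input string is made up.

```python
def escape(obj):
    """Escape spaces, commas and equals signs for line protocol keys and tag values."""
    string = str(obj)
    for char in (" ", ",", "="):
        string = string.replace(char, "\\" + char)
    return string

print(escape("my dataset, v=1"))  # my\ dataset\,\ v\=1
```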

In [None]:
def process_metadata(data):
    """ Process SQuaSH metadata using a pre-configured mapping to InfluxDB.
    
        Parameters
        ----------
        data: `dict`
            A dictionary with SQuaSH metadata.
       
        Returns
        -------
        tags: `<list>` 
            List of tags to be written to InfluxDB.
        fields: `<list>`
            List of fields to be written to InfluxDB.
    """
    tags = []
    fields = []
    for key, value in data.items():
        # process nested dict
        if isinstance(value, dict):
            tmp_tags, tmp_fields = process_metadata(value)
            tags.extend(tmp_tags)
            fields.extend(tmp_fields)
        else:
            new_key, schema = mapping(key)
            if new_key and schema == 'tag':
                tags.append("{}={}".format(sanitize(new_key), sanitize(value)))
            elif new_key and schema == 'field':
                fields.append("{}={}".format(sanitize(new_key), value))
    
    return tags, fields
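
Note that field values are written verbatim above, so InfluxDB parses numeric values as floats. If string or integer fields are needed, the line protocol expects strings in double quotes and integers with a trailing `i`. The helper below is a hypothetical sketch of that formatting and is not part of this notebook's pipeline; it only escapes double quotes inside strings.

```python
def format_field_value(value):
    """Format a Python value as an InfluxDB 1.x line protocol field value."""
    # bool must be tested before int because bool is a subclass of int
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return "{}i".format(value)
    if isinstance(value, float):
        return repr(value)
    # Strings are double-quoted; embedded double quotes are escaped
    return '"{}"'.format(str(value).replace('"', '\\"'))

print(format_field_value(42))       # 42i
print(format_field_value(0.5))      # 0.5
print(format_field_value("b1234"))  # "b1234"
```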

In [None]:
def format_influxdb_line(measurement, tags, fields, timestamp):
    """ Format a line following the InfluxDB line protocol.

        Parameters
        ----------
        measurement: `<str>`
            Name of the InfluxDB measurement
        tags: `<list>`
            A list of valid InfluxDB tags
        fields: `<list>`
            A list of valid InfluxDB fields
        timestamp: `int`
            A timestamp in nanosecond-precision Unix time.

        Returns
        -------
        influxdb_line: `<str>`
            An InfluxDB line as defined by the line protocol in
            https://docs.influxdata.com/influxdb/v1.6/write_protocols/
    """
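    # Prepend the measurement to the tag list so that no dangling comma
    # is written when there are no tags
    key = ",".join([measurement] + list(tags))
    line = "{} {} {}".format(key, ",".join(fields), timestamp)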
    return line


In [None]:
def send_to_influxdb(influxdb_line):
    """ Send a line to an InfluxDB database. It assumes INFLUXDB_DATABASE already
        exists in InfluxDB.

        Parameters
        ----------
        influxdb_line: `<str>`
            An InfluxDB line as defined by the line protocol in
            https://docs.influxdata.com/influxdb/v1.6/write_protocols/

        Returns
        -------
        status_code: `<int>`
            Status code from the InfluxDB HTTP API.
        text: `<str>`
            Status message from the InfluxDB HTTP API.
    """
    params = {'db': INFLUXDB_DATABASE}
    r = requests.post(url=INFLUXDB_API_URL + "/write", params=params,
                      data=influxdb_line)

    return r.status_code, r.text
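
The `/write` endpoint also accepts several points in one request body, separated by newlines, which reduces the number of HTTP round trips. The following is a minimal sketch of the batching step; `batch_lines` and the batch size are assumptions for illustration.

```python
def batch_lines(lines, batch_size=1000):
    """Yield newline-joined payloads with at most batch_size lines each."""
    for start in range(0, len(lines), batch_size):
        yield "\n".join(lines[start:start + batch_size])

# Illustrative lines, not real SQuaSH data
lines = ["m,tag=a value={} {}".format(i, 1549911992000000000 + i) for i in range(5)]

payloads = list(batch_lines(lines, batch_size=2))
print(len(payloads))  # 3
```

Each payload could then be passed to `send_to_influxdb` in place of a single line.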

In [None]:
import math

def job_to_influxdb(data):
    """Unpack a SQuaSH job and send it to InfluxDB. 
    
        Parameters
        ----------
        data: `<dict>`
            A dictionary containing the job data
        
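        Returns
        -------
        None
            One line is sent to InfluxDB per InfluxDB measurement. The
            InfluxDB HTTP API returns 204 when the request is processed
            successfully and 400 for malformed syntax or a bad query. The
            response message is printed for any status other than 204.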

        Note
        ----
        `lsst.verify` measurement and InfluxDB measurement mean different things. 
    """    
    # This still gets the timestamp of an individual `lsst.verify` job, we want 
    # the timestamp of the Jenkins job instead.
    # DM-XXXX - SQuaSH API /jenkins/<ci_id> should return the jenkins timestamp 
    timestamp = format_timestamp(data['date_created'])
    
    # Add extra metadata
    
    data['meta']['id'] = data['id']
    data['meta']['env']['timestamp'] = timestamp
    data['meta']['env']['ci_dataset'] = data['ci_dataset']
    
    tags, extra_fields = process_metadata(data['meta'])
    
    # `lsst.verify` package -> InfluxDB measurement
    # `lsst.verify` metric value -> InfluxDB field
    # Group InfluxDB fields by the corresponding InfluxDB measurement
    
    fields_by_measurement = {}
    for verify_measurement in data['measurements']:
        # DM-XXXX - SQuaSH API /measurements should return the verification package 
        influxdb_measurement = verify_measurement['metric'].split('.')[0]

        if influxdb_measurement not in fields_by_measurement:
            fields_by_measurement[influxdb_measurement] = []
            
        # InfluxDB does not store NaNs
        # https://github.com/influxdata/influxdb/issues/4089
        if not math.isnan(verify_measurement['value']):
            fields_by_measurement[influxdb_measurement].append("{}={}".format(verify_measurement['metric'],
                                                                              verify_measurement['value']))
    
    # By grouping InfluxDB fields we can also send all fields that belong to a 
    # measurement at once.
    for influxdb_measurement in fields_by_measurement:
    
        fields = fields_by_measurement[influxdb_measurement] + extra_fields
        influxdb_line = format_influxdb_line(influxdb_measurement, tags, fields,
                                             timestamp)

        status_code, message = send_to_influxdb(influxdb_line)
        if status_code != 204:
            print(message)

    return 
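
The grouping step can be tried in isolation. The sketch below groups made-up metric values by the package prefix of the metric name and skips NaNs, in the same spirit as the function above; the metric names and values are invented for illustration.

```python
import math
from collections import defaultdict

# Invented measurements in the shape used above: 'metric' is '<package>.<name>'
measurements = [
    {"metric": "pkg_a.metric1", "value": 1.0},
    {"metric": "pkg_a.metric2", "value": float("nan")},
    {"metric": "pkg_b.metric1", "value": 2.5},
]

fields_by_measurement = defaultdict(list)
for m in measurements:
    package = m["metric"].split(".")[0]
    # InfluxDB does not store NaNs, so those values are skipped
    if not math.isnan(m["value"]):
        fields_by_measurement[package].append(
            "{}={}".format(m["metric"], m["value"]))

print(dict(fields_by_measurement))
```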

Retrieve a list of verification jobs from SQuaSH and send them to InfluxDB. As you run this notebook, you can follow the data being written to InfluxDB using the [Data Explorer tool](https://chronograf-demo.lsst.codes/) in Chronograf.



In [None]:
jobs = requests.get(SQUASH_API_URL + "/jobs").json()

for job_id in jobs['ids']:
    
    data = requests.get(SQUASH_API_URL + "/job/{}".format(job_id)).json()
    
    # Skip deprecated datasets
    if data['ci_dataset'] == 'unknown' or data['ci_dataset'] == 'decam':
        continue

    print('Sending InfluxDB line for job {}.'.format(job_id))
    
    job_to_influxdb(data)
    

Sending InfluxDB line for job 3254.
Sending InfluxDB line for job 3255.
Sending InfluxDB line for job 3256.
Sending InfluxDB line for job 3257.
Sending InfluxDB line for job 3258.
Sending InfluxDB line for job 3259.
Sending InfluxDB line for job 3260.
Sending InfluxDB line for job 3261.
Sending InfluxDB line for job 3262.
Sending InfluxDB line for job 3263.
Sending InfluxDB line for job 3264.
Sending InfluxDB line for job 3265.
Sending InfluxDB line for job 3266.
Sending InfluxDB line for job 3267.
Sending InfluxDB line for job 3268.
Sending InfluxDB line for job 3269.
Sending InfluxDB line for job 3270.
Sending InfluxDB line for job 3271.
Sending InfluxDB line for job 3272.
Sending InfluxDB line for job 3273.
Sending InfluxDB line for job 3274.
Sending InfluxDB line for job 3275.
Sending InfluxDB line for job 3276.
Sending InfluxDB line for job 3277.
Sending InfluxDB line for job 3278.
Sending InfluxDB line for job 3279.
Sending InfluxDB line for job 3280.
Sending InfluxDB line for jo