In [1]:
import os
import subprocess
from pprint import pprint
from time import sleep

# Introduction

This is a walkthrough of what I will do to download and install InfluxDB and other components of the InfluxDB ecosystem. I am doing this out of general curiosity: I have worked with time-series data in the past and want to see whether InfluxDB really makes it easier to work with.

## Disclaimers and such

I don't have a lot of experience with InfluxDB. My only exposure has been job ads on LinkedIn and reading through some of the docs they have made available over the course of an evening. I may (meaning I probably will) do things that are anti-InfluxDB-ish. I apologize in advance and appreciate any feedback. Finally, any opinions I have are my own and do not necessarily reflect those of any organizations I work for, have worked for, or am otherwise affiliated with.

# Install InfluxDB

I am going to start by installing InfluxDB locally. There are also Docker images, which I may explore at some point. The overarching goal is to get data into InfluxDB. There are various data generators and sample data sets that I can use, which I will be sure to document. Unless they are proprietary, I will also provide code or point to the applicable repos.

```
curl -sL https://repos.influxdata.com/influxdb.key | sudo apt-key add -
source /etc/lsb-release
echo "deb https://repos.influxdata.com/${DISTRIB_ID} ${DISTRIB_CODENAME} stable" | sudo tee /etc/apt/sources.list.d/influxdb.list
```

The instructions found here did not work for me: https://docs.influxdata.com/influxdb/v1.4/introduction/installation/

That being said, I found a separate set of instructions here: https://portal.influxdata.com/downloads#influxdb

```
wget https://dl.influxdata.com/influxdb/releases/influxdb_1.4.2_amd64.deb
sudo dpkg -i influxdb_1.4.2_amd64.deb
```
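Since `systemctl` and `ps` only show that a process exists, it can also help to confirm that the server is actually answering requests. A minimal sketch in Python 3 (the notebook itself runs Python 2), assuming the InfluxDB 1.x defaults: the HTTP API listens on port 8086, and `GET /ping` replies with `204 No Content` plus an `X-Influxdb-Version` header. `influxdb_version` is a hypothetical helper name, not part of any library.

```python
import http.client


def influxdb_version(host="localhost", port=8086, timeout=2.0):
    """Return the version string reported by /ping, or None if nothing answers.

    Assumes an InfluxDB 1.x server, whose /ping endpoint replies with HTTP 204
    and an X-Influxdb-Version header.
    """
    conn = http.client.HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("GET", "/ping")
        resp = conn.getresponse()
        # a healthy 1.x server answers 204 No Content
        if resp.status == 204:
            return resp.getheader("X-Influxdb-Version", "unknown")
        return None
    except (OSError, http.client.HTTPException):
        # refused connections, timeouts and malformed replies all mean "not reachable"
        return None
    finally:
        conn.close()


# prints the server's version once influxd is running, None otherwise
print(influxdb_version())
```

Returning `None` instead of raising keeps the helper convenient to call right after starting or stopping the service.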

In [2]:
out = subprocess.Popen(['sudo', 'systemctl', 'start', 'influxdb'], stdout=subprocess.PIPE)

In [3]:
def idb_running():
    proc = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE)
    return [p for p in proc.stdout.read().split('\n') if 'influx' in p]
idb_running()

['influxdb  6719  0.0  2.2 303504 22716 ?        Ssl  16:51   0:02 /usr/bin/influxd -config /etc/influxdb/influxdb.conf']
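`idb_running` keeps any `ps aux` line containing the substring `influx`, so it would also pick up, say, an editor with `/etc/influxdb/influxdb.conf` open. Below is a sketch of a stricter check that matches on the executable name in the `COMMAND` column instead. It is written for Python 3, and `influxd_pids` is a hypothetical name:

```python
import os


def influxd_pids(ps_aux_output):
    """Return the PIDs of influxd processes found in `ps aux` text.

    Matching on the executable name avoids false hits such as an editor that
    has influxdb.conf open, which a plain substring test for 'influx' would catch.
    """
    pids = []
    for line in ps_aux_output.splitlines():
        # ps aux columns: USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
        parts = line.split(None, 10)
        if len(parts) < 11 or not parts[1].isdigit():
            continue  # header row or malformed line
        executable = os.path.basename(parts[10].split()[0])
        if executable == "influxd":
            pids.append(int(parts[1]))
    return pids


# usage (Python 3.7+):
# import subprocess
# influxd_pids(subprocess.run(["ps", "aux"], capture_output=True, text=True).stdout)
```

From a shell, `pgrep -x influxd` performs the same exact-name match.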

In [4]:
out = subprocess.Popen(['sudo', 'systemctl', 'stop', 'influxdb'], stdout=subprocess.PIPE)

In [5]:
idb_running()

[]

# Get some test data in the DB
Let's face it: what I just did is not all that interesting... and we probably have a bit further to go before I do something that is.

In [6]:
def start_idb(silent=False):
    out = subprocess.Popen(['sudo', 'systemctl', 'start', 'influxdb'], stdout=subprocess.PIPE)
    if not silent:
        sleep(1)
        pprint(idb_running())
    
def stop_idb(silent=False):
    out = subprocess.Popen(['sudo', 'systemctl', 'stop', 'influxdb'], stdout=subprocess.PIPE)
    if not silent:
        sleep(1)
        pprint(idb_running())

In [7]:
start_idb()

['influxdb 28561 34.0  2.1 156040 21608 ?        Ssl  17:44   0:00 /usr/bin/influxd -config /etc/influxdb/influxdb.conf']
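`subprocess.Popen` returns as soon as `systemctl` is launched, without waiting for it to finish, so the fixed `sleep(1)` in `start_idb` and `stop_idb` is a guess at how long the service needs. A hedged alternative is to poll until the expected state shows up, with a timeout. This Python 3 sketch uses a hypothetical `wait_until` helper:

```python
import time


def wait_until(predicate, timeout=10.0, interval=0.25):
    """Poll `predicate` until it returns something truthy or `timeout` seconds pass.

    Returns the last value from `predicate`, so callers can inspect what was found.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = predicate()
        if result or time.monotonic() >= deadline:
            return result
        time.sleep(interval)


# usage with the notebook's helper:
# wait_until(idb_running)                # after starting: wait for a matching process
# wait_until(lambda: not idb_running())  # after stopping: wait for it to disappear
```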


I might forget this, but it appears that there are a few key elements to the data:

* timestamp
* field keys and field values
* tag keys and tag values
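These three elements map directly onto InfluxDB 1.x line protocol, the text format the server ingests: `measurement[,tag_key=tag_value...] field_key=field_value[,...] [timestamp]`, where the timestamp defaults to nanoseconds since the Unix epoch. The Python 3 sketch below (hypothetical `to_line` helper) builds one such line. It escapes commas, spaces, and equals signs in names, suffixes integers with `i`, and quotes string fields; it assumes plain names and does not handle every backslash edge case in tags.

```python
def _escape(text, chars):
    # prefix each special character with a backslash
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def _field_value(value):
    # bool must be checked before int, because bool is a subclass of int
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return "{}i".format(value)  # the trailing i marks an integer field
    if isinstance(value, float):
        return repr(value)
    # anything else is written as a double-quoted string field
    return '"{}"'.format(str(value).replace("\\", "\\\\").replace('"', '\\"'))


def to_line(measurement, tags, fields, timestamp_ns=None):
    """Build one line-protocol record:
    measurement[,tag_key=tag_value...] field_key=field_value[,...] [timestamp]
    """
    if not fields:
        raise ValueError("a point needs at least one field")
    line = _escape(measurement, ", ")
    # tags are written sorted by key; tags whose value is None are skipped
    for key in sorted(tags):
        if tags[key] is not None:
            line += ",{}={}".format(_escape(str(key), ",= "), _escape(str(tags[key]), ",= "))
    line += " " + ",".join(
        "{}={}".format(_escape(str(k), ",= "), _field_value(fields[k])) for k in sorted(fields)
    )
    if timestamp_ns is not None:
        line += " {}".format(int(timestamp_ns))
    return line
```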

In a manufacturing context I use these terms a lot, although the way they are used is slightly different. For example, I would typically define a tag as a combination of:

* timestamp
* tag name
* tag value

which would typically look like this:

`[datetime(2017, 3, 3, 1, 22, 55, ...), 'temp_ava1', 8877]`
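One reasonable way to carry that manufacturing-style record over into InfluxDB terms is to turn the "tag name" into a field key and its reading into the field value, leaving InfluxDB tags for descriptive metadata (tag values are always strings and are indexed; field values are not indexed). A Python 3 sketch of that mapping, producing the point dictionary shape that influxdb-python's `write_points` accepts; `tag_record_to_point` and the example measurement name are hypothetical:

```python
from datetime import datetime, timezone


def tag_record_to_point(record, measurement, static_tags=None):
    """Convert a [timestamp, tag_name, tag_value] record into a point dict.

    The manufacturing "tag name" becomes an InfluxDB field key and its reading
    the field value; InfluxDB tags hold descriptive metadata only.
    """
    timestamp, tag_name, tag_value = record
    return {
        "measurement": measurement,
        "tags": dict(static_tags or {}),  # copy so callers' dicts are not shared
        "time": timestamp,
        "fields": {tag_name: tag_value},
    }


example = tag_record_to_point(
    [datetime(2017, 3, 3, 1, 22, 55, tzinfo=timezone.utc), "temp_ava1", 8877],
    "hypothetical_measurement",
)
```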

Anyway, I will try to remain consistent with InfluxDB's terminology. I think what makes the most sense is to create a class.

It also looks like there is a Python client library: https://github.com/influxdata/influxdb-python

`pip install influxdb`

In [38]:
from influxdb import InfluxDBClient
import influxdb
import json
import copy
import numbers
from datetime import datetime
import pytz

In [9]:
DBNAME = 'first_try'

In [10]:
client = InfluxDBClient(database=DBNAME)

In [23]:
print influxdb.__version__
print client.get_list_database()
print client.get_list_measurements()
# print client.get_list_privileges()
# print client.get_list_retention_policies()
print client.get_list_users()
# print client.get_list_series()  # should have been looking at release not master

5.0.0
[{u'name': u'_internal'}]
[]
[]


In [24]:
client.create_database(DBNAME)

In [25]:
client.get_list_database()

[{u'name': u'_internal'}, {u'name': u'first_try'}]

In [None]:
class Asset(object):
    """
    I am modeling this after IIoT (the Industrial Internet of Things). An asset is a machine or test station that produces
    data. Ideally I am providing metadata (tag keys and values) that will describe the asset.
    
    It should be noted that I have not looked at Telegraf much, so I may be replicating its functionality
    
    """
    
    def __init__(self, asset_name, fixed=True, refrence_id=None):
        """
        :param asset_name: name of the asset
        :param fixed: If the asset is fixed in place - True. If it is a mobile asset (e.g. car, truck, trailer, etc) - False
        :param refrence_id: assets tend to be part of larger systems, and other systems are pretty good at tracking
                            that structure... this ID is a tie-out to them
        """
        
        self.asset_name = asset_name
        self.fixed = fixed
        self.ref_id = refrence_id
        self.template = self._create_boiler_plate()
    
    def _create_boiler_plate(self):
        bp = {
                "measurement": None,
                "tags": {
                    "asset_name": self.asset_name,
                    "fixed": self.fixed,
                    "ref_id": self.ref_id
                },
                "time": None,
                "fields": {
                }
            }
        
        return bp
    
    def update_static_tags(self, tags):
        if not isinstance(tags, dict):
            raise TypeError('There is a strong assumption that tags is a dictionary')
        self.template['tags'].update(tags)
        
class Measurement(object):
    """
    Create Measurements that will then be written with values added. There will also be a chance to add Tags that are variable 
    (for example, something like a serial number or part number)
    """
    
    def __init__(self, m_name, asset):
        self.m_name = m_name
        self.rec = copy.deepcopy(asset.template)
        self.rec['measurement'] = m_name
        self.timezone = 'UTC'
        self.dlst = False
        
    def set_timezone(self, timezone, dlst=False):
        if timezone in pytz.all_timezones:
            self.timezone = timezone
        else:
            raise ValueError('{} is not a valid timezone. Try import pytz; pytz.all_timezones; for a valid list'.format(timezone))
        self.dlst = dlst
        
    def update_fields(self, fields, timestamp=None):
        
        # validate strong assumptions
        bad_keys = [k for k in fields if not isinstance(k, basestring)]
        if len(bad_keys) > 0:
            raise ValueError('Keys in fields must be a base string. Bad values include: {}'.format(bad_keys))
        bad_values = [v for v in fields.values() if not isinstance(v, numbers.Real)]
        if len(bad_values) > 0:
            raise ValueError('Values in fields must be an int or float. Bad values include: {}'.format(bad_values))
            
        self.rec['fields'].update(fields)
        
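        if timestamp is None:
            # datetime.now() is naive local wall-clock time, which would be mislabeled whenever the
            # system clock is not set to self.timezone, so take "now" directly in UTC instead
            utc_dt = datetime.now(pytz.utc)
        else:
            # interpret the naive timestamp in this measurement's timezone, then convert to UTC
            local_dt = pytz.timezone(self.timezone).localize(timestamp, is_dst=self.dlst)
            utc_dt = local_dt.astimezone(pytz.utc)
        
        self.rec['time'] = utc_dt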
        
    def update_tags(self, tags):
        # validate strong assumptions
        bad_keys = [k for k in tags if not isinstance(k, basestring)]
        if len(bad_keys) > 0:
            raise ValueError('Keys in tags must be a base string. Bad values include: {}'.format(bad_keys))
        bad_values = [v for v in tags.values() if not isinstance(v, basestring)]
        if len(bad_values) > 0:
            raise ValueError('Values in tags must be a base string. Bad values include: {}'.format(bad_values))
            
        self.rec['tags'].update(tags)
        

In [61]:
magic_machine = Asset('magic_machine', refrence_id='mm44004')

In [62]:
magic_rate = Measurement('magic_rate', magic_machine)

In [63]:
magic_rate.update_fields({'magic_flow': 45.12023, 'magic_flow_set': 47.0})
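`update_fields` stores a timezone-aware UTC `datetime` in `rec['time']`, which influxdb-python serializes when writing. If you ever write line protocol or call the HTTP `/write` endpoint directly, the default precision is integer nanoseconds since the Unix epoch. A small Python 3 conversion sketch (hypothetical `to_epoch_ns` helper) that uses integer arithmetic, so large epoch values are not rounded through a float:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ns(dt):
    """Convert a timezone-aware datetime to integer nanoseconds since the Unix epoch.

    datetime only resolves microseconds, so the last three digits are always 000.
    """
    if dt.tzinfo is None or dt.utcoffset() is None:
        raise ValueError("expected a timezone-aware datetime")
    # timedelta // timedelta yields an exact int count of microseconds
    return (dt - EPOCH) // timedelta(microseconds=1) * 1000
```

pytz-localized datetimes, like the one `update_fields` produces, are timezone-aware and work here as well.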

In [64]:
magic_rate.update_tags({'station_no': '3'})

In [66]:
points = []
points.append(magic_rate.rec)

In [67]:
client.write_points(points)

True
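`points.append(magic_rate.rec)` appends a reference to the record rather than a copy, and `update_fields` mutates that same dictionary in place. Calling `update_fields` and `append` in a loop would therefore leave every entry pointing at one object that holds only the latest values. A Python 3 sketch that deep-copies a template record once per reading; `collect_points` and the `(fields, time)` reading format are assumptions for illustration:

```python
import copy


def collect_points(template_rec, readings):
    """Build one independent point dict per (fields, time) reading.

    Appending the same dict repeatedly would leave every list entry pointing at
    one object, so only the last update would survive; deepcopy avoids that.
    """
    points = []
    for fields, time_value in readings:
        rec = copy.deepcopy(template_rec)
        rec["fields"].update(fields)
        rec["time"] = time_value
        points.append(rec)
    return points
```

The resulting list can be handed to `client.write_points(...)`, which by default sends all of the points in a single request.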

In [68]:
result = client.query('select * from machine_rate;')  # wrong measurement name; corrected in the next cell

In [71]:
result = client.query('select * from magic_rate;')

In [72]:
result

ResultSet({'(u'magic_rate', None)': [{u'station_no': u'3', u'fixed': u'True', u'ref_id': u'mm44004', u'asset_name': u'magic_machine', u'time': u'2017-12-24T21:58:06.481515776Z', u'magic_flow_set': 47, u'magic_flow': 45.12023}]})
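influxdb-python can already flatten this for you: `list(result.get_points())` yields one dictionary per row. Notice also that `fixed` came back as the string `'True'`, because tag values are always stored as strings. For reference, here is a Python 3 sketch of the same flattening applied to the raw JSON that the 1.x `/query` endpoint returns, assuming its usual `results` / `series` / `columns` / `values` shape; `series_to_rows` is a hypothetical name. A query against a measurement that does not exist, such as the `machine_rate` typo above, has no `series` and yields an empty list.

```python
def series_to_rows(response):
    """Flatten an InfluxDB 1.x /query JSON response into one dict per row.

    Assumes the shape:
    {"results": [{"statement_id": 0,
                  "series": [{"name": ..., "columns": [...], "values": [[...], ...]}]}]}
    """
    rows = []
    for result in response.get("results", []):
        if "error" in result:
            raise RuntimeError(result["error"])
        for series in result.get("series", []):
            group_tags = series.get("tags", {})  # present only with GROUP BY <tag>
            for values in series.get("values", []):
                row = dict(zip(series["columns"], values))
                row.update(group_tags)
                rows.append(row)
    return rows
```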

# Overall impressions

Overall this has been pretty much a hello-world exercise, so it is hard to say what I think. I have a feeling that the real power of this database will be in its write performance and in the other components of the architecture. Over the next couple of days I will probably take a look at those. I am also trying to figure out why I would not just use something like Elasticsearch for this.