# Import Data and Transform for Publication

This notebook imports the raw data from InfluxDB and transforms it into a publishable format, which is written out as a CSV file.

Version = 1.0

Author = Patrick Ruoff

The environment is in conda_env.yml

In [1]:
from math import floor

import pandas as pd
import numpy as np
from influxdb import DataFrameClient

In [None]:
def _forget_empty_series(output_name):
    """Remove a series that returned no data from the bookkeeping lists in ``se``.

    An environment series is also removed from ``se.categoricalSeries``. A
    missing biosignal series additionally marks its user (the first two
    characters of the series name) as unavailable. Anything else is treated
    as a vote series.

    :param output_name: output name of the series that has no entries
    """
    if output_name in se.seriesEnvironment:
        se.seriesEnvironment.remove(output_name)
        if output_name in se.categoricalSeries:
            se.categoricalSeries.remove(output_name)
    elif output_name in se.seriesBiosignal:
        user_id = output_name[:2]
        if user_id in se.availableUsers:
            se.availableUsers.remove(user_id)
        se.seriesBiosignal.remove(output_name)
    else:
        se.seriesVote.remove(output_name)


def _blank_out_gaps(frame, message_printed):
    """Set all rows of ``frame`` that lie inside a gap from ``se.occGaps`` to NaN.

    :param frame: DataFrame of one series, indexed by timestamp; modified in place
    :param message_printed: dict keyed by gap start, used to print the
        notification only once per gap
    """
    first_stamp = frame.index[0]
    last_stamp = frame.index[-1]
    for gap_info in se.occGaps:
        gap_start, gap_end = gap_info[0], gap_info[1]
        # NOTE(review): only gaps lying completely inside the series' time
        # range are handled; a gap that merely overlaps the range is left
        # untouched -- confirm that this is intended.
        if not (gap_start >= first_stamp and gap_end <= last_stamp):
            continue
        # A boolean row mask (both bounds inclusive) works on the frame itself
        # and does not depend on the index being sorted or unique.
        in_gap = (frame.index >= gap_start) & (frame.index <= gap_end)
        frame.loc[in_gap, :] = np.nan
        if not message_printed[gap_start]:
            print('Drop entries between {} and {} as specified in setup.'.format(
                gap_start, gap_end))
            message_printed[gap_start] = True


def _drop_series_of_missing_users(series_names, data_frames):
    """Drop user series ('U' prefix) whose user is not in ``se.availableUsers``.

    The series is removed from ``series_names``, from ``data_frames`` and from
    the paired lists ``se.inputSeries`` / ``se.outputSeries``.

    :param series_names: one of the series lists in ``se``; modified in place
    :param data_frames: dict mapping output series name to its DataFrame
    """
    # Iterate over a copy because entries are removed from the list itself.
    for name in list(series_names):
        if not name.startswith('U') or name[:2] in se.availableUsers:
            continue
        print('WARNING: {} has values even though other series'
              ' of the same user are missing!'.format(name))
        print('Removing {} since userdata is not available.'.format(name))
        # Input and output names are matched by position, so both entries
        # have to go to keep the two lists aligned.
        position = se.outputSeries.index(name)
        del se.outputSeries[position]
        del se.inputSeries[position]
        del data_frames[name]
        series_names.remove(name)


def _merge_duplicate_timestamps(name, frame):
    """Collapse rows of ``frame`` that share a timestamp into a single row.

    Numerical environment and biosignal series are averaged; categorical
    series and numerical vote series keep their last value.

    :param name: output name of the series
    :param frame: DataFrame of the series
    :return: ``frame`` itself if it has no duplicated timestamps, otherwise
        a new DataFrame with a unique index named 'index'
    """
    is_duplicated = frame.index.duplicated(keep=False)
    if not is_duplicated.any():
        return frame

    frame.index.name = 'index'
    duplicated_rows = frame[is_duplicated]
    print('duplicated indices:\n', duplicated_rows)
    duplicated_indices = np.unique(duplicated_rows.index.to_numpy())

    # _get_numeric_data() is a pandas-internal helper; an empty result means
    # the series holds categorical data only.
    is_numeric = not frame._get_numeric_data().columns.empty
    is_sensor = name in se.seriesEnvironment or name in se.seriesBiosignal
    if is_numeric and is_sensor:
        # for numerical sensor data
        merged = frame.groupby('index').mean()
    else:
        # for categorical data and for numerical voting data use last value
        merged = frame.groupby('index').last()
    print('\nare merged to:\n', merged.loc[duplicated_indices])
    return merged


def read_df_from_influxdb():
    """ Read data from influxDB instance

    Filter for specified time frame. Remove data in occGaps as specified in setup.py.
    Bring all datapoint to 30s frequency and handle duplicates. Drop a user's data
    if certain important features are missing for the full time frame.

    The configuration is read from the ``se`` object and the once-per-gap
    message flags are kept in the global ``isMessagePrinted``; both have to be
    defined elsewhere in the notebook. The series lists in ``se``
    (inputSeries, outputSeries, seriesEnvironment, seriesBiosignal,
    seriesVote, categoricalSeries, availableUsers, userSeries) are updated in
    place whenever a series is dropped.

    :return: DataFrame with all the data
    :raises ValueError: if se.inputSeries and se.outputSeries differ in shape
    """

    host = 'localhost'
    port = 8086
    user = 'pythonServer'
    # Close the file again and ignore a line break at the end of the file.
    with open('influxdb_password.txt') as reader:
        password = reader.read().rstrip('\r\n')
    dbname = 'openhab_db'
    client = DataFrameClient(host, port, user, password, dbname)

    global isMessagePrinted
    for gap_info in se.occGaps:
        isMessagePrinted[gap_info[0]] = False

    print('Starting import from {} between {} and {}'.format(
        dbname, se.startTime, se.endTime))
    # NOTE: beware that timestamps now are +00:00 -> different from Grafana

    # Input and output names are matched by position, so continuing with
    # lists of different shape can only fail later with an unrelated error.
    if not np.shape(se.inputSeries) == np.shape(se.outputSeries):
        raise ValueError(
            'InputSerieses and se.outputSerieses must have the same number of entries! '
            'Found: {}, {}'
            .format(np.shape(se.inputSeries), np.shape(se.outputSeries)))

    # whole seconds followed by nine zeros, i.e. nanoseconds since the epoch
    start_time_unix = "{}000000000".format(int(pd.to_datetime(se.startTime).timestamp()))
    end_time_unix = "{}000000000".format(int(pd.to_datetime(se.endTime).timestamp()))

    data_frames = {}
    # A while loop is used because series without data are deleted from both
    # lists during the iteration; the position only advances on success.
    position = 0
    while position < len(se.inputSeries):
        input_name = se.inputSeries[position]
        output_name = se.outputSeries[position]
        try:
            query_result = client.query(
                "SELECT * FROM {} WHERE time > {} AND time < {}".format(
                    input_name, start_time_unix, end_time_unix))[input_name]
        except KeyError:
            print('Input series {} has no entries in specified time frame'.format(
                input_name))
            _forget_empty_series(output_name)
            del se.inputSeries[position]
            del se.outputSeries[position]
            continue

        series_frame = pd.DataFrame(query_result)
        _blank_out_gaps(series_frame, isMessagePrinted)
        series_frame.columns = [output_name]
        # round down to 30 seconds
        # NOTE(review): assumes the query result carries a DatetimeIndex.
        series_frame.index = series_frame.index.floor('30s').rename(None)
        data_frames[output_name] = series_frame
        position += 1

    # check if all series of missing users are removed from seriesesBio, -Vote, and -Env
    # without biosignal data, environmental and vote data can be dropped
    for series_names in (se.seriesVote, se.seriesBiosignal, se.seriesEnvironment):
        _drop_series_of_missing_users(series_names, data_frames)

    # update userSerieses
    for user in se.availableUsers:
        se.userSeries[user] = [
            series for series in se.outputSeries if user in series]

    # unite to one df
    data = pd.DataFrame([], index=pd.date_range(
        start=se.startTime, end=se.endTime, freq='30s'))
    for i, output_name in enumerate(se.outputSeries):
        print('i', i, data_frames[output_name].columns)
        # handle duplicates
        data_frames[output_name] = _merge_duplicate_timestamps(
            output_name, data_frames[output_name])
        # An outer merge is used instead of a plain column assignment, which
        # would limit the time span to the first series' last value.
        data = data.merge(
            right=data_frames[output_name], how='outer',
            left_index=True, right_index=True)

    # make categorical features strings
    # Iterate over a copy because missing series are removed from the list.
    for series in list(se.categoricalSeries):
        if series in data.columns:
            # nan values stay np.nan
            data[series] = data[series].astype(str).replace('nan', np.nan)
        else:
            se.categoricalSeries.remove(series)
    data.index.name = 'index'

    print('Data from {} to {} in influxDB is of size {}'.format(se.startTime, se.endTime, np.shape(data)))
    return data
