# Example to collect data from ClickHouse into S3.
Example(s) of how to read data from ClickHouse to your local computer for analysis, 
and to store this (or other results) into S3.

## Requirements
1) `pip3 install clickhouse-connect boto3 pandas python-dotenv`
1) User credentials for ClickHouse (not the CloudConsole, but in the db) stored in
`.env` file: `CH_HOST`, `CH_USER`, `CH_PASSWORD`.
1) AWS keys as described in [quatt_aws_utils repo readme](https://github.com/Quattio/quatt_aws_py).
1) Read / write access to`s3://quatt-systems-control`: ask aws prod owner to 
add iam user added to `systems-control-group`


In [11]:
import os
import clickhouse_connect
import boto3
import pandas as pd
from dotenv import load_dotenv

# reading the clickhouse credentials from the .env file
load_dotenv()
host = os.environ['CH_HOST']
password = os.environ['CH_PASSWORD']
user = os.environ['CH_USER']

### Query data to dataframe

In [19]:
# connecting to the clickhouse
client = clickhouse_connect.get_client(
    host=host,
    password=password,
    user=user,
    port=8443
)

# define queries and variables
query = """
    SELECT 
        clientid,
        toStartOfFiveMinutes(time_ts) as time,
        argMin(qc_supervisoryControlMode, time_ts) as qc_supervisoryControlMode,
        argMin(hp1_thermalEnergyCounter, time_ts) as hp1_thermalEnergyCounter,
        argMin(hp1_electricalEnergyCounter, time_ts) as hp1_electricalEnergyCounter,
        argMin(qc_cvEnergyCounter, time_ts) as qc_cvEnergyCounter
    FROM
        cic_stats
    WHERE
        clientid = %(cic_id)s
        AND time_ts BETWEEN %(start_time)s AND %(end_time)s
    GROUP BY clientid, toStartOfFiveMinutes(time_ts)
    ORDER BY clientid, toStartOfFiveMinutes(time_ts)
"""

params = {
    'cic_id': 'CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472',
    'start_time': '2024-01-02 00:00:00',
    'end_time': '2024-01-02 20:00:00'
}

# executing the query
df = client.query_df(query, params)
df.head()

Unnamed: 0,clientid,time,qc_supervisoryControlMode,hp1_thermalEnergyCounter,hp1_electricalEnergyCounter,qc_cvEnergyCounter
0,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-01-02 00:00:00+01:00,0,3434633.0,824539.0,550680.0
1,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-01-02 00:05:00+01:00,0,3434633.0,824540.0,550680.0
2,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-01-02 00:10:00+01:00,0,3434633.0,824540.0,550680.0
3,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-01-02 00:15:00+01:00,0,3434633.0,824541.0,550680.0
4,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-01-02 00:20:00+01:00,0,3434633.0,824541.0,550680.0


### Dump and access results in S3
The bucket is called `quatt-systems-control`. Folder structure within S3 buckets
can only be done by adding '/' symbols in the name of the object.
The example uses the /data/test folder.

In [33]:
session = boto3.Session(profile_name='nout_prod')
s3 = session.client('s3')

# writing the dataframe to the s3 bucket
res = s3.put_object(
    Bucket='quatt-systems-control',
    Key='data/test/clickhouse_test.csv',
    Body=df.to_csv(index=False)
)

In [28]:
# list objects in folder
response = s3.list_objects_v2(
    Bucket='quatt-systems-control',
    Prefix='data/test/'
)
print([r['Key'] for r in response['Contents']])

['data/test/clickhouse_test.csv']


In [31]:
# load object
response = s3.get_object(
    Bucket='quatt-systems-control',
    Key='data/test/clickhouse_test.csv'
)
df = pd.read_csv(response['Body'])
df.head()

Unnamed: 0,clientid,time,qc_supervisoryControlMode,hp1_thermalEnergyCounter,hp1_electricalEnergyCounter,qc_cvEnergyCounter
0,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-01-02 00:00:00+01:00,0,3434633.0,824539.0,550680.0
1,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-01-02 00:05:00+01:00,0,3434633.0,824540.0,550680.0
2,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-01-02 00:10:00+01:00,0,3434633.0,824540.0,550680.0
3,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-01-02 00:15:00+01:00,0,3434633.0,824541.0,550680.0
4,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-01-02 00:20:00+01:00,0,3434633.0,824541.0,550680.0


### Other query examples

In [37]:
# Downsample data points: First point in timeinterval
# -- use coalesce to make sure the sum does not become 'NULL' for 4kw units
# -- use argMax to get last point of time interval

query = """
    SELECT
        clientid, 
        toStartOfInterval(time_ts, INTERVAL 1 HOUR) as time_ts,
        argMin(hp1_electricalEnergyCounter, time_ts) as hp1_E,
        argMin(hp2_electricalEnergyCounter, time_ts) as hp2_E,
        hp1_E + coalesce(hp2_E, 0) as total_E
    FROM
        cic_stats
    WHERE clientid = '{cic_id}'
        AND time_ts BETWEEN '{START_DATE}' AND '{END_DATE}'
    GROUP BY clientid, time_ts
""" \
.format(
    cic_id='CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472',
    START_DATE='2024-02-01',
    END_DATE='2024-02-02'
)

In [None]:
# Select from a random subset of cic's which was active in a period

query = """
    WITH cic_ids AS (
        SELECT distinct(clientid) as clientid
        FROM cic_stats
        WHERE time_ts BETWEEN '{START_DATE}' AND '{END_DATE}'
        LIMIT {n_cics}, {offset}
    )
    SELECT
        clientid, 
        toStartOfInterval(time_ts, INTERVAL 1 HOUR) as time_ts,
        argMin(hp1_electricalEnergyCounter, time_ts) as hp1_electricalEnergyCounter,
        argMin(hp2_electricalEnergyCounter, time_ts) as hp2_electricalEnergyCounter
    FROM
        cic_stats
    WHERE clientid in (SELECT * FROM cic_ids)
        AND time_ts BETWEEN '{START_DATE}' AND '{END_DATE}'
    GROUP BY clientid, time_ts
""" \
.format(
    START_DATE='2024-02-01',
    END_DATE='2024-02-02',
    n_cics=10,
    offset=0
)

In [44]:
# use dynamic column selection to select from large set of columns without manually specifying each of them
# can be handy to select all columns of a certain object: COLUMNS('hp1.*')
# downside: not clear how to assign alias to the columns, so it becomes hard to reference them later in the query.

query = """
    SELECT
        clientid,
	    min(time_ts) as h,
	    COLUMNS('.*Counter') EXCEPT('.*Flag') APPLY(x -> argMin(x, time_ts)) -- applies argMin function to all counter columns except the flags
    FROM
        cic_stats
    WHERE clientid = '{cic_id}'
        AND time_ts BETWEEN '{START_DATE}' AND '{END_DATE}'
    GROUP BY clientid, time_ts
""" \
.format(
    cic_id='CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472',
    START_DATE='2024-02-01',
    END_DATE='2024-02-02'
)

In [68]:
# differences between counters 

query = """
        WITH counters AS (
            SELECT
                clientid, 
                toStartOfInterval(time_ts, INTERVAL 1 HOUR) as time_ts,
                argMin(hp1_electricalEnergyCounter, time_ts) as hp1_E,
                argMin(hp2_electricalEnergyCounter, time_ts) as hp2_E,
                hp1_E + coalesce(hp2_E, 0) as total_E
            FROM
                cic_stats
            WHERE clientid = '{cic_id}'
                AND time_ts BETWEEN '{START_DATE}' AND '{END_DATE}'
            GROUP BY clientid, time_ts
        )
        SELECT clientid, 
            time_ts,
            total_E,
            total_E - lagInFrame(total_E) over window_a as E_diff
        FROM counters
        WINDOW 
            window_a AS (PARTITION BY clientid ORDER BY time_ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
""" \
.format(
    cic_id='CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472',
    START_DATE='2024-02-01',
    END_DATE='2024-02-02'
)
client.query_df(query)

Unnamed: 0,clientid,time_ts,total_E,E_diff
0,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 00:00:00+01:00,1381165.0,
1,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 01:00:00+01:00,1381170.0,5.0
2,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 02:00:00+01:00,1381405.0,235.0
3,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 03:00:00+01:00,1381748.0,343.0
4,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 04:00:00+01:00,1382249.0,501.0
5,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 05:00:00+01:00,1383022.0,773.0
6,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 06:00:00+01:00,1383945.0,923.0
7,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 07:00:00+01:00,1385358.0,1413.0
8,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 08:00:00+01:00,1386677.0,1319.0
9,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 09:00:00+01:00,1387684.0,1007.0


In [69]:
# cumulative differences

query = """
        WITH counters AS (
            SELECT
                clientid, 
                toStartOfInterval(time_ts, INTERVAL 1 HOUR) as time_ts,
                argMin(hp1_electricalEnergyCounter, time_ts) as hp1_E,
                argMin(hp2_electricalEnergyCounter, time_ts) as hp2_E,
                hp1_E + coalesce(hp2_E, 0) as total_E
            FROM
                cic_stats
            WHERE clientid = '{cic_id}'
                AND time_ts BETWEEN '{START_DATE}' AND '{END_DATE}'
            GROUP BY clientid, time_ts
        ), 
        hourly AS (
            SELECT clientid, 
                time_ts,
                total_E,
                total_E - lagInFrame(total_E) over window_a as E_diff
            FROM counters
            WINDOW 
                window_a AS (PARTITION BY clientid ORDER BY time_ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
        )
        SELECT
            clientid,
            time_ts as h,
            sum(E_diff) over window_b as cumulative_diff
        FROM hourly
        WINDOW 
            window_b AS (PARTITION BY clientid ORDER BY time_ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
""" \
.format(
    cic_id='CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472',
    START_DATE='2024-02-01',
    END_DATE='2024-02-03'
)
client.query_df(query)

Unnamed: 0,clientid,h,cumulative_diff
0,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 00:00:00+01:00,
1,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 01:00:00+01:00,5.0
2,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 02:00:00+01:00,240.0
3,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 03:00:00+01:00,583.0
4,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 04:00:00+01:00,1084.0
5,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 05:00:00+01:00,1857.0
6,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 06:00:00+01:00,2780.0
7,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 07:00:00+01:00,4193.0
8,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 08:00:00+01:00,5512.0
9,CIC-87845a22-3d04-5f8c-8d4e-c1e735a9e472,2024-02-01 09:00:00+01:00,6519.0
