In [5]:
import pandas as pd
import math
from influxdb_client import InfluxDBClient, Point
from kedro.config import ConfigLoader
from kedro.framework.project import settings

In [6]:
# Load Parameters
# NOTE(review): `catalog` and `context` are not defined in this notebook; they are
# presumably injected by the Kedro Jupyter/IPython session - confirm.
parameters = catalog.load('parameters')

# Load Credentials from the project's `local` configuration environment
conf_path = str(context.project_path / settings.CONF_SOURCE)
conf_loader = ConfigLoader(conf_source=conf_path, env='local')
credentials = conf_loader.get('credentials*', 'credentials*/**')

# Never echo secrets into the notebook output: saved outputs are shared and
# committed together with the notebook. Mask secret values, keep the structure.
SECRET_KEYS = ('token', 'password')
credentials_masked = {
    name: (
        {key: '***' if key in SECRET_KEYS else value for key, value in section.items()}
        if isinstance(section, dict)
        else '***'
    )
    for name, section in credentials.items()
}

print('Parameters:', parameters)
print('Credentials:', credentials_masked)

2022-09-29 19:02:58,579 - kedro.io.data_catalog - INFO - Loading data from `parameters` (MemoryDataSet)...
Parameters: {'influxdb_version': '2.x', 'nowcast_datetime': '2022-09-15T00:00:00-05:00', 'start_datetime': None}
Credentials: {'influxdb': {'url': 'http://localhost:8086', 'token': '***', 'org': 'Tangara', 'bucket': 'Tangara', 'username': 'tangara', 'password': '***', 'database': 'Tangara'}}


In [7]:
# Kedro Catalog: load every input dataset in one pass, in the listed order
pm25_clean, temp_raw, hum_raw, co2_raw, aqi_instant, tangara_stations = (
    catalog.load(dataset_name)
    for dataset_name in (
        'pm25_clean',
        'temp_raw',
        'hum_raw',
        'co2_raw',
        'aqi_instant',
        'tangara_stations',
    )
)

2022-09-29 19:02:58,683 - kedro.io.data_catalog - INFO - Loading data from `pm25_clean` (CSVDataSet)...
2022-09-29 19:02:58,697 - kedro.io.data_catalog - INFO - Loading data from `temp_raw` (CSVDataSet)...
2022-09-29 19:02:58,710 - kedro.io.data_catalog - INFO - Loading data from `hum_raw` (CSVDataSet)...
2022-09-29 19:02:58,718 - kedro.io.data_catalog - INFO - Loading data from `co2_raw` (CSVDataSet)...
2022-09-29 19:02:58,728 - kedro.io.data_catalog - INFO - Loading data from `aqi_instant` (CSVDataSet)...
2022-09-29 19:02:58,737 - kedro.io.data_catalog - INFO - Loading data from `tangara_stations` (CSVDataSet)...


In [8]:
# Preview the `tangara_stations` dataset (one row per station: ID, MAC, geohash, coordinates)
tangara_stations.head()

Unnamed: 0,DATETIME,ID,MAC,GEOHASH,GEOREGION,GEOLOCATION,LATITUDE,LONGITUDE
0,2022-09-29T19:00:48.971171-05:00,TANGARA_2BBA,D29ESP32DE02BBA,d29e6b4,d29,3.38447571 -76.51634216,3.384476,-76.516342
1,2022-09-29T19:00:48.971171-05:00,TANGARA_14D6,D29ESP32DED14D6,d29dfx4,d29,3.33503723 -76.52732849,3.335037,-76.527328
2,2022-09-29T19:00:48.971171-05:00,TANGARA_1CE2,D29ESP32DED1CE2,d29e4cv,d29,3.35014343 -76.51222229,3.350143,-76.512222
3,2022-09-29T19:00:48.971171-05:00,TANGARA_2492,D29ESP32DED2492,d29e64g,d29,3.39958191 -76.54792786,3.399582,-76.547928
4,2022-09-29T19:00:48.971171-05:00,TANGARA_2FF6,D29ESP32DED2FF6,d29e66v,d29,3.39958191 -76.53419495,3.399582,-76.534195


In [9]:
# Preview the `pm25_clean` dataset (a DATETIME column plus one column per station ID)
pm25_clean.head()

Unnamed: 0,DATETIME,TANGARA_2BBA,TANGARA_14D6,TANGARA_1CE2,TANGARA_2492,TANGARA_2FF6,TANGARA_48C6,TANGARA_4D7A,TANGARA_532E,TANGARA_EA06,TANGARA_F1AE,TANGARA_FAC6,TANGARA_06BE
0,2022-09-28T19:00:30-05:00,10.0,,,7.0,,,,0.0,0.0,,,
1,2022-09-28T19:01:00-05:00,10.0,9.0,9.0,6.0,10.0,2.0,0.0,0.0,0.0,3.0,,12.0
2,2022-09-28T19:01:30-05:00,10.0,9.0,9.0,7.0,11.0,2.0,0.0,0.0,0.0,3.0,,12.0
3,2022-09-28T19:02:00-05:00,9.0,10.0,10.0,6.0,11.0,1.0,0.0,0.0,0.0,3.0,,13.0
4,2022-09-28T19:02:30-05:00,11.0,9.0,10.0,7.0,11.0,2.0,0.0,1.0,0.0,3.0,,13.0


In [10]:
# Get Tangara Stations Measurements Dictionary
def get_stations_measurements(stations, pm25, aqi, temp, hum, co2):
    """
    Build one measurements DataFrame per Tangara station.

    Parameters
    ----------
    stations : pandas.DataFrame
        Station metadata; the columns ID, MAC, GEOHASH and GEOREGION are read.
    pm25, aqi, temp, hum, co2 : pandas.DataFrame
        Wide frames holding a DATETIME column plus one column per station ID.

    Returns
    -------
    dict
        Maps every station ID to a DataFrame with the columns DATETIME, PM25,
        AQI, TEMP, HUM, CO2, STATION_ID, MAC, GEOHASH and GEOREGION. The five
        measurement columns are float64 rounded to zero decimals; missing
        values stay NaN.

    Raises
    ------
    KeyError
        If a station ID has no column in one of the measurement frames.
    """
    measurement_columns = ['PM25', 'AQI', 'TEMP', 'HUM', 'CO2']

    # Tangara Stations Measurements Dictionary
    stations_measurements = {}
    # For each Tangara Station
    for station in stations.itertuples():
        station_id = station.ID

        # One DATETIME-indexed, single-column frame per measurement
        station_pm25 = _station_column(pm25, station_id)
        station_aqi = _station_column(aqi, station_id)
        station_temp = _station_column(temp, station_id)
        station_hum = _station_column(hum, station_id)
        station_co2 = _station_column(co2, station_id)

        # Join Measurements: every input column is named after the station, so
        # the suffixes are what keeps the joined columns apart. CO2 joins last
        # and is the only one that keeps the bare station ID as its name.
        pm25_aqi = station_pm25.join(station_aqi, lsuffix='_PM25', rsuffix='_AQI')
        temp_hum = station_temp.join(station_hum, lsuffix='_TEMP', rsuffix='_HUM')
        station_measurements = (
            pm25_aqi.join([temp_hum, station_co2])
            .rename(
                columns={
                    f'{station_id}_PM25': 'PM25',
                    f'{station_id}_AQI': 'AQI',
                    f'{station_id}_TEMP': 'TEMP',
                    f'{station_id}_HUM': 'HUM',
                    f'{station_id}': 'CO2',
                }
            )
            .reset_index()
        )

        # Add ID, MAC, GEOHASH and GEOREGION columns
        station_measurements['STATION_ID'] = station_id
        station_measurements['MAC'] = station.MAC
        station_measurements['GEOHASH'] = station.GEOHASH
        station_measurements['GEOREGION'] = station.GEOREGION

        # Set Data Types: one vectorized pass instead of a Python-level apply per
        # column. NaN passes through unchanged and ties round half-to-even, the
        # same as the built-in round().
        station_measurements[measurement_columns] = (
            station_measurements[measurement_columns].astype('float64').round(0)
        )

        # Set Tangara Station Measurements
        stations_measurements[station_id] = station_measurements

    return stations_measurements


def _station_column(frame, station_id):
    """Return the station's column from a wide frame, indexed by DATETIME."""
    return frame[['DATETIME', station_id]].set_index('DATETIME')

In [11]:
# Build the per-station measurement frames, then preview one station's frame
stations_measurements = get_stations_measurements(tangara_stations, pm25_clean, aqi_instant, temp_raw, hum_raw, co2_raw)
stations_measurements['TANGARA_F1AE'].head()

Unnamed: 0,DATETIME,PM25,AQI,TEMP,HUM,CO2,STATION_ID,MAC,GEOHASH,GEOREGION
0,2022-09-28T19:00:30-05:00,,,,,,TANGARA_F1AE,D29TTGOTD8F1AE,d29eg66,d29
1,2022-09-28T19:01:00-05:00,3.0,13.0,30.0,48.0,419.0,TANGARA_F1AE,D29TTGOTD8F1AE,d29eg66,d29
2,2022-09-28T19:01:30-05:00,3.0,13.0,30.0,48.0,417.0,TANGARA_F1AE,D29TTGOTD8F1AE,d29eg66,d29
3,2022-09-28T19:02:00-05:00,3.0,13.0,30.0,48.0,419.0,TANGARA_F1AE,D29TTGOTD8F1AE,d29eg66,d29
4,2022-09-28T19:02:30-05:00,3.0,13.0,30.0,48.0,420.0,TANGARA_F1AE,D29TTGOTD8F1AE,d29eg66,d29


In [None]:
# Dry run: turn every row of every station frame into an InfluxDB Point.
# The points are only constructed here; nothing in this cell writes them.
for station_measurements in stations_measurements.values():
    for row in station_measurements.itertuples():
        # Copy the columns of interest out of the row tuple
        measurement = {
            column: getattr(row, column)
            for column in (
                'STATION_ID', 'MAC', 'GEOHASH',
                'PM25', 'AQI', 'TEMP', 'HUM', 'CO2',
                'DATETIME',
            )
        }
        # The station ID doubles as the measurement name in this variant
        point = Point.from_dict(
            measurement,
            record_measurement_key="STATION_ID",
            record_time_key="DATETIME",
            record_tag_keys=["MAC", "GEOHASH"],
            record_field_keys=["PM25", "AQI", "TEMP", "HUM", "CO2"],
        )

In [None]:
# How to use RxPY to prepare batches for asyncio client
# https://github.com/influxdata/influxdb-client-python/blob/master/examples/asynchronous_batching.py
#
# InfluxDBClientAsync
#
import asyncio
from csv import DictReader

import reactivex as rx
from reactivex import operators as ops
from reactivex.scheduler.eventloop import AsyncIOScheduler

from influxdb_client import Point
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync



def station_measurements_to_generator(station_measurements):
    """
    Yield one InfluxDB Point per row of a station measurements DataFrame.

    Every point uses the fixed measurement name 'TANGARA_STATIONS', takes its
    timestamp from DATETIME, is tagged with STATION_ID, MAC, GEOHASH and
    GEOREGION, and carries PM25, AQI, TEMP, HUM and CO2 as fields.

    STATION_ID and GEOREGION are included as tags so that the points still
    identify their station and so that this definition produces the same tag
    set as the synchronous variant defined later in the notebook.

    Parameters
    ----------
    station_measurements : pandas.DataFrame
        Frame with the columns STATION_ID, MAC, GEOHASH, GEOREGION, PM25, AQI,
        TEMP, HUM, CO2 and DATETIME.

    Yields
    ------
    influxdb_client.Point
    """
    tag_keys = ["STATION_ID", "MAC", "GEOHASH", "GEOREGION"]
    field_keys = ["PM25", "AQI", "TEMP", "HUM", "CO2"]

    # For each station_measurements tuple
    for row in station_measurements.itertuples():
        measurement = {'MEASUREMENT_NAME': 'TANGARA_STATIONS'}
        for key in tag_keys + field_keys + ['DATETIME']:
            measurement[key] = getattr(row, key)
        yield Point.from_dict(
            measurement,
            record_measurement_key="MEASUREMENT_NAME",
            record_time_key="DATETIME",
            record_tag_keys=tag_keys,
            record_field_keys=field_keys,
        )


async def async_ingesting_stations_measurements(station_measurements):
    """
    Write one station's measurements to InfluxDB asynchronously, in batches of 500.

    Connection settings are read from the notebook-level `parameters` and
    `credentials` objects. For version '2.x' the API token and bucket are used;
    for '1.8' the token is 'username:password' and the bucket is
    'database/autogen'.

    Parameters
    ----------
    station_measurements : pandas.DataFrame
        Frame accepted by `station_measurements_to_generator`.

    Raises
    ------
    ValueError
        If `parameters['influxdb_version']` is neither '2.x' nor '1.8'.
    Exception
        Any error reported by the batch pipeline is printed and then re-raised
        here, instead of leaving the coroutine waiting forever.
    """
    # Check InfluxDB Version
    influxdb_version = parameters['influxdb_version']
    influxdb = credentials['influxdb']
    url = influxdb['url']
    org = influxdb['org']
    if influxdb_version == '2.x':
        # You can generate an API token from the "API Tokens Tab" in the UI
        token = influxdb['token']
        bucket = influxdb['bucket']
    elif influxdb_version == '1.8':
        token = f"{influxdb['username']}:{influxdb['password']}"
        retention_policy = 'autogen'
        bucket = f"{influxdb['database']}/{retention_policy}"
    else:
        # Fail early with a clear message rather than hitting unbound variables below
        raise ValueError(f'Unsupported influxdb_version: {influxdb_version!r}')

    # Async write batches
    async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
        write_api = client.write_api()

        async def async_write(batch):
            """Write one batch and hand it back so the subscriber can report its size."""
            await write_api.write(bucket=bucket, record=batch)
            return batch

        # Prepare batches from generator: each batch becomes a scheduled write task
        batches = rx \
            .from_iterable(station_measurements_to_generator(station_measurements)) \
            .pipe(
                ops.buffer_with_count(500),
                ops.map(lambda batch: rx.from_future(asyncio.ensure_future(async_write(batch)))),
                ops.merge_all(),
            )

        loop = asyncio.get_running_loop()
        done = loop.create_future()

        def on_error(ex):
            print(f'Unexpected error: {ex}')
            # Resolve the future on failure too, otherwise `await done` never returns
            if not done.done():
                done.set_exception(ex)

        def on_completed():
            if not done.done():
                done.set_result(0)

        # Write batches by subscribing to Rx generator
        batches.subscribe(on_next=lambda batch: print(f'Written batch... {len(batch)}'),
                          on_error=on_error,
                          on_completed=on_completed,
                          scheduler=AsyncIOScheduler(loop))

        # Wait to finish all writes
        await done


In [None]:
#async_ingesting_stations_measurements(stations_measurements['TANGARA_FAC6'])

In [None]:
# How to use RxPY to prepare batches for synchronous write into InfluxDB
# https://github.com/influxdata/influxdb-client-python/blob/master/examples/import_data_set_sync_batching.py
#
# InfluxDBClient (synchronous write API)
#
from csv import DictReader

import reactivex as rx
from reactivex import operators as ops

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write.retry import WritesRetry
from influxdb_client.client.write_api import SYNCHRONOUS



def station_measurements_to_generator(station_measurements):
    """
    Lazily convert a station measurements DataFrame into InfluxDB points.

    One Point is yielded per row. Each point uses the constant measurement
    name 'TANGARA_STATIONS' and takes its timestamp from DATETIME.
    """
    tag_keys = ["STATION_ID", "MAC", "GEOHASH", "GEOREGION"]
    # NOTE(review): STATION_ID, MAC and GEOHASH are sent both as tags and as
    # fields. Kept exactly as before; confirm the duplication is intended.
    field_keys = ["PM25", "AQI", "TEMP", "HUM", "CO2", "STATION_ID", "MAC", "GEOHASH"]
    row_columns = (
        'STATION_ID', 'MAC', 'GEOHASH', "GEOREGION",
        'PM25', 'AQI', 'TEMP', 'HUM', 'CO2',
        'DATETIME',
    )

    for row in station_measurements.itertuples():
        # Start from the constant measurement name, then copy the row values
        record = {'MEASUREMENT_NAME': 'TANGARA_STATIONS'}
        record.update((column, getattr(row, column)) for column in row_columns)
        yield Point.from_dict(
            record,
            record_measurement_key="MEASUREMENT_NAME",
            record_time_key="DATETIME",
            record_tag_keys=tag_keys,
            record_field_keys=field_keys,
        )


def sync_ingesting_stations_measurements(station_measurements):
    """
    Write one station's measurements to InfluxDB synchronously, in batches of 500.

    Connection settings are read from the notebook-level `parameters` and
    `credentials` objects. For version '2.x' the API token and bucket are used;
    for '1.8' the token is 'username:password' and the bucket is
    'database/autogen'.

    Progress is printed per batch. An error reported by the batch pipeline is
    printed as 'Unexpected error: ...', and 'Import finished!' is printed on
    completion.

    Parameters
    ----------
    station_measurements : pandas.DataFrame
        Frame accepted by `station_measurements_to_generator`.

    Raises
    ------
    ValueError
        If `parameters['influxdb_version']` is neither '2.x' nor '1.8'.
    """
    # Check InfluxDB Version
    influxdb_version = parameters['influxdb_version']
    influxdb = credentials['influxdb']
    url = influxdb['url']
    org = influxdb['org']
    if influxdb_version == '2.x':
        # You can generate an API token from the "API Tokens Tab" in the UI
        token = influxdb['token']
        bucket = influxdb['bucket']
    elif influxdb_version == '1.8':
        token = f"{influxdb['username']}:{influxdb['password']}"
        retention_policy = 'autogen'
        bucket = f"{influxdb['database']}/{retention_policy}"
    else:
        # Fail early with a clear message rather than hitting unbound variables below
        raise ValueError(f'Unsupported influxdb_version: {influxdb_version!r}')

    # Retry strategy: total=3 with exponential backoff (retry_interval=1, exponential_base=2)
    retries = WritesRetry(total=3, retry_interval=1, exponential_base=2)
    with InfluxDBClient(url=url, token=token, org=org, retries=retries) as client:
        # Use the synchronous WriteApi so each batch is written before the next one starts
        write_api = client.write_api(write_options=SYNCHRONOUS)

        # Prepare batches from generator
        batches = rx \
            .from_iterable(station_measurements_to_generator(station_measurements)) \
            .pipe(ops.buffer_with_count(500))

        def write_batch(batch):
            """Synchronously write one batch of points."""
            print(f'Writing... {len(batch)}')
            write_api.write(bucket=bucket, record=batch)

        # Write batches
        batches.subscribe(on_next=write_batch,
                          on_error=lambda ex: print(f'Unexpected error: {ex}'),
                          on_completed=lambda: print('Import finished!'))


In [None]:
# Ingest a single station (TANGARA_F1AE) on its own
sync_ingesting_stations_measurements(stations_measurements['TANGARA_F1AE'])

In [None]:
# Ingest a single station (TANGARA_EA06) on its own
sync_ingesting_stations_measurements(stations_measurements['TANGARA_EA06'])

In [None]:
# Ingest a single station (TANGARA_FAC6) on its own
sync_ingesting_stations_measurements(stations_measurements['TANGARA_FAC6'])

In [None]:
# Ingest every station in turn, printing each station ID before its batches are written
for station_id, station_measurements in stations_measurements.items():
    print(station_id)
    sync_ingesting_stations_measurements(station_measurements)

In [None]:
# NOTE: a disabled experiment that converted every station frame to JSON with
# DataFrame.to_json() before saving used to sit here inside a string literal;
# it never ran and has been removed as dead code.

# Save stations_measurements into Catalog
catalog.save('stations_measurements', stations_measurements)