In [1]:
import os
import requests
import pandas as pd
import geohash2

from io import StringIO
from pandas import DataFrame
from datetime import datetime, timezone, timedelta
from requests import Response
from dotenv import load_dotenv

# Project files ('.env', 'logs/') are addressed relative to the PARENT of the
# current working directory: Path().resolve() is the absolute cwd and
# .parents[0] is the directory that contains it.
# NOTE(review): presumably the notebook is run from a sub-folder of the
# project root — confirm before running it from another location.
from pathlib import Path  # Python 3.6+ only
# Load the variables declared in that '.env' file; the return value of
# load_dotenv is discarded into `_`.
_ = load_dotenv(dotenv_path=f"{Path().resolve().parents[0]}/.env")

# Logger: register '<parent dir>/logs/tangara.log' as a loguru sink, configured
# with rotation="1 MB" and retention="30 days".
# logger.add(...) is the last expression of the cell, so its return value is
# what the cell displays as output.
from loguru import logger
logger.add(f"{Path().resolve().parents[0]}/logs/tangara.log", rotation="1 MB", retention="30 days")

1

## Utils

In [2]:
def to_timestamp(datetime_iso8601: str) -> int:
    """
    Datetime ISO 8601 Format to Timestamp (milliseconds since the Unix epoch)
    TZ='America/Bogota' -05:00

    The conversion uses exact integer arithmetic, so fractional seconds are
    never lost to floating point rounding. A string that carries no UTC offset
    is interpreted as -05:00 instead of the timezone of the host machine, so
    the result does not depend on where the notebook runs.

    :params
    :datetime_iso8601: str, Datetime ISO 8601 Format

    :return: int, Timestamp in milliseconds (sub-millisecond digits are floored)

    :raises ValueError: if the text is not accepted by datetime.fromisoformat

    :example
        - to_timestamp('2023-03-17T00:00:00-05:00')
            return: 1679029200000
        - to_timestamp('2023-03-17T00:00:00')
            return: 1679029200000
    """
    parsed = datetime.fromisoformat(datetime_iso8601)
    if parsed.tzinfo is None:
        # No offset in the text: apply the documented -05:00 offset explicitly.
        parsed = parsed.replace(tzinfo=timezone(timedelta(hours=-5)))

    # timedelta // timedelta yields an exact int; the float round trip of
    # int(parsed.timestamp() * 1000) can drop one millisecond.
    unix_epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    timestamp = (parsed - unix_epoch) // timedelta(milliseconds=1)

    logger.debug("Run to_timestamp:")
    logger.debug(f"datetime_iso8601: {datetime_iso8601}, Timestamp: {timestamp}")

    return timestamp

In [3]:
def request_influxdb(sql_query: str) -> Response:
    """
    Request to InfluxDB API REST

    Sends the query with a GET request to the URL stored in the
    URL_INFLUXDB_QUERY_ENDPOINT environment variable, against the database
    named by DB_NAME_INFLUXDB. The request asks for a JSON body
    ('format': 'json', header 'Accept: application/json') and passes
    'epoch': 'ms' for the timestamps.

    The HTTP status is NOT checked here; the response is returned as-is.

    :params
    :sql_query: str, InfluxDB SQL query

    :return: Response, InfluxDB response with a JSON body

    :raises requests.exceptions.MissingSchema: URL_INFLUXDB_QUERY_ENDPOINT is not set
    :raises requests.exceptions.Timeout: the server did not answer in time
    """
    endpoint = os.getenv("URL_INFLUXDB_QUERY_ENDPOINT")
    database = os.getenv("DB_NAME_INFLUXDB")

    if not endpoint:
        # requests would raise this same exception type for a None/empty URL,
        # but with the opaque message "Invalid URL 'None'".
        raise requests.exceptions.MissingSchema(
            "Environment variable URL_INFLUXDB_QUERY_ENDPOINT is not set; check the .env file"
        )
    if not database:
        # Still sent, as before: requests drops a None parameter from the
        # query string. The warning makes the likely cause visible.
        logger.warning("Environment variable DB_NAME_INFLUXDB is not set")

    parameters = {
        'db': database,
        'q': sql_query,
        'epoch': 'ms',
        'format': 'json'
    }
    # To get response as JSON
    headers = {'Accept': 'application/json'}
    # (connect, read) timeouts in seconds; without them a stalled server
    # blocks the notebook forever.
    timeout = (10, 300)
    # GET Request
    response = requests.get(endpoint, params=parameters, headers=headers, timeout=timeout)

    logger.debug("Run request_influxdb:")
    logger.debug(f"response: {response}")

    return response

In [4]:
def query_tangaras(start_timestamp: int, end_timestamp: int) -> str:
    """
    Build the InfluxDB SQL query that lists the Tangara sensors reporting data in a time window.

    The statement selects the distinct "geo" values (aliased as "geohash") from
    "fixed_stations_01", keeps rows whose "geo3" equals 'd29', restricts them to
    the inclusive window and groups the result by the "name" tag.
    NOTE(review): 'd29' is presumably the 3-character geohash prefix of the
    monitored area — confirm.

    :params:
    :start_timestamp: int, timestamp datetime value, ms
    :end_timestamp: int, timestamp datetime value, ms

    :return: str, InfluxDB SQL Query
    """
    # Inclusive time window, expressed in milliseconds
    time_window = f"time >= {start_timestamp}ms AND time <= {end_timestamp}ms"

    # Clauses in order; joined with single spaces
    clauses = (
        'SELECT DISTINCT(geo) AS "geohash"',
        'FROM "fixed_stations_01" WHERE',
        "(\"geo3\" = 'd29') AND",
        time_window,
        'GROUP BY "name";',
    )
    sql_query = " ".join(clauses)

    logger.debug("Run query_tangaras:")
    logger.debug(f"sql_query: {sql_query}")

    return sql_query

In [5]:
def query_measure(mac_tangaras: [str], start_timestamp: int, end_timestamp: int, measure: str='pm25', group_by_time: str='30s') -> str:
    """
    Build one InfluxDB SQL statement per Tangara sensor (identified by MAC address) for a measure over a time window.

    The statements are separated by "; " and the final one carries no trailing
    separator. The aggregate is last() when group_by_time is '30s' and mean()
    for any other value. An empty MAC list yields an empty string.

    :params:
    :mac_tangaras: [str], Tangara sensor MAC address
    :start_timestamp: int, timestamp datetime value, ms
    :end_timestamp: int, timestamp datetime value, ms
    :measure: str, choice ['pm25', 'tmp', 'hum']
    :group_by_time: str, choice ['30s', '1m', '1h']

    :return: str, InfluxDB SQL Query
    """
    # Inclusive time window, expressed in milliseconds
    time_window = f"time >= {start_timestamp}ms AND time <= {end_timestamp}ms"
    # Aggregate applied to each time bucket
    aggregate = "last" if group_by_time == '30s' else "mean"

    # One statement per sensor, in the order of the MAC list
    statements = [
        f'SELECT {aggregate}("{measure}") '
        'FROM "fixed_stations_01" WHERE '
        f"(\"name\" = '{mac}') AND "
        f"{time_window} "
        f"GROUP BY time({group_by_time}) fill(null)"
        for mac in mac_tangaras
    ]
    sql_query = "; ".join(statements)

    logger.debug("Run query_measure:")
    logger.debug(f"sql_query: {sql_query}")

    return sql_query

In [6]:
def df_to_csv(df: DataFrame, filename: str, datafolder: str='0_raw') -> None:
    """
    Write a DataFrame as a CSV file inside one of the project data folders.

    The file is written to '<parent of the working directory>/data/<datafolder>/<filename>'
    with the default options of DataFrame.to_csv. The folder is not created here.

    :params:
    :df: DataFrame, pandas DataFrame
    :filename: str, CSV file name with extension .csv
    :datafolder: str, choice ['0_raw', '1_clean', '2_features', 'backup']
    """
    # The data folders live next to the directory the notebook runs from
    base_dir = Path().resolve().parents[0]
    path_datafolder = f"{base_dir}/data/{datafolder}"
    csv_path = f"{path_datafolder}/{filename}"

    # Save DataFrame into CSV file
    df.to_csv(csv_path)

    logger.debug("Run df_to_csv:")
    logger.debug(f"Save DataFrame: {csv_path}")

In [7]:
def df_from_csv(filename: str, datafolder: str='0_raw', dtindex: bool=True) -> DataFrame:
    """
    Load a DataFrame from a CSV file located in one of the project data folders.

    The file is read from '<parent of the working directory>/data/<datafolder>/<filename>'.
    When dtindex is True, the 'DATETIME' column becomes the index, is parsed
    with pandas.to_datetime and is converted to the 'America/Bogota' timezone.

    :params:
    :filename: str, CSV file name with extension .csv
    :datafolder: str, choice ['0_raw', '1_clean', '2_features', 'backup']
    :dtindex: bool, default True, does the CSV file include a DATETIME column?

    :return: df: DataFrame, pandas DataFrame
    """
    # Load DataFrame from CSV file
    base_dir = Path().resolve().parents[0]
    path_csvfile = f"{base_dir}/data/{datafolder}/{filename}"
    loaded = pd.read_csv(path_csvfile)

    if dtindex:
        # Index by DATETIME, parse it, then express it in local (Bogota) time
        indexed = loaded.set_index('DATETIME')
        indexed.index = pd.to_datetime(indexed.index)
        loaded = indexed.tz_convert("America/Bogota")

    logger.debug("Run df_from_csv:")
    logger.debug(f"Load DataFrame: {path_csvfile}")

    return loaded

In [8]:
def df_tangara_sensors(start_timestamp: int, end_timestamp: int) -> DataFrame:
    """
    Get Data Frame Tangaras Sensors of all Tangara sensors that have reported data over a period of time.

    Runs the query built by query_tangaras through request_influxdb and turns
    every returned series (one per "name" tag) into rows with these columns:

        - ID: 'TANGARA_' + last four characters of the MAC (used as the index)
        - GEOHASH: geohash reported by the sensor
        - MAC: value of the "name" tag
        - GEOLOCATION: 'latitude longitude' as a single string
        - LATITUDE, LONGITUDE: the same coordinates as separate strings

    A series contributes one row per geohash it contains, so an ID can appear
    more than once in the index. If no series is returned, the result is an
    empty frame with the same columns.

    :params:
    :start_timestamp: int, timestamp datetime value, ms
    :end_timestamp: int, timestamp datetime value, ms

    :return: DataFrame, Tangaras Sensors
    """
    columns = ['ID', 'GEOHASH', 'MAC', 'GEOLOCATION', 'LATITUDE', 'LONGITUDE']

    # Query Tangaras
    influxdb_sql_query_tangaras = query_tangaras(start_timestamp, end_timestamp)
    # InfluxDB API REST Request
    influxdb_request = request_influxdb(influxdb_sql_query_tangaras)

    # One frame per series; they are concatenated once after the loop instead
    # of growing a DataFrame with pd.concat on every iteration.
    frames = []
    for result in influxdb_request.json()['results']:
        for serie in result.get('series', []):
            mac = serie["tags"]['name']
            df_tangara = pd.DataFrame(serie["values"], columns=serie["columns"])

            # Decode each geohash once; coordinates are kept as strings
            coordinates = [
                tuple(str(value) for value in geohash2.decode_exactly(geohash)[0:2])
                for geohash in df_tangara['geohash']
            ]

            df_tangara['MAC'] = mac
            df_tangara['GEOLOCATION'] = [" ".join(pair) for pair in coordinates]
            df_tangara['LATITUDE'] = [pair[0] for pair in coordinates]
            df_tangara['LONGITUDE'] = [pair[1] for pair in coordinates]
            df_tangara['ID'] = f"TANGARA_{mac[-4:]}"

            frames.append(df_tangara.rename(columns={'geohash': 'GEOHASH'})[columns])

    if frames:
        df_tangaras = pd.concat(frames, ignore_index=True)
    else:
        # pd.concat rejects an empty list; keep the documented columns
        df_tangaras = pd.DataFrame([], columns=columns)

    # Set Index
    df_tangaras = df_tangaras.set_index('ID')

    buffer = StringIO()
    df_tangaras.info(buf=buffer)

    logger.debug("Run df_tangara_sensors:")
    logger.debug(f"Data Frame Tangaras Sensors: {buffer.getvalue()}")

    return df_tangaras

In [9]:
def df_data_sensors(df_tangaras: DataFrame, start_timestamp: int, end_timestamp: int, measure: str='pm25', group_by_time: str='30s') -> DataFrame:
    """
    Get Measure Data Frame Sensors of all Tangara sensors that have reported data over a period of time.

    One statement per sensor is built with query_measure and sent through
    request_influxdb. Every statement that returns a series becomes one
    column, named after the sensor ID at the statement's position. Statements
    without a series are skipped. The columns are joined on their timestamp
    (DataFrame.join, i.e. aligned on the first returned sensor), the index is
    resampled to the requested frequency, expressed in 'America/Bogota' time
    and the values are cast to float64.

    The sensor IDs are read from the 'ID' column of df_tangaras or, when that
    column is absent, from an index named 'ID' (the shape returned by
    df_tangara_sensors).

    If no sensor returns data, an empty DataFrame with an 'America/Bogota'
    DATETIME index is returned instead of failing.

    :params:
    :df_tangaras: DataFrame, Tangaras DataFrame
    :start_timestamp: int, timestamp datetime value, ms
    :end_timestamp: int, timestamp datetime value, ms
    :measure: str, choice ['pm25', 'tmp', 'hum']
    :group_by_time: str, choice ['30s', '1m', '1h']

    :return: DataFrame, Measure Data Frame Sensors

    :raises KeyError: unsupported group_by_time, or df_tangaras has no 'ID' / 'MAC'
    """
    # Resampling step for each supported grouping. Timedelta objects are used
    # instead of alias strings because the upper-case aliases ('30S', 'H') are
    # deprecated in recent pandas. Looked up first so an unsupported value
    # fails before any request is sent.
    FREQ_BY_GROUP = {
        '30s': pd.Timedelta(seconds=30),
        '1m': pd.Timedelta(minutes=1),
        '1h': pd.Timedelta(hours=1),
    }
    freq = FREQ_BY_GROUP[group_by_time]

    # Sensor IDs in statement order: accept 'ID' as a column or as the index
    if 'ID' in df_tangaras.columns:
        sensor_ids = df_tangaras['ID'].to_list()
    elif df_tangaras.index.name == 'ID':
        sensor_ids = df_tangaras.index.to_list()
    else:
        raise KeyError('ID')

    # SQL Query Data Sensors
    influxdb_sql_query_measure = query_measure(df_tangaras['MAC'].to_list(), start_timestamp, end_timestamp, measure, group_by_time)
    # InfluxDB API REST Request
    influxdb_request = request_influxdb(influxdb_sql_query_measure)

    # Data Frame Sensors List, one single-column frame per answered statement
    df_sensors_list = []
    for result in influxdb_request.json()['results']:
        if 'series' not in result:
            continue
        # statement_id is the position of the statement in the query, which is
        # also the position of the sensor in sensor_ids
        statement_id = result.get('statement_id')
        series = result['series'][0]
        df_sensor = pd.DataFrame(series["values"], columns=["DATETIME", sensor_ids[statement_id]])
        df_sensors_list.append(df_sensor.set_index('DATETIME'))

    if not df_sensors_list:
        logger.warning("Run df_data_sensors: no sensor returned data for the requested period")
        return pd.DataFrame(index=pd.DatetimeIndex([], tz="America/Bogota", name='DATETIME'))

    # Join all df_sensors into a single DataFrame
    df_sensors = df_sensors_list[0].join(df_sensors_list[1:]).reset_index()

    # Epoch milliseconds -> timezone-aware datetimes, converted in one
    # vectorized call instead of a per-row isoformat round trip
    df_sensors['DATETIME'] = pd.to_datetime(df_sensors['DATETIME'], unit='ms', utc=True)

    # Set Index, regular frequency, TZ='America/Bogota' -05:00
    df_sensors = df_sensors.set_index('DATETIME').asfreq(freq=freq).tz_convert("America/Bogota")

    df_sensors = df_sensors.astype('float64')

    buffer = StringIO()
    df_sensors.info(buf=buffer)

    logger.debug("Run df_data_sensors:")
    logger.debug(f"Measure Data Frame Sensors: {buffer.getvalue()}")

    return df_sensors

In [10]:
def to_file_name(prefix: str, start_datetime_iso8601: str, end_datetime_iso8601: str, extension: str = '.csv') -> str:
    """
    Build a file name from a prefix and the two bounds of a period of time.

    Both datetimes must match '%Y-%m-%dT%H:%M:%S%z' (an offset is required).
    Each one is rendered as 'YYYY-MM-DD_HH-MM-SS_UTC<offset>', so the name
    carries the timezone and contains no ':' character.

    :params:
    :prefix: str, first part of the file name
    :start_datetime_iso8601: str, Datetime ISO 8601 Format
    :end_datetime_iso8601: str, Datetime ISO 8601 Format
    :extension: str, file extension including the dot, default '.csv'

    :return: str, '<prefix>_<start>_<end><extension>'

    :raises ValueError: if a datetime does not match the expected format

    :example
        - to_file_name('pm25', '2023-03-17T00:00:00-05:00', '2023-03-18T00:00:00-05:00')
            return: 'pm25_2023-03-17_00-00-00_UTC-0500_2023-03-18_00-00-00_UTC-0500.csv'
    """
    input_format = '%Y-%m-%dT%H:%M:%S%z'
    # File-friendly date format that includes the timezone offset
    output_format = '%Y-%m-%d_%H-%M-%S_UTC%z'

    # Convert the text into datetime objects
    parsed_bounds = [
        datetime.strptime(text, input_format)
        for text in (start_datetime_iso8601, end_datetime_iso8601)
    ]
    # Render each bound; any ':' is replaced so the name stays file-system safe
    start_label, end_label = (
        bound.strftime(output_format).replace(':', '-') for bound in parsed_bounds
    )

    # Compatible file name
    file_name = f"{prefix}_{start_label}_{end_label}{extension}"

    logger.debug("Run to_file_name:")
    logger.debug(f"File Name: {file_name}")

    return file_name