In [2]:
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from kafka import KafkaProducer, KafkaConsumer, TopicPartition
import json
import ast
import logging
import pandas as pd
import os
import logging

## EPA tests

In [18]:
def clean_kafka_data(data : dict) -> pd.DataFrame | None :
    """
    Clean the data stored to a list of records, only keeping seeked columns

    @returns a list of data dictionaries
    """
    cleaned = []
    # Go thorugh each record returned
    if 'records' not in data.keys():
        return None
    
    for record in data['records']:
        # Pull out location
        if 'geometry' not in record.keys():
            continue
        if 'coordinates' not in record['geometry'].keys():
            continue
        location = record['geometry']['coordinates']
        # Then get the parameters
        if 'parameters' not in record.keys():
            continue
        parameters = record['parameters']
        for parameter in parameters:
            # Pull out name
            if 'name' not in parameter.keys():
                continue
            name = parameter['name']
            if 'timeSeriesReadings' not in parameter.keys():
                continue
            for series in parameter['timeSeriesReadings']:
                # Get all the readings
                if 'readings' not in series.keys():
                    continue
                for reading in series['readings']:
                    # Check for required data
                    cont = True
                    for key in ['since', 'until', 'averageValue']:
                        if key not in reading.keys():
                            cont = False
                    if not cont:
                        continue
                    # Build dictionary and append
                    toAdd = {}
                    toAdd['measure_name'] = name
                    toAdd['location'] = location
                    toAdd['start'] = datetime.strptime(
                        reading['since'], '%Y-%m-%dT%H:%M:%SZ')
                    toAdd['end'] = datetime.strptime(
                        reading['until'], '%Y-%m-%dT%H:%M:%SZ')
                    toAdd['value'] = reading['averageValue']
                    cleaned.append(toAdd)

    df_new_data = pd.DataFrame.from_records(cleaned, index=range(len(cleaned)))

    df_new_data['start'] = df_new_data['start'].dt.strftime('%Y-%m-%dT%H:%M:%S')
    df_new_data['end'] = df_new_data['end'].dt.strftime('%Y-%m-%dT%H:%M:%S')

    df_new_data['start'] = df_new_data['start'].apply(lambda s: datetime.strptime(s, '%Y-%m-%dT%H:%M:%S'))
    df_new_data['end'] = df_new_data['end'].apply(lambda s: datetime.strptime(s, '%Y-%m-%dT%H:%M:%S'))

    # Switch coordinate order for new data and move to tuple
    df_new_data['location'] = df_new_data['location'].apply(
        lambda location: (location[1], location[0]))
    
    return df_new_data

In [19]:
# A payload that lacks the 'records' key yields None.
clean_kafka_data({'not_records': 0}) is None

True

In [20]:
# Minimal payload: one record holding one parameter with a single reading.
# Coordinates are given as ('long', 'lat') so the swap shows in the result.
data = {
    'records': [
        {
            'geometry': {'coordinates': ('long', 'lat')},
            'parameters': [
                {
                    'name': 'particule',
                    'timeSeriesReadings': [
                        {
                            'readings': [
                                {
                                    'since': '2024-01-01T00:00:00Z',
                                    'until': '2024-01-01T01:00:00Z',
                                    'averageValue': 'value',
                                },
                            ],
                        },
                    ],
                },
            ],
        },
    ],
}
str(clean_kafka_data(data).to_dict(orient='records'))

"[{'measure_name': 'particule', 'location': ('lat', 'long'), 'start': Timestamp('2024-01-01 00:00:00'), 'end': Timestamp('2024-01-01 01:00:00'), 'value': 'value'}]"

In [22]:
# Expected string form of the cleaned sample payload.
result_string = (
    "[{'measure_name': 'particule', 'location': ('lat', 'long'), "
    "'start': Timestamp('2024-01-01 00:00:00'), "
    "'end': Timestamp('2024-01-01 01:00:00'), "
    "'value': 'value'}]"
)

result_string == str(clean_kafka_data(data).to_dict(orient='records'))

True

In [17]:
def _location_key(location):
    """Return a hashable form of a location (lists and tuples become tuples)."""
    return tuple(location) if isinstance(location, (list, tuple)) else location


def accepting_new_data(new_data: pd.DataFrame, current_data: pd.DataFrame) -> pd.DataFrame:
    """
    Compute which data to keep and upload, based on time range inclusion,
    to prevent duplicates.

    A new row is rejected when current_data holds a row with the same
    measure_name and location whose end is greater than or equal to the new
    row's end. Locations may be stored as tuples or lists; both are compared
    by content.

    @param new_data is the data pulled from the EPA as a DataFrame
    @param current_data is the data in elastic search as a DataFrame
    @returns a copy of new_data restricted to the rows that need to be
             inserted, keeping their original index labels
    """
    # With nothing to insert or nothing stored yet, every new row is kept
    if new_data.empty or current_data.empty:
        return new_data.copy()

    # Latest known end per (measure_name, location)
    latest_end = {}
    for name, location, end in zip(current_data['measure_name'],
                                   current_data['location'],
                                   current_data['end']):
        # Missing ends carry no information about what is already stored
        if pd.isna(end):
            continue
        key = (name, _location_key(location))
        if key not in latest_end or end > latest_end[key]:
            latest_end[key] = end

    # Check for collisions: one boolean per row of new_data, in row order
    keep = []
    for name, location, end in zip(new_data['measure_name'],
                                   new_data['location'],
                                   new_data['end']):
        key = (name, _location_key(location))
        already_stored = key in latest_end and end <= latest_end[key]
        keep.append(not already_stored)

    # A single boolean selection replaces dropping rows one at a time
    return new_data.loc[keep].copy()

In [23]:
# Rows already stored, listed as (measure_name, location, end)
current_records = [
    {'measure_name': name, 'location': location, 'end': end}
    for name, location, end in [
        ('P1', (0, 0), 0),
        ('P1', (0, 0), 1),
        ('P1', (1, 1), 1),
        ('P2', (0, 0), 1),
        ('P2', (1, 1), 1),
    ]
]
# Freshly pulled rows, listed the same way
new_records = [
    {'measure_name': name, 'location': location, 'end': end}
    for name, location, end in [
        ('P1', (0, 0), 2),
        ('P2', (0, 0), 1),
        ('P2', (1, 1), 2),
    ]
]
current_data = pd.DataFrame(current_records)
new_data = pd.DataFrame(new_records)

accepting_new_data(new_data, current_data)

Unnamed: 0,measure_name,location,end
0,P1,"(0, 0)",2
2,P2,"(1, 1)",2


In [24]:
# Expected rows written out literally, so that comparing the function output
# against result_string is a real check instead of comparing the output to
# itself.
result_string = "[{'measure_name': 'P1', 'location': (0, 0), 'end': 2}, {'measure_name': 'P2', 'location': (1, 1), 'end': 2}]"

"[{'measure_name': 'P1', 'location': (0, 0), 'end': 2}, {'measure_name': 'P2', 'location': (1, 1), 'end': 2}]"

In [None]:
# The kept rows, rendered as a string, should equal result_string.
result_string == str(accepting_new_data(new_data, current_data).to_dict(orient='records'))