# Playground

A safe space for experimentation.

In [118]:
%load_ext autoreload
%autoreload 2

import os
import schedule
import requests
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from etl_db_manager import ETLDataBaseManager
from etl_task import ETLTask, ETLTaskFunction
from utility import base64encode_obj, base64decode_obj

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Load Data

In [119]:
def fetch_data_airnoise(sensor_id:str, time_start:datetime, time_end:datetime):
    """ Fetches air/noise data from the API
        at: https://data.smartdublin.ie/sonitus-api.
        @param sensor_id: Serial number of the sensor.
        @param time_start: Time range starting point.
        @param time_end: Time range ending point.
        @return: Decoded JSON body of the API response.
        @raise Exception: If the request or the JSON decoding fails; the
               original error is attached as the cause.
    """
    try:
        # NOTE(review): credentials are hard-coded in the request body;
        # consider moving them to configuration.
        res = requests.post("https://data.smartdublin.ie/sonitus-api/api/data", json={
            'username': "dublincityapi",
            'password': "Xpa5vAQ9ki",
            'monitor': sensor_id,
            'start': time_start.timestamp(),
            'end': time_end.timestamp()
        })
        return res.json()
    except Exception as e:
        # The previous `raise print(...)` raised a TypeError because print()
        # returns None. Report the failure and raise a real exception instead,
        # like the other loaders in this notebook.
        message = f'Failed to fetch air noise data from source: {e}'
        print(message)
        raise Exception(message) from e

In [120]:
def load_data_noise():
    """ Collects the most recent 30 minutes of readings
        for each of the listed noise sensors.
        @return: Dict mapping sensor serial number to the data fetched for it.
    """
    window_end = datetime.now()  # Time now.
    window_start = window_end - timedelta(minutes=30)  # Time 30 mins ago.
    sensor_ids = ("01749", "01508", "10118", "01548", "10115")
    # One fetch per sensor, all sharing the same time window.
    return {
        sensor_id: fetch_data_airnoise(
            sensor_id=sensor_id,
            time_start=window_start,
            time_end=window_end
        )
        for sensor_id in sensor_ids
    }

In [121]:
def load_data_bikes():
    """ Loads last 30 min snapshot of dublin bike stands.
        Sends a GET to the local bikes snapshot endpoint and prints the
        'message' field of the JSON reply.
        @return: Value of the 'data' field of the JSON reply.
        @raise Exception: If the request fails or the reply cannot be
               decoded / lacks the expected fields; the original error is
               attached as the cause.
    """
    try:
        res = requests.get("http://localhost:8000/bikes/snapshot/")
        payload = res.json()  # Decode the body once and reuse it.
        data = payload['data']
        print(payload['message'])
    except Exception as e:
        message = f'Failed to fetch dublin bikes data from source: {e}'
        print(message)
        raise Exception(message) from e
    return data

In [83]:
# # Load tasks.
# data_noise_30min = load_data_noise()
# data_bikes_30min = load_data_bikes()

## Save Data

In [122]:
def save_data_bikes(data):
    """ Saves given bike snapshot by POSTing it to the local bikes snapshot
        endpoint, then prints the 'message' field of the JSON reply.
        @param data: Snapshot to save; sent as the 'snapshot' field of the
               JSON request body.
        @raise Exception: If the request fails or the reply cannot be
               decoded / has no 'message'; the original error is attached
               as the cause.
    """
    try:
        res = requests.post(
            url="http://localhost:8000/bikes/snapshot/",
            json={'snapshot': data}
        )
        print(res.json()['message'])
    except Exception as e:
        message = f'Failed to save bikes data. {e}'
        print(message)
        raise Exception(message) from e

In [84]:
# save_data_bikes(data=data_bikes_30min)

## Schedule Load and Save

In [123]:
def load_data_bikes():
    """ Loads last 30 min snapshot of dublin bike stands.
        Sends a GET to the local bikes snapshot endpoint and prints the
        'message' field of the JSON reply.
        @return: Value of the 'data' field of the JSON reply.
        @raise Exception: If the request fails or the reply cannot be
               decoded / lacks the expected fields; the original error is
               attached as the cause.
    """
    try:
        res = requests.get("http://localhost:8000/bikes/snapshot/")
        payload = res.json()  # Decode the body once and reuse it.
        data = payload['data']
        print(payload['message'])
    except Exception as e:
        # No fallback value is assigned here: the function always raises on
        # failure, so nothing would ever be returned from this branch.
        message = f'Failed to fetch dublin bikes data from source: {e}'
        print(message)
        raise Exception(message) from e
    return data

def save_data_bikes(data):
    """
    Saves given bike snapshot by POSTing it to the local bikes snapshot
    endpoint, then prints the 'message' field of the JSON reply.
    @param data: Data to be saved; sent as the 'snapshot' field of the
           JSON request body.
    @raise Exception: If the request fails or the reply cannot be decoded /
           has no 'message'; the original error is attached as the cause.
    """
    try:
        res = requests.post(
            url="http://localhost:8000/bikes/snapshot/",
            json={'snapshot': data}
        )
        print(res.json()['message'])
    except Exception as e:
        message = f'Failed to save bikes data. {e}'
        print(message)
        raise Exception(message) from e

In [125]:
# Round trip: fetch the current bikes snapshot, then post it back for saving.
data = load_data_bikes()
save_data_bikes(data)

Success. Bike stations last 30 mins snapshot fetched from DCC API.
Success. Saved snapshot of bike data for every station.


In [167]:
# Toy data load and save functions.
def load_data():
    """ Toy loader: announces the load and hands back a fixed sample list. """
    sample = [1, 2, 3]
    print('Loaded data.')
    return sample

def save_data(data):
    """ Toy saver: ignores the given data and only announces the save.
        @param data: Data that would be saved (unused).
    """
    print('Saved data.')
    return None

In [126]:
def make_post_request(url, data=None):
    """
    Makes a post request.
    @param url: URL to post to.
    @param data: Key/value pairs sent as URL query parameters (not as the
           request body). Defaults to no parameters.
    @return: Dict with the response 'status' code and 'message' (raw
             response text).
    """
    # A fresh dict per call avoids sharing one mutable default between calls.
    params = {} if data is None else data
    response = requests.post(url, params=params)
    return {'status': response.status_code, 'message': response.text}

def make_get_request(url, data=None):
    """
    Makes a get request.
    @param url: URL to send the request to.
    @param data: Key/value pairs sent as URL query parameters. Defaults to
           no parameters.
    @return: Dict with the response 'status' code and 'message' (raw
             response text).
    """
    # A fresh dict per call avoids sharing one mutable default between calls.
    params = {} if data is None else data
    response = requests.get(url, params=params)
    return {'status': response.status_code, 'message': response.text}

def make_delete_request(url, data=None):
    """
    Makes a delete request.
    @param url: URL to send the request to.
    @param data: Key/value pairs sent as URL query parameters (not as the
           request body). Defaults to no parameters.
    @return: Dict with the response 'status' code and 'message' (raw
             response text).
    """
    # A fresh dict per call avoids sharing one mutable default between calls.
    params = {} if data is None else data
    response = requests.delete(url, params=params)
    return {'status': response.status_code, 'message': response.text}

def make_put_request(url, task_name, new_values):
    """
    Makes a put request that addresses a single task by name.
    @param url: URL to send the request to.
    @param task_name: Sent as the 'task_name' query parameter.
    @param new_values: Sent as the JSON request body.
    @return: Dict with the response 'status' code and 'message' (the
             decoded JSON body of the response).
    """
    query = {"task_name": task_name}
    response = requests.put(url=url, params=query, json=new_values)
    status = response.status_code
    body = response.json()
    return {'status': status, 'message': body}

In [168]:
# Handle to the ETL database named "db_etl", located in the current directory.
db_man = ETLDataBaseManager(db_name="db_etl", db_path=".")

Database './db_etl.db' available :) .


In [169]:
# Query the etl_tasks table for tasks whose status is 'scheduled'.
db_man.query(q="SELECT * FROM etl_tasks WHERE status='scheduled'", params=[])

In [170]:
# Create a new task named 'bikes' that pairs the bikes load/save functions,
# with a repeat setting of 30 'minutes'.
task = ETLTask(
    name='bikes',
    fun_data_load=load_data_bikes,
    fun_data_save=save_data_bikes,
    repeat_time_unit='minutes',
    repeat_interval=30
)
# Encode the task object to a string so it can be sent as a request parameter.
task_str = base64encode_obj(task)
# Submission is disabled here; uncomment to post the task to the task service.
# make_post_request(url="http://127.0.0.1:8003/task", data={"task_str": task_str})

In [186]:
# Create a new task named 'test' from the toy load/save functions,
# with a repeat setting of 3 'seconds'.
task = ETLTask(
    name='test',
    fun_data_load=load_data,
    fun_data_save=save_data,
    repeat_time_unit='seconds',
    repeat_interval=3
)
# Encode the task object to a string and post it to the task service.
task_str = base64encode_obj(task)
make_post_request(url="http://127.0.0.1:8003/task", data={"task_str": task_str})

{'status': 200,
 'message': '{"status":200,"message":"Success. Task created and scheduled test.","data":[]}'}

In [185]:
# Delete the task named 'test' via the task service.
make_delete_request(url="http://127.0.0.1:8003/task", data={'task_name': 'test'})

{'status': 200,
 'message': '{"status":200,"message":"Success. Deleted task test.","data":[]}'}

In [178]:
# Start scheduler (GET on the service's /start_scheduler endpoint).
make_get_request(url="http://127.0.0.1:8003/start_scheduler")

{'status': 200,
 'message': '{"status":200,"message":"Scheduler started.","data":[]}'}

In [188]:
# Stop scheduler (GET on the service's /stop_scheduler endpoint).
make_get_request(url="http://127.0.0.1:8003/stop_scheduler")

{'status': 200,
 'message': '{"status":200,"message":"Scheduler stopped.","data":[]}'}

In [202]:
# Update task1: set its status to 'scheduled' and its num_runs to 0.
make_put_request(
    url="http://127.0.0.1:8003/task", 
    task_name="task1",
    new_values={
        'status': 'scheduled',
        "num_runs": 0
    }
)

{'status': 200,
 'message': {'status': 200,
  'message': "Success. Status of task task1 updated with new values {'status': 'scheduled', 'num_runs': 0}.",
  'data': []}}

In [187]:
# Stop the task named 'test' via the /task/stop endpoint; the body carries no new values.
make_put_request(
    url="http://127.0.0.1:8003/task/stop", 
    task_name="test",
    new_values={}
)

{'status': 200,
 'message': {'status': 200,
  'message': 'Success. Task test has been stopped.',
  'data': []}}

In [167]:
# Direct PUT (not going through make_put_request) that updates task1: status
# 'running', both last-run timestamps set to the current time, num_runs 200.
requests.put(url="http://127.0.0.1:8003/task", 
    params={"task_name": "task1"}, 
    json={
        'status': 'running',
        'time_run_last_start': datetime.now().isoformat(),
        'time_run_last_end': datetime.now().isoformat(),
        "num_runs": 200
    }
).json()

{'status': 200,
 'message': "Success. Status of task task1 updated with new values {'status': 'running', 'time_run_last_start': '2024-03-16T00:19:29.088353', 'time_run_last_end': '2024-03-16T00:19:29.088353', 'num_runs': 200}.",
 'data': []}