In [None]:
from api_key import API_KEY
from datetime import datetime as dt
from math import sqrt
from queue import Queue
from random import gauss, sample
from time import sleep

import json
import requests

In [None]:
# REST endpoints: the realtime database root and the account sign-up URL.
DB_URL = "https://pigeonblog-db-default-rtdb.firebaseio.com"
AUTH_URL = "https://identitytoolkit.googleapis.com/v1/accounts:signUp"

# Nominal start/end coordinates of the simulated flight path.
PATH = dict(
  LAT=dict(start=34.0637, end=33.8007),
  LON=dict(start=-118.3591, end=-117.8840),
)

# Sampling parameters per gas reading: values are drawn from a normal
# distribution (mean/std) and then limited to [min, max] by get_sample().
GASES = dict(
  aqi=dict(min=1, max=5, mean=2, std=0.5),
  co2=dict(min=400, max=3000, mean=400, std=100),
  tvocs=dict(min=0, max=5000, mean=100, std=20),
  co=dict(min=0, max=100, mean=50, std=15),
  eth=dict(min=0, max=100, mean=50, std=15),
  h2=dict(min=0, max=100, mean=50, std=15),
  meth=dict(min=0, max=100, mean=50, std=15),
)

# Same parameter layout as GASES, for the environmental readings.
SENSORS = dict(
  temperature=dict(min=10, max=35, mean=20, std=2),
  pressure=dict(min=750, max=800, mean=775, std=4),
  altitude=dict(min=2, max=1000, mean=100, std=10),
)

def clamp(v, vmin, vmax):
  """Limit v to the closed range [vmin, vmax].

  Ranges whose lower bound is exactly 1 are treated as integer scales: the
  limited value is floored and returned as an int. Every other range returns
  the limited value unchanged.
  """
  bounded = max(min(vmax, v), vmin)
  if vmin == 1:
    return int(bounded // 1)
  return bounded

def lerp(vmin, vmax, t):
  """Linearly interpolate between vmin (t = 0) and vmax (t = 1).

  t is not restricted to [0, 1]; values outside that range extrapolate.
  """
  span = vmax - vmin
  return vmin + t * span

def get_sample(d):
  """Draw one simulated reading from a parameter dict.

  d must provide "mean", "std", "min" and "max". A normally distributed value
  is drawn, limited to [min, max] with clamp(), and rounded to 4 decimals.
  """
  raw = gauss(d["mean"], d["std"])
  bounded = clamp(raw, d["min"], d["max"])
  return round(bounded, 4)

In [None]:
# Constants used by the "back" curves below.
C1 = 1.70158
C3 = 1 + C1

# Easing curves keyed by name; each takes a progress value x and returns the
# eased progress. The notebook only evaluates them on [0, 1].
EASINGS = {
  "backIn": lambda x: C3 * pow(x, 3) - C1 * pow(x, 2),
  "backOut": lambda x: 1 + C3 * pow(x - 1, 3) + C1 * pow(x - 1, 2),
  "circIn": lambda x: 1 - sqrt(1 - pow(x, 2)),
  "circOut": lambda x: sqrt(1 - pow(x - 1, 2)),
  "quadIn": lambda x: pow(x, 2),
  "quadOut": lambda x: 1 - pow(1 - x, 2),
  "quinIn": lambda x: pow(x, 5),
  "quinOut": lambda x: 1 - pow(1 - x, 5),
  "lInOut": lambda x: x,
  "quadInOut": lambda x: 2 * pow(x, 2) if x < 0.5 else 1 - (-2 * x + 2) ** 2 / 2,
  "quinInOut": lambda x: 16 * x ** 5 if x < 0.5 else 1 - (-2 * x + 2) ** 5 / 2,
}

# Every easing exactly once, in random order.
FUNS = sample(list(EASINGS.values()), k=len(EASINGS))

### Auth

In [None]:
# Request an ID token from the sign-up endpoint; it authorises the database
# writes and deletes in the cells below.
auth_url = f"{AUTH_URL}?key={API_KEY}"
res = requests.post(auth_url, json={"returnSecureToken": True}, timeout=10)

# Stop here on failure. Otherwise `token` would be undefined (NameError on the
# next line) or, worse, silently left over from an earlier run of this cell.
if res.status_code != 200:
  raise RuntimeError(f"auth request failed with HTTP {res.status_code}: {res.text}")

token = json.loads(res.text)["idToken"]
# Print only the length so the token itself never ends up in the cell output.
print("ok", len(token))

post_url = f"{DB_URL}/measurements.json?auth={token}"

### Test Data

In [None]:
num_pigeons = 5
p_list = list(range(num_pigeons))

# Per-pigeon route description. Each pigeon gets slightly jittered start/end
# coordinates around PATH; latitude progress follows one of the shuffled easing
# curves while longitude progresses linearly.
p_infos = [
  {
    "LAT": {
      "start": gauss(PATH["LAT"]["start"], 0.01),
      "end": gauss(PATH["LAT"]["end"], 0.001),
      # Wrap around so num_pigeons may exceed the number of easing curves
      # without raising IndexError; curves are then reused.
      "fun": FUNS[p % len(FUNS)],
    },
    "LON": {
      "start": gauss(PATH["LON"]["start"], 0.01),
      "end": gauss(PATH["LON"]["end"], 0.001),
      "fun": lambda x: x,
    },
  } for p in p_list
]

def get_loc(loc_info, t):
  """Return one noisy coordinate for progress t along a route axis.

  loc_info supplies "start", "end" and "fun" (the easing applied to t). The
  eased progress gets a small gaussian jitter (sigma 0.01) before being
  interpolated between start and end; the result is rounded to 4 decimals.
  """
  eased = loc_info["fun"](t)
  jitter = gauss(0, .01)
  return round(lerp(loc_info["start"], loc_info["end"], eased + jitter), 4)

In [None]:
# Fixed reference instant (UTC) for the generated data; `ts` is the same
# instant expressed as integer epoch milliseconds.
ts_date = dt.strptime("2024/03/02 18:30:00 +0000", "%Y/%m/%d %H:%M:%S %z")
ts = int(ts_date.timestamp() * 1000)

In [None]:
data = []
dataq = Queue()

n_points = 64

for m in range(n_points):
  # Progress along the route in [0, 1]. With a single point there is no span
  # to divide by, so stay at the start instead of raising ZeroDivisionError.
  t = m / (n_points - 1) if n_points > 1 else 0.0
  for p in p_list:
    # One wall-clock stamp per pigeon/step, shared by both legs below.
    local_ts = int(dt.now().timestamp())
    # Each step yields two records: one travelling start -> end (t) and one
    # travelling end -> start (1 - t), each with freshly sampled readings.
    for leg_t in (t, 1 - t):
      gases = { s: get_sample(d) for s,d in GASES.items() }
      data.append({
        "pigeon": p,
        # Alternative: let the database fill in the time on write.
        # "timestamp": {".sv": "timestamp"},
        "timestamp": ts,
        "lat": get_loc(p_infos[p]["LAT"], leg_t),
        "lon": get_loc(p_infos[p]["LON"], leg_t),
        "sata": 0,
        "satb": 0,
        "temp": get_sample(SENSORS["temperature"]),
        "pres": get_sample(SENSORS["pressure"]),
        "alti": get_sample(SENSORS["altitude"]),
        "local": local_ts,
        **gases,
      })

# Queue the records ordered by longitude, which is the order they are uploaded in.
for d in sorted(data, key=lambda x: x["lon"]):
  dataq.put(d)

In [None]:
# Upload the queued records one at a time, pausing briefly between requests.
while not dataq.empty():
  datapack = dataq.get()
  # A timeout keeps one stalled connection from hanging the whole cell.
  res = requests.post(post_url, json=datapack, timeout=10)
  # Report rejected writes instead of dropping them silently; keep going so
  # one bad record does not abort the rest of the upload.
  if not res.ok:
    print("E", res.status_code, res.text)
  sleep(0.1)

### Clean DB: delete a chosen range of days by index

In [None]:
import threading
import queue

class Worker(threading.Thread):
  """Thread that drains a queue of measurement keys and deletes each one.

  q is a queue.Queue of keys under /measurements. The thread exits as soon as
  the queue is empty. Extra positional/keyword arguments are passed through to
  threading.Thread.
  """

  def __init__(self, q, *args, **kwargs):
    self.q = q
    super().__init__(*args, **kwargs)

  def run(self):
    """Delete queued keys until the queue is empty.

    A failed delete is printed and skipped (not retried). Every key taken from
    the queue is marked done whether or not its delete succeeded, so a
    q.join() in the caller cannot block forever on a failed request.
    """
    while True:
      try:
        key = self.q.get_nowait()
      except queue.Empty:
        return

      try:
        del_url = f"{DB_URL}/measurements/{key}.json?auth={token}"
        requests.delete(del_url, timeout=1)
        sleep(0.1)
      except Exception as e:
        print("E", e)
      finally:
        # Must run on failure too: each get() needs a matching task_done().
        self.q.task_done()

In [None]:
# Fetch only the keys under /sessions (shallow read) and list them in sorted
# order. The keys are parsed as epoch milliseconds and shown as UTC dates so a
# day index can be picked for the delete cell.
get_url = f'{DB_URL}/sessions.json?shallow=true'
res = requests.get(get_url)
res_obj = json.loads(res.content)
unique_days = res_obj.keys()
unique_days_array = sorted(unique_days)
[dt.utcfromtimestamp(int(day) // 1000).strftime("%Y-%m-%d") for day in unique_days_array]

In [None]:
# Indices into unique_days_array selecting the (inclusive) range of days to delete.
first_day_to_delete_idx = 100
last_day_to_delete_idx = 100

first_day_start = int(unique_days_array[first_day_to_delete_idx])
last_day_start = int(unique_days_array[last_day_to_delete_idx])

# End of the last selected day: its start plus 24 h minus one second, in ms.
last_day_end = last_day_start + 24 * 60 * 60 * 1000 - 1000

get_url = f'{DB_URL}/measurements.json?orderBy="timestamp"&startAt={first_day_start}&endAt={last_day_end}'
res = requests.get(get_url)
# This cell deletes data, so refuse to continue on an error response: its JSON
# body (e.g. {"error": ...}) would otherwise be treated as keys to delete.
if res.status_code != 200:
  raise RuntimeError(f"measurement query failed with HTTP {res.status_code}: {res.text}")
# A query with no matches may come back as JSON null; treat that as "nothing to delete".
res_obj = json.loads(res.content) or {}

print(len(res_obj))

q = queue.Queue()
for key in sorted(res_obj.keys()):
  q.put_nowait(key)

workers = [Worker(q) for _ in range(4)]
for w in workers:
  w.start()

# Wait for the threads themselves: each one exits once the queue is empty, so
# this cannot hang on an item that was never marked task_done().
for w in workers:
  w.join()

### OLD: Nested schema

In [None]:
# Single write using the old nested layout:
# sessions/<session_id>/pigeons/<pigeon_id>/measurements.
session_id = f"{dt.now():%Y%m%d}"
pigeon_id = 0

sensors = { name: get_sample(spec) for name, spec in GASES.items() }
# Random centre near the path start, then a tighter jitter around that centre.
latm = gauss(mu=PATH["LAT"]["start"], sigma=0.3)
lonm = gauss(mu=PATH["LON"]["start"], sigma=0.3)

measurement = {
  "timestamp": {".sv": "timestamp"},
  "lat": round(gauss(mu=latm, sigma=0.01), 4),
  "lon": round(gauss(mu=lonm, sigma=0.01), 4),
}
measurement.update(sensors)

post_url = f"{DB_URL}/sessions/{session_id}/pigeons/{pigeon_id}/measurements.json?auth={token}"
res = requests.post(post_url, json=measurement)
print(res.text)