In [None]:
from tiled.client import from_uri
from tiled.structures.dataframe import deserialize_arrow

from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

import pymongo

import httpx

import pyarrow as pa
import pyarrow.parquet as pq

import asyncio
import itertools
import datetime
import pathlib
import getpass

# Benchmark

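Benchmark different methods of fetching the same data: synchronous HTTP requests, asynchronous HTTP requests, direct MongoDB reads, and the Tiled Python client.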

In [None]:
def deserialize_parquet(data):
  """Decode an in-memory Parquet payload into a pandas object.

  Args:
    data: buffer-like object holding a serialized Parquet file.

  Returns:
    The result of ``Table.to_pandas()`` for the table read from ``data``.
  """
  # Wrap the raw buffer so pyarrow can read it like a file, then convert.
  return pq.read_table(pa.BufferReader(data)).to_pandas()

In [None]:
# Root URL of the server; the HTTP helpers in this notebook prepend it to request paths.
base = "https://aimm.lbl.gov"

In [None]:
def get_access_token():
  """Exchange the locally stored refresh token for a new access token.

  Reads ``~/.config/tiled/tokens/aimm.lbl.gov/refresh_token`` and POSTs its
  contents to ``{base}/auth/token/refresh``.

  Returns:
    The value of the ``access_token`` field in the JSON response.

  Raises:
    OSError: if the refresh-token file cannot be read.
    httpx.HTTPStatusError: if the server answers with an error status.
  """
  token_path = pathlib.Path("~/.config/tiled/tokens/aimm.lbl.gov/refresh_token").expanduser()
  # Strip surrounding whitespace so a trailing newline in the file is not
  # sent to the server as part of the token.
  refresh_token = token_path.read_text().strip()

  with httpx.Client() as client:
    r = client.post(f"{base}/auth/token/refresh", json={"refresh_token" : refresh_token})
    # Report a rejected refresh as an HTTP error rather than as a KeyError
    # on the missing "access_token" field below.
    r.raise_for_status()
    return r.json()["access_token"]

# Request an access token right away; if the refresh flow is broken the error
# shows up here, before any benchmark cell runs.
access_token = get_access_token()

In [None]:
def fetch_entries():
  """Collect the full-data URL of every entry listed under ``/entries/torrisi``.

  Requests the listing 100 entries per page and follows ``links["next"]`` of
  each response until it is ``None``.

  Returns:
    list: one URL string per entry, taken from ``links["full"]`` of each item
    and rewritten to use the ``https`` scheme when the server reports ``http``.

  Raises:
    httpx.HTTPStatusError: if a page request returns an error status.
  """
  entries = []

  access_token = get_access_token()
  headers = {"Authorization" : f"Bearer {access_token}"}

  offset = 0
  limit = 100
  url = f"/entries/torrisi?page[offset]={offset}&page[limit]={limit}"

  with httpx.Client() as client:
    while url is not None:
      # The paging query is already part of `url`, so no separate (empty)
      # `params` mapping is passed along with it.
      r = client.get(f"{base}{url}", headers=headers)
      r.raise_for_status()
      page = r.json()  # parse the body once per page

      for x in page["data"]:
        full = x["links"]["full"]
        # FIXME for some reason the url is http.
        # Only rewrite links that really are plain http, so a link that is
        # already https is not mangled into "httpss://...".
        if full.startswith("http://"):
          full = "https://" + full[len("http://"):]
        entries.append(full)

      # `None` here marks the last page and ends the loop.
      url = page["links"]["next"]

  return entries
  
# Build the list of entry URLs used by the HTTP benchmarks; %time reports how long the listing takes.
%time entries = fetch_entries()

In [None]:
def read_sync(entries):
  """Download and decode each entry one after another over a single client.

  Args:
    entries: iterable of entry URLs to request.

  Returns:
    list: the ``deserialize_arrow`` result for every entry, in input order.
  """
  token = get_access_token()
  auth_headers = {"Authorization" : f"Bearer {token}"}
  query = {}
  frames = []

  with httpx.Client() as http:
    # tqdm only wraps the iterable to display progress.
    for entry_url in tqdm(entries):
      response = http.get(f"{entry_url}", params=query, headers=auth_headers)
      assert response.status_code == 200
      frames.append(deserialize_arrow(response.content))

  return frames

N = 1000
# Time a sequential fetch of at most the first N entries.
start = datetime.datetime.now()
data = read_sync(entries[:N])
elapsed = datetime.datetime.now() - start
# Divide by the number of items actually fetched: entries[:N] holds fewer
# than N URLs when the listing is shorter than N, which would inflate the rate.
rate = len(data) / elapsed.total_seconds()
print(f"elapsed = {elapsed}, rate = {rate:0.2f} it/s")

In [None]:
async def read_entry(e, params, headers, client):
  """Fetch a single entry with an async client and decode its body.

  Args:
    e: URL of the entry.
    params: query parameters forwarded to ``client.get``.
    headers: request headers forwarded to ``client.get``.
    client: object providing an awaitable ``get`` (an ``httpx.AsyncClient``
      in this notebook).

  Returns:
    The ``deserialize_arrow`` result for the response content.
  """
  response = await client.get(f"{e}", params=params, headers=headers)
  assert response.status_code == 200
  return deserialize_arrow(response.content)

async def read_async(entries, max_concurrency=None):
  """Download and decode all entries concurrently over one async client.

  Args:
    entries: iterable of entry URLs to request.
    max_concurrency: optional upper bound on the number of requests in
      flight at the same time. ``None`` (the default) starts every request
      at once. With very long entry lists, an unbounded fan-out can leave
      requests waiting on the client's connection pool until they time out,
      so pass a limit when fetching many entries.

  Returns:
    list: the ``read_entry`` result for every entry, in input order.

  Raises:
    ValueError: if ``max_concurrency`` is given but smaller than 1.
  """
  if max_concurrency is not None and max_concurrency < 1:
    raise ValueError("max_concurrency must be at least 1")

  access_token = get_access_token()
  headers = {"Authorization" : f"Bearer {access_token}"}
  params = {}

  async with httpx.AsyncClient() as client:
    if max_concurrency is None:
      requests = [read_entry(e, params, headers, client) for e in entries]
    else:
      gate = asyncio.Semaphore(max_concurrency)

      async def read_bounded(e):
        # Wait for a free slot before issuing the request.
        async with gate:
          return await read_entry(e, params, headers, client)

      requests = [read_bounded(e) for e in entries]

    # gather() awaits all coroutines while displaying a progress bar.
    return await tqdm_asyncio.gather(*requests)

In [None]:
N = 1000
start = datetime.datetime.now()
data = await read_async(entries[:N])
# data = await read_async(entries)
elapsed = datetime.datetime.now() - start
rate = N / elapsed.total_seconds()
print(f"elapsed = {elapsed}, rate = {rate:0.2f} it/s")

In [None]:
# Prompt for the MongoDB password without echoing it; it is used below when connecting.
mongopass = getpass.getpass()

In [None]:
# Pass the credentials as keyword arguments instead of formatting them into
# the URI: a password containing characters such as "@", ":", "/" or "%"
# would otherwise need percent-escaping to form a valid connection string.
client = pymongo.MongoClient("mongodb://localhost:27017", username="root", password=mongopass)
db = client["aimm"]
# Collection iterated by read_mongo().
c = db["torrisi"]

In [None]:
def read_mongo():
  """Read every document of the module-level collection ``c`` and decode it.

  Each document is expected to carry a Parquet payload under
  ``doc["data"]["blob"]``.

  Returns:
    list: the ``deserialize_parquet`` result for every document, in cursor
    order.
  """
  # The count is only used to size the progress bar.
  total = c.count_documents({})
  # (An earlier variant used c.find({}, batch_size=1).)
  documents = c.find({})

  return [
    deserialize_parquet(document["data"]["blob"])
    for document in tqdm(documents, total=total)
  ]

# Number of documents in the collection; counted before the timer starts so
# this query is not part of the measured time.
N = c.count_documents({})
start = datetime.datetime.now()
data = read_mongo()
elapsed = datetime.datetime.now() - start
# Documents read per second of wall-clock time.
rate = N / elapsed.total_seconds()
print(f"elapsed = {elapsed}, rate = {rate:0.2f} it/s")

In [None]:
def read_tiled(N):
  """Read up to ``N`` entries of the ``torrisi`` node through the Tiled client.

  Args:
    N: maximum number of entries to read; also used as the progress-bar total.

  Returns:
    list: the ``read()`` result of each visited entry, in iteration order.
  """
  catalog = from_uri("https://aimm.lbl.gov")["torrisi"]

  # Pairing with range(N) stops the key iteration after N items.
  first_keys = (key for _, key in zip(range(N), catalog))

  return [catalog[key].read() for key in tqdm(first_keys, total=N)]

N = 1000
# Time reading at most N entries through the Tiled client.
start = datetime.datetime.now()
data = read_tiled(N)
elapsed = datetime.datetime.now() - start
# read_tiled returns fewer than N items when the node holds fewer entries,
# so base the rate on what was actually read.
rate = len(data) / elapsed.total_seconds()
print(f"elapsed = {elapsed}, rate = {rate:0.2f} it/s")