In [1]:
import asyncio
import time
from io import StringIO
from pathlib import Path

import aiohttp
import pandas as pd

# Directory used as an on-disk cache: fetch() writes "<name>.json" files here
# and read_hdruk_data() reads them back instead of hitting the API again.
# It is resolved against the current working directory and created on import.
DATA_DIR = Path.cwd() / "json"
DATA_DIR.mkdir(parents=True, exist_ok=True)

# aiohttp exception types treated as (possibly transient) server failures.
# fetch() swallows them per request; read_hdruk_data_with_retries() retries
# when one of them propagates.
SERVER_ERRORS = (
    aiohttp.ClientConnectionError,
    aiohttp.ClientResponseError,
)

class ErrorCounter:
    """Tiny mutable tally.

    The current value lives in the public ``count`` attribute and is what
    ``str()`` of the instance renders.
    """

    def __init__(self) -> None:
        # Start from the same state that reset() produces.
        self.reset()

    def inc(self):
        """Add one to the tally."""
        self.count = self.count + 1

    def reset(self):
        """Set the tally back to zero."""
        self.count = 0

    def __str__(self):
        """Return the tally as a decimal string."""
        return f"{self.count!s}"


# Module-level counter shared across requests: fetch() increments it when a
# request fails, read_hdruk_data_with_retries() prints and resets it.
err_count = ErrorCounter()


class Logger:
    """Callable that prints messages prefixed with elapsed-time stamps.

    Each line shows the seconds since the logger was created and, in
    parentheses, the seconds since the previous message (shown as ``<0.1s``
    when shorter than a tenth of a second).
    """

    def __init__(self) -> None:
        # t0: creation time; tl: time of the most recent message.
        self.t0 = self.tl = time.time()

    def __call__(self, msg: str, *args):
        """Print ``msg`` followed by the space-separated ``str()`` of ``args``."""
        now = time.time()
        since_start = now - self.t0
        since_last = now - self.tl
        self.tl = now

        delta = "<0.1s" if since_last < 0.1 else f"{since_last:4.1f}s"
        text = " ".join([msg, *map(str, args)])
        print(f"{since_start:9.4f}s ({delta}): {text}")


async def fetch(client, url, name, log):
    """Download ``url`` and cache the response body as ``DATA_DIR/<name>.json``.

    Args:
        client: Object whose ``get(url)`` returns an async context manager
            yielding the response (an ``aiohttp.ClientSession`` in this file).
        url: URL (or path relative to the session's base URL) to request.
        name: Stem of the cache file to write inside ``DATA_DIR``.
        log: Callable taking a message string, used for progress output.

    Returns:
        The response text, or ``None`` when one of ``SERVER_ERRORS`` is raised
        while checking the status or reading the body (``err_count`` is
        incremented in that case).

    Note:
        Errors raised while *opening* the request (entering ``client.get``)
        happen outside the ``try`` block and therefore propagate to the
        caller instead of being counted.
    """
    async with client.get(url) as resp:
        try:
            if resp.status != 200:
                # raise_for_status() only raises for status codes >= 400;
                # other non-200 responses fall through and are saved.
                resp.raise_for_status()
            log(f"Read {url}.")

            resp_text = await resp.text()
            save_path = DATA_DIR / f"{name}.json"
            # Write UTF-8 explicitly: the cache is read back with
            # pd.read_json(), which decodes files as UTF-8 by default, whereas
            # write_text() would otherwise use the platform's locale encoding.
            save_path.write_text(resp_text, encoding="utf-8")

            return resp_text

        except SERVER_ERRORS:
            err_count.inc()
            return None
        

async def fetch_all(client, urls, names, log):
    """Run ``fetch`` concurrently for every ``(url, name)`` pair.

    Args:
        client: Shared HTTP client passed through to ``fetch``.
        urls: Iterable of URLs to request.
        names: Iterable of cache-file stems, paired positionally with ``urls``.
        log: Callable taking a message string, passed through to ``fetch``.

    Returns:
        List of ``fetch`` results in the same order as ``urls``.

    Raises:
        The first exception (in task order) raised by any ``fetch`` call.
        It is re-raised only after *all* tasks have finished, so that sibling
        downloads are not left running while the caller tears down ``client``
        and so that every task's exception is retrieved.
    """
    tasks = [
        asyncio.create_task(fetch(client, url, name, log))
        for url, name in zip(urls, names)
    ]
    # return_exceptions=True makes gather wait for every task instead of
    # propagating the first failure while the others are still in flight.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, BaseException):
            raise result
    return results


async def read_json_from_hdruk_api(paths, names, log):
    """Request every API path from the HDR UK phenotype library in one session.

    Args:
        paths: Resource paths; each is expanded to
            ``/api/v1/public/<path>/?format=json``.
        names: Cache-file stems paired positionally with ``paths``.
        log: Callable taking a message string, used for progress output.

    Returns:
        Whatever ``fetch_all`` returns for the expanded URLs.
    """
    URL = "https://phenotypes.healthdatagateway.org"

    urls = []
    for path in paths:
        urls.append(f"/api/v1/public/{path}/?format=json")

    log(f"Reading {len(urls)} endpoints from {URL}...")

    # The session is closed before the final log line is emitted.
    async with aiohttp.ClientSession(base_url=URL) as client:
        responses = await fetch_all(client, urls, names, log)

    log("Finished reading.")
    return responses


def convert_responses(names, responses):
    """Parse raw JSON response texts into DataFrames keyed by name.

    Args:
        names: Iterable of keys, paired positionally with ``responses``.
        responses: Iterable of JSON strings; ``None`` entries are skipped.

    Returns:
        Dict mapping each name whose response is not ``None`` to the
        DataFrame produced by ``pd.read_json``.
    """
    # Wrap the text in StringIO: passing a literal JSON string directly to
    # pd.read_json is deprecated and treated as a path in newer pandas.
    return {
        name: pd.read_json(StringIO(text))
        for name, text in zip(names, responses)
        if text is not None
    }


async def read_hdruk_data(resource, ids = None, endpoint = ""):
    """Load ``resource[/id][/endpoint]`` tables, using cached files when present.

    For each id the path ``resource/id/endpoint`` (with leading and trailing
    slashes stripped) is built; its cache name is that path with ``/``
    replaced by ``_``. Paths that have a ``DATA_DIR/<name>.json`` file are
    read from disk; the remaining ones are requested from the API.

    Args:
        resource: First path component.
        ids: Iterable of ids; ``None`` means a single empty id.
        endpoint: Optional trailing path component.

    Returns:
        Dict mapping cache names to DataFrames.
    """
    log = Logger()

    data = {}
    paths, names = [], []
    for id in ([""] if ids is None else ids):
        path = f"{resource}/{id}/{endpoint}".strip("/")
        name = path.replace("/", "_")

        cached = DATA_DIR / f"{name}.json"
        if not cached.exists():
            # Not cached yet: queue it for download.
            paths.append(path)
            names.append(name)
            continue
        data[name] = pd.read_json(cached)

    if not paths:
        log(f"All data scraped for {resource}/{endpoint}")
        return data

    responses = await read_json_from_hdruk_api(paths, names, log)
    data.update(convert_responses(names, responses))
    return data


async def read_hdruk_lists():
    """Load the top-level listing of every known resource type, one at a time.

    Returns:
        Dict merging the results of ``read_hdruk_data`` for each resource.
    """
    combined = {}
    for resource in (
        "phenotypes",
        "concepts",
        "coding-systems",
        "data-sources",
        "tags",
        "collections",
    ):
        combined.update(await read_hdruk_data(resource))
    return combined


async def read_hdruk_data_with_retries(resource, ids = None, endpoint = "", max_retries=100):
    """Call ``read_hdruk_data``, retrying after a pause when a server error propagates.

    Args:
        resource: Passed through to ``read_hdruk_data``.
        ids: Passed through to ``read_hdruk_data``.
        endpoint: Passed through to ``read_hdruk_data``.
        max_retries: Number of retries after the first attempt.

    Returns:
        The result of the first ``read_hdruk_data`` call that completes.

    Raises:
        One of ``SERVER_ERRORS``: re-raised unchanged when the final attempt
        also fails.
    """
    for attempt in range(1 + max_retries):
        try:
            return await read_hdruk_data(resource, ids, endpoint)
        except SERVER_ERRORS as exc:
            if attempt >= max_retries:
                # Out of retries: bare raise keeps the original traceback.
                raise

            print(f"Server error: {exc}.")
            print(f"Counted {err_count} GET errors.")
            err_count.reset()

            print("Pausing for 1 minute...")
            # asyncio.sleep yields to the event loop; time.sleep would block
            # it (and every other task on it) for the whole minute.
            await asyncio.sleep(60)
        
        

In [2]:
# Load the listing table of every resource type; entries already saved under
# DATA_DIR are read from disk rather than requested again.
resources = await read_hdruk_lists()

   0.0960s (<0.1s): All data scraped for phenotypes/
   0.0397s (<0.1s): All data scraped for concepts/
   0.0027s (<0.1s): All data scraped for coding-systems/
   0.0148s (<0.1s): All data scraped for data-sources/
   0.0023s (<0.1s): All data scraped for tags/
   0.0024s (<0.1s): All data scraped for collections/


In [3]:
# Phenotype listing table; its ids drive the per-phenotype requests below.
df = resources["phenotypes"]
# Sanity check: each phenotype_id should appear only once.
print(df.phenotype_id.is_unique)
df.head()

True


Unnamed: 0,phenotype_id,version_id,phenotype_name,type,author,owner,tags,collections,clinical_terminologies,data_sources,versions
0,PH1,2,COVID-19 infection,Disease or Syndrome,BHF CVD COVID UK Consortium,ieuan.scanlon,[],"[{'description': 'BHF Data Science Centre', 'i...","[{'name': 'ICD10 codes', 'id': 4}, {'name': 'S...","[{'id': 1, 'name': 'GPES Data for Pandemic Pla...","[{'version_id': 2, 'version_name': 'COVID-19 i..."
1,PH2,4,Anxiety,Disease or Syndrome,"Matthew J Carr, Sarah Steeg, Roger T Webb, Nav...",ieuan.scanlon,[],"[{'description': 'Phenotype Library', 'id': 18...","[{'name': 'Read codes v2', 'id': 5}]","[{'id': 6, 'name': 'CPRD Aurum', 'url': 'https...","[{'version_id': 4, 'version_name': 'Anxiety', ..."
2,PH3,6,Depression,Disease or Syndrome,"Matthew J Carr, Sarah Steeg, Roger T Webb, Nav...",ieuan.scanlon,[],"[{'description': 'Phenotype Library', 'id': 18...","[{'name': 'Read codes v2', 'id': 5}]","[{'id': 6, 'name': 'CPRD Aurum', 'url': 'https...","[{'version_id': 6, 'version_name': 'Depression..."
3,PH4,8,Self Harm,Disease or Syndrome,"Matthew J Carr, Sarah Steeg, Roger T Webb, Nav...",ieuan.scanlon,[],"[{'description': 'Phenotype Library', 'id': 18...","[{'name': 'Read codes v2', 'id': 5}]","[{'id': 6, 'name': 'CPRD Aurum', 'url': 'https...","[{'version_id': 8, 'version_name': 'Self Harm'..."
4,PH5,1509,Cardiovascular Disease,Disease or Syndrome,"Ellie Paige, Jessica Barret, David Stevens, Ru...",ieuan.scanlon,[],"[{'description': 'Phenotype Library', 'id': 18...","[{'name': 'Read codes v2', 'id': 5}]","[{'id': 5, 'name': 'CPRD GOLD', 'url': 'https:...","[{'version_id': 1509, 'version_name': 'Cardiov..."


In [4]:
# Fetch "phenotypes/<id>/export/codes" for every phenotype id, pausing and
# retrying when a server error propagates.
codes = await read_hdruk_data_with_retries("phenotypes", df.phenotype_id, "export/codes")

   4.3484s ( 4.3s): Reading 17 endpoints from https://phenotypes.healthdatagateway.org...
Server error: Server disconnected.
Counted 0 GET errors.
Pausing for 1 minute...
   4.4523s ( 4.5s): Reading 17 endpoints from https://phenotypes.healthdatagateway.org...
Server error: Server disconnected.
Counted 0 GET errors.
Pausing for 1 minute...
   4.2380s ( 4.2s): Reading 17 endpoints from https://phenotypes.healthdatagateway.org...
Server error: Server disconnected.
Counted 2 GET errors.
Pausing for 1 minute...
   4.4855s ( 4.5s): Reading 17 endpoints from https://phenotypes.healthdatagateway.org...
Server error: Server disconnected.
Counted 0 GET errors.
Pausing for 1 minute...
   4.2457s ( 4.2s): Reading 17 endpoints from https://phenotypes.healthdatagateway.org...
Server error: Server disconnected.
Counted 2 GET errors.
Pausing for 1 minute...
   4.4820s ( 4.5s): Reading 17 endpoints from https://phenotypes.healthdatagateway.org...
Server error: Server disconnected.
Counted 0 GET errors

In [None]:
# Fetch "phenotypes/<id>/detail" for every phenotype id, with the same
# retry behaviour as above.
details = await read_hdruk_data_with_retries("phenotypes", df.phenotype_id, "detail")

In [None]:
# Not needed right now
# concepts = await read_hdruk_data("concepts", resources["concepts"].concept_id, "detail")