In [4]:
!pip install asyncio
!pip install aiohttp

Collecting aiohttp
  Downloading aiohttp-3.8.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m0m
[?25hCollecting async-timeout<5.0,>=4.0.0a3
  Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
Collecting frozenlist>=1.1.1
  Downloading frozenlist-1.3.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (149 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.6/149.6 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting yarl<2.0,>=1.0
  Downloading yarl-1.8.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (263 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m264.0/264.0 kB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting aiosignal>=1.1.2
  Dow

In [5]:
import asyncio
from queue import Queue
from time import time
from collections import Counter
import os
import json

import aiohttp

In [6]:
CALLS_PER_SECOND = os.getenv('CALLS_PER_SECOND', default=2)

In [7]:
BASE_URL = 'https://api.dbpedia-spotlight.org/en'
ENDPOINT_URL = f'{BASE_URL}/annotate'

In [8]:
write_queue = Queue()

stats = Counter()

In [9]:
class SimpleLimiter:
    """
    The reasoning for this is that even though the requests are scheduled sequentially,
    they're executed asynchronously and the rate may exceed what is considered good-natured API usage.
    It is only sufficient for a single scheduler use-case.
    """

    def __init__(self, calls_per_second: float):
        self.delay = (1 / calls_per_second) if calls_per_second else None
        self.next = 0

    async def __aenter__(self):
        if not self.delay:
            return
        t = time()
        if t < self.next:
            await asyncio.sleep(self.next - t)
        self.next = time() + self.delay

    async def __aexit__(self, *_):
        pass

In [10]:
def to_file():
    with open("imgflip.spotlight.json", "a", encoding='utf8') as f:
        f.write("{\n")
        while True:
            data = write_queue.get()
            if data is None:
                break
            f.write(f"\"{data[0]}\": {json.dumps(data[1], ensure_ascii=False)},\n")
            stats['written'] += 1
        f.write("}")

In [16]:
def texts():
    with open('imgflip.cleaned.json', 'r', encoding='utf8') as f:
        for item in json.load(f):
            url = item['URL'] #we use the instance id
            text = item['alt_text'].replace('|',"")
            stats['texts_read'] += 1
            yield url, text





In [12]:
async def printer():
    while True:
        msg = '\t'.join(f"{key}:\t{value}" for key, value in stats.items())
        print(f"\r{msg}", end='')
        await asyncio.sleep(1)

In [13]:
async def main():
    limiter = SimpleLimiter(CALLS_PER_SECOND)
    client = aiohttp.ClientSession()

    _to_file = asyncio.create_task(asyncio.to_thread(to_file))
    _printer = asyncio.create_task(printer())

    async def _fetch(url, text):
        stats['requests'] += 1
        for _ in range(5):
            response = await client.get(ENDPOINT_URL,
                                        params={"text": text,  # more configuration?
                                                'confidence': 0.5
                                                },
                                        headers={'Accept': 'application/json'}
                                        )
            if response.status == 200:
                write_queue.put((url, await response.json()))
                break
        else:
            print("\nFAILED:", url)
            stats['failed'] += 1
        stats['requests'] -= 1

    coros = []
    for url, text in texts():
        async with limiter:
            coros.append(asyncio.create_task(_fetch(url, text)))

    print("\nall done")
    await asyncio.gather(coros)
    print("\ngathered")
    write_queue.put(None)
    print("\npoisoned")
    await _to_file
    print("\nwriter done")
    _printer.cancel()
    print("\nprinter canceled")
    await client.close()

In [18]:
await main()

texts_read:	182	requests:	0	written:	181

AttributeError: 'NoneType' object has no attribute 'replace'

texts_read:	182	requests:	0	written:	182