### General API call flow

#### Steps

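1. Producer = prepare and send API requests
   1. Load the GraphQL query template from the template folder
   2. Load the anime seed list (media ids) from the seed folder
   3. One connection per producer task
      1. The producer loops through the pages
      2. Wait for the result to arrive
      3. Check `hasNextPage=True`
      4. Repeat while there is a next page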
   4. Render variables
      1. perPage = 50 (max allowed)
   5. Mark sources (seed ids) and targets (ids already saved)
      1. This enables self-healing: a rerun only fetches the ids that are still missing
2. Consumer = response parser and storage
   1. Fetch data from the queue
   2. Parse the `data` key
   3. Set the storage destination: one JSON file per API call

In [8]:

from pathlib import Path
import aiohttp
import asyncio
import json
import duckdb 
import polars as pl
from datetime import datetime
import time


In [29]:
async def make_api_call(url:str, query:str, variables:dict, session: 'aiohttp.ClientSession | None' = None):
    """Send one GraphQL POST request and return its decoded JSON body.

    Args:
        url: GraphQL endpoint to POST to.
        query: GraphQL query text, sent as the ``query`` field.
        variables: Values for the query's variables, sent as ``variables``.
        session: Optional ``aiohttp.ClientSession`` to reuse across calls so
            connections can be shared. When omitted, a session is opened for
            this single call and closed before returning.

    Returns:
        ``(result, resp)``: the JSON-decoded body and the response object.
        Callers read ``resp.status`` and ``resp.headers`` after this returns.

    Connection failures and non-JSON bodies surface as aiohttp exceptions.
    """
    owns_session = session is None
    if owns_session:
        session = aiohttp.ClientSession()
    try:
        async with session.post(url, json={'query': query, 'variables': variables}) as resp:
            result = await resp.json()
    finally:
        # Only close sessions this call created; a caller-supplied session
        # stays open for reuse.
        if owns_session:
            await session.close()
    return result, resp
        
        
async def producer(queue, ids:list[int], url:str, query:str, duration_min:float):
    """Request one API response per media id and hand it to the consumer.

    For every id in ``ids``, ``query`` is posted to ``url`` with
    ``{'media_id': <id>}`` as variables, and ``(result, id)`` is put on
    ``queue``. Each iteration is padded to take at least ``duration_min``
    seconds so the request rate stays under the API's per-minute limit.

    Args:
        queue: ``asyncio.Queue`` the consumer reads ``(result, id)`` tuples from.
        ids: Media ids to request, in order.
        url: GraphQL endpoint.
        query: GraphQL query text; presumably declares a ``$media_id``
            variable (TODO confirm against the template).
        duration_min: Minimum wall-clock seconds per request (a minimum
            duration, not minutes).

    Raises:
        aiohttp.ClientResponseError: the response status is 4xx/5xx.
        RuntimeError: any other non-200 status.
    """
    print("Producer: Started")
    for media_id in ids:
        start_time = time.monotonic()
        variables = {'media_id': media_id}

        print(f"Requesting page for {media_id}")
        result, resp = await make_api_call(url, query, variables)

        if resp.status != 200:
            print(f"Request fails at {media_id}")
            # raise_for_status() raises only for status >= 400 and returns
            # None otherwise, so other unexpected statuses need an explicit raise.
            resp.raise_for_status()
            raise RuntimeError(f"Unexpected HTTP status {resp.status} for media_id {media_id}")

        await queue.put((result, media_id))

        remaining_request = resp.headers.get('x-ratelimit-remaining')
        if remaining_request is not None:
            print(f'Remaining API call per min: {int(remaining_request)}')

        # Pad the iteration to duration_min seconds. asyncio.sleep yields to
        # the event loop so the consumer keeps saving meanwhile; time.sleep
        # would freeze every task.
        elapsed = time.monotonic() - start_time
        if elapsed < duration_min:
            await asyncio.sleep(duration_min - elapsed)

    print('Producer: Done')

In [30]:
async def save_api_to_local(file_path:'str | Path', data: dict):
    """Write ``data`` as JSON to ``file_path``, replacing it atomically.

    The payload is first written to a sibling ``<name>.tmp`` file and then
    renamed over ``file_path``. Existing ``*.json`` files double as the
    "already fetched" marker for reruns, so a crash mid-write must never
    leave a truncated ``.json`` behind. If serialization or writing fails,
    the temporary file is removed and the error is re-raised.

    Declared ``async`` so the consumer can await it; the file I/O itself
    is synchronous.
    """
    target = Path(file_path)
    tmp_path = target.with_name(target.name + '.tmp')
    try:
        with open(tmp_path, 'w', encoding='utf-8') as fp:
            json.dump(data, fp)
        # Rename is atomic on the same filesystem, so readers only ever see
        # the old file or the complete new one.
        tmp_path.replace(target)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise

async def consumer(queue:asyncio.Queue, storage_path:Path):
    """Drain ``queue`` forever, saving each response as ``<id>.json``.

    Each queue item is a ``(response_body, id)`` tuple. The loop never exits
    on its own: cancel the task once ``queue.join()`` returns, otherwise it
    stays parked on ``queue.get()``.

    ``queue.task_done()`` runs in a ``finally`` block, so a failed write
    cannot leave ``queue.join()`` waiting forever. A failed id is reported
    and has no file in ``storage_path``, so a rerun that diffs against the
    saved files will request it again.
    """
    print("Consumer: Started")
    while True:
        response_body, media_id = await queue.get()
        file_path = storage_path / f'{media_id}.json'
        try:
            print(f'Saving {media_id} at {file_path.as_posix()}')
            await save_api_to_local(file_path=str(file_path), data=response_body)
        except (OSError, TypeError, ValueError) as err:
            print(f'Saving {media_id} failed: {err!r}')
        else:
            print(f'Saving {media_id} completed')
        finally:
            queue.task_done()

In [44]:
# Configure log
pass

# Setup source: media ids listed in the seed CSV.
source_base_path = Path().cwd().parent
seed_path = source_base_path / Path('raw/seed/top-anime.csv')
seeds = duckdb.read_csv(str(seed_path))
# `seeds` is queried by name: DuckDB resolves it to the relation above.
sources = [row[0] for row in duckdb.sql('SELECT media_id FROM seeds').fetchall()]

# Setup Target: ids that already have a saved <media_id>.json.
target_base_path = Path().cwd().parent
storage_path = target_base_path / Path('raw/entity/anime-studio')
# The consumer writes into this folder, so it must exist before the first batch.
storage_path.mkdir(parents=True, exist_ok=True)
# Use the file stem and skip non-numeric names instead of crashing on int().
targets = [
    int(json_file.stem)
    for json_file in storage_path.glob('*.json')
    if json_file.stem.isdecimal()
]

# Setup Delta: seed ids without a saved file yet. Rerunning therefore only
# fetches what is still missing (self-healing).
delta = set(sources) - set(targets)
pending_ids = sorted(delta)
# Explicit dtypes so status/updated are String/Datetime columns rather than
# all-null columns, matching the values the run loop writes into them.
pl_run = pl.DataFrame(
    {
        'delta': pending_ids,
        'status': [None] * len(pending_ids),
        'updated': [None] * len(pending_ids),
    },
    schema={'delta': pl.Int64, 'status': pl.Utf8, 'updated': pl.Datetime},
)

df_run = duckdb.sql('SELECT * FROM pl_run')

# Configure Pipeline Run Batch Size
BATCH_SIZE = 50
start_pos = 0
end_pos = df_run.shape[0]

total_items = end_pos - start_pos
# Ceiling division: a trailing partial batch still counts as a batch.
total_batches = -(-total_items // BATCH_SIZE)
print(f"Total items: {total_items} in {total_batches} batches with size ({BATCH_SIZE})")

Total items: 0 in 0 batches with size (50)


In [42]:
# Setup Producer Required Data
import math

MAX_API_PER_MIN = 90
# Minimum seconds per request to stay within MAX_API_PER_MIN, rounded *up* to
# 0.1 s. Plain round() can round down (e.g. 60/110 -> 0.5) and overshoot the limit.
DURATION_PER_API_CALL = math.ceil(600 / MAX_API_PER_MIN) / 10

url = 'https://graphql.anilist.co'
# Join path parts instead of a literal backslash, which is only a separator on Windows.
template_path = Path('graphql-template') / 'get-anime-studio.graphql'
template = template_path.read_text(encoding='utf-8')

# Setup Consumer Required Data
pass

In [None]:
for batch_pos in range(start_pos, end_pos, BATCH_SIZE):
    batches_delta = duckdb.sql(f"""
    SELECT delta 
        FROM df_run
        LIMIT {BATCH_SIZE}
        OFFSET {batch_pos}
    """).fetchall()
    ids = [_[0] for _ in batches_delta]

    print(f"Batch: {batch_pos}")

    # Do work here
    queue = asyncio.Queue(BATCH_SIZE)
    # start the consumer
    _ = asyncio.create_task(consumer(queue, storage_path))
    # start the producer and wait for it to finish
    await asyncio.create_task(producer(queue, ids, url, template, DURATION_PER_API_CALL))
    # wait for all items to be processed
    await queue.join()

    # Update run table
    completion_dt = datetime.now()
    srs_updated = [completion_dt for i in ids]
    srs_status = ['Completed' for i in ids]
    
    pl_delta = pl.DataFrame()
    pl_delta = pl_delta.with_columns([
        pl.Series(ids).alias('delta'), 
        pl.Series(srs_status).alias('status'),
        pl.Series(srs_updated).alias('updated')
    ])
    pl_run.update(pl_delta, on='delta')