In [28]:
from typing import TypeVar, Generic, TypedDict, Protocol, Sequence, Mapping, Any, Union, Coroutine, Unpack
from abc import ABC, abstractmethod
import functools as ft
from dataclasses import dataclass
from haskellian import Either, Left
import asyncio
from pipeteer.pipelines import Pipeline, Wrapped, Workflow, DictWorkflow, Task
from pipeteer.queues import ReadQueue, WriteQueue, Queue, SimpleQueue, ops, QueueKV

# Generic type variable; used below to tie `get_queue`'s `type` argument to the
# item type of the `Queue` it returns.
T = TypeVar('T')

def cache(func):
  """Memoise `func` while keeping its name, docstring and signature metadata.

  A thin layer over `functools.cache`: the returned wrapper carries `func`'s
  metadata (via `functools.wraps`) and forwards every call to one shared
  memoised version of `func`.

  Args:
    func: callable to memoise. Every argument it is called with must be
      hashable, as `functools.cache` requires.

  Returns:
    A wrapper that calls `func` at most once per distinct argument set and
    returns the stored result afterwards.

  Raises:
    TypeError: at call time, if any argument is unhashable.
  """
  # Build the memoised callable ONCE, at decoration time. Building it inside
  # `wrapper` would create a brand-new, empty cache on every call, so nothing
  # would ever be reused.
  memoised = ft.cache(func)

  @ft.wraps(func)
  def wrapper(*args, **kwargs):
    return memoised(*args, **kwargs)
  return wrapper

@cache
def get_queue(path: Sequence[str], type: type[T], /) -> Queue[T]:
  """Return the sqlite-backed `QueueKV` for `path` holding items of `type`.

  The path segments are joined with '-' to form the name passed to
  `QueueKV.sqlite`; an empty path falls back to the name 'Qin'. Every queue
  is opened against the file 'queues.sqlite'.

  Both arguments are positional-only and, because of `@cache`, must be
  hashable (e.g. pass `path` as a tuple).
  """
  joined = '-'.join(path)
  name = joined or 'Qin'
  return QueueKV.sqlite(type, 'queues.sqlite', name)

In [29]:
class MyPipeline(Task[str, str, Coroutine]):
  """Example task: forwards each string it reads with a '!' appended."""

  def __init__(self):
    # Input and output item types are both `str`.
    super().__init__(str, str)

  async def run(self, Qs: Task.Queues[str, str]):
    """Loop forever, moving items from `Qs['Qin']` to `Qs['Qout']`.

    Per iteration: read an `(id, value)` pair from the input queue, print it,
    push `value + '!'` to the output queue under the same id, and only then
    pop the id from the input queue. Each queue result is unwrapped with
    `.unsafe()` (presumably raising if the operation failed -- confirm
    against haskellian's `Either`).
    """
    Qin = Qs['Qin']
    Qout = Qs['Qout']
    while True:
      entry = await Qin.read()
      id, x = entry.unsafe()
      print(f'Processing "{id}":', x)
      pushed = await Qout.push(id, f'{x}!')
      pushed.unsafe()
      # Remove the input entry only after the output has been pushed.
      popped = await Qin.pop(id)
      popped.unsafe()

In [4]:
# Shape of the value returned by `MyWorkflow.run`: one entry per sub-pipeline,
# each holding the object produced by calling that pipeline's `run`.
MyArtifacts = TypedDict('MyArtifacts', {
  'run_a': Coroutine,
  'run_b': Coroutine,
})


class MyWorkflow(Workflow[str, str, Task.Queues[str, str], MyArtifacts]):
  """Example workflow holding two `MyPipeline` instances under keys 'a' and 'b'."""

  class Queues(TypedDict):
    # One set of task queues per sub-pipeline, keyed like the pipelines.
    a: Task.Queues[str, str]
    b: Task.Queues[str, str]

  def __init__(self):
    stages = {
      'a': MyPipeline(),
      'b': MyPipeline(),
    }
    super().__init__(stages)

  def run(self, Qs):
    """Call `run` on both sub-pipelines and bundle the results.

    `Qs` is indexed by 'a' and 'b' to pick each pipeline's queues. The two
    results are returned as a `MyArtifacts` dict; nothing is awaited here.
    """
    run_a = self.pipelines['a'].run(Qs['a'])
    run_b = self.pipelines['b'].run(Qs['b'])
    return MyArtifacts(run_a=run_a, run_b=run_b)

In [5]:
# Build a workflow from a dict of pipelines via `Workflow.dict`, using the
# same two `MyPipeline` stages as the hand-written `MyWorkflow` class.
wkf = Workflow.dict({
  'a': MyPipeline(),
  'b': MyPipeline(),
})

In [6]:
# Output queue for the whole workflow; the path must be a tuple so it is
# hashable for the cached `get_queue`.
Qout: Queue[str] = get_queue(('Qout',), str)
# Connect the workflow to `Qout`, passing `get_queue` as the queue factory.
# NOTE(review): presumably `connect` calls it for each internal queue -- confirm.
Qs = wkf.connect(Qout, get_queue)

In [6]:
# Wrap the workflow with input type `int`: `pre=str` and a `post` callable that
# formats its two arguments as '<s> <x>'.
# NOTE(review): presumably `pre` converts inputs before the wrapped workflow and
# `post` combines the original input with its output -- confirm against pipeteer.
wpd = wkf.wrap(int, pre=str, post=lambda x, s: f'{s} {x}')

In [7]:
# Display the queue tree produced by connecting the wrapped workflow.
wpd.connect(Qout, get_queue)

{'Qwrapped': SimpleQueue(),
 'wrapped': {'a': {'Qin': SimpleQueue(wrapped/a),
   'Qout': prejoin(SimpleQueue(wrapped/a), SimpleQueue(wrapped/b))},
  'b': {'Qin': SimpleQueue(wrapped/b),
   'Qout': prejoin(SimpleQueue(wrapped/a), SimpleQueue(wrapped/b))}}}

In [8]:
# Inspect the output queue assigned to pipeline 'a' in the unwrapped workflow.
Qs['a']['Qout']

prejoin(SimpleQueue(a), SimpleQueue(b))

In [9]:
# Display what `run` returns for the connected queues.
# NOTE(review): the recorded output shows un-awaited coroutine objects; they
# would still need to be awaited/gathered to actually process items -- confirm intent.
wkf.run(Qs)

{'run_a': <coroutine object MyPipeline.run at 0x7facfefe1640>,
 'run_b': <coroutine object MyPipeline.run at 0x7facfee01840>}