# Client


In [None]:
import asyncio
import aiohttp

ws_url = "http://localhost:8080/ws"

async def receiver_task(ws):
    """Print every message received from *ws* until its stream ends.

    *ws* only needs to support ``async for``; each item is printed as-is.
    """
    print("RECEIVER TASK")
    async for incoming in ws:
        print("GOT", incoming)

async with aiohttp.ClientSession() as session:
    # connect to websocket
    async with session.ws_connect(ws_url) as ws:
        print("CONNECTED")
        asyncio.create_task(receiver_task(ws))
        await asyncio.sleep(1)
        for i in range(10):
            await ws.send_str(f"Hello, world! {i}")
            await asyncio.sleep(0)

# Buffer

In [None]:
from io import StringIO
import json

sender = StringIO()

# Serialize five events as newline-delimited JSON. The newline embedded in
# "data" is escaped by the JSON encoder, so each event stays on one line.
for i in range(5):
    event = {"topic": "test", "data": f"line {i}\nmore data"}
    sender.write(json.dumps(event) + "\n")

serialized = sender.getvalue()
print(serialized)

# Read the stream back from the start and decode only the first event.
receiver = StringIO(serialized)
receiver.seek(0)

first_event = json.loads(receiver.readline())
print(first_event["data"])



# Bridge

In [23]:
import json
import random
from collections import deque
from io import StringIO

class Bridge:
    """Exchange JSON events over a websocket as newline-delimited batches.

    Outgoing events are serialized one per line into an in-memory buffer.
    The buffer is sent as a single text message as soon as it grows past
    ``size_threshold`` characters, and otherwise by a periodic background
    flush that runs every ``max_delay`` seconds while the socket is open.
    Incoming text messages are split back into lines, and the decoded
    events are yielded one at a time through ``async for``.

    The constructor schedules the background flush, so a Bridge must be
    created while an event loop is running. Call ``await flush()`` before
    closing the websocket to send events that are still buffered.
    """

    def __init__(self, ws, *, max_delay=2, size_threshold=1000) -> None:
        """Wrap *ws* and start the periodic flush.

        Args:
            ws: Websocket object providing ``send_str``, ``receive`` and
                ``closed``.
            max_delay: Seconds between periodic flushes.
            size_threshold: Buffer size (in characters) above which
                ``emit`` flushes immediately.
        """
        self.ws = ws
        self.max_delay = max_delay
        self.size_threshold = size_threshold
        self.buffer = StringIO()
        # Received lines not yet handed out; deque gives O(1) pops from
        # the front.
        self.lines = deque()
        # Exactly one timer task per bridge. Holding the reference keeps
        # the task from being garbage-collected while it is pending.
        self._flush_timer = asyncio.create_task(self.flush_task())

    async def emit(self, event: dict) -> None:
        """Queue *event* for sending; flush now if the buffer is large."""
        json.dump(event, self.buffer)
        self.buffer.write("\n")
        if self.buffer.tell() > self.size_threshold:
            await self.flush()

    async def flush(self) -> None:
        """Send all buffered events as one text message, if there are any."""
        data = self.buffer.getvalue()
        if not data:
            return
        # Swap in a fresh buffer before awaiting, so events emitted while
        # the send is in flight land in the next batch instead of being
        # lost or sent twice.
        self.buffer = StringIO()
        await self.ws.send_str(data)

    async def flush_task(self):
        """Flush every ``max_delay`` seconds until the websocket is closed."""
        while not self.ws.closed:
            await asyncio.sleep(self.max_delay)
            # The socket may have been closed while this task was asleep.
            if self.ws.closed:
                break
            await self.flush()

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next decoded event, receiving a new batch if needed.

        Raises:
            StopAsyncIteration: when a non-text message is received.
        """
        while not self.lines:
            # fetch next batch of data from websocket
            msg = await self.ws.receive()
            if msg.type != aiohttp.WSMsgType.TEXT:
                raise StopAsyncIteration
            # Skip empty segments (e.g. the one after the trailing newline)
            # so a batch without a final newline does not lose its last
            # event and blank lines do not break JSON decoding.
            self.lines.extend(line for line in msg.data.split("\n") if line)
        return json.loads(self.lines.popleft())

    


import asyncio
import aiohttp

ws_url = "http://localhost:8080/ws"

async def receiver_task(bridge):
    """Print the ``data`` field of each event yielded by *bridge*.

    Events without a ``data`` key are printed as ``None``.
    """
    async for event in bridge:
        payload = event.get("data")
        print("GOT", payload)

async with aiohttp.ClientSession() as session:
    # connect to websocket
    async with session.ws_connect(ws_url) as ws:
        bridge = Bridge(ws, max_delay=1, size_threshold=100)
        asyncio.create_task(receiver_task(bridge))
        for i in range(10):
            await bridge.emit({ "topic": "test", "data": f"Hello, world! {i}" })
            await asyncio.sleep(0.4)  

print("DONE")

GOT Hello, world! 0
GOT Hello, world! 1
GOT Hello, world! 2
GOT Hello, world! 3
GOT Hello, world! 4
GOT Hello, world! 5
GOT Hello, world! 6
GOT Hello, world! 7
GOT Hello, world! 8
GOT Hello, world! 9
DONE
