In [None]:
# | default_exp domain.lobby

In [None]:
# | export
from meme_games.core import *
from meme_games.domain.user import *

In [None]:
#| export
logger = logging.getLogger(__name__)

In [None]:
#| export
@dataclass
class GameData(Model):
    member_id: str = None

In [None]:
# | export

@dataclass
class LobbyMember(fc.GetAttr, Model):
    """Represents a member in a lobby, usually associated with a websocket connection."""
    _lobby_type = 'basic'
    _ignore = ('user', 'send', 'ws')
    _default = 'user'

    user: User = None
    user_uid: str = None
    lobby_id: str = None
    is_player: bool = False
    is_host_: bool = False
    joined_at: dt.datetime = field(default_factory=dt.datetime.now)
    score: int = 0
    id: str = field(default_factory=lambda: random_id(8))

    send: Optional[FunctionType] = None
    ws: Optional[WebSocket] = None

    def __post_init__(self):
        if self.user: self.user_uid = self.user.uid
        if isinstance(self.joined_at, str):
            self.joined_at = dt.datetime.fromisoformat(self.joined_at)

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        assert cls._lobby_type != LobbyMember._lobby_type, f"You must set different `_lobby_type` from '{LobbyMember._lobby_type}'"
        LOBBY_REGISTRY[cls._lobby_type] = cls

    def spectate(self): 
        """Sets the member to be a spectator."""
        self.is_player = False

    def play(self):
        """Sets the member to be a player."""
        self.joined_at = dt.datetime.now()
        self.is_player = True

    def connect(self, send: FunctionType, ws: Optional[WebSocket] = None): 
        """Connects the member with a websocket."""
        self.send, self.ws = send, ws
    def disconnect(self): 
        """Disconnects the member."""
        self.send, self.ws = None, None
    def add_score(self, score: int): 
        """Adds a score to the member."""
        self.score += score
    def reset_score(self): 
        """Resets the member's score."""
        self.score = 0
    def sync_user(self, u: User): 
        """Synchronizes user data for the member."""
        self.user = u

    @property
    def is_connected(self): 
        """Checks if the member is connected."""
        return self.send is not None
    @property
    def is_host(self): 
        """Checks if the member is the host."""
        return self.is_host_

    def __eq__(self, other: Union[User, 'LobbyMember']): return self.uid == other.uid

    @classmethod
    def from_dict(cls, data: dict):
        data = cols2dict(data)
        data[cls]['user'] = User.from_dict(data.pop(User))
        return super().from_dict(data[cls])

    @classmethod
    def convert(cls, member: Optional['LobbyMember'] = None):
        """Converts a member to a different lobby type."""
        cf = dataclasses.fields(cls)
        if member: return cls(**{f.name: getattr(member, f.name) for f in dataclasses.fields(member) if f in cf})

    def update_user(self, u: User): 
        """Updates the user associated with this member."""
        # TODO the same as sync_user
        self.user = u


# maps lobby names to memberTypes
LOBBY_REGISTRY = {
    LobbyMember._lobby_type: LobbyMember
}

In [None]:
# | export


@dataclass
class Lobby[T: LobbyMember](Model):
    """Represents a game lobby."""
    _ignore = ('members', 'host', 'game_state')
    
    id: str = field(default_factory=random_id)
    lobby_type: str = LobbyMember._lobby_type
    locked: bool = False
    background_url: Optional[str] = None
    host: Optional[T] = None
    members: dict[str, T] = field(default_factory=dict)
    last_active: dt.datetime = field(default_factory=dt.datetime.now)
    game_state: Optional[Any] = None
    

    def __post_init__(self):
        if self.host: self.host_uid = self.host.uid
        if isinstance(self.last_active, str): 
            self.last_active = dt.datetime.fromisoformat(self.last_active)
            
    def __getattr__(self, name): return getattr(self.game_state, name)

    def sorted_members(self):
        '''lobby members sorted by `joined_at` date'''
        for m in sorted(self.members.values(), key=lambda m: m.joined_at): yield m

    def set_host(self, member: T): 
        """Sets a member as the lobby host."""
        if self.host: self.host.is_host_=False
        member.is_host_ = True
        self.host = member

    def create_member(self, user: User, send: FunctionType = None, **kwargs) -> T:
        '''Create a new member for lobby `lobby_type` and add to the lobby'''
        self.last_active = dt.datetime.now()
        m = LOBBY_REGISTRY[self.lobby_type](user=user, send=send, **kwargs)
        self.add_member(m)
        return m
    
    def add_member(self, member: T):
        """Adds a member to the lobby."""
        member.lobby_id = self.id
        self.members[member.uid] = member

    def get_member(self, uid: str) -> Optional[T]:
        self.last_active = dt.datetime.now()
        return self.members.get(uid)
    
    def lock(self): self.locked = True
    def unlock(self): self.locked = False

    @fc.delegates(create_member)
    def get_or_create_member(self, user: User, **kwargs) -> T:
        '''get member from the lobby or create a new with `create_member`'''
        self.last_active = dt.datetime.now()
        m = self.members.get(user.uid)
        if not m: m = self.create_member(user, **kwargs)
        return m

    def convert[T: LobbyMember](self: 'Lobby', lobby_type: str = LobbyMember._lobby_type) -> 'Lobby[T]':
        """Converts the lobby and its members to a different type."""
        if self.lobby_type == lobby_type: return self
        self.lobby_type = lobby_type
        mtype = LOBBY_REGISTRY[self.lobby_type]
        for k in self.members.keys(): self.members[k] = mtype.convert(self.members[k])
        self.host = mtype.convert(self.host)
        return self

In [None]:
# | export

MEMBER_MANAGER_REGISTRY = {}


def register_lobby_member_manager[T: DataManager](manager: T, entity_cls: type) -> T:
    '''Register a manager for an entity class'''
    MEMBER_MANAGER_REGISTRY[entity_cls] = manager
    return manager


class LobbyManager(DataManager[Lobby]):
    '''Class to manage lobbies'''
    memm = MEMBER_MANAGER_REGISTRY

    def _set_tables(self):
        self.lobbies: fl.Table = self.db.t.lobbies.create(**Lobby.columns(), pk='id',
                                                          transform=True, if_not_exists=True)
        return self.lobbies

    def update(self, lobby: Lobby[LobbyMember]):
        """Updates a lobby and its members in the database."""
        self.memm[LOBBY_REGISTRY[lobby.lobby_type]].upsert_all(lobby.members.values())
        return super().update(lobby)

    def get(self, id: str) -> Lobby[LobbyMember]:
        """Retrieves a lobby and its members from the database."""
        if id not in self.lobbies: return
        lobby = Lobby.from_dict(self.lobbies.get(id))
        lobby.members = {m.user_uid: m for m in self.memm[LOBBY_REGISTRY[lobby.lobby_type]].get_all(id)}
        hosts = [m for m in lobby.members.values() if m.is_host]
        if hosts: lobby.host = hosts[0]
        return lobby

    def ids(self) -> list[str]: return [el['id'] for el in self.lobbies(select='id', as_cls=False)]

In [None]:
# | export
class MemberManager(DataManager[LobbyMember]):
    '''Class to manage lobby members'''

    def __init__(self, user_manager: UserManager):
        self.um, self.users = user_manager, user_manager.users
        super().__init__(self.um.db)

    def _set_tables(self):
        self.members: fl.Table = self.db.t.members.create(**LobbyMember.columns(), pk='id',
                                                          foreign_keys=[('user_uid', 'user', 'uid'),
                                                                        ('lobby_id', 'lobbies', 'id'),],
                                                          transform=True, if_not_exists=True)
        return self.members

    def update(self, member: LobbyMember):
        """Updates a member and their user data in the database."""
        self.um.update(member.user)
        return super().update(member)

    def get_all(self, lobby_id: str) -> list[LobbyMember]:
        """Gets all members for a given lobby ID from the database."""
        cols = self.members.c
        qry = f"""select {mk_aliases(LobbyMember, self.members)},
                  {mk_aliases(User, self.users)}
                  from {self.members} join {self.users}
                  on {cols.user_uid} = {self.users.c.uid} where {cols.lobby_id} = ?"""
        return list(map(LobbyMember.from_dict, self.db.q(qry, [lobby_id])))


In [None]:
db = init_db()
um = UserManager(db)
lm = LobbyManager(db)
register_lobby_member_manager(MemberManager(um), LobbyMember)
l = lm.insert(Lobby())
l.create_member(um.create())
m = l.create_member(um.create())
l.set_host(m)
lm.update(l)
lm.get(l.id)

Lobby(id='rcg2l', lobby_type='basic', host=LobbyMember(user=User(uid='glowh', name='null', filename=None), user_uid='glowh', lobby_id='rcg2l', is_player=0, is_host_=1, joined_at=datetime.datetime(2025, 1, 19, 14, 53, 43, 564567), score=0, id='bcabvbl9', send=None, ws=None), members={'vn8m1': LobbyMember(user=User(uid='vn8m1', name='null', filename=None), user_uid='vn8m1', lobby_id='rcg2l', is_player=0, is_host_=0, joined_at=datetime.datetime(2025, 1, 19, 14, 53, 43, 564567), score=0, id='9if8axae', send=None, ws=None), 'glowh': LobbyMember(user=User(uid='glowh', name='null', filename=None), user_uid='glowh', lobby_id='rcg2l', is_player=0, is_host_=1, joined_at=datetime.datetime(2025, 1, 19, 14, 53, 43, 564567), score=0, id='bcabvbl9', send=None, ws=None)}, last_active=datetime.datetime(2025, 1, 19, 14, 53, 43, 564567))

In [None]:

# | export

class LobbyService:
    """Manages lobby creation, retrieval, and lifecycle."""
    lobby_lifetime = dt.timedelta(hours=5)
    lobby_limit = 50

    def __init__(self, lobby_manager: LobbyManager):
        self.lm = lobby_manager
        self.lobbies: dict[str, Lobby] = {}

    def __repr__(self): return f'{self.__class__.__name__}(active_lobbies={len(self.lobbies)})'

    def create_lobby[T: LobbyMember](self, u: User, lobby_id: Optional[str] = None, lobby_type: str = LobbyMember._lobby_type) -> Lobby[T]:
        """Creates a new lobby and sets the user as the host."""
        lobby_id = lobby_id or random_id()
        ids = list(self.lobbies) + self.lm.ids()
        while lobby_id in ids: lobby_id = random_id()
        lobby = Lobby[LOBBY_REGISTRY[lobby_type]](lobby_id, lobby_type=lobby_type)
        lobby.set_host(lobby.create_member(u))
        self.lobbies[lobby_id] = lobby
        self.lm.insert(lobby)
        return lobby

    def get_lobby(self, id: Optional[str] = None) -> Optional[Lobby]:
        """Gets a lobby from cache or the database."""
        lobby = self.lobbies.get(id)
        if lobby: return lobby
        lobby = self.lm.get(id)
        if lobby: self.lobbies[id] = lobby
        return lobby

    def delete_lobby(self, id: str):
        """Deletes a lobby from cache and the database."""
        self.lobbies.pop(id)
        self.lm.delete(id)

    def get_or_create[T: LobbyMember](self, u: User, id: Optional[str] = None,
                                      member_type: type[T] = LobbyMember) -> tuple[Lobby[T], bool]:
        '''Returns the lobby if it exists, otherwise creates one if id was valid. Returns (lobby, created)'''
        if not id or not id.isascii(): raise HTTPException(400, 'Invalid lobby id, must be ascii')
        ltype = dict_inverse(LOBBY_REGISTRY)[member_type]
        if lobby := self.get_lobby(id): return lobby.convert(ltype), False
        if len(self.lobbies) >= self.lobby_limit: raise HTTPException(
            400, 'Too many lobbies, wait until some are finished')
        return self.create_lobby(u, id, ltype), True

    def sync_active_lobbies_user(self, u: User):
        """Synchronizes user information across all active lobbies they are in."""
        for l in self.lobbies.values(): 
            if u.uid in l.members: l.members[u.uid].update_user(u)

    def update(self, lobby: Lobby): self.lm.update(lobby)

In [None]:
#| export
DI.register_services([LobbyManager, MemberManager, LobbyService])

In [None]:
ls = LobbyService(lm)

In [None]:
ls.create_lobby(um.create(), 1)
ls.create_lobby(um.create(), 2)
ls.lobbies

{1: Lobby(id=1, lobby_type='basic', host=LobbyMember(user=User(uid='l4do2', name='null', filename=UNSET), user_uid='l4do2', lobby_id=1, is_player=False, is_host_=True, joined_at=datetime.datetime(2025, 1, 19, 14, 53, 43, 623617), score=0, id='c70ta8qt', send=None, ws=None), members={'l4do2': LobbyMember(user=User(uid='l4do2', name='null', filename=UNSET), user_uid='l4do2', lobby_id=1, is_player=False, is_host_=True, joined_at=datetime.datetime(2025, 1, 19, 14, 53, 43, 623617), score=0, id='c70ta8qt', send=None, ws=None)}, last_active=datetime.datetime(2025, 1, 19, 14, 53, 43, 623617)),
 2: Lobby(id=2, lobby_type='basic', host=LobbyMember(user=User(uid='xfhpp', name='null', filename=UNSET), user_uid='xfhpp', lobby_id=2, is_player=False, is_host_=True, joined_at=datetime.datetime(2025, 1, 19, 14, 53, 43, 623617), score=0, id='v2id0dg3', send=None, ws=None), members={'xfhpp': LobbyMember(user=User(uid='xfhpp', name='null', filename=UNSET), user_uid='xfhpp', lobby_id=2, is_player=False, is

In [None]:
ls.delete_lobby(1)
ls.lobbies

{2: Lobby(id=2, lobby_type='basic', host=LobbyMember(user=User(uid='xfhpp', name='null', filename=UNSET), user_uid='xfhpp', lobby_id=2, is_player=False, is_host_=True, joined_at=datetime.datetime(2025, 1, 19, 14, 53, 43, 623617), score=0, id='v2id0dg3', send=None, ws=None), members={'xfhpp': LobbyMember(user=User(uid='xfhpp', name='null', filename=UNSET), user_uid='xfhpp', lobby_id=2, is_player=False, is_host_=True, joined_at=datetime.datetime(2025, 1, 19, 14, 53, 43, 623617), score=0, id='v2id0dg3', send=None, ws=None)}, last_active=datetime.datetime(2025, 1, 19, 14, 53, 43, 623617))}

In [None]:
#| export
def is_player(u: LobbyMember|User): return isinstance(u, LobbyMember) and u.is_player

In [None]:
#| export
def lobby_beforeware(service: LobbyService, skip=None):
    '''Makes sure that request always contains valid lobby'''
    def before(req: Request):
        if 'session' not in req.scope: return
        path_lobby_id = req.path_params.get('lobby_id')
        if path_lobby_id: req.session['lobby_id'] = path_lobby_id
        lobby: Lobby = service.get_lobby(req.session.get("lobby_id"))
        if lobby: req.state.lobby = lobby
        
    return Beforeware(before, skip)