In [None]:
import uuid
import time
from datetime import datetime
from cassandra.util import uuid_from_time
from cassandra.query import PreparedStatement, SimpleStatement
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
from cassandra.cqlengine.management import sync_table
import random
from faker import Faker

fake = Faker()
fake.name(), fake.text()


In [None]:

# # ==============================================================================
# class Account(Model):
#     id = columns.Integer(primary_key=True)
#     name = columns.Text(1, 128)
# 
# 
# # ==============================================================================
# class Group(Model):
#     id = columns.Integer(primary_key=True)
# 
#     name = columns.Text(1, 128)
#     time_created = columns.DateTime(default=datetime.utcnow)


# # ==============================================================================
# class GroupAlternative(Model):
#     id = columns.Integer(primary_key=True)
#     time_created = columns.DateTime(primary_key=True, clustering_order="DESC", default=datetime.utcnow)
# 
#     name = columns.Text(1, 128)
#     # for showing name in group list
#     last_message = columns.Text(required=False)
#     last_message_time = columns.DateTime(required=False)
#     last_account_id = columns.Integer(required=False)
#     last_account_name = columns.Text(required=False)


# # ==============================================================================
# class Participant(Model):
#     account_id = columns.Integer(primary_key=True)
#     group_id = columns.Integer(primary_key=True, partition_key=True)
#     last_updated = columns.DateTime(primary_key=True, clustering_order="DESC", default=datetime.utcnow)
# 
#     time_created = columns.DateTime(default=datetime.utcnow)


# # ==============================================================================
# class ParticipantByAccount(Model):
#     account_id = columns.Integer(primary_key=True)
#     group_id = columns.Integer(primary_key=True, clustering_order="DESC")
#     last_updated = columns.DateTime(primary_key=True, clustering_order="DESC", default=datetime.utcnow)


# # ==============================================================================
# class ParticipantByAccountAlternative(Model):
#     account_id = columns.Integer(primary_key=True)
#     last_updated = columns.DateTime(primary_key=True, clustering_order="DESC", default=datetime.utcnow)
#     group_id = columns.Integer()
# 

# # ==============================================================================
# class ParticipantByGroup(Model):
#     group_id = columns.Integer(primary_key=True)
#     account_id = columns.Integer(primary_key=True, clustering_order="DESC")
#     time_created = columns.DateTime(primary_key=True, clustering_order="DESC", default=datetime.utcnow)
# 
# 
# # ==============================================================================
# class MessageByGroup(Model):
#     group_id = columns.Integer(primary_key=True)
#     time_created = columns.DateTime(primary_key=True, clustering_order="DESC", default=datetime.utcnow)
# 
#     account_id = columns.Integer()
#     # for showing name in notification
#     group_name = columns.Text(0, 128)
#     account_name = columns.Text(0, 128)
#     content = columns.Text(0, 512)
# 
# 
# # ==============================================================================
# class MessageByAccount(Model):
#     account_id = columns.Integer(primary_key=True)
#     time_created = columns.DateTime(primary_key=True, clustering_order="DESC", default=datetime.utcnow)
# 
#     group_id = columns.Integer()
#     # for showing name in notification
#     group_name = columns.Text(0, 128)
#     account_name = columns.Text(0, 128)
#     content = columns.Text(0, 512)


# ==============================================================================
# def sync_tables():
#     sync_table(Testing)

# sync_table(Account)
# sync_table(Group)
# sync_table(ParticipantByAccount)
# # sync_table(ParticipantByAccountAlternative)
# sync_table(ParticipantByGroup)
# sync_table(MessageByGroup)
# sync_table(MessageByAccount)


In [None]:
class TestUser(Model):
    id = columns.Integer(primary_key=True)
    name = columns.Text(1, 128)
    time_created = columns.DateTime(default=datetime.utcnow)


class TestGroup(Model):
    id = columns.Integer(primary_key=True)
    name = columns.Text(1, 128)
    time_created = columns.DateTime(default=datetime.utcnow)


class TestParByGroup(Model):
    user_id = columns.Integer(primary_key=True)
    group_id = columns.Integer(primary_key=True, clustering_order="ASC")
    time_created = columns.DateTime(primary_key=True, clustering_order="DESC", default=datetime.utcnow)


class TestParByTime(Model):
    user_id = columns.Integer(primary_key=True)
    time_created = columns.DateTime(primary_key=True, clustering_order="DESC", default=datetime.utcnow)
    group_id = columns.Integer(primary_key=True, clustering_order="ASC")


def sync_tables():
    sync_table(TestUser)
    sync_table(TestGroup)
    sync_table(TestParByGroup)
    sync_table(TestParByTime)


Sort by time: query all participants and remove duplicate groups
Sort by groupid: query all participants group by groupid, sort the time

In [None]:
from cassandra.query import tuple_factory
from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT, ConsistencyLevel
from cassandra.policies import WhiteListRoundRobinPolicy
from cassandra.cqlengine import connection

profile = ExecutionProfile(
    load_balancing_policy=WhiteListRoundRobinPolicy(['127.0.0.1']),
    consistency_level=ConsistencyLevel.LOCAL_QUORUM,
    serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
    request_timeout=15,
    row_factory=tuple_factory
)
connection.setup(['127.0.0.1'], "testing", execution_profiles={EXEC_PROFILE_DEFAULT: profile},
                 port=9042)
connection.execute(f"DROP KEYSPACE IF EXISTS testing")
connection.execute(
    f"CREATE KEYSPACE IF NOT EXISTS testing WITH replication = " + f"{{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}")
connection.execute(f"USE testing")
sync_tables()

In [None]:
Testing.create(id=1)
Testing.create(id=2)
Testing.create(id=3)


In [None]:
Testing.objects.filter(id=4).first() is None

In [None]:
session = connection.get_session()
rows = session.execute("SELECT * FROM system_schema.tables WHERE keyspace_name = 'testing'")
for row in rows:
    print(row['table_name'])

NEW:

In [None]:
for i in range(1, 11):
    TestUser.create(id=i, name="User " + fake.name())

for i in range(11, 41):
    TestGroup.create(id=i, name="Group " + fake.name())

# for i in range(150):
#     grpid = random.randint(11, 30)
#     accid = random.randint(1, 10)
#     ParticipantByGroup.create(group_id=grpid, account_id=accid)
#     ParticipantByAccount.create(group_id=grpid, account_id=accid)
#     ParticipantByAccount2.create(group_id=grpid, account_id=accid)

In [None]:
row = session.execute("select * from test_user where id=1").one()
row

In [None]:
for i in TestUser.objects.all():
    print(i, i.name)

for i in TestGroup.objects.all():
    print(i, i.name)

In [None]:
for row in TestParByGroup.objects.filter(user_id=1).all():
    print(row)

In [None]:
for i in range(500):
    grpid = random.randint(11, 40)
for j in range(random.randint(1, 8)):
    TestParByTime.create(group_id=grpid, user_id=random.randint(1, 10))
TestParByGroup.create(group_id=grpid, user_id=random.randint(1, 10))

In [None]:
# set fetch size
fetch_size = 100

# It will print first 100 records
next_page_available = True
paging_state = None
data_count = 0

user_lookup_stmt = SimpleStatement("select * from test_par_by_time where user_id=? and time_created<?", fetch_size=fetch_size)
user_lookup_stmt.fetch_size = fetch_size
group_lookup_stmt = SimpleStatement("select * from test_group where id=?")

group_map = {}
last_time = datetime.utcnow()
while len(group_map) < 15:
    print("COUNTED")
    rows = session.execute(user_lookup_stmt, [1, last_time], paging_state=paging_state)
    paging_state = rows.paging_state
    for row in rows:
        if row["group_id"] not in group_map:
            group = session.execute(group_lookup_stmt, [row["group_id"]]).one()
            group_map[row["group_id"]] = group["name"]

    print(rows[-1]["time_created"])



In [None]:
group_map

In [None]:
for row in session.execute("select * from test_par_by_time where user_id=1 limit 20"):
    print(row)


In [None]:
def setup_participant_message(accid, grpid, firstjoin=False, message=None):
    accinfo = TestUser.objects.filter(id=accid).first()
    groupinfo = TestGroup.objects.filter(id=grpid).first()

    now = datetime.utcnow()

    # if it's their first time, create a participant in group list
    if firstjoin:
        TestUserByGroup.create(group_id=grpid, account_id=accid, time_created=now)
        message = f"{accinfo.name} has joined group."
    else:
        existed = TestUserByGroup.objects.filter(group_id=grpid, account_id=accid).first()
        if existed is None:
            print("Account not in group. Cancel this later")
            return

    # add new updated activity
    grouppars = TestUserByGroup.objects.filter(group_id=grpid).all()
    for participant in grouppars:
        ParticipantByAccount.create(group_id=grpid, account_id=participant.account_id, last_updated=now)

    # ParticipantByAccountAlternative.create(group_id=grpid, account_id=accid, last_updated=now)

    # add new message
    MessageByGroup.create(group_id=grpid, time_created=now, account_id=accid, account_name=accinfo.name,
                          group_name=groupinfo.name, content=message)
    MessageByAccount.create(account_id=accid, group_id=grpid, time_created=now, account_name=accinfo.name,
                            group_name=groupinfo.name, content=message)

    # update latest group message info
    # groupinfo.update(last_message=message, last_message_time=now, last_account_id=accid, last_account_name=accinfo.name)

In [None]:
setup_participant_message(1, 11, firstjoin=True)
setup_participant_message(1, 17, firstjoin=True)
setup_participant_message(1, 12, firstjoin=True)
setup_participant_message(1, 21, firstjoin=True)
setup_participant_message(1, 13, firstjoin=True)
setup_participant_message(1, 18, firstjoin=True)


In [None]:
setup_participant_message(2, 17, firstjoin=True)
setup_participant_message(2, 12, firstjoin=True)
setup_participant_message(2, 21, firstjoin=True)
setup_participant_message(2, 13, firstjoin=True)

In [None]:
setup_participant_message(3, 11, firstjoin=True)

In [None]:
setup_participant_message(1, 12, message="Hello everyone")

In [None]:
setup_participant_message(2, 12, message="Hi man")

In [None]:
# All messages by account

for i in MessageByAccount.objects.all():
    print(i.time_created, ": Account", i.account_id, "send to group", i.group_id, ":", i.content)

In [None]:
# Get all joined groups and most recent message

participant_query = session.execute(
    'select last_updated, group_id from participant_by_account where account_id=1 group by group_id')
participant_query = sorted(participant_query, key=lambda row: row['last_updated'], reverse=True)

for row in participant_query:
    group_query = session.execute(f"select * from group where id={row['group_id']}").one()
    group_last_message = session.execute(f"select * from message_by_group where group_id={row['group_id']}").one()
    print(group_query['id'], '-', group_query['name'], "-", group_last_message['account_name'], '-',
          group_last_message['content'])


In [None]:
# get all messages of a group

group_messages = session.execute(f"select * from message_by_group where group_id=12").all()
for row in group_messages:
    print(row['account_name'], '-', row['content'])

In [None]:
participant_query = session.execute(
    'select group_id from participant_by_account_alternative where account_id=1')
for row in participant_query:
    print(row)

In [None]:
# check list participant by group
for i in ParticipantByGroup.objects.filter(group_id=12).all():
    print(i)

OLD:

In [None]:
new_group = ChatGroup.create(name="temporary")
new_group

In [None]:
new_group = ChatGroup.create(name="temporary 2")
new_group

In [None]:
new_participant_1 = ChatParticipant.create(message_notify=False, id_account=1,
                                           id_chatgroup='015edba8-22ea-4d4d-85d0-530c5400f9b8')

In [None]:
new_participant_2 = ChatParticipant.create(message_notify=False, id_account=2,
                                           id_chatgroup='015edba8-22ea-4d4d-85d0-530c5400f9b8')

In [None]:
new_participant_3 = ChatParticipant.create(message_notify=False, id_account=2,
                                           id_chatgroup='015edba8-22ea-4d4d-85d0-530c5400f9b8')

In [None]:
list_group_participant = ChatParticipant.objects.filter(id_account=2)

In [None]:
for i in list_group_participant.all():
    print(i)

In [None]:
import time, asyncio


def waitfunc(t: int, name):
    time.sleep(t)
    print(name)


async def asyncwaitfunc(t: int, name):
    await asyncio.sleep(t)
    print(name)


async def testconcurrent():
    await asyncwaitfunc(1, "hello")
    await asyncwaitfunc(2, "world")
    waitfunc(0.5, "test")


async def testconcurrent2():
    task1 = asyncio.create_task(asyncwaitfunc(1, "hello"))
    task2 = asyncio.create_task(asyncwaitfunc(2, "world"))
    await task1
    await task2
    waitfunc(0.5, "test")


In [None]:
await testconcurrent()

In [None]:
await testconcurrent2()

In [None]:
# SQLAlchemy 2.0 Migrations:
# docs.sqlalchemy.org/en/20/changelog/whatsnew_20.html#step-three-apply-exact-python-types-as-needed-using-orm-mapped
import random, string
from datetime import datetime
from typing import Annotated, Optional, List
from sqlalchemy import ForeignKey, types, Table, Column, Integer
from sqlalchemy.orm import Mapped, mapped_column, relationship, DeclarativeBase

# declare datatypes
str16 = Annotated[str, None]
str64 = Annotated[str, None]
str128 = Annotated[str, None]
str256 = Annotated[str, None]
str_random = Annotated[str, mapped_column(default=''.join(random.choices(string.ascii_letters + string.digits, k=16)))]

smallint = Annotated[int, None]
int_identifier = Annotated[int, mapped_column(default=random.randint(1, 9999))]
int_PK = Annotated[int, mapped_column(primary_key=True)]

timestamp = Annotated[datetime, mapped_column(default=datetime.utcnow)]


class Base(DeclarativeBase):
    type_annotation_map = {
        str16: types.VARCHAR(16),
        str64: types.VARCHAR(64),
        str128: types.VARCHAR(128),
        str256: types.VARCHAR(256),
        str_random: types.VARCHAR(64),
        smallint: types.SMALLINT,
        int_identifier: types.SMALLINT,
        timestamp: types.TIMESTAMP,
    }


Friend = Table(
    "friend", Base.metadata,
    Column("accountinfo_id_user", Integer, ForeignKey("accountinfo.id"), primary_key=True),
    Column("accountinfo_id_friend", Integer, ForeignKey("accountinfo.id"), primary_key=True)
)


# Friend = Table(
#     "friend", Base.metadata,
#     Column("accountinfo_id_user", ForeignKey("accountinfo.id"), primary_key=True),
#     Column("accountinfo_id_friend", ForeignKey("accountinfo.id"), primary_key=True)
# )


# ==============================================================================
class Account(Base):
    __tablename__ = 'account'
    id: Mapped[int_PK]
    username: Mapped[str128]  # Mapped without Optional[] is set to nullable = False
    password: Mapped[str128]

    accountinfo_id: Mapped[Optional[int]] = mapped_column(ForeignKey("accountinfo.id"))
    accountinfo_rel: Mapped[Optional["Accountinfo"]] = relationship(back_populates="account_rel",
                                                                    cascade='save-update, merge, delete')


# ==============================================================================
class Accountinfo(Base):
    __tablename__ = 'accountinfo'
    id: Mapped[int_PK]
    name: Mapped[str_random]

    # Account reference
    account_rel: Mapped[Optional["Account"]] = relationship(back_populates='accountinfo_rel')

    friend_rel: Mapped[List["Accountinfo"]] = relationship("Accountinfo", secondary=Friend,
                                                           primaryjoin="friend.c.accountinfo_id_user == accountinfo.c.id",
                                                           secondaryjoin="friend.c.accountinfo_id_friend == accountinfo.c.id")

# friend_rel_right: Mapped[List["Accountinfo"]] = relationship("Accountinfo", secondary=Friend,
#                                                              primaryjoin=id == Friend.c.accountinfo_id_friend,
#                                                              secondaryjoin=id == Friend.c.accountinfo_id_user,
#                                                              back_populates="friend_rel_left")

# ==============================================================================
# class Friend(Base):
#     __tablename__ = 'friend'
#     accountinfo_id_user: mapped_column(ForeignKey("accountinfo.id"), primary_key=True)
#     accountinfo_id_friend: mapped_column(ForeignKey("accountinfo.id"), primary_key=True)

# this relationship is viewonly and selects across the union of all

# friends
# Friend_union = (select(Friend.c.accountinfo_id_user, Friend.c.accountinfo_id_friend)
#                 .union(select(Friend.c.accountinfo_id_friend, Friend.c.accountinfo_id_user)
#                        ).alias())

# Accountinfo.all_friends = relationship('Accountinfo',
#                                        secondary=Friend_union,
#                                        primaryjoin=Accountinfo.id == Friend_union.c.accountinfo_id_user,
#                                        secondaryjoin=Accountinfo.id == Friend_union.c.accountinfo_id_friend,
#                                        viewonly=True)


In [None]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

Engine = create_engine(url=f"postgresql+psycopg2://testuser1:testuser1pwd@localhost:18000/testdb1?sslmode=disable",
                       echo=True)
Base.metadata.drop_all(Engine)
Base.metadata.create_all(Engine)
PostgresSession = sessionmaker(bind=Engine, autoflush=True, autocommit=False)

In [None]:
session = PostgresSession()
session.begin()

In [None]:
acc1, acc2, acc3 = Accountinfo(name="acc1"), Accountinfo(name="acc2"), Accountinfo(name="acc3")

In [None]:
session.add_all([acc1, acc2, acc3])

In [None]:
session.flush()

In [None]:
acc1.friend_rel

In [None]:
import random
import sqlalchemy
from typing import Annotated
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import ForeignKey
from sqlalchemy import UniqueConstraint
from sqlalchemy.orm import relationship
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import backref
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from pprint import pprint

str_random = Annotated[str, mapped_column(default=''.join(random.choices(string.ascii_letters + string.digits, k=16)))]
int_PK = Annotated[int, mapped_column(primary_key=True)]


class Base(DeclarativeBase):
    type_annotation_map = {
        str_random: types.VARCHAR(64)
    }


Engine = create_engine(url=f"postgresql+psycopg2://testuser1:testuser1pwd@localhost:18000/testdb1?sslmode=disable",
                       echo=True)
Session = sessionmaker(bind=Engine)

friendship = Table(
    'friendships', Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id'), index=True),
    Column('friend_id', Integer, ForeignKey('users.id')),
    UniqueConstraint('user_id', 'friend_id', name='unique_friendships'))


class User(Base):
    __tablename__ = 'users'

    id: Mapped[int_PK]
    name: Mapped[str_random]

    friends = relationship('User',
                           secondary=friendship,
                           primaryjoin=id == friendship.c.user_id,
                           secondaryjoin=id == friendship.c.friend_id)

    def befriend(self, friend):
        if friend not in self.friends:
            self.friends.append(friend)
            friend.friends.append(self)

    def unfriend(self, friend):
        if friend in self.friends:
            self.friends.remove(friend)
            friend.friends.remove(self)

    def __repr__(self):
        return '' % self.name

In [None]:
a = User(name='a')
b = User(name='b')
c = User(name='c')
d = User(name='d')
session.add_all([a, b, c, d])
for user in session.query(User):
    print(user)
    pprint(user.friends)

In [None]:
from pydantic import BaseModel, Field, constr
from enum import Enum
import uuid

In [None]:
class MessagePOST(BaseModel):
    group_id: uuid.UUID
    group_name: str | None
    content: constr(max_length=256)
    type: str


class TestConnectionMessage(BaseModel):
    message_from: str
    action: str
    message_content: MessagePOST


In [None]:
TestConnectionMessage.json_schema()

In [None]:
from enum import Enum
import uuid
from pydantic import BaseModel


class ConnectionMessageType(str, Enum):
    message = "message"
    notification = "notification"
    response = "response"
    help = "help"


class ConnectionMessageAction(str, Enum):
    new = "new"
    delete = "delete"
    pin = "pin"


class ConnectionMessageStatus(str, Enum):
    success = "SUCCESS"
    error = "ERROR"
    other = "OTHER"


class ConnectionMessage(BaseModel):
    type: ConnectionMessageType
    status: ConnectionMessageStatus | None = None
    action: ConnectionMessageAction | None = None
    to_group: uuid.UUID | None = None
    data: dict | str


new_msg = ConnectionMessage(type=ConnectionMessageType.response,
                            status=ConnectionMessageStatus.error,
                            data="hello")

new_msg.model_dump()