In [None]:
import os, getpass
from os import getenv
from langchain_openai import AzureChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import ConfigurableFieldSpec
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

In [None]:
# Ask for the Azure OpenAI key interactively only when the environment does not already supply one.
if not os.environ.get("AZURE_OPENAI_API_KEY"):
    os.environ["AZURE_OPENAI_API_KEY"] = getpass.getpass("Enter API key for Azure: ")

# Endpoint URL as configured in the environment (None when the variable is unset).
endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")
# Alternative kept for reference: treat the variable as a template with deployment/version slots.
# endpoint = getenv("AZURE_OPENAI_ENDPOINT").format(getenv("AZURE_OPENAI_DEPLOYMENT_NAME"),  getenv("AZURE_OPENAI_API_VERSION"))

In [None]:
# Two-message prompt for a French-translation assistant; "{input}" is filled in when the
# chain is invoked.
# prompt = ChatPromptTemplate.from_template("Tell me a joke about {word} in 100 words.")
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful translator. Translate the user sentence to French.",
        ),
        ("human", "{input}"),
    ]
)
# NOTE(review): `parser` is not part of the chain built below — confirm whether it is still needed.
parser = StrOutputParser()
# Azure OpenAI chat client. Endpoint, deployment name and API version all come from the
# environment; the two os.environ[...] lookups raise KeyError when their variable is unset,
# whereas getenv() yields None for a missing endpoint.
llm = AzureChatOpenAI(
    azure_endpoint=getenv("AZURE_OPENAI_ENDPOINT"),
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.01,
    stream_usage=True
)

# Pipe the formatted prompt straight into the chat model (no output parser attached).
chain = prompt | llm

In [None]:
# Hand-written (role, text) pairs used to call the model without going through a prompt template.
messages = [
    ("system", "You are a helpful translator. Translate the user sentence to French."),
    ("human", "I love programming."),
]

In [None]:
# Call the chat model directly with the hand-built message list, bypassing the prompt template.
await llm.ainvoke(messages)

In [None]:
# get the full response at once
# NOTE(review): the `if chain:` guard looks redundant, since `chain` is built unconditionally
# from `prompt | llm` — confirm before removing.
if chain:
    # Awaits the complete model reply (no streaming) and keeps it in the global `response`.
    response = await chain.ainvoke({"input": "story on earth"})
    print(response)

In [None]:
# stream the response
full_message = None

async for chunk in chain.astream({"input": "on earth"}):
    # print(chunk.content, end="", flush=True)
    if full_message is None:
        full_message = chunk
    else:
        full_message += chunk  # Uses the overloaded __add__ method

# After streaming is complete, full_message contains the aggregated content and metadata
print(full_message)


In [None]:
# Show which class the aggregated streaming result ended up as.
print(type(full_message))

**In-memory chat history with a proxy that exposes only the last 2 messages**

In [None]:
from typing import Sequence
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage
from pydantic import BaseModel, Field

class InMemoryHistory(BaseChatMessageHistory, BaseModel):
    """Chat history that keeps every message in a plain Python list."""

    messages: list[BaseMessage] = Field(default_factory=list)

    def add_messages(self, messages: Sequence[BaseMessage]) -> None:
        """Append ``messages`` to the stored history, preserving their order."""
        stored = self.messages
        stored.extend(messages)

    def clear(self) -> None:
        """Forget the whole history by rebinding ``messages`` to a fresh empty list."""
        self.messages = list()

class ProxyHistory(BaseChatMessageHistory):
    """Read-limited view over an ``InMemoryHistory``.

    Reads expose only the most recent ``limit`` messages, while ``add_messages``
    and ``clear`` are forwarded to the wrapped history, so the complete
    transcript is still retained there.
    """

    def __init__(self, full_history: InMemoryHistory, limit: int = 2):
        """Wrap ``full_history`` and cap reads at ``limit`` messages.

        Args:
            full_history: History object that actually stores the messages.
            limit: Maximum number of trailing messages returned by ``messages``.

        Raises:
            ValueError: If ``limit`` is negative.
        """
        # A negative limit would turn the tail slice into "skip the first N messages".
        if limit < 0:
            raise ValueError("limit must be non-negative.")
        self._full_history = full_history
        self._limit = limit

    @property
    def messages(self) -> Sequence[BaseMessage]: # type: ignore
        """Return at most the last ``limit`` messages of the wrapped history."""
        # ``seq[-0:]`` is the entire sequence, so a zero limit needs its own branch.
        if self._limit == 0:
            return []
        return self._full_history.messages[-self._limit:]

    def add_messages(self, messages: Sequence[BaseMessage]) -> None:
        """Store ``messages`` in the wrapped (unlimited) history."""
        self._full_history.add_messages(messages)

    def clear(self) -> None:
        """Clear the wrapped history."""
        self._full_history.clear()


# Registry of full histories: get_by_session_id() stores one InMemoryHistory per session id here.
store = {}
# NOTE(review): this id is not referenced by the streaming cell further down, which passes
# the literal "base" — confirm which one is intended.
session_id = "student1234"

def get_by_session_id(session_id: str) -> BaseChatMessageHistory:
    """Return a two-message window onto the history of ``session_id``.

    The full history is created in ``store`` on first use; every call wraps it
    in a new ``ProxyHistory`` with ``limit=2``.
    """
    try:
        full_history = store[session_id]
    except KeyError:
        full_history = store[session_id] = InMemoryHistory()
    return ProxyHistory(full_history, limit=2)



In [72]:
from langchain_core.tools import tool
import datetime
# NOTE(review): despite its description this tool does not read the clock — it always
# returns the same fixed timestamp and ignores `input`; the `datetime` import above is unused.
# Confirm whether the constant is a deliberate stub before wiring the tool into a chain.
@tool
def current_time(input: str) -> str:
    """
    Get the current date and time
    """
    # Fixed placeholder value rather than the real current time.
    return "The current time is: 2001-01-01 00:00:00"


In [None]:
# prompt = ChatPromptTemplate.from_template("Tell me a joke about {word} in 100 words.")
# History-aware prompt: the system text comes from the SYSTEM_PROMPT environment variable
# (KeyError if unset), earlier turns are injected under "history", then the new user input.
# This rebinds the `prompt`, `parser`, `llm` and `chain` names from the earlier cell.
prompt = ChatPromptTemplate.from_messages([
    ("system", os.environ["SYSTEM_PROMPT"]),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}"),
])
parser = StrOutputParser()
# Same Azure OpenAI settings as before, all read from required environment variables.
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.01,
    stream_usage=True
)

chain = prompt | llm
# Wrap the chain so that the history returned by get_by_session_id() fills the "history"
# placeholder and the "input" value is treated as the new user message.
chain_with_history = RunnableWithMessageHistory(
    chain, # type: ignore
    get_session_history=get_by_session_id,
    input_messages_key="input",
    history_messages_key="history",
)

In [None]:
async for chunk in chain_with_history.astream({"input": "wow"},
    config={"configurable": {"session_id": "base"}}):
    print(chunk, end="", flush=True)

In [None]:
# Dump the session registry to inspect what the history wrapper has stored so far.
print(store)

**Chat history with ScyllaDB**

In [None]:
# Create keyspace and tables
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# Connect to ScyllaDB
# NOTE(review): neither `cluster` nor `session` is shut down anywhere in this cell.
cluster = Cluster(['127.0.0.1'])  # Replace with your Scylla host IP if different
session = cluster.connect()

# Create keyspace (if not exists)
KEYSPACE = "eva"
# The doubled braces are literal braces inside the f-string; only {KEYSPACE} is interpolated.
session.execute(f"""
    CREATE KEYSPACE IF NOT EXISTS {KEYSPACE}
    WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '1'}}
""")

# Set keyspace
# The unqualified table names in the statements below resolve against this keyspace.
session.set_keyspace(KEYSPACE)

# Create tables
# DDL statements, executed in list order by the loop that follows. Every statement uses
# IF NOT EXISTS, so re-running the cell leaves existing objects in place.
table_queries = [
    # One row per chat, partitioned by user and clustered by chat id.
    """
    CREATE TABLE IF NOT EXISTS chathistory (
        userid uuid,
        chatid uuid,
        visible boolean,
        chathistoryjson blob,
        chattitle text,
        createdon timestamp,
        nettokenconsumption int,
        PRIMARY KEY (userid, chatid)
    );
    """,
    # View over chathistory keyed by the `visible` flag; it must come after the base table.
    """
    CREATE MATERIALIZED VIEW IF NOT EXISTS chathistory_by_visible AS
    SELECT userid, chatid, chathistoryjson, chattitle, createdon, nettokenconsumption
    FROM chathistory
    WHERE visible IS NOT NULL AND userid IS NOT NULL AND chatid IS NOT NULL
    PRIMARY KEY (visible, userid, chatid);
    """,
    # Model deployments; `isactive` is part of the primary key (clustering column).
    """
    CREATE TABLE IF NOT EXISTS availablemodels (
        deploymentid uuid,
        isactive boolean,
        apikey text,
        deploymentname text,
        endpoint text,
        modelname text,
        modeltype text,
        modelversion text,
        provider text,
        PRIMARY KEY ((deploymentid), isactive)
    );
    """,
    # User-to-model pairs.
    """
    CREATE TABLE IF NOT EXISTS usersubscriptions (
        userid uuid,
        modelid uuid,
        PRIMARY KEY (userid, modelid)
    );
    """,
    # Users, partitioned by (email, partner) and clustered by user id.
    """
    CREATE TABLE IF NOT EXISTS users (
        email text,
        partner text,
        userid uuid,
        firstname text,
        lastname text,
        role text,
        PRIMARY KEY ((email, partner), userid)
    );
    """
]

# Run the DDL statements one at a time, in the order they are listed.
for query in table_queries:
    session.execute(query)

print("Keyspace and tables created successfully.")
# Read the table names back from the system schema as a check on what now exists.
rows = session.execute(f"SELECT table_name FROM system_schema.tables WHERE keyspace_name='{KEYSPACE}';")
for row in rows:
    print("Table:", row.table_name)


In [None]:
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage, FunctionMessage, trim_messages, AIMessageChunk
from pydantic import BaseModel, Field
from copy import deepcopy

from typing import Sequence

class InMemoryHistory(BaseChatMessageHistory, BaseModel):
    """List-backed chat history that also supports editing a past message."""

    messages: list[BaseMessage] = Field(default_factory=list)

    def add_messages(self, messages: Sequence[BaseMessage]) -> None:
        """Append ``messages`` to the end of the history, in order."""
        history = self.messages
        history.extend(messages)

    def clear(self) -> None:
        """Forget everything by rebinding ``messages`` to a fresh empty list."""
        self.messages = list()

    def edit_message_at_index(self, index: int, new_message: BaseMessage) -> None:
        """Replace the message at ``index`` and discard every message after it.

        Raises:
            IndexError: If ``index`` is negative or not below the history length.
        """
        if index < 0 or index >= len(self.messages):
            raise IndexError("Message index out of range.")
        self.messages[index] = new_message
        # Everything after the edited message is cut off.
        kept = index + 1
        self.messages = self.messages[:kept]

# Registry mapping a branch name to its InMemoryHistory. Rebinding `store` here starts from
# an empty registry, so anything held under the earlier `store` name is no longer reachable
# through it.
store = {}
# Name of the branch that process_input() reads at call time; later cells may rebind it.
branch = "main"


def get_chat_history_by_branch(branch: str) -> BaseChatMessageHistory:
    """Return the history stored for ``branch``, registering an empty one on first use."""
    try:
        return store[branch]
    except KeyError:
        store[branch] = InMemoryHistory()
        return store[branch]


def append_message_to_branch(message: BaseMessage, branch: str) -> None:
    """Append ``message`` to the history of ``branch``, creating the branch if needed."""
    if branch in store:
        target = store[branch]
    else:
        target = store[branch] = InMemoryHistory()
    target.messages.append(message)

def create_branch_from(parent_branch: str, new_branch: str, edit_index: int, new_message: BaseMessage):
    """Fork ``parent_branch`` at ``edit_index`` into ``new_branch``.

    The new branch holds deep copies of the parent's first ``edit_index``
    messages followed by ``new_message``. The parent branch is left untouched;
    an existing ``new_branch`` entry in ``store`` is replaced.

    Args:
        parent_branch: Name of an existing branch in ``store``.
        new_branch: Name under which the forked history is stored.
        edit_index: Number of leading parent messages to keep, i.e. the position
            of the message being replaced. ``len(parent messages)`` appends.
        new_message: Message placed at ``edit_index`` in the new branch.

    Raises:
        ValueError: If ``parent_branch`` is not in ``store``.
        IndexError: If ``edit_index`` is negative or beyond the parent's length.
    """
    parent_history = store.get(parent_branch)
    # Test for absence explicitly instead of relying on the history object's truthiness.
    if parent_history is None:
        raise ValueError(f"Parent branch '{parent_branch}' does not exist.")

    # A negative index would slice from the end and an oversized one would silently copy
    # the whole parent, so both are rejected rather than producing a surprising branch.
    if not 0 <= edit_index <= len(parent_history.messages):
        raise IndexError("Message index out of range.")

    # Clone messages up to the edit point so later edits cannot leak between branches
    new_history = InMemoryHistory(messages=deepcopy(parent_history.messages[:edit_index]))
    # Add the new edited message
    new_history.add_messages([new_message])
    # Store the new branch
    store[new_branch] = new_history


In [None]:

# NOTE(review): `parser` is rebound here but no visible chain uses it — confirm it is needed.
parser = StrOutputParser()
# Chat model used by process_input(); all three settings are required environment variables
# (KeyError if any is unset).
llm = AzureChatOpenAI(
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT_NAME"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    temperature=0.01,
    stream_usage=True
)

In [None]:
async def process_input(input):
    """Stream the assistant's reply to ``input`` on the currently selected branch.

    The branch name is read from the module-level ``branch`` variable at call
    time and passed to ``get_chat_history_by_branch`` through the ``branch``
    configurable field. Reply text is printed chunk by chunk as it arrives.

    Args:
        input: The user's turn, either plain text or a message object whose
            ``content`` is text.
    """
    # The "{input}" slot is filled by string formatting, so a message object would be
    # rendered as its field dump rather than its text; unwrap it to the text first.
    if isinstance(input, BaseMessage) and isinstance(input.content, str):
        input = input.content

    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a helpful bot"),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{input}"),
    ])
    chain = prompt | llm
    chain_with_history = RunnableWithMessageHistory(
        chain,
        get_session_history=get_chat_history_by_branch,
        input_messages_key="input",
        history_messages_key="history",
        # Declares "branch" (instead of the default session id) as the key that is handed
        # to get_chat_history_by_branch.
        history_factory_config=[
            ConfigurableFieldSpec(
                id="branch",
                annotation=str,
                name="Chat Branch Name",
                description="Unique name for the chat branch",
                default="",
                is_shared=True,
            ),
        ],
    )
    async for chunk in chain_with_history.astream({"input": input},
                                                  config={"configurable": {"branch": branch}}):
        print(chunk.content, end="", flush=True)


In [None]:
# Select the branch that subsequent process_input() calls read from and write to.
branch="part"

In [None]:
await process_input(HumanMessage(content="number 4 in french"))

In [None]:
# Inspect the per-branch histories accumulated so far.
print(store)

In [None]:
from langchain_core.messages import HumanMessage, AIMessage

# Fork a new branch 'b' from the active branch. With edit_index=0 no earlier messages are
# copied, so 'b' starts with just the replacement human message. The parent is taken from
# `branch` because that is the only branch process_input() has populated; a hard-coded
# 'main' is absent from `store` on a top-to-bottom run and raises ValueError.
create_branch_from(
    parent_branch=branch,
    new_branch='b',
    edit_index=0,
    new_message=HumanMessage(content="check")
)

# Continue the new branch with an assistant message.
store['b'].add_messages([AIMessage(content="j")])


**Cassandra to Postgres Migration Script**

In [None]:
from cassandra.cluster import Cluster
import psycopg2
import uuid
import sys

# Cassandra config
CASSANDRA_KEYSPACE = "eva"
CASSANDRA_CONTACT_POINTS = ['localhost']  # update as needed

# Postgres config
# Every value can be overridden through the conventional PG* environment variables, so real
# credentials never have to be written into the notebook. The fallbacks are the local
# development defaults this script has always used.
POSTGRES_CONN_PARAMS = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'database': os.environ.get('PGDATABASE', 'postgres'),
    'user': os.environ.get('PGUSER', 'dev_user'),
    'password': os.environ.get('PGPASSWORD', 'dev_password'),
    # `or` also covers a variable that is set but empty, which int() could not parse.
    'port': int(os.environ.get('PGPORT') or '5432'),
}

def connect_cassandra():
    """Open a Cassandra session bound to ``CASSANDRA_KEYSPACE``.

    Returns:
        The session obtained from a new ``Cluster`` built on
        ``CASSANDRA_CONTACT_POINTS``.
    """
    cassandra_session = Cluster(CASSANDRA_CONTACT_POINTS).connect()
    cassandra_session.set_keyspace(CASSANDRA_KEYSPACE)
    return cassandra_session

def connect_postgres():
    """Connect to Postgres with ``POSTGRES_CONN_PARAMS`` and switch on autocommit.

    Returns:
        The open psycopg2 connection.
    """
    pg_connection = psycopg2.connect(**POSTGRES_CONN_PARAMS)
    # With autocommit on, every statement is committed as soon as it runs.
    pg_connection.autocommit = True
    return pg_connection

def migrate_users(cassandra_session, pg_conn):
    """Copy every row of the Cassandra ``users`` table into ``public.users``.

    Rows whose ``(email, partner)`` pair already exists in Postgres are skipped
    by the ``ON CONFLICT`` clause. A missing ``role`` defaults to ``'user'``.
    A row that fails to insert is reported and counted, and the remaining rows
    are still attempted.

    Args:
        cassandra_session: Object whose ``execute(query)`` returns the source rows.
        pg_conn: Connection whose ``cursor()`` yields a DB-API cursor.
    """
    insert_sql = """
        INSERT INTO public.users (user_id, email, partner, first_name, last_name, role)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON CONFLICT (email, partner) DO NOTHING;
    """
    rows = cassandra_session.execute("SELECT * FROM users")
    cur = pg_conn.cursor()
    failed = 0

    try:
        for row in rows:
            try:
                cur.execute(insert_sql, (
                    row.userid,
                    row.email,
                    row.partner,
                    row.firstname,
                    row.lastname,
                    row.role or 'user'
                ))
            except Exception as e:
                failed += 1
                print(f"Error inserting user {row.email}: {e}")
    finally:
        # Release the cursor even when reading from Cassandra fails part-way through.
        cur.close()

    # Only claim success when every row went through.
    if failed:
        print(f"✖ Migrated users with {failed} failed row(s)")
    else:
        print("✔ Migrated users")

def migrate_chathistory(cassandra_session, pg_conn):
    """Copy every row of the Cassandra ``chathistory`` table into ``public.chat_history``.

    Rows whose ``chat_id`` already exists are skipped by the ``ON CONFLICT``
    clause. A missing ``visible`` flag defaults to ``True`` and a missing token
    count to ``0``. The source ``createdon`` value is written to
    ``last_updated``. A row that fails to insert is reported and counted, and
    the remaining rows are still attempted.

    Args:
        cassandra_session: Object whose ``execute(query)`` returns the source rows.
        pg_conn: Connection whose ``cursor()`` yields a DB-API cursor.
    """
    insert_sql = """
        INSERT INTO public.chat_history (chat_id, user_id, visible, history_blob, chat_title, last_updated, token_count)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (chat_id) DO NOTHING;
    """
    rows = cassandra_session.execute("SELECT * FROM chathistory")
    cur = pg_conn.cursor()
    failed = 0

    try:
        for row in rows:
            try:
                # bytes(None) raises TypeError, which used to drop every chat without a
                # stored blob; pass the NULL through and let the target column decide.
                blob = row.chathistoryjson
                history_blob = bytes(blob) if blob is not None else None
                cur.execute(insert_sql, (
                    row.chatid,
                    row.userid,
                    row.visible if row.visible is not None else True,
                    history_blob,
                    row.chattitle,
                    row.createdon,
                    row.nettokenconsumption or 0
                ))
            except Exception as e:
                failed += 1
                print(f"Error inserting chat {row.chatid}: {e}")
    finally:
        # Release the cursor even when reading from Cassandra fails part-way through.
        cur.close()

    # Only claim success when every row went through.
    if failed:
        print(f"✖ Migrated chat history with {failed} failed row(s)")
    else:
        print("✔ Migrated chat history")

def migrate_availablemodels(cassandra_session, pg_conn):
    """Copy every row of the Cassandra ``availablemodels`` table into ``public.ai_models``.

    Rows whose ``(deployment_name, model_version)`` pair already exists are
    skipped by the ``ON CONFLICT`` clause. A missing ``isactive`` flag defaults
    to ``True``. A row that fails to insert is reported and counted, and the
    remaining rows are still attempted.

    Args:
        cassandra_session: Object whose ``execute(query)`` returns the source rows.
        pg_conn: Connection whose ``cursor()`` yields a DB-API cursor.
    """
    insert_sql = """
        INSERT INTO public.ai_models (
            model_id, is_active, api_key, deployment_name, endpoint,
            model_name, model_type, model_version, provider
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (deployment_name, model_version) DO NOTHING;
    """
    rows = cassandra_session.execute("SELECT * FROM availablemodels")
    cur = pg_conn.cursor()
    failed = 0

    try:
        for row in rows:
            try:
                cur.execute(insert_sql, (
                    row.deploymentid,
                    row.isactive if row.isactive is not None else True,
                    row.apikey,
                    row.deploymentname,
                    row.endpoint,
                    row.modelname,
                    row.modeltype,
                    row.modelversion,
                    row.provider
                ))
            except Exception as e:
                failed += 1
                print(f"Error inserting model {row.deploymentname}: {e}")
    finally:
        # Release the cursor even when reading from Cassandra fails part-way through.
        cur.close()

    # Only claim success when every row went through.
    if failed:
        print(f"✖ Migrated models with {failed} failed row(s)")
    else:
        print("✔ Migrated models")

def migrate_usersubscriptions(cassandra_session, pg_conn):
    """Copy every row of the Cassandra ``usersubscriptions`` table into ``public.subscriptions``.

    The source rows carry no subscription id, so a random UUID is generated for
    each one. Rows whose ``(user_id, model_id)`` pair already exists are skipped
    by the ``ON CONFLICT`` clause. A row that fails to insert is reported and
    counted, and the remaining rows are still attempted.

    Args:
        cassandra_session: Object whose ``execute(query)`` returns the source rows.
        pg_conn: Connection whose ``cursor()`` yields a DB-API cursor.
    """
    insert_sql = """
        INSERT INTO public.subscriptions (sub_id, user_id, model_id)
        VALUES (%s, %s, %s)
        ON CONFLICT (user_id, model_id) DO NOTHING;
    """
    rows = cassandra_session.execute("SELECT * FROM usersubscriptions")
    cur = pg_conn.cursor()
    failed = 0

    try:
        for row in rows:
            try:
                cur.execute(insert_sql, (
                    uuid.uuid4(),  # generating a new sub_id
                    row.userid,
                    row.modelid
                ))
            except Exception as e:
                failed += 1
                print(f"Error inserting subscription for user {row.userid}: {e}")
    finally:
        # Release the cursor even when reading from Cassandra fails part-way through.
        cur.close()

    # Only claim success when every row went through.
    if failed:
        print(f"✖ Migrated subscriptions with {failed} failed row(s)")
    else:
        print("✔ Migrated subscriptions")

def main():
    """Run the whole Cassandra-to-Postgres migration, table by table.

    Order: users, chat history, models, subscriptions. Both connections are
    released when the run ends, whether it finishes or raises.
    """
    import psycopg2.extras

    # Teach psycopg2 to adapt Python UUID objects before any row is inserted.
    psycopg2.extras.register_uuid()

    cassandra_session = connect_cassandra()
    try:
        # Connect inside the try so a Postgres failure still releases Cassandra.
        pg_conn = connect_postgres()
        try:
            migrate_users(cassandra_session, pg_conn)
            migrate_chathistory(cassandra_session, pg_conn)
            migrate_availablemodels(cassandra_session, pg_conn)
            migrate_usersubscriptions(cassandra_session, pg_conn)
        finally:
            pg_conn.close()
    finally:
        cassandra_session.shutdown()
        # connect_cassandra() hands back only the session, so the owning Cluster is
        # reached through it and shut down as well.
        cassandra_session.cluster.shutdown()

# Run the migration only when this code is the top-level module being executed.
if __name__ == "__main__":
    main()


✔ Migrated users
✔ Migrated chat history
✔ Migrated models
✔ Migrated subscriptions
