# Storing Memory in a Postgres SQL DB

Langchain message history is now stored as an in-memory dict. Let's store it in a SQL database instead.

In [2]:
# First install postgres then start the service (MacOS)
!brew install postgresql
!brew services start postgresql

# To stop
# !brew services stop postgresql

[34m==>[0m [1mTapping homebrew/services[0m
Cloning into '/usr/local/Homebrew/Library/Taps/homebrew/homebrew-services'...
remote: Enumerating objects: 2969, done.[K
remote: Counting objects: 100% (430/430), done.[K
remote: Compressing objects: 100% (148/148), done.[K
remote: Total 2969 (delta 303), reused 333 (delta 280), pack-reused 2539[K
[KReceiving objects: 100% (2969/2969), 821.58 KiB | 7.54 MiB/s, done.
[KResolving deltas: 100% (1444/1444), done.
Tapped 1 command (48 files, 1010.4KB).
[34m==>[0m [1mSuccessfully started `postgresql@14` (label: homebrew.mxcl.postgresql@14)[0m


In [None]:
# For linux
!sudo apt update
!sudo apt install postgresql postgresql-contrib

# to start
# !sudo service postgresql start

# to stop
# !sudo service postgresql stop

In [None]:
!pip install psycopg2
# or
!poetry add psycopg2

In [5]:
# create role then database
!psql postgres

# Then run the following commands in the psql shell
# CREATE ROLE reco_admin WITH LOGIN PASSWORD 'averysecurepasswordthatyouwillneverguess';
# ALTER ROLE reco_admin CREATEDB;


# Exit the psql shell then login
# \q
# psql -d postgres -U reco_admin
# \du
# CREATE DATABASE reco WITH OWNER reco_admin ENCODING 'UTF8';
# \c reco
# \q

psql: error: connection to server on socket "/tmp/.s.PGSQL.5432" failed: FATAL:  database "michaelenghoekhor" does not exist


In [None]:
# To nuke the entire database
# DROP DATABASE reco;

In [7]:
# pip install
!pip install sqlalchemy
!pip install psycopg2-binary
!export LDFLAGS="-L/usr/local/lib"
!export CPPFLAGS="-I/usr/local/include"


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [1]:
import uuid
from sqlalchemy import Text, create_engine, Column, Integer, String, DateTime, ForeignKey, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.dialects.postgresql import UUID

import typing
from langchain_community.chat_message_histories import SQLChatMessageHistory
from langchain_community.chat_message_histories.sql import DefaultMessageConverter
from langchain_core.messages import BaseMessage, message_to_dict
from sqlalchemy import create_engine
import json
from pprint import pprint


Base = declarative_base()

class Patient(Base):
    __tablename__ = 'patients'

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    first_name = Column(String(100), nullable=False)
    last_name = Column(String(100), nullable=False)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
    email = Column(String(120), unique=True, nullable=False)
    password = Column(String(60), nullable=False)
    healthcare_provider_id = Column(Integer, ForeignKey('healthcare_providers.id'), nullable=True)

    healthcare_provider = relationship("HealthcareProvider", back_populates="patients")
    conversation_sessions = relationship("ConversationSession", back_populates="patient", uselist=True)

    def __repr__(self):
        return f"Patient(username='{self.username}', first_name='{self.first_name}', last_name='{self.last_name}', email='{self.email}', created_at='{self.created_at}', updated_at='{self.updated_at}')"

    def new_session(self, summary: str | None = None) -> "ConversationSession":
        return ConversationSession(patient_id=self.id, summary=summary)


class HealthcareProvider(Base):
    __tablename__ = 'healthcare_providers'

    id = Column(Integer, primary_key=True)
    first_name = Column(String(100), nullable=False)
    last_name = Column(String(100), nullable=False)
    description = Column(Text, nullable=True)
    email = Column(String(120), unique=True, nullable=False)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

    patients = relationship("Patient", back_populates="healthcare_provider", uselist=True)

    def __repr__(self):
        return f"HealthcareProvider(first_name='{self.first_name}', last_name='{self.last_name}', email='{self.email}', created_at='{self.created_at}', updated_at='{self.updated_at}')"


class ConversationSession(Base):
    __tablename__ = 'conversation_sessions'

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    patient_id = Column(Integer, ForeignKey('patients.id'), nullable=False)
    created_at = Column(DateTime, server_default=func.now())
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
    summary = Column(Text, nullable=True)  # Summary of the session created by the summarization engine

    patient = relationship("Patient", back_populates="conversation_sessions", uselist=False)
    messages = relationship("Message", back_populates="session")

    def __repr__(self):
        return f"ConversationSession(patient_id='{self.patient_id}', created_at='{self.created_at}', updated_at='{self.updated_at}', summary='{self.summary}')"



class Message(Base):
    __tablename__ = 'message_store'

    id = Column(Integer, primary_key=True)
    session_id = Column(UUID(as_uuid=True), ForeignKey('conversation_sessions.id'), nullable=False)
    # typical message is quite long, and we have to account for worst case
    message = Column(Text, nullable=False)
    timestamp = Column(DateTime, server_default=func.now())

    # Relationship to link back to the ConversationSession
    session = relationship("ConversationSession", back_populates="messages")

    def __repr__(self):
        return f"Message(session_id='{self.session_id}', message='{self.message}', timestamp='{self.timestamp}')"


# Engine setup (change for your PostgreSQL setup)
USER = 'reco_admin'
PASSWORD = 'averysecurepasswordthatyouwillneverguess'
HOST = 'localhost'
PORT = '5432'
DB = 'reco'
DB_URL = f'postgresql://{USER}:{PASSWORD}@{HOST}:{PORT}/{DB}'
engine = create_engine(DB_URL)

  Base = declarative_base()


In [4]:
Base.metadata.create_all(engine)  # this will create the tables. run this only once

In [2]:
# To create a new sqlalchemy session
SqlalchemySession = sessionmaker(bind=engine)
session = SqlalchemySession()

In [3]:
# Test case: Add a new patient and a message

try:
    new_patient = Patient(username="johndoe", first_name="John", last_name="Doe", email="johndoe@anemail.com", password="password")
    session.add(new_patient)
    session.flush()

    new_healthcare_provider = HealthcareProvider(first_name="Joy", last_name="Jones", email="joyjones@hcpgroup.com", description="A test healthcare provider")
    new_healthcare_provider.patients.append(new_patient)
    session.add(new_healthcare_provider)
    session.flush()

    new_session = ConversationSession(patient_id=new_patient.id)
    session.add(new_session)

    session.commit()
except Exception as e:
    print(f"Error: {e}")
    session.rollback()
    new_patient = session.query(Patient).filter_by(username="johndoe").first()
    new_healthcare_provider = session.query(HealthcareProvider).filter_by(email="joyjones@hcpgroup.com").first()
    new_session = session.query(ConversationSession).filter_by(patient_id=new_patient.id).first()

Error: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "patients_username_key"
DETAIL:  Key (username)=(johndoe) already exists.

[SQL: INSERT INTO patients (username, first_name, last_name, email, password, healthcare_provider_id) VALUES (%(username)s, %(first_name)s, %(last_name)s, %(email)s, %(password)s, %(healthcare_provider_id)s) RETURNING patients.id, patients.created_at, patients.updated_at]
[parameters: {'username': 'johndoe', 'first_name': 'John', 'last_name': 'Doe', 'email': 'johndoe@anemail.com', 'password': 'password', 'healthcare_provider_id': None}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)


In [7]:
print(new_healthcare_provider)
print(new_healthcare_provider.patients)
print(new_session)

HealthcareProvider(first_name='Joy', last_name='Jones', email='joyjones@hcpgroup.com', created_at='2024-06-25 22:46:47.303100', updated_at='2024-06-25 22:46:47.303100')
[Patient(username='johndoe', first_name='John', last_name='Doe', email='johndoe@anemail.com', created_at='2024-06-25 22:46:47.303100', updated_at='2024-06-25 22:46:47.303100')]
ConversationSession(patient_id='1', created_at='2024-06-25 22:46:47.303100', updated_at='2024-06-25 22:46:47.303100', summary='None')


In [8]:
msg_text = '{"type": "ai", "data": {"content": "Hello, I am a bot", "additional_kwargs": {}, "response_metadata": {}, "type": "ai", "name": null, "id": null, "example": false, "tool_calls": [], "invalid_tool_calls": [], "usage_metadata": null}}'
new_message = Message(session_id=new_session.id, message=msg_text)
session.add(new_message)
session.commit()

# Query the database
patient = session.query(Patient).filter_by(username="johndoe").first()
pprint(patient.conversation_sessions[0].messages)
pprint(json.loads(patient.conversation_sessions[0].messages[0].message))

pprint(new_healthcare_provider.patients)

[Message(session_id='22dbe6b6-4d32-4ef9-b63c-8fdea526d7a7', message='{"type": "ai", "data": {"content": "Hello, I am a bot", "additional_kwargs": {}, "response_metadata": {}, "type": "ai", "name": null, "id": null, "example": false, "tool_calls": [], "invalid_tool_calls": [], "usage_metadata": null}}', timestamp='2024-06-25 22:46:51.470907'),
 Message(session_id='22dbe6b6-4d32-4ef9-b63c-8fdea526d7a7', message='{"type": "ai", "data": {"content": "Hello, I am a bot", "additional_kwargs": {}, "response_metadata": {}, "type": "ai", "name": null, "id": null, "example": false, "tool_calls": [], "invalid_tool_calls": [], "usage_metadata": null}}', timestamp='2024-06-25 22:50:31.617911'),
 Message(session_id='22dbe6b6-4d32-4ef9-b63c-8fdea526d7a7', message='{"type": "ai", "data": {"content": "Hello, I am a bot", "additional_kwargs": {}, "response_metadata": {}, "type": "ai", "name": null, "id": null, "example": false, "tool_calls": [], "invalid_tool_calls": [], "usage_metadata": null}}', timest

In [9]:
# Now connecting to LangChain

class CustomMessageConverter(DefaultMessageConverter):
    def __init__(self):
        self.model_class = Message

    def to_sql_model(self, message: BaseMessage, session_id: str) -> Message:
        return self.model_class(
            session_id=session_id,
            message=json.dumps(message_to_dict(message))
        )

    def get_sql_model_class(self):
        return Message


def get_session_history(session_id):
    return SQLChatMessageHistory(
        session_id=session_id,
        connection=engine,
        session_id_field_name="session_id",
        custom_message_converter=CustomMessageConverter(),
    )


In [10]:
# Test case: New session, new patient

try:
    patient = Patient(username="ashketchum", first_name="Ash", last_name="Ketchum", email="ashketchum@pallettown.com", password="pikachu")
    patient.healthcare_provider_id = new_healthcare_provider.id
    session.add(patient)
    session.commit()
except Exception as e:
    print(f"Error: {e}")
    session.rollback()

patient = session.query(Patient).filter_by(username="ashketchum").first()

new_conversation_session = patient.new_session()
session.add(new_conversation_session)
session.commit()

chat_history = get_session_history(new_conversation_session.id)
chat_history.get_messages()

[]

In [11]:
chat_history.add_ai_message("Hello Ash, I am a bot")
chat_history.add_user_message("Hello Bot, I am Ash! I wanna be the very best!")
chat_history.add_ai_message("Like no one ever was!")
chat_history.add_user_message("To catch them is my real test!")
chat_history.get_messages()

[AIMessage(content='Hello Ash, I am a bot'),
 HumanMessage(content='Hello Bot, I am Ash! I wanna be the very best!'),
 AIMessage(content='Like no one ever was!'),
 HumanMessage(content='To catch them is my real test!')]

In [12]:
# test retrieval from "cold"
test_return_chat_history = get_session_history(new_conversation_session.id)
test_return_chat_history.get_messages()

[AIMessage(content='Hello Ash, I am a bot'),
 HumanMessage(content='Hello Bot, I am Ash! I wanna be the very best!'),
 AIMessage(content='Like no one ever was!'),
 HumanMessage(content='To catch them is my real test!')]

## Integrate RunnableWithMessageHistory into our DialogueAgent

In [14]:
from dotenv import load_dotenv
from langchain_core.runnables.history import RunnableWithMessageHistory
from reco_analysis.chatbot.chatbot import DialogueAgent

load_dotenv("../.env")

dialogue_agent = DialogueAgent()

# get Ash
patient = session.query(Patient).filter_by(username="ashketchum").first()
session_id = patient.conversation_sessions[0].id

runnable_with_history = RunnableWithMessageHistory(
    dialogue_agent.chain,
    get_session_history=lambda: get_session_history(session_id),
)

dialogue_agent.chain = runnable_with_history
dialogue_agent.memory = get_session_history(session_id)

In [15]:
dialogue_agent.get_history()

['Doctor: Hello Ash, I am a bot',
 'Patient: Hello Bot, I am Ash! I wanna be the very best!',
 'Doctor: Like no one ever was!',
 'Patient: To catch them is my real test!']

In [16]:
dialogue_agent.send("Hello Ash, I am definitely not a bot")

In [17]:
dialogue_agent.get_history()

['Doctor: Hello Ash, I am a bot',
 'Patient: Hello Bot, I am Ash! I wanna be the very best!',
 'Doctor: Like no one ever was!',
 'Patient: To catch them is my real test!',
 'Doctor: Hello Ash, I am definitely not a bot']