In [21]:
# DATABASE 
import os
import dotenv; dotenv.load_dotenv()
from supabase import create_client, Client



url = os.environ['PROJECT_URL']
key = os.environ['SERVICE_ROLE']

supabase: Client = create_client(url, key)


#  langChain GPT-4o-mini task manager.

### task manager with nlp


#### using library
- pgvector ( vectorstore )
- supabase database 
- langgraph 
- openai embedding, llm

In [22]:
import os

os.environ['OPENAI_API_KEY'] = ''

os.environ['LANGCHAIN_API_KEY'] = ''
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "llan"

In [23]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model='gpt-4o-mini',
    temperature=0,
    streaming=True
)


In [24]:
from langchain_core.tools import tool
from langchain_core.runnables.config import RunnableConfig
from typing import Annotated
import datetime
from langchain.vectorstores import SupabaseVectorStore
from langchain_openai import OpenAIEmbeddings
import pytz, uuid


# OpenAI text-embedding-ada-002
embeddings = OpenAIEmbeddings(model='text-embedding-3-small', dimensions=512)


# Supabase vectorstore pgvector 
vectorstore = SupabaseVectorStore(
    client=supabase,
    embedding=embeddings,
    table_name='tasks',
    query_name="match_tasks"
)


In [25]:
# SUPABASE.PY
from langchain_core.documents import Document

def similarity_search_by_vector_with_relevance_scores(
        query,
        filter,
        k = 4,
        score_threshold = None,
    ):
        match_documents_params = vectorstore.match_args(query, filter)
        query_builder = vectorstore._client.rpc(vectorstore.query_name, match_documents_params)
        query_builder.params = query_builder.params.set("limit", k)

        res = query_builder.execute()
        match_result = [
            (
                Document(
                    metadata=search.get("metadata", {}),  # type: ignore
                    page_content=search.get("content", ""),
                ),
                search.get("similarity", 0.0),
                search.get('id', '')
            )
            for search in res.data
            if search.get("content")
        ]

        if score_threshold is not None:
            match_result = [
                (doc, id)
                for doc, similarity, id in match_result
                if similarity >= score_threshold
            ]
    

        return match_result


In [26]:


@tool
def retriever_tool(query : Annotated[str, 'query'], config: RunnableConfig):
    "Searches for tasks based on a query. Input should be a search query string."
    configurable = config.get('configurable')

    rst = similarity_search_by_vector_with_relevance_scores(
        embeddings.embed_query(query),
        filter={"user": configurable.get('thread_id')},
        score_threshold=0.8
    )

    rstl = []
    tz = pytz.timezone(configurable.get('timezone'))

    for doc, id in rst:
        rstl.append({
            'id' : id,
            'content' : doc.page_content,
            'date'  : datetime.datetime.fromtimestamp(doc.metadata['task_date'], tz).strftime('%Y-%m-%d %H:%M:%S %Z%z'),
            'completed' : doc.metadata['completed']
        })
       
        
    return rstl



# DatetimeParser prompt in langchain
# DatetimeParser in langchain 
DateType = Annotated[str, "Write a datetime string that matches the following pattern: '%Y-%m-%dT%H:%M:%S.%fZ'. Examples: 969-03-01T08:41:26.015244Z, 798-06-03T17:13:25.764744Z, 1519-11-09T23:51:38.563271ZReturn ONLY this string, no other words!"]
def parser(dateString):
    return datetime.datetime.strptime(dateString.strip(), "%Y-%m-%dT%H:%M:%S.%fZ").timestamp()





# add task in vector database with id
@tool
def addTask(content : Annotated[str, 'something to do'], date : DateType, config: RunnableConfig):
    'add tast to database'

    try:

        configurable = config.get('configurable')
        res = vectorstore.add_texts(
            texts=[content],
            metadatas=[{
                'completed' : False,
                'task_date' : parser(date),
                'user' : configurable.get('thread_id'),
            }],
        )
        return True
    
    except Exception as e:
        return e
    




# load task with date range (timestamp) 
@tool
def loadTask(startDate : DateType, endDate : DateType, config: RunnableConfig):
    'load user task. Returns the task from startDate to endDate.'

    try:
            
        configurable = config.get('configurable')
        res = (
            supabase.table('tasks').select('content, metadata, id')
            .eq('metadata->>user', configurable.get('thread_id'))
            .gte('metadata->>task_date', parser(startDate))
            .lte('metadata->>task_date', parser(endDate))
            .execute()
        )
        return res
    
    except Exception as e:
        return e




@tool
def deleteTask(id : Annotated[str,'want delete id of task'], config: RunnableConfig):
    'delete user task with id. Use it carefully only when the user wants to delete it'

    try:
            
        configurable = config.get('configurable')
        res = (
            supabase.table('tasks')
            .delete()
            .eq('metadata->>user', configurable.get('thread_id'))
            .eq('id', id)
            .execute()
        )
        
        return True
    
    except Exception as e:
        return e


@tool
def changeCompletedOfTask(id : Annotated[str,'id of the task (that you want to change the "completed")'], config: RunnableConfig):
    'change "completed" of task with task id'

    try:
            
        configurable = config.get('configurable')
        res = (
            supabase.table('tasks')
            .select('metadata')
            .eq('metadata->>user', configurable.get('thread_id'))
            .eq('id', id)
            .execute()
        )
        
        metadata = res.data[0]['metadata']
        metadata['completed'] = True

        res = (
            supabase.table('tasks')
            .update({'metadata': metadata})
            .eq('metadata->>user', configurable.get('thread_id'))
            .eq('id', id)
            .execute()
        )
        return True
    
    except Exception as e:
        return e



tools = [addTask, loadTask, retriever_tool, deleteTask, changeCompletedOfTask]



In [27]:
from typing import Annotated
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig
from langgraph.graph.message import AnyMessage, add_messages
from typing_extensions import TypedDict

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        result = self.runnable.invoke({
            **state,
            'time': config.get('configurable').get('time'),
            'timezone': config.get('configurable').get('timezone')
        })
        return {'messages': result}

primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            'system',
            "You are a competent secretary; meticulously add and manage users' schedules to the database."
            "If an error occurs, please stop and report it immediately."
            'current time : {time}'
            'timezone : {timezone}'
        ),
        ('placeholder', '{messages}')
    ]
)

assistant_runnable = primary_assistant_prompt | llm.bind_tools(tools)

In [28]:
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition, ToolNode

builder = StateGraph(State)

builder.add_node('assistant', Assistant(assistant_runnable))
builder.add_node('tools', ToolNode(tools))

builder.add_edge(START, 'assistant')
builder.add_conditional_edges(
    'assistant',
    tools_condition
)
builder.add_edge('tools', 'assistant')


<langgraph.graph.state.StateGraph at 0x12b887e60>

In [29]:
from psycopg import Connection
from langgraph.checkpoint.postgres import PostgresSaver


connection_kwargs = {
    "autocommit": True,
    "prepare_threshold": 0,
}


conn =  Connection.connect(os.environ['PROJECT_DATABASE_URL'], **connection_kwargs)
checkpointer = PostgresSaver(conn)

graph = builder.compile(checkpointer=checkpointer)







In [30]:

import datetime
from langchain_core.messages import AIMessageChunk




if __name__ == '__main__': 
    request = [

        {
            "messages": ("user", '지금 몇시지')
        },
        
        {
            "configurable": 
                {
                    "thread_id": '38a970b2-8f2a-406d-b52c-34dc490773f0',
                    'time' : '2024-10-09-21:30',
                    'timezone' : 'Asia/Seoul'
                }
        },

    ]

    # for event in graph.stream(
    #     *request,stream_mode="values",
    # ):
    #     event["messages"][-1].pretty_print()
    import pprint

    pprint.pprint(graph.invoke(*request,stream_mode="values"))


    # request[0]['messages'][1] = '안녕 친구?'



    conn.close()


DuplicatePreparedStatement: prepared statement "_pg3_0" already exists