In [1]:
!pip install -Uqq langchain
!pip install -Uqq langchain-community
!pip install -Uqq langchain-ollama

In [2]:
from langchain.llms import Ollama
from langchain_ollama import ChatOllama
from langchain.memory import ConversationBufferMemory
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_message_histories import SQLChatMessageHistory
from langchain_core.messages import HumanMessage
from langchain.memory import  ConversationBufferMemory
from langchain_core.runnables import RunnableSequence, RunnableLambda

In [3]:
! rm memory.db

In [4]:
generator_model_name = 'codeup'
reviewer_model_name = 'codellama:70b'
init_code = []
reviewed_code = []
refactored_code = []

In [5]:
code_generator = ChatOllama(model=generator_model_name)
code_reviewer = ChatOllama(model=reviewer_model_name)


In [6]:
code_generation_prompt = ChatPromptTemplate.from_template("""You are an expert .Net software engineer using the C# language. Cplete the following task:
{task}""")

In [7]:
code_review_prompt = ChatPromptTemplate.from_template("""You are an expert .Net software engineer using the C# language. You are reviewing code submitted to you for accuracy, readability, testability, and other software development metrics.
Please review this code and make suggestions: code: {code}""")

In [8]:
final_code_prompt = ChatPromptTemplate.from_template("""You are an expert .Net software engineer using the C# Language, your job is to refactor the code from the review suggestions.
Please refactor this code according to the suggestions. code: {code} feedback: {feedback}. Return only the refactored code.  Do not return any reasoning.""")

In [9]:
def get_session_history(session_id):
    return SQLChatMessageHistory(session_id, "sqlite:///memory.db")

In [10]:
def generate_code(task: str) -> str:
    chain =  code_generation_prompt | code_generator
    code = chain.invoke(task)
    init_code.append(code.content)
    return {'code': code.content}

In [11]:
def review_code(code: dict) -> str:
    chain =  code_review_prompt | code_reviewer
    feedback = chain.invoke(code['code'])
    reviewed_code.append(feedback.content)
    return {'code':code,'feedback':feedback.content}

In [12]:
def refactor_code(code_feedback: dict) -> str:
    chain =  final_code_prompt | code_generator
    result = chain.invoke(code_feedback)
    refactored_code.append(result.content)
    return result

In [13]:
generate_code_lambda = RunnableLambda(generate_code)
review_code_lambda = RunnableLambda(review_code)
refactor_code_lambda = RunnableLambda(refactor_code)

In [14]:
generate_review_refactor_sequence = generate_code_lambda | review_code_lambda | refactor_code_lambda

In [15]:
with_message_history = RunnableWithMessageHistory(
    generate_review_refactor_sequence,  
    get_session_history)

In [16]:
spec = """I am writing a C# Standard library that will be distributed as a nuget package implementing the outbox pattern.
I will be using a Postgresql database version 16 to store messages and track sending of the messages.
I would like to have the following columns:
1. Id which is a unique primary key column
2. Correlation Id to idenifty the message
3. Created At which is DateTime
4. Published At which is DateTime
5. Status which is an enum.  You should decide the values for the enum
6. Payload which should be able to hold both json and protobuf serialized messages
7. Retried which would hold the number of times a the system tried to send the message before succeeding

The payload column should accomodate any size of message.
The payload column should be able to hold either JSON or Protobuf messages.  

Make any changes to the columns you beleive should be made.
Add any additional tables necessary to suppor this table.
The table will primarily be searched on status, created at, and published at columns, or some combination thereof.  Please suggest indexes that might help.
Can you generate the Postgresql schema I should use?  The payload column that holds the message should be able to hold different serializations such as JSON and Protobuf.
Return just the SQL statements needed for the table."""

In [17]:
sql = with_message_history.invoke(spec,config={"configurable": {"session_id": "1"}})

  message_history = self.get_session_history(


In [18]:
with_message_history.invoke(
    """Can you create just a flyway script for the suggested schema? 
    Assume the application incorporating this table already uses flyway and the outbox table just needs added to an existing schema.
    Adding the table should be safe and error checked in case the system has an existing outbox table.
    return just the flyway script.""",config={"configurable": {"session_id": "1"}})

AIMessage(content="```sql\n// migration 1 - add messages table\nCREATE TABLE messages (\n    id SERIAL PRIMARY KEY,\n    correlation_id TEXT NOT NULL,\n    created_at TIMESTAMPTZ NOT NULL,\n    published_at TIMESTAMPTZ NOT NULL,\n    status ENUM('pending', 'sent', 'failed') NOT NULL DEFAULT 'pending',\n    payload BYTEA NOT NULL,\n    retries INTEGER NOT NULL default 0\n);\n\n// migration 2 - add message_versions table\nCREATE TABLE message_versions (\n    id SERIAL PRIMARY KEY,\n    message_id INTEGER NOT NULL REFERENCES messages(id),\n    version INTEGER NOT NULL default 1,\n    created_at TIMESTAMPTZ NOT NULL,\n    payload BYTEA NOT NULL\n);\n\n// migration 3 - add message_status table\nCREATE TABLE message_status (\n    id SERIAL PRIMARY KEY,\n    message_id INTEGER NOT NULL REFERENCES messages(id),\n    status ENUM('pending', 'sent', 'failed') NOT NULL default 'pending',\n    created_at TIMESTAMPTZ NOT NULL,\n    updated_at TIMESTAMPTZ NOT NULL\n);\n\n// migration 4 - add indexes\

In [19]:
dto = with_message_history.invoke(
"""Create a C# model class that conforms to the PostgresSQL table specified earlier?
Return just the model class.
""",config={"configurable": {"session_id": "1"}}) 

In [20]:
task = f"""Create a C# class using Npgsql to interact with the PostgreSQL table defined defined by the included sql and the included dto with the standard CRUD operations.
sql: {sql}
dto: {dto}
    Return just the repository layer code."""
repository = with_message_history.invoke(task,config={"configurable": {"session_id": "1"}})

In [21]:
task = f"""I would like to add a set of retrieval methods to the included repository layer class that can filter records by the sending status and the created at column. Reference the dto and sql to complete this task. This method should default to unsent messages but be able to be used for the other statuses.
respository: {repository}
sql: {sql}
dto: {dto}

 Return just the repository layer code."""
repository = with_message_history.invoke(task,config={"configurable": {"session_id": "1"}})

In [22]:
task = f"""I would like to add a set of retrieval methods to the included repository layer class that can filter records by the sending status and the the published at column. Reference the dto and sql to complete this task. This method should default to unsent messages but be able to be used for the other statuses.
respository: {repository}
sql: {sql}
dto: {dto}

 Return just the repository layer code."""
repository = with_message_history.invoke(task ,config={"configurable": {"session_id": "1"}})

In [23]:
task = f"""I would would like to add a Batch method to the included repository layer class that would allow a user to specify the number of messages to send. The Batch method must retrieve and exclusively lock the number of messages.
respository: {repository}
sql: {sql}
dto: {dto}

 Return just the repository layer code."""
with_message_history.invoke(task,config={"configurable": {"session_id": "1"}})

AIMessage(content='```csharp\nusing System;\nusing System.ComponentModel.DataAnnotations;\nusing System.Text.Json.Serialization;\n\nnamespace Outbox\n{\n    public class Message\n    {\n        [Required]\n        public int Id { get; set; }\n\n        public string CorrelationId { get; set; }\n\n        public DateTime CreatedAt { get; set; } = DateTime.UtcNow;\n\n        public Status Status { get; set; }\n\n        [JsonConverter(typeof(PayloadDataTypeConverter))]\n        public string Payload { get; set; }\n\n        [Required]\n        public int SendAttempts { get; set; } = 0;\n\n        public override string ToString() => $"Message: CorrelationId = {CorrelationId}, Status = {Status}";\n    }\n\n    public enum Status\n    {\n        Pending,\n        Sent,\n        Failed\n    }\n}\n```', response_metadata={'model': 'codeup', 'created_at': '2024-09-02T20:03:27.741146996Z', 'message': {'role': 'assistant', 'content': ''}, 'done_reason': 'stop', 'done': True, 'total_duration': 6

In [24]:
history = get_session_history("1")

In [25]:
print(history)

Human: I am writing a C# Standard library that will be distributed as a nuget package implementing the outbox pattern.
I will be using a Postgresql database version 16 to store messages and track sending of the messages.
I would like to have the following columns:
1. Id which is a unique primary key column
2. Correlation Id to idenifty the message
3. Created At which is DateTime
4. Published At which is DateTime
5. Status which is an enum.  You should decide the values for the enum
6. Payload which should be able to hold both json and protobuf serialized messages
7. Retried which would hold the number of times a the system tried to send the message before succeeding

The payload column should accomodate any size of message.
The payload column should be able to hold either JSON or Protobuf messages.  

Make any changes to the columns you beleive should be made.
Add any additional tables necessary to suppor this table.
The table will primarily be searched on status, created at, and publi

In [26]:
for ic in init_code:
    print(ic)


Here is the PostgreSQL schema that implements the Outbox pattern with the requested columns:
```sql
CREATE TABLE messages (
    id SERIAL PRIMARY KEY,
    correlation_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    published_at TIMESTAMPTZ NOT NULL,
    status ENUM('pending', 'sent', 'failed') NOT NULL DEFAULT 'pending',
    payload BYTEA NOT NULL,
    retries INTEGER NOT NULL default 0
);

CREATE TABLE message_versions (
    id SERIAL PRIMARY KEY,
    message_id INTEGER NOT NULL REFERENCES messages(id),
    version INTEGER NOT NULL default 1,
    created_at TIMESTAMPTZ NOT NULL,
    payload BYTEA NOT NULL
);

CREATE TABLE message_status (
    id SERIAL PRIMARY KEY,
    message_id INTEGER NOT NULL REFERENCES messages(id),
    status ENUM('pending', 'sent', 'failed') NOT NULL default 'pending',
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX idx_messages_status ON messages (status);
CREATE INDEX idx_messages_created_at ON messages (c

In [27]:
for suggestions in reviewed_code:
    print(suggestions)

1) The `messages` table includes a column `correlation_id` to support correlation IDs, a column `retries` to store the number of attempts made to publish the message, and a column `payload` for storing binary data, which can be used for serialized messages.
2) The `message_versions` table tracks each version of each message using a composite primary key consisting of `message_id` (referencing the `messages` table) and `version`.
3) The `message_status` table captures the status of each message, including when it was created, published, or updated.
4) Indexes on the `status`, `created_at`, and `published_at` columns in the `messages` table help improve performance for queries based on these attributes.
5) The unique index on `message_id` and `version` in the `message_versions` table ensures that each message has a unique version number.

Please note that this schema is a simplified example, and you should consider additional features, such as auditing, security, and scalability, when de

In [28]:
for fc in refactored_code:
    print(fc)

```sql
CREATE TABLE messages (
    id SERIAL PRIMARY KEY,
    correlation_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    published_at TIMESTAMPTZ NOT NULL,
    status ENUM('pending', 'sent', 'failed') NOT NULL DEFAULT 'pending',
    payload BYTEA NOT NULL,
    retries INTEGER NOT NULL default 0
);

CREATE TABLE message_versions (
    id SERIAL PRIMARY KEY,
    message_id INTEGER NOT NULL REFERENCES messages(id),
    version INTEGER NOT NULL default 1,
    created_at TIMESTAMPTZ NOT NULL,
    payload BYTEA NOT NULL
);

CREATE TABLE message_status (
    id SERIAL PRIMARY KEY,
    message_id INTEGER NOT NULL REFERENCES messages(id),
    status ENUM('pending', 'sent', 'failed') NOT NULL default 'pending',
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX idx_messages_status ON messages (status);
CREATE INDEX idx_messages_created_at ON messages (created_at);
CREATE INDEX idx_messages_published_at ON messages (published_at);

CREATE UNIQUE 