In [1]:
from langchain.chat_models import ChatOpenAI
import os
langchain_chat_kwargs = {
    "temperature": 0,
    "max_tokens": 4000,
    "verbose": True,
}
chat_openai_model_kwargs = {
    "top_p": 1.0,
    "frequency_penalty": 0.0,
    "presence_penalty": -1,
}


# Optional: set the API key for OpenAI if it's not set in the environment.
os.environ["OPENAI_API_KEY"] = ""
def get_chat_openai(model_name):
    llm = ChatOpenAI(
        model_name=model_name,
        model_kwargs=chat_openai_model_kwargs,
        **langchain_chat_kwargs
    )
    return llm

In [2]:
## set up imports of local module code
import os
nb_path = os.path.abspath("")
from sys import path
from os.path import dirname

path.insert(0,  dirname(dirname(nb_path)))


In [3]:
import pandas as pd
from trilogy import Executor, Dialects
from trilogy.core.models import Environment
from sqlalchemy import create_engine
from trilogy.core.models import Datasource, Concept, ColumnAssignment, Grain, Function
from trilogy.core.enums import DataType, Purpose, FunctionType
from os.path import dirname
from pathlib import PurePath
from typing import Optional


def setup_engine() -> Executor:
    engine = create_engine(r"duckdb:///:memory:", future=True)
    csv = PurePath(nb_path) / "train.csv"
    df = pd.read_csv(csv)
    _ = df
    output = Executor(engine=engine, dialect=Dialects.DUCK_DB)

    output.execute_raw_sql("CREATE TABLE raw_titanic AS SELECT * FROM df")
    return output


def setup_titanic(env: Environment):
    namespace = "passenger"
    id = Concept(
        name="id", namespace=namespace, datatype=DataType.INTEGER, purpose=Purpose.KEY
    )
    age = Concept(
        name="age",
        namespace=namespace,
        datatype=DataType.INTEGER,
        purpose=Purpose.PROPERTY,
        keys=[id],
    )

    name = Concept(
        name="name",
        namespace=namespace,
        datatype=DataType.STRING,
        purpose=Purpose.PROPERTY,
        keys=[id],
    )

    pclass = Concept(
        name="class",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.INTEGER,
        keys=[id],
    )
    survived = Concept(
        name="survived",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.BOOL,
        keys=[id],
    )
    fare = Concept(
        name="fare",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.FLOAT,
        keys=[id],
    )
    embarked = Concept(
        name="embarked",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.INTEGER,
        keys=[id],
    )
    cabin = Concept(
        name="cabin",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.STRING,
        keys=[id],
    )
    ticket = Concept(
        name="ticket",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.STRING,
        keys=[id],
    )

    last_name = Concept(
        name="last_name",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.STRING,
        keys=[id],
        lineage=Function(
            operator=FunctionType.INDEX_ACCESS,
            arguments=[
                Function(
                    operator=FunctionType.SPLIT,
                    arguments=[name, ","],
                    output_datatype=DataType.ARRAY,
                    output_purpose=Purpose.PROPERTY,
                    arg_count=2,
                ),
                1,
            ],
            output_datatype=DataType.STRING,
            output_purpose=Purpose.PROPERTY,
            arg_count=2,
        ),
    )
    all_concepts = [
        id,
        age,
        survived,
        name,
        pclass,
        fare,
        cabin,
        embarked,
        ticket,
        last_name,
    ]
    for x in all_concepts:
        env.add_concept(x)

    env.add_datasource(
        Datasource(
            identifier="raw_data",
            address="raw_titanic",
            columns=[
                ColumnAssignment(alias="passengerid", concept=id),
                ColumnAssignment(alias="age", concept=age),
                ColumnAssignment(alias="survived", concept=survived),
                ColumnAssignment(alias="pclass", concept=pclass),
                ColumnAssignment(alias="name", concept=name),
                ColumnAssignment(alias="fare", concept=fare),
                ColumnAssignment(alias="cabin", concept=cabin),
                ColumnAssignment(alias="embarked", concept=embarked),
                ColumnAssignment(alias="ticket", concept=ticket),
            ],
            grain=Grain(components=[id]),
        ),
    )
    return env


def create_passenger_dimension(exec: Executor, name: str):
    exec.execute_raw_sql(f"CREATE SEQUENCE seq_{name} START 1;")
    exec.execute_raw_sql(
        f"""create table dim_{name} as 
                         SELECT passengerid id, name, age,
                          SPLIT(name, ',')[1] last_name
                            FROM raw_data

"""
    )


def create_arbitrary_dimension(exec: Executor, key: str, name: str):
    exec.execute_raw_sql(
        f"""create table dim_{name} as 
                         with tmp as 
                         (select {key}
                         from raw_data group by 1
                         )
                         SELECT  row_number() over() as id,
                         {key} as {name}
                          FROM tmp
"""
    )


def create_fact(
    exec: Executor,
    dims: Optional[list[str]] = None,
    include: Optional[list[str]] = None,
):
    exec.execute_raw_sql(
        """create table fact_titanic as 
                         SELECT 
                         row_number() OVER () as fact_key,
                         passengerid,
                         survived,
                         fare,
                         embarked,
                         b.id class_id,
                         cabin  
                         FROM raw_data a 
                         LEFT OUTER JOIN dim_class b on a.pclass=b.class
                         """
    )


def setup_normalized_engine() -> Executor:
    engine = create_engine(r"duckdb:///:memory:", future=True)
    csv = PurePath(dirname(nb_path)) / "train.csv"
    df = pd.read_csv(csv)
    _ = df
    output = Executor(engine=engine, dialect=Dialects.DUCK_DB)

    output.execute_raw_sql("CREATE TABLE raw_data AS SELECT * FROM df")
    create_passenger_dimension(output, "passenger")
    create_arbitrary_dimension(output, "pclass", "class")
    create_fact(output, ["passenger"])
    return output


def setup_titanic_distributed(env: Environment):
    namespace = "passenger"
    id = Concept(
        name="id", namespace=namespace, datatype=DataType.INTEGER, purpose=Purpose.KEY
    )
    age = Concept(
        name="age",
        namespace=namespace,
        datatype=DataType.INTEGER,
        purpose=Purpose.PROPERTY,
        keys=[id],
    )

    name = Concept(
        name="name",
        namespace=namespace,
        datatype=DataType.STRING,
        purpose=Purpose.PROPERTY,
        keys=[id],
    )
    class_id = Concept(
        name="_class_id",
        namespace=namespace,
        purpose=Purpose.KEY,
        datatype=DataType.INTEGER,
        # keys=[id],
    )
    pclass = Concept(
        name="class",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.INTEGER,
        keys=[class_id],
    )
    survived = Concept(
        name="survived",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.BOOL,
        keys=[id],
    )
    survived = Concept(
        name="survived",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.BOOL,
        keys=[id],
    )
    fare = Concept(
        name="fare",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.FLOAT,
        keys=[id],
    )
    embarked = Concept(
        name="embarked",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.INTEGER,
        keys=[id],
    )
    cabin = Concept(
        name="cabin",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.STRING,
        keys=[id],
    )
    last_name = Concept(
        name="last_name",
        namespace=namespace,
        purpose=Purpose.PROPERTY,
        datatype=DataType.STRING,
        keys=[id],
        lineage=Function(
            operator=FunctionType.INDEX_ACCESS,
            arguments=[
                Function(
                    operator=FunctionType.SPLIT,
                    arguments=[name, ","],
                    output_datatype=DataType.ARRAY,
                    output_purpose=Purpose.PROPERTY,
                    arg_count=2,
                ),
                1,
            ],
            output_datatype=DataType.STRING,
            output_purpose=Purpose.PROPERTY,
            arg_count=2,
        ),
    )
    for x in [id, age, survived, name, pclass, fare, cabin, embarked, last_name]:
        env.add_concept(x)

    env.add_datasource(
        Datasource(
            identifier="dim_passenger",
            address="dim_passenger",
            columns=[
                ColumnAssignment(alias="id", concept=id),
                ColumnAssignment(alias="age", concept=age),
                ColumnAssignment(alias="name", concept=name),
                ColumnAssignment(alias="last_name", concept=last_name),
                # ColumnAssignment(alias="pclass", concept=pclass),
                # ColumnAssignment(alias="name", concept=name),
                # ColumnAssignment(alias="fare", concept=fare),
                # ColumnAssignment(alias="cabin", concept=cabin),
                # ColumnAssignment(alias="embarked", concept=embarked),
            ],
            grain=Grain(components=[id]),
        ),
    )

    env.add_datasource(
        Datasource(
            identifier="fact_titanic",
            address="fact_titanic",
            columns=[
                ColumnAssignment(alias="passengerid", concept=id),
                ColumnAssignment(alias="survived", concept=survived),
                ColumnAssignment(alias="class_id", concept=class_id),
                ColumnAssignment(alias="fare", concept=fare),
                ColumnAssignment(alias="cabin", concept=cabin),
                ColumnAssignment(alias="embarked", concept=embarked),
            ],
            grain=Grain(components=[id]),
        ),
    )

    env.add_datasource(
        Datasource(
            identifier="dim_class",
            address="dim_class",
            columns=[
                ColumnAssignment(alias="id", concept=class_id),
                ColumnAssignment(alias="class", concept=pclass),
                # ColumnAssignment(alias="fare", concept=fare),
                # ColumnAssignment(alias="cabin", concept=cabin),
                # ColumnAssignment(alias="embarked", concept=embarked),
            ],
            grain=Grain(components=[class_id]),
        ),
    )

    return env


from logging import StreamHandler, DEBUG

from trilogy_nlp.constants import logger

executor = setup_engine()

env = Environment()
model = setup_titanic(env)
for c in env.concepts:
    print(c)

executor.environment = env
environment = env
logger.setLevel(DEBUG)
logger.addHandler(StreamHandler())




__trilogy_internal.concept_name
__trilogy_internal.datasource
__trilogy_internal.query_text
passenger.id
passenger.id.count
passenger.age
passenger.survived
passenger.name
passenger.class
passenger.fare
passenger.cabin
passenger.embarked
passenger.ticket
passenger.last_name


In [4]:
from langchain.tools import Tool
from datetime import datetime
import json
from pydantic import BaseModel
from trilogy.core.enums import ComparisonOperator, Ordering

class FilterResult(BaseModel):
    """The result of the filter prompt"""

    column: str
    values: list[str|int |float|bool]
    operator: ComparisonOperator




class OrderResult(BaseModel):
    """The result of the order prompt"""

    column: str
    order: Ordering


class InitialParseResponse(BaseModel):
    """The result of the initial parse"""

    columns: list[str]
    limit: Optional[int] = 100
    order: Optional[list[OrderResult]] = None
    filtering: Optional[list[FilterResult]] = None

    @property
    def selection(self) -> list[str]:
        filtering = [f.concept for f in self.filtering]
        order = [x.concept for x in self.order]
        return list(set(self.metrics + self.dimensions + filtering + order))



def run_query_save_results(query:str):
    executor.execute_query(query)

def validate_query(query:str):
    return json.dumps({'validated':True})
    parsed = InitialParseResponse.model_validate_json(query)
    for x in parsed.fields:
        if x not in environment.concepts:
            raise ValueError(f"{x} in output columns not in concepts")
    for y in parsed.order:
        if y.concept not in environment.concepts:
            raise ValueError(f"{y.concept} in ordering not in concepts")
    for z in parsed.filtering:
        if z.concept not in environment.concepts:
            raise ValueError(f"{z.concept} in filtering not in concepts")
    
def get_dataset_description(query:str):
    """
    Get the description of the dataset.
    """
    return json.dumps({'description':'This is a dataset contain the details of a subset of the passengers on board the Titanic (891 to be exact) and importantly, will reveal whether they survived or not, and some other information about them.'})


def get_today_date(query: str) -> str:
    """
    Useful to get the date of today.
    """
    # Getting today's date in string format
    today_date_string = datetime.now().strftime("%Y-%m-%d")
    return today_date_string

def concept_to_string(concept:Concept):
    return concept.address

def get_concepts(*args, **kwargs) -> str:
    return json.dumps({'concepts':[concept_to_string(x) for 
                                   x in environment.concepts.values()]})

def sql_agent_tools():
    tools = [
                Tool.from_function(
            func=get_dataset_description,
            name="get_dataset_description",
            description="""
            Get some context on the datasets you're working with, if the question is not clear""",
        ),
        Tool.from_function(
            func=validate_query,
            name="validate_response",
            description="""
            Check that your response is formatted properly and accurate before sending it to the CEO.
            """,
        ),
        Tool.from_function(
            func=get_concepts,
            name="get_concepts",
            description="""
            The list of fields that can be selected.
            """,
        ),
        Tool.from_function(
            func=get_today_date,
            name="get_today_date",
            description="""
            Useful to get the date of today.
            """,
        ),
    ]
    return tools

In [16]:
from langchain.agents import create_structured_chat_agent

llm_agent = get_chat_openai('gpt-3.5-turbo-1106')

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

system = '''Thought Process: You are an analyst. Your job is to get questions from 
the CEO and tell them how to write a 
SQL query to answer them in a step by step fashion. 
You have access to a single predefined sql table and have to pick the best columns from 
it to answer the question; you cannot create or derive any new concepts. 
If nothing seems like it will accurately answer the question, 
you should tell the CEO that his question can't be answered yet. 

Your goal will be to create a summary of steps in JSON format for the CEO. 
The final output should be a VALID JSON blob with the following keys and values followed by a stopword: <EOD>:
- columns: a list of columns
- limit: a number of records to limit the results to, -1 if none specified
- order: a list of  columns to order the results by, with the option to specify ascending or descending
-- column: a column name to order by
-- order: the direction of ordering, "asc" or "desc"
- filtering: a list of all objects to filter the results on, where each object has the following keys:
-- column: a column to filter on
-- values: the value the column is filtered to
-- operator: the comparison operator, one of "=", "in", "<", ">", "<=", "like", or ">=". A range, or between, should be expressed as two inequalities. 



You have access to the following tools:

{tools}

Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).

Valid "action" values: "Final Answer" or {tool_names}

You should always call the the validate_response tool with your final answer before sending it to the CEO.

Provide only ONE action per $JSON_BLOB, as shown:

```
{{
    "action": $TOOL_NAME,
    "action_input": $INPUT
}}
```

Follow this format:

Question: input question to answer
Thought: consider previous and subsequent steps
Action:
```
$JSON_BLOB
```
Observation: action result
... (repeat Thought/Action/Observation N times)
Thought: I know what to respond
Action:
```
{{
    "action": "Final Answer",
    "action_input": "Final response to human"
}}

Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation'''

human = '''{input}

{agent_scratchpad}

(reminder to respond in a JSON blob no matter what)'''

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system),
        MessagesPlaceholder("chat_history", optional=True),
        ("human", human),
    ]
)
agent = create_structured_chat_agent(
    llm=llm_agent,
    tools=sql_agent_tools(),
    # suffix=CUSTOM_SUFFIX,
    prompt =  prompt,
    
)

In [22]:
from langchain.agents import AgentExecutor, create_structured_chat_agent

agent_executor = AgentExecutor(
    agent=llm_agent, tools=sql_agent_tools(), verbose=True, handle_parsing_errors=True
)

result = agent_executor.invoke( "List the first names of survivors on the titanic?"
)

IndexError: list index out of range

In [11]:
print(result)

{'input': 'How many passengers survived in each class?', 'output': "I need to select the 'passenger_class' and 'survived' columns from the dataset. Then I can group the data by 'passenger_class' and count the number of passengers who survived in each class."}


In [12]:
from pprint import pprint
pprint(result['output'])

("I need to select the 'passenger_class' and 'survived' columns from the "
 "dataset. Then I can group the data by 'passenger_class' and count the number "
 'of passengers who survived in each class.')


In [9]:
InitialParseResponse.model_validate(result['output'])

ValidationError: 1 validation error for InitialParseResponse
  Input should be a valid dictionary or instance of InitialParseResponse [type=model_type, input_value="I'm unable to retrieve t...nd their relationships.", input_type=str]
    For further information visit https://errors.pydantic.dev/2.6/v/model_type