## Example Join Key Finder Agent

Purpose: demonstrate a LangChain agent that can help us reverse-engineer
unfamiliar database tables, for example tables recently imported into a
company's data warehouse from a vendor with minimal documentation.

Amazon Q Developer was used to create the first draft of these functions.
For more details, please see:
https://github.com/zenbananas19/ai-for-data-modeling

March 13, 2025

In [None]:
# -- Useful libraries
import os
import sys
import io
import re
import json
import csv
import psycopg2
import pandas as pd
from typing import Dict

from langchain.tools import StructuredTool
from langchain.agents import initialize_agent, AgentType

# Many LLM options are available.
# For this agent's LLM, let's try DeepSeek through Ollama.
from langchain.llms import OpenAI
from langchain_community.llms import Ollama


In [None]:
# -- Database connection setup.  Please configure yours as appropriate.

DB_CONFIG = {
    "dbname": "a_company_database",
    "user": "your_user_name",
    "password": os.environ["DB_PASSWORD"],  # hypothetical variable name; keeps the secret out of the notebook
    "host": "cloud data warehouse URL",
    "port": "a port number"
}

In [8]:
# -- Check that Ollama responds, and list the locally installed models
print("TESTING OLLAMA CONNECTION")
!ollama list

# Load the local model. Both of these run fine on a MacBook Pro.
# llm = Ollama(model="deepseek-r1:14b")
llm = Ollama(model="deepseek-r1:8b")

# Try a test prompt
response = llm.invoke("What is LangChain?")
print(response)

TESTING OLLAMA CONNECTION
NAME               ID              SIZE      MODIFIED     
deepseek-r1:14b    ea35dfe18182    9.0 GB    10 hours ago    
deepseek-r1:8b     28f8fd6cdc67    4.9 GB    4 weeks ago     
llama3:latest      365c0bd3c000    4.7 GB    6 months ago    
<think>

</think>

LangChain is a framework designed to enable large language models (LLMs) such as OpenAI's GPT-3 to process and generate text in sequences. It allows for chaining multiple LLM operations, enabling tasks like text generation, summarization, question answering, and more complex reasoning processes.

Key features of LangChain include:

1. **Chaining Operations**: Enables the concatenation of multiple LLM responses or operations.
2. **Context Management**: Maintains context across different steps of an operation.
3. **Multi-Modal Support**: Can integrate text, images, audio, or other data types.
4. **Flexibility**: Supports various use cases and customization through plugins.

LangChain is particularly use
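The deepseek-r1 responses in this notebook begin with a `<think>...</think>` block (empty in the test above, several paragraphs long in the agent transcript). If you want only the answer text, one option is to strip that block with a regular expression. This is a minimal sketch; `strip_think_block` is a hypothetical helper, not part of LangChain or Ollama, and it assumes the tags appear literally in the response string.

```python
import re

def strip_think_block(text: str) -> str:
    """Remove <think>...</think> sections from a model response and trim whitespace."""
    # re.DOTALL lets '.' match newlines, because the reasoning usually spans many lines;
    # the non-greedy '.*?' stops at the nearest closing tag, so separate blocks are removed one by one.
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()

raw = "<think>\n\n</think>\n\nLangChain is a framework designed to ..."
print(strip_think_block(raw))  # LangChain is a framework designed to ...
```

With the Ollama LLM above, this would be used as `print(strip_think_block(llm.invoke("What is LangChain?")))`.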

## Agent Functions

Each of the functions below becomes a LangChain agent tool.

In [None]:
# -- Helper function to query our cloud data warehouse
def execute_redshift_query(query: str):
    """Executes a SQL query on Redshift and returns the result as a DataFrame."""
    conn = psycopg2.connect(**DB_CONFIG)
    df = None
    try:
        with conn.cursor() as cur:
            cur.execute(query)
            colnames = [desc[0] for desc in cur.description]
            rows = cur.fetchall()
            df = pd.DataFrame(rows, columns=colnames)
    finally:
        conn.close()
    return df
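`execute_redshift_query` relies only on the Python DB-API 2.0 cursor interface (`execute`, `description`, `fetchall`), so the same pattern can be tried without a warehouse. The sketch below is a stand-in under that assumption: it uses the standard-library `sqlite3` module in place of psycopg2 and Redshift, and the hypothetical helper `execute_query` takes an open connection instead of `DB_CONFIG`.

```python
import sqlite3
import pandas as pd

def execute_query(conn, query: str) -> pd.DataFrame:
    """Run a query on any DB-API 2.0 connection and return the rows as a DataFrame."""
    cur = conn.cursor()
    try:
        cur.execute(query)
        # cursor.description holds one 7-item tuple per result column; item 0 is the name.
        colnames = [desc[0] for desc in cur.description]
        return pd.DataFrame(cur.fetchall(), columns=colnames)
    finally:
        cur.close()

# In-memory database with a tiny, invented stand-in for an orders table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orderinfo (order_id INTEGER, sku_id TEXT)")
conn.executemany("INSERT INTO orderinfo VALUES (?, ?)", [(1, "A1"), (2, "B2"), (3, "A1")])
df = execute_query(conn, "SELECT * FROM orderinfo ORDER BY order_id")
print(df)
conn.close()
```

`pandas.read_sql` is an alternative, but pandas officially supports only SQLAlchemy connectables and sqlite3 connections there and warns when given other raw DB-API connections such as psycopg2's.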

In [None]:
# -- Step 1 & 3: Pull up to 5000 random rows from a table
# ORDER BY RANDOM() sorts the whole table, which can be slow on very large tables.
def get_random_rows(table_name: str):
    """Fetches up to 5000 random rows from the specified database table."""
    query = f"""
    SELECT * FROM {table_name}
    ORDER BY RANDOM()
    LIMIT 5000;
    """
    return execute_redshift_query(query)
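Because the agent chooses `table_name`, interpolating it into SQL with an f-string lets a malformed or malicious name change the query. A minimal sketch of one guard, assuming your tables use plain unquoted `schema.table` identifiers: validate the name against a conservative pattern before building the query. `validate_table_name` and `random_rows_query` are hypothetical helpers; psycopg2's `sql.Identifier` composition is a driver-level alternative.

```python
import re

# Hypothetical rule: one or two dot-separated parts, each starting with a letter or
# underscore and containing only letters, digits, underscores, or '$'.
_TABLE_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*(?:\.[A-Za-z_][A-Za-z0-9_$]*)?")

def validate_table_name(table_name: str) -> str:
    """Return table_name unchanged if it is a plain [schema.]table identifier; otherwise raise ValueError."""
    if not _TABLE_NAME.fullmatch(table_name):
        raise ValueError(f"Refusing to build SQL with table name {table_name!r}")
    return table_name

def random_rows_query(table_name: str, limit: int = 5000) -> str:
    """Build the same ORDER BY RANDOM() sampling query used by get_random_rows, after validation."""
    return f"SELECT * FROM {validate_table_name(table_name)} ORDER BY RANDOM() LIMIT {int(limit)};"

print(random_rows_query("sample.orderinfo"))
# SELECT * FROM sample.orderinfo ORDER BY RANDOM() LIMIT 5000;
```

Inside `get_random_rows`, the equivalent change would be to call `validate_table_name(table_name)` before building `query`.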

In [None]:
# -- Step 2 & 4: Analyze a table and score each column as a primary key candidate
def analyze_primary_key(df: Dict[str, list]):
    """Scores every column as a primary key candidate from its cardinality, null count, and name, and reports its data type and a sample value."""
    df = pd.DataFrame(df)  # Convert the dictionary (orient='list') back to a DataFrame
    total_rows = len(df)
    pk_characteristics = {}
    if total_rows == 0:
        return pk_characteristics  # an empty sample has nothing to score
    
    for column in df.columns:
        unique_count = df[column].nunique()
        null_count = df[column].isna().sum()
        data_type = str(df[column].dtype)
        sample_value = df[column].dropna().iloc[0] if null_count < total_rows else None
        
        # 85%: uniqueness x completeness; 15%: the column name contains "id" or "tag"
        name_hint = ("id" in str(column).lower()) or ("tag" in str(column).lower())
        pk_prob = 0.85 * ((unique_count / total_rows) * (1 - null_count / total_rows)) \
                    + 0.15 * name_hint
        
        pk_characteristics[column] = {
            "probability": round(float(pk_prob), 2),  # plain float keeps the result JSON-friendly
            "data_type": data_type,
            "sample_value": sample_value
        }
    
    return pk_characteristics
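To see how the weighting in `analyze_primary_key` behaves, here is the same formula applied to a two-column toy table. `pk_score` is a hypothetical standalone restatement of the per-column score, and the data are invented for illustration.

```python
import pandas as pd

def pk_score(series: pd.Series, column_name: str) -> float:
    """Same weighting as analyze_primary_key: 85% uniqueness x completeness, 15% name hint."""
    total_rows = len(series)
    unique_ratio = series.nunique() / total_rows          # nunique() ignores NaN/None
    non_null_ratio = 1 - series.isna().sum() / total_rows
    name_hint = ("id" in column_name.lower()) or ("tag" in column_name.lower())
    return float(round(0.85 * unique_ratio * non_null_ratio + 0.15 * name_hint, 2))

toy = pd.DataFrame({
    "order_id": [101, 102, 103, 104],             # all unique, no nulls, name contains "id"
    "status": ["open", "open", "shipped", None],  # 2 distinct values, 1 null, no name hint
})
scores = {col: pk_score(toy[col], col) for col in toy.columns}
print(scores)  # {'order_id': 1.0, 'status': 0.32}
```

Nulls lower both factors (`nunique()` skips them, and the completeness term penalizes them again), so a column whose non-null values are all unique but which is half empty scores 0.85 × 0.5 × 0.5 ≈ 0.21 before any name bonus.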


In [None]:
# -- Step 5: Recommend join keys based on primary key analysis.
# -- Note: tune the weightings for your database column naming conventions.
# -- The weights sum to 1.04 (4 x 0.22 + 0.16), so a perfect match scores slightly above 1.0.

def recommend_join_keys(data: Dict[str, Dict[str, Dict]]):
    """Finds common primary key candidates across two tables using weighted conditions for probability, data types, and sample values."""
    pk_table1 = data["pk_table1"]
    pk_table2 = data["pk_table2"]
    key_scores = {}
    
    for col1, attr1 in pk_table1.items():
        for col2, attr2 in pk_table2.items():
            score = (
                0.22 * (col1 == col2) +                              # same column name
                0.22 * (attr1["probability"] > 0.8) +                # likely PK in table 1
                0.22 * (attr2["probability"] > 0.8) +                # likely PK in table 2
                0.22 * (attr1["data_type"] == attr2["data_type"]) +  # same data type
                0.16 * (attr1["sample_value"] is not None and attr2["sample_value"] is not None
                        and len(str(attr1["sample_value"])) == len(str(attr2["sample_value"])))
            )
            # Keep the best score seen so far for col1 across all columns of table 2
            best_col_score = max(key_scores.get(col1, {"score": 0})["score"], score)
            if best_col_score >= 0.5:
                key_scores[col1] = {
                    "score": round(float(best_col_score), 2),
                    # col1 always comes from table 1; "both" means table 2 has a column of the same name
                    "table": "both" if col1 in pk_table2 else "table1"
                }
    
    sorted_keys = sorted(key_scores.items(), key=lambda x: x[1]["score"], reverse=True)
    return {"recommended_keys": [{"key": key, "score": details["score"], "table": details["table"]} for key, details in sorted_keys]}
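The five conditions in `recommend_join_keys` are all-or-nothing, so only a handful of scores are possible. Enumerating them is a quick way to check what the 0.5 cut-off admits and to decode scores such as the 0.82 reported for `sku_id` in the transcript below. This sketch only reuses the weights from the function above; `WEIGHTS` and `achievable` are illustrative names.

```python
from itertools import product

# Weights copied from recommend_join_keys: four 0.22 conditions and one 0.16 condition.
WEIGHTS = (0.22, 0.22, 0.22, 0.22, 0.16)

# Try every True/False combination of the five conditions and keep the distinct
# totals, rounded to two decimals to hide floating-point noise such as 0.6000000000000001.
achievable = sorted({
    round(sum(w * met for w, met in zip(WEIGHTS, conditions)), 2)
    for conditions in product((False, True), repeat=len(WEIGHTS))
})
print(achievable)
# [0.0, 0.16, 0.22, 0.38, 0.44, 0.6, 0.66, 0.82, 0.88, 1.04]
print([s for s in achievable if s >= 0.5])  # scores that survive the 0.5 cut-off
# [0.6, 0.66, 0.82, 0.88, 1.04]
```

Only 3 × 0.22 + 0.16 produces 0.82, so a column scoring 0.82 met three of the four 0.22 conditions plus the sample-length check; the agent's later statement that `sku_id` satisfies all the conditions is not consistent with that score. Passing the cut-off requires either at least three 0.22 conditions, or two of them plus the 0.16 check.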
    

In [None]:
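# -- Step 6: Count the rows produced by an inner join
# AgentType.ZERO_SHOT_REACT_DESCRIPTION supports only single-input tools,
# so the table names and join key are packed into one dictionary argument.
def execute_join_query(data: Dict[str, str]):
    """Counts the rows produced by an inner join of two tables on a join key that has the same name in both tables. Names are interpolated into SQL, so pass only trusted identifiers."""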
    table1 = data["table1"]
    table2 = data["table2"]
    join_key = data["join_key"]
    query = f"""
    SELECT COUNT(*) AS match_count FROM {table1} t1
    INNER JOIN {table2} t2 ON t1.{join_key} = t2.{join_key};
    """
    return execute_redshift_query(query)
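Before running `COUNT(*)` over two full tables, you may want a cheap local preview using the sampled DataFrames from `get_random_rows`. A minimal sketch, assuming both samples fit in memory and the join key has the same name in each: `sample_join_count` is a hypothetical helper built on `pandas.DataFrame.merge`. Because the two samples are drawn independently, many keys in one sample will lack their partner in the other, so treat a low count as weak evidence.

```python
import pandas as pd

def sample_join_count(df1: pd.DataFrame, df2: pd.DataFrame, join_key: str) -> int:
    """Count the rows an inner join on join_key produces between two in-memory samples."""
    # Unlike SQL, pandas merge matches null keys to each other, so drop them first
    # to mimic the INNER JOIN ... ON t1.key = t2.key semantics.
    left = df1.dropna(subset=[join_key])
    right = df2.dropna(subset=[join_key])
    return len(left.merge(right, on=join_key, how="inner"))

# Invented stand-ins for the sampled orderinfo and skuinfo tables.
orders = pd.DataFrame({"order_id": [1, 2, 3, 4, 5], "sku_id": ["A1", "B2", "A1", "Z9", None]})
skus = pd.DataFrame({"sku_id": ["A1", "B2", "C3", None], "price": [9.5, 4.0, 2.25, 1.0]})
print(sample_join_count(orders, skus, "sku_id"))  # 3
```

Here orders 1, 2, and 3 find a SKU, order 4 (`Z9`) does not, and the two null keys are excluded rather than matched to each other.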

In [None]:
# -- Step 7: Assess the join operation
def assess_join_success(data: dict):
    """Reports the join's match count and success rate (matched rows divided by the row count of the smaller table)."""
    df = pd.DataFrame(data["join_result"])  # Convert dictionary to DataFrame
    total_rows1 = data["total_rows1"]
    total_rows2 = data["total_rows2"]
    match_count = int(df.iloc[0, 0])  # plain int, so the returned dict is JSON-serializable
    join_success_rate = match_count / min(total_rows1, total_rows2)
    return {"match_count": match_count, "join_success_rate": round(join_success_rate, 2)}
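The success rate in `assess_join_success` divides matched rows by the size of the smaller table, which reads as a fraction only when each row matches at most once. The sketch below restates that ratio with invented counts; `join_success_rate` is a hypothetical helper. It also shows that a one-to-many join, such as orders joined to a SKU table where each SKU appears in many orders, can produce a rate above 1.0.

```python
def join_success_rate(match_count: int, total_rows1: int, total_rows2: int) -> float:
    """Same ratio as assess_join_success: matched rows / rows in the smaller table."""
    smaller = min(total_rows1, total_rows2)
    if smaller == 0:
        raise ValueError("Cannot compute a success rate for an empty table.")
    return round(match_count / smaller, 2)

# Invented counts: 4,200 matches against a 5,000-row smaller table.
print(join_success_rate(4200, 20000, 5000))   # 0.84
# One-to-many: each of 5,000 SKUs appears in several orders, so the join
# returns more rows than the smaller table has and the "rate" exceeds 1.
print(join_success_rate(18000, 20000, 5000))  # 3.6
```

Also make sure `total_rows1` and `total_rows2` describe the same data the join ran over: step 6 joins the full tables, while steps 1 and 3 return samples of at most 5000 rows.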


In [None]:
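# -- Define LangChain Tools
# Both table-fetching tools wrap get_random_rows; without explicit names, from_function
# would name both tools "get_random_rows", which makes them indistinguishable to the agent.
get_table1_tool = StructuredTool.from_function(get_random_rows, name="get_table1_rows", description="Fetches up to 5000 random rows from the first table.")
get_table2_tool = StructuredTool.from_function(get_random_rows, name="get_table2_rows", description="Fetches up to 5000 random rows from the second table.")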
analyze_pk_tool = StructuredTool.from_function(analyze_primary_key, description="Analyzes a dictionary representing a DataFrame to determine primary key probabilities, data types, and sample values.")
recommend_keys_tool = StructuredTool.from_function(recommend_join_keys, description="Recommends the best columns to use as join keys between two tables, considering weighted probabilities, data types, and sample values.")
execute_join_tool = StructuredTool.from_function(execute_join_query, description="Executes an inner join query using a dictionary containing table names and a join key.")
assess_join_tool = StructuredTool.from_function(assess_join_success, description="Assesses the success of a join operation using a dictionary containing join results and row counts.")


In [None]:
# -- Initialize LangChain Agent
# The llm object (deepseek-r1:8b or deepseek-r1:14b via Ollama) is defined in the Ollama test cell above.

agent = initialize_agent(
    tools=[get_table1_tool, get_table2_tool, analyze_pk_tool, recommend_keys_tool, execute_join_tool, assess_join_tool],
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True,
    handle_parsing_errors=True,
    max_execution_time=150  # give it 150 seconds to run
)

In [None]:

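# -- Example run (replace 'sample.orderinfo' and 'sample.skuinfo' with your own table names).
# -- Steps 1 to 5 run directly here; the agent below is asked only to interpret key_scores.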
table1_df = get_random_rows("sample.orderinfo")
table2_df = get_random_rows("sample.skuinfo")
pk_table1 = analyze_primary_key(table1_df.to_dict(orient='list'))
pk_table2 = analyze_primary_key(table2_df.to_dict(orient='list'))
jk_args = {"pk_table1": pk_table1, "pk_table2": pk_table2}
key_scores = recommend_join_keys(jk_args)

In [None]:
prompt = """
Please consider yourself a database administrator who is working on a data model for joining two unknown tables.
First goal:  from the lists of foreign key candidates and their scores, please identify the best foreign keys to join the two tables.
The JSON schema meanings are:
'key' = the column name
'score' = likelihood of being a good join key
'table' = which of the two tables the column appears in.  If 'both', it appears in both tables.

Please return a valid JSON object with the final answer, like this:
{'Final Result': 'key name', 'Recommended Keys': [{'key': 'key name', 'score': 0.6, 'table': 'table1'}, ...]}


Details:  the lists of columns for two database tables are provided.
These columns were evaluated according to the following formula, implemented in Python, for their likelihood of being primary keys, and for potentially being foreign keys for joining to other tables.
                0.22 * (column1 == column2) +
                0.22 * (attr1["primary_key_probability"] > 0.8) +
                0.22 * (attr2["primary_key_probability"] > 0.8) +
                0.22 * (attr1["column_data_type"] == attr2["column_data_type"]) +
                0.16 * (attr1["sample_value"] is not None and attr2["sample_value"] is not None and len(str(attr1["sample_value"])) == len(str(attr2["sample_value"])))

The formula produces a score, where a high score close to 1.0 is more likely to be of value as a join key.

"""
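The agent call below passes `key_scores` as a nested dictionary inside `input`, leaving it to LangChain to turn it into prompt text. If you would rather control that text yourself, one option is to serialize the scores to JSON and append them to the prompt. This is a sketch under that assumption; `build_agent_input` and `to_builtin` are hypothetical helpers, and `to_builtin` exists because `json.dumps` cannot encode NumPy integer or boolean scalars (NumPy floats are already `float` subclasses).

```python
import json
import numpy as np

def to_builtin(obj):
    """json.dumps fallback: convert NumPy scalars such as np.int64 or np.bool_ to Python values."""
    if isinstance(obj, np.generic):
        return obj.item()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

def build_agent_input(prompt: str, key_scores: dict) -> str:
    """Append the candidate join keys to the prompt as indented JSON text."""
    scores_json = json.dumps(key_scores, indent=2, default=to_builtin)
    return f"{prompt}\nCandidate join keys:\n{scores_json}"

# Example shaped like recommend_join_keys output (sku_id / 0.82 / both, as in the transcript).
example = {"recommended_keys": [{"key": "sku_id", "score": np.float64(0.82), "table": "both"}]}
print(build_agent_input("Identify the best join key.", example))
```

You would then call `agent.invoke({"input": build_agent_input(prompt, key_scores)})`.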



In [None]:
# -- Simple approach to capturing the output: the verbose record of the LLM's thoughts and actions.
# -- We expect the final answer to appear as a JSON-like dict near the end of the captured text.
import contextlib

# Capture stdout in a string buffer; redirect_stdout restores it even if the agent raises
captured_output = io.StringIO()
with contextlib.redirect_stdout(captured_output):
    # Run the agent; invoke() returns a dict whose "output" entry holds the final answer
    response_text = agent.invoke({"input": {"prompt": prompt, "key_scores": key_scores}})

# Get the captured output as a string
verbose_output = captured_output.getvalue()

verbose_output

> Entering new AgentExecutor chain...
<think>
Okay, so I'm trying to figure out how to best join two tables based on the information given. The prompt mentions that we're working with a database administrator's task, and the goal is to identify the best foreign keys for joining two unknown tables. 

First, let me parse through the details provided. There's a JSON schema mentioned where each key has attributes like 'key', 'score', and 'table'. The score is calculated using a formula that takes into account several factors:

1. 0.22 * (column1 == column2): This checks if the columns are identical in both tables, which would be ideal for a join.
2. 0.22 * (attr1["primary_key_probability"] > 0.8) and similarly for attr2: This looks at whether each column has a high probability of being a primary key, which suggests they're unique and good candidates for joins.
3. 0.22 * (data types match between the two columns): Same data type is important because you can't join different data types effectively.
4. 0.16 * (sample values match in length): This ensures that the sample values from both tables are non-null and of the same length, which might indicate they're linked.

The provided key_scores show a list of keys with their scores and which table(s) they belong to. The 'sku_id' key has the highest score at 0.82 and appears in both tables, which is great because it means it exists in both, making it more likely to be a good join key. 

So, considering the formula, 'sku_id' with such a high score should definitely be considered the best foreign key for joining. It satisfies multiple conditions: it's present in both tables, has a high primary key probability, and matches in data type and sample value length. Plus, it has a higher score than all other keys.

I also notice that 'sku_id' is the only key with a score above 0.8, which makes it stand out. The next highest is 0.66, but those don't have as strong a presence in both tables or meet the high primary key probability threshold.

Therefore, based on the analysis, 'sku_id' should be selected as the best foreign key to join these two tables.
</think>

To determine the best foreign keys for joining the two tables, we analyzed each candidate column using the provided formula. The 'sku_id' column emerged as the top choice due to its high score and presence in both tables.

**Final Result:**
{'Final Result': 'sku_id', 'Recommended Keys': [{'key': 'sku_id', 'score': 0.82, 'table': 'both'}]}
Observation: Invalid Format: Missing 'Action:' after 'Thought:
Thought:<think>
Alright, let me walk through how I arrived at the conclusion to use 'sku_id' as the best foreign key.

1. **Understanding the Problem**: The task is to join two tables based on their columns. Each column has a score indicating its likelihood of being a good join key.

2. **Analyzing Key Scores**: The 'planid' column has the highest score (0.82) and appears in both tables, which are strong indicators for a successful join.

3. **Considering Formula Components**:
   - The column is present in both tables.
   - It has a high primary key probability.
   - Data types match.
   - Sample values are of equal length and non-null.

4. **Conclusion**: 'sku_id' satisfies all the necessary conditions, making it the optimal choice for joining the two tables.

**Final Answer:**
{'Final Result': 'sku_id', 'Recommended Keys': [{'key': 'sku_id', 'score': 0.82, 'table': 'both'}]}

> Finished chain.


## TO DO:  extract the final answer from the response text, and test its recommended join key in steps 6 and 7
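One way to tackle this TODO: search the captured `verbose_output` for the last `{'Final Result': ...}` dictionary and parse it. The prompt's example uses Python-style single quotes, which `json.loads` rejects, so this sketch tries `ast.literal_eval` first and falls back to `json.loads` for strict JSON. `extract_final_result` is a hypothetical helper, and it assumes the dictionary sits on a single line, as it does in the transcript above; `response_text["output"]` may be a cleaner input than the full verbose log.

```python
import ast
import json
import re

def extract_final_result(verbose_output: str):
    """Return the last parseable {'Final Result': ...} dict in the agent log, or None."""
    # Match from an opening brace followed by the quoted 'Final Result' key up to the
    # last closing brace on the same line ('.' does not cross newlines here).
    candidates = re.findall(r"""\{\s*['"]Final Result['"].*\}""", verbose_output)
    # Later answers supersede earlier ones, so scan from the end of the log.
    for text in reversed(candidates):
        for parse in (ast.literal_eval, json.loads):
            try:
                return parse(text)
            except (ValueError, SyntaxError, TypeError):
                continue
    return None

# Answer lines copied from the transcript above, with some log text in between.
sample_log = """**Final Result:**
{'Final Result': 'sku_id', 'Recommended Keys': [{'key': 'sku_id', 'score': 0.82, 'table': 'both'}]}
Observation: Invalid Format: Missing 'Action:' after 'Thought:
**Final Answer:**
{'Final Result': 'sku_id', 'Recommended Keys': [{'key': 'sku_id', 'score': 0.82, 'table': 'both'}]}
"""
result = extract_final_result(sample_log)
print(result["Final Result"])  # sku_id
```

The parsed key could then feed step 6, e.g. `execute_join_query({"table1": "sample.orderinfo", "table2": "sample.skuinfo", "join_key": result["Final Result"]})`; its DataFrame, converted with `.to_dict(orient="list")` and passed as `join_result` together with the two table sizes, goes to `assess_join_success` in step 7.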