<a href="https://colab.research.google.com/github/maheshboj/agenticai_basics/blob/Langchain_components/SF_DDL_Builder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [9]:
%pip install langchain lanchain_openai pyarrow pandas

[31mERROR: Could not find a version that satisfies the requirement lanchain_openai (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for lanchain_openai[0m[31m
[0m

In [10]:
# pyyaml provides the `yaml` module used to read snowflake_config.yaml.
%pip install pyyaml



In [11]:
# OpenAI integration needed by init_chat_model('openai:gpt-4o-mini').
%pip install -qU langchain-openai

In [12]:
# Provides SQLDatabase, which is used to execute the generated DDL against Snowflake.
%pip install -qU langchain_community

In [13]:
# Snowflake driver and SQLAlchemy dialect behind the snowflake:// URI built in create_snowflake_tables.
%pip install -qU snowflake-connector-python snowflake-sqlalchemy

In [14]:
import os
import json
from typing import Dict, List, Any
import pandas as pd
import pyarrow.parquet as pq
from langchain.agents import create_agent
from langchain_core.messages import SystemMessage
from langchain.tools import tool
from pydantic import BaseModel, Field
from typing import List
import yaml
from langchain_community.utilities import SQLDatabase

In [15]:
def _json_safe_sample(value: Any) -> Any:
    """Coerce one sample cell into a plain, JSON-serializable Python value."""
    import math

    if value is None or isinstance(value, str):
        return value
    # numpy scalars (np.bool_, np.int64, np.float32, ...) become native Python types.
    if pd.api.types.is_bool(value):
        return bool(value)
    if pd.api.types.is_integer(value):
        return int(value)
    if pd.api.types.is_float(value):
        value = float(value)
        if math.isnan(value):
            return None  # missing value; NaN is not valid JSON
        return value if math.isfinite(value) else str(value)
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None  # NaT / pd.NA
    # Timestamps, Decimals, nested lists/dicts, ... are rendered as text.
    return str(value)


def df_to_schema(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """
    Convert pandas DataFrame dtypes to a simple schema representation:
    [{ "column": "id", "dtype": "int64", "nullable": False, "sample_values": [...] }, ...]

    Args:
        df: The (possibly already sampled) frame whose columns are described.

    Returns:
        One dict per column, in column order:
        - "column": the column label as a string,
        - "dtype": ``str`` of the column's pandas dtype,
        - "nullable": True if the column has at least one missing value in ``df``,
        - "sample_values": up to the first 5 values, JSON-safe (missing -> None,
          non-primitive values such as Timestamps -> ``str``).
    """
    head = df.head(5)
    schema = []
    # Positional access keeps duplicate column labels from returning a DataFrame.
    for idx, col in enumerate(df.columns):
        series = df.iloc[:, idx]
        schema.append({
            "column": str(col),
            "dtype": str(series.dtype),
            "nullable": bool(series.isnull().any()),
            "sample_values": [_json_safe_sample(v) for v in head.iloc[:, idx].tolist()],
        })
    return schema

In [16]:
@tool("read_local_folder_and_infer_schema")
def read_local_folder_and_infer_schema(folder_path: str) -> str:
    """
    Scans a folder (non-recursively), reads supported files (csv, json/json-lines,
    parquet/pq, xlsx/xls), and returns a JSON string that lists each file and its
    inferred schema. Files that cannot be read are listed with an "error" entry.
    """
    # NOTE: LangChain sends the docstring above to the LLM as the tool description,
    # so maintainer notes live in these comments instead.
    if not os.path.isdir(folder_path):
        # Return a readable tool result instead of raising, so one bad path does
        # not abort the whole agent run.
        return json.dumps(
            {
                "folder": folder_path,
                "files": [],
                "error": "folder does not exist or is not a directory",
            },
            indent=2,
        )

    result = []
    # sorted() makes the file order (and therefore the DDL order) deterministic.
    for entry in sorted(os.listdir(folder_path)):
        full = os.path.join(folder_path, entry)
        if os.path.isdir(full):
            continue
        name_lower = entry.lower()
        try:
            if name_lower.endswith(".csv"):
                # Only the first 1000 rows are read, so "nullable" for CSV files
                # reflects this sample, not the whole file.
                df = pd.read_csv(full, nrows=1000)
            elif name_lower.endswith(".json"):
                # Try JSON lines first, fall back to standard JSON.
                try:
                    df = pd.read_json(full, lines=True)
                except ValueError:
                    df = pd.read_json(full)
            elif name_lower.endswith((".parquet", ".pq")):
                df = pq.read_table(full).to_pandas()
            elif name_lower.endswith((".xlsx", ".xls")):
                df = pd.read_excel(full)
            else:
                continue  # unsupported extension
        except Exception as e:
            # Record the failure and keep scanning the remaining files.
            result.append({
                "file": entry,
                "error": f"failed to read: {repr(e)}"
            })
            continue

        result.append({
            "file": entry,
            "rows_sampled": len(df),
            "columns": df_to_schema(df),
        })

    # default=str: sample values can contain non-JSON types such as pandas Timestamps.
    return json.dumps({"folder": folder_path, "files": result}, indent=2, default=str)

In [18]:
# output=read_local_folder_and_infer_schema("/content/sample_data/files")

In [19]:
### ---------- Helper: map pandas dtype -> Snowflake type ----------
def map_dtype_to_snowflake(
    dtype: str,
    sample_values: List[Any],
    *,
    infer_varchar_length: bool = False,
) -> str:
    """
    Map a pandas dtype string to a Snowflake column type.

    Args:
        dtype: ``str(series.dtype)``, e.g. "int64", "Int64", "float64", "bool",
            "datetime64[ns]", "datetime64[ns, UTC]", "object".
        sample_values: A few example values from the column. Only consulted for
            text columns, and only when ``infer_varchar_length`` is True.
        infer_varchar_length: If True, size text columns as VARCHAR(n) from the
            longest sample (the previous behaviour). Off by default: a handful of
            samples routinely underestimates the real maximum length, which makes
            later loads fail, and Snowflake does not charge for a larger declared
            VARCHAR length.

    Returns:
        "NUMBER", "FLOAT", "BOOLEAN", "TIMESTAMP_NTZ", "TIMESTAMP_TZ", "DATE",
        "VARCHAR(n)" (opt-in) or "TEXT".
    """
    d = dtype.lower()

    # Temporal types first, so timezone names can never hit the "int" test below.
    if "datetime" in d or "timestamp" in d:
        # Timezone-aware dtypes render as "datetime64[ns, <tz>]" (pandas) or
        # "timestamp[us, tz=<tz>]" (pyarrow); keep their offset.
        if "," in d or "tz=" in d:
            return "TIMESTAMP_TZ"
        return "TIMESTAMP_NTZ"
    if "date" in d:
        return "DATE"
    if "bool" in d:  # covers "bool" and the nullable "boolean"
        return "BOOLEAN"
    # "interval[int64, ...]" contains "int" but is not an integer column.
    if "int" in d and not d.startswith("interval"):
        return "NUMBER"
    if "float" in d or "double" in d or "decimal" in d:
        return "FLOAT"

    # Everything else is treated as text.
    if infer_varchar_length:
        max_len = max((len(str(v)) for v in sample_values), default=0)
        if 0 < max_len <= 256:
            return f"VARCHAR({max_len})"
    return "TEXT"

In [20]:
def _quote_snowflake_ident(name: str) -> str:
    """Return ``name`` as a double-quoted Snowflake identifier, escaping embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


@tool("generate_snowflake_ddl")
def generate_snowflake_ddl(schema_json_str: str, table_name_hint: str = None) -> str:
    """
    Input: JSON string produced by `read_local_folder_and_infer_schema`.
    Output: Snowflake CREATE OR REPLACE TABLE statements (one per readable file),
    separated by blank lines. If `table_name_hint` is given, it is the table name
    when there is a single file, and a table-name prefix when there are several.
    """
    # NOTE: LangChain uses the docstring above as the tool description for the LLM.
    data = json.loads(schema_json_str)
    # Skip unreadable files and files with no columns (those would yield invalid
    # "CREATE TABLE x ()" DDL).
    files = [f for f in data.get("files", []) if "error" not in f and f.get("columns")]
    ddl_texts = []

    for f in files:
        file_name = str(f.get("file", ""))
        # Derive a name from the file name: drop the extension, non-alphanumerics -> "_".
        base = os.path.splitext(file_name)[0]
        derived = "".join(c if c.isalnum() else "_" for c in base).upper()[:128] or "UNNAMED_TABLE"
        if table_name_hint and len(files) == 1:
            tbl = table_name_hint
        elif table_name_hint:
            # A single shared name would make each CREATE OR REPLACE overwrite the
            # previous table, so the hint becomes a prefix instead.
            tbl = f"{table_name_hint}_{derived}"
        else:
            # Quoted so names starting with a digit (e.g. "2024_SALES") stay valid;
            # an all-uppercase quoted name resolves to the same table as unquoted.
            tbl = _quote_snowflake_ident(derived)

        col_lines = []
        for col in f["columns"]:
            col_name = _quote_snowflake_ident(str(col["column"]).upper())
            sf_type = map_dtype_to_snowflake(col["dtype"], col.get("sample_values", []))
            # NOT NULL when no nulls were observed in the sampled rows, else NULL.
            null_clause = "NULL" if col.get("nullable", False) else "NOT NULL"
            col_lines.append(f"  {col_name} {sf_type} {null_clause}")

        ddl = (
            f"CREATE OR REPLACE TABLE {tbl} (\n"
            + ",\n".join(col_lines)
            + "\n);\n-- source file: "
            + file_name
        )
        ddl_texts.append(ddl)

    return "\n\n".join(ddl_texts)

In [21]:
# Copy the OpenAI key from the Colab secret "OPENAI_KEY2" into the
# OPENAI_API_KEY environment variable read by the OpenAI integration.
from google.colab import userdata

os.environ.update(OPENAI_API_KEY=userdata.get('OPENAI_KEY2'))

In [22]:
from langchain.chat_models import init_chat_model

# "<provider>:<model>" identifier understood by init_chat_model.
MODEL_ID = 'openai:gpt-4o-mini'
llm = init_chat_model(MODEL_ID)

In [24]:
# def build_agents_and_run(folder_path: str, table_name_hint: str = None):
#     """
#     Example orchestration:
#      1) Use agent A's tool to get schema
#      2) Pass schema JSON into agent B tool to get DDL
#     """
#     # Build a simple agent: specify the model name according to your provider.
#     # For example: model="openai:gpt-5" or "openai:gpt-4o"
#     # You must have the provider API keys configured in env for your model.
#     agent = create_agent(
#         model=llm,  # replace with your model identifier
#         tools=[read_local_folder_and_infer_schema, generate_snowflake_ddl],
#         system_prompt=SystemMessage(content="You are an assistant that calls the tools to inspect local files and produce Snowflake DDL. Use the tools rather than inventing file contents.")
#     )

#     print("\n=== Agent-run (agent may call tools interactively) ===")

#     question = f"Please inspect files in {folder_path} and generate Snowflake DDL for the files in the folder."
#     for step in agent.stream(
#         {"messages": [{"role": "user", "content": question}]},
#         stream_mode="values",
#     ):
#         step["messages"][-1].pretty_print()
# if __name__ == "__main__":
#     folder = "/content/sample_data/files"  # change to your folder
#     build_agents_and_run(folder, table_name_hint=None)


In [23]:
class SF_DDL(BaseModel):
    # Structured-output schema for the agent (response_format=SF_DDL). The field
    # description becomes part of that schema, so it also instructs the model.
    # Each element is later executed on its own via create_snowflake_tables,
    # hence "exactly one statement per element".
    DDL: List[str] = Field(
        description=(
            "Snowflake CREATE TABLE statements, exactly one complete statement per "
            "list element, copied verbatim from the generate_snowflake_ddl tool output"
        )
    )

In [25]:
folder_path = "/content/sample_data/files"
# The prompt spells out the tool chain and asks for the generated DDL verbatim.
# Without this, the model may rewrite types and names itself instead of using
# generate_snowflake_ddl's output.
agent1 = create_agent(
    model=llm,  # chat model object created with init_chat_model
    tools=[read_local_folder_and_infer_schema, generate_snowflake_ddl],
    system_prompt=SystemMessage(content=(
        "You are an assistant that calls the tools to inspect local files and produce Snowflake DDL. "
        "Use the tools rather than inventing file contents. "
        "First call read_local_folder_and_infer_schema, then pass its JSON output unchanged to "
        "generate_snowflake_ddl, and return the CREATE TABLE statements it produces verbatim, "
        "one statement per list element. Do not rewrite table names, column names or types."
    )),
    response_format=SF_DDL,
)



In [26]:
# Run the agent once. The parsed SF_DDL object is read later from
# reponse['structured_response'] (the "reponse" spelling is kept because that cell uses it).
question = (
    f"Please inspect files in {folder_path} "
    "and generate Snowflake DDL for the files in the folder."
)
user_turn = {"role": "user", "content": question}
reponse = agent1.invoke({"messages": [user_turn]})

In [30]:
def _snowflake_uri_from_config(sf: Dict[str, Any]) -> str:
    """Build a snowflake-sqlalchemy URI from the ``snowflake`` section of the YAML config."""
    from urllib.parse import quote, urlencode

    # Percent-encode credentials so characters such as '@', ':', '/' or '#' in a
    # password cannot corrupt the URI; SQLAlchemy decodes them again when parsing.
    user = quote(str(sf["user"]), safe="")
    password = quote(str(sf["password"]), safe="")
    params = {"warehouse": sf["warehouse"]}
    # Only add a role when one is actually configured (avoids "role=None").
    if sf.get("role"):
        params["role"] = sf["role"]
    return (
        f"snowflake://{user}:{password}@{sf['account']}/"
        f"{sf['database']}/{sf['schema']}?{urlencode(params)}"
    )


def create_snowflake_tables(ddl: str, config_path: str = "snowflake_config.yaml") -> None:
    """
    Execute one DDL statement against the Snowflake database described in a YAML config.

    The config must contain a ``snowflake`` mapping with ``user``, ``password``,
    ``account``, ``database``, ``schema`` and ``warehouse`` keys, plus an
    optional ``role``.

    Args:
        ddl: SQL to run. NOTE(review): pass one statement per call; the driver
            may reject multi-statement strings, so confirm before batching.
        config_path: Path of the YAML config file (defaults to the original
            hard-coded "snowflake_config.yaml").

    Raises:
        FileNotFoundError: If ``config_path`` does not exist.
        KeyError: If required config keys are missing.
    """
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    connection_uri = _snowflake_uri_from_config(config["snowflake"])
    # Do not print connection_uri: it contains the password.

    db = SQLDatabase.from_uri(connection_uri)
    db.run(ddl)
    print("Done!", f"create sf table for {ddl}")

In [27]:
# Execute each generated statement. create_snowflake_tables reports its own
# status and returns None, so its result is not printed.
for ddl in reponse['structured_response'].DDL:
    create_snowflake_tables(ddl)

CREATE OR REPLACE TABLE DIM_Customer (CustomerKey FLOAT, "First Name" STRING NOT NULL, "Last Name" STRING NOT NULL, "Full Name" STRING NOT NULL, Gender STRING NOT NULL, DateFirstPurchase STRING NOT NULL, "Customer City" STRING);
CREATE OR REPLACE TABLE DIM_Product (ProductKey FLOAT, ProductItemCode STRING NOT NULL, "Product Name" STRING NOT NULL, "Sub Category" STRING, "Product Category" STRING, "Product Color" STRING, "Product Size" STRING, "Product Line" STRING, "Product Model Name" STRING, "Product Description" STRING, "Product Status" STRING NOT NULL);
CREATE OR REPLACE TABLE FACT_InternetSales (ProductKey INT NOT NULL, OrderDateKey INT NOT NULL, DueDateKey INT NOT NULL, ShipDateKey INT NOT NULL, CustomerKey INT NOT NULL, SalesOrderNumber STRING NOT NULL, SalesAmount FLOAT NOT NULL);
