## RSQL Lineage Generation

### Expert Question Answerer for InsureLLM

In [48]:
import os
import glob
import json
from pathlib import Path
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from typing import List, Optional
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

In [49]:
load_dotenv(override=True)
openai_api_key = os.getenv('OPENAI_API_KEY')
if openai_api_key:
    print(f"OpenAI API Key exists and begins {openai_api_key[:8]}")
else:
    print("OpenAI API Key not set")


OpenAI API Key exists and begins sk-proj-


In [50]:
# Define the Schema for Collibra-Compatible Lineage
class LineageStep(BaseModel):
    operation: str = Field(description="The SQL operation: COPY, DELETE, INSERT, etc.")
    source_name: str = Field(description="The source table or S3 path")
    target_name: str = Field(description="The destination table")
    logic: str = Field(description="Short description of the transformation logic")

class LineageFull(BaseModel):
    script_name: str
    lineage: List[LineageStep]

In [51]:
SYSTEM_PROMPT_MESSAGE1 = """

You are a Senior Data Architect specializing in Metadata Management and Collibra Data Governance. 
Your task is to parse RSQL (Amazon Redshift) ETL scripts and generate a technical data lineage map 
in a structured JSON format.

### OBJECTIVE
Analyze the provided SQL code to identify every movement of data. You must distinguish between external 
sources (S3), staging areas (Temp Tables), and final production targets.

### EXTRACTION RULES
1. SOURCE IDENTIFICATION: Look for 'COPY' commands to identify S3 buckets, manifests, and file formats (e.g., PARQUET).
2. STAGING IDENTIFICATION: Identify 'CREATE TEMP TABLE' or 'CREATE TABLE' statements that serve as intermediate hops.
3. TARGET IDENTIFICATION: Identify the final table receiving data via 'INSERT INTO' or 'MERGE'.
4. TRANSFORMATION LOGIC: 
   - If you see a 'DELETE' followed by an 'INSERT' on the same table using a shared key, label the operation as "UPSERT".
   - If you see a 'TRUNCATE' followed by an 'INSERT', label it as "FULL_RELOAD".
5. ATTRIBUTE MAPPING: Capture join keys (e.g., sales_id) and filter conditions.

### OUTPUT JSON SCHEMA
Your output must be a valid JSON object following this structure:
{
  "script_metadata":{ "name": "string", "dialect": "Redshift/RSQL" },
  "lineage": [
    {
      "step_id": integer,
      "operation": "COPY|INSERT|DELETE|UPSERT|MAINTENANCE",
      "source": { "name": "string", "type": "S3_BUCKET|TABLE|VIEW" },
      "target": { "name": "string", "type": "TABLE|TEMP_TABLE" },
      "logic": "Detailed description of the SQL transformation",
      "impact_columns": ["list", "of", "keys"]
    }
  ]
}

### CONSTRAINTS
- Do not include administrative SQL like 'SET' or 'ECHO' in the lineage steps.
- Ensure the 'target' of one step correctly matches the 'source' of the subsequent step to maintain a continuous chain.
- Provide only the JSON output; do not include conversational filler.

"""

In [None]:
SYSTEM_PROMPT_MESSAGE = """
You are a Senior Data Architect specializing in Metadata Management and Collibra Data Governance. 
Your task is to parse RSQL (Amazon Redshift) ETL scripts and generate a technical data lineage map 
in a structured JSON format.

### OBJECTIVE
Analyze the provided SQL code to identify every movement of data. You must distinguish between external 
sources (S3), staging areas (Temp Tables), and final production targets.

### EXTRACTION RULES
1. SOURCE IDENTIFICATION: Look for 'COPY' commands to identify S3 buckets, manifests, and file formats (e.g., PARQUET).
2. STAGING IDENTIFICATION: Identify 'CREATE TEMP TABLE' or 'CREATE TABLE' statements that serve as intermediate hops.
3. TARGET IDENTIFICATION: Identify the final table receiving data via 'INSERT INTO' or 'MERGE'.
4. ATTRIBUTE MAPPING: Capture join keys (e.g., sales_id) and filter conditions.

### OUTPUT JSON SCHEMA
Your output must be a valid JSON object following this structure:
{{
  "script_metadata": {{ "name": "string", "dialect": "Redshift/RSQL" }},
  "lineage": [
    {{
      "step_id": "integer",
      "operation": "COPY|INSERT|DELETE|UPSERT|MAINTENANCE",
      "source": {{ "name": "string", "type": "S3_BUCKET|TABLE|VIEW" }},
      "target": {{ "name": "string", "type": "TABLE|TEMP_TABLE" }},
      "logic": "Detailed description of the SQL transformation",
      "impact_columns": ["list", "of", "keys"]
    }}
  ]
}}

### CONSTRAINTS
- Do not include administrative SQL like 'SET' or 'ECHO' in the lineage steps.
- Ensure the 'target' of one step correctly matches the 'source' of the subsequent step to maintain a continuous chain.
- Provide only the JSON output; do not include conversational filler.
"""

In [53]:
input_folder="sql_container/etl/*.sql"
rSqlFilenames = glob.glob(input_folder)
rSqlFiles={}
if not rSqlFilenames:
    print("No SQL files found.")
else:
    print(rSqlFilenames)
    for rSqlFilename in rSqlFilenames:
        name = Path(rSqlFilename).stem
        with open(rSqlFilename, "r", encoding="utf-8") as f:
            rSqlFiles[name.lower()] = f.read()


['sql_container/etl\\sample_etl_work.sql']


In [54]:
USER_PROMPT_MESSAGE = """
Analyze this SQL script and extract the lineage steps: \n\n {sql_code}
"""

In [55]:
output_json="sql_container/output/"
llm = ChatOpenAI(model="gpt-4o", temperature=0)
structured_llm = llm.with_structured_output(LineageFull)
prompt = ChatPromptTemplate.from_messages([
        ("system", SYSTEM_PROMPT_MESSAGE),
        ("user", USER_PROMPT_MESSAGE)
    ])
chain = prompt | structured_llm

for rSqlFile in rSqlFiles:
    lineage_data = chain.invoke({"sql_code": rSqlFiles[rSqlFile]})
    out_file=output_json+rSqlFile+".json"
    with open(out_file, 'w') as f:
        json.dump(lineage_data.dict(), f, indent=2)
    print(f"Lineage saved to {output_json}")
    
   

Lineage saved to sql_container/output/


C:\Users\Samrat\AppData\Local\Temp\ipykernel_16828\1254290724.py:14: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  json.dump(lineage_data.dict(), f, indent=2)


In [None]:
input_folder=""
output_json=""
output_image=""

# --- SECTION 1: Input Reference Handling ---
sql_files = [f for f in os.listdir(input_folder) if f.endswith('.sql')]
if not sql_files:
    print("No SQL files found.")
else:
    # For this example, we process the first SQL file found
    file_path = os.path.join(input_folder, sql_files[0])
    with open(file_path, 'r') as f:
        sql_content = f.read()

In [None]:
USER_PROMPT_MESSAGE = f"""
Analyze this SQL script and extract the lineage steps:


{sql_code}
"""

In [None]:
llm = ChatOpenAI(model="gpt-4o", temperature=0)
structured_llm = llm.with_structured_output(LineageFull)

prompt = ChatPromptTemplate.from_messages([
        ("system", SYSTEM_PROMPT_MESSAGE),
        ("user", USER_PROMPT_MESSAGE)
    ])