In [None]:
import streamlit as st
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.exceptions import SnowparkSQLException
import json
import yaml

# Get the active Snowpark session. It is kept at module level because the
# helper functions and UI cells below reference the global `session`
# directly rather than receiving it as a parameter.
session = get_active_session()

def fetch_columns(database: str, schema: str, table: str) -> list:
    """Return the columns of ``database.schema.table`` in ordinal order.

    Reads ``<database>.INFORMATION_SCHEMA.COLUMNS``.

    Args:
        database: Database name, interpolated into the FROM clause as an
            identifier (identifiers cannot be passed as bind values).
        schema: Schema name; upper-cased and compared with TABLE_SCHEMA.
        table: Table name; upper-cased and compared with TABLE_NAME.

    Returns:
        One dict per column with keys ``COLUMN_NAME`` and ``DATA_TYPE``, or an
        empty list if the query fails (the error is shown via ``st.error``).
    """
    # Schema/table values are bound as parameters instead of being spliced into
    # quoted literals, so a name containing a quote cannot break or inject SQL.
    # NOTE(review): the upper-casing is kept for compatibility; names created
    # with quoted lower/mixed case will not match it — confirm if that matters.
    query = f"""
        SELECT COLUMN_NAME, DATA_TYPE
        FROM {database}.INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
        ORDER BY ORDINAL_POSITION
    """
    try:
        rows = session.sql(query, params=[schema.upper(), table.upper()]).collect()
        return [row.as_dict() for row in rows]
    except SnowparkSQLException as e:
        st.error(f"Error fetching columns: {e.message}")
        return []

def generate_ai_description(column_name: str, data_type: str, model: str = "mistral-large") -> dict:
    """Ask Cortex COMPLETE for a business description and synonyms of a column.

    Args:
        column_name: Column name shown to the model.
        data_type: Snowflake data type shown to the model.
        model: Cortex model name passed to SNOWFLAKE.CORTEX.COMPLETE
            (defaults to the previously hard-coded ``mistral-large``).

    Returns:
        ``{"description": str, "synonyms": list}``. When the call fails or the
        reply cannot be parsed into that shape, a generic fallback built from
        the column name is returned, so one bad column never stops generation.
    """
    fallback = {"description": f"Data for {column_name}", "synonyms": [column_name]}
    prompt = f"""
        You are a data analyst writing a semantic model for a table.
        Given a column named '{column_name}' with data type '{data_type}':
        1. Write a concise, one-sentence, business-friendly description.
        2. Suggest 2-3 common business synonyms.
        Return ONLY a JSON object with keys "description" and "synonyms".
    """
    # The prompt is a bind variable rather than a Python repr() spliced into the
    # SQL text (repr escaping only happens to overlap with Snowflake's rules).
    # The model stays an escaped literal. NOTE(review): unverified whether
    # COMPLETE accepts a bound model argument, so it is not bound here.
    model_literal = model.replace("\\", "\\\\").replace("'", "''")
    query = f"SELECT SNOWFLAKE.CORTEX.COMPLETE('{model_literal}', ?)"
    try:
        response = session.sql(query, params=[prompt]).collect()[0][0]
        # The model may wrap the JSON in prose; keep the outermost {...} span.
        start, end = response.find("{"), response.rfind("}")
        if start == -1 or end < start:
            return fallback
        parsed = json.loads(response[start:end + 1])
    except Exception:
        # Best-effort: any SQL / Cortex / JSON failure degrades to the fallback.
        return fallback

    # Normalize the reply so callers always get a string and a list.
    if not isinstance(parsed, dict):
        return fallback
    description = parsed.get("description")
    if not isinstance(description, str) or not description.strip():
        description = fallback["description"]
    synonyms = parsed.get("synonyms")
    if isinstance(synonyms, str):
        synonyms = [synonyms]
    elif not isinstance(synonyms, list):
        synonyms = []
    return {"description": description.strip(), "synonyms": [str(s) for s in synonyms if s]}

def _to_semantic_type(col_type: str) -> str:
    """Map a Snowflake data type name to the semantic-model ``data_type``.

    Matching is by substring (case-insensitive), so names such as
    ``NUMBER(38,0)`` or ``TIMESTAMP_NTZ`` are mapped too. Unrecognised types
    are returned unchanged.
    """
    upper = (col_type or "").upper()
    if any(x in upper for x in ("VARCHAR", "TEXT", "CHAR")):
        return "TEXT"
    if any(x in upper for x in ("NUMBER", "INT", "FLOAT", "DECIMAL", "DOUBLE", "NUMERIC", "REAL")):
        return "NUMBER"
    if "DATE" in upper:
        return "DATE"
    if "TIMESTAMP" in upper:
        return "TIMESTAMP"
    if "BOOLEAN" in upper:
        return "BOOLEAN"
    return col_type


# Generation - builds YAML string - reuses definitions if possible
def generate_semantic_model(database: str, schema: str, table: str, known_defs: dict = None) -> str:
    """Build a semantic-model YAML document for one table.

    Each column becomes a measure (numeric types) or a dimension (everything
    else). Descriptions/synonyms come from ``known_defs`` when present and from
    Cortex (``generate_ai_description``) otherwise.

    Args:
        database, schema, table: Location of the base table.
        known_defs: Optional mapping of column name to
            ``{"description": ..., "synonyms": [...]}`` taken from the repository.

    Returns:
        The YAML text, or ``""`` when no columns were found.
    """
    if known_defs is None:
        known_defs = {}

    columns = fetch_columns(database, schema, table)
    if not columns:
        return ""

    dimensions = []
    measures = []

    st.write(f"Processing {len(columns)} columns...")
    progress_bar = st.progress(0)

    for position, col in enumerate(columns, start=1):
        col_name = col["COLUMN_NAME"]
        col_type = col["DATA_TYPE"]

        # Reuse a saved definition when one exists; only ask Cortex on a miss
        # (or when the stored entry is not a usable mapping).
        ai_meta = known_defs.get(col_name)
        if not isinstance(ai_meta, dict):
            ai_meta = generate_ai_description(col_name, col_type)

        semantic_type = _to_semantic_type(col_type)
        entry = {
            'name': col_name,
            'expr': col_name,
            'data_type': semantic_type,
            'description': ai_meta.get('description'),
            'synonyms': ai_meta.get('synonyms') or [],
        }

        # Classify on the normalized type so the decision agrees with the
        # data_type mapping above (e.g. NUMBER(38,0) is a measure too).
        if semantic_type == 'NUMBER':
            measures.append(entry)
        else:
            dimensions.append(entry)

        progress_bar.progress(position / len(columns))

    model_dict = {
        "name": table,
        "description": f"Semantic model for {table}",
        "tables": [{
            "name": table,
            "base_table": {
                "database": database,
                "schema": schema,
                "table": table
            },
            "dimensions": dimensions,
            "measures": measures
        }]
    }

    return yaml.dump(model_dict, sort_keys=False, default_flow_style=False)

CENTRAL SEMANTIC ATTRIBUTE REPOSITORY: LOOK UP AND SAVE REUSABLE COLUMN DEFINITIONS

In [None]:
import json
import yaml

def _parse_synonyms(raw) -> list:
    """Decode a stored SYNONYMS value (JSON text or an already-decoded list) into a list."""
    if not raw:
        return []
    if isinstance(raw, (list, tuple)):
        return list(raw)
    try:
        value = json.loads(raw)
    except (TypeError, ValueError):
        # A single malformed value should not discard the other definitions.
        return []
    return value if isinstance(value, list) else [value]


# Fetch existing definitions
def get_known_attributes(
    column_names,
    repo_table: str = "CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.SEMANTIC_ATTRIBUTE_REPOSITORY",
):
    """Look up previously saved definitions for the given column names.

    Args:
        column_names: Iterable of column names; compared upper-cased with
            ATTRIBUTE_NAME in the repository.
        repo_table: Fully qualified name of the repository table.

    Returns:
        Dict mapping ATTRIBUTE_NAME (as stored) to
        ``{"description": ..., "synonyms": list}``. Empty when no names are
        given or the lookup fails (a warning is shown in that case).
    """
    # Upper-case and de-duplicate (order preserved); one bind variable per name
    # so quotes inside names cannot break or inject into the IN list.
    names = list(dict.fromkeys(str(c).upper() for c in (column_names or [])))
    if not names:
        return {}
    placeholders = ", ".join("?" for _ in names)

    try:
        rows = session.sql(
            f"""
            SELECT ATTRIBUTE_NAME, DESCRIPTION, SYNONYMS
            FROM {repo_table}
            WHERE ATTRIBUTE_NAME IN ({placeholders})
            """,
            params=names,
        ).collect()
    except Exception as e:
        # Best-effort: generation continues without reuse, but say why.
        st.warning(f"Could not read the attribute repository: {e}")
        return {}

    return {
        row["ATTRIBUTE_NAME"]: {
            "description": row["DESCRIPTION"],
            "synonyms": _parse_synonyms(row["SYNONYMS"]),
        }
        for row in rows
    }

# Save/Upsert new definitions
def save_attributes_to_repo(
    yaml_content,
    repo_table: str = "CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.SEMANTIC_ATTRIBUTE_REPOSITORY",
) -> int:
    """Upsert the field definitions found in a semantic-model YAML into the repository.

    Fields under ``dimensions``, ``time_dimensions``, ``measures``, ``facts``
    and (legacy) ``columns`` of every table are collected; only fields with
    both a name and a non-empty description are saved. Rows are MERGEd into
    ``repo_table`` keyed on ATTRIBUTE_NAME.

    Args:
        yaml_content: Semantic-model YAML text.
        repo_table: Fully qualified name of the repository table.

    Returns:
        Number of definitions upserted. Always an int: 0 when there is nothing
        to save or the update fails (failures are printed, not raised).
    """
    field_sections = ("dimensions", "time_dimensions", "measures", "facts", "columns")
    try:
        data = yaml.safe_load(yaml_content) or {}
        if not isinstance(data, dict):
            return 0

        # Keyed by name (last wins) so a field appearing more than once yields a
        # single source row; duplicate source rows make a MERGE nondeterministic.
        definitions = {}
        for table in data.get("tables") or []:
            if not isinstance(table, dict):
                continue
            for section in field_sections:
                for field in table.get(section) or []:
                    if not isinstance(field, dict):
                        continue
                    name = field.get("name")
                    desc = field.get("description") or ""
                    # Only save if we have a valid description
                    if not name or not desc:
                        continue
                    synonyms = field.get("synonyms") or []
                    if isinstance(synonyms, (list, tuple)):
                        synonyms = [str(s) for s in synonyms]
                    else:
                        synonyms = [str(synonyms)]
                    definitions[name] = (name, desc, synonyms)

        if not definitions:
            return 0

        fields_to_upsert = list(definitions.values())
        # Temp column names avoid SQL keywords such as DESC.
        input_df = session.create_dataframe(fields_to_upsert, schema=["ATTR_NAME", "ATTR_DESC", "ATTR_SYN"])
        input_df.create_or_replace_temp_view("TEMP_NEW_DEFS")

        session.sql(f"""
            MERGE INTO {repo_table} AS target
            USING TEMP_NEW_DEFS AS source
            ON target.ATTRIBUTE_NAME = source.ATTR_NAME
            WHEN MATCHED THEN UPDATE SET
                target.DESCRIPTION = source.ATTR_DESC,
                target.SYNONYMS = source.ATTR_SYN,
                target.LAST_UPDATED = CURRENT_TIMESTAMP()
            WHEN NOT MATCHED THEN INSERT
                (ATTRIBUTE_NAME, DESCRIPTION, SYNONYMS, LAST_UPDATED)
                VALUES (source.ATTR_NAME, source.ATTR_DESC, source.ATTR_SYN, CURRENT_TIMESTAMP())
        """).collect()

        return len(fields_to_upsert)
    except Exception as e:
        print(f"Repo update failed: {e}")
        return 0

STREAMLIT UI: SELECT A TABLE, GENERATE AND REVIEW THE MODEL, SAVE IT AS A SEMANTIC VIEW

In [None]:
import streamlit as st
import yaml

# Initialize Session State: keeps the generated YAML across Streamlit reruns
if "yaml_content" not in st.session_state:
    st.session_state.yaml_content = ""

# Notebook UI
st.title("Semantic Layer Manager")

# Left column: select & generate; right column: review & save
col1, col2 = st.columns([1, 1.5])

# Pre-set the selections so later code (button handlers, the default view
# name) never hits a NameError when metadata loading fails part-way.
selected_db = selected_schema = selected_table = None

# Selection & Generation
with col1:
    st.subheader("1. Select & Generate")
    try:
        # Database Selection (prefer the demo database when present)
        db_list = [r["name"] for r in session.sql("SHOW DATABASES").collect()]
        selected_db = st.selectbox(
            "Database",
            db_list,
            index=db_list.index("CORTEX_ANALYST_DEMO") if "CORTEX_ANALYST_DEMO" in db_list else 0,
        )

        # Schema Selection (prefer the demo schema when present)
        if selected_db:
            schema_list = [
                r["name"] for r in session.sql(f"SHOW SCHEMAS IN DATABASE {selected_db}").collect()
            ]
            selected_schema = st.selectbox(
                "Schema",
                schema_list,
                index=schema_list.index("REVENUE_TIMESERIES") if "REVENUE_TIMESERIES" in schema_list else 0,
            )

        # Table Selection
        if selected_schema:
            table_list = [
                r["name"]
                for r in session.sql(f"SHOW TABLES IN SCHEMA {selected_db}.{selected_schema}").collect()
            ]
            selected_table = st.selectbox("Table", table_list)

    except Exception as e:
        st.error(f"Error loading metadata: {e}")

    # Generate Button
    if st.button("Generate Model"):
        if not selected_table:
            st.warning("Select a database, schema and table first.")
        else:
            with st.spinner("Checking repository and generating..."):
                try:
                    # Get current table columns to check against repo
                    desc_rows = session.sql(
                        f"DESC TABLE {selected_db}.{selected_schema}.{selected_table}"
                    ).collect()
                    current_columns = [r["name"] for r in desc_rows]

                    # Check Repo for existing definitions
                    known_defs = get_known_attributes(current_columns)
                    if known_defs:
                        st.info(f"Found {len(known_defs)} existing definitions. Reusing them.")

                    # Generate Model ("" means no columns were found)
                    st.session_state.yaml_content = generate_semantic_model(
                        selected_db,
                        selected_schema,
                        selected_table,
                        known_defs=known_defs,
                    )
                    if not st.session_state.yaml_content:
                        st.warning("No columns were found for the selected table.")
                except Exception as e:
                    st.error(f"Generation failed: {e}")

# Review & Save
with col2:
    st.subheader("2. Review & Save")

    # Editor: the generated YAML can be hand-edited before saving
    edited_yaml = st.text_area("YAML Preview", value=st.session_state.yaml_content, height=500)

    st.markdown("---")

    # Save Options
    st.write("### Save as Semantic View")

    # View Name (empty default when no table is selected, not "None_SEMANTIC_VIEW")
    view_name = st.text_input(
        "Semantic View Name",
        value=f"{selected_table}_SEMANTIC_VIEW" if selected_table else "",
    ).strip()

    if st.button("Create Semantic View"):
        if not edited_yaml.strip():
            st.warning("Generate a model first!")
        elif not (selected_db and selected_schema):
            st.warning("Select a database and schema first.")
        elif not view_name:
            st.warning("Enter a name for the semantic view.")
        else:
            try:
                # Define the target Schema (DB.SCHEMA only)
                target_schema = f"{selected_db}.{selected_schema}"

                # Sync the "View Name" input with the YAML content
                yaml_obj = yaml.safe_load(edited_yaml)
                if not isinstance(yaml_obj, dict):
                    raise ValueError("the YAML must be a mapping at the top level")
                yaml_obj['name'] = view_name
                final_yaml = yaml.dump(yaml_obj, sort_keys=False)

                st.info(f"Creating Semantic View '{view_name}' in schema '{target_schema}'...")

                session.call(
                    "SYSTEM$CREATE_SEMANTIC_VIEW_FROM_YAML",
                    target_schema,
                    final_yaml
                )
            except Exception as e:
                st.error(f"Failed to create view: {e}")
            else:
                # The view exists at this point; repository sync problems must not
                # be reported as a view failure, and `count` may be falsy/None.
                st.success(f"Semantic View Created: {target_schema}.{view_name}")
                count = save_attributes_to_repo(final_yaml)
                if count:
                    st.success(f"Repository updated with {count} field definitions.")
                st.write("You can now find this view in Cortex Analyst.")

Display the most recent updates to the central attribute repository

In [None]:
# Show the most recently updated attributes first.
# DataFrame.show() prints only 10 rows by default, so the row limit is passed
# to both the query and show() to actually display all of them.
RECENT_ATTRIBUTE_LIMIT = 20
session.sql(f"""
    SELECT ATTRIBUTE_NAME, DESCRIPTION, LAST_UPDATED
    FROM CORTEX_ANALYST_DEMO.REVENUE_TIMESERIES.SEMANTIC_ATTRIBUTE_REPOSITORY
    ORDER BY LAST_UPDATED DESC
    LIMIT {RECENT_ATTRIBUTE_LIMIT}
""").show(RECENT_ATTRIBUTE_LIMIT)