# Snowflake-Specific Workflows

This notebook walks through Snowflake-specific workflows built with the `langchain-snowflake` package: authentication, Cortex Analyst, Cortex Search, and async and batch processing.

## What You'll Learn

1. **Authentication Methods** - Compare the four supported authentication approaches
2. **Semantic Models & Views** - Use Cortex Analyst with your data models
3. **RAG with Cortex Search** - Retrieval-Augmented Generation with Snowflake data
4. **Async Patterns** - High-throughput async processing
5. **Batch Processing** - Efficient bulk operations

## Prerequisites

- Completed `getting_started.ipynb`
- Snowflake account with Cortex features enabled
- Semantic models or views in your Snowflake environment
- Cortex Search service set up

## 1. Authentication Methods Comparison

The `langchain-snowflake` package supports 4 authentication methods. Let's compare them:

In [None]:
import os

from langchain_snowflake import get_default_session

print("Authentication Methods Comparison")

# Method 1: Environment Variables (Most Flexible)
print("\n1 Environment Variables (create_session_from_env)")
print(" Uses SNOWFLAKE_* environment variables")
print(" Supports password, PAT, and key pair")
print(" Best for development and CI/CD")
print(" Easy to switch between auth methods")

# Method 2: Personal Access Token (Production Recommended)
print("\n2 Personal Access Token (create_session_from_pat)")
print(" Most secure for production")
print(" No password exposure")
print(" Easy token rotation")
print(" Requires PAT setup in Snowflake")

# Method 3: Key Pair (Enterprise)
print("\n3 Key Pair Authentication (create_session_from_key_pair)")
print(" Most secure (no shared secrets)")
print(" Supports private key rotation")
print(" Best for enterprise environments")
print(" Requires key pair generation and setup")

# Method 4: Connection String (Simple)
print("\n4 Connection String (create_session_from_connection_string)")
print(" Simple one-line setup")
print(" Compatible with existing workflows")
print(" Exposes credentials in string")
print(" Less secure for production")

print("\nSmart Default (get_default_session)")
print(" Tries all methods in priority order")
print(" Key Pair → PAT → Password → Connection String")
print(" Best for robust applications")

# Demonstrate the smart default
try:
    session = get_default_session()
    if session:
        print("\nSuccessfully connected using smart default!")
        result = session.sql(
            "SELECT CURRENT_USER() as user, CURRENT_WAREHOUSE() as warehouse"
        ).collect()
        print(f" User: {result[0]['USER']}")
        print(f" Warehouse: {result[0]['WAREHOUSE']}")
    else:
        print("\nNo valid authentication method found")
        print(" Please set up at least one authentication method")
except Exception as e:
    print(f"\nAuthentication failed: {e}")
    print(" Check your SNOWFLAKE_* environment variables and credentials")
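
The "smart default" described above tries several authentication methods in priority order and keeps the first one that works. The sketch below illustrates that fallback pattern in plain Python. It is not the package's implementation: `first_successful`, `key_pair_stub`, and `pat_stub` are hypothetical names, and the stubs stand in for real session factories.

```python
from typing import Callable, Optional, Sequence, Tuple


def first_successful(
    factories: Sequence[Tuple[str, Callable[[], object]]],
) -> Tuple[Optional[str], Optional[object]]:
    """Try each (name, factory) pair in order; return the first that succeeds."""
    for name, factory in factories:
        try:
            candidate = factory()
        except Exception as exc:
            # A failed method should not stop the search.
            print(f"{name} failed: {exc}")
            continue
        if candidate is not None:
            return name, candidate
    return None, None


# Hypothetical stand-ins for real session factories.
def key_pair_stub():
    raise RuntimeError("no private key configured")


def pat_stub():
    return {"auth": "pat"}


method, session_stub = first_successful(
    [
        ("key_pair", key_pair_stub),
        ("pat", pat_stub),
        ("password", lambda: {"auth": "password"}),
    ]
)
print(f"Connected with: {method}")
```

Keeping the method name alongside the session makes it easy to log which credential path was actually used.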

## 2. Semantic Models & Views with Cortex Analyst

Cortex Analyst generates SQL from natural-language questions. Pointing it at a semantic model or semantic view gives it the business context it needs to analyze your data:

In [None]:
from langchain_snowflake import SnowflakeCortexAnalyst

# Use the session from authentication above
if "session" not in locals():
    session = get_default_session()

print(" Cortex Analyst with Semantic Models & Views")

# Option A: Using Semantic Model File
print("\nA) Using Semantic Model File")
print(" Semantic models provide rich context about your data relationships")

# Replace with your actual semantic model path
SEMANTIC_MODEL_FILE = "@your_stage.schema.path/your_model.yaml"

try:
    # Create analyst with semantic model
    analyst_with_model = SnowflakeCortexAnalyst(
        session=session,
        semantic_model_file=SEMANTIC_MODEL_FILE,
        use_rest_api=True,  # Recommended for semantic models
    )

    # Example queries with rich context
    model_queries = [
        "What are our top performing products by revenue this quarter?",
        "Show me customer retention trends by region",
        "Compare sales performance across different customer segments",
    ]

    print(f" Model file: {SEMANTIC_MODEL_FILE}")
    for query in model_queries:
        print(f"\n Query: {query}")
        try:
            result = analyst_with_model.run(query)
            print(f" SQL: {result}")
        except Exception as e:
            print(f" Error: {e}")

except Exception as e:
    print(f" Semantic model setup needed: {e}")
    print(" Create a semantic model in your Snowflake environment first")

# Option B: Using Semantic View
print("\nB) Using Semantic View")
print(" Semantic views are pre-built views with business logic")

# Replace with your actual semantic view name
SEMANTIC_VIEW = "your_database.your_schema.your_semantic_view"

try:
    # Create analyst with semantic view
    analyst_with_view = SnowflakeCortexAnalyst(
        session=session, semantic_view=SEMANTIC_VIEW, use_rest_api=True
    )

    view_queries = [
        "Show me the latest KPI dashboard metrics",
        "What are the current trends in our main business metrics?",
        "Generate a summary report of this month's performance",
    ]

    print(f" View: {SEMANTIC_VIEW}")
    for query in view_queries:
        print(f"\n Query: {query}")
        try:
            result = analyst_with_view.run(query)
            print(f" SQL: {result}")
        except Exception as e:
            print(f" Error: {e}")

except Exception as e:
    print(f" Semantic view setup needed: {e}")
    print(" Create a semantic view in your Snowflake environment first")

# Option C: Standard Analyst (for comparison)
print("\nC) Standard Analyst (No Semantic Context)")
print(" Basic text-to-SQL without semantic understanding")

standard_analyst = SnowflakeCortexAnalyst(session=session)

standard_query = "Count total records in the customers table"
print(f"\n Query: {standard_query}")
try:
    result = standard_analyst.run(standard_query)
    print(f" SQL: {result}")
except Exception as e:
    print(f" Error: {e}")
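
The three options above repeat the same loop: run a query, print the result, and catch any error. A small helper can collect the outcomes instead of printing them. This is a minimal sketch: `run_queries` and `fake_run` are hypothetical names, and `fake_run` only stands in for a call such as `analyst_with_model.run`.

```python
from typing import Callable, Dict, Iterable


def run_queries(run: Callable[[str], str], queries: Iterable[str]) -> Dict[str, dict]:
    """Run each query, recording either the result or the error message."""
    outcomes = {}
    for query in queries:
        try:
            outcomes[query] = {"ok": True, "result": run(query)}
        except Exception as exc:
            outcomes[query] = {"ok": False, "error": str(exc)}
    return outcomes


# Hypothetical stand-in for analyst.run; fails on one query to show error capture.
def fake_run(query: str) -> str:
    if "retention" in query:
        raise ValueError("no matching measure in semantic model")
    return f"SELECT ... -- for: {query}"


outcomes = run_queries(
    fake_run, ["top products by revenue", "customer retention by region"]
)
for query, outcome in outcomes.items():
    print(query, "->", outcome)
```

With a real analyst, the call would look like `run_queries(analyst_with_model.run, model_queries)`, assuming `run` accepts a single question string as it does in the cell above.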

## 3. RAG with Cortex Search

Retrieval-Augmented Generation using Snowflake's native Cortex Search:

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

from langchain_snowflake import ChatSnowflake, SnowflakeCortexSearchRetriever

print("RAG with Cortex Search")

# Replace with your actual Cortex Search service
CORTEX_SEARCH_SERVICE = "your_cortex_search_service"

try:
    # Initialize retriever with auto-formatting for RAG
    retriever = SnowflakeCortexSearchRetriever(
        session=session,
        service_name=CORTEX_SEARCH_SERVICE,
        k=5,  # Number of documents to retrieve
        auto_format_for_rag=True,  # Automatic document formatting
    )

    # Initialize chat model
    llm = ChatSnowflake(
        session=session, model="claude-3-5-sonnet", temperature=0.1, max_tokens=1000
    )

    # Create RAG prompt template
    rag_prompt = ChatPromptTemplate.from_template("""
    Answer the question based on the following context from Snowflake Cortex Search:

    Context:
    {context}

    Question: {question}

    Provide a comprehensive answer based on the retrieved context. If the context doesn't contain enough information, say so clearly.
    """)

    # Build RAG chain
    rag_chain = (
        {"context": retriever, "question": RunnablePassthrough()} | rag_prompt | llm
    )

    # Test queries
    test_queries = [
        "What are the main features of our product?",
        "How do customers typically use our service?",
        "What are the most common customer questions?",
    ]

    print(f"Search Service: {CORTEX_SEARCH_SERVICE}")
    print("Auto-formatting: Enabled")
    print(f" LLM: {llm.model}")

    for query in test_queries:
        print("\n" + "=" * 50)
        print(f"Question: {query}")

        try:
            # Test retrieval only
            print("\nRetrieved Documents:")
            docs = retriever.invoke(query)  # invoke() replaces the deprecated get_relevant_documents()
            print(f" Found {len(docs)} documents")
            for i, doc in enumerate(docs[:2]):  # Show first 2
                print(f" Doc {i + 1}: {doc.page_content[:100]}...")

            # Test full RAG
            print("\n RAG Response:")
            response = rag_chain.invoke(query)
            print(f" {response.content}")

        except Exception as e:
            print(f" Error: {e}")

    print("\nRAG pipeline working!")

except Exception as e:
    print(f"RAG setup needed: {e}")
    print("\nSetup Instructions:")
    print(" 1. Create a Cortex Search service in Snowflake")
    print(" 2. Load documents into your search service")
    print(" 3. Update CORTEX_SEARCH_SERVICE variable above")
    print(" 4. See Snowflake docs for Cortex Search setup")
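
The chain above sends the same input down two paths: the retriever output fills `{context}` and the question passes through unchanged into `{question}`. The sketch below shows that data flow in plain Python with no LangChain or Snowflake dependency. `format_context` and `build_prompt` are hypothetical helpers, and the numbering and truncation are illustrative choices, not a description of what `auto_format_for_rag` does internally.

```python
from typing import List

PROMPT_TEMPLATE = (
    "Answer the question based on the following context:\n\n"
    "Context:\n{context}\n\n"
    "Question: {question}"
)


def format_context(passages: List[str], max_chars: int = 200) -> str:
    """Number each passage and truncate it so the prompt stays short."""
    return "\n".join(
        f"[{i}] {text[:max_chars]}" for i, text in enumerate(passages, start=1)
    )


def build_prompt(passages: List[str], question: str) -> str:
    """Fill the template the way the chain fills {context} and {question}."""
    return PROMPT_TEMPLATE.format(
        context=format_context(passages), question=question
    )


passages = [
    "Snowflake separates storage and compute.",
    "Cortex Search indexes text.",
]
prompt = build_prompt(passages, "What does Snowflake separate?")
print(prompt)
```

Numbering the passages makes it possible to ask the model to cite which passage supports each part of its answer.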

## 4. Async Patterns for High Throughput

The cell below demonstrates native async patterns built on `aiohttp` and `collect_nowait()`:

In [None]:
import asyncio
import time

from langchain_snowflake import ChatSnowflake, CortexSentimentTool, CortexSummarizerTool

print("Async Patterns for High Throughput")

# Initialize async-capable components
async_llm = ChatSnowflake(
    session=session, model="claude-3-5-sonnet", temperature=0.1, max_tokens=200
)

sentiment_tool = CortexSentimentTool(session=session)
summarizer_tool = CortexSummarizerTool(session=session)


async def demonstrate_async_patterns():
    """Demonstrate various async patterns."""

    print("\nPattern 1: Concurrent Chat Completions")
    print(" Multiple LLM calls in parallel using aiohttp")

    questions = [
        "What is Snowflake?",
        "Explain cloud computing",
        "What is machine learning?",
        "Define data warehousing",
        "What is ETL?",
    ]

    # Sync approach (for comparison)
    start_time = time.time()
    sync_results = []
    for question in questions:
        result = async_llm.invoke(question)
        sync_results.append(result.content[:50] + "...")
    sync_time = time.time() - start_time  # measure once, after the loop finishes

    print(f" Sync time: {sync_time:.2f}s")

    # Async approach
    start_time = time.time()
    async_tasks = [async_llm.ainvoke(question) for question in questions]
    async_results = await asyncio.gather(*async_tasks)
    async_time = time.time() - start_time

    print(f" Async time: {async_time:.2f}s")
    print(f" Speedup: {sync_time / async_time:.1f}x faster")

    print("\nPattern 2: Concurrent Tool Execution")
    print(" Multiple Cortex tools in parallel")

    texts = [
        "This product is absolutely amazing!",
        "The service was disappointing and slow.",
        "Pretty good overall, could be improved.",
        "Excellent customer support experience.",
        "Worst purchase I've ever made.",
    ]

    # Async tool execution
    start_time = time.time()
    sentiment_tasks = [sentiment_tool.ainvoke(text) for text in texts]
    sentiment_results = await asyncio.gather(*sentiment_tasks)
    async_tool_time = time.time() - start_time

    print(f" Processed {len(texts)} sentiments in {async_tool_time:.2f}s")
    for i, (text, sentiment) in enumerate(zip(texts, sentiment_results)):
        print(f' {i + 1}. "{text[:30]}..." → {sentiment}')

    print("\nPattern 3: Mixed Async Operations")
    print(" Combining LLM calls and tool execution")

    # Parallel summarization and sentiment analysis
    long_text = "Snowflake is a cloud-based data platform that revolutionizes how organizations store, process, and analyze data. With its unique architecture that separates storage and compute, Snowflake provides unprecedented flexibility and scalability."

    start_time = time.time()
    summary_task = summarizer_tool.ainvoke(long_text)
    sentiment_task = sentiment_tool.ainvoke(long_text)
    llm_analysis_task = async_llm.ainvoke(
        f"Analyze this text in 1 sentence: {long_text}"
    )

    summary, sentiment, llm_analysis = await asyncio.gather(
        summary_task, sentiment_task, llm_analysis_task
    )

    mixed_time = time.time() - start_time

    print(f" Mixed operations completed in {mixed_time:.2f}s")
    print(f" Summary: {summary}")
    print(f" Sentiment: {sentiment}")
    print(f" LLM Analysis: {llm_analysis.content}")


# Run async demonstration
try:
    await demonstrate_async_patterns()
    print("\nAsync patterns working!")
except Exception as e:
    print(f"\nAsync error: {e}")
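
The speedup in Pattern 1 comes from overlapping the time each call spends waiting. The sketch below reproduces the effect with the standard library only, so it runs without a Snowflake connection. `fake_call` is a hypothetical stand-in for an I/O-bound call such as `ainvoke()`, and the 0.1 second delay is arbitrary.

```python
import asyncio
import time


async def fake_call(prompt: str, delay: float = 0.1) -> str:
    """Hypothetical stand-in for an I/O-bound call such as ainvoke()."""
    await asyncio.sleep(delay)
    return f"answer to: {prompt}"


async def compare(prompts):
    # Sequential: each call waits for the previous one to finish.
    start = time.perf_counter()
    sequential = [await fake_call(p) for p in prompts]
    sequential_time = time.perf_counter() - start

    # Concurrent: all calls wait at the same time; gather keeps input order.
    start = time.perf_counter()
    concurrent = await asyncio.gather(*(fake_call(p) for p in prompts))
    concurrent_time = time.perf_counter() - start

    return sequential, list(concurrent), sequential_time, concurrent_time
```

In a notebook, call it with `await compare(["a", "b", "c"])`. In a script, use `asyncio.run(compare(["a", "b", "c"]))`. The concurrent time should be close to the delay of a single call, while the sequential time grows with the number of prompts.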

## 5. Batch Processing Examples

Efficient bulk operations for production workloads:

In [None]:
import asyncio
import time
from typing import List

from langchain_snowflake import (
    CortexSentimentTool,
    CortexSummarizerTool,
    CortexTranslatorTool,
)


async def batch_sentiment_analysis(texts: List[str], batch_size: int = 10):
    """Process large batches of text for sentiment analysis."""
    sentiment_tool = CortexSentimentTool(session=session)

    print(f"\nBatch Sentiment Analysis ({len(texts)} texts)")
    print(f" Batch size: {batch_size}")
    results = []
    start_time = time.time()

    # Process in batches to avoid overwhelming the system
    for i in range(0, len(texts), batch_size):
        batch = texts[i : i + batch_size]
        print(
            f" Processing batch {i // batch_size + 1}/{(len(texts) - 1) // batch_size + 1}"
        )

        # Async processing within each batch
        batch_tasks = [sentiment_tool.ainvoke(text) for text in batch]
        batch_results = await asyncio.gather(*batch_tasks, return_exceptions=True)

        results.extend(batch_results)

    total_time = time.time() - start_time
    successful = len([r for r in results if not isinstance(r, Exception)])

    print(f" Processed {successful}/{len(texts)} texts in {total_time:.2f}s")
    print(f" Rate: {successful / total_time:.1f} texts/second")

    return results


async def batch_document_processing(documents: List[str]):
    """Process documents with multiple Cortex functions."""
    sentiment_tool = CortexSentimentTool(session=session)
    summarizer_tool = CortexSummarizerTool(session=session)
    translator_tool = CortexTranslatorTool(session=session)

    print(f"\nMulti-Function Document Processing ({len(documents)} docs)")
    start_time = time.time()

    tasks = []
    for doc in documents:
        doc_tasks = {
            "sentiment": sentiment_tool.ainvoke(doc),
            "summary": summarizer_tool.ainvoke(doc),
            # Multi-argument tools take a dict through the public ainvoke() API
            "translation": translator_tool.ainvoke(
                {"text": doc, "target_language": "es"}
            ),
        }
        tasks.append(doc_tasks)

    # Process each document with multiple functions in parallel
    all_sentiment_tasks = [task["sentiment"] for task in tasks]
    all_summary_tasks = [task["summary"] for task in tasks]
    all_translation_tasks = [task["translation"] for task in tasks]

    sentiments, summaries, translations = await asyncio.gather(
        asyncio.gather(*all_sentiment_tasks, return_exceptions=True),
        asyncio.gather(*all_summary_tasks, return_exceptions=True),
        asyncio.gather(*all_translation_tasks, return_exceptions=True),
    )

    total_time = time.time() - start_time
    total_operations = len(documents) * 3  # 3 operations per document

    print(f" Completed {total_operations} operations in {total_time:.2f}s")
    print(f" Rate: {total_operations / total_time:.1f} operations/second")

    # Show sample results
    for i, (sentiment, summary, translation) in enumerate(
        zip(sentiments[:2], summaries[:2], translations[:2])
    ):
        print(f"\n Document {i + 1}:")
        print(f" Original: {documents[i][:50]}...")
        if not isinstance(sentiment, Exception):
            print(f" Sentiment: {sentiment}")
        if not isinstance(summary, Exception):
            print(f" Summary: {summary}")
        if not isinstance(translation, Exception):
            print(f" Spanish: {translation}")

    return sentiments, summaries, translations


# Sample data for batch processing
sample_reviews = [
    "The product quality is outstanding and delivery was fast.",
    "Poor customer service, very disappointed with the experience.",
    "Good value for money, would recommend to others.",
    "The interface is confusing and hard to navigate.",
    "Excellent features and great user experience overall.",
    "Too expensive for what you get, not worth it.",
    "Fast shipping and product matches description perfectly.",
    "Customer support was helpful and resolved my issue quickly.",
    "The product broke after just one week of use.",
    "Amazing quality and exceeded my expectations completely.",
]

sample_documents = [
    "Snowflake's cloud data platform enables organizations to mobilize their data with Snowflake's Data Cloud. Our platform provides a unified experience across multiple public clouds, allowing customers to securely share and analyze data.",
    "Machine learning capabilities in Snowflake include Snowpark ML for data science workflows, automatic feature engineering, model training, and deployment. The platform supports popular ML frameworks and languages.",
    "Data governance in Snowflake ensures security, privacy, and compliance through features like role-based access control, data masking, and audit trails. Organizations can maintain control while enabling data sharing.",
]


async def run_batch_examples():
    """Run all batch processing examples."""
    # Example 1: Batch sentiment analysis
    await batch_sentiment_analysis(sample_reviews, batch_size=5)

    # Example 2: Multi-function document processing
    await batch_document_processing(sample_documents)


try:
    await run_batch_examples()
    print("\nBatch processing examples completed!")
except Exception as e:
    print(f"\nBatch processing error: {e}")
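
Fixed-size batches, as used in `batch_sentiment_analysis`, wait for the slowest item in each batch before starting the next one. A common alternative is to cap the number of in-flight calls with `asyncio.Semaphore`, so a new call starts as soon as any earlier one finishes. The sketch below uses only the standard library. `bounded_gather` and `fake_sentiment` are hypothetical names, and `fake_sentiment` stands in for a Cortex tool call.

```python
import asyncio
from typing import Awaitable, Callable, List


async def bounded_gather(
    worker: Callable[[str], Awaitable[str]],
    items: List[str],
    limit: int = 5,
) -> list:
    """Run worker over items with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: str):
        async with semaphore:
            return await worker(item)

    # return_exceptions=True returns failures as values instead of raising
    # the first one, so every item gets a result slot in input order.
    return await asyncio.gather(
        *(guarded(item) for item in items), return_exceptions=True
    )


async def fake_sentiment(text: str) -> str:
    """Hypothetical stand-in for a Cortex tool call."""
    await asyncio.sleep(0.01)
    if not text:
        raise ValueError("empty text")
    return "positive" if "good" in text else "negative"
```

In a notebook, `await bounded_gather(fake_sentiment, ["good product", "", "bad"], limit=2)` returns one entry per input, with the `ValueError` for the empty string in the second slot. A sensible `limit` depends on your warehouse size and account limits, so treat the default of 5 as a placeholder.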