## What You'll Learn Today

Welcome to ArenaFlow Intelligence Hub!  Throughout this lab you'll see how easy it is to integrate Cortex AI into Snowflake's core data engineering features to batch process unstructured data.

We'll highlight several Snowflake features throughout this lab:
- [Snowflake Cortex](https://docs.snowflake.com/en/user-guide/snowflake-cortex/llm-functions)
    - Task Specific Functions for analyzing sentiment and summarization
    - Cortex Complete to generate responses from prompts
- [Snowflake Notebooks on Container Runtime](https://docs.snowflake.com/en/developer-guide/snowflake-ml/notebooks-on-spcs)
    - Execute the notebook on a GPU powered compute pool
    - Leverage a pre-built image containing hundreds of common Python packages
    - Create an external access integration to install additional 3rd party Python packages
- [Triggered tasks](https://docs.snowflake.com/en/user-guide/tasks-intro#triggered-tasks)
    - Process data based on specific data change events
- [Dynamic Tables](https://docs.snowflake.com/en/user-guide/dynamic-tables-intro)
    - Incrementally process changed data
- [Cortex Analyst](https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-analyst)
    - Create a chat application for a "talk to your data" experience
- [Streamlit in Snowflake](https://docs.snowflake.com/en/developer-guide/streamlit/about-streamlit)
    - Host your Cortex Analyst chat app directly within Snowsight

## Lab Setting
Picture this: a packed stadium, fans buzzing, the air thick with excitement, and every second tweets are flying about the game, the food, the vibe. The ArenaFlow Intelligence Hub grabs that chaos and turns it into gold. This platform’s your real-time window into what’s happening, analyzing fan tweets on the fly to tell you what’s hot, what’s not, and where to step up your game, all refreshed every minute.

Snowflake’s Cortex AI functions are the MVPs here. They’re like your personal opportunity scoping crew, decoding the mood of every tweet, summing up the chatter, and flagging trends faster than you can say “touchdown.” Stadium bosses get the scoop to act quick and send staff to a jammed concession or amplify a player’s epic moment to keep the crowd roaring. It’s your sixth sense for the stadium, turning raw data into a live playbook for better experiences, smoother ops, and smarter moves.

In [None]:
!pip install fake_profile

In [None]:
# standard python packages
import json
import random
from pprint import pprint
from datetime import datetime, timezone
from typing import Union


# package for generating fake tweets
from fake_profile import Xprofile

# parallel execution
from concurrent.futures import ThreadPoolExecutor
import asyncio

# snowflake packages
from snowflake.snowpark.context import get_active_session

In [None]:
# establish a session
session = get_active_session()

# print the current session context
print(f"""
SESSION CONTEXT
  Account:   {session.get_current_account()}
  Role:      {session.get_current_role()}
  Database:  {session.get_current_database()}
  Schema:    {session.get_current_schema()}
  Warehouse: {session.get_current_warehouse()}
"""
)

## Create Tweet Tables

We'll create two tables for storing fan tweets
- `raw_tweets`: a landing table for raw, simulated tweets
- `enriched_tweets`: a table for tweets which have been enriched with various Cortex AI functions

In [None]:
CREATE OR REPLACE TABLE RAW_TWEETS (
    TWEET_ID STRING,
    TWEET_TEXT STRING,
    USER_HANDLE STRING,
    USER_LOCATION STRING,
    RETWEET_COUNT INTEGER,
    LIKE_COUNT INTEGER,
    REPLY_COUNT INTEGER,
    QUOTE_COUNT INTEGER,
    FOLLOWER_COUNT INTEGER,
    VERIFIED_STATUS BOOLEAN,
    COMMENT_COUNT INTEGER,
    GAME_MINUTE INTEGER,
    CREATED_AT TIMESTAMP_NTZ
);

In [None]:
CREATE OR REPLACE TABLE ENRICHED_TWEETS (
    tweet_id VARCHAR(255) PRIMARY KEY,
    tweet_text VARCHAR(1000),
    user_handle VARCHAR(50),
    user_location VARCHAR(100),
    retweet_count INTEGER,
    like_count INTEGER,
    reply_count INTEGER,
    quote_count INTEGER,
    follower_count INTEGER,
    verified_status BOOLEAN,
    comment_count INTEGER,
    game_minute INTEGER,
    created_at TIMESTAMP_NTZ,
    sentiment_output FLOAT,
    summary_output VARCHAR,
    complete_response OBJECT,
    processed_at TIMESTAMP_NTZ,
    complete_output_array ARRAY,
    complete_output_string VARCHAR
)
;

## Generating Context-Specific Tweets with Cortex AI

We'll use the [SNOWFLAKE.CORTEX.COMPLETE](https://docs.snowflake.com/en/sql-reference/functions/complete-snowflake-cortex) function to build context-specific tweets, in this case tweets about an NBA game.

_Why this matters_:
- Gathering enough data to build enterprise-ready applications can be challenging
- More and more companies have turned to AI to generate synthetic data to train other AI applications
- Synthetic data is crucial for AI applications because it provides a cost-effective and privacy-preserving way to generate large, diverse datasets, overcoming limitations of scarce or sensitive real-world data

With Snowflake, you can generate synthetic data using Cortex and also the native [GENERATE_SYNTHETIC_DATA](https://docs.snowflake.com/en/user-guide/synthetic-data) stored procedure.
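
When a prompt is sent to Cortex through a SQL statement, it ends up inside a SQL string literal, so quotes in the prompt need escaping first. The sketch below shows one way to do that in plain Python. `sql_string_literal` and `build_complete_query` are hypothetical helper names (not part of Snowpark), and the backslash handling assumes Snowflake's default rules for single-quoted string constants.

```python
def sql_string_literal(text: str) -> str:
    """Wrap text in single quotes for use as a SQL string constant."""
    # Double backslashes and single quotes so neither can end or alter the literal.
    escaped = text.replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def build_complete_query(model: str, prompt: str) -> str:
    """Build a single-prompt SNOWFLAKE.CORTEX.COMPLETE call (hypothetical helper)."""
    return (
        "SELECT SNOWFLAKE.CORTEX.COMPLETE("
        f"{sql_string_literal(model)}, {sql_string_literal(prompt)}"
        ") AS rewritten_text"
    )


query = build_complete_query("mistral-large2", "Rewrite this fan's tweet")
print(query)
```

Keeping the escaping in one helper makes it harder to forget when the prompt text comes from generated or user-supplied data.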

In [None]:
EMOTIONS = [
    "excited", "upset", "nervous", "thrilled", "frustrated", "ecstatic",
    "disappointed", "hyped", "stunned", "pumped", "heartbroken", "awed",
    "anxious", "elated", "furious", "relieved", "nostalgic", "impressed",
    "bored", "hopeful", "shocked", "overjoyed", "tense", "proud",
    "exhausted", "confused", "amused", "enraged", "content", "skeptical",
    "inspired", "devastated", "grateful", "anticipating", "smug"
]
PLAYERS = [
    "Rookie Jaden Cole, crossover king",
    "Captain Maya Reed, sharpshooting star",
    "Center Amari Blake, rim protector",
    "Point Guard Kai Nguyen, playmaking maestro",
    "Forward Zoe Hunter, dunk machine",
    "Sixth Man Riley Patel, clutch shooter",
    "Veteran Lena Brooks, defensive anchor",
    "Sharpshooter Alex Chen, three-point specialist",
    "Defensive Stopper Taylor Green, lockdown artist",
    "Bench Sparkplug Sam Rivera, energy booster",
    "Veteran Playmaker Jordan Lee, court general",
    "High-Flying Forward Casey Thompson, above-the-rim threat",
    "Gritty Rebounder Morgan White, glass cleaner",
    "Crafty Ballhandler Avery Kim, assist machine",
    "Towering Center Drew Wilson, paint patroller",
    "Speedy Guard Quinn Parker, transition terror",
    "Stretch Four Jamie Ortiz, floor spacer",
    "Hustle King Devon Miles, effort machine",
    "Rookie Sensation Ellie Ford, fearless finisher",
    "Fan Favorite Chris Lowe, trick-shot artist",
    "Backup Big Tatum Hayes, post-up powerhouse",
    "Perimeter Threat Skylar Dean, catch-and-shoot ace",
    "Tenacious Wing Blair Evans, all-around grinder",
    "Undersized Scrapper Nick Soto, heart and soul",
    "Sophomore Standout Liam Gray, rising star",
    "Silent Assassin Peyton Shaw, quiet killer",
    "Veteran Enforcer Reese Carter, physical force",
    "Dynamic Duo Leader Harper Lane, team glue"
]
ACTIONS = [
    "nails a deep three-pointer",
    "throws down a monster slam dunk",
    "dishes a no-look pass",
    "swats a shot into the stands",
    "crosses over two defenders",
    "hits a fadeaway jumper",
    "steals the ball and sprints for a layup",
    "banks in a buzzer-beater",
    "executes a perfect pick-and-roll",
    "threads a needle with a behind-the-back pass",
    "posterizes a defender with a dunk",
    "drains a contested fadeaway",
    "locks down the opponent's star player",
    "grabs a crucial offensive rebound",
    "draws a charge with savvy positioning",
    "hits a step-back jumper over tight defense",
    "flips a one-handed pass to the corner",
    "skies for a putback slam",
    "fakes out a defender with a hesitation",
    "snags a tipped pass for a fast break",
    "buries a pull-up jumper in transition",
    "denies a drive with a perfectly timed block",
    "spins baseline for a reverse layup",
    "sinks a floater over outstretched arms",
    "whips a cross-court pass for an open three",
    "elevates for a tomahawk dunk",
    "shuts down a pick with quick feet",
    "splashes a midrange shot off the dribble",
    "secures a loose ball in traffic",
    "finishes a tough and-one at the rim",
    "strips the ball on a double-team",
    "rises for a game-sealing block"
]
VENDORS = [
    "Burger Bonanza, with sizzling burgers",
    "Pizza Pavilion, wood-fired garlic knots",
    "Nacho Nation, spicy loaded nachos",
    "Sweet Spot, cinnamon funnel cakes",
    "Popcorn Palace, buttery buckets",
    "Slam Dunk Dogs, gourmet hot dogs",
    "Hoop Gear Hub, team jerseys and caps"
]
SECTIONS = [
    "West Concourse, rocking live music",
    "East Plaza, buzzing with fan games",
    "Sky Deck, premium rooftop views",
    "South Bleachers, rowdy chant leaders",
    "North Stands, family-friendly fun",
    "Courtside VIP, star-studded seats",
    "Fan Zone, interactive dunk contests"
]
GAME_MOMENTS = [
    "a game-tying three at the buzzer",
    "a ferocious alley-oop dunk",
    "a controversial foul call",
    "a clutch steal in crunch time",
    "a dramatic overtime winner",
    "a comeback from 15 points down",
    "a no-look assist for an easy layup",
    "a block that shakes the arena",
    "a full-court press that forces a turnover",
    "a fast-break dunk that ignites the crowd",
    "a technical foul for arguing with the ref",
    "a record-breaking scoring streak",
    "a momentum-shifting block",
    "a half-court shot that swishes through",
    "a flagrant foul that sparks controversy",
    "a timeout called to ice the free-throw shooter",
    "a buzzer-beating tip-in to win",
    "a double-overtime nail-biter",
    "a rookie’s breakout scoring run",
    "a defensive stand that holds the lead",
    "a three-pointer that breaks the franchise record",
    "a steal followed by a behind-the-back dunk",
    "a missed call that has the crowd roaring",
    "a perfect inbounds play for the win",
    "a shot clock violation that shifts momentum",
    "a player ejection after a heated exchange",
    "a fan-favorite hits a milestone",
    "a last-second block to secure the victory",
    "a crossover that sends the defender sliding",
    "a putback dunk off a missed free throw",
    "a referee review that overturns a call",
    "a celebration dunk that draws a penalty"
]
REASONS = [
    "reacting to an insane play",
    "raving about the food",
    "complaining about long lines",
    "soaking in the electric vibe",
    "cheering a clutch moment",
    "marveling at the halftime show",
    "hyping the crowd’s energy",
    "gushing over a player’s hustle",
    "celebrating a personal best",
    "critiquing the coach's strategy",
    "admiring the team's chemistry",
    "lamenting a missed opportunity",
    "praising the arena's acoustics",
    "commenting on the officiating",
    "sharing a funny moment from the stands",
    "expressing disbelief at a stat line",
    "bragging about courtside seats",
    "mocking the rival team’s fans",
    "noticing a celebrity in the crowd",
    "loving the pregame light show",
    "groaning at the parking chaos",
    "cheering a teammate’s assist",
    "shouting about a bad call",
    "toasting to a big lead",
    "capturing the jumbotron dance cam",
    "boasting about the team’s defense",
    "reminiscing about past games",
    "geeking out over player stats",
    "thanking the mascot for the laughs",
    "envying the VIP lounge perks",
    "ranting about the shot clock reset",
    "singing along to the arena anthem"
]
BUSINESS_REASONS = [
    "raving about the food",
    "complaining about long lines",
    "bragging about courtside seats",
    "envying the VIP lounge perks",
    "groaning at the parking chaos",
    "marveling at the halftime show",
    "loving the pregame light show",
    "capturing the jumbotron dance cam",
    "thanking the mascot for the laughs"
]

In [None]:
def rewrite_text(
    text:str,
    emotions:list[str]=EMOTIONS,
    players:list[str]=PLAYERS,
    actions:list[str]=ACTIONS,
    vendors:list[str]=VENDORS,
    sections:list[str]=SECTIONS,
    game_moments:list[str]=GAME_MOMENTS,
    business_reasons:list[str]=BUSINESS_REASONS,
    reasons:list[str]=REASONS,
    is_comment:bool=False, 
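    game_minute:Union[int, None]=None
) -> str:

    # pick a fresh random minute per call; a random default in the signature is evaluated only once
    if game_minute is None:
        game_minute = random.randint(0, 65)

    context = "comment" if is_comment else "tweet"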
    escaped_text = text.replace('"', '\\"')

    time_periods = [
        "early in the first quarter" if game_minute <= 12 else
        "late in the first quarter" if game_minute <= 24 else
        "second quarter" if game_minute <= 36 else
        "third quarter" if game_minute <= 48 else
        "fourth quarter" if game_minute <= 60 else
        "overtime" if game_minute <= 65 else
        "final moments"
    ]

    # make random selections from predefined lists
    selected_emotion = random.choice(emotions)
    selected_player = random.choice(players)
    selected_action = random.choice(actions)
    selected_vendor = random.choice(vendors)
    selected_section = random.choice(sections)
    selected_moment = random.choice(game_moments)
    
    business_focus = random.random() < 0.7
    if business_focus:
        selected_reason = random.choice(business_reasons)
        focus_instruction = "Particularly focus on aspects that could help the business make decisions to drive revenue, such as customer satisfaction with vendors, lines, or stadium amenities."
    else:
        selected_reason = random.choice(reasons)
        focus_instruction = ""
    
    selected_time = time_periods[0]

    # create a prompt to generate a simulated tweet
    prompt_content = f"""
    Rewrite this {context} to fit a basketball game day at ArenaFlow stadium, creating a vivid and engaging tweet that captures the unique atmosphere. {focus_instruction}
    Focus on a combination of the following elements, choosing at least two to highlight:
    - **Player and action**: Feature {selected_player} who {selected_action}.
    - **Vendor and offering**: Include {selected_vendor}.
    - **Stadium section**: Reference {selected_section}.
    - **Fan reaction**: Capture a {selected_emotion} fan {selected_reason}.
    - **Game moment**: Highlight {selected_moment} {selected_time}.
    
    To ensure variety and freshness:
    - Use diverse vocabulary and sentence structures, e.g., 'the arena erupts like a volcano' instead of 'the crowd goes wild.'
    - Incorporate sensory details, like the squeak of sneakers or the aroma of popcorn.
    - Vary the tone: excited, humorous, poetic, or analytical.
    - Keep the tweet under 280 characters, including spaces and punctuation.
    
    Do not quote or mimic the original text. Original {context}: "{escaped_text}".
    Return the rewritten text as a plain string.
    """
    
    try:
        query = f"""
        SELECT
          SNOWFLAKE.CORTEX.COMPLETE(
            'mistral-large2',
            [
              {{
                'role': 'user',
                'content': '{prompt_content.replace("'", "''")}'
              }}
            ],
            {{
              'temperature': 0.7,
              'max_tokens': 500
            }}
          ) AS rewritten_text
        """
        
        result = session.sql(query).collect()
        
        if not result:
            raise Exception(f"No response from SNOWFLAKE.CORTEX.COMPLETE for {context}")
        
        response = result[0]["REWRITTEN_TEXT"]
        try:
            response_dict = json.loads(response)
            if "choices" in response_dict and response_dict["choices"]:
                rewritten_text = response_dict["choices"][0]["messages"]
                if rewritten_text.startswith('"') and rewritten_text.endswith('"'):
                    rewritten_text = rewritten_text[1:-1]
            else:
                raise Exception(f"Unexpected JSON response format: {response}")
        except json.JSONDecodeError:
            rewritten_text = response
        
        return rewritten_text
    
    except Exception as e:
        raise Exception(f"Cortex Complete failed for {context}: {str(e)}") from e
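
The response handling at the end of `rewrite_text` can be exercised without a Snowflake session. The following is a minimal sketch, assuming the response is either plain text or a JSON string with a `choices` list whose entries carry a `messages` field, as the function above expects. `extract_completion_text` is a hypothetical name and the sample payload is hand-written.

```python
import json


def extract_completion_text(response: str) -> str:
    """Return the generated text from a COMPLETE response, or the raw string if it is not a JSON object."""
    try:
        payload = json.loads(response)
    except json.JSONDecodeError:
        # Plain-text responses are passed through unchanged.
        return response
    if not isinstance(payload, dict):
        return response

    choices = payload.get("choices")
    if not choices:
        raise ValueError(f"Unexpected response format: {response}")

    text = choices[0]["messages"].strip()
    # Drop one pair of wrapping double quotes if the model added them.
    if len(text) >= 2 and text.startswith('"') and text.endswith('"'):
        text = text[1:-1]
    return text


# Hand-written sample shaped like the response described above.
sample = json.dumps({
    "choices": [{"messages": "\"The arena erupts as Maya Reed drains a three!\""}],
    "model": "mistral-large2",
})
print(extract_completion_text(sample))
```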

## Simulating a single tweet

In [None]:
generator = Xprofile()
users, tweets, comments, likes, retweets = generator.generate_fake_twitter_data(user_count=1, tweets_per_user=1)
print('USERS')
pprint(users[0], indent=4)
print('TWEETS')
pprint(tweets[0], indent=4)
print('COMMENTS')
pprint(comments[0], indent=4)
print('LIKES')
pprint(likes[0], indent=4)
print('RETWEETS')
pprint(retweets[0], indent=4)

In [None]:
test_tweet = tweets[0]["text"]
augmented_tweet = rewrite_text(test_tweet, game_minute = 10)

print(f"""
RAW TWEET:
{test_tweet}
 
SIMULATED TWEET:
{augmented_tweet}
""")

## Simulating multiple tweets

Now that simulating a single tweet looks good, we'll build out a framework for simulating multiple tweets

In [None]:
def process_tweet(tweet:dict, users:list[dict], game_minute:int) -> Union[None, dict]:
    
    print(f"Processing tweet: {tweet['tweet_id']}")
    
    try:
        tweet_rewritten = rewrite_text(tweet['text'], is_comment=False, game_minute=game_minute)
        
        print(f"Rewritten tweet: {tweet_rewritten}")
        
        user = next((u for u in users if u["user_id"] == tweet["user_id"]), None)
        if not user:
            return None

        # Simplify scaling with single factor, no caps
        scale_factor = random.uniform(0.5, 1.5)  # 50%–150% of original
        retweet_count = max(0, int(tweet["retweet_count"] * scale_factor))
        like_count = max(0, int(tweet["like_count"] * scale_factor))
        reply_count = max(0, int(tweet["reply_count"] * scale_factor))
        quote_count = max(0, int(tweet["quote_count"] * scale_factor))
        follower_count = max(0, int(user["follower_count"] * scale_factor))

        tweet_data = {
            "TWEET_ID": f"TWEET_{tweet['tweet_id']}",
            "TWEET_TEXT": tweet_rewritten,
            "USER_HANDLE": f"@{user['username']}",
            "USER_LOCATION": user["location"],
            "RETWEET_COUNT": retweet_count,
            "LIKE_COUNT": like_count,
            "REPLY_COUNT": reply_count,
            "QUOTE_COUNT": quote_count,
            "FOLLOWER_COUNT": follower_count,
            "VERIFIED_STATUS": user["verified_status"],
            "COMMENT_COUNT": 0,
            "GAME_MINUTE": game_minute,
            "CREATED_AT": datetime.now(timezone.utc).replace(tzinfo=None)  # naive UTC timestamp for TIMESTAMP_NTZ
        }
        
        return tweet_data
        
    except Exception as e:
        print(f"Error processing tweet {tweet['tweet_id']}: {str(e)}")
        return None

In [None]:
num_tweets = 5
generator = Xprofile()
users, tweets, comments, likes, retweets = generator.generate_fake_twitter_data(user_count=num_tweets, tweets_per_user=1)

# Process only the first n tweets
raw_tweets = tweets[:num_tweets]
processed_tweets = []


# iterate through raw tweets, processing each with Cortex Complete
for tweet in raw_tweets:

    # simulate tweet with Cortex Complete
    processed_tweet = process_tweet(tweet, users, game_minute = random.randint(0, 65))

    # append to list of processed tweets, skipping any that failed (process_tweet returns None)
    if processed_tweet is not None:
        processed_tweets.append(processed_tweet)


# Use Snowpark DataFrame to save simulated tweets to the RAW_TWEETS table
if processed_tweets:
    df_batch = session.create_dataframe(processed_tweets)
    df_batch.write.mode("append").save_as_table("RAW_TWEETS")

In [None]:
SELECT * 
FROM RAW_TWEETS 
ORDER BY tweet_id;

## Unlocking Insights with Cortex AI-Powered Analysis

All Cortex functions can be called natively in SQL, allowing you to:
- Easily integrate LLM batch processing directly into your data engineering pipelines
- Execute Cortex AI functions across entire tables of data all at once
- Apply existing [data governance](https://docs.snowflake.com/en/guides-overview-govern) policies to Cortex AI (RBAC, tagging, masking, row-level security, etc.)


We'll use several Cortex functions to begin batch processing our tweets:
- [SNOWFLAKE.CORTEX.SENTIMENT](https://docs.snowflake.com/en/sql-reference/functions/sentiment-snowflake-cortex): Returns an overall sentiment score for the given input text
- [SNOWFLAKE.CORTEX.SUMMARIZE](https://docs.snowflake.com/en/sql-reference/functions/summarize-snowflake-cortex): Summarizes the given input text
- [SNOWFLAKE.CORTEX.COMPLETE](https://docs.snowflake.com/en/sql-reference/functions/complete-snowflake-cortex): Given a prompt, generates a response (completion) using your choice of supported language model
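
Once sentiment scores are back in Python, it is often handy to bucket them. The sketch below assumes the score is a float between -1 and 1, as the SENTIMENT documentation describes. The `sentiment_label` helper and its 0.3 threshold are hypothetical choices for illustration, not values from this lab.

```python
def sentiment_label(score: float, threshold: float = 0.3) -> str:
    """Map a sentiment score in [-1, 1] to a coarse label (threshold is an assumption)."""
    if not -1.0 <= score <= 1.0:
        raise ValueError(f"score out of range: {score}")
    if score >= threshold:
        return "positive"
    if score <= -threshold:
        return "negative"
    return "neutral"


for score in (0.82, -0.45, 0.05):
    print(score, sentiment_label(score))
```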

In [None]:
SET summarize_prompt = 'Summarize the following tweet in 1-2 sentences, focusing on fan engagement, player highlights, or vendor experiences, and highlight specific business opportunities like improving operations, targeting promotions, or addressing customer pain points: ';

SELECT
    tweet_text as raw_tweet
    ,SNOWFLAKE.CORTEX.SUMMARIZE($summarize_prompt || tweet_text)::varchar AS summarized_tweet
FROM
    RAW_TWEETS
;

In [None]:
SELECT
    TWEET_TEXT as RAW_TWEET,
    SNOWFLAKE.CORTEX.SENTIMENT(TWEET_TEXT) AS TWEET_SENTIMENT
FROM
    RAW_TWEETS
;

In [None]:
SET llm = 'mistral-large2';
SET instructions = 'Analyze the tweet and categorize it into one or more of the below categories based on its content: ';
SET output_format = 'Return an array of category names that best fit the tweet: ';

In [None]:
SELECT
    TWEET_TEXT as RAW_TWEET,
    SNOWFLAKE.CORTEX.COMPLETE(
        $llm,
        [{
            'role': 'user',
            'content': $instructions || 
                       '1. Fan Engagement (fan enthusiasm, chants, crowd energy), ' ||
                       '2. Player Highlight (praising player performance), ' ||
                       '3. Vendor Opportunity (positive or negative food/merchandise experiences, excluding crowded lines or maintenance), ' ||
                       '4. Game Atmosphere (overall arena vibe), ' ||
                       '5. Vendor Operations (crowded vendor lines with keywords like "long line", "wait time", "queue", "crowded", "slow", "register", or maintenance issues like "spill", "mess", "broken", "dirty", "clean"). ' ||
                       $output_format || tweet_text
                    }],
                {'temperature': 0, 'max_tokens': 100}
            ) AS complete_response
FROM
    RAW_TWEETS
;

## Cortex Complete Structured Outputs

[Structured output](https://docs.snowflake.com/en/user-guide/snowflake-cortex/complete-structured-outputs) lets you supply a JSON schema that completion responses must follow.

**_Why this matters_**:
Output from even the most sophisticated LLMs can sometimes be inconsistent. When you apply a structured schema, Cortex ensures that the model's output conforms to the supplied schema.

```sql
options: {
    ...
    response_format: {
        'type': 'json',
        'schema': {
            'type': 'object',
            'properties': {
                'property_name': {
                    'type': 'string'
                },
                ...
            },
            'required': ['property_name', ...]
        }
    }
}
```
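
The same schema can be written as a Python dictionary, which makes it easy to check a sample payload locally before running the query. This is a minimal sketch: `CATEGORY_SCHEMA` mirrors the `categories` schema used in this lab, while `conforms` is a hypothetical hand-rolled check for this one schema, not a general JSON Schema validator.

```python
# Schema used for tweet categorization: an object with a required array of strings.
CATEGORY_SCHEMA = {
    "type": "object",
    "properties": {
        "categories": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["categories"],
}

# Options object in the shape passed as the third COMPLETE argument.
options = {
    "temperature": 0,
    "max_tokens": 100,
    "response_format": {"type": "json", "schema": CATEGORY_SCHEMA},
}


def conforms(candidate: dict, schema: dict = CATEGORY_SCHEMA) -> bool:
    """Check required keys and that `categories` is a list of strings (this schema only)."""
    if any(key not in candidate for key in schema["required"]):
        return False
    categories = candidate["categories"]
    return isinstance(categories, list) and all(isinstance(c, str) for c in categories)


print(conforms({"categories": ["Fan Engagement", "Game Atmosphere"]}))
print(conforms({"category": "Fan Engagement"}))
```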

In [None]:
SELECT
    TWEET_TEXT as RAW_TWEET,
    SNOWFLAKE.CORTEX.COMPLETE(
        $llm
        ,[{
            'role': 'user',
            'content': $instructions || 
                       '1. Fan Engagement (fan enthusiasm, chants, crowd energy), ' ||
                       '2. Player Highlight (praising player performance), ' ||
                       '3. Vendor Opportunity (positive or negative food/merchandise experiences, excluding crowded lines or maintenance), ' ||
                       '4. Game Atmosphere (overall arena vibe), ' ||
                       '5. Vendor Operations (crowded vendor lines with keywords like "long line", "wait time", "queue", "crowded", "slow", "register", or maintenance issues like "spill", "mess", "broken", "dirty", "clean"). ' ||
                       $output_format || tweet_text
        }]
        ,{
            'temperature': 0,
            'max_tokens': 100,
            'response_format': {
                    'type': 'json',
                    'schema': {
                        'type': 'object',
                        'properties': {
                            'categories': {
                                'type': 'array',
                                'items': { 'type': 'string' }
                            }
                        },
                        'required': ['categories']
                    }
                }
            }
        ) AS complete_response
FROM
    RAW_TWEETS
;

## Full LLM Processing

The below query combines all Cortex function calls into a single query
- Tweet summarization
- Tweet sentiment
- Tweet categorization with structured output

In [None]:
SELECT
    *
    ,SNOWFLAKE.CORTEX.SUMMARIZE($summarize_prompt || tweet_text) AS summary_output
    ,SNOWFLAKE.CORTEX.SENTIMENT(tweet_text) AS sentiment_output
    ,SNOWFLAKE.CORTEX.COMPLETE(
        $llm,
        [{
            'role': 'user',
            'content': $instructions || 
                       '1. Fan Engagement (fan enthusiasm, chants, crowd energy), ' ||
                       '2. Player Highlight (praising player performance), ' ||
                       '3. Vendor Opportunity (positive or negative food/merchandise experiences, excluding crowded lines or maintenance), ' ||
                       '4. Game Atmosphere (overall arena vibe), ' ||
                       '5. Vendor Operations (crowded vendor lines with keywords like "long line", "wait time", "queue", "crowded", "slow", "register", or maintenance issues like "spill", "mess", "broken", "dirty", "clean"). ' ||
                       $output_format || tweet_text
        }],
        {
            'temperature': 0,
            'max_tokens': 100,
            'response_format': {
                    'type': 'json',
                    'schema': {
                        'type': 'object',
                        'properties': {
                            'categories': {
                                'type': 'array',
                                'items': { 'type': 'string' }
                            }
                        },
                        'required': ['categories']
                    }
                }
            }
        ) AS complete_response
FROM
    RAW_TWEETS
;

## Full Tweet Processing

The CTE below calls all of our Cortex processing functions in a single statement and additionally generates an array of categories from the COMPLETE structured output

This CTE takes advantage of [cell referencing](https://docs.snowflake.com/en/user-guide/ui-snowsight/notebooks-develop-run#reference-cells-and-variables-in-sf-notebooks). Rather than rewriting the LLM processing query, we can simply write `select * from {{llm_processing}}` to use the results from the previous `{{llm_processing}}` cell output

-> _This is particularly useful when writing complex CTEs!_
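
To make the flattening step concrete, here is a plain-Python sketch of the same traversal: walk `structured_output`, then collect `raw_message.categories` from each entry. It assumes the response shape implied by the paths used in this lab's queries. `categories_from_response` is a hypothetical helper and the sample response is hand-written.

```python
import json


def categories_from_response(complete_response) -> list[str]:
    """Collect category names from a structured-output response (dict or JSON string)."""
    payload = (
        json.loads(complete_response)
        if isinstance(complete_response, str)
        else complete_response
    )
    categories = []
    # Outer loop mirrors flattening structured_output; inner step mirrors flattening categories.
    for entry in payload.get("structured_output", []):
        categories.extend(entry.get("raw_message", {}).get("categories", []))
    return categories


sample_response = {
    "structured_output": [
        {
            "raw_message": {"categories": ["Fan Engagement", "Player Highlight"]},
            "type": "json",
        }
    ]
}
cats = categories_from_response(sample_response)
print(cats)
print(",".join(cats))  # same idea as ARRAY_TO_STRING(..., ',')
```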

In [None]:
WITH
categorization AS (
    select * from {{llm_processing}}
)
SELECT 
    c.*
    ,ARRAY_AGG(cat.value)::ARRAY AS complete_output_array
    ,ARRAY_TO_STRING(complete_output_array, ',') AS complete_output_string
FROM
    categorization c
LEFT JOIN LATERAL FLATTEN(input => c.complete_response:structured_output) so
LEFT JOIN LATERAL FLATTEN(input => so.value:raw_message:categories) cat
GROUP BY ALL
HAVING
    complete_output_string IS NOT NULL
;

## Building Continuous Data Engineering Pipelines with Cortex AI

Now that we've built a query to batch process our unstructured data with Cortex, we can leverage other Snowflake data engineering features to continuously process new tweets as they flow into our environment

1. Simulate streaming tweets into the `RAW_TWEETS` table
2. Create stored procedure for LLM batch processing
3. Create a stream on the `RAW_TWEETS` table to capture DML changes
4. Build a [triggered task](https://docs.snowflake.com/en/user-guide/tasks-intro#triggered-tasks) to call our SPROC whenever our stream has data
5. Insert processed tweets into the `ENRICHED_TWEETS` table
6. Build a [Dynamic Table](https://docs.snowflake.com/en/user-guide/dynamic-tables-intro) to generate our "gold layer" data for BI, reporting, and other analysis
7. Develop a chat app with [Cortex Analyst](https://docs.snowflake.com/en/user-guide/snowflake-cortex/cortex-analyst) and [Streamlit in Snowflake](https://docs.snowflake.com/en/developer-guide/streamlit/about-streamlit) for a "talk to your data" experience

In [None]:
CREATE OR REPLACE PROCEDURE PROCESS_NEW_TWEETS()
RETURNS STRING
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
BEGIN

    -- Insert new records from raw_tweets into enriched_tweets with sentiment, summary, and categorization
    INSERT INTO ENRICHED_TWEETS (
        tweet_id,
        tweet_text,
        user_handle,
        user_location,
        retweet_count,
        like_count,
        reply_count,
        quote_count,
        follower_count,
        verified_status,
        comment_count,
        game_minute,
        created_at,
        sentiment_output,
        summary_output,
        complete_response,
        processed_at,
        complete_output_array,
        complete_output_string
    )
    WITH 
    tweets AS (
        SELECT 
            *
        FROM
            RAW_TWEETS
        WHERE 1=1
            AND tweet_id IS NOT NULL
    ),
    categorization AS (
        SELECT 
            t.*
            ,SNOWFLAKE.CORTEX.SENTIMENT(t.tweet_text) AS sentiment_output
            ,SNOWFLAKE.CORTEX.SUMMARIZE(
                'Summarize the following tweet in 1-2 sentences, focusing on fan engagement, player highlights, or vendor experiences, and highlight specific business opportunities like improving operations, targeting promotions, or addressing customer pain points: ' || t.tweet_text
            ) AS summary_output
            ,SNOWFLAKE.CORTEX.COMPLETE(
                'mistral-large2',
                [{
                    'role': 'user',
                    'content': 'Analyze the tweet and categorize it into one or more of these five categories based on its content: ' ||
                               '1. Fan Engagement (fan enthusiasm, chants, crowd energy), ' ||
                               '2. Player Highlight (praising player performance), ' ||
                               '3. Vendor Opportunity (positive or negative food/merchandise experiences, excluding crowded lines or maintenance), ' ||
                               '4. Game Atmosphere (overall arena vibe), ' ||
                               '5. Vendor Operations (crowded vendor lines with keywords like "long line", "wait time", "queue", "crowded", "slow", "register", or maintenance issues like "spill", "mess", "broken", "dirty", "clean"). ' ||
                               'Return an array of category names that best fit the tweet: ' || t.tweet_text
                }]
                ,{
                    'temperature': 0
                    ,'max_tokens': 100
                    ,'response_format': {
                        'type': 'json',
                        'schema': {
                            'type': 'object',
                            'properties': {
                                'categories': {
                                    'type': 'array',
                                    'items': { 'type': 'string' }
                                }
                            },
                            'required': ['categories']
                        }
                    }
                }
            ) AS complete_response
        FROM tweets as t
    )
    SELECT 
        c.*
        ,CURRENT_TIMESTAMP(2) as processed_at
        ,ARRAY_AGG(cat.value)::ARRAY AS complete_output_array
        ,ARRAY_TO_STRING(complete_output_array, ',') AS complete_output_string
    FROM categorization c
    LEFT JOIN LATERAL FLATTEN(input => c.complete_response:structured_output) so
    LEFT JOIN LATERAL FLATTEN(input => so.value:raw_message:categories) cat
    GROUP BY ALL
    HAVING
        complete_output_string IS NOT NULL
    ;
    
    RETURN 'Successfully processed new tweets into enriched_tweets';
EXCEPTION
    WHEN OTHER THEN
        RETURN 'Error processing tweets: ' || SQLERRM;
END;
$$;
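
To make the two `LATERAL FLATTEN` steps above easier to picture, here is a minimal Python sketch of the same unpacking. It assumes the response has the shape implied by the SQL paths (`structured_output` is a list whose items carry `raw_message.categories`). The `sample_response` value and the `extract_categories` helper are hypothetical and only for illustration; they are not part of the lab code.

```python
import json


def extract_categories(complete_response):
    """Collect category names from a Cortex Complete style response.

    Assumed shape (taken from the SQL paths used in the procedure):
    {"structured_output": [{"raw_message": {"categories": [...]}}]}
    """
    if isinstance(complete_response, str):
        complete_response = json.loads(complete_response)

    categories = []
    # First FLATTEN: one row per element of structured_output
    for item in complete_response.get("structured_output") or []:
        raw_message = item.get("raw_message") or {}
        # Second FLATTEN: one row per category name
        categories.extend(raw_message.get("categories") or [])
    return categories


# Hypothetical response for a single tweet
sample_response = json.dumps({
    "structured_output": [
        {"raw_message": {"categories": ["Fan Engagement", "Player Highlight"]}}
    ]
})

complete_output_array = extract_categories(sample_response)
complete_output_string = ",".join(complete_output_array)  # like ARRAY_TO_STRING(..., ',')
print(complete_output_array)
print(complete_output_string)
```

The list plays the role of `complete_output_array` and the joined string plays the role of `complete_output_string` in the query.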


## Triggered Tasks

First we'll build a stream on `RAW_TWEETS` to record DML statements (updates, inserts, deletes) made against the table.

The task will call the `process_new_tweets()` SPROC whenever the condition `SYSTEM$STREAM_HAS_DATA('RAW_TWEETS_ST')` evaluates to `true`.

In [None]:
CREATE OR REPLACE STREAM RAW_TWEETS_ST ON TABLE RAW_TWEETS;

CREATE OR REPLACE TASK TWEET_PROCESSING
  WAREHOUSE = DEV_WH_ARENAFLOW_HOLS
  WHEN SYSTEM$STREAM_HAS_DATA('RAW_TWEETS_ST')
  AS
  CALL PROCESS_NEW_TWEETS();

In [None]:
ALTER TASK tweet_processing RESUME;
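
If the stream and task interplay feels abstract, the toy Python model below may help. It is only a sketch of the idea, not how Snowflake implements streams. It tracks inserts only, and it assumes the procedure consumes the stream when it runs, which is what moves the offset forward. `ToyStream` and `run_task_if_triggered` are hypothetical names made up for this illustration.

```python
class ToyStream:
    """Toy stand-in for a stream: remembers how far into the table it has read."""

    def __init__(self, table):
        self.table = table  # shared list standing in for RAW_TWEETS
        self.offset = 0     # rows before this index were already consumed

    def has_data(self):
        # Rough analogue of SYSTEM$STREAM_HAS_DATA('RAW_TWEETS_ST')
        return self.offset < len(self.table)

    def consume(self):
        # Hand back the unread rows and advance the offset past them
        new_rows = self.table[self.offset:]
        self.offset = len(self.table)
        return new_rows


def run_task_if_triggered(stream, procedure):
    """Call the procedure on new rows only when the stream reports data."""
    if not stream.has_data():
        return None  # task is skipped, nothing to process
    return procedure(stream.consume())


raw_tweets = []
stream = ToyStream(raw_tweets)

first = run_task_if_triggered(stream, len)    # table is empty, task does not run
raw_tweets.extend(["tweet 1", "tweet 2", "tweet 3"])
second = run_task_if_triggered(stream, len)   # three new rows are processed
third = run_task_if_triggered(stream, len)    # nothing new since the last run
print(first, second, third)
```

The point to take away is that each batch of new rows is handed to the procedure once, and the task stays idle while the stream is empty.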

## Continuous Batch Processing with Cortex

The following functions simulate batches of tweets:

- A batch of tweets is generated from the `fake_profile` library
- NBA specific tweets are generated using Cortex Complete
- The batch of augmented tweets is converted to a [Snowpark DataFrame](https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes) then saved to the `RAW_TWEETS` table
- The `process_new_tweets()` SPROC is called via the `tweet_processing` triggered task

In [None]:
async def insert_batch(batch_data, session):
    if batch_data:
        try:
            df_batch = session.create_dataframe(batch_data)
            df_batch.write.mode("append").save_as_table("RAW_TWEETS")
            print(f"Inserted batch of {len(batch_data)} tweets")
        except Exception as e:
            print(f"Error inserting batch: {str(e)}")

In [None]:
async def simulate_streaming(tweets, users, session, num_tweets=5):
    
    executor = ThreadPoolExecutor(max_workers=5)  # Limit concurrent workers
    batch_size_range = (3, 15)  # Random batch size between 3 and 15
    delay_range = (0.5, 2.0)  # Random delay between 0.5 and 2 seconds
    cumulative_delay = 0.0  # Initialize cumulative delay to track simulated time

    remaining_tweets = tweets[:num_tweets]
    
    while remaining_tweets:
        
        # Calculate current game minute based on cumulative delay (1 minute = 10 seconds of simulated time)
        current_game_minute = min(int(cumulative_delay // 10), 65)
        
        batch_size = random.randint(*batch_size_range)
        batch_tweets = remaining_tweets[:batch_size]
        remaining_tweets = remaining_tweets[batch_size:]

        # Process tweets in parallel with the current game minute
        loop = asyncio.get_running_loop()  # We are inside a coroutine, so a loop is already running
        tasks = [loop.run_in_executor(executor, process_tweet, t, users, current_game_minute) for t in batch_tweets]
        batch_data = await asyncio.gather(*tasks)
        batch_data = [data for data in batch_data if data is not None]

        # Insert batch asynchronously
        if batch_data:
            await insert_batch(batch_data, session)

        # Simulate time passing with a random delay and update cumulative delay
        delay = random.uniform(*delay_range)
        cumulative_delay += delay
        await asyncio.sleep(delay)
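
To see how the batching and game clock above behave without touching Snowflake, here is a small standalone sketch of just that arithmetic. The helper names `game_minute` and `split_into_batches` are hypothetical; they simply pull the two calculations out of `simulate_streaming` so you can try them with plain Python values.

```python
import random


def game_minute(cumulative_delay, seconds_per_minute=10, max_minute=65):
    """Map elapsed simulation seconds to a game minute, capped at max_minute."""
    return min(int(cumulative_delay // seconds_per_minute), max_minute)


def split_into_batches(items, batch_size_range=(3, 15), rng=None):
    """Slice items into consecutive batches of random size, like the while loop above."""
    rng = rng or random.Random()
    remaining = list(items)
    batches = []
    while remaining:
        size = rng.randint(*batch_size_range)  # randint includes both endpoints
        batches.append(remaining[:size])       # the last batch may be shorter
        remaining = remaining[size:]
    return batches


tweets = list(range(100))
batches = split_into_batches(tweets, rng=random.Random(42))
print(len(batches), [len(b) for b in batches])
print(game_minute(0), game_minute(9.99), game_minute(10), game_minute(1000))
```

One thing this makes visible: with 100 tweets there are at most 34 batches, and each delay is at most 2 seconds, so the simulated game minute stays in the single digits for a run of that size. The cap of 65 only comes into play for much longer simulations.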

## Dynamic Table for Gold Layer Processing

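The `ENRICHED_TWEETS_DT` Dynamic Table is the final layer of processing in our data pipelines.

The DT will incrementally process new tweets as they are inserted into the `ENRICHED_TWEETS` table. With `REFRESH_MODE = AUTO`, Snowflake uses incremental refresh when the query supports it. The setting `TARGET_LAG = '1 minute'` tells Snowflake to aim to keep our gold layer data no more than 1 minute behind `ENRICHED_TWEETS`; it is a target rather than a hard guarantee.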

In [None]:
CREATE OR REPLACE DYNAMIC TABLE ENRICHED_TWEETS_DT (
    TWEET_ID,
    RETWEETS,
    LIKES,
    REPLIES,
    QUOTES,
    CREATED_AT,
    PROCESSED_AT,
    STREAM_LATENCY,
    GAME_MINUTE,
    SENTIMENT,
    SUMMARY,
    TOPIC  
)
CLUSTER BY (GAME_MINUTE, SENTIMENT)
REFRESH_MODE = AUTO
WAREHOUSE = 'DEV_WH_ARENAFLOW_HOLS' 
TARGET_LAG = '1 minute'
AS
SELECT
    tweet_id,
    retweet_count,
    like_count,
    reply_count,
    quote_count,
    created_at,
    processed_at,
    TIMESTAMPDIFF(second, created_at, processed_at) AS stream_latency,
    game_minute,
    CASE
        WHEN sentiment_output >= 0.7 THEN 'good'
        ELSE 'bad'
    END AS sentiment,
    summary_output,
    complete_output_string
FROM
   ENRICHED_TWEETS
;
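
The `CASE` expression in the query above is worth a closer look, so here is a minimal Python sketch of the same rule. It assumes `sentiment_output` is a score between -1 and 1 (as returned by the Cortex sentiment function) or missing. The function name `sentiment_label` is hypothetical.

```python
def sentiment_label(sentiment_output, threshold=0.7):
    """Mirror the CASE expression: 'good' at or above the threshold, else 'bad'."""
    # In SQL, NULL >= 0.7 is not true, so a missing score falls through to ELSE
    if sentiment_output is not None and sentiment_output >= threshold:
        return "good"
    return "bad"


for score in (0.95, 0.7, 0.4, 0.0, -0.8, None):
    print(score, sentiment_label(score))
```

Note that neutral and mildly positive tweets, as well as tweets with no score at all, are all labeled `bad` under this rule. If you want a separate bucket for those cases, the `CASE` expression is the place to add one.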

In [None]:
n = 100
generator = Xprofile()
users, tweets, comments, likes, retweets = generator.generate_fake_twitter_data(user_count=n, tweets_per_user=1)

# Run the asynchronous streaming simulation
asyncio.run(simulate_streaming(tweets, users, session, num_tweets=n))

In [None]:
-- ALTER TASK tweet_processing SUSPEND;
