# Interactive Table Streaming Demo (Snowpipe Streaming SDK)

This notebook demonstrates:
1. Creating an Interactive Table for real-time analytics
2. Generating realistic UK customer data as JSON
3. **Streaming data directly via Snowpipe Streaming SDK** (the ONLY way to populate Interactive Tables)
4. Querying and analyzing UK web traffic data

⚠️ **Important:** Interactive Tables can ONLY be populated via the Snowpipe Streaming SDK - regular SQL INSERT/COPY does NOT work!

---

## 1️⃣ Setup Database & Schema

In [None]:
-- Cell: setup_database
-- Create the database and schema
CREATE DATABASE IF NOT EXISTS INTERACTIVE_JSON_DB;
USE DATABASE INTERACTIVE_JSON_DB;
CREATE SCHEMA IF NOT EXISTS STREAMING;
USE SCHEMA STREAMING;

## 2️⃣ Create Interactive Table

Interactive Tables are optimized for real-time analytics with sub-second query latency.

⚠️ **Note:** Interactive Tables can only be populated via Snowpipe Streaming SDK, not regular SQL.

In [None]:
-- Cell: create_interactive_table
-- Create Interactive Table (for Ingest SDK streaming)
CREATE OR REPLACE INTERACTIVE TABLE CUSTOMERS CLUSTER BY (CLIENTIP) (
    EVENTDATE DATE,
    COUNTERID NUMBER(38,0),
    CLIENTIP VARCHAR(16777216),
    SEARCHENGINEID NUMBER(38,0),
    SEARCHPHRASE VARCHAR(16777216),
    RESOLUTIONWIDTH NUMBER(38,0),
    TITLE VARCHAR(16777216),
    ISREFRESH NUMBER(38,0),
    DONTCOUNTHITS NUMBER(38,0)
);

---

In [None]:
-- Cell: grant_permissions
-- Grant necessary permissions for streaming
GRANT USAGE ON DATABASE INTERACTIVE_JSON_DB TO ROLE ACCOUNTADMIN;
GRANT USAGE ON SCHEMA STREAMING TO ROLE ACCOUNTADMIN;
GRANT INSERT ON TABLE CUSTOMERS TO ROLE ACCOUNTADMIN;

## 3️⃣ Generate Customer Data

This Python cell generates realistic UK web analytics data with UK IP addresses, search phrases, and websites.

In [None]:
# Cell: generate_uk_data
import json
import random
from datetime import datetime, timedelta

# UK-specific data
UK_CITIES_IPS = {
    "London": ["185.86.", "194.168.", "212.58.", "31.52."],
    "Manchester": ["81.105.", "82.132.", "109.170.", "86.156."],
    "Birmingham": ["92.233.", "86.149.", "109.157.", "81.103."],
    "Leeds": ["90.216.", "86.146.", "109.154.", "81.108."],
    "Glasgow": ["92.238.", "86.159.", "109.148.", "81.111."],
    "Liverpool": ["81.100.", "86.147.", "109.171.", "90.218."],
    "Bristol": ["86.153.", "109.155.", "81.106.", "92.234."],
    "Edinburgh": ["86.158.", "109.149.", "81.112.", "92.239."],
    "Cardiff": ["86.154.", "109.156.", "81.107.", "92.235."],
    "Belfast": ["86.160.", "109.150.", "81.113.", "92.240."],
}

UK_SEARCH_PHRASES = [
    "best fish and chips near me", "weather forecast london", "premier league results",
    "train times to manchester", "nhs appointment booking", "uk visa requirements",
    "cheap flights heathrow", "rightmove houses for sale", "bbc news live",
    "tesco delivery slots", "argos click and collect", "amazon uk prime",
    "royal mail tracking", "dvla tax check", "energy price cap uk",
]

UK_PAGE_TITLES = [
    "BBC News - Home", "The Guardian | News", "Daily Mail Online",
    "Sky News - Breaking News", "Rightmove - UK Property", "Amazon.co.uk",
    "Tesco Groceries Online", "Sainsbury's Shopping", "Argos | Same Day Delivery",
    "John Lewis & Partners", "NHS Health A-Z", "GOV.UK Services",
]

RESOLUTIONS = [1920, 1366, 1536, 1440, 1280, 2560, 3840, 1680]

def generate_uk_ip():
    city = random.choice(list(UK_CITIES_IPS.keys()))
    prefix = random.choice(UK_CITIES_IPS[city])
    return f"{prefix}{random.randint(1, 254)}.{random.randint(1, 254)}"

def generate_customer_event(event_date):
    return {
        "EVENTDATE": event_date.strftime("%Y-%m-%d"),
        "COUNTERID": random.randint(100000, 999999),
        "CLIENTIP": generate_uk_ip(),
        "SEARCHENGINEID": random.choices([1, 2, 3, 4, 5, 0], weights=[50, 20, 10, 10, 5, 5])[0],
        "SEARCHPHRASE": random.choice(UK_SEARCH_PHRASES) if random.random() > 0.3 else "",
        "RESOLUTIONWIDTH": random.choice(RESOLUTIONS),
        "TITLE": random.choice(UK_PAGE_TITLES),
        "ISREFRESH": random.choices([0, 1], weights=[85, 15])[0],
        "DONTCOUNTHITS": random.choices([0, 1], weights=[95, 5])[0],
    }

# Generate 500 records
records = []
base_date = datetime.now()
for _ in range(500):
    event_date = base_date - timedelta(days=random.randint(0, 30))
    records.append(generate_customer_event(event_date))

print(f"✅ Generated {len(records)} UK customer records")
print("\nSample record:")
print(json.dumps(records[0], indent=2))
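
Before streaming, it can help to confirm that every generated record carries exactly the columns the table defines. The following is a minimal sketch, assuming the column list from the `CREATE INTERACTIVE TABLE` statement above; `EXPECTED_COLUMNS` and `validate_record` are hypothetical helper names introduced for illustration and are not part of any SDK.

```python
from datetime import datetime

# Column names mirror the CUSTOMERS table; the Python types are what the
# generator cell emits (dates as "YYYY-MM-DD" strings, numbers as ints).
EXPECTED_COLUMNS = {
    "EVENTDATE": str,
    "COUNTERID": int,
    "CLIENTIP": str,
    "SEARCHENGINEID": int,
    "SEARCHPHRASE": str,
    "RESOLUTIONWIDTH": int,
    "TITLE": str,
    "ISREFRESH": int,
    "DONTCOUNTHITS": int,
}

def validate_record(record):
    """Return a list of problems found in one record (empty if none)."""
    problems = []
    missing = sorted(EXPECTED_COLUMNS.keys() - record.keys())
    extra = sorted(record.keys() - EXPECTED_COLUMNS.keys())
    if missing:
        problems.append(f"missing columns: {missing}")
    if extra:
        problems.append(f"unexpected columns: {extra}")
    # Check the Python type of every column that is present.
    for name, expected_type in EXPECTED_COLUMNS.items():
        if name in record and not isinstance(record[name], expected_type):
            problems.append(f"{name} should be {expected_type.__name__}")
    # Dates travel as strings, so also check the format.
    event_date = record.get("EVENTDATE")
    if isinstance(event_date, str):
        try:
            datetime.strptime(event_date, "%Y-%m-%d")
        except ValueError:
            problems.append("EVENTDATE is not in YYYY-MM-DD format")
    return problems

# Hand-written sample in the same shape as the generated records.
sample = {
    "EVENTDATE": "2025-01-31", "COUNTERID": 123456, "CLIENTIP": "185.86.10.20",
    "SEARCHENGINEID": 1, "SEARCHPHRASE": "", "RESOLUTIONWIDTH": 1920,
    "TITLE": "BBC News - Home", "ISREFRESH": 0, "DONTCOUNTHITS": 0,
}
print(validate_record(sample))
```

In the notebook, the same check could be applied to the generated list, for example with `[p for r in records for p in validate_record(r)]`, before any rows are sent.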

## 4️⃣ Stream Data via Snowpipe Streaming SDK

The Snowpipe Streaming SDK is the **only** way to insert data into Interactive Tables. It requires:
- Private key authentication (keypair auth)
- The `snowpipe-streaming` package: `pip install snowpipe-streaming`
- The table's default pipe, which follows the naming convention `TABLE_NAME-STREAMING`
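
The streaming cell reads the private key file and passes its contents along as a PEM string. A quick sanity check on that string can catch a wrong file early. This sketch classifies a key by its PEM header line; it assumes the key is the unencrypted PKCS#8 kind produced by the `openssl ... -nocrypt` command in the Prerequisites, and `describe_pem_key` is a hypothetical helper, not an SDK function.

```python
def describe_pem_key(pem_text):
    """Classify a PEM private key by its first non-empty line."""
    lines = [line.strip() for line in pem_text.splitlines() if line.strip()]
    header = lines[0] if lines else ""
    kinds = {
        "-----BEGIN PRIVATE KEY-----": "unencrypted PKCS#8",
        "-----BEGIN ENCRYPTED PRIVATE KEY-----": "encrypted PKCS#8",
        "-----BEGIN RSA PRIVATE KEY-----": "PKCS#1 (traditional RSA)",
    }
    return kinds.get(header, "unrecognized")

# The key body here is a placeholder, not real key material.
example = "-----BEGIN PRIVATE KEY-----\nMIIE...\n-----END PRIVATE KEY-----\n"
print(describe_pem_key(example))  # unencrypted PKCS#8
```

Anything other than `unencrypted PKCS#8` suggests the file was not generated with the command used in this notebook and may need converting first.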

In [None]:
# Cell: stream_to_interactive_table
# ============================================
# Snowpipe Streaming V2 - Stream data to Interactive Table
# ============================================
# IMPORTANT: This cell uses the Snowpipe Streaming SDK (V2)
# DO NOT use SQL INSERT - it will fail on Interactive Tables!
# ============================================

import os

# ============================================
# CONFIGURATION - Update these values
# ============================================
SNOWFLAKE_ACCOUNT = "SFSEEUROPE-COLM_USWEST"  
SNOWFLAKE_USER = "ADMIN"
PRIVATE_KEY_PATH = os.path.expanduser("~/.ssh/snowflake/rsa_key.p8")

# Target table details
DATABASE = "INTERACTIVE_JSON_DB"
SCHEMA = "STREAMING"
TABLE = "CUSTOMERS"
ROLE = "ACCOUNTADMIN"

# Default pipe naming convention for Interactive Tables: TABLE_NAME-STREAMING
PIPE_NAME = f"{TABLE}-STREAMING"

# ============================================
# Load Private Key as PEM string
# ============================================
def load_private_key_pem(path):
    with open(path, "rb") as key_file:
        return key_file.read().decode('utf-8')

# ============================================
# Stream Data using Snowpipe Streaming V2 SDK
# ============================================
print("=" * 60)
print("SNOWPIPE STREAMING V2 - Interactive Table Data Ingestion")
print("=" * 60)

# Import the Snowpipe Streaming SDK
from snowflake.ingest.streaming import StreamingIngestClient

# Load private key as PEM string
private_key_pem = load_private_key_pem(PRIVATE_KEY_PATH)

print("🔌 Connecting to Snowflake...")
print(f"   Account: {SNOWFLAKE_ACCOUNT}")
print(f"   User: {SNOWFLAKE_USER}")
print(f"   Target: {DATABASE}.{SCHEMA}.{TABLE}")
print(f"   Default Pipe: {PIPE_NAME}")

# Create Snowpipe Streaming V2 client
client = StreamingIngestClient(
    client_name=f"{TABLE}_client",
    db_name=DATABASE,
    schema_name=SCHEMA,
    pipe_name=PIPE_NAME,
    properties={
        "account": SNOWFLAKE_ACCOUNT,
        "user": SNOWFLAKE_USER,
        "private_key": private_key_pem,
        "role": ROLE,
        "url": f"https://{SNOWFLAKE_ACCOUNT}.snowflakecomputing.com"
    }
)

# Open a streaming channel to the Interactive Table
print("\n📡 Opening streaming channel...")
channel, status = client.open_channel(f"{TABLE}_channel")
print(f"   Channel status: {status}")

# Stream records using Snowpipe Streaming V2 (NOT SQL INSERT!)
print(f"\n🚀 Streaming {len(records)} records via Snowpipe Streaming V2...")
print("   (Using channel.append_row() - NOT SQL INSERT)")

for i, record in enumerate(records):
    # Use Snowpipe Streaming V2 append_row method
    # This is the ONLY way to insert into Interactive Tables
    channel.append_row(record)
    
    # Progress indicator every 100 records
    if (i + 1) % 100 == 0:
        print(f"   ✓ Streamed {i + 1}/{len(records)} records...")

# Close channel and client properly
print("\n📤 Flushing and closing channel...")
channel.close()
client.close()

print("\n" + "=" * 60)
print(f"✅ SUCCESS: Streamed {len(records)} records to Interactive Table")
print(f"   Table: {DATABASE}.{SCHEMA}.{TABLE}")
print(f"   Method: Snowpipe Streaming V2 (append_row)")
print("=" * 60)
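
The cell above reaches `channel.close()` and `client.close()` only if every `append_row` call succeeds. One way to guarantee cleanup even when a row is rejected is sketched below. It assumes only that the channel exposes `append_row()` and `close()` and that the client exposes `close()`, as used above; `stream_with_cleanup`, `FakeChannel`, and `FakeClient` are hypothetical names, and the fakes exist solely so the example runs without a Snowflake connection.

```python
from contextlib import ExitStack

def stream_with_cleanup(client, channel, rows):
    """Append every row, closing channel then client even on failure."""
    with ExitStack() as stack:
        # Callbacks run in reverse order of registration:
        # the channel is closed first, then the client.
        stack.callback(client.close)
        stack.callback(channel.close)
        sent = 0
        for row in rows:
            channel.append_row(row)
            sent += 1
        return sent

class FakeChannel:
    """Stand-in for a streaming channel; records calls in a shared log."""
    def __init__(self, log, fail_on=None):
        self.log = log
        self.fail_on = fail_on

    def append_row(self, row):
        if row == self.fail_on:
            raise ValueError(f"rejected row: {row}")
        self.log.append(("append", row))

    def close(self):
        self.log.append(("close", "channel"))

class FakeClient:
    """Stand-in for a streaming client."""
    def __init__(self, log):
        self.log = log

    def close(self):
        self.log.append(("close", "client"))

log = []
sent = stream_with_cleanup(FakeClient(log), FakeChannel(log), [{"A": 1}, {"A": 2}])
print(sent, log[-2:])
```

With the real SDK objects, the same function could be called as `stream_with_cleanup(client, channel, records)` in place of the loop and the two `close()` calls.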

## 5️⃣ Verify Data & Analytics

In [None]:
-- Cell: count_records
-- Check record count in Interactive Table
SELECT COUNT(*) AS TOTAL_RECORDS FROM CUSTOMERS;

In [None]:
-- Cell: traffic_by_city
-- Traffic by UK City (based on IP prefix)
SELECT 
    CASE 
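        -- Each city lists all four prefixes used by the generator cell
        WHEN CLIENTIP LIKE '185.86.%' OR CLIENTIP LIKE '194.168.%'
          OR CLIENTIP LIKE '212.58.%' OR CLIENTIP LIKE '31.52.%' THEN 'London'
        WHEN CLIENTIP LIKE '81.105.%' OR CLIENTIP LIKE '82.132.%'
          OR CLIENTIP LIKE '109.170.%' OR CLIENTIP LIKE '86.156.%' THEN 'Manchester'
        WHEN CLIENTIP LIKE '92.233.%' OR CLIENTIP LIKE '86.149.%'
          OR CLIENTIP LIKE '109.157.%' OR CLIENTIP LIKE '81.103.%' THEN 'Birmingham'
        WHEN CLIENTIP LIKE '90.216.%' OR CLIENTIP LIKE '86.146.%'
          OR CLIENTIP LIKE '109.154.%' OR CLIENTIP LIKE '81.108.%' THEN 'Leeds'
        WHEN CLIENTIP LIKE '92.238.%' OR CLIENTIP LIKE '86.159.%'
          OR CLIENTIP LIKE '109.148.%' OR CLIENTIP LIKE '81.111.%' THEN 'Glasgow'
        WHEN CLIENTIP LIKE '81.100.%' OR CLIENTIP LIKE '86.147.%'
          OR CLIENTIP LIKE '109.171.%' OR CLIENTIP LIKE '90.218.%' THEN 'Liverpool'
        WHEN CLIENTIP LIKE '86.153.%' OR CLIENTIP LIKE '109.155.%'
          OR CLIENTIP LIKE '81.106.%' OR CLIENTIP LIKE '92.234.%' THEN 'Bristol'
        WHEN CLIENTIP LIKE '86.158.%' OR CLIENTIP LIKE '109.149.%'
          OR CLIENTIP LIKE '81.112.%' OR CLIENTIP LIKE '92.239.%' THEN 'Edinburgh'
        WHEN CLIENTIP LIKE '86.154.%' OR CLIENTIP LIKE '109.156.%'
          OR CLIENTIP LIKE '81.107.%' OR CLIENTIP LIKE '92.235.%' THEN 'Cardiff'
        WHEN CLIENTIP LIKE '86.160.%' OR CLIENTIP LIKE '109.150.%'
          OR CLIENTIP LIKE '81.113.%' OR CLIENTIP LIKE '92.240.%' THEN 'Belfast'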
        ELSE 'Other UK'
    END AS CITY,
    COUNT(*) AS VISITORS,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS PERCENTAGE
FROM CUSTOMERS
GROUP BY 1
ORDER BY 2 DESC;
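
The `CASE` expression has to list the same IP prefixes as `UK_CITIES_IPS` in the generator cell; any prefix the SQL leaves out is counted as 'Other UK'. One way to keep the two in sync is to generate the expression from the dictionary. The sketch below uses a two-city excerpt of the mapping; `build_city_case` is a hypothetical helper, and it assumes city names and prefixes contain no quote characters, so it does no SQL escaping.

```python
def build_city_case(city_prefixes, column="CLIENTIP", default="Other UK"):
    """Build a SQL CASE expression mapping IP prefixes to city names."""
    lines = ["CASE"]
    for city, prefixes in city_prefixes.items():
        # Each prefix already ends with a dot, so appending % yields
        # patterns such as '185.86.%'.
        conditions = " OR ".join(
            f"{column} LIKE '{prefix}%'" for prefix in prefixes
        )
        lines.append(f"    WHEN {conditions} THEN '{city}'")
    lines.append(f"    ELSE '{default}'")
    lines.append("END")
    return "\n".join(lines)

# Two-city excerpt of UK_CITIES_IPS from the generator cell.
excerpt = {
    "London": ["185.86.", "194.168.", "212.58.", "31.52."],
    "Manchester": ["81.105.", "82.132.", "109.170.", "86.156."],
}
print(build_city_case(excerpt))
```

Passing the full `UK_CITIES_IPS` dictionary would produce an expression covering every generated prefix, which could then be pasted into the query in place of the hand-written `CASE`.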

In [None]:
-- Cell: top_search_phrases
-- Top Search Phrases
SELECT SEARCHPHRASE, COUNT(*) AS SEARCHES
FROM CUSTOMERS
WHERE SEARCHPHRASE != ''
GROUP BY SEARCHPHRASE
ORDER BY 2 DESC
LIMIT 10;

In [None]:
-- Cell: most_visited_pages
-- Most Visited Pages
SELECT TITLE AS PAGE_TITLE, COUNT(*) AS PAGE_VIEWS
FROM CUSTOMERS
GROUP BY TITLE
ORDER BY 2 DESC
LIMIT 10;

In [None]:
-- Cell: daily_traffic
-- Daily traffic trend
SELECT 
    EVENTDATE,
    COUNT(*) AS DAILY_VISITORS
FROM CUSTOMERS
GROUP BY EVENTDATE
ORDER BY EVENTDATE DESC
LIMIT 30;

---

## 📋 Summary

This notebook demonstrates streaming data directly into an **Interactive Table** using the Snowpipe Streaming SDK.

| Object | Name | Purpose |
|--------|------|---------|
| Database | `INTERACTIVE_JSON_DB` | Container for streaming demo |
| Interactive Table | `CUSTOMERS` | Real-time analytics with sub-second latency |
| Default Pipe | `CUSTOMERS-STREAMING` | System-generated pipe for streaming |

### Key Points:

⚠️ **Interactive Tables can ONLY be populated via:**
- Snowpipe Streaming SDK (this notebook)
- Snowflake Kafka Connector
- Snowflake Spark Connector (streaming mode)

❌ **These do NOT work with Interactive Tables:**
- SQL `INSERT` statements
- `COPY INTO` commands
- Regular Snowpipe

### Default Pipe Convention:
As of December 2025, Snowflake automatically creates a default pipe for streaming with the naming convention: `TABLE_NAME-STREAMING`

### Prerequisites:

```bash
# Install required packages
pip install snowpipe-streaming

# Generate keypair for authentication
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

# Assign public key to Snowflake user
# ALTER USER your_user SET RSA_PUBLIC_KEY='<public_key_content>';
```

### To stream more data:
Simply run the Python cells again with new records - the Streaming SDK will stream them directly to the Interactive Table!

---

**End of Notebook**