# Snowflake Setup for Kafka Interactive Tables Demo

This notebook contains **all the SQL commands** needed to set up Snowflake for the Kafka Interactive Tables demo. Run this notebook before starting the Kafka connector.

## What This Notebook Does

1. **Set Context** - Creates database and schema
2. **Create Role and User** - Sets up Kafka connector authentication
3. **Key Pair Authentication** - Configures RSA public key
4. **Interactive Table Creation** - Creates an Interactive Table optimized for sensor data
5. **Interactive Warehouse Setup** - Creates and configures an Interactive Warehouse
6. **Grant Permissions** - Grants ownership to Kafka connector role
7. **Sample Queries** - Provides queries for analyzing streaming data
8. **Cleanup** - Commands to remove all demo resources

## Prerequisites
- ACCOUNTADMIN role access
- RSA key pair generated (see instructions below)

---

# Part 1: Set Context

Create the database and schema for the demo.

In [None]:
USE ROLE ACCOUNTADMIN;

-- Create database and schema if they don't exist
CREATE DATABASE IF NOT EXISTS KAFKA_INTERACTIVE;
CREATE SCHEMA IF NOT EXISTS KAFKA_INTERACTIVE.STREAMING;

In [None]:
-- Set context
USE DATABASE KAFKA_INTERACTIVE;
USE SCHEMA STREAMING;

---

# Part 2: Create Role and User for Kafka Connector

The Kafka connector needs its own role with specific permissions to:
- Use the database and schema
- Create and manage tables
- Create stages and pipes for streaming

In [None]:
-- Create a role for the Kafka connector
CREATE ROLE IF NOT EXISTS KAFKA_CONNECTOR_ROLE;
GRANT ROLE KAFKA_CONNECTOR_ROLE TO ROLE ACCOUNTADMIN;

-- Grant necessary privileges
GRANT USAGE ON DATABASE KAFKA_INTERACTIVE TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT USAGE ON SCHEMA KAFKA_INTERACTIVE.STREAMING TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE TABLE ON SCHEMA KAFKA_INTERACTIVE.STREAMING TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE STAGE ON SCHEMA KAFKA_INTERACTIVE.STREAMING TO ROLE KAFKA_CONNECTOR_ROLE;
GRANT CREATE PIPE ON SCHEMA KAFKA_INTERACTIVE.STREAMING TO ROLE KAFKA_CONNECTOR_ROLE;

In [None]:
-- Create a user for the Kafka connector
CREATE USER IF NOT EXISTS KAFKA_USER
  DEFAULT_ROLE = KAFKA_CONNECTOR_ROLE
  DEFAULT_NAMESPACE = KAFKA_INTERACTIVE.STREAMING;

GRANT ROLE KAFKA_CONNECTOR_ROLE TO USER KAFKA_USER;

---

# Part 3: Configure Key Pair Authentication

The Kafka connector uses RSA key pair authentication for secure access to Snowflake.

## Generate RSA Key Pair (Run in Terminal)

Before running the next cell, generate an RSA key pair in your terminal:

```bash
# Generate private key (unencrypted for simplicity in this demo)
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

# Generate public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

# Display the public key (you'll need this for the next step)
cat rsa_key.pub
```

Copy the public key content (without the `-----BEGIN PUBLIC KEY-----` and `-----END PUBLIC KEY-----` lines).

## Step 3.1: Assign RSA Public Key to User

**⚠️ IMPORTANT:** Replace the placeholder below with your actual public key content (on a single line, no headers/footers).

In [None]:
-- Replace YOUR_PUBLIC_KEY_HERE with your actual public key content
-- Example: MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A...
ALTER USER KAFKA_USER SET RSA_PUBLIC_KEY='YOUR_PUBLIC_KEY_HERE';

## Step 3.2: Verify Key Configuration

Check that the public key was assigned correctly by looking for the `RSA_PUBLIC_KEY_FP` (fingerprint) property.
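The verification cell below uses Snowflake's `->>` flow operator to filter the `DESC USER` output. If your client doesn't accept that operator, you can run an equivalent check that filters the previous statement's result with `RESULT_SCAN`. A sketch:

```sql
-- Describe the user first; RESULT_SCAN then reads that statement's output
DESC USER KAFKA_USER;

SELECT "property", "value"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "property" IN ('RSA_PUBLIC_KEY', 'RSA_PUBLIC_KEY_FP');
```

If the key was accepted, `RSA_PUBLIC_KEY_FP` holds a fingerprint beginning with `SHA256:`. A null value means no key is set.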

In [None]:
DESC USER KAFKA_USER ->> SELECT * FROM $1 WHERE "property" = 'RSA_PUBLIC_KEY_FP';

---

# Part 4: Create Interactive Table

Interactive Tables are a special type of Snowflake table optimized for low-latency, interactive workloads.

## Key Characteristics of Interactive Tables
- **Requires `CLUSTER BY`** - Choose columns used in your most frequent WHERE clauses
- **Optimized for selective queries** - Best for queries that filter on clustered columns
- **Works with Interactive Warehouses** - Must be added to an Interactive Warehouse for low-latency queries
- **Supports streaming ingestion** - Works with Snowpipe Streaming v2

For our IoT sensor data, we'll cluster by `device_id` and `timestamp` since those will be common filter conditions.
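A query that benefits from this clustering filters on `device_id` and on a bounded `timestamp` range. A minimal sketch; `'device_001'` is a hypothetical ID, so substitute one that your data generator actually emits:

```sql
-- Filter on both clustering keys: an exact device ID plus a recent time window
SELECT device_id, sensor_type, value, unit, timestamp
FROM SENSOR_DATA
WHERE device_id = 'device_001'  -- hypothetical device ID
  AND timestamp >= DATEADD(minute, -15, CURRENT_TIMESTAMP())
ORDER BY timestamp DESC;
```

Queries that filter only on non-clustered columns, such as `sensor_type`, still work, but they typically have to scan more data.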

In [None]:
CREATE OR REPLACE INTERACTIVE TABLE SENSOR_DATA (
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT,
    device_id VARCHAR(50),
    sensor_type VARCHAR(50),
    value FLOAT,
    unit VARCHAR(20),
    timestamp TIMESTAMP_NTZ,
    location VARIANT
)
CLUSTER BY (device_id, timestamp);

### Verify Interactive Table Creation

In [None]:
SHOW TABLES LIKE 'SENSOR_DATA';

In [None]:
SELECT GET_DDL('TABLE', 'SENSOR_DATA');

---

# Part 5: Create Interactive Warehouse

Interactive Warehouses are optimized for low-latency queries on Interactive Tables.

## Key Characteristics of Interactive Warehouses
- **Always running** - Once resumed, they don't auto-suspend (you can suspend them manually)
- **Low-latency optimized** - Tuned for sub-second query response
- **Query timeout** - SELECT statements time out after 5 seconds by default
- **Table restriction** - Can ONLY query Interactive Tables

We'll create an XSMALL warehouse, which is appropriate for:
- Working data sets smaller than 500 GB
- Development and testing scenarios

⚠️ _You will get a `NotebookSqlException` telling you that the `SENSOR_IWH` warehouse is suspended. This is okay. We are going to resume it next._

In [None]:
CREATE OR REPLACE INTERACTIVE WAREHOUSE SENSOR_IWH
    WAREHOUSE_SIZE = 'XSMALL';

### Resume the Interactive Warehouse

Interactive Warehouses are created in a suspended state, so we need to resume `SENSOR_IWH` before using it.

In [None]:
ALTER WAREHOUSE SENSOR_IWH RESUME;

### Verify Interactive Warehouse

In [None]:
SHOW WAREHOUSES LIKE 'SENSOR_IWH';

### Add Interactive Table to Interactive Warehouse

Before querying an Interactive Table from an Interactive Warehouse, you must explicitly add the table to the warehouse. This starts the cache-warming process.

In [None]:
ALTER WAREHOUSE SENSOR_IWH ADD TABLES (SENSOR_DATA);

---

# Part 6: Grant Permissions to Kafka Connector

The Kafka connector needs ownership of the Interactive Table to stream data into it.
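After the ownership transfer in the next cell has run, you can confirm it with `SHOW GRANTS ON TABLE`. A sketch of that check:

```sql
-- Expect a row where privilege = OWNERSHIP and grantee_name = KAFKA_CONNECTOR_ROLE
SHOW GRANTS ON TABLE KAFKA_INTERACTIVE.STREAMING.SENSOR_DATA;
```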

In [None]:
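-- Transfer ownership of the table to the connector role.
-- Ownership already includes INSERT and SELECT, so separate grants aren't needed
-- (REVOKE CURRENT GRANTS would remove them during the transfer anyway).
GRANT OWNERSHIP ON TABLE SENSOR_DATA TO ROLE KAFKA_CONNECTOR_ROLE REVOKE CURRENT GRANTS;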

---

# Part 7: Test Setup (Before Kafka)

Let's verify everything is set up correctly before starting the Kafka connector.

In [None]:
-- Switch to Interactive Warehouse for testing
USE WAREHOUSE SENSOR_IWH;

In [None]:
-- Verify table is accessible (will return 0 rows before streaming starts)
SELECT COUNT(*) as total_records FROM SENSOR_DATA;

### Setup Complete! ✅

The Snowflake setup is now complete. Next steps:

1. **Start Kafka** - Run `docker compose up -d` in your project directory
2. **Create Kafka Topic** - Create the `sensor_data` topic
3. **Configure Kafka Connector** - Deploy the Snowflake connector with your credentials
4. **Start Streaming Data** - Run the data generator script
5. **Query Data** - Use the cells below to query streaming data

---

# Part 8: Querying Streaming Data

After you've configured and started the Kafka connector, run the cells below to query the streaming data.

**Note:** Make sure you're using the Interactive Warehouse (`SENSOR_IWH`) for low-latency queries.
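To check which warehouse the session is using at any point, query the `CURRENT_WAREHOUSE()` context function. After the `USE WAREHOUSE` cell below has run, it should return `SENSOR_IWH`:

```sql
-- Returns the name of the session's active warehouse
SELECT CURRENT_WAREHOUSE();
```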

In [None]:
USE WAREHOUSE SENSOR_IWH;

## Verify Test Message

After sending your first test message with `send_message.py`, run this cell to verify it arrived in Snowflake.

**Note:** It may take 5-10 seconds for data to appear due to Snowpipe Streaming latency.
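To see which Kafka topic, partition, and offset a row came from, you can read fields out of `RECORD_METADATA`. This sketch assumes the connector fills that column with its usual `topic`, `partition`, `offset`, and `CreateTime` keys. If the values come back null, run `SELECT RECORD_METADATA FROM SENSOR_DATA LIMIT 1;` to see which keys your connector version actually writes.

```sql
-- Kafka provenance for the 10 most recent rows
-- (assumes standard RECORD_METADATA keys; key names are case-sensitive)
SELECT
    RECORD_METADATA['topic']::STRING       AS kafka_topic,
    RECORD_METADATA['partition']::INTEGER  AS kafka_partition,
    RECORD_METADATA['offset']::INTEGER     AS kafka_offset,
    RECORD_METADATA['CreateTime']::INTEGER AS kafka_create_time_ms,
    device_id,
    timestamp
FROM SENSOR_DATA
ORDER BY timestamp DESC
LIMIT 10;
```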

In [None]:
-- Check all data in the table (useful for verifying first test message)
SELECT * FROM SENSOR_DATA;

## Sample Queries

The following sample queries illustrate the kinds of analysis you can run against the streaming data.
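Several of these queries compare the `timestamp` column, which is a `TIMESTAMP_NTZ`, against `CURRENT_TIMESTAMP()`. Snowflake evaluates `CURRENT_TIMESTAMP()` in the session time zone. If your data generator writes UTC times (an assumption; check the generator script) and the session uses a different time zone, every time window is offset by the difference. The queries can then include far too many rows or none at all. A minimal sketch of aligning the two:

```sql
-- Assumes the generator emits UTC wall-clock timestamps
ALTER SESSION SET TIMEZONE = 'UTC';

-- Sanity check: the newest reading should be close to "now"
SELECT MAX(timestamp)      AS newest_reading,
       CURRENT_TIMESTAMP() AS now_in_session_tz
FROM SENSOR_DATA;
```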

### Recent Sensor Readings

Get the most recent sensor readings from all devices.

In [None]:
SELECT 
    device_id,
    sensor_type,
    value,
    unit,
    timestamp,
    location:building::STRING as building,
    location:floor::INTEGER as floor,
    location:zone::STRING as zone
FROM SENSOR_DATA
WHERE timestamp >= DATEADD(minute, -5, CURRENT_TIMESTAMP())
ORDER BY timestamp DESC
LIMIT 20;

### Real-Time Aggregations

Calculate per-sensor-type statistics (count, average, minimum, maximum, and standard deviation) over the last minute.

In [None]:
SELECT 
    sensor_type,
    COUNT(*) as reading_count,
    ROUND(AVG(value), 2) as avg_value,
    ROUND(MIN(value), 2) as min_value,
    ROUND(MAX(value), 2) as max_value,
    ROUND(STDDEV(value), 2) as stddev_value
FROM SENSOR_DATA
WHERE timestamp >= DATEADD(minute, -1, CURRENT_TIMESTAMP())
GROUP BY sensor_type
ORDER BY reading_count DESC;

### Device Activity Monitor

See which devices have reported in the last 30 seconds.

In [None]:
SELECT 
    device_id,
    COUNT(*) as readings,
    MAX(timestamp) as last_reading,
    DATEDIFF('second', MAX(timestamp), CURRENT_TIMESTAMP()) as seconds_ago
FROM SENSOR_DATA
WHERE timestamp >= DATEADD(second, -30, CURRENT_TIMESTAMP())
GROUP BY device_id
ORDER BY readings DESC;

### Building Summary

Aggregate sensor data by building location.

In [None]:
SELECT 
    location:building::STRING as building,
    COUNT(*) as total_readings,
    COUNT(DISTINCT device_id) as active_devices,
    COUNT(DISTINCT sensor_type) as sensor_types
FROM SENSOR_DATA
WHERE timestamp >= DATEADD(minute, -5, CURRENT_TIMESTAMP())
GROUP BY building
ORDER BY total_readings DESC;

## Data Freshness Check

Verify the streaming pipeline is working by checking the most recent data.

In [None]:
SELECT 
    COUNT(*) as total_records,
    MIN(timestamp) as oldest_record,
    MAX(timestamp) as newest_record,
    DATEDIFF('second', MAX(timestamp), CURRENT_TIMESTAMP()) as data_lag_seconds
FROM SENSOR_DATA;

---

# Part 9: Monitoring the Streaming Pipeline

## View Streaming Channels

Snowpipe Streaming v2 creates channels for data ingestion.
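To narrow the output to the demo table, `SHOW CHANNELS` also accepts a table scope. A sketch, assuming the `IN TABLE` form is available in your account (see the `SHOW CHANNELS` reference if it is rejected):

```sql
-- Only the channels ingesting into SENSOR_DATA
SHOW CHANNELS IN TABLE KAFKA_INTERACTIVE.STREAMING.SENSOR_DATA;
```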

In [None]:
SHOW CHANNELS IN SCHEMA KAFKA_INTERACTIVE.STREAMING;

## View Default Pipe

Snowpipe Streaming v2 automatically creates a default pipe for each table.

In [None]:
SHOW PIPES IN SCHEMA KAFKA_INTERACTIVE.STREAMING;

## Table Storage Information
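`SHOW TABLES` also reports a table's row count and size in its `rows` and `bytes` columns. The sketch below uses the `->>` flow operator to pipe that output into a `SELECT`, producing figures comparable to the `INFORMATION_SCHEMA` query in the next cell:

```sql
-- "rows" and "bytes" are lowercase, quoted column names in the SHOW TABLES output
SHOW TABLES LIKE 'SENSOR_DATA' IN SCHEMA KAFKA_INTERACTIVE.STREAMING
  ->> SELECT "name", "rows", "bytes" / (1024*1024) AS size_mb, "created_on" FROM $1;
```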

In [None]:
SELECT 
    TABLE_NAME,
    ROW_COUNT,
    BYTES / (1024*1024) as SIZE_MB,
    LAST_ALTERED
FROM KAFKA_INTERACTIVE.INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'STREAMING'
  AND TABLE_NAME = 'SENSOR_DATA';

---

# Part 10: Cleanup

Run these cells when you're done with the demo to clean up all Snowflake resources.

**⚠️ WARNING:** These commands will permanently delete all demo resources.

**Before running cleanup:**
1. Stop the data generator (Ctrl+C)
2. Remove the Kafka connector: `curl -X DELETE http://localhost:8083/connectors/snowflake-sensor-data`
3. Stop the Kafka cluster: `docker compose down -v`

In [None]:
-- USE ROLE ACCOUNTADMIN;

-- Suspend and drop Interactive Warehouse
-- ALTER WAREHOUSE IF EXISTS SENSOR_IWH SUSPEND;
-- DROP WAREHOUSE IF EXISTS SENSOR_IWH;

-- Drop table
-- DROP TABLE IF EXISTS KAFKA_INTERACTIVE.STREAMING.SENSOR_DATA;

-- Drop schema
-- DROP SCHEMA IF EXISTS KAFKA_INTERACTIVE.STREAMING;

-- Drop database
-- DROP DATABASE IF EXISTS KAFKA_INTERACTIVE;

-- Drop user and role
-- DROP USER IF EXISTS KAFKA_USER;
-- DROP ROLE IF EXISTS KAFKA_CONNECTOR_ROLE;

### Cleanup Complete! 🧹

Uncomment and run the commands above to remove all Snowflake resources.
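After running the cleanup cell, you can confirm that every demo resource is gone. A minimal sketch in which each `SHOW` statement should return an empty result:

```sql
-- Each statement should return zero rows once cleanup has succeeded
SHOW WAREHOUSES LIKE 'SENSOR_IWH';
SHOW DATABASES LIKE 'KAFKA_INTERACTIVE';
SHOW USERS LIKE 'KAFKA_USER';
SHOW ROLES LIKE 'KAFKA_CONNECTOR_ROLE';
```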