# RFID Badge Event Simulator - Snowflake Notebook

This notebook demonstrates Snowflake's **Snowpipe Streaming REST API** by generating and sending RFID badge events directly via HTTP POST.

**What this showcases:**
- Native REST API ingestion (no external infrastructure)
- JWT key-pair authentication
- Channel-based streaming with continuation tokens
- Near-real-time data ingestion (<10 second latency)

**Prerequisites:**
1. SQL setup scripts executed (creates database, pipe, tables)
2. RSA key pair generated and registered in Snowflake
3. Snowflake secrets created with credentials (see Setup section)


## 📋 Setup: Create Snowflake Secrets (Run Once)

Before running this notebook, execute these SQL commands in a Snowflake worksheet:

```sql
-- Create secrets to store credentials
CREATE OR REPLACE SECRET RFID_JWT_PRIVATE_KEY
  TYPE = GENERIC_STRING
  SECRET_STRING = '<paste_your_private_key_here>';  -- Full PEM content

CREATE OR REPLACE SECRET RFID_ACCOUNT
  TYPE = GENERIC_STRING
  SECRET_STRING = 'YOUR_ACCOUNT_IDENTIFIER';  -- e.g., MYORG-ACCOUNT

CREATE OR REPLACE SECRET RFID_USER
  TYPE = GENERIC_STRING
  SECRET_STRING = 'YOUR_USERNAME';
```

**To get your private key:**
1. If generated via OpenSSL: `cat config/rsa_key.p8`
2. Copy entire content including `-----BEGIN PRIVATE KEY-----` and `-----END PRIVATE KEY-----`
3. Paste as multi-line string in SECRET_STRING
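
Before pasting, it can help to sanity-check that the copied text really is an unencrypted PKCS#8 PEM. The sketch below is a structural check only, using the standard library: it confirms the header and footer lines and that the body is valid base64. It does not prove the key is usable. The helper name `looks_like_pkcs8_pem` is hypothetical and not part of this project.

```python
import base64

PEM_HEADER = "-----BEGIN PRIVATE KEY-----"
PEM_FOOTER = "-----END PRIVATE KEY-----"

def looks_like_pkcs8_pem(pem_text: str) -> bool:
    """Return True if pem_text has the PKCS#8 markers and a base64 body.

    Hypothetical helper: a shape check only, not a cryptographic validation.
    """
    # Drop blank lines and surrounding whitespace introduced by copy/paste.
    lines = [line.strip() for line in pem_text.strip().splitlines() if line.strip()]
    if len(lines) < 3 or lines[0] != PEM_HEADER or lines[-1] != PEM_FOOTER:
        return False
    try:
        # validate=True rejects characters outside the base64 alphabet.
        base64.b64decode("".join(lines[1:-1]), validate=True)
    except ValueError:
        return False
    return True
```

An encrypted key (`-----BEGIN ENCRYPTED PRIVATE KEY-----`) fails this check, which matches the notebook's assumption that the key loads with `password=None`.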


In [None]:
# Cell 1: Import Required Libraries
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import _snowflake
import requests
import json
import hashlib
import base64
import time
import random
from datetime import datetime, timedelta

from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding

print("✅ Libraries imported successfully")


In [None]:
# Cell 2: Configuration - Load from Snowflake Secrets
# This uses Snowflake's secure secret storage instead of local .env files

def get_session():
    """Get Snowflake session (auto-provided in Snowflake Notebooks)"""
    return snowpark.context.get_active_session()

def load_secrets():
    """Load credentials from Snowflake secrets"""
    private_key_pem = _snowflake.get_generic_secret_string('RFID_JWT_PRIVATE_KEY')
    account = _snowflake.get_generic_secret_string('RFID_ACCOUNT')
    user = _snowflake.get_generic_secret_string('RFID_USER')
    
    return {
        'account': account,
        'user': user,
        'private_key_pem': private_key_pem,
        'database': 'SNOWFLAKE_EXAMPLE',
        'schema': 'STAGE_BADGE_TRACKING',
        'pipe': 'BADGE_EVENTS_PIPE'
    }

# Load configuration
config = load_secrets()
print(f"✅ Configuration loaded for account: {config['account']}")
print(f"   User: {config['user']}")
print(f"   Target: {config['database']}.{config['schema']}.{config['pipe']}")


In [None]:
# Cell 3: JWT Authentication
# Generate JWT token for Snowflake REST API authentication

class SnowflakeAuth:
    """Handle JWT token generation for Snowflake REST API"""
    
    def __init__(self, account, user, private_key_pem):
        self.account = account
        self.user = user
        self.private_key = self._load_private_key(private_key_pem)
        self.public_key_fingerprint = self._calculate_fingerprint()
    
    def _load_private_key(self, pem_string):
        """Load private key from PEM string"""
        key_bytes = pem_string.encode() if isinstance(pem_string, str) else pem_string
        return serialization.load_pem_private_key(
            key_bytes,
            password=None
        )
    
    @staticmethod
    def _base64url_encode(data: bytes) -> str:
        """Encode bytes using base64 URL-safe encoding without padding"""
        return base64.urlsafe_b64encode(data).rstrip(b"=").decode("utf-8")
    
    def _calculate_fingerprint(self):
        """Calculate SHA256 fingerprint of public key"""
        public_key = self.private_key.public_key()
        public_key_der = public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        sha256_hash = hashlib.sha256(public_key_der).digest()
        return 'SHA256:' + base64.b64encode(sha256_hash).decode('utf-8')
    
    def generate_jwt(self, expiration_minutes=59):
        """Generate JWT token (max 60 minutes)"""
        # Use epoch seconds directly: calling .timestamp() on the naive datetime
        # returned by utcnow() treats it as local time and skews iat/exp.
        now = int(time.time())
        qualified_username = f"{self.account}.{self.user}".upper()
        
        payload = {
            "iss": f"{qualified_username}.{self.public_key_fingerprint}",
            "sub": qualified_username,
            "iat": now,
            "exp": now + int(expiration_minutes * 60)
        }
        
        header = {"alg": "RS256", "typ": "JWT"}
        header_segment = self._base64url_encode(
            json.dumps(header, separators=(",", ":")).encode("utf-8")
        )
        payload_segment = self._base64url_encode(
            json.dumps(payload, separators=(",", ":")).encode("utf-8")
        )
        signing_input = f"{header_segment}.{payload_segment}".encode("utf-8")
        signature = self.private_key.sign(
            signing_input,
            padding.PKCS1v15(),
            hashes.SHA256()
        )
        signature_segment = self._base64url_encode(signature)
        
        return f"{header_segment}.{payload_segment}.{signature_segment}"

# Initialize authentication
auth = SnowflakeAuth(
    account=config['account'],
    user=config['user'],
    private_key_pem=config['private_key_pem']
)

# Test JWT generation
test_token = auth.generate_jwt()
print("✅ JWT authentication initialized")
print(f"   Token preview: {test_token[:50]}...")


In [None]:
# Cell 4: Snowpipe Streaming REST API Client
# Demonstrates the complete REST API workflow

class SnowpipeStreamingClient:
    """Client for Snowflake Snowpipe Streaming REST API"""
    
    def __init__(self, auth, database, schema, pipe):
        self.auth = auth
        self.database = database
        self.schema = schema
        self.pipe = pipe
        
        # Build account URL
        account_for_url = auth.account.replace('_', '-').lower()
        self.account_url = f"https://{account_for_url}.snowflakecomputing.com"
        
        # Session state
        self.control_host = None
        self.ingest_host = None
        self.scoped_token = None
        self.continuation_token = None
    
    def get_control_host(self):
        """Step 1: Get control plane hostname"""
        jwt_token = self.auth.generate_jwt()
        
        response = requests.get(
            f"{self.account_url}/v2/streaming/hostname",
            headers={"Authorization": f"Bearer {jwt_token}"}
        )
        response.raise_for_status()
        
        self.control_host = response.text.strip('"')
        print(f"   Control host: {self.control_host}")
        return self.control_host
    
    def open_channel(self, channel_name):
        """Step 2: Open streaming channel"""
        if not self.control_host:
            self.get_control_host()
        
        jwt_token = self.auth.generate_jwt()
        url = f"https://{self.control_host}/v2/streaming/databases/{self.database}/schemas/{self.schema}/pipes/{self.pipe}:open-channel"
        
        response = requests.post(
            url,
            headers={
                "Authorization": f"Bearer {jwt_token}",
                "Content-Type": "application/json"
            },
            json={"channel_name": channel_name}
        )
        response.raise_for_status()
        
        data = response.json()
        self.ingest_host = data['ingest_host']
        self.scoped_token = data['scoped_token']
        self.continuation_token = data['continuation_token']
        
        print(f"   ✅ Channel '{channel_name}' opened")
        print(f"   Ingest host: {self.ingest_host}")
        return data
    
    def insert_rows(self, channel_name, rows):
        """Step 3: Insert rows via REST API - THIS IS THE KEY DEMO!"""
        url = f"https://{self.ingest_host}/v2/streaming/databases/{self.database}/schemas/{self.schema}/pipes/{self.pipe}/channels/{channel_name}:insert-rows"
        
        response = requests.post(
            url,
            headers={
                "Authorization": f"Bearer {self.scoped_token}",
                "Content-Type": "application/json",
                "X-Snowflake-Streaming-Continuation-Token": self.continuation_token
            },
            json={"rows": rows}
        )
        response.raise_for_status()
        
        result = response.json()
        self.continuation_token = result.get('continuation_token', self.continuation_token)
        
        return result

# Initialize client
client = SnowpipeStreamingClient(
    auth=auth,
    database=config['database'],
    schema=config['schema'],
    pipe=config['pipe']
)

print("✅ Snowpipe Streaming client initialized")


In [None]:
# Cell 5: RFID Badge Event Generator
# Generates realistic badge scan events

class BadgeEventGenerator:
    """Generate realistic RFID badge events"""
    
    def __init__(self, num_users=100, num_zones=20, num_readers=10):
        self.badge_ids = [f"BADGE-{str(i).zfill(5)}" for i in range(1, num_users + 1)]
        self.user_ids = [f"USR-{str(i).zfill(3)}" for i in range(1, num_users + 1)]
        self.zone_ids = [f"ZONE-{zone_type}-{i}" 
                        for zone_type in ["LOBBY", "OFFICE", "CONF", "SECURE", "PARKING"]
                        for i in range(1, (num_zones // 5) + 1)]
        self.reader_ids = [f"RDR-{str(i).zfill(3)}" for i in range(1, num_readers + 1)]
        self.directions = ["ENTRY", "EXIT"]
    
    def generate_event(self, timestamp=None):
        """Generate a single badge event"""
        if timestamp is None:
            timestamp = datetime.utcnow()
        
        user_idx = random.randint(0, len(self.user_ids) - 1)
        
        # Pick the direction once so event_type and direction never contradict
        direction = random.choice(self.directions)
        
        event = {
            "badge_id": self.badge_ids[user_idx],
            "user_id": self.user_ids[user_idx],
            "zone_id": random.choice(self.zone_ids),
            "event_timestamp": timestamp.isoformat() + "Z",
            "event_type": direction,
            "reader_id": random.choice(self.reader_ids),
            "signal_strength": random.randint(-85, -20),  # dBm
            "direction": direction
        }
        
        return event
    
    def generate_batch(self, count=100, start_time=None):
        """Generate a batch of events"""
        if start_time is None:
            start_time = datetime.utcnow()
        
        events = []
        for i in range(count):
            # Spread events over time (0.01 seconds apart)
            timestamp = start_time + timedelta(seconds=i*0.01)
            events.append(self.generate_event(timestamp))
        
        return events

# Initialize generator
generator = BadgeEventGenerator(num_users=100, num_zones=20, num_readers=10)

# Test generation
sample_event = generator.generate_event()
print("✅ Event generator initialized")
print(f"   Sample event: {json.dumps(sample_event, indent=2)}")


## 🚀 Run the Simulation

This is where the magic happens! We'll:
1. Open a streaming channel
2. Generate badge events
3. Send them via REST API POST
4. Validate they arrived in Snowflake

**This demonstrates the core value:** Direct HTTP ingestion with no middleware!
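
The batching behind steps 2 and 3 comes down to a ceiling division. As a minimal sketch (the helper name `plan_batches` is hypothetical and not used by the cells in this notebook), the batch sizes for a given event count can be computed up front:

```python
from typing import List

def plan_batches(num_events: int, batch_size: int) -> List[int]:
    """Return the size of each batch needed to send num_events rows."""
    if num_events < 0 or batch_size <= 0:
        raise ValueError("num_events must be >= 0 and batch_size must be > 0")
    # Ceiling division: a partial final batch still counts as one request.
    num_batches = (num_events + batch_size - 1) // batch_size
    return [min(batch_size, num_events - i * batch_size) for i in range(num_batches)]
```

With the defaults used in this notebook (`num_events=1000`, `batch_size=100`) this yields ten full batches. An uneven count such as 250 yields two full batches and a final batch of 50.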


In [None]:
# Cell 6: Execute Simulation - Send Data via REST API
# This is the main demo of Snowpipe Streaming REST API!

def run_simulation(num_events=1000, batch_size=100):
    """Run RFID simulation - sends data to Snowflake REST API"""
    
    channel_name = f"rfid_channel_{int(time.time())}"
    
    print("="*70)
    print("üöÄ Starting RFID Badge Event Simulation")
    print("="*70)
    print()
    
    # Step 1: Get control host
    print("üì° Step 1: Getting control plane hostname...")
    client.get_control_host()
    print()
    
    # Step 2: Open channel
    print(f"üîì Step 2: Opening streaming channel '{channel_name}'...")
    client.open_channel(channel_name)
    print()
    
    # Step 3: Send events in batches
    print(f"üì§ Step 3: Sending {num_events} events via REST API...")
    total_sent = 0
    start_time = time.time()
    
    num_batches = (num_events + batch_size - 1) // batch_size
    
    for batch_num in range(num_batches):
        # Generate batch
        batch_count = min(batch_size, num_events - total_sent)
        events = generator.generate_batch(batch_count)
        
        # Send via REST API - THIS IS THE KEY DEMO!
        result = client.insert_rows(channel_name, events)
        
        total_sent += batch_count
        elapsed = time.time() - start_time
        rate = total_sent / elapsed if elapsed > 0 else 0
        
        print(f"   Batch {batch_num + 1}/{num_batches}: {batch_count} events sent | "
              f"Total: {total_sent} | Rate: {rate:.0f} events/sec")
        
        # Brief pause between batches
        time.sleep(0.1)
    
    elapsed = time.time() - start_time
    print()
    print("="*70)
    print("✅ Simulation Complete!")
    print(f"   Events sent: {total_sent}")
    print(f"   Duration: {elapsed:.2f} seconds")
    print(f"   Average rate: {total_sent/elapsed:.0f} events/sec")
    print("="*70)
    print()
    
    return total_sent

# Run simulation with 1000 events
events_sent = run_simulation(num_events=1000, batch_size=100)


In [None]:
# Cell 7: Validate Data Arrived in Snowflake
# Query the table to confirm REST API ingestion worked

def validate_pipeline():
    """Check that events made it through the pipeline"""
    session = get_session()
    
    print("üîç Validating data pipeline...")
    print()
    
    # Wait a moment for ingestion to complete
    print("   Waiting 5 seconds for ingestion to complete...")
    time.sleep(5)
    
    # Check raw table
    raw_count = session.sql(
        "SELECT COUNT(*) FROM SNOWFLAKE_EXAMPLE.STAGE_BADGE_TRACKING.RAW_BADGE_EVENTS"
    ).collect()[0][0]
    
    # Check staging table
    staging_count = session.sql(
        "SELECT COUNT(*) FROM SNOWFLAKE_EXAMPLE.TRANSFORM_BADGE_TRACKING.STG_BADGE_EVENTS"
    ).collect()[0][0]
    
    # Check analytics table
    analytics_count = session.sql(
        "SELECT COUNT(*) FROM SNOWFLAKE_EXAMPLE.ANALYTICS_BADGE_TRACKING.FCT_ACCESS_EVENTS"
    ).collect()[0][0]
    
    # Check stream status
    stream_has_data = session.sql(
        "SELECT SYSTEM$STREAM_HAS_DATA('SNOWFLAKE_EXAMPLE.STAGE_BADGE_TRACKING.raw_badge_events_stream')"
    ).collect()[0][0]
    
    print("üìä Pipeline Status:")
    print("   " + "="*66)
    print(f"   {'Layer':<20} | {'Row Count':>10} | {'Status':>30}")
    print("   " + "-"*66)
    print(f"   {'RAW':<20} | {raw_count:>10,} | {'✅ Data received' if raw_count > 0 else '❌ No data'}")
    print(f"   {'STAGING':<20} | {staging_count:>10,} | {'✅ Processed' if staging_count > 0 else '⏳ Processing'}")
    print(f"   {'ANALYTICS':<20} | {analytics_count:>10,} | {'✅ Transformed' if analytics_count > 0 else '⏳ Processing'}")
    print("   " + "="*66)
    print(f"   Stream Status: {'⏳ Processing' if stream_has_data else '✅ Empty (all processed)'}")
    print()
    
    if raw_count > 0:
        print("   ✅ SUCCESS! REST API ingestion is working!")
        print("   Data flowed: REST API → Snowpipe → RAW table")
        
        if staging_count == raw_count and analytics_count == raw_count:
            print("   ✅ BONUS! Complete pipeline validated!")
            print("   Data flowed: RAW → Streams → Tasks → STAGING → ANALYTICS")
        elif staging_count > 0 or analytics_count > 0:
            print("   ⏳ Pipeline still processing... (wait 1-2 minutes for tasks)")
    else:
        print("   ⚠️  No data in RAW table yet. Wait a few seconds and re-run.")
    
    print()
    
    # Show sample events
    if raw_count > 0:
        print("üìã Sample Events (first 5):")
        sample_df = session.sql(
            "SELECT badge_id, zone_id, event_timestamp, event_type "
            "FROM SNOWFLAKE_EXAMPLE.STAGE_BADGE_TRACKING.RAW_BADGE_EVENTS "
            "ORDER BY ingestion_time DESC LIMIT 5"
        )
        sample_df.show()

# Run validation
validate_pipeline()


## 🎯 What We Just Demonstrated

This notebook showcased **Snowflake's Snowpipe Streaming REST API**:

### Key Capabilities:
1. **Native HTTP Ingestion** - No external infrastructure required
2. **JWT Authentication** - Secure key-pair auth with RS256
3. **Channel-Based Streaming** - Isolated streams with continuation tokens
4. **High Performance** - 1000+ events/sec with sub-second batching
5. **Low Latency** - <10 seconds from POST to queryable data
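
To make the JWT item above concrete: a token is three base64url segments joined by dots, so the header and claims can be read back without the private key. The sketch below uses only the standard library and does not verify the signature. The names `b64url_decode` and `peek_jwt` are hypothetical helpers, not part of this notebook's `SnowflakeAuth` class.

```python
import base64
import json
from typing import Tuple

def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))

def peek_jwt(token: str) -> Tuple[dict, dict]:
    """Return (header, payload) of a JWT without verifying its signature."""
    header_segment, payload_segment, _signature = token.split(".")
    return (
        json.loads(b64url_decode(header_segment)),
        json.loads(b64url_decode(payload_segment)),
    )
```

Calling `peek_jwt(auth.generate_jwt())` in this notebook should show `alg` set to `RS256` and an `exp` claim roughly 59 minutes after `iat`, which is a quick way to catch clock or time zone mistakes.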

### The API Workflow:
```
1. GET /v2/streaming/hostname
   → Returns control plane host

2. POST /v2/streaming/.../pipes/{PIPE}:open-channel
   → Opens channel, returns ingest host + scoped token

3. POST /v2/streaming/.../channels/{CHANNEL}:insert-rows
   → Sends data via HTTP POST (THIS IS THE STAR!)
   → Includes continuation token for ordering

4. Data flows automatically:
   REST API → PIPE → RAW table → Stream → Task → STAGING → ANALYTICS
```
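
The three request URLs in the workflow above can be assembled as in the sketch below. The paths mirror the ones used by `SnowpipeStreamingClient` in this notebook and have not been checked here against Snowflake's published API reference. The function name `build_urls` and the host names in the usage note are hypothetical.

```python
from typing import Dict

def build_urls(account_url: str, control_host: str, ingest_host: str,
               database: str, schema: str, pipe: str, channel: str) -> Dict[str, str]:
    """Assemble the three endpoint URLs in the order the workflow calls them."""
    # Shared prefix identifying the target pipe (path taken from this notebook).
    pipe_path = f"/v2/streaming/databases/{database}/schemas/{schema}/pipes/{pipe}"
    return {
        "hostname": f"{account_url}/v2/streaming/hostname",
        "open_channel": f"https://{control_host}{pipe_path}:open-channel",
        "insert_rows": f"https://{ingest_host}{pipe_path}/channels/{channel}:insert-rows",
    }
```

For example, `build_urls("https://myorg-account.snowflakecomputing.com", "control.example.com", "ingest.example.com", "SNOWFLAKE_EXAMPLE", "STAGE_BADGE_TRACKING", "BADGE_EVENTS_PIPE", "rfid_channel_1")` returns one URL per step. Only the first goes to the account host. The other two go to the hosts returned by the earlier calls.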

### Why This Matters:
- **Zero middleware** - RFID vendors POST directly to Snowflake
- **Snowflake-native** - No Kafka, no message queues, no external services
- **Production-ready** - GA since September 2024, supports 10 GB/sec per table
- **Cost-efficient** - Throughput-based pricing, no compute overhead

---

## 📚 Next Steps

1. **View the data:**
   ```sql
   SELECT * FROM SNOWFLAKE_EXAMPLE.ANALYTICS_BADGE_TRACKING.FCT_ACCESS_EVENTS
   ORDER BY event_timestamp DESC LIMIT 100;
   ```

2. **Test with curl:**
   See `README.md#tldr` for direct curl commands to hit the REST API

3. **Explore the pipeline:**
   - Streams: `SHOW STREAMS IN DATABASE SNOWFLAKE_EXAMPLE;`
   - Tasks: `SHOW TASKS IN DATABASE SNOWFLAKE_EXAMPLE;`
   - Monitoring: Query `sql/03_monitoring/monitoring_views.sql`

4. **Customize:**
   - Modify event schema in `sql/01_setup/02_raw_table.sql`
   - Add transformations in PIPE: `sql/01_setup/03_pipe_object.sql`
   - Extend analytics model with new dimensions/facts
