# Consumer

In [27]:
!pip3 -q install confluent_kafka



In [None]:
%%sql
import os
import json
from confluent_kafka import Consumer, KafkaError
import singlestoredb as s2
from datetime import datetime
import logging
import socket
import time
from typing import Dict, List, Optional, Any

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

In [28]:
TARGET_HOST = s2.manage_workspaces().organizations.current.get_secret('host').value
TARGET_USER = s2.manage_workspaces().organizations.current.get_secret('user').value
TARGET_PASSWORD = s2.manage_workspaces().organizations.current.get_secret('ad_pwd').value
TARGET_DATABASE = "f1"

# Create connection
conn = s2.connect(
        host=TARGET_HOST,
        port='3306',
        user=TARGET_USER,
        password=TARGET_PASSWORD,
        database=TARGET_DATABASE
    )

In [29]:
class F1SingleStorePipeline:
    def __init__(self, conn, max_retries=3, retry_delay=5):
        """Initialize pipeline with an existing connection"""
        self.conn = conn
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self._init_data_types()
        self.driver_timestamps = {}
        self._load_config()
        
    def _ensure_connection(self):
        """Verify connection is active"""
        try:
            with self.conn.cursor() as cur:
                return cur.is_connected()
        except Exception:
            return False

    def _init_data_types(self):
        """Initialize data type configurations"""
        self.data_types = {
            'car': {
                'topic': 'topic_1',
                'table': 'f1_car_data',
                'pipeline_name': 'f1_car_pipeline',
                'schema': """
                    CREATE TABLE IF NOT EXISTS f1_car_data (
                        id VARCHAR(255) PRIMARY KEY,
                        brake INT,
                        drs INT,
                        n_gear INT,
                        rpm INT,
                        speed INT,
                        throttle INT,
                        driver_number INT,
                        date DATETIME,
                        session_key INT,
                        meeting_key INT,
                        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                """,
                'pipeline_fields': """
                    `id` <- payload::id,
                    `brake` <- payload::brake,
                    `drs` <- payload::drs,
                    `n_gear` <- payload::n_gear,
                    `rpm` <- payload::rpm,
                    `speed` <- payload::speed,
                    `throttle` <- payload::throttle,
                    `driver_number` <- payload::driver_number,
                    `date` <- payload::date,
                    `session_key` <- payload::session_key,
                    `meeting_key` <- payload::meeting_key
                """
            }
            # Other data types can be added back as needed
        }

    # Set your own values ( these values no longer work ;) )
    def _load_config(self):
        """Load Kafka configuration"""
        self.kafka_config = {
            'bootstrap.servers': 'pkc-p11xm.us-east-1.aws.confluent.cloud:9092',
            'security.protocol': 'SASL_SSL',
            'sasl.mechanisms': 'PLAIN',
            'sasl.username': '7G3VW2EN5K6RSYO3',
            'sasl.password': 'lKDw9j5VJhg+QYsufy8x82HOUIy2K7Bl92/N3fMg1FNbUc/rb2oRjBRXQvGo/LAm',
            'group.id': 'f1_consumer_group',
            'auto.offset.reset': 'earliest',
            'session.timeout.ms': 45000,
            'client.id': 'ccloud-python-client-609ae157-eaef-4276-ae9a-a3a81742c6a2'
        }

    def create_tables(self, data_types: List[str]):
        """Create tables for specified data types"""
        if not self._ensure_connection():
            raise ConnectionError("Database connection is not active")
            
        cursor = self.conn.cursor()
        for data_type in data_types:
            try:
                cursor.execute(self.data_types[data_type]['schema'])
                logger.info(f"Table {self.data_types[data_type]['table']} created/verified successfully")
            except Exception as e:
                logger.error(f"Failed to create table for {data_type}: {e}")
                raise

    def setup_pipeline(self, data_type: str):
        """Set up SingleStore pipeline for a data type"""
        if not self._ensure_connection():
            raise ConnectionError("Database connection is not active")
            
        try:
            config = self.data_types[data_type]
            cursor = self.conn.cursor()

            # Drop existing pipeline
            drop_pipeline_query = f"DROP PIPELINE IF EXISTS {config['pipeline_name']}"
            cursor.execute(drop_pipeline_query)
            logger.info(f"Dropped existing pipeline {config['pipeline_name']}")

            # Create pipeline
            create_pipeline_query = f"""
            CREATE PIPELINE {config['pipeline_name']}
            AS LOAD DATA KAFKA '{self.kafka_config['bootstrap.servers']}/{config['topic']}'
            CONFIG '{{
                "client.id": "{self.kafka_config['client.id']}",
                "sasl.username": "{self.kafka_config['sasl.username']}",
                "sasl.mechanism": "PLAIN",
                "security.protocol": "SASL_SSL",
                "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"
            }}'
            CREDENTIALS '{{
                "sasl.password": "{self.kafka_config['sasl.password']}"
            }}'
            SKIP DUPLICATE KEY ERRORS
            INTO TABLE {config['table']}
            FORMAT JSON
            (
                {config['pipeline_fields']}
            )
            """
            print(create_pipeline_query)
            cursor.execute(create_pipeline_query)
            logger.info(f"Created pipeline {config['pipeline_name']}")

        except Exception as e:
            logger.error(f"Failed to setup pipeline for {data_type}: {e}")
            raise

    def start_pipeline(self, data_type: str):
        """Start a SingleStore pipeline"""
        if not self._ensure_connection():
            raise ConnectionError("Database connection is not active")
            
        try:
            cursor = self.conn.cursor()
            pipeline_name = self.data_types[data_type]['pipeline_name']
            cursor.execute(f"START PIPELINE {pipeline_name}")
            logger.info(f"Started pipeline {pipeline_name}")
        except Exception as e:
            logger.error(f"Failed to start pipeline {pipeline_name}: {e}")
            raise


    def trigger_pipeline(self, data_type: str):
        """Create, start, and monitor a single pipeline"""
        if not self._ensure_connection():
            raise ConnectionError("Database connection is not active")
            
        try:
            logger.info(f"Setting up pipeline for {data_type}...")
            
            # Create table
            self.create_tables([data_type])
            
            # Setup and start pipeline
            self.setup_pipeline(data_type)
            self.start_pipeline(data_type)
            
            logger.info(f"Pipeline for {data_type} started successfully")
            
            # Return initial status
            return 1
            
        except Exception as e:
            logger.error(f"Failed to trigger pipeline for {data_type}: {e}")
            raise
    
    def monitor_pipeline(self, data_type: str):
            """Monitor pipeline status"""
            if not self._ensure_connection():
                raise ConnectionError("Database connection is not active")
                
            try:
                cursor = self.conn.cursor()
                pipeline_name = self.data_types[data_type]['pipeline_name']
                cursor.execute(f"""
                    SELECT 
                        pipeline_state as status,
                        error_message as error,
                        inserted_rows as records_inserted,
                        processed_rows as records_processed
                    FROM information_schema.pipelines_summary 
                    WHERE pipeline_name = '{pipeline_name}'
                """)
                status = cursor.fetchone()
                if status:
                    logger.info(f"Pipeline {pipeline_name}: Status={status[0]}, "
                              f"Records Inserted={status[2]}, Records Processed={status[3]}")
                    if status[1]:  # Error
                        logger.error(f"Pipeline error: {status[1]}")
                return status
            except Exception as e:
                logger.error(f"Failed to monitor pipeline {pipeline_name}: {e}")
                raise

In [None]:
%%sql
-- This ensure that pipeline is updated under 
ALTER PIPELINE f1_car_pipeline SET BATCH_INTERVAL = '1 second';

In [30]:
%%sql
# Clean up historical data
Truncate table f1_car_data;

In [16]:
# Create pipeline instance
pipeline = F1SingleStorePipeline(conn)

# Trigger the car pipeline
data_type = 'car'
pipeline.trigger_pipeline(data_type)
print("Pipeline triggered")

2025-02-24 15:38:19,685 - __main__ - INFO - Setting up pipeline for car...
2025-02-24 15:38:19,750 - __main__ - INFO - Table f1_car_data created/verified successfully
2025-02-24 15:38:19,821 - __main__ - INFO - Dropped existing pipeline f1_car_pipeline



            CREATE PIPELINE f1_car_pipeline
            AS LOAD DATA KAFKA 'pkc-p11xm.us-east-1.aws.confluent.cloud:9092/topic_1'
            CONFIG '{
                "client.id": "ccloud-python-client-609ae157-eaef-4276-ae9a-a3a81742c6a2",
                "sasl.username": "7G3VW2EN5K6RSYO3",
                "sasl.mechanism": "PLAIN",
                "security.protocol": "SASL_SSL",
                "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"
            }'
            CREDENTIALS '{
                "sasl.password": "lKDw9j5VJhg+QYsufy8x82HOUIy2K7Bl92/N3fMg1FNbUc/rb2oRjBRXQvGo/LAm"
            }'
            SKIP DUPLICATE KEY ERRORS
            INTO TABLE f1_car_data
            FORMAT JSON
            (
                
                    `id` <- payload::id,
                    `brake` <- payload::brake,
                    `drs` <- payload::drs,
                    `n_gear` <- payload::n_gear,
                    `rpm` <- payload::rpm,
                 

2025-02-24 15:38:21,202 - __main__ - INFO - Created pipeline f1_car_pipeline
2025-02-24 15:38:21,271 - __main__ - INFO - Started pipeline f1_car_pipeline
2025-02-24 15:38:21,272 - __main__ - INFO - Pipeline for car started successfully


Pipeline triggered


In [40]:
%%sql
# Check the created table 
Select count(*) from f1_car_data;

Unnamed: 0,count(*)
0,71


In [23]:
%%sql
Select * from f1_car_data limit 10;

Unnamed: 0,id,brake,drs,n_gear,rpm,speed,throttle,driver_number,date,session_key,meeting_key,created_at
0,ee150dc0-7fc1-48ba-8774-3441bcce79bc,0,8,3,7222,87,57,63,2023-09-16 13:06:40,9161,1219,2025-02-24 15:38:24
1,f2cc99d8-67b7-4139-b254-8c629916964b,0,8,3,6585,91,9,63,2023-09-16 13:07:40,9161,1219,2025-02-24 15:38:24
2,e932e3ad-52a8-4cb4-b455-bfd1103bf423,0,8,2,5487,62,13,63,2023-09-16 13:08:09,9161,1219,2025-02-24 15:38:24
3,fe029d67-d9e1-4e20-94e6-eb0aa40daa98,100,8,6,9237,199,0,63,2023-09-16 13:08:53,9161,1219,2025-02-24 15:38:24
4,c0b5c700-ce5c-40c1-af64-ef455a474e02,0,8,3,9638,126,89,63,2023-09-16 13:05:51,9161,1219,2025-02-24 15:38:24
5,b3e195be-5c35-4052-909f-68d5c2028348,0,12,8,10907,295,100,63,2023-09-16 13:06:04,9161,1219,2025-02-24 15:38:24
6,11153300-89cd-4021-9455-59b686626e7d,0,8,7,11193,274,4,63,2023-09-16 13:07:14,9161,1219,2025-02-24 15:38:24
7,cb7d3243-8b82-44fd-9ccd-3e931ab53c28,0,8,3,6530,92,0,63,2023-09-16 13:07:22,9161,1219,2025-02-24 15:38:24
8,6a46e9fa-94b9-437b-8997-28d042adc706,0,8,3,6755,92,13,63,2023-09-16 13:07:41,9161,1219,2025-02-24 15:38:24
9,2740aa14-bb7a-46e1-bf56-a6aa4d6f139a,100,8,4,6360,103,0,63,2023-09-16 13:07:57,9161,1219,2025-02-24 15:38:24


## Stop the simluation and cleanup resources

In [7]:
%%sql
Stop pipeline f1_car_pipeline;