# EC2 Kafka + SingleStore Ingestion

### Database Configuration

In [None]:
%%sql
-- Create the database used by the rest of this notebook unless it already exists.
CREATE DATABASE IF NOT EXISTS ec2_kafka;

In [None]:
%%sql
-- Make ec2_kafka the current database so later statements can use unqualified names.
USE ec2_kafka;

### Table Creation

In [None]:
%%sql
-- Destination tables for the three Kafka feeds.
-- Column names match the field names used in the mappings stored in
-- pipeline_config_table, so renaming a column here requires updating the mapping.
-- IF NOT EXISTS lets this cell be re-run without a table-already-exists error.

-- Vehicle events. additional_info holds a JSON payload.
CREATE TABLE IF NOT EXISTS vehicle_log (
    event_id VARCHAR(50),
    timestamp DATETIME,
    event_type VARCHAR(50),
    description VARCHAR(50),
    related_vehicle_id VARCHAR(50),
    additional_info JSON
);

-- Browser events. metadata holds a JSON payload.
CREATE TABLE IF NOT EXISTS browser_log (
    event_id VARCHAR(50),
    timestamp DATETIME,
    event_type VARCHAR(50),
    page VARCHAR(50),
    browser VARCHAR(50),
    metadata JSON
);

-- User records.
CREATE TABLE IF NOT EXISTS user_info (
    user_id VARCHAR(50),
    sign_up DATETIME,
    user_type VARCHAR(50),
    email VARCHAR(50),
    phone_number VARCHAR(50)
);

### Pipeline Creation

In [None]:
%%sql
-- Configuration table read by call_create_pipelines_from_table.
-- Each row describes one pipeline to provision.
--   pipeline_name    name given to the pipeline
--   topic_name       Kafka topic appended to the server address
--   table_statement  text placed after INTO TABLE, such as a table name with a column mapping
-- IF NOT EXISTS lets this cell be re-run without a table-already-exists error.
CREATE TABLE IF NOT EXISTS pipeline_config_table (
    pipeline_name TEXT,
    topic_name TEXT,
    table_statement TEXT
);

In [None]:
%%sql
-- Driver procedure. Reads every row of pipeline_config_table and calls
-- create_pipelines once per row, always passing the same Kafka server address.
CREATE OR REPLACE PROCEDURE call_create_pipelines_from_table() AS
DECLARE
    -- Placeholder for the Kafka server address. Replace it before creating the procedure.
    broker_address TEXT = '<SERVER_IP>';
    -- Query variable over the configuration rows. It is evaluated by COLLECT below.
    config_rows QUERY(pipeline_name TEXT, topic_name TEXT, table_statement TEXT) =
        SELECT pipeline_name, topic_name, table_statement
        FROM pipeline_config_table;
BEGIN
    FOR cfg IN COLLECT(config_rows) LOOP
        CALL create_pipelines(
            broker_address,
            cfg.pipeline_name,
            cfg.topic_name,
            cfg.table_statement
        );
    END LOOP;
END

In [None]:
%%sql
-- Creates or replaces one Kafka pipeline by assembling the statement text and executing it.
--   EC2_PUBLIC_IP     server address placed before the slash in the KAFKA source string
--   S2_PIPELINE_NAME  name of the pipeline to create or replace
--   TOPIC_NAME        Kafka topic placed after the slash
--   TABLE_STATEMENT   text inserted after INTO TABLE, such as a table name with a column mapping
-- The arguments are concatenated into the statement without any escaping,
-- so they must come from a trusted source.
CREATE OR REPLACE PROCEDURE create_pipelines(
    EC2_PUBLIC_IP TEXT,
    S2_PIPELINE_NAME TEXT,
    TOPIC_NAME TEXT,
    TABLE_STATEMENT TEXT
)
AS
DECLARE
    pipeline_ddl LONGTEXT;
BEGIN
    -- Assemble the full CREATE PIPELINE statement first, then run it dynamically.
    pipeline_ddl = CONCAT(
        "CREATE OR REPLACE PIPELINE ", S2_PIPELINE_NAME,
        " AS LOAD DATA KAFKA '", EC2_PUBLIC_IP, "/", TOPIC_NAME,
        "' INTO TABLE ", TABLE_STATEMENT,
        " FORMAT JSON;"
    );
    EXECUTE IMMEDIATE pipeline_ddl;
END;

In [None]:
%%sql
-- Register one row per pipeline. Paste replacement INSERT statements here if a
-- different schema mapping is used. table_statement carries the target table
-- together with its column mapping and is placed after INTO TABLE by create_pipelines.
INSERT INTO pipeline_config_table (pipeline_name, topic_name, table_statement)
VALUES
    ('vehicle_log_pipeline', 'vehicle_topic', 'vehicle_log(event_id <- event_id, timestamp <- timestamp, event_type <- event_type, description <- description, related_vehicle_id <- related_vehicle_id, additional_info <- additional_info)'),
    ('browser_log_pipeline', 'browser_topic', 'browser_log (event_id <- event_id, timestamp <- timestamp, event_type <- event_type, page <- page, browser <- browser, metadata <- metadata)'),
    ('user_info_pipeline', 'user_topic', 'user_info(user_id <- user_id, sign_up <- sign_up, user_type <- user_type, email <- email, phone_number <- phone_number)');

In [None]:
%%sql
-- Preview up to five of the registered pipeline definitions.
SELECT
    pipeline_name,
    topic_name,
    table_statement
FROM pipeline_config_table
LIMIT 5;

In [None]:
%%sql
-- Provisions all the pipelines by running create_pipelines for every row
-- of pipeline_config_table.
CALL call_create_pipelines_from_table();

In [None]:
%%sql
-- List the pipelines to confirm that the provisioning call created them.
SHOW PIPELINES;

In [None]:
%%sql
-- Start all pipelines so that ingestion begins.
START ALL PIPELINES;

### Verify

In [None]:
%%sql
-- Row count of vehicle_log. A nonzero count indicates that rows have been loaded.
SELECT COUNT(*) FROM vehicle_log;

In [None]:
%%sql
-- Sample up to five rows of vehicle_log to inspect the loaded values.
SELECT
    event_id,
    timestamp,
    event_type,
    description,
    related_vehicle_id,
    additional_info
FROM vehicle_log
LIMIT 5;

In [None]:
%%sql
-- Inspect the errors recorded for pipelines, including the offending input line.
SELECT
    DATABASE_NAME,
    PIPELINE_NAME,
    BATCH_ID,
    PARTITION,
    BATCH_SOURCE_PARTITION_ID,
    ERROR_KIND,
    ERROR_CODE,
    ERROR_MESSAGE,
    LOAD_DATA_LINE_NUMBER,
    LOAD_DATA_LINE
FROM information_schema.PIPELINES_ERRORS;

In [None]:
%%sql
-- List pipeline batches whose partition state is Failed.
-- The literal is single-quoted because double quotes denote identifiers
-- when the ANSI_QUOTES SQL mode is enabled.
SELECT *
FROM information_schema.PIPELINES_BATCHES
WHERE BATCH_PARTITION_STATE = 'Failed';

### Cleanup

In [None]:
%%sql
-- Stop all pipelines before the database is removed.
STOP ALL PIPELINES;

In [None]:
%%sql
-- Destructive cleanup. Removes the ec2_kafka database and everything in it.
-- IF EXISTS lets this cell be re-run after the database is already gone.
DROP DATABASE IF EXISTS ec2_kafka;