In [None]:
%%sql
-- List the tables that currently exist in the database.
SHOW TABLES;

In [None]:
%%sql
-- Destination table for the S3 pipeline.
-- IF NOT EXISTS keeps this cell re-runnable after the table has been created.
CREATE TABLE IF NOT EXISTS s3_table (
    ID BIGINT,
    Name VARCHAR(50),
    Price FLOAT
);

In [None]:
%%sql
-- Destination table for the Kafka pipeline.
-- IF NOT EXISTS keeps this cell re-runnable after the table has been created.
CREATE TABLE IF NOT EXISTS kafka_events (
    registertime BIGINT,
    userid VARCHAR(255),
    regionid VARCHAR(255),
    gender VARCHAR(10)
);

In [None]:
%%sql
-- Destination table for the CSV pipeline.
-- IF NOT EXISTS keeps this cell re-runnable after the table has been created.
CREATE TABLE IF NOT EXISTS sample_entries (
    ID BIGINT,
    Name VARCHAR(50),
    Price FLOAT
);

In [None]:
%%sql
-- Confirm that s3_table, kafka_events and sample_entries are now listed.
SHOW TABLES;

# Ingesting data via S3

In [None]:
%%sql
-- Define a pipeline that loads sample_csv_file.csv from S3 into s3_table.
-- NOTE(review): the aws_access_key_id, aws_secret_access_key and
-- aws_session_token placeholders are not defined in any cell of this
-- notebook. Presumably they must exist as notebook variables before this
-- cell runs. TODO confirm.
CREATE OR REPLACE PIPELINE s3_pipeline AS
LOAD DATA S3 's3://brendan-s2-test/sample_csv_file.csv'
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "{{aws_access_key_id}}", "aws_secret_access_key": "{{aws_secret_access_key}}", "aws_session_token":"{{aws_session_token}}"}'
INTO TABLE s3_table
FORMAT CSV
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
    -- Skip the first line of the file (presumably a header row).
    IGNORE 1 LINES;

# Testing and running the S3 pipeline

In [None]:
%%sql
-- Preview up to 5 rows from the S3 pipeline source.
TEST PIPELINE s3_pipeline
LIMIT 5;

In [None]:
%%sql
-- Inspect s3_table before the pipeline is started.
SELECT *
FROM s3_table;

In [None]:
%%sql
-- Start the S3 pipeline in the background.
-- IF NOT RUNNING lets this cell be re-run without an error while the
-- pipeline is already running.
START PIPELINE IF NOT RUNNING s3_pipeline;

In [None]:
%%sql
-- Check the rows in s3_table after the pipeline has been started.
SELECT *
FROM s3_table;

In [None]:
%%sql
-- Stop the S3 pipeline.
-- IF RUNNING avoids an error when the pipeline has already been stopped.
STOP PIPELINE IF RUNNING s3_pipeline;

In [None]:
%%sql
-- Remove the S3 pipeline definition.
-- IF EXISTS avoids an error when the pipeline has already been dropped.
DROP PIPELINE IF EXISTS s3_pipeline;

# Ingesting data via Kafka

In [None]:
import os

# Confluent Kafka connection settings referenced by the Kafka pipeline cell.
# Each value is taken from the environment variable of the same name when it
# is set, so real keys and secrets do not have to be saved in the notebook.
# Otherwise the placeholder text is kept and must be replaced before running.
CONFLUENT_KAFKA_TOPIC_NAME = os.environ.get('CONFLUENT_KAFKA_TOPIC_NAME', 'your topic name')
CONFLUENT_KAFKA_CLIENT_ID = os.environ.get('CONFLUENT_KAFKA_CLIENT_ID', 'your client id')
CONFLUENT_CLUSTER_BOOTSTRAP_SERVER = os.environ.get('CONFLUENT_CLUSTER_BOOTSTRAP_SERVER', 'your bootstrap server')
CONFLUENT_API_KEY = os.environ.get('CONFLUENT_API_KEY', 'your api key')
CONFLUENT_API_SECRET = os.environ.get('CONFLUENT_API_SECRET', 'your api secret')
CONFLUENT_SCHEMA_REGISTRY_URL = os.environ.get('CONFLUENT_SCHEMA_REGISTRY_URL', 'your schema registry url')
CONFLUENT_SCHEMA_REGISTRY_KEY = os.environ.get('CONFLUENT_SCHEMA_REGISTRY_KEY', 'your schema registry key')
CONFLUENT_SCHEMA_REGISTRY_SECRET = os.environ.get('CONFLUENT_SCHEMA_REGISTRY_SECRET', 'your schema registry secret')

In [None]:
%%sql
-- Define a pipeline that reads Avro records from a Kafka topic into kafka_events.
-- The CONFLUENT_* placeholders match the Python variables defined in the
-- configuration cell of this notebook.
-- CONFIG and CREDENTIALS are written as plain multi-line JSON. Double quotes
-- need no backslash escaping inside a single-quoted SQL string.
CREATE OR REPLACE PIPELINE kafka_pipeline
AS LOAD DATA KAFKA '{{CONFLUENT_CLUSTER_BOOTSTRAP_SERVER}}/{{CONFLUENT_KAFKA_TOPIC_NAME}}'
CONFIG '{
    "client.id": "{{CONFLUENT_KAFKA_CLIENT_ID}}",
    "sasl.username": "{{CONFLUENT_API_KEY}}",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_SSL",
    "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
    "schema.registry.username": "{{CONFLUENT_SCHEMA_REGISTRY_KEY}}"
}'
CREDENTIALS '{
    "sasl.password": "{{CONFLUENT_API_SECRET}}",
    "schema.registry.password": "{{CONFLUENT_SCHEMA_REGISTRY_SECRET}}"
}'
DISABLE OFFSETS METADATA GC
INTO TABLE kafka_events
FORMAT AVRO
SCHEMA REGISTRY '{{CONFLUENT_SCHEMA_REGISTRY_URL}}'
(
    -- Map each table column (left) from the Avro field of the same name (right).
    registertime  <-  registertime,
    userid  <-  userid,
    regionid  <-  regionid,
    gender  <-  gender
);

# Testing and running the Kafka pipeline



In [None]:
%%sql
-- Preview up to 5 rows from the Kafka pipeline source.
TEST PIPELINE kafka_pipeline
LIMIT 5;

In [None]:
%%sql
-- Inspect kafka_events before the pipeline is started.
-- The table is never dropped in this notebook, so it may still hold rows from
-- an earlier run. LIMIT 50 keeps the output bounded, matching the query that
-- follows the pipeline start.
SELECT *
FROM kafka_events
LIMIT 50;

In [None]:
%%sql
-- Start the Kafka pipeline in the background.
-- IF NOT RUNNING lets this cell be re-run without an error while the
-- pipeline is already running.
START PIPELINE IF NOT RUNNING kafka_pipeline;

In [None]:
%%sql
-- Show up to 50 rows from kafka_events after the pipeline has been started.
SELECT *
FROM kafka_events
LIMIT 50;

In [None]:
%%sql
-- Stop the Kafka pipeline.
-- IF RUNNING avoids an error when the pipeline has already been stopped.
STOP PIPELINE IF RUNNING kafka_pipeline;

In [None]:
%%sql
-- Remove the Kafka pipeline definition.
-- IF EXISTS avoids an error when the pipeline has already been dropped.
DROP PIPELINE IF EXISTS kafka_pipeline;

# Ingesting data via CSV (from Stage)

In [None]:
%%sql
-- Define a pipeline that loads sample_csv_file.csv from Stage into sample_entries.
CREATE OR REPLACE PIPELINE csv_pipeline
AS LOAD DATA STAGE 'sample_csv_file.csv'
INTO TABLE sample_entries
FORMAT CSV
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
    -- Skip the first line of the file (presumably a header row).
    IGNORE 1 LINES;

# Testing and running the CSV pipeline

In [None]:
%%sql
-- Preview up to 5 rows from the CSV pipeline source.
TEST PIPELINE csv_pipeline
LIMIT 5;

In [None]:
%%sql
-- Inspect sample_entries before the pipeline is started.
SELECT *
FROM sample_entries;

In [None]:
%%sql
-- Run the CSV pipeline in the foreground so this cell returns only after the
-- file has been loaded. A background start can race with the SELECT in the
-- next cell when the notebook is executed with Run All.
START PIPELINE csv_pipeline FOREGROUND;

In [None]:
%%sql
-- Check the rows in sample_entries after the pipeline has been started.
SELECT *
FROM sample_entries;

In [None]:
%%sql
-- Remove the CSV pipeline definition.
-- IF EXISTS avoids an error when the pipeline has already been dropped.
DROP PIPELINE IF EXISTS csv_pipeline;