This notebook demonstrates how to ingest data from Kafka into MemSQL in real time and immediately run an ML model on the ingested data.
	
#### Prerequisites
To run code in this notebook you'll need:
* A MemSQL instance.  You can get a **free** trial cloud cluster at https://portal.memsql.com/
* A Kafka instance you plan to ingest data from
* Pandas, NumPy, and XGBoost libraries installed (see the `README.md` file at the root of this repository)

In [1]:
import xgboost as xgb
import pandas as pd
import memsql
import numpy as np

### Reading Data

We use the same dataset as Amazon's SageMaker [tutorial](https://aws.amazon.com/getting-started/hands-on/build-train-deploy-machine-learning-model-sagemaker/).

In [2]:
all_data = pd.read_csv('data/bank_clean.csv', index_col=0)
print("number of rows:", len(all_data.index))
all_data.head()

number of rows: 41188


Unnamed: 0,age,campaign,pdays,previous,no_previous_contact,not_working,job_admin,job_blue_collar,job_entrepreneur,job_housemaid,...,day_of_week_fri,day_of_week_mon,day_of_week_thu,day_of_week_tue,day_of_week_wed,poutcome_failure,poutcome_nonexistent,poutcome_success,y_no,y_yes
0,56,1,999,0,1,0,0,0,0,1,...,0,1,0,0,0,0,1,0,1,0
1,57,1,999,0,1,0,0,0,0,0,...,0,1,0,0,0,0,1,0,1,0
2,37,1,999,0,1,0,0,0,0,0,...,0,1,0,0,0,0,1,0,1,0
3,40,1,999,0,1,0,1,0,0,0,...,0,1,0,0,0,0,1,0,1,0
4,56,1,999,0,1,0,0,0,0,0,...,0,1,0,0,0,0,1,0,1,0


Split the data into an 80% training set and a 20% test set.

In [3]:
train_data=all_data.sample(frac=0.8, random_state=200) #random state is a seed value
test_data=all_data.drop(train_data.index)

In [4]:
print("number of rows in train set:", len(train_data.index))
print("number of rows in  test set:", len( test_data.index))

number of rows in train set: 32950
number of rows in  test set: 8238
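
The `sample` then `drop` pattern above only yields a clean partition when the DataFrame index is unique. The following is a minimal sketch on a made-up toy frame (the `toy` data and variable names are illustrative, not part of the dataset) that checks the two subsets are disjoint and together cover every row:

```python
import pandas as pd

# Toy frame standing in for the bank dataset (values are made up).
toy = pd.DataFrame({"age": range(10), "y_yes": [0, 1] * 5})

# Same pattern as the notebook: sample 80% of the rows, then drop them to get the rest.
toy_train = toy.sample(frac=0.8, random_state=200)
toy_test = toy.drop(toy_train.index)

# With a unique index the two subsets share no rows and together cover the frame.
overlap = toy_train.index.intersection(toy_test.index)
covered = toy_train.index.union(toy_test.index)
print(len(toy_train), len(toy_test), len(overlap), len(covered))
```

The same two checks can be run against `train_data` and `test_data` if the split ever looks suspicious.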


Convert the pandas DataFrames to NumPy arrays for XGBoost:

In [5]:
X_train = train_data.drop(['y_no', 'y_yes'], axis=1).to_numpy()
y_train = train_data['y_yes'].to_numpy()
X_test = test_data.drop(['y_no', 'y_yes'], axis=1).to_numpy()
y_test = test_data['y_yes'].to_numpy()

### Training the Model

In [6]:
clf = xgb.XGBClassifier()
clf.fit(X_train, y_train, eval_set=[(X_test, y_test)], eval_metric='error', early_stopping_rounds=10, verbose=0)
booster = clf.get_booster()
booster.feature_names = list(train_data.drop(['y_yes', 'y_no'], axis=1).columns)
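
The `fit` call above tracks the `error` metric (the fraction of misclassified validation rows at a 0.5 threshold) and stops once it has not improved for 10 rounds. The sketch below only illustrates that stopping rule on a hand-written list of per-round errors. The `early_stop` helper and the sample values are hypothetical, and this is not XGBoost's actual implementation:

```python
def early_stop(errors, patience=10):
    """Return (best_round, stopped_round) for a list of per-round validation errors.

    Illustrative only: mimics the idea behind early_stopping_rounds,
    not XGBoost's internal code.
    """
    best_round, best_error = 0, float("inf")
    for round_no, err in enumerate(errors):
        if err < best_error:
            # Strict improvement: remember this round as the best so far.
            best_round, best_error = round_no, err
        elif round_no - best_round >= patience:
            # No improvement for `patience` rounds in a row: stop here.
            return best_round, round_no
    # Ran out of rounds before the patience window was exhausted.
    return best_round, len(errors) - 1


# Made-up validation errors; the minimum is at round 2.
val_errors = [0.12, 0.10, 0.09, 0.095, 0.093, 0.094]
print(early_stop(val_errors, patience=3))
```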

### Deploying the Model to MemSQL

First, connect to MemSQL. See `README.md` at the root of this repository for how to obtain the necessary values, then enter your credentials in the call below.

In [7]:
from memsql.common import database
memsql_host="<enter_your_host>"
memsql_port=3306
memsql_user="root"
memsql_password=""

memsql_conn = database.connect(
    host=memsql_host, port=memsql_port, 
    user=memsql_user, password=memsql_password)

memsql_conn.query('CREATE DATABASE IF NOT EXISTS testsm')
memsql_conn.query('USE testsm');
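
If you would rather not type credentials into the notebook, one option is to read them from environment variables. This is a minimal sketch. The `load_memsql_settings` helper and the `MEMSQL_*` variable names are assumptions made for illustration, not something the `memsql` package defines:

```python
import os


def load_memsql_settings(env=os.environ):
    """Collect connection settings from environment variables.

    The MEMSQL_* names are hypothetical; pick whatever fits your setup.
    """
    return {
        "host": env.get("MEMSQL_HOST", "127.0.0.1"),
        "port": int(env.get("MEMSQL_PORT", "3306")),
        "user": env.get("MEMSQL_USER", "root"),
        "password": env.get("MEMSQL_PASSWORD", ""),
    }


settings = load_memsql_settings()
# The keys match the keyword arguments used in the cell above,
# so they could be passed on as database.connect(**settings).
```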

Deploy the model:

In [8]:
import lib.memsql_udf as udf_tool
udf_tool.upload_xgb_to_memsql(booster, memsql_conn, 'apply_trees', allow_overwrite=True)

Let's also load the data into the `bank` table:

In [9]:
import lib.memsql_csv as csv_tool
memsql_conn.query("DROP TABLE IF EXISTS bank")
csv_tool.load_csv_to_table("data/bank_clean.csv", "bank", ["id"] + list(all_data.columns), memsql_conn)

To simulate real-time ingest, we first need to populate a Kafka topic using [SELECT ... INTO KAFKA ...](https://docs.memsql.com/v7.1/reference/sql-reference/data-manipulation-language-dml/select/#select--into-kafka-).

#### Enter your Kafka connection information in the call below

<b>kafka_topic_endpoint</b> is the list of Kafka brokers, followed by the topic to which MemSQL will publish messages. For each broker in the list, specify its host and port.

<b>kafka_config</b> holds the Kafka configuration properties, in JSON format, that MemSQL uses when publishing messages to a topic. These are the same properties that are stored in the `server.properties` file on each Kafka broker.

<b>kafka_credentials</b> holds the credentials, in JSON format, used to connect to Kafka.

<b>Example 1:</b>
```python
kafka_topic_endpoint = "host1.example.com:9092,host2.example.com:9092,host3.example.com:9092/test-topic"

kafka_config = '''
{
    "security.protocol": "ssl",
    "ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
    "ssl.key.location": "/var/private/ssl/client_memsql_client.key",
    "ssl.ca.location": "/var/private/ssl/ca-cert.pem"
}
'''
    
kafka_credentials = '''
{
    "ssl.key.password": "abcdefgh"
}
'''
```
<b>Example 2:</b>
```python
kafka_topic_endpoint = "host.confluent.cloud:9092/topic"

kafka_config = '''
{
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_SSL",
    "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"
}
'''

kafka_credentials = '''
{
    "sasl.username": "1234ASDSAD",
    "sasl.password": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXAAAAAAAAAAAAAAAAAAAAAAAAA"
}
'''
```
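
Hand-written JSON strings are easy to get subtly wrong (a trailing comma, a missing quote). As a hedged alternative, the same three values can be generated from Python data with the standard `json` module. The `kafka_endpoint` helper and the `example_*` names below are illustrative only:

```python
import json


def kafka_endpoint(brokers, topic):
    """Join 'host:port' broker strings with commas and append '/<topic>'."""
    return ",".join(brokers) + "/" + topic


example_endpoint = kafka_endpoint(
    ["host1.example.com:9092", "host2.example.com:9092"], "test-topic"
)

# json.dumps guarantees syntactically valid JSON for the config and credentials.
example_config = json.dumps({
    "security.protocol": "ssl",
    "ssl.ca.location": "/var/private/ssl/ca-cert.pem",
})
example_credentials = json.dumps({"ssl.key.password": "abcdefgh"})

print(example_endpoint)
print(example_config)
```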

In [10]:
kafka_topic_endpoint = "<enter_your_endpoint>"
kafka_config = "<enter_your_config>"
kafka_credentials = "<enter_your_credentials>"

memsql_conn.query(
    f'''
    SELECT * FROM bank INTO KAFKA '{kafka_topic_endpoint}'
    CONFIG '{kafka_config}'
    CREDENTIALS '{kafka_credentials}'
    '''
);
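
The cell above pastes the endpoint, config, and credentials directly between single quotes, so a value containing a single quote or a backslash would produce a malformed statement. Below is a minimal sketch of a safer way to build the same statement, assuming MySQL-style backslash escaping in string literals. Both helper names are hypothetical:

```python
def sql_string_literal(value):
    """Wrap a string in single quotes, escaping backslashes and single quotes.

    Assumes MySQL-style backslash escapes in string literals.
    """
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def select_into_kafka(table, endpoint, config, credentials):
    """Build the SELECT ... INTO KAFKA statement used in the cell above."""
    return (
        f"SELECT * FROM {table} INTO KAFKA {sql_string_literal(endpoint)} "
        f"CONFIG {sql_string_literal(config)} "
        f"CREDENTIALS {sql_string_literal(credentials)}"
    )


print(select_into_kafka("bank", "host:9092/topic", '{"a": "b"}', "{}"))
```

The table name is interpolated as-is, so it should come from trusted code rather than user input.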

Now for the actual ingest from the Kafka topic into MemSQL using [Kafka pipelines](https://docs.memsql.com/v6.8/concepts/pipelines/kafka-extractor/), with the UDF evaluated inside a stored procedure:

In [11]:
memsql_conn.query("DROP TABLE IF EXISTS res")
# res stores the true label and the model output; the INSERT column list must match the SELECT list.
memsql_conn.query("CREATE TABLE res(expected DOUBLE NOT NULL, predicted DOUBLE NOT NULL)")
memsql_conn.query(
    f'''
    CREATE OR REPLACE PROCEDURE process_kafka_data(pipe query({", ".join([f"{f} DOUBLE NOT NULL" for f in ["id"] + list(all_data.columns)])})) AS
    BEGIN
        INSERT INTO res(expected, predicted)
        SELECT y_yes, apply_trees({", ".join(booster.feature_names)})
        FROM pipe;
    END
    '''
)
memsql_conn.query("DROP PIPELINE IF EXISTS process_kafka_data")
memsql_conn.query(
    f'''
    CREATE PIPELINE process_kafka_data 
    AS LOAD DATA KAFKA '{kafka_topic_endpoint}'
    CONFIG '{kafka_config}'
    CREDENTIALS '{kafka_credentials}'
    INTO PROCEDURE `process_kafka_data`
    '''
)
memsql_conn.query("START PIPELINE process_kafka_data");
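
The procedure definition above builds its `pipe query(...)` parameter with a nested f-string, which is hard to read at a glance. The sketch below shows what that expression expands to, using a shortened, made-up column list. The `pipe_signature` helper is hypothetical and only mirrors the inline `join`:

```python
def pipe_signature(columns, sql_type="DOUBLE NOT NULL"):
    """Render 'name TYPE, name TYPE, ...' for a query-typed procedure parameter."""
    return ", ".join(f"{name} {sql_type}" for name in columns)


# Shortened, illustrative column list; the notebook uses ["id"] + list(all_data.columns).
example_columns = ["id", "age", "campaign", "y_yes"]
print(f"pipe query({pipe_signature(example_columns)})")
```

Printing the generated SQL like this before sending it to the server is a cheap way to catch a mismatch between the pipeline's columns and the procedure's parameter.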

Wait until all the data has been loaded.

In [12]:
import time
while memsql_conn.query("SELECT COUNT(*) AS size FROM res")[0]["size"] < len(all_data):
    time.sleep(0.5)
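
The loop above never exits if the pipeline stalls or drops rows. A hedged variant with a deadline might look like the sketch below. The `wait_for_rows` helper is hypothetical and takes any zero-argument callable that returns the current row count:

```python
import time


def wait_for_rows(count_rows, expected, timeout_s=300.0, poll_s=0.5):
    """Poll count_rows() until it reaches `expected`; return False on timeout."""
    deadline = time.monotonic() + timeout_s
    while count_rows() < expected:
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)
    return True


# Stand-in counter for demonstration; in the notebook the callable could be
# lambda: memsql_conn.query("SELECT COUNT(*) AS size FROM res")[0]["size"]
fake_counts = iter([0, 5, 10])
print(wait_for_rows(lambda: next(fake_counts), expected=10, poll_s=0.0))
```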

Let's select a few rows of the result:

In [13]:
rows = memsql_conn.query("SELECT * FROM res LIMIT 5")
pd.DataFrame([dict(r) for r in rows]).head()

Unnamed: 0,expected,predicted
0,0.0,0.027024
1,0.0,0.027967
2,0.0,0.03007
3,0.0,0.029483
4,0.0,0.021706
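
To turn the `expected` and `predicted` columns into a single quality number, one option is the misclassification rate at a 0.5 threshold. This sketch assumes `predicted` is a probability of the positive class, which the values above suggest but the notebook does not state. The `error_rate` helper is hypothetical:

```python
def error_rate(rows, threshold=0.5):
    """Fraction of rows whose thresholded prediction differs from the label."""
    wrong = sum(
        1 for r in rows
        if (r["predicted"] > threshold) != (r["expected"] > 0.5)
    )
    return wrong / len(rows)


# First three rows of the output above.
sample_rows = [
    {"expected": 0.0, "predicted": 0.027024},
    {"expected": 0.0, "predicted": 0.027967},
    {"expected": 0.0, "predicted": 0.03007},
]
print(error_rate(sample_rows))
```

Passing the full result of `memsql_conn.query("SELECT expected, predicted FROM res")` would give the rate over the whole ingested stream.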
