# FlinkSQL vs MySQL 

Implement the following FlinkSQL query using MySQL and Python. The goal is to emulate FlinkSQL behavior as closely as possible. Test different ingestion rates and batching strategies, and focus on implementing the reporting.

-  you can tune the ingestion rate
-  you can implement the window semantics in the WHERE clause or by micro-batching
-  you can use triggers or any other SQL construct you know
-  you can assume that processing time and event time coincide
-  you can create as many supporting structures as you need
-  DON'T FORGET TO REMOVE OLD DATA (USING ANOTHER THREAD?)
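
As a rough starting point for the last bullet, the cell below sketches a background purger. It assumes the `clicks` table with an integer epoch-second `timestamp` column created in the MySQL ingestion section. The names `purge_cutoff`, `purge_once`, `start_purger` and the `keep_windows` parameter are hypothetical, and `connect` stands for any function that returns a fresh connection (for example a wrapper around `mysql.connector.connect`). This is one possible approach, not the required solution.

```python
import threading
import time

# Assumes the `clicks` table stores `timestamp` as integer epoch seconds.
DELETE_SQL = "DELETE FROM clicks WHERE `timestamp` < %s"

def purge_cutoff(now, window_seconds=60, keep_windows=2):
    """Rows older than this epoch second belong to windows we no longer need."""
    current_window_start = (int(now) // window_seconds) * window_seconds
    return current_window_start - keep_windows * window_seconds

def purge_once(conn, now=None, window_seconds=60, keep_windows=2):
    """Delete old rows once and return the cutoff that was used."""
    if now is None:
        now = time.time()
    cutoff = purge_cutoff(now, window_seconds, keep_windows)
    cur = conn.cursor()
    cur.execute(DELETE_SQL, (cutoff,))
    conn.commit()
    cur.close()
    return cutoff

def start_purger(connect, stop_event, interval_seconds=60):
    """Run purge_once every interval_seconds until stop_event is set."""
    def loop():
        conn = connect()  # dedicated connection: do not share one across threads
        try:
            # wait() returns False on timeout and True once the event is set
            while not stop_event.wait(interval_seconds):
                purge_once(conn)
        finally:
            conn.close()
    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```

To use it, create a `threading.Event()`, pass it to `start_purger` together with a connection factory, and call `.set()` on the event to stop the thread. Keeping a couple of closed windows (`keep_windows=2`) leaves some slack for a report that runs slightly late.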



## FLINK SQL Query

You can run the FlinkSQL query below from the SQL client inside the sql-client container:

```docker exec -it stream_sql-client_1 ./sql-client.sh```

```sql 
SELECT 
TUMBLE_START(ts, INTERVAL '1' MINUTE) AS openW, 
TUMBLE_END(ts, INTERVAL '1' MINUTE) AS closeW, 
country, 
COUNT(*) AS cnt 
FROM Clicks 
GROUP BY country, TUMBLE(ts, INTERVAL '1' MINUTE);
```

<img src="flinkout.png" alt="5" border="0">
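
The tumbling-window functions in the query above assign every row to a fixed, non-overlapping one-minute bucket. The cell below is a minimal sketch of that arithmetic in Python, assuming timestamps are integer epoch seconds (as in the MySQL table used later) and that windows are aligned to the epoch with no offset. The helper names `tumble_bounds` and `fmt` are hypothetical.

```python
from datetime import datetime, timezone

def tumble_bounds(epoch_seconds, size_seconds=60):
    """Return (window_start, window_end) for the tumbling window containing the timestamp."""
    start = (int(epoch_seconds) // size_seconds) * size_seconds
    return start, start + size_seconds

def fmt(epoch_seconds):
    """Render an epoch second as a UTC timestamp string."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')

start, end = tumble_bounds(125)
print(fmt(start), fmt(end))  # 1970-01-01 00:02:00 1970-01-01 00:03:00
```

The window start is inclusive and the window end is exclusive, so a row with timestamp 180 falls into the next window.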

## Data Ingestion

For ingestion, Flink reads directly from the Apache Kafka topic ```clicks```, while MySQL is fed through SQL INSERT statements.

Before getting started, ensure that the following Python libraries are installed.

In [None]:
! pip install mysql-connector-python 

In [None]:
! pip install confluent_kafka

## Kafka Ingestion for Flink

This producer feeds the Kafka topic that Apache Flink reads from.

In [54]:
from confluent_kafka import Producer
import sys, json
topic = "clicks"
brokers = "kafka:9092" # Brokers act as cluster entrypoints
conf = {'bootstrap.servers': brokers}

In [55]:
p = Producer(**conf)

In [56]:
def delivery_callback(err, msg):
    # Called once per message from poll()/flush(); err is None on success.
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)

In [57]:
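from datetime import datetime, timezone

def to_kafka(ms):
    # Each record is a (user, country, epoch_seconds) tuple.
    for m in ms:
        k = {
            'user': m[0],
            'country': m[1],
            # ISO-8601 UTC timestamp (utcfromtimestamp is deprecated since Python 3.12)
            'eventTime': datetime.fromtimestamp(m[2], tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
        }
        p.produce(topic, json.dumps(k), callback=delivery_callback)
    p.poll(0)   # serve pending delivery callbacks
    p.flush()   # block until all queued messages are delivered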

## MySQL ingestion

The code below sets up the ingestion in MySQL.

A database named `stream` is created and used for querying.

In [58]:
import mysql.connector

mydb = mysql.connector.connect(
  host="mysql",
  user="root",
  password="pass1234"
)


mycursor = mydb.cursor()

In [59]:
# IF NOT EXISTS keeps the cell re-runnable when the database is already there
mycursor.execute("CREATE DATABASE IF NOT EXISTS stream")

In [60]:
mycursor.execute("SHOW DATABASES")
for x in mycursor:
  print(x) 

('db',)
('information_schema',)
('mysql',)
('performance_schema',)
('stream',)
('sys',)


In [61]:
mycursor.execute("USE stream")

In [62]:
# timestamp holds epoch seconds; IF NOT EXISTS keeps the cell re-runnable
mycursor.execute("CREATE TABLE IF NOT EXISTS clicks (user VARCHAR(255), country VARCHAR(255), timestamp INT)")

In [63]:
sql = "INSERT INTO clicks (user, country, timestamp) VALUES (%s, %s, %s)"
def to_my_sql(val):
    mycursor.executemany(sql, val)
    mydb.commit()

## Data Generator
The data generator sends the same records to MySQL and to the Kafka topic, so that Flink can be tested on identical input.

In [64]:
userNames = ["Andy", "Bob", "Carl", "Dave", "Esther", "Fanny", "Gabe", "Imogen", "John", "Louis", "Monica"]

In [65]:
regionNames = ["America", "Europe", "Asia", "Australia", "Africa"]

In [66]:
import random
import sys
import time

## These are the parameters to tune in order to test the ingestion rate

In [67]:
batch_window= 60 # seconds
batches_per_window= 1000
batch_size = 500
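
To get a feel for what these values imply, the cell below is a small sketch that computes the rows sent per cycle and an upper bound on the average ingestion rate. It assumes the generator sends all batches back to back and then sleeps for `batch_window` seconds. The helper names and the `burst_seconds` parameter (the time the burst itself takes, which must be measured) are hypothetical.

```python
def rows_per_cycle(batches_per_window, batch_size):
    """Number of rows produced in one burst of the generator."""
    return batches_per_window * batch_size

def average_rate(batch_window, batches_per_window, batch_size, burst_seconds=0.0):
    """Average rows per second over one cycle (burst time plus sleep time)."""
    return rows_per_cycle(batches_per_window, batch_size) / (batch_window + burst_seconds)

print(rows_per_cycle(1000, 500))              # 500000
print(round(average_rate(60, 1000, 500), 1))  # 8333.3
```

With `burst_seconds=0.0` the result is only an upper bound, since the inserts and the Kafka flush take time of their own.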

The data generator below automatically ingests into Kafka and MySQL using the functions defined above.

In [None]:
while True:
    # Send batches_per_window batches back to back, then pause for batch_window seconds.
    for _ in range(batches_per_window):
        val = []
        for _ in range(batch_size):
            ts = int(time.time())  # processing time doubles as event time
            val.append((random.choice(userNames), random.choice(regionNames), ts))
        to_my_sql(val) # Send the data to mysql
        to_kafka(val) # Send the data to Flink via Kafka
    time.sleep(batch_window)

# COPY THIS NOTEBOOK TO CONTINUE INGESTING IN PARALLEL

<img src="copy.png" alt="5" border="0" width='500pt'>

In [None]:
import mysql.connector

mydb = mysql.connector.connect(
  host="mysql",
  user="root",
  password="pass1234"
)

mycursor = mydb.cursor()
mycursor.execute("USE stream")

In [None]:
mycursor.execute("SELECT * FROM clicks LIMIT 10")
for x in mycursor:
  print(x) 
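
From here the reporting can be built on top of the same cursor. The cell below is a hedged sketch of one way to do it with the window semantics in the WHERE clause: it counts clicks per country for every one-minute window that has already closed. It assumes the `clicks` table above with epoch-second timestamps. The function names are hypothetical, and `cursor` can be any DB-API cursor such as `mycursor`.

```python
def closed_window_cutoff(now, window_seconds=60):
    """Start of the window that is still open; only rows before it are final."""
    return (int(now) // window_seconds) * window_seconds

def build_report_sql(window_seconds=60):
    """Build a MySQL query that groups rows into epoch-aligned tumbling windows."""
    w = int(window_seconds)  # cast so only an integer reaches the SQL text
    return (
        "SELECT FROM_UNIXTIME((`timestamp` DIV {w}) * {w}) AS openW, "
        "FROM_UNIXTIME((`timestamp` DIV {w}) * {w} + {w}) AS closeW, "
        "country, COUNT(*) AS cnt "
        "FROM clicks "
        "WHERE `timestamp` < %s "
        "GROUP BY openW, closeW, country "
        "ORDER BY openW, country"
    ).format(w=w)

def run_report(cursor, now, window_seconds=60):
    """Execute the report for all windows closed before `now` and return the rows."""
    cutoff = closed_window_cutoff(now, window_seconds)
    cursor.execute(build_report_sql(window_seconds), (cutoff,))
    return cursor.fetchall()
```

A call such as `run_report(mycursor, time.time())` returns one row per window and country. Unlike Flink, which emits each window once when it closes, this query recomputes every closed window still in the table, so it works best together with regular removal of old rows. `FROM_UNIXTIME` renders times in the MySQL session time zone, which may differ from what the Flink client shows.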