# **Columnar Tutorial**

In [None]:
# Sanity check: confirms the kernel executes code before the tutorial starts.
greeting = "Hello World"
print(greeting)

### **SETUP**

In [None]:
import os
import time

import clickhouse_connect
import mysql.connector
import pandas as pd

In [None]:
# Connection settings for the two databases being compared.
# Every value can be overridden through an environment variable so credentials do not
# have to be hardcoded in the notebook; the defaults are the values this tutorial
# was written against (hostnames presumably match the container names — TODO confirm).

# MySQL configuration
MYSQL_CONFIG = {
    "host": os.environ.get("MYSQL_HOST", "mysql"),
    "user": os.environ.get("MYSQL_USER", "user"),
    "password": os.environ.get("MYSQL_PASSWORD", "password"),
    "database": os.environ.get("MYSQL_DATABASE", "testdb"),
}

# ClickHouse configuration
CLICKHOUSE_CONFIG = {
    "host": os.environ.get("CLICKHOUSE_HOST", "clickhouse"),
    "user": os.environ.get("CLICKHOUSE_USER", "default"),
    "password": os.environ.get("CLICKHOUSE_PASSWORD", "password"),
    # 8123 is ClickHouse's default HTTP port; env values are strings, so cast to int.
    "port": int(os.environ.get("CLICKHOUSE_PORT", "8123")),
}

### **TEST DES CONTENEURS**

In [None]:
# MySQL connectivity check
def test_mysql():
    """Round-trip a tiny table through MySQL to confirm the server is reachable.

    Drops and recreates ``test_table``, inserts two rows, then prints the row with
    the smallest id. The cursor and connection are closed even if a statement fails.
    """
    conn = mysql.connector.connect(**MYSQL_CONFIG)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("DROP TABLE IF EXISTS test_table;")
            cursor.execute("""
                CREATE TABLE test_table (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name CHAR(10),
                    value INT
                );
            """)

            cursor.execute("INSERT INTO test_table (name, value) VALUES ('test', 100), ('test', 200);")
            conn.commit()

            # Without ORDER BY, "LIMIT 1" may return either row; pin the order so the
            # expected output (1, 'test', 100) is deterministic.
            cursor.execute("SELECT * FROM test_table ORDER BY id LIMIT 1;")
            print("MySQL :", cursor.fetchone())
        finally:
            cursor.close()
    finally:
        conn.close()

In [None]:
# ClickHouse connectivity check
def test_clickhouse():
    """Round-trip a tiny table through ClickHouse to confirm the server is reachable.

    Drops and recreates ``test_table`` (MergeTree, sorted by ``id``), inserts two rows,
    then prints the row with the smallest id. The client is closed even on failure.
    """
    client = clickhouse_connect.get_client(**CLICKHOUSE_CONFIG)
    try:
        client.command("DROP TABLE IF EXISTS test_table;")
        client.command("""
            CREATE TABLE test_table (
                id UInt32,
                name String,
                value Int32
            ) ENGINE = MergeTree()
            ORDER BY id;
        """)

        client.insert("test_table", [[1, 'test', 100], [2, 'test', 200]])

        # Without ORDER BY, "LIMIT 1" returns an arbitrary row; pin it so the
        # expected output (1, 'test', 100) is deterministic.
        result = client.query("SELECT * FROM test_table ORDER BY id LIMIT 1;")
        print("ClickHouse :", result.result_rows[0])
    finally:
        client.close()

In [None]:
# Run the MySQL round-trip check.
# Should print "MySQL : (1, 'test', 100)"
test_mysql()

In [None]:
# Run the ClickHouse round-trip check.
# Should print "ClickHouse : (1, 'test', 100)"
test_clickhouse()

Instanciation du connecteur MySQL et du client ClickHouse, réutilisés dans la suite du notebook.

In [None]:
# Shared handles reused by every cell below.
client = clickhouse_connect.get_client(**CLICKHOUSE_CONFIG)  # ClickHouse client
conn = mysql.connector.connect(**MYSQL_CONFIG)               # MySQL connection

Fonction utilitaire pour simplifier les requêtes MySQL : elle exécute une requête et renvoie toutes les lignes obtenues.

In [None]:
def sql_query(query, params=None, connection=None):
    """Run a MySQL query and return every resulting row.

    Args:
        query: SQL text; may contain ``%s`` placeholders when ``params`` is given.
        params: Optional sequence/mapping bound to the placeholders, so callers can
            pass values without formatting them into the SQL string.
        connection: MySQL connection to use; defaults to the notebook-wide ``conn``.

    Returns:
        The rows produced by ``cursor.fetchall()``.
    """
    if connection is None:
        connection = conn
    cursor = connection.cursor()
    try:
        if params is None:
            cursor.execute(query)
        else:
            cursor.execute(query, params)
        return cursor.fetchall()
    finally:
        # Close the cursor even when the query fails, so it is never leaked.
        cursor.close()

Chargement du dataset. Il contient un ensemble d'informations sur des trajets de taxi à New York.
Le dataset a été réduit à 50 000 lignes.

In [None]:
# Columns kept from the NYC taxi CSV, in the order later cells rely on when they
# turn a row into an INSERT tuple (vendor, passengers, distance, fare, tip, tolls).
# `usecols` only filters: pandas ignores the order given there and keeps the file's
# column order, so the columns are re-selected explicitly to pin the order.
TAXI_COLUMNS = [
    "VendorID",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "tip_amount",
    "tolls_amount",
]
df = pd.read_csv("./dataset/input_data.csv", usecols=TAXI_COLUMNS)[TAXI_COLUMNS]

## Découverte de la syntaxe

`client.command(query)` permet d'effectuer les opérations sur les tables (création, destruction)

In [None]:
# Recreate a small demo table. MergeTree tables need a sorting key (ORDER BY).
# ClickHouse accepts VARCHAR(n) as an alias for String; the length is not enforced.
client.command("DROP TABLE IF EXISTS dnd;")
client.command("""
    CREATE TABLE dnd (
        id UInt32,
        name VARCHAR(24),
        age UInt8,
        strength Float32,
        charisma Float32,
        agility Float32,
        intelligence Float32
    ) ENGINE = MergeTree()
    ORDER BY id;
""")

`client.insert(table, data)` permet d'ajouter un ensemble de données à une table :

`data = [(...), (...), ...]`

In [None]:
# One tuple per character, in table column order:
# id, name, age, then four ability scores (strength, charisma, agility, intelligence).
data = [
    (1, "Alice",    23, 60, 80, 50, 55),
    (2, "Bob",      24, 60, 75, 65, 45),
    (3, "Charlie",  23, 90, 60, 45, 50),
    (4, "David",    23, 70, 70, 55, 50),
    (5, "Eleanore", 22, 60, 80, 40, 65),
]
client.insert("dnd", data)

`client.query(query)` permet d'effectuer une requête pour obtenir des données

In [None]:
result = client.query("SELECT * FROM dnd")
# `client.query` returns a result object; its `result_rows` attribute holds the rows.
# Leaving it as the last expression lets the notebook display them.
result.result_rows

## Comparaison 1 : Chargement des données

In [None]:
NB_ROWS_TO_INSERT = 5_000  # rows inserted one at a time in the benchmark below

On insère `NB_ROWS_TO_INSERT` lignes une par une pour mesurer le temps mis par chaque système pour les ajouter.

In [None]:
# Row-by-row insertion into MySQL: one INSERT and one COMMIT per row, timed.
cursor = conn.cursor()

cursor.execute("DROP TABLE IF EXISTS nyc_taxi;")

cursor.execute("""
    CREATE TABLE nyc_taxi (
        id INT AUTO_INCREMENT PRIMARY KEY,
        vendor_id INT,
        passenger_count INT,
        trip_distance FLOAT,
        fare_amount FLOAT,
        tip_amount FLOAT,
        tolls_amount FLOAT
    );
""")

conn.commit()

# Explicit column order matching the INSERT below (independent of the CSV's order).
columns = ["VendorID", "passenger_count", "trip_distance", "fare_amount", "tip_amount", "tolls_amount"]
# `head` yields exactly NB_ROWS_TO_INSERT rows. `itertuples` keeps each column's
# dtype, whereas `iterrows` upcasts a mixed int/float row to float.
rows = df[columns].head(NB_ROWS_TO_INSERT).itertuples(index=False, name=None)

t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
for row in rows:
    cursor.execute("""
        INSERT INTO nyc_taxi (vendor_id, passenger_count, trip_distance, fare_amount, tip_amount, tolls_amount)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, row)
    conn.commit()  # intentional: committing every row is what "one by one" measures
t1 = time.perf_counter()
print("Time required to add %d rows one by one : "%(NB_ROWS_TO_INSERT), t1 - t0)

cursor.close()

In [None]:
# Row-by-row insertion into ClickHouse: one `client.insert` call per row, timed.
client.command("DROP TABLE IF EXISTS nyc_taxi;")

client.command("""
    CREATE TABLE nyc_taxi (
        id UInt32,
        vendor_id UInt8,
        passenger_count UInt8,
        trip_distance Float32,
        fare_amount Float32,
        tip_amount Float32,
        tolls_amount Float32
    ) ENGINE = MergeTree()
    ORDER BY id;
""")

columns = ["VendorID", "passenger_count", "trip_distance", "fare_amount", "tip_amount", "tolls_amount"]
# `head` yields exactly NB_ROWS_TO_INSERT rows; `itertuples` keeps per-column dtypes
# (integers stay integers for the UInt8 columns instead of being upcast to float).
rows = df[columns].head(NB_ROWS_TO_INSERT).itertuples(index=False, name=None)

t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
# ids start at 1 to match the AUTO_INCREMENT ids MySQL assigns to the same rows.
for row_id, row in enumerate(rows, start=1):
    client.insert("nyc_taxi", [(row_id, *row)])
t1 = time.perf_counter()
print("Time required to add %d rows one by one : "%(NB_ROWS_TO_INSERT), t1 - t0)

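On observe que ClickHouse est beaucoup plus lent que MySQL pour l'insertion ligne par ligne.
Cela s'explique par la différence de stockage :
- pour MySQL (orienté lignes), on ajoute 1 ligne de N valeurs ;
- pour ClickHouse (orienté colonnes), on ajoute 1 élément dans chacune des N colonnes.

De plus, avec le moteur MergeTree, chaque appel à `insert` crée une nouvelle *part* sur disque, ce qui rend les insertions unitaires particulièrement coûteuses.

Afin de pallier ce problème, on appelle `insert` avec un tableau de données pour ajouter un lot entier en une seule fois.
Ajoutons maintenant l'ensemble des données dans les deux bases de données.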

In [None]:
# Full load into MySQL, sent as multi-row batches and committed once, timed.
cursor = conn.cursor()

cursor.execute("DROP TABLE IF EXISTS nyc_taxi;")

cursor.execute("""
    CREATE TABLE nyc_taxi (
        id INT AUTO_INCREMENT PRIMARY KEY,
        vendor_id INT,
        passenger_count INT,
        trip_distance FLOAT,
        fare_amount FLOAT,
        tip_amount FLOAT,
        tolls_amount FLOAT
    );
""")

conn.commit()

# Explicit column order matching the INSERT below (independent of the CSV's order).
columns = ["VendorID", "passenger_count", "trip_distance", "fare_amount", "tip_amount", "tolls_amount"]
insert_sql = """
    INSERT INTO nyc_taxi (vendor_id, passenger_count, trip_distance, fare_amount, tip_amount, tolls_amount)
    VALUES (%s, %s, %s, %s, %s, %s)
"""
rows = list(df[columns].itertuples(index=False, name=None))
# Chunk the load so each multi-row INSERT statement stays reasonably small
# (MySQL rejects statements larger than its max_allowed_packet setting).
batch_size = 5_000

t0 = time.perf_counter()
for start in range(0, len(rows), batch_size):
    # For INSERT statements, executemany lets the connector send the whole chunk
    # using multi-row VALUES syntax instead of one round trip per row.
    cursor.executemany(insert_sql, rows[start:start + batch_size])
conn.commit()
t1 = time.perf_counter()
print("Time required to add %d rows in batches : " % len(rows), t1 - t0)

cursor.close()

In [None]:
# Full load into ClickHouse: build every row in memory, then send them in a single
# `client.insert` call instead of one call per row. Timed for comparison.
client.command("DROP TABLE IF EXISTS nyc_taxi;")

client.command("""
    CREATE TABLE nyc_taxi (
        id UInt32,
        vendor_id UInt8,
        passenger_count UInt8,
        trip_distance Float32,
        fare_amount Float32,
        tip_amount Float32,
        tolls_amount Float32
    ) ENGINE = MergeTree()
    ORDER BY id;
""")

columns = ["VendorID", "passenger_count", "trip_distance", "fare_amount", "tip_amount", "tolls_amount"]

t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
# ids start at 1 so each row gets the same id as its MySQL (AUTO_INCREMENT) counterpart,
# which keeps the id-based lookups later in the notebook comparing the same rows.
data = [
    [row_id, *row]
    for row_id, row in enumerate(df[columns].itertuples(index=False, name=None), start=1)
]
client.insert("nyc_taxi", data)
t1 = time.perf_counter()
print("Time required to add %d rows in one batch : " % len(data), t1 - t0)

On voit ici qu'il y a un réel gain de temps lors de l'ajout par lots (batch).
L'ajout progressif des données (ligne par ligne) est une limitation importante de ClickHouse et des bases de données orientées colonnes en général.

In [None]:
# Sanity check: both engines should now report the same number of rows.
result = sql_query("SELECT COUNT(*) FROM nyc_taxi;")
(mysql_total,) = result[0]
print("MySQL Total Rows:", mysql_total)

result = client.query("SELECT COUNT(*) FROM nyc_taxi;")
(clickhouse_total,) = result.result_rows[0]
print("ClickHouse Total Rows:", clickhouse_total)

## Comparaison 2 : Requêtes globales

On définit plusieurs requêtes afin de comparer leur temps d'exécution selon la base de données.

In [None]:
# Queries timed on both engines below.
(query_A, query_B, query_C, query_D, query_E) = (
    "SELECT * FROM nyc_taxi;",                                                 # A: whole table
    "SELECT * FROM nyc_taxi ORDER BY passenger_count ASC, fare_amount DESC;",  # B: whole table, sorted
    "SELECT SUM(trip_distance) FROM nyc_taxi;",                                # C: sum over one column
    "SELECT COUNT(*) FROM nyc_taxi;",                                          # D: row count
    "SELECT COUNT(passenger_count) FROM nyc_taxi;",                            # E: non-NULL count of one column
)

In [None]:
# Average latency of each query over `iterations` runs: MySQL first, then ClickHouse.
iterations = 25
for query in [query_A, query_B, query_C, query_D, query_E] :
    t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
    for i in range(iterations) :
        resultsql = sql_query(query)
    t1 = time.perf_counter()
    for i in range(iterations) :
        resultcol = client.query(query)
    t2 = time.perf_counter()
    print("Requête : ", query)
    print("MySQL :      ", (t1 - t0)/iterations)
    print("ClickHouse : ", (t2 - t1)/iterations)
    print(" ")

À vous de jouer ! Modifiez la requête de la cellule ci-dessous pour comparer vos propres requêtes.

In [None]:
# Edit `query` and re-run to time your own query on both engines.
query = "SELECT * FROM nyc_taxi;"
iterations = 25
t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
for i in range(iterations) :
    resultsql = sql_query(query)
t1 = time.perf_counter()
for i in range(iterations) :
    resultcol = client.query(query)
t2 = time.perf_counter()
print("Requête : ", query)
print("MySQL :      ", (t1 - t0)/iterations)
print("ClickHouse : ", (t2 - t1)/iterations)

## Comparaison 3 : Requêtes ciblées

In [None]:
# Point lookup on `id` (MySQL primary key / ClickHouse sorting key).
query = "SELECT * FROM nyc_taxi WHERE id=1523"
iterations = 25
t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
for i in range(iterations) :
    resultsql = sql_query(query)
t1 = time.perf_counter()
for i in range(iterations) :
    resultcol = client.query(query)
t2 = time.perf_counter()
print("Requête : ", query)
print("MySQL :      ", (t1 - t0)/iterations)
print("ClickHouse : ", (t2 - t1)/iterations)

In [None]:
# Two point lookups on `id` combined with OR.
query = "SELECT * FROM nyc_taxi WHERE id=30523 OR id=28645"
iterations = 25
t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
for i in range(iterations) :
    resultsql = sql_query(query)
t1 = time.perf_counter()
for i in range(iterations) :
    resultcol = client.query(query)
t2 = time.perf_counter()
print("Requête : ", query)
print("MySQL :      ", (t1 - t0)/iterations)
print("ClickHouse : ", (t2 - t1)/iterations)

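Les accès à des lignes précises sont bien plus longs avec ClickHouse qu'avec MySQL ! MySQL utilise l'index B-tree de sa clé primaire pour atteindre directement la ligne, alors que l'index creux (*sparse*) de ClickHouse lit au minimum un granule complet (8192 lignes par défaut).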

In [None]:
# An `id` lookup OR'ed with a predicate on `tip_amount`: any row with a tip matches,
# so the `id` index alone cannot narrow down which rows must be read.
query = "SELECT * FROM nyc_taxi WHERE id = 30523 OR tip_amount > 0"
iterations = 25
t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
for i in range(iterations) :
    resultsql = sql_query(query)
t1 = time.perf_counter()
for i in range(iterations) :
    resultcol = client.query(query)
t2 = time.perf_counter()
print("Requête : ", query)
print("MySQL :      ", (t1 - t0)/iterations)
print("ClickHouse : ", (t2 - t1)/iterations)

In [None]:
# Range filter on `fare_amount`, a column with no index in either table.
query = "SELECT * FROM nyc_taxi WHERE fare_amount > 1 AND fare_amount < 12 "
iterations = 25
t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
for i in range(iterations) :
    resultsql = sql_query(query)
t1 = time.perf_counter()
for i in range(iterations) :
    resultcol = client.query(query)
t2 = time.perf_counter()
print("Requête : ", query)
print("MySQL :      ", (t1 - t0)/iterations)
print("ClickHouse : ", (t2 - t1)/iterations)

À l'inverse, le filtrage sur une colonne non indexée (ici `fare_amount` ou `tip_amount`) est bien plus rapide avec ClickHouse.

In [None]:
# Equality filter on the float column `fare_amount` (1.0 is exactly representable,
# so the equality test is well-defined for FLOAT/Float32).
query = "SELECT * FROM nyc_taxi WHERE fare_amount = 1.0"
iterations = 25
t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
for i in range(iterations) :
    resultsql = sql_query(query)
t1 = time.perf_counter()
for i in range(iterations) :
    resultcol = client.query(query)
t2 = time.perf_counter()
print("Requête : ", query)
print("MySQL :      ", (t1 - t0)/iterations)
print("ClickHouse : ", (t2 - t1)/iterations)

## Comparaison 4 : Requête avec jointure (JOIN)

In [None]:
# Rows for the `special_rule` table, joined to trips on passenger count.
# Tuple layout: (id, passenger, reduction).
data = [
    (1, 0, 0.0),
    (2, 1, 1.0),
    (3, 2, 3.5),
    (5, 3, 6.0),
    (8, 4, 10.5),
]

In [None]:
# Create the `special_rule` table in both engines and load the same rows into each.

# --- MySQL ---
cursor = conn.cursor()

cursor.execute("DROP TABLE IF EXISTS special_rule;")
cursor.execute("""
    CREATE TABLE special_rule (
        id INT PRIMARY KEY,
        passenger INT,
        reduction FLOAT
    );
""")
conn.commit()

insert_rule = """
        INSERT INTO special_rule (id, passenger, reduction)
        VALUES (%s, %s, %s)
    """
for rule in data:
    cursor.execute(insert_rule, rule)

conn.commit()
cursor.close()

# --- ClickHouse ---
client.command("DROP TABLE IF EXISTS special_rule;")
client.command("""
    CREATE TABLE special_rule (
        id UInt32,
        passenger UInt8,
        reduction Float32
    ) ENGINE = MergeTree()
    ORDER BY id;
""")
client.insert("special_rule", data)

In [None]:
# Both engines should hold the same rules (row order is not guaranteed without ORDER BY).
resultsql = sql_query("SELECT * FROM special_rule")
print(resultsql)

resultcol = client.query("SELECT * FROM special_rule")
print(resultcol.result_rows)

In [None]:
# Join every trip with the rule for its passenger count, keeping trips whose tip
# is below ten times the fare.
query = "SELECT * FROM nyc_taxi JOIN special_rule ON nyc_taxi.passenger_count = special_rule.passenger WHERE tip_amount < 10 * fare_amount"
iterations = 25
t0 = time.perf_counter()  # monotonic, high-resolution clock meant for timing
for i in range(iterations) :
    resultsql = sql_query(query)
t1 = time.perf_counter()
for i in range(iterations) :
    resultcol = client.query(query)
t2 = time.perf_counter()
print("Requête : ", query)
print("MySQL :      ", (t1 - t0)/iterations)
print("ClickHouse : ", (t2 - t1)/iterations)