In [1]:
import os
import sys
import subprocess as sp
import json
import random
import time
import hashlib
import base64
from contextlib import closing
from dataclasses import dataclass
import typing

import multiprocessing

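# multiprocessing.Barrier objects can only be shared with child processes through inheritance, so for PPX (ProcessPoolExecutor) use a multiprocessing.Manager().Barrier() proxy instead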
# from multiprocessing import Barrier

from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor

import mariadb
import mariadb.constants.CLIENT
import pymysql
import pymysql.constants.CLIENT


### Get a quick mariadb instance:
---

```bash

docker run --rm -it --name m105 --cpus 8 --network host -e MYSQL_ROOT_PASSWORD=bobz1234 \
--tmpfs /mem_vol/ mariadb:10.5 --datadir='/mem_vol' --port=10306 --innodb-buffer-pool-size=8GB

# relaxed durability (redo log flushed ~once per second, binlog fsync left to the OS), ie a reporting db:
docker run --rm -it --name m105 --cpus 8 --network host -e MYSQL_ROOT_PASSWORD=bobz1234 \
--tmpfs /mem_vol/ mariadb:10.5 --datadir='/mem_vol' --port=10306 --innodb-buffer-pool-size=8GB \
--innodb-flush-log-at-trx-commit=2 --sync-binlog=0

```

In [2]:
# Connection settings for the benchmark MariaDB (the docker instance described above).
# Endpoint and credentials can be overridden through environment variables so the
# password does not have to live in the notebook; the defaults match the throwaway
# docker command.
_DB_INFO = {

    "host": os.environ.get("XB_DB_HOST", "127.0.0.1"),
    # env values are strings; pymysql expects an int port
    "port": int(os.environ.get("XB_DB_PORT", "10306")),

    # "unix_socket": "/var/run/mysqld/mysqld.sock",
    # "unix_socket": "/run/mysqld/mysqld.sock",

    "user": os.environ.get("XB_DB_USER", "root"),
    "password": os.environ.get("XB_DB_PASSWORD", "bobz1234"),

    "database": "mysql",

    # Allow cur.execute to execute multiple statements
    # (refresh_schema_q is sent as a single multi-statement query).
    "client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS,
}

In [30]:
# Notebook-wide connection used by the helper cells below.
pymy_conn = pymysql.connect(**_DB_INFO)
# Switch to autocommit so every statement is committed on its own.
pymy_conn.autocommit(True)
print("pymy autocommit status: " + str(pymy_conn.autocommit_mode))

pymy autocommit status: True


In [31]:
# Long-lived module-level cursor on pymy_conn.
# NOTE(review): no visible cell uses it and it is never closed — confirm later cells need it;
# otherwise prefer short-lived `with pymy_conn.cursor() as ...:` blocks.
pymy_cur = pymy_conn.cursor()

In [5]:
# Connectivity smoke test: raises if the server cannot run a trivial query.
# closing() guarantees the cursor is closed, just like the cursor's own `with` support.
with closing(pymy_conn.cursor()) as tmp_cur:
    tmp_cur.execute('SELECT 1;')

# Some test queries

In [6]:
def exec_q(q, conn=None, all_results=False):
    """Execute ``q`` and pretty-print its result set(s) to stdout.

    For each printed result set: a ``|``-joined header of column names (only
    when the statement returns columns), a dashed underline of the same
    length, one tuple per row, then an empty separator line.

    Args:
        q: SQL text. May contain several statements when the connection was
            opened with ``CLIENT.MULTI_STATEMENTS``.
        conn: DB-API connection to run ``q`` on. Defaults to the notebook-global
            ``pymy_conn`` (looked up at call time).
        all_results: when True, print every result set of a multi-statement
            ``q``. By default only the first one is printed; the rest are left
            to be consumed when the cursor is closed.
    """
    if conn is None:
        conn = pymy_conn
    with closing(conn.cursor()) as tmp_cur:
        tmp_cur.execute(q)
        while True:
            # header (DDL / DML statements have no description)
            col_names = [desc[0] for desc in (tmp_cur.description or ())]
            if col_names:
                hdr = "|".join(col_names)
                print(hdr)
                print('-' * len(hdr))

            # rows
            for row in tmp_cur.fetchall() or ():
                print(row)
            # sep
            print("")

            # advance to the next statement's result only when asked to
            if not (all_results and tmp_cur.nextset()):
                break


In [7]:
# Sanity checks: databases visible to this user, then the InnoDB buffer pool size in MB
# (compare with --innodb-buffer-pool-size from the docker command).
exec_q("SHOW DATABASES;")
print("")
exec_q("SELECT @@innodb_buffer_pool_size / (1024*1024) AS innodb_buffer_MB;")

Database
--------
('information_schema',)
('mysql',)
('performance_schema',)


innodb_buffer_MB
----------------
(Decimal('8192.0000'),)



# Generate sample data

In [8]:
# mariadb INT and int32 is usually ranged: -2 147 483 648 to 2 147 483 647
def get_random_int32():
    """Return a uniformly random int in [0, 2 000 000 000] (both ends inclusive).

    The upper bound stays below the signed 32-bit max (2 147 483 647), so the
    value always fits a MariaDB INT column.
    """
    upper_bound = 2_000_000_000
    return random.randint(0, upper_bound)

def get_random_int64():
    """Return a uniformly random int in [0, 80 000 000 000 000] (inclusive).

    Not the full int64 range: the bound is 8e13, which fits a BIGINT column.
    """
    upper_bound = 80_000_000_000_000
    return random.randint(0, upper_bound)

def get_random_str(str_len=48):
    """Return a random ASCII string made from ``str_len`` bytes of ``os.urandom``.

    ``str_len`` counts random *bytes*, not output characters: the result is
    base64 text of length 4 * ceil(str_len / 3), so the default 48 gives 64
    characters. '+' and '/' are replaced by 'A' and 'Z', so only [A-Za-z0-9]
    appear, plus '=' padding when ``str_len`` is not a multiple of 3.

    Raises:
        AssertionError: (when assertions are enabled) if ``str_len`` is not an
            int or is 8192 or more.
    """
    assert isinstance(str_len, int)
    assert str_len < 8192
    raw_bytes = os.urandom(str_len)
    encoded = base64.b64encode(raw_bytes, altchars=b"AZ")
    return encoded.decode('ascii')

In [9]:
# demo records
# demo records: (val_1, num_1) tuples shaped like the rows the benchmark inserts
for _ in range(3):
    print((get_random_str(48), get_random_int32()))

('EtQE7bzfZiLUeJ8sM8Dp3KAPlAxL0LDuhP4CzZN6AVLXoeZkxLg8WVnZHKaw7QGk', 1728198644)
('xnrqGGvAHc7LvpECIZAA2y1ygqoccq6bwcypRXCvEhHs2WQxRmq5iFwiAGBLn0Q1', 630537093)
('LP5IUOGqUBQtAtSyfYrUuwTsiToZTqvi9AhVKVrRChUzcO2PJPuqNBZd0zBEUNQC', 171514377)


# Create BENCHMARK tables

In [25]:
# Schema reset script, sent as ONE multi-statement query
# (needs CLIENT.MULTI_STATEMENTS in the connection's client_flag).
# - Both tables are dropped first, so the IF NOT EXISTS clauses are redundant but harmless.
# - xb_bench1: fixed-width CHAR(64) payload (get_random_str(48) yields exactly 64 chars).
# - xb_bench2: VARCHAR(128) payload, ids start at 1000.
# - brid is a signed INT. NOTE(review): keep total inserted rows well below ~2.1e9.
refresh_schema_q = """
DROP TABLE IF EXISTS xb_bench1;
DROP TABLE IF EXISTS xb_bench2;

CREATE TABLE IF NOT EXISTS xb_bench1(
     brid INT NOT NULL AUTO_INCREMENT,
     val_1 CHAR(64) NOT NULL,
     num_1 INT NOT NULL,   
     PRIMARY KEY (brid)
);

CREATE TABLE IF NOT EXISTS xb_bench2(
     brid INT NOT NULL AUTO_INCREMENT,
     val_1 VARCHAR(128) NOT NULL,
     num_1 INT NOT NULL,
     PRIMARY KEY (brid)
) AUTO_INCREMENT = 1000;
"""

In [26]:
# Drop and recreate both benchmark tables so the demos below start from empty tables.
exec_q(refresh_schema_q)




---
# sample inserts

In [12]:
# ------------------------------------- execute
# One hand-written multi-row INSERT via plain execute(), show the rows, then reset the schema.
# num_1 values are quoted literals; the output shows them stored as ints.
# NOTE(review): the saved output shows brid starting at 1000, which does not match
# xb_bench1 in refresh_schema_q (no AUTO_INCREMENT offset). The execution counts show this
# cell ran before the current schema cell, so the output looks stale — re-run to refresh.
test_q = """ INSERT INTO xb_bench1(val_1, num_1) VALUES 
('Q8M5rzhQveJJClZAGhXtNmWCgRm2ETnSeZb2KdXy1vDVlnj3tYOlOZulh2Fc7T1p', '2120724'),
('TBNJcNiuZAbLTnDdEMGvhYsG3D2x9UjOnJ9jBEmlzs0IaawWmCN9AnsLlheQ2vpv', '2020921');
"""
with pymy_conn.cursor() as tmp_cur:
    tmp_cur.execute(test_q)

exec_q("SELECT * FROM xb_bench1;")
# reset so later cells start from empty tables
exec_q(refresh_schema_q)

print('----- db clean again:\n')
exec_q("SELECT * FROM xb_bench1;")

brid|val_1|num_1
----------------
(1000, 'Q8M5rzhQveJJClZAGhXtNmWCgRm2ETnSeZb2KdXy1vDVlnj3tYOlOZulh2Fc7T1p', 2120724)
(1001, 'TBNJcNiuZAbLTnDdEMGvhYsG3D2x9UjOnJ9jBEmlzs0IaawWmCN9AnsLlheQ2vpv', 2020921)


----- db clean again:

brid|val_1|num_1
----------------



In [23]:
# ------------------------------------- executemany
# NOTE(review): PyMySQL documents that executemany() speeds up multi-row INSERT/REPLACE by
# batching the rows into multi-row statements client-side; that, rather than the server
# version, is most likely where the speed comes from — confirm against the PyMySQL docs.
# (MariaDB accepts INSERT without INTO.)
test_q = "INSERT xb_bench1(val_1, num_1) VALUES (%s, %s);"

records = [
    ('Y1PfeoYmPm64yXf83URekmNkq89W2uucHCrCJwcAFNudhpNZILSHbflPhP2KhZWy', '2120724'),
    ('iAWZwnoskhrJdQGqpdZMCVY0mH0NNZyYTUWh9kvGrkxMoryX38pDfZJmBZsg1lVp', '7026371'),
    ('ktpY84x1Yd11lYUUWxpyNHZlgZ8d0X8Xwwfxxd7efvMUAsAPKAtb8ohJXnrzlhXM', '3905237')
]

with pymy_conn.cursor() as tmp_cur:
    tmp_cur.executemany(test_q, records)

# NOTE(review): the saved output shows brid 401000+, i.e. it was produced against an earlier
# table state (hidden kernel state) — re-run from a fresh schema to refresh it.
exec_q("SELECT * FROM xb_bench1;")
exec_q(refresh_schema_q)

print('----- db clean again:\n')
exec_q("SELECT * FROM xb_bench1;")


brid|val_1|num_1
----------------
(401000, 'Y1PfeoYmPm64yXf83URekmNkq89W2uucHCrCJwcAFNudhpNZILSHbflPhP2KhZWy', 2120724)
(401001, 'iAWZwnoskhrJdQGqpdZMCVY0mH0NNZyYTUWh9kvGrkxMoryX38pDfZJmBZsg1lVp', 7026371)
(401002, 'ktpY84x1Yd11lYUUWxpyNHZlgZ8d0X8Xwwfxxd7efvMUAsAPKAtb8ohJXnrzlhXM', 3905237)


----- db clean again:

brid|val_1|num_1
----------------



---
---
---
---
---
# Concurrent/Parallel Benchmarks:


In [14]:
@dataclass(frozen=True)
class BenchParams:
    """Immutable per-worker settings handed to ``wrkr_entry``.

    NOTE: frozen=True also generates ``__hash__``, but hashing an instance
    raises TypeError because ``db_info`` is a dict, so don't use these as
    dict keys or set members.
    """
    # worker label used in log lines and in BenchResult, e.g. "w0"
    wrkr_id: str
    # keyword arguments for pymysql.connect()
    db_info: dict
    # barrier every worker wait()s on before inserting, so all workers start together;
    # for a process pool this is a multiprocessing.Manager().Barrier proxy
    start_barrier: typing.Any
    
    # benchmark 4 ips (insertion per sec): rows each worker generates and inserts
    b4i_num_records: int

In [15]:
@dataclass(frozen=True)
class BenchResult:
    """Timing summary returned by one benchmark worker.

    The ``*_time`` fields are ``time.time()`` timestamps as produced by
    ``wrkr_entry``, so start times can be compared across worker processes.
    """
    wrkr_id: str

    # whole worker lifetime (data generation, connect, barrier wait, inserts)
    wrkr_start_time: float
    wrkr_end_time: float

    # benchmark for ips: only the timed insert window
    b4i_start_time: float
    b4i_end_time: float

    b4i_num_records: int
    # inserts per second for this worker (records / seconds, so a float)
    b4i_ips: float

    def __str__(self):
        """One-line summary: worker id, insert start, duration and ips (truncated to int)."""
        _deltaT = self.b4i_end_time - self.b4i_start_time
        # a coarse clock can yield a zero interval on tiny runs; don't divide by zero
        _ips = int(self.b4i_num_records / _deltaT) if _deltaT > 0 else 0
        _start = self.b4i_start_time
        return f"wrkr_id: {self.wrkr_id} -- b4i_start: {_start:,.4f} -- deltaT: {_deltaT:.4f} -- ips: {_ips}"

In [32]:
class IPS_Benchmark:
    """Insert-per-second benchmark state for a single worker.

    ``__init__`` pre-generates all rows in memory and opens a dedicated
    autocommit connection, so neither is part of the timed ``run_b4i()``.
    Call ``close()``, or use the instance as a context manager, to release
    the connection.
    """

    def __init__(self, bp: BenchParams):

        self.bp = bp

        # ----- generate sample data (outside the timed section):
        # (val_1, num_1) tuples; almost no diff between 48 or just 4 in ips
        self.b4i_data = [
            (get_random_str(48), get_random_int32())
            for _ in range(bp.b4i_num_records)
        ]

        # ----- connect to db
        # NOTE pg vs mariadb autocommit is different.
        self.db_conn = pymysql.connect(**bp.db_info)
        self.db_conn.autocommit(True)

        print(f"{bp.wrkr_id}: init compelete. time: {time.time():,.5f}\n", end="")

    def run_b4i(self):
        """Insert every pre-generated row into xb_bench1 with a single executemany()."""
        with self.db_conn.cursor() as tmp_cur:
            _q = "INSERT INTO xb_bench1(val_1, num_1) VALUES (%s, %s);"
            tmp_cur.executemany(_q, self.b4i_data)

    def close(self):
        """Close the DB connection; calling it again is a no-op."""
        conn, self.db_conn = self.db_conn, None
        if conn is not None:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

In [33]:
def wrkr_entry(bp: BenchParams):
    """Run one benchmark worker (the function mapped over the process pool).

    Builds an ``IPS_Benchmark`` (generates ``bp.b4i_num_records`` rows and
    connects to the DB), waits on ``bp.start_barrier`` so all workers start
    inserting together, times ``run_b4i()``, closes the worker's connection
    and returns a ``BenchResult``.

    Args:
        bp: worker parameters. ``bp.start_barrier`` may be ``None`` to skip the
            start synchronisation.

    Returns:
        BenchResult with ``time.time()`` timestamps (comparable across
        processes) and the per-worker insert rate (0.0 if the measured
        interval is not positive).

    Raises:
        Whatever ``IPS_Benchmark`` / ``run_b4i`` / the barrier raise. If setup
        fails, the barrier is aborted first so the other workers get
        ``BrokenBarrierError`` instead of waiting forever.
    """
    _wrkr_start_time = time.time()
    # print(f"{bp.wrkr_id}: wrkr_entry() called. Time: {_wrkr_start_time:,.5f}\n", end="")
    try:
        bench = IPS_Benchmark(bp)
    except BaseException:
        # This worker will never reach wait(); break the barrier so the remaining
        # workers (and the notebook waiting on the pool) don't hang forever.
        if bp.start_barrier is not None:
            bp.start_barrier.abort()
        raise

    try:
        # comment/uncomment sleep to simulate additional init delay
        # time.sleep(1)
        # synchronize all benchmarks to start at the same time. ie using a barrier
        if bp.start_barrier is not None:
            bp.start_barrier.wait()

        print(f"{bp.wrkr_id} awoken. worker is passed barrier. time: {time.time():,.5f}\n", end="")
        _b4i_start_time = time.time()
        bench.run_b4i()
        _b4i_end_time = time.time()
    finally:
        # release this worker's DB connection once the timed insert is over (or failed)
        bench.db_conn.close()

    # calculate some benchmark stats for the returning result
    _b4i_num_records = len(bench.b4i_data)
    _b4i_elapsed = _b4i_end_time - _b4i_start_time
    # time.time() can yield a zero interval for tiny runs; avoid ZeroDivisionError
    _b4i_ips = _b4i_num_records / _b4i_elapsed if _b4i_elapsed > 0 else 0.0

    bres = BenchResult(wrkr_id=bp.wrkr_id, wrkr_start_time=_wrkr_start_time, wrkr_end_time=time.time(),
                       b4i_start_time=_b4i_start_time, b4i_end_time=_b4i_end_time,
                       b4i_num_records=_b4i_num_records, b4i_ips=_b4i_ips)

    return bres

---
---
---
# ProcessPoolExecutor


In [46]:
# ------------------------------------------------ lets go w/ option1: map and iterate results
# option 2 would be
# futures.append( ... submit() ...)
# wait(futures)

# ----------------------------------------- benchmark knobs
ppx_worker_count = 16
ppx_num_rows = 800 * 1000

# prep db: drop + recreate the benchmark tables (one multi-statement query)
with pymy_conn.cursor() as tmp_cur:
    tmp_cur.execute(refresh_schema_q)

print('db refreshed. rdy 4 benchmark.')

# results
ppx_waited_results = []

# multiprocessing.Barrier can only be shared through inheritance, so the pool gets a
# Manager-hosted Barrier proxy. Using the Manager as a context manager shuts its server
# process down after the run instead of leaking one per cell execution.
with multiprocessing.Manager() as mgr:
    # parties == max_workers: every worker must be running at once to trip the barrier;
    # with fewer pool processes than parties the run would deadlock.
    ppx_bar = mgr.Barrier(ppx_worker_count)
    # ppx_bar = None

    worker_args = [
        BenchParams(wrkr_id=f'w{i}', b4i_num_records=ppx_num_rows, db_info=_DB_INFO, start_barrier=ppx_bar)
        for i in range(ppx_worker_count)
    ]

    # make ppx and run
    # master time is worthless, we are generating mock data in init, dont measure it, needless complexity
    # NOTE(review): wrkr_entry is defined in the notebook's __main__, which relies on the
    # 'fork' start method (Linux default); under 'spawn' workers can't look it up.
    with ProcessPoolExecutor(max_workers=ppx_worker_count) as ppx:
        # map() yields results in submission order (w0, w1, ...)
        ppx_waited_results.extend(ppx.map(wrkr_entry, worker_args))

#
print("")
print('# ' + '-' * 97)
print('# ' + '-' * 97)

print('=' * 20 + " benchmark results: ")
total_inserted_records = sum(res.b4i_num_records for res in ppx_waited_results)
b4i_workers_ips_sum = int(sum(res.b4i_ips for res in ppx_waited_results))

# Aggregate throughput over the whole parallel window (earliest insert start -> latest
# insert end). Unlike the per-worker sum it does not overstate ips when workers finish
# at different times.
if ppx_waited_results:
    b4i_wall_time = (max(res.b4i_end_time for res in ppx_waited_results)
                     - min(res.b4i_start_time for res in ppx_waited_results))
else:
    b4i_wall_time = 0.0
b4i_wall_ips = int(total_inserted_records / b4i_wall_time) if b4i_wall_time > 0 else 0

print(f"num wrkrs: {ppx_worker_count}")
print(f"records per wrkr: {ppx_num_rows:,}")

with pymy_conn.cursor() as tmp_cur:
    tmp_cur.execute('SELECT count(*) FROM xb_bench1;')
    print(f"Actual records in xb_bench1 table: {tmp_cur.fetchone()[0]}")

    # drop + recreate the tables so the benchmark rows don't linger in the server
    tmp_cur.execute(refresh_schema_q)

#
print(f"\n*** Sum of syncd wrkr ips  : {b4i_workers_ips_sum:,}")
print(f"*** Wall-clock ips (all wrkrs): {b4i_wall_ips:,}")

print('\n' + '=' * 20 + " worker info: ")
for res in ppx_waited_results:
    print(res)

db refreshed. rdy 4 benchmark.
w3: init compelete. time: 1,604,573,845.49063
w9: init compelete. time: 1,604,573,845.60427
w0: init compelete. time: 1,604,573,845.61847
w1: init compelete. time: 1,604,573,845.78462
w5: init compelete. time: 1,604,573,845.80739
w15: init compelete. time: 1,604,573,845.83226
w10: init compelete. time: 1,604,573,845.85471
w13: init compelete. time: 1,604,573,845.97487
w7: init compelete. time: 1,604,573,845.98565
w8: init compelete. time: 1,604,573,846.01328
w2: init compelete. time: 1,604,573,846.01429
w14: init compelete. time: 1,604,573,846.03797
w6: init compelete. time: 1,604,573,846.04583
w4: init compelete. time: 1,604,573,846.05845
w11: init compelete. time: 1,604,573,846.06418
w12: init compelete. time: 1,604,573,846.13721
w12 awoken. worker is passed barrier. time: 1,604,573,846.13954
w7 awoken. worker is passed barrier. time: 1,604,573,846.13989
w0 awoken. worker is passed barrier. time: 1,604,573,846.13971
w14 awoken. worker is passed barrier.

---
# PPX Results Dump

```text

# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
# ---------------------------------------------- Ryzen 2700X, 32GB RAM, mariadb 10.5 on dkr, datadir on tmpfs


# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
==================== benchmark results: 
num wrkrs: 1
records per wrkr: 800,000
Actual records in xb_bench1 table: 800000

*** Sum of syncd wrkr ips  : 108,672

==================== worker info: 
wrkr_id: w0 -- b4i_start: 1,604,550,061.4414 -- deltaT: 7.3616 -- ips: 108672


# NOTES:
- CPU usage peaked 60% 



# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
==================== benchmark results: 
num wrkrs: 2
records per wrkr: 800,000
Actual records in xb_bench1 table: 1600000

*** Sum of syncd wrkr ips  : 205,187

==================== worker info: 
wrkr_id: w0 -- b4i_start: 1,604,572,419.0069 -- deltaT: 7.7702 -- ips: 102957
wrkr_id: w1 -- b4i_start: 1,604,572,419.0067 -- deltaT: 7.8255 -- ips: 102229


# NOTES:
- CPU usage peaked 125% 



# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
==================== benchmark results: 
num wrkrs: 3
records per wrkr: 800,000
Actual records in xb_bench1 table: 2400000

*** Sum of syncd wrkr ips  : 284,952

==================== worker info: 
wrkr_id: w0 -- b4i_start: 1,604,572,491.0739 -- deltaT: 8.3825 -- ips: 95436
wrkr_id: w1 -- b4i_start: 1,604,572,491.0736 -- deltaT: 8.4238 -- ips: 94968
wrkr_id: w2 -- b4i_start: 1,604,572,491.0738 -- deltaT: 8.4613 -- ips: 94547


# NOTES:
- CPU usage peaked 190% 



# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
==================== benchmark results: 
num wrkrs: 4
records per wrkr: 800,000
Actual records in xb_bench1 table: 3200000

*** Sum of syncd wrkr ips  : 319,463

==================== worker info: 
wrkr_id: w0 -- b4i_start: 1,604,572,543.2921 -- deltaT: 10.0073 -- ips: 79941
wrkr_id: w1 -- b4i_start: 1,604,572,543.2922 -- deltaT: 10.0070 -- ips: 79943
wrkr_id: w2 -- b4i_start: 1,604,572,543.2923 -- deltaT: 9.6003 -- ips: 83331
wrkr_id: w3 -- b4i_start: 1,604,572,543.2920 -- deltaT: 10.4922 -- ips: 76246


# NOTES:
- CPU usage peaked 280% 



# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
==================== benchmark results: 
num wrkrs: 6
records per wrkr: 800,000
Actual records in xb_bench1 table: 4800000

*** Sum of syncd wrkr ips  : 340,144

==================== worker info: 
wrkr_id: w0 -- b4i_start: 1,604,572,714.6970 -- deltaT: 13.9595 -- ips: 57308
wrkr_id: w1 -- b4i_start: 1,604,572,714.6981 -- deltaT: 14.1477 -- ips: 56546
wrkr_id: w2 -- b4i_start: 1,604,572,714.6978 -- deltaT: 13.6630 -- ips: 58552
wrkr_id: w3 -- b4i_start: 1,604,572,714.6983 -- deltaT: 14.2618 -- ips: 56093
wrkr_id: w4 -- b4i_start: 1,604,572,714.6985 -- deltaT: 14.4819 -- ips: 55241
wrkr_id: w5 -- b4i_start: 1,604,572,714.6979 -- deltaT: 14.1839 -- ips: 56401


# NOTES:
- CPU usage peaked 450%



# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
==================== benchmark results: 
num wrkrs: 8
records per wrkr: 800,000
Actual records in xb_bench1 table: 6400000

*** Sum of syncd wrkr ips  : 338,231

==================== worker info: 
wrkr_id: w0 -- b4i_start: 1,604,572,837.3461 -- deltaT: 19.0272 -- ips: 42044
wrkr_id: w1 -- b4i_start: 1,604,572,837.3462 -- deltaT: 18.6290 -- ips: 42943
wrkr_id: w2 -- b4i_start: 1,604,572,837.3450 -- deltaT: 19.1973 -- ips: 41672
wrkr_id: w3 -- b4i_start: 1,604,572,837.3458 -- deltaT: 18.7701 -- ips: 42621
wrkr_id: w4 -- b4i_start: 1,604,572,837.3459 -- deltaT: 18.8317 -- ips: 42481
wrkr_id: w5 -- b4i_start: 1,604,572,837.3465 -- deltaT: 18.9809 -- ips: 42147
wrkr_id: w6 -- b4i_start: 1,604,572,837.3481 -- deltaT: 19.0360 -- ips: 42025
wrkr_id: w7 -- b4i_start: 1,604,572,837.3464 -- deltaT: 18.9151 -- ips: 42294


# NOTES:
- CPU usage peaked 620% 

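# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
==================== benchmark results: 
num wrkrs: 16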
records per wrkr: 800,000
Actual records in xb_bench1 table: 12800000

*** Sum of syncd wrkr ips  : 243,142

==================== worker info: 
wrkr_id: w0 -- b4i_start: 1,604,573,846.1418 -- deltaT: 52.7176 -- ips: 15175
wrkr_id: w1 -- b4i_start: 1,604,573,846.1434 -- deltaT: 52.9106 -- ips: 15119
wrkr_id: w2 -- b4i_start: 1,604,573,846.1621 -- deltaT: 53.0726 -- ips: 15073
wrkr_id: w3 -- b4i_start: 1,604,573,846.1420 -- deltaT: 52.9379 -- ips: 15112
..... truncated .....

# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------
Conclusions:
- On the Ryzen 2700X box, pg13 reaches a much higher peak IPS than mariadb 10.5.
- It is also more CPU-efficient: it reached higher IPS at the same CPU usage.

The best I got from mariadb was ~340k ips with 6 workers (4 workers: ~319k, 8 workers: ~338k;
adding more workers only lowered throughput).
pg13 got up to 440k ips with 8 workers (at which point I was very likely already out of spare
CPU and deep into contention: the box has 8 cores / 16 hyperthreads, 8 of them were busy with
the python workers, and the OS was spending a lot moving all this data through TCP...).

Also, the comparison is not entirely fair: pg (128MB shared buffers) may be writing to tmpfs
far more often, while InnoDB had its own 8GB buffer pool that was never exceeded. tmpfs is
memory and should be very fast, but mdb may still have an advantage here.

- The IPS fall-off at 16 workers was bad for both, and somewhat worse for mdb (~-29% vs ~-22%):
mdb  down to 243k ips (from 340k peak)
pg13 down to 342k ips (from 440k peak)



```


## ThreadPoolExecutor