In [1]:
# export
from collections.abc import MutableMapping
import json
import pprint
from typing import Callable, Tuple, Dict
import sqlite3

In [3]:
TEST_1 = "key_test_1"
TEST_2 = "key_test_2"

In [2]:
# export
class SQLCounter:
    def __init__(
        self, dbname, check_same_thread=False, fast=True, **kwargs,
    ):
        self.dbname = dbname
        self.conn = sqlite3.connect(
            self.dbname, check_same_thread=check_same_thread, **kwargs
        )

        with self.conn as c:

            c.execute(
                "CREATE TABLE IF NOT EXISTS Counter (key text NOT NULL PRIMARY KEY, value integer)"
            )

            if fast:
                c.execute("PRAGMA journal_mode = 'WAL';")
                c.execute("PRAGMA temp_store = 2;")
                c.execute("PRAGMA synchronous = 1;")
                c.execute(f"PRAGMA cache_size = {-1 * 64_000};")

    def incr(self, key):
        with self.conn as c:
            c.execute(
                "INSERT INTO Counter VALUES (?, 1) ON CONFLICT(key) DO UPDATE SET value = value + 1",
                (key,),
            )
        return

    def decr(self, key):
        with self.conn as c:
            c.execute(
                "INSERT INTO Counter VALUES (?, -1) ON CONFLICT(key) DO UPDATE SET value = value - 1",
                (key,),
            )
        return

    def count(self, key):
        c = self.conn.execute("SELECT value FROM Counter WHERE Key=?", (key,))
        row = c.fetchone()
        if row is None:
            return 0
        return row[0]

    def zero(self, key):
        with self.conn as c:
            c.execute(
                "INSERT INTO Counter VALUES (?, 0) ON CONFLICT(key) DO UPDATE SET value = 0",
                (key,),
            )
        return

    def delete(self, key):
        # TODO
        raise NotImplementedError
        # with self.conn as c:
        #     c.execute(
        #         "INSERT INTO Counter VALUES (?, 1) ON CONFLICT(key) DO UPDATE SET value = value + 1",
        #         (key,),
        #     )
        # return

    def vacuum(self):
        self.conn.execute("VACUUM;")

    def close(self):
        self.conn.close()

In [4]:
counter = SQLCounter(":memory:")

for _ in range(20):
    counter.incr(TEST_1)
    
assert counter.count(TEST_1) == 20

for _ in range(10):
    counter.decr(TEST_1)
    
assert counter.count(TEST_1) == 10


for _ in range(10):
    counter.decr(TEST_2)
    
assert counter.count(TEST_2) == -10

for _ in range(10):
    counter.decr(TEST_2)
    
assert counter.count(TEST_2) == -20

counter.zero(TEST_1)

assert counter.count(TEST_1) == 0

for _ in range(100):
    counter.incr(TEST_2)
    
assert counter.count(TEST_2) == 80