In [1]:
from multiprocessing import Process, Queue
from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool
from configparser import ConfigParser
from tqdm.auto import tqdm, trange
import uuid
import time

In [2]:
def config(filename='database.ini', mode="test"):
    # create a parser
    parser = ConfigParser()
    # read config file
    parser.read(filename)
    section='postgresql'
    if mode == "production":
        section = "cah_production"
    # get section, default to postgresql
    db = {}
    if parser.has_section(section):
        params = parser.items(section)
        for param in params:
            db[param[0]] = param[1]
    else:
        raise Exception('Section {0} not found in the {1} file'.format(section, filename))
    return db

In [6]:
def dump_3m(j, workers, engine, jobtype, cycles, queue, path):
    engine.dispose()
    with engine.connect() as eng:
        conn = engine.raw_connection()
        for i in range(cycles):
            file = uuid.uuid4()
            # clipped out
            if jobtype == "clipped":
                select_stmt1 = f"""BEGIN;
                                SET work_mem = '1GB';
                                -- query --
                                COPY (
                                    DELETE FROM dataset_en WHERE sampleid in (
                                        select sampleid from dataset_en where status = 2 order by sampleid limit 5000000 FOR UPDATE SKIP LOCKED
                                        ) RETURNING *
                                    ) TO '{path}/clipped/ok-en-{file}.csv' DELIMITER '|' CSV HEADER;
                                COPY (
                                    DELETE FROM dataset_intl WHERE sampleid in (
                                        select sampleid from dataset_en where status = 2 order by sampleid limit 5000000 FOR UPDATE SKIP LOCKED
                                        ) RETURNING *
                                    ) TO '{path}/clipped/ok-intl-{file}.csv' DELIMITER '|' CSV HEADER;
                                SET work_mem = default;
                                COMMIT;"""
            # rejected out
            elif jobtype == "rejected":
                select_stmt1 = f"""BEGIN;
                                SET work_mem = '1GB';
                                -- query --
                                COPY (
                                    DELETE FROM dataset_en WHERE sampleid in (
                                        select sampleid from dataset_en where status > 8 order by sampleid limit 5000000 FOR UPDATE SKIP LOCKED
                                        ) RETURNING *
                                    ) TO '{path}/rejected/bad-en-{file}.csv' DELIMITER '|' CSV HEADER;
                                COPY (
                                    DELETE FROM dataset_intl WHERE sampleid in (
                                        select sampleid from dataset_en where status > 8 order by sampleid limit 5000000 FOR UPDATE SKIP LOCKED
                                        ) RETURNING *
                                    ) TO '{path}/rejected/bad-intl-{file}.csv' DELIMITER '|' CSV HEADER;
                                SET work_mem = default;
                                COMMIT;"""
            # todo nolang out
            elif jobtype == "todo_nolang":
                select_stmt1 = f"""BEGIN;
                                SET work_mem = '1GB';
                                -- query --
                                COPY (
                                    DELETE FROM dataset_nolang WHERE sampleid in (
                                        select sampleid from dataset where status = 0 and language = '' order by sampleid limit 10000000 FOR UPDATE SKIP LOCKED
                                        ) RETURNING *
                                    ) TO '{path}/todo/nolang/nolang-{file}.csv' DELIMITER '|' CSV HEADER;
                                SET work_mem = default;
                                COMMIT;"""
            # todo intl out
            elif jobtype == "todo_intl":
                select_stmt1 = f"""BEGIN;
                                SET work_mem = '1GB';
                                -- query --
                                COPY (
                                    DELETE FROM dataset_intl WHERE sampleid in (
                                        select sampleid from dataset where status = 0 and language not in ('','en') order by sampleid limit 10000000 FOR UPDATE SKIP LOCKED
                                        ) RETURNING *
                                    ) TO '{path}/todo/intl/intl-{file}.csv' DELIMITER '|' CSV HEADER;
                                SET work_mem = default;
                                COMMIT;"""
            # todo english out
            elif jobtype == "todo_en":
                select_stmt1 = f"""BEGIN;
                                SET work_mem = '1GB';
                                -- query --
                                COPY (
                                    DELETE FROM dataset_en WHERE sampleid in (
                                        select sampleid from dataset where status = 0 and language = 'en' order by sampleid limit 10000000 FOR UPDATE SKIP LOCKED
                                        ) RETURNING *
                                    ) TO '{path}/todo/english/eng-{file}.csv' DELIMITER '|' CSV HEADER;
                                SET work_mem = default;
                                COMMIT;"""
            else:
                continue
            try:
                cur = conn.cursor()
                cur.execute(select_stmt1)
                conn.commit()
            except Exception as e:
                print(f"error: {e}")
            queue.put(1)
    return

In [5]:
mode = "production"
params = config(mode=mode)
engine = create_engine(f'postgresql://{params["user"]}:{params["password"]}@{params["host"]}:5432/{params["database"]}', pool_pre_ping=True, poolclass=NullPool)
threads1 = 3
threads2 = 5
threads3 = 0
threads4 = 0
cycles = 4

path = "/home/cah"
if mode == "production":
    path = "/mnt/md1/export"

workers = []
for _ in range(threads1):
    workers.append("clipped")
for _ in range(threads2):
    workers.append("rejected")
for _ in range(threads3):
    workers.append("todo_nolang")
for _ in range(threads4):
    workers.append("todo_intl")
#for j in range(todoen_proc):
#    workers.append("todo_en")

iterations = len(workers) * cycles
processes = []
pbars = []
pbar = tqdm(total=iterations)
q = Queue()

for i, worker in enumerate(workers):
    print(f"[{i}] {worker}")
    time.sleep(10)
    j = 0
    num = 0
    if worker == "clipped":
        j = i
        num = threads1
    elif worker == "rejected":
        j = i - threads1
        num = threads2
    elif worker == "todo_nolang":
        j = i - threads1 - threads2
        num = threads3
    elif worker == "todo_intl":
        j = i - threads1 - threads2 - threads3
        num = threads4
    else:
        pass

    p = Process(target=dump_3m, args = [j, num, engine, worker, cycles, q, path], daemon=False)
    try:
        p.start()
        processes.append(p)
    except:
        pass

progress = 0
while progress < iterations:
    if not q.empty():
        q.get()
        pbar.update(1)
        progress += 1
    time.sleep(0.2)

for proc in processes:
    proc.join()

print (f"Job ended")


  0%|          | 0/32 [00:00<?, ?it/s]

[0] clipped
[1] clipped
[2] clipped
[3] rejected
[4] rejected
[5] rejected
[6] rejected
[7] rejected
error: relation "dataset" does not exist
LINE 10:                                     DELETE FROM dataset WHER...
                                                         ^

error: current transaction is aborted, commands ignored until end of transaction block

error: current transaction is aborted, commands ignored until end of transaction block

error: current transaction is aborted, commands ignored until end of transaction block

error: relation "dataset" does not exist
LINE 10:                                     DELETE FROM dataset WHER...
                                                         ^

error: current transaction is aborted, commands ignored until end of transaction block

error: current transaction is aborted, commands ignored until end of transaction block

error: relation "dataset" does not exist
LINE 10:                                     DELETE FROM dataset WHER.