In [6]:
from multiprocessing import Process, Queue
from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool
from configparser import ConfigParser
from tqdm.auto import tqdm, trange
import uuid
import time

In [7]:
def config(filename='database.ini', mode="test"):
    # create a parser
    parser = ConfigParser()
    # read config file
    parser.read(filename)
    section='postgresql'
    if mode == "production":
        section = "cah_production"
    # get section, default to postgresql
    db = {}
    if parser.has_section(section):
        params = parser.items(section)
        for param in params:
            db[param[0]] = param[1]
    else:
        raise Exception('Section {0} not found in the {1} file'.format(section, filename))
    return db

In [8]:
def dump_3m(j, workers, engine, jobtype, cycles, queue, path, dataset):
    engine.dispose()
    with engine.connect() as eng:
        conn = engine.raw_connection()
        for i in range(cycles):
            file = uuid.uuid4()
            # clipped out
            if jobtype == "clipped":
                if dataset ==  "en":
                    select_stmt1 = f"""BEGIN;
                                    SET work_mem = '1GB';
                                    -- query --
                                    COPY (
                                        DELETE FROM dataset_en WHERE sampleid in (
                                            select sampleid from dataset_en where status = 2 order by sampleid limit 5000000 FOR UPDATE SKIP LOCKED
                                            ) RETURNING *
                                        ) TO '{path}/clipped/ok-en-{file}.csv' DELIMITER '|' CSV HEADER;
                                    SET work_mem = default;
                                    COMMIT;"""
                else:
                    select_stmt1 = f"""BEGIN;
                                    SET work_mem = '1GB';
                                    -- query --
                                    COPY (
                                        DELETE FROM dataset_{dataset} WHERE sampleid in (
                                            select sampleid from dataset_{dataset} where status = 2 order by sampleid limit 5000000 FOR UPDATE SKIP LOCKED
                                            ) RETURNING *
                                        ) TO '{path}/clipped/ok-{dataset}-{file}.csv' DELIMITER '|' CSV HEADER;
                                    SET work_mem = default;
                                    COMMIT;"""
            # rejected out
            elif jobtype == "rejected":
                if dataset ==  "en":
                    select_stmt1 = f"""BEGIN;
                                    SET work_mem = '1GB';
                                    -- query --
                                    COPY (
                                        DELETE FROM dataset_en WHERE sampleid in (
                                            select sampleid from dataset_en where status > 8 order by sampleid limit 5000000 FOR UPDATE SKIP LOCKED
                                            ) RETURNING *
                                        ) TO '{path}/rejected/bad-en-{file}.csv' DELIMITER '|' CSV HEADER;
                                    SET work_mem = default;
                                    COMMIT;"""
                else:
                    select_stmt1 = f"""BEGIN;
                                    SET work_mem = '1GB';
                                    -- query --
                                    COPY (
                                        DELETE FROM dataset_{dataset} WHERE sampleid in (
                                            select sampleid from dataset_{dataset} where status > 8 order by sampleid limit 5000000 FOR UPDATE SKIP LOCKED
                                            ) RETURNING *
                                        ) TO '{path}/rejected/bad-{dataset}-{file}.csv' DELIMITER '|' CSV HEADER;
                                    SET work_mem = default;
                                    COMMIT;"""

            else:
                continue
            try:
                cur = conn.cursor()
                cur.execute(select_stmt1)
                conn.commit()
            except Exception as e:
                print(f"error: {e}")
            queue.put(1)
    return

In [32]:
mode = "production"
dataset = "nolang"
params = config(mode=mode)
engine = create_engine(f'postgresql://{params["user"]}:{params["password"]}@{params["host"]}:5432/{params["database"]}', pool_pre_ping=True, poolclass=NullPool)
threads1 = 2
threads2 = 3
cycles = 5

path = "/home/cah"
if mode == "production":
    path = "/mnt/md1/export"

workers = []
for _ in range(threads1):
    workers.append("clipped")
for _ in range(threads2):
    workers.append("rejected")

iterations = len(workers) * cycles
processes = []
pbars = []
pbar = tqdm(total=iterations)
q = Queue()

for i, worker in enumerate(workers):
    print(f"[{i}] {worker}")
    time.sleep(10)
    j = 0
    num = 0
    if worker == "clipped":
        j = i
        num = threads1
    elif worker == "rejected":
        j = i - threads1
        num = threads2
    else:
        pass

    p = Process(target=dump_3m, args = [j, num, engine, worker, cycles, q, path, dataset], daemon=False)
    try:
        p.start()
        processes.append(p)
    except:
        pass

progress = 0
while progress < iterations:
    if not q.empty():
        q.get()
        pbar.update(1)
        progress += 1
    time.sleep(0.2)

for proc in processes:
    proc.join()

print (f"Job ended")


  0%|          | 0/25 [00:00<?, ?it/s]

[0] clipped
[1] clipped
[2] rejected
[3] rejected
[4] rejected
Job ended
