In [None]:
import json
import glob


from dotenv import dotenv_values
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
import threading
from tqdm import tqdm
import pickle
import copyreg
import types
import functools
import multiprocessing

from concurrent.futures import ProcessPoolExecutor

import os
import shutil

from papermage.recipes import CoreRecipe
from papermage.magelib import Document

import sqlalchemy
from sqlalchemy import create_engine

from  cord19_plus.data_model.database_setup import *

import logging

# Silence every logger for the rest of the notebook: disabling at one level
# above CRITICAL (logging.FATAL is an alias of CRITICAL) suppresses all records.
logging.disable(logging.CRITICAL + 1)

In [None]:
from cord19_plus.data_model.database_setup import setup_engine_session
from cord19_plus.data_model import model  

# Read the database connection settings from the project's .env file.
db_vals = dotenv_values("/workspaces/CORD19_Plus/.env")

# Open a session; the settings are passed positionally in the order
# USER, PASSWORD, ADDRESS, PORT, DB.
session = setup_engine_session(
    db_vals['USER'],
    db_vals['PASSWORD'],
    db_vals['ADDRESS'],
    db_vals['PORT'],
    db_vals['DB'],
)

# Query all `model.Table` records through the session.
results = session.query(model.Table).all()

In [None]:
# Upper bound on how many of the matched JSON files (in sorted order) are
# kept for processing in this notebook.
MAX_JSON_FILES = 500

# Glob pattern for the input JSON files and the root directory handed to the
# extraction step as `table_root_path`.
root_parse_path = "/workspaces/CORD19_Plus/data/clean/pub_json2/*.json"
table_root_path = "/workspaces/CORD19_Plus/data/clean/tab_json2"

# Sort for a deterministic order before truncating to the first MAX_JSON_FILES.
json_paths = sorted(glob.glob(root_parse_path))[:MAX_JSON_FILES]


In [None]:
# NOTE(review): `sort_paths_by_file_size` is not defined in the visible cells;
# presumably it comes from the wildcard import of
# `cord19_plus.data_model.database_setup` -- confirm.
# NOTE(review): `paths` is not referenced by the later visible cells, which
# pass `json_paths` instead -- confirm which list is meant to be processed.
paths = sort_paths_by_file_size(json_paths)

In [None]:
# Fix for pickling instance methods
def _pickle_method(method):
    func_name = method.__func__.__name__
    obj = method.__self__
    cls = method.__self__.__class__
    return getattr, (obj, func_name)

copyreg.pickle(types.MethodType, _pickle_method)

# Moved extract_data function outside the parallel_extract_data function
def extract_data(json_path, table_root_path):
    """Run the JSON extraction for one file, rejecting directory paths.

    Args:
        json_path: Path of the JSON file to extract from.
        table_root_path: Passed through unchanged to ``extract_data_from_json``.

    Returns:
        Whatever ``extract_data_from_json(json_path, table_root_path)`` returns.

    Raises:
        IsADirectoryError: If ``json_path`` points at a directory.
    """
    if not os.path.isdir(json_path):
        return extract_data_from_json(json_path, table_root_path)
    raise IsADirectoryError(f"Expected a file but found a directory: {json_path}")

if __name__ == "__main__":
    def parallel_extract_data(json_paths, table_root_path, model, session, num_workers=None, batch_size=100):
        """Extract data from JSON files in parallel and store it batch by batch.

        For each batch of ``batch_size`` paths, ``extract_data`` is run over
        the batch in a process pool, each result is passed to
        ``create_model_objects(data, model, session)``, and the session is
        committed.

        Args:
            json_paths: Sequence of JSON file paths to process.
            table_root_path: Forwarded to ``extract_data`` for every path.
            model: Forwarded to ``create_model_objects``.
            session: Forwarded to ``create_model_objects``; ``commit()`` is
                called on it after each batch.
            num_workers: ``max_workers`` for each ``ProcessPoolExecutor``
                (``None`` lets the executor choose).
            batch_size: Number of paths processed between commits.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.

        NOTE(review): with the "spawn" start method the worker processes must
        be able to import ``extract_data``; functions defined in a notebook
        cell are generally not importable by spawned workers -- confirm this
        runs in your environment or move ``extract_data`` into a module.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        # Set multiprocessing context to 'spawn' to avoid issues on some platforms
        ctx = multiprocessing.get_context("spawn")

        # Bind table_root_path by keyword: a positional partial would occupy
        # extract_data's first parameter (json_path) and swap the arguments.
        extract_data_partial = functools.partial(extract_data, table_root_path=table_root_path)

        with tqdm(total=len(json_paths)) as pbar:
            for batch_start in range(0, len(json_paths), batch_size):
                batch_paths = json_paths[batch_start:batch_start + batch_size]

                # A fresh pool is created for every batch, as in the original code.
                with ProcessPoolExecutor(max_workers=num_workers, mp_context=ctx) as executor:
                    data_batch = list(tqdm(executor.map(extract_data_partial, batch_paths), total=len(batch_paths), desc="Extracting data", leave=False))

                for data in tqdm(data_batch, desc="Preparing session", leave=False):
                    create_model_objects(data, model, session)

                session.commit()
                # Advance the overall progress bar by the paths just committed.
                pbar.update(len(batch_paths))

In [None]:
# Argument order follows the signature
# parallel_extract_data(json_paths, table_root_path, model, session, ...):
# the list of JSON paths comes first, then the table root directory.
parallel_extract_data(json_paths, table_root_path, model, session, num_workers=4)