In [1]:
import json
import os
import csv
from time import time
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor
import datamule as dm

def _write_table_csv(path, rows):
    """Write a list of row dicts to *path* as a CSV file with a header row.

    The header is the union of all keys across *rows*, in first-seen order.
    Rows are not guaranteed to share the same keys, and ``csv.DictWriter``
    raises ``ValueError`` for a key missing from ``fieldnames``. Cells for keys
    a row lacks are written as empty strings (DictWriter's default ``restval``).
    """
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))
    with open(path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def process_company_concept_file(file_path, output_dir):
    """Convert one company-concepts JSON file into a per-CIK folder of CSVs.

    Writes ``<output_dir>/<cik>/metadata.csv`` with one row per item returned
    by ``dm.parse_company_concepts``. For every item whose ``table`` is
    non-empty, it also writes ``NNNN.csv``, where NNNN is the item's 1-based
    index, zero-padded to four digits. The metadata row's ``file`` column names
    that table file, or is empty when the item has no table.

    Args:
        file_path: Path to the JSON file. The CIK is derived from its base name
            by removing a leading ``CIK``, a trailing ``.json`` and leading
            zeros.
        output_dir: Root directory under which the per-CIK folder is created.

    Returns:
        True on success. False if any exception occurred; the error is printed
        rather than raised, so one bad file is only reported.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            company_concepts = json.load(f)

        parsed_data = dm.parse_company_concepts(company_concepts)

        # Strip only the leading "CIK" and trailing ".json" (names presumably
        # look like "CIK0000320193.json"), then drop the zero padding. An
        # all-zero CIK falls back to "0" so its files don't land directly in
        # output_dir.
        stem = os.path.basename(file_path)
        if stem.startswith('CIK'):
            stem = stem[len('CIK'):]
        if stem.endswith('.json'):
            stem = stem[:-len('.json')]
        cik = stem.lstrip('0') or '0'

        cik_dir = os.path.join(output_dir, cik)
        os.makedirs(cik_dir, exist_ok=True)

        metadata_fields = ['cik', 'category', 'fact', 'label', 'description', 'unit', 'file']
        with open(os.path.join(cik_dir, 'metadata.csv'), 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=metadata_fields)
            writer.writeheader()
            # Single pass over parsed_data: the metadata row and its table file
            # are written together. This keeps the index in sync and still
            # works if parse_company_concepts returns a one-shot iterator.
            for index, item in enumerate(parsed_data, start=1):
                table = item['table']
                # Any key outside metadata_fields makes DictWriter raise
                # ValueError, which is reported by the handler below.
                row = {k: v for k, v in item.items() if k != 'table'}
                if table:
                    row['file'] = f"{index:04d}.csv"
                    _write_table_csv(os.path.join(cik_dir, row['file']), table)
                else:
                    row['file'] = ''
                writer.writerow(row)

        return True
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return False

def main(company_concepts_dir='company_concepts', output_dir='company_concept_csv', *,
         max_workers=None, parallel=True):
    """Convert every ``*.json`` file in *company_concepts_dir* into CSVs.

    Each file is handed to ``process_company_concept_file``. Results go under
    *output_dir*, which is created if missing. Files are processed in sorted
    name order so runs are reproducible. Progress is shown with tqdm, and a
    success count and the elapsed time are printed at the end.

    Args:
        company_concepts_dir: Directory containing the input JSON files.
        output_dir: Root directory for the per-CIK CSV folders.
        max_workers: Passed to ``ProcessPoolExecutor``. None uses its default.
        parallel: If False, process files sequentially in this process.

    NOTE(review): ``ProcessPoolExecutor`` must pickle the worker callable and
    have child processes import it. The "Can't pickle local object
    '_patch_asyncio.<locals>.run'" failure seen when running this from a
    notebook is presumably caused by that, under the "spawn" start method
    (the default on Windows and macOS). Either move
    ``process_company_concept_file`` into an importable .py module, or call
    ``main(parallel=False)``.
    """
    from functools import partial

    start_time = time()
    os.makedirs(output_dir, exist_ok=True)

    json_files = sorted(f for f in os.listdir(company_concepts_dir) if f.endswith('.json'))
    total_files = len(json_files)
    print(f"Total files to process: {total_files}")

    file_paths = [os.path.join(company_concepts_dir, filename) for filename in json_files]
    # Bind output_dir once instead of building a list of identical values.
    worker = partial(process_company_concept_file, output_dir=output_dir)

    if parallel:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            results = list(tqdm(
                executor.map(worker, file_paths),
                total=total_files,
                desc="Processing files"
            ))
    else:
        results = [worker(path) for path in tqdm(file_paths, total=total_files, desc="Processing files")]

    successful = sum(results)
    print(f"Successfully processed {successful} out of {total_files} files")
    print(f"Processing completed in {time() - start_time:.2f} seconds")

# Run the batch conversion only when executed as a script. Modules imported
# by worker processes started with the "spawn" method must not re-run it.
if __name__ == '__main__':
    main()

Total files to process: 7975


Processing files:   0%|          | 0/7975 [00:00<?, ?it/s]


AttributeError: Can't pickle local object '_patch_asyncio.<locals>.run'