In [4]:
# from utils import _default_inputs, _display_input_help
import os, itertools, shutil, subprocess, ast, yaml, logging,glob

In [2]:
def process_restart(recreate_folder_level=None, save_path=None, YEARS=None, datasets=None, graph_components=None, embedding_modes=None, models=None, controls=None):
    """Delete every output folder at the requested depth of the results tree.

    The folder hierarchy is years / datasets / graph_components /
    embedding_modes / models / controls. The first element of
    ``recreate_folder_level`` names the depth; every combination of the
    option lists down to that depth is joined onto ``save_path``, removed
    if it exists, and printed.

    Args:
        recreate_folder_level: Sequence whose first item is the level name,
            or None to do nothing.
        save_path: Root directory that holds the results tree.
        YEARS, datasets, graph_components, embedding_modes, models, controls:
            Option lists for each level; only the ones down to the
            requested level are used.

    Raises:
        ValueError: If the level name is not one of the six known levels.
    """
    if recreate_folder_level is None:
        return

    recreate_folder_level = recreate_folder_level[0]
    print(f"Restart values found as: {recreate_folder_level}. Recreating the paths for the restart values")

    # Ordered from the outermost folder to the innermost one.
    hierarchy = (
        ('years', YEARS),
        ('datasets', datasets),
        ('graph_components', graph_components),
        ('embedding_modes', embedding_modes),
        ('models', models),
        ('controls', controls),
    )
    level_names = [name for name, _ in hierarchy]
    if recreate_folder_level not in level_names:
        raise ValueError(f"Restart level: {recreate_folder_level} not recognized. Please use 'years', 'datasets', 'graph_components', 'embedding_modes', 'models', 'controls'")

    # Only the option lists down to (and including) the requested level take part.
    depth = level_names.index(recreate_folder_level) + 1
    axes = [values for _, values in hierarchy[:depth]]

    for comb in itertools.product(*axes):
        path = os.path.join(save_path, *comb)
        if os.path.exists(path):
            shutil.rmtree(path)
        print(path)
           
def check_list_inputs(level, actual_input, allowed_input):
    """Validate that every item of a list-valued setting is an allowed value.

    Args:
        level: Name of the setting being checked (e.g. 'datasets').
        actual_input: Iterable of values supplied for the setting, or None.
        allowed_input: Collection of permitted values.

    Returns:
        None. The function only validates.

    Raises:
        ValueError: If ``actual_input`` is None for a mandatory setting, or
            if any item is not in ``allowed_input``.
    """
    # Settings that may legitimately be left unset. 'recreate_folder_level'
    # is the name callers actually pass; 'restart_level' is kept so existing
    # callers using the older name keep working.
    optional_levels = ('controls', 'restart_level', 'recreate_folder_level')

    if actual_input is None:
        if level not in optional_levels:
            raise ValueError(f"Input is None. Please use ATLEAST ONE of the following: {allowed_input}")
        logging.info(f"{level} is None")
        return None

    for item in actual_input:
        if item not in allowed_input:
            raise ValueError(f"Input {item} not recognized. Please use ATLEAST ONE of the following: {allowed_input}")
    return None

In [3]:
def process_inputs(user_inputs):
    """Overlay user settings on the defaults, validate them and build the year labels.

    NOTE(review): ``_default_inputs`` and ``_display_input_help`` are not
    defined in this notebook; the ``from utils import ...`` line that would
    provide them is commented out in the import cell. Confirm they are in
    scope before running.

    Args:
        user_inputs: Mapping of setting name to value (as loaded from the
            YAML file). ``None`` (an empty YAML file) is treated as no
            overrides.

    Returns:
        tuple: ``(process_input, YEARS)`` where ``process_input`` is the
        merged settings mapping and ``YEARS`` is a list of strings: one per
        year in ``[start_year, end_year]``, followed by ``gat_A_B`` and
        ``auth_A_B`` window labels when the corresponding consecutive-year
        setting is greater than 1.

    Raises:
        ValueError: If a list-valued setting holds a value that is not
            allowed, or a consecutive-year window is longer than the year
            range.
    """
    process_input = _default_inputs()
    for key, value in (user_inputs or {}).items():
        # Assigning into a dict never raises KeyError, so membership has to
        # be tested explicitly for unknown keys to be reported and skipped.
        # presumably _default_inputs() lists every supported key — TODO confirm
        if key in process_input:
            process_input[key] = value
        else:
            logging.error(f"Key {key} is not a valid input. Ignoring it and continuing...")
            _display_input_help()

    consec_yrs_gat = process_input['consec_yrs_gat']
    consec_yrs_auth = process_input['consec_yrs_auth']
    start_year = process_input['start_year']
    end_year = process_input['end_year']
    save_path = process_input['save_path']

    check_list_inputs('datasets',process_input['datasets'], ['openalex','arxiv'])
    check_list_inputs('graph_components',process_input['graph_components'], ['full','lcc'])
    check_list_inputs('embedding_modes',process_input['embedding_modes'], ['tfidf','bert'])
    check_list_inputs('models',process_input['models'], ['zeroshot','gat','gat_graph_embed'])
    check_list_inputs('controls',process_input['controls'], ['shuffle_y','shuffle_x', None])
    check_list_inputs('recreate_folder_level',process_input['recreate_folder_level'], ['years', 'datasets', 'graph_components', 'embedding_modes', 'models', 'controls', None])

    # A window of N consecutive years cannot exceed the number of years available.
    total_years = end_year - start_year + 1
    if (consec_yrs_gat is not None and consec_yrs_gat > total_years) or (consec_yrs_auth is not None and consec_yrs_auth > total_years):
        raise ValueError("Number of consecutive years for GAT/ common authors cannot be more than the total number of years.")

    YEARS = [str(year) for year in range(start_year, end_year+1)]

    # Append the sliding-window labels (gat first, then auth) after the single years.
    for prefix, description, span in (('gat', 'GAT', consec_yrs_gat), ('auth', 'common authors', consec_yrs_auth)):
        if span is not None and span > 1:
            logging.info(f"Consecutive years for {description} is set to {span}. This will create all possible {prefix}_X_X+{span-1} years")
            YEARS = YEARS + [f'{prefix}_{year}_{year+span-1}' for year in range(start_year, end_year + 2 - span)]

    # exist_ok=True avoids the check-then-create race of exists() + makedirs().
    os.makedirs(save_path, exist_ok=True, mode=0o755)

    return process_input, YEARS


In [4]:
def shell_script_cpu(save_path, commands,job_name):
    """Return the SLURM header for a CPU array job on the ``htc`` cluster.

    Args:
        save_path: Root output directory; job logs go to
            ``{save_path}/slurm_outs/{job_name}/``.
        commands: Sized collection of commands; only its length is used, to
            size the job array (``--array=1-N``).
        job_name: Value for ``--job-name`` and the log sub-folder name.

    Returns:
        str: The ``#!/bin/bash`` line, the ``#SBATCH`` directives and the
        commented-out environment setup lines.
    """
    array_size = len(commands)
    return f"""#!/bin/bash
#SBATCH --job-name={job_name}
#SBATCH --nodes=1
#SBATCH --cluster=htc
#SBATCH --array=1-{array_size}
#SBATCH -t 0-06:00:00
#SBATCH --output={save_path}/slurm_outs/{job_name}/%x_%A_%a.out
#SBATCH --ntasks=16
#SBATCH --cpus-per-task=4
#SBATCH --mem-per-cpu=10G
##SBATCH --mail-user=swk25@pitt.edu
##SBATCH --mail-type=END,FAIL

# module purge
# module load gcc/8.2.0
# module load python/anaconda3.10-2022.10
# source activate /ix/djishnu/Swapnil/.conda/envs/coauth_env/

"""

def shell_script_gpu(save_path, commands,job_name):
    """Return the SLURM header for a single-GPU array job on the ``gpu`` cluster.

    Args:
        save_path: Root output directory; job logs go to
            ``{save_path}/slurm_outs/{job_name}/``.
        commands: Sized collection of commands; only its length is used, to
            size the job array (``--array=1-N``).
        job_name: Value for ``--job-name`` and the log sub-folder name. Job
            names containing ``'3_'`` request 1 CPU per task, all others 4.

    Returns:
        str: The ``#!/bin/bash`` line, the ``#SBATCH`` directives and the
        commented-out environment setup lines.
    """
    array_size = len(commands)
    cpus_per_task = 1 if '3_' in job_name else 4
    return f"""#!/bin/bash
#SBATCH --job-name={job_name}
#SBATCH --nodes=1
#SBATCH --gres=gpu:1
#SBATCH --cluster=gpu
#SBATCH --ntasks=1
#SBATCH --cpus-per-task={cpus_per_task}
#SBATCH --mem-per-cpu=40G ### Will allocate 40*4 to each gpu ## To preven cuda oom
#SBATCH --partition=l40s,a100,gtx1080,a100_nvlink
#SBATCH --array=1-{array_size}
#SBATCH -t 0-06:00:00
#SBATCH --output={save_path}/slurm_outs/{job_name}/%x_%A_%a.out
##SBATCH --mail-user=swk25@pitt.edu
##SBATCH --mail-type=END,FAIL

# module purge
# module load gcc/8.2.0
# module load python/anaconda3.10-2022.10
# source activate /ix/djishnu/Swapnil/.conda/envs/coauth_env/

"""

In [5]:
# Load the user configuration; 'input.yaml' is resolved against the current
# working directory, so the notebook must be run from the folder holding it.
with open('input.yaml', 'r') as file:
    user_inputs = yaml.safe_load(file)

# Validate the settings and expose them as module-level names, which the
# step*/run_job functions below read as globals.
FINAL_INPUTS, YEARS = process_inputs(user_inputs)
read_path = FINAL_INPUTS['read_path']
save_path =FINAL_INPUTS['save_path']
years = YEARS
datasets = FINAL_INPUTS['datasets']
graph_components = FINAL_INPUTS['graph_components']
embedding_modes = FINAL_INPUTS['embedding_modes']
models = FINAL_INPUTS['models']
controls = FINAL_INPUTS['controls']

all_combinations = list(itertools.product(years, datasets, graph_components, embedding_modes, models, controls))

# The restart setting may be missing (None), empty, or [None]; indexing [0]
# unguarded raised TypeError/IndexError in the first two cases.
recreate_folder_level = FINAL_INPUTS['recreate_folder_level']
if recreate_folder_level and recreate_folder_level[0] is not None:
    # Wipe the folders at the requested level, then rebuild the full tree.
    process_restart(recreate_folder_level, save_path,years, datasets, graph_components, embedding_modes, models, controls)
    for comb in all_combinations:
        # NOTE(review): os.path.join raises TypeError if comb contains None
        # (e.g. controls == [None]) — confirm how a "no control" folder is named.
        path = os.path.join(save_path, *comb)
        if not os.path.exists(path):
            os.makedirs(path, exist_ok=False, mode=0o755)

def develop_chunks(lst, n):
    """Yield consecutive slices of ``lst`` containing at most ``n`` items each.

    Args:
        lst: Sliceable sequence to split.
        n: Maximum chunk length; must be non-zero (``range`` rejects a zero step).

    Yields:
        Slices ``lst[i:i + n]`` for ``i = 0, n, 2n, ...``; the final slice
        may be shorter. Nothing is yielded for an empty ``lst``.
    """
    # No trailing `return lst`: in a generator that value only ends up on
    # StopIteration and is never seen by code iterating over the chunks.
    for start in range(0, len(lst), n):
        yield lst[start:start + n]

def _write_array_scripts(arguments, job_name, build_header, render_command, chunk_size=500):
    """Write one SLURM array script per chunk of ``arguments``.

    Shared implementation behind step1..step4. Scripts are written to
    ``{save_path}/slurm_scripts/{job_name}_{i}.sh`` (``save_path`` is the
    module-level global) and each gets a matching log folder under
    ``{save_path}/slurm_outs/``.

    Args:
        arguments: Sequence of work items; more than ``chunk_size`` items are
            split into several scripts.
        job_name: Base job name; the chunk index is appended as ``_{i}``.
        build_header: ``shell_script_cpu`` or ``shell_script_gpu``.
        render_command: Callable mapping one work item to the text of its
            entry in the bash ``commands`` array, or None to skip the item.
        chunk_size: Maximum number of array tasks per script.
    """
    scripts_dir = f'{save_path}/slurm_scripts'
    os.makedirs(scripts_dir, exist_ok=True, mode=0o755)

    if len(arguments) > chunk_size:
        chunks = develop_chunks(arguments, chunk_size)
    else:
        chunks = [arguments]

    for i, chunk in enumerate(chunks):
        chunk_job_name = f"{job_name}_{i}"
        script_path = f'{scripts_dir}/{chunk_job_name}.sh'
        if os.path.exists(script_path):
            os.remove(script_path)

        # Render first so the array size in the header matches the number of
        # commands actually written (skipped items must not get a task id).
        command_lines = [line for line in map(render_command, chunk) if line is not None]
        if not command_lines:
            # An "--array=1-0" header is not a valid job array; write nothing.
            logging.warning(f"No commands to write for {chunk_job_name}. Skipping script generation.")
            continue

        os.makedirs(f'{save_path}/slurm_outs/{chunk_job_name}', exist_ok=True, mode=0o755)
        with open(script_path, "w", encoding="utf-8") as f:
            f.write(build_header(save_path, command_lines, job_name=chunk_job_name))
            f.write("\n")
            f.write("commands=(\n")
            f.writelines(command_lines)
            f.write(")\n")
            f.write("\n")
            # Each array task evaluates the command at its own (1-based) index.
            f.write("eval ${commands[$SLURM_ARRAY_TASK_ID-1]}\n")
            f.write("\ncrc-job-stats\n")


def step1(arguments, job_name):
    """Write CPU array scripts that run ``1_data_downloading.py`` once per item.

    Args:
        arguments: Sequence of items (dataset names when called from
            ``run_job``); each is passed together with the global ``YEARS``
            as ``--parameters``.
        job_name: Base name for the scripts and log folders.
    """
    def render(combo):
        return f"\t \"python 1_data_downloading.py --parameters \"\\\"\"({YEARS},'{combo}')\"\\\"\" --read_path '{read_path}' --save_path '{save_path}' \" \n "

    _write_array_scripts(arguments, job_name, shell_script_cpu, render)
    return


def step2(arguments, job_name):
    """Write GPU array scripts that run ``2_embedding_datasets.py`` once per combo.

    Args:
        arguments: Sequence of parameter combinations passed as ``--parameters``.
        job_name: Base name for the scripts and log folders.
    """
    def render(combo):
        return f"\t \"python 2_embedding_datasets.py --parameters \"\\\"\"{combo}\"\\\"\" --save_path '{save_path}' \" \n "

    _write_array_scripts(arguments, job_name, shell_script_gpu, render)
    return


def step3(arguments, job_name):
    """Write GPU array scripts that run ``3_preparing_objects_network.py`` once per combo.

    Args:
        arguments: Sequence of parameter combinations passed as ``--parameters``.
        job_name: Base name for the scripts and log folders.
    """
    def render(combo):
        return f"\t \"python 3_preparing_objects_network.py --parameters \"\\\"\"{combo}\"\\\"\" --save_path '{save_path}' \" \n "

    _write_array_scripts(arguments, job_name, shell_script_gpu, render)
    return


def step4(arguments, job_name):
    """Write GPU array scripts that run the model script matching each combo.

    Combos containing ``'zeroshot'`` run ``4.1_zeroshot.py``; combos
    containing ``'gat'`` or ``'gat_graph_embed'`` run
    ``4.2_gat_gat_embed.py``. Any other combo is skipped with a warning and
    is not counted in the job array.

    Args:
        arguments: Sequence of parameter combinations passed as ``--parameters``.
        job_name: Base name for the scripts and log folders.
    """
    def render(combo):
        if 'zeroshot' in combo:
            return f"\t \"python 4.1_zeroshot.py --parameters \"\\\"\"{combo}\"\\\"\" --save_path '{save_path}' \" \n "
        if 'gat' in combo or 'gat_graph_embed' in combo:
            return f"\t \"python 4.2_gat_gat_embed.py --parameters \"\\\"\"{combo}\"\\\"\" --save_path '{save_path}' \" \n "
        logging.warning(f"No model script matches {combo}. Skipping it.")
        return None

    _write_array_scripts(arguments, job_name, shell_script_gpu, render)
    return

# def step5(arguments, job_name):
#     job_name = '5.1_openalex_full'
#     os.makedirs(f'{save_path}/slurm_outs/{job_name}', exist_ok=True, mode=0o755)
#     for combo in arguments:
#         command = f"""\t "python 5.1_openalex_full.py --parameters "{combo}" \n" """
#         script_name = '_'.join(map(str, combo)) + '.sh'
#         with open(script_name, "w") as f:
#             f.write(shell_script_gpu(save_path, command, job_name = job_name))

#     subprocess.run(["sbatch", script_name])
#
        

FileNotFoundError: [Errno 2] No such file or directory: 'input.yaml'

In [8]:
def run_job(step):
    """Generate the SLURM array scripts for one pipeline step and submit them.

    Reads the module-level settings (``save_path``, ``YEARS``, ``datasets``,
    ``graph_components``, ``embedding_modes``, ``models``, ``controls``,
    ``FINAL_INPUTS``).

    Args:
        step: 1 (data downloading), 2 (embedding datasets), 3 (preparing
            network objects) or 4 (running models).

    Raises:
        ValueError: If ``step`` is not 1, 2, 3 or 4.
    """
    os.makedirs(f'{save_path}/slurm_scripts/', exist_ok=True, mode=0o755)

    if step == 1:
        job_name = '1_data_downloading'
        write_scripts = step1
        arguments = FINAL_INPUTS['datasets']
    elif step == 2:
        job_name = '2_embedding_datasets'
        write_scripts = step2
        arguments = list(itertools.product(YEARS, datasets, graph_components, embedding_modes,[None], [None]))
    elif step in (3, 4):
        if step == 3:
            job_name = '3_preparing_objects_network'
            write_scripts = step3
        else:
            job_name = '4_running_models'
            write_scripts = step4
        # Baseline runs (no control) first, then one run per explicit control.
        # `controls` may be None, and may itself contain None; filtering keeps
        # product() from failing and avoids duplicating the baseline combos.
        explicit_controls = [control for control in (controls or []) if control is not None]
        arguments = list(itertools.product(YEARS, datasets, graph_components, embedding_modes, models, [None]))
        arguments = arguments + list(itertools.product(YEARS, datasets, graph_components, embedding_modes, models, explicit_controls))
    # elif step == 5:
    #     job_name = '5_compiling_results'
    #     arguments = list(itertools.product(YEARS, datasets, graph_components, embedding_modes, models, [None]))
    #     step5(arguments,job_name)
    else:
        raise ValueError(f"Step {step} not recognized. Please use 1,2,3,4")

    # Drop scripts left over from earlier runs of this step; otherwise the
    # glob below would resubmit chunks that the current run did not generate.
    script_pattern = f'{save_path}/slurm_scripts/{job_name}*.sh'
    for stale_script in glob.glob(script_pattern):
        os.remove(stale_script)

    write_scripts(arguments, job_name)

    # sorted() gives a deterministic submission order.
    for script_file in sorted(glob.glob(script_pattern)):
        result = subprocess.run(["sbatch", script_file])
        if result.returncode == 0:
            logging.info(f"Submitted {script_file}")
        else:
            logging.error(f"sbatch failed for {script_file} with exit code {result.returncode}")

In [12]:
# Generate the step-2 ('2_embedding_datasets') SLURM array scripts and submit them with sbatch.
run_job(2)

Submitted batch job 884433 on cluster gpu


In [None]:
import os, urllib, json, duckdb, logging, shutil, multiprocessing as mp
import urllib.request

def get_data(url, record_count, save_path):
    """Filter one OpenAlex works file with DuckDB and save the result as JSON.

    The output file is ``{save_path}/openalex_raw_data/{save_name}.json``
    where ``save_name`` is built from the last two path components of
    ``url``. If that file already exists nothing is downloaded.

    Args:
        url: Location of the source file, passed to DuckDB's ``read_json``.
        record_count: Record count reported for the file; the query is only
            run when it is greater than 0.
        save_path: Root output directory.

    Returns:
        tuple: ``(url, papers)``. ``papers`` is the first value returned by
        the COPY statement, ``record_count`` itself when it is not positive,
        or -1 when the query failed or the output file already existed.
    """
    s3_path = url
    url_parts = url.split('/')
    save_name = url_parts[-2] + '_' + url_parts[-1].split('.')[0]
    output_file = f"{save_path}/openalex_raw_data/{save_name}.json"

    if os.path.exists(output_file):
        logging.warning(f"Data already exists for {url}. Skipping download...")
        return (url, -1)

    logging.info(f"Downloading data from {url} with {record_count} records...")
    conn = duckdb.connect()
    try:
        conn.execute("INSTALL aws; INSTALL httpfs; LOAD aws; LOAD httpfs; LOAD 's3';")
        conn.execute("CREATE SECRET (TYPE S3, PROVIDER CREDENTIAL_CHAIN);")
        conn.execute("SET threads TO 4; SET enable_progress_bar = false;") ## To avoid oom error
        # f-string so the real save_path is used; without the prefix DuckDB
        # was pointed at the literal directory name "{save_path}/temp".
        conn.execute(f"SET temp_directory = '{save_path}/temp';")

        # order FROM(identifying the data source) -> WHERE clause (applying filters) ->SELECT clause (choosing which columns to display).
        # cannot select columns based on the results of a filter
        # because the columns to be returned are determined before the WHERE clause filters are applied.
        query= f"""
                COPY (
                SELECT  id,
                        display_name,
                        publication_year,
                        publication_date,
                        primary_location,
                        open_access,
                        indexed_in,
                        institutions_distinct_count,
                        authorships
                FROM read_json('{s3_path}')
                WHERE   type = 'article' AND
                        is_paratext = false AND
                        is_retracted = false AND
                        publication_year >=2000 AND
                        publication_year <=2023
                ) TO 
                '{save_path}/openalex_raw_data/{save_name}.json' (FORMAT JSON);
                """
        papers = 0
        if record_count > 0:
            try:
                query_result = conn.execute(query).fetchall()
                papers = query_result[0][0]
            except Exception as exc:
                # query returned no result. Might be because filtered out based on filters or some error
                papers = -1
                logging.error(f"Query returned no result for {url}")
                # Keep the underlying cause visible instead of discarding it.
                logging.error(f"Error: {exc}")
        else:
            papers = record_count
    finally:
        # Always release the connection, including when setup or the query raises.
        conn.close()

    logging.info(f"Downloaded {papers} papers from {url}")
    return (url, papers)

def download_openalex(save_path, force_download=False):
    """Download and filter every file listed in the OpenAlex works manifest.

    Data is stored under ``{save_path}/openalex_raw_data``. If that folder
    already exists the download is skipped unless ``force_download`` is
    true, in which case the folder is deleted and downloaded afresh.

    Args:
        save_path: Root output directory.
        force_download: Delete existing data and download again.

    Returns:
        dict: Maps each manifest URL to the count returned by ``get_data``.
        ``{'NA': -1}`` is returned when the download was skipped or any
        error occurred (the error is logged, not raised).
    """
    raw_dir = f"{save_path}/openalex_raw_data"
    try:
        data_exists = os.path.exists(raw_dir)

        if data_exists and not force_download:
            logging.warning(f"Data already exists in {save_path}/openalex_raw_data. force_download is set to False. Skipping download...")
            return {'NA': -1}

        if data_exists:
            # The original tested the path "{save_path}, openalex_raw_data"
            # (comma instead of "/"), so this cleanup never ran.
            logging.warning(f"Data already exists in {save_path}/openalex_raw_data. force_download is set to True. Deleting existing data... ")
            shutil.rmtree(raw_dir)
            logging.info(f"Deleted existing data in {save_path}/openalex_raw_data. Downloading fresh data...")
        else:
            logging.info(f"Downloading data from OpenAlex...")

        os.makedirs(raw_dir, exist_ok=True)
        urllib.request.urlretrieve("https://openalex.s3.amazonaws.com/data/works/manifest", f"{raw_dir}/manifest")
        with open(f"{raw_dir}/manifest", 'r') as file:
            manifest_data = json.load(file)
        url_count_list = [(entry['url'], entry['meta']['record_count'],save_path) for entry in manifest_data['entries']]

        # One get_data call per manifest entry, spread over 16 worker processes.
        with mp.Pool(processes=16) as pool:
            results = pool.starmap(get_data, url_count_list)
        logging.info("Data download step completed. Creating metadata...")
        return {result[0]: result[1] for result in results}
    except Exception as e:
        logging.error(f'Error: {e}')
        return {'NA': -1}

NameError: name 'save_path' is not defined