In [30]:
import requests
import boto3
import json
import os
import sys
import argparse
import importlib
import transformers
import torch
import pathlib
import smart_open
from IPython.display import display
from sagemaker.huggingface.processing import HuggingFaceProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.session import get_execution_role

# Adding ../01_modules or ./01_modules to the system path so that we can load modules from 
# there as well
if '__file__' in globals():
    script_dir = pathlib.Path(__file__).parent.resolve()
else:
    script_dir = pathlib.Path().absolute()
modules_path_in_dev = os.path.abspath(os.path.join(script_dir, '..', '01_modules'))
modules_path_in_prod = os.path.abspath(os.path.join(script_dir, '01_modules'))
if os.path.exists(modules_path_in_dev):
    sys.path.append(modules_path_in_dev)
if os.path.exists(modules_path_in_prod):
    sys.path.append(modules_path_in_prod)


# # Jupyter only reads a local module the first time after 
# # kernel start. Re-running a cell with 
# # "from mymodulename import *" would not change
# # anything, even if the imported module has since changed.
# # As a workaround, we need to directly load the module, 
# # use importlib.reload to reload it and then import * 
import utils
_ = importlib.reload(utils)
import config
_ = importlib.reload(config) 

utils.py loaded: v0.2.12
config.py loaded: v0.1


In [58]:
s3_client = boto3.client('s3')
openalex_works_source_prefix = f'{config.OPENALEX_S3_RAW_DATA_PREFIX}/data/works_unpartitioned/'
openalex_works_source_path = f's3://{config.DEFAULT_S3_BUCKET_NAME}/{openalex_works_source_prefix}'
openalex_works_target_prefix = f'{config.OPENALEX_S3_RAW_DATA_PREFIX}/data/works_reduced/'
openalex_works_target_path = f's3://{config.DEFAULT_S3_BUCKET_NAME}/{openalex_works_target_prefix}'

_ = """
for key, content in smart_open.s3.iter_bucket(
    bucket_name=config.DEFAULT_S3_BUCKET_NAME,
    prefix=openalex_works_source_prefix,
    accept_key=None,
    key_limit=16,
    workers=16,
    retries=3,
):
    print(key, round(len(content) / 2**10))
"""

file_counter = 0
file_limit = 20
line_limit = 10
total_record_counter = 0
total_record_limit = 15
files = s3_client.list_objects(Bucket=config.DEFAULT_S3_BUCKET_NAME, Prefix='01_data/01_raw/openalex/data/works_unpartitioned/', Delimiter='/', MaxKeys=30)
# print(files)
# print('/n')

def line_has_error(line:str)->bool:
    known_keys = [
        'id', 'doi', 'doi_registration_agency', 'display_name', 'title', 
        'publication_year', 'publication_date', 'language', 'ids', 'primary_location', 
        'best_oa_location', 'type', 'open_access', 'authorships', 'corresponding_author_ids',
        'corresponding_institution_ids', 'cited_by_count', 'summary_stats', 'biblio',
        'is_retracted', 'is_paratext', 'concepts', 'mesh', 'locations_count', 'locations',
        'referenced_works', 'referenced_works_count', 'sustainable_development_goals',
        'grants', 'apc_list', 'apc_paid', 'related_works', 'abstract_inverted_index',
        'counts_by_year', 'cited_by_api_url', 'updated_date', 'created_date', 'updated',
        'authors_count', 'concepts_count', 'type_crossref', 'institutions_distinct_count'
    ]
    if (

        line.find('prospective')>-1 and
        line.find('percutaneous')>-1 and
        line.find('transluminal')>-1 and
        line.find('angioplasty')>-1 and
        line.find('prospective')>-1
    ):
        return 'Might be the problematic file'
    try:
        line_dict = json.loads(line)
    except:
        return 'Could not json.load() line'
    else:
        unknown_keys = list(set(line_dict.keys()) - set(known_keys))
        if len(unknown_keys):
            return f'Unknown keys: {unknown_keys}'
    return False

_ = """
for file_ref in files['Contents']:
    if file_counter < file_limit:
        line_counter = 0
        source_filepath = file_ref['Key']
        source_filename = source_filepath.split('/')[-1]
        with smart_open.open(f's3://{config.DEFAULT_S3_BUCKET_NAME}/{source_filepath}') as fl:
            for line in fl:
                if line_counter >= line_limit or total_record_counter >= total_record_limit:
                    break
                line_error = line_has_error(line)
                if line_error:
                    print(f'{source_filename}:{line_counter:06}|{line_error}|{line}')
                else:
                    print(f'{source_filename}:{line_counter:06}|--')
                line_counter += 1
                total_record_counter += 1
        file_counter += 1
"""

def reduce_line(source_filepath:str, line_counter:int, line:str)->str:
    try:
        line_dict = json.loads(line)
    except:
        print(f'ERROR: Could not parse JSON from [{source_filepath}]:{line_counter}: {line}\n')
        return ''
    else:
        reduced_line = {}
        if 'id' in line_dict:
            reduced_line['id_openalex_long'] = line_dict['id'] # https://openalex.org/W2079861989
            try:
                reduced_line['id_openalex_short'] = line_dict['id'].split('/')[-1][1:] # 2079861989
            except:
                print(f'Warning: could not split {key} in [{source_filepath}]:{line_counter}: {line}')
        else:
            print(f'ERROR: Line does not contain ID [{source_filepath}]:{line_counter}: {line}\n')
            return ''
            
        key = 'doi'
        if key in line_dict:
            reduced_line['id_doi_long'] = line_dict[key]
            try:
                reduced_line['id_doi_short'] = None if line_dict[key] is None else '/'.join(line_dict[key].split('/')[3:])
            except:
                print(f'Warning: could not split {key} in [{source_filepath}]:{line_counter}: {line}')
            
        key = 'display_name'
        if key in line_dict:
            reduced_line[key] = line_dict[key]
            
        key = 'title'
        if key in line_dict:
            reduced_line[key] = line_dict[key]
            
        key = 'language'
        if key in line_dict:
            reduced_line[key] = line_dict[key]
            
        key = 'publication_year'
        if key in line_dict:
            reduced_line[key] = line_dict[key]
            
        key = 'ids'
        if key in line_dict:
            subkey = 'pmid'
            if subkey in line_dict[key]:
                reduced_line[f'id_{subkey}_long'] = line_dict[key][subkey]
                try:
                    reduced_line[f'id_{subkey}_short'] = None if line_dict[key][subkey] is None else '/'.join(line_dict[key][subkey].split('/')[3:])
                except:
                    print(f'Warning: could not split {key}_{subkey} in [{source_filepath}]:{line_counter}: {line}')
            subkey = 'mag'
            if subkey in line_dict[key]:
                reduced_line[f'id_{subkey}'] = line_dict[key][subkey]
        
        key = 'type'
        if key in line_dict:
            reduced_line['item_type'] = line_dict[key]

        key = 'primary_topic'
        if key in line_dict:
            subkey = 'id'
            if subkey in line_dict[key]:
                reduced_line[f'{key}_long_id'] = line_dict[key][subkey] # https://openalex.org/T10062
                try:
                    reduced_line[f'{key}_short_id'] = None if line_dict[key][subkey] is None else line_dict[key][subkey].split('/')[-1] # T10062
                except:
                    print(f'Warning: could not split {key}_{subkey} in [{source_filepath}]:{line_counter}: {line}')
            subkey = 'display_name'
            if subkey in line_dict[key]:
                reduced_line[f'{key}_{subkey}'] = line_dict[key][subkey]
            subkey = 'score'
            if subkey in line_dict[key]:
                reduced_line[f'{key}_{subkey}'] = line_dict[key][subkey]
            subkey = 'subfield'
            if subkey in line_dict[key]:
                subsubkey = 'id'
                if subsubkey in line_dict[key][subkey]:
                    reduced_line[f'{key}_{subkey}_long_id'] = line_dict[key][subkey][subsubkey] # https://openalex.org/subfields/1306
                    try:
                        reduced_line[f'{key}_{subkey}_short_id'] = None if line_dict[key][subkey][subsubkey] is None else line_dict[key][subkey][subsubkey].split('/')[-1] # 1306
                    except:
                        print(f'Warning: could not split {key}_{subkey}_{subsubkey} in [{source_filepath}]:{line_counter}: {line}')
                subkey = 'display_name'
                if subsubkey in line_dict[key][subkey]:
                    reduced_line[f'{key}_{subkey}_{subsubkey}'] = line_dict[key][subkey][subsubkey]
            subkey = 'field'
            if subkey in line_dict[key]:
                subsubkey = 'id'
                if subsubkey in line_dict[key][subkey]:
                    reduced_line[f'{key}_{subkey}_long_id'] = line_dict[key][subkey][subsubkey] # https://openalex.org/subfields/1306
                    try:
                        reduced_line[f'{key}_{subkey}_short_id'] = None if line_dict[key][subkey][subsubkey] is None else line_dict[key][subkey][subsubkey].split('/')[-1] # 1306
                    except:
                        print(f'Warning: could not split {key}_{subkey}_{subsubkey} in [{source_filepath}]:{line_counter}: {line}')
                subkey = 'display_name'
                if subsubkey in line_dict[key][subkey]:
                    reduced_line[f'{key}_{subkey}_{subsubkey}'] = line_dict[key][subkey][subsubkey]
            subkey = 'domain'
            if subkey in line_dict[key]:
                subsubkey = 'id'
                if subsubkey in line_dict[key][subkey]:
                    reduced_line[f'{key}_{subkey}_long_id'] = line_dict[key][subkey][subsubkey] # https://openalex.org/subfields/1306
                    try:
                        reduced_line[f'{key}_{subkey}_short_id'] = None if line_dict[key][subkey][subsubkey] is None else line_dict[key][subkey][subsubkey].split('/')[-1] # 1306
                    except:
                        print(f'Warning: could not split {key}_{subkey}_{subsubkey} in [{source_filepath}]:{line_counter}: {line}')
                subkey = 'display_name'
                if subsubkey in line_dict[key][subkey]:
                    reduced_line[f'{key}_{subkey}_{subsubkey}'] = line_dict[key][subkey][subsubkey]
    return json.dumps(reduced_line, default=str)

for file_ref in files['Contents']:
    if file_counter < file_limit:
        line_counter = 0
        source_filepath = file_ref['Key']
        target_filepath = source_filepath.replace('works_unpartitioned', 'works_reduced')
        source_filename = source_filepath.split('/')[-1]
        target_file_location = f's3://{config.DEFAULT_S3_BUCKET_NAME}/{target_filepath}'
        with smart_open.open(f's3://{config.DEFAULT_S3_BUCKET_NAME}/{source_filepath}') as file_source:
            with smart_open.open(target_file_location, 'w') as file_target:
                for line in file_source:
                    if line_counter >= line_limit or total_record_counter >= total_record_limit:
                        break
                    reduced_line = reduce_line(source_filepath, line_counter, line)
                    if reduced_line != '':
                        file_target.write(reduced_line)
    
                    # print(f'{source_filename}:{line_counter:06}|')
                    # print(reduced_line)
                    # print('\n\n')
                    line_counter += 1
                    total_record_counter += 1
        print(f'done: {source_filename}, {line_counter}/{total_record_counter}')
        file_counter += 1


done: 000000.gz, 1/1
done: 000001.gz, 2/3
done: 000002.gz, 3/6
done: 000003.gz, 1/7
done: 000004.gz, 4/11
done: 000005.gz, 2/13
done: 000006.gz, 1/14
done: 000007.gz, 1/15
done: 000008.gz, 0/15
done: 000009.gz, 0/15
done: 000010.gz, 0/15
done: 000011.gz, 0/15
done: 000012.gz, 0/15
done: 000013.gz, 0/15
done: 000014.gz, 0/15
done: 000015.gz, 0/15
done: 000016.gz, 0/15
done: 000017.gz, 0/15
done: 000018.gz, 0/15
done: 000019.gz, 0/15
