In [1]:
import os
import re
import requests
import json
import boto3
from datetime import datetime
from ratelimit import limits, RateLimitException, sleep_and_retry
from backoff import on_exception, expo
import local.config as conf

In [30]:
class semanticScholar():
    
    def __init__(self, release_id=None):
        
        api_key = conf.semanticScholar.api_key
        
        self.headers = headers= {"X-API-KEY": api_key}
        self.release_id = release_id
        
        if release_id is None:
            print("no release id provided, using latest")
            self.release_id = max(self.query_dataset_api())
        
    @sleep_and_retry
    @limits(calls=10, period=32)
    def query_dataset_api(self, dataset=None):
    
        if self.release_id is None:
            url = "https://api.semanticscholar.org/datasets/v1/release/"
        elif dataset is None:
            url = f"https://api.semanticscholar.org/datasets/v1/release/{self.release_id}"
        else:
            url = f"https://api.semanticscholar.org/datasets/v1/release/{self.release_id}/dataset/{dataset}"
    
        response = requests.get(f"{url}", headers=self.headers).json()
        
        return response
    
    def loop_api(self):
        
        results = []
        self.query_date = datetime.today().strftime('%Y-%m-%d')
        
        datasets = self.query_dataset_api()
        
        for r in datasets["datasets"]:
            result = self.query_dataset_api(dataset=r["name"])
            results.append(result)
        
        return results
    
    def results_to_json(self, results):
        
        root = conf.links.local_root
        path = f"{root}/s2/release_id={self.release_id}/query_date={self.query_date}"
        
        if not os.path.exists(f"{path}"):
            os.makedirs(f"{path}")

        with open(f"{path}/results.json", "w") as f:
            f.write(json.dumps(results, indent=2))
            
    def write_results(self):
        
        results = self.loop_api()
        self.results_to_json(results=results)
    
    def download_datasets(self, results):
        
        for dataset in results:
            dataset_name = dataset["name"]
            
            for file in dataset["files"]:
                with requests.get(test_file, stream=True) as r:
                    filename = re.search(r"\/([^\/]+)\?", test_file).group(1)
                    r.raise_for_status()
                    with open(f"local/test_files/{filename}", 'wb') as f:
                        for chunk in r.iter_content(chunk_size=50000000):
                            f.write(chunk)
        
        

In [32]:
test = semanticScholar()
results = test.loop_api()

no release id provided, using latest


In [68]:
test_file = "https://ai2-s2ag.s3.amazonaws.com/staging/2023-01-03/abstracts/20230106_081034_00086_idkfw_0e606fbd-510c-49ab-9720-21572904cdc3.gz?AWSAccessKeyId=ASIA5BJLZJPW4DLZLWEK&Signature=JNUypOy9t4LD%2FtdCV8lR0UOmJ24%3D&x-amz-security-token=IQoJb3JpZ2luX2VjEE4aCXVzLXdlc3QtMiJHMEUCIFOWVBDGybVfP%2BxPvLgaYZMoognLXQTWqQBdC4H1kX96AiEAnDqpgPqUin0q7V%2B2pTC9t3vVuCTXQ%2B7VZEzareBcTSIqiAQIp%2F%2F%2F%2F%2F%2F%2F%2F%2F%2F%2FARAAGgw4OTYxMjkzODc1MDEiDIlFp8kPUysl0NiqYSrcA%2FGu%2BJbM%2FPngLKPkofHom9XTqH%2BlFVsqCbChJUVDV1%2BipDMUqIqC%2BPIzkZkcm1Okb7jRjwHybBsnHchTEQAfoZDFAqSUU2vMmYMDXVlo4Zj5vd4Mq%2Fn%2FEm0ZvW1zK78pMW3V5zXbx%2BEnrKKQFH4f1%2BIw8tP%2BQqOXcNJfbQ%2BkvrvDWp%2FvSRv9idkUZ1fFlWS7fdGOdO0ZvNOMgwYiohIf1GiMKh%2BEPUSFTXD4Ts4Sm2R5vAzxMc6SsRbgiJ6jz77YzD3usw%2Bae6NhuuZuufvbNH8QGj%2Bk2mgPm9S3e1sNBxN%2BrniCAwGMIuY3wqgd%2FQP4kRLyjT2bsXHr9dRqlYH%2Bj0bGC1OFAyo2pwDsrvKZ9MAmd5qrLWAZztkjIJOtq0BBFVGtAbUafJdvDqK6XdtKOj%2FBrI%2BaIwm7jXORpQDSSp6d8oRTspWaAG4r%2BT7k3yMNJIa56l0zvN%2BFpCY%2FcrBbkUXPtSCocSRoANcgqs4Bf5kpt1zbaaqZnga2xdUVfnEp6wgNoZDa9eMuD17Sge9C6mNoQF8l%2FizLAX62of%2FbMoNsYkSrKXP8fg8%2BHP9AIJGD9hR7Bd5pIpvtd2fKiX%2F2v0rF27cAKb%2BBVdI%2FziSP5a%2Bvo1joRcr6pYyy%2FA8iMMT9%2Bp0GOqUB0eiHfVBLIJx7KMViqUWorUm8GkHM8zLLVU4xmiZiAyq2d97ZUVw9ZPcPJ0s6cr%2BzK2sd9bpdb4RVrtrHcVPbFXfL1YwZ0fFabK0v%2FGRQGYxzU8TnizbfcNh5chiBJPYvONl9ppqpTevR4fQ5xRruWsrdXEsiOV9pctdcPuAEeOOL1y%2Bxzq2HTj%2BRqO6kRyea78AnRTeV8xmBFbxZA3TYKhF7bZ1x&Expires=1674052895"

with requests.get(test_file, stream=True) as r:
    filename = re.search(r"\/([^\/]+)\?", test_file).group(1)
    r.raise_for_status()
    with open(f"/Users/josepherlanger/Projects/mcc/pgETL/downloadData/local/test_files/{filename}", 'wb') as f:
        for chunk in r.iter_content(chunk_size=50000000):
            f.write(chunk)

In [78]:
import gzip
import shutil

local_file_locations = ["local/test_files/0230106_081034_00086_idkfw_0e606fbd-510c-49ab-9720-21572904cdc3.gz"]

for file in local_file_locations:
    with gzip.open(file, 'rb') as f_in:
        with open('local/test_files/test.json', 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

In [82]:
local_file_locations = ["local/test_files/0230106_081034_00086_idkfw_0e606fbd-510c-49ab-9720-21572904cdc3.gz"]

for file in local_file_locations:
    
    print(re.sub(r'.gz$','.json',file))

local/test_files/0230106_081034_00086_idkfw_0e606fbd-510c-49ab-9720-21572904cdc3.json


In [None]:
#class openAlex(self):
    #todo can probably just sync

client = boto3.client('s3')

results = client.list_objects_v2(Bucket="openalex", Prefix= "data/", Delimiter="/")
latestResults = []

datasets = []
for r in results["CommonPrefixes"]:
    datasets.append(r["Prefix"])

for d in datasets:
    results = client.list_objects_v2(Bucket="openalex", Prefix= d, Delimiter="/")
    
    dates = []
    for r in results["CommonPrefixes"]:
        dates.append(r["Prefix"])
    
    latest = {"dataset": d,
              "newest": max(dates)
             }
    latestResults.append(latest)

print(latestResults)

[{'dataset': 'data/authors/', 'newest': 'data/authors/updated_date=2022-12-21/'}, {'dataset': 'data/concepts/', 'newest': 'data/concepts/updated_date=2022-12-21/'}, {'dataset': 'data/institutions/', 'newest': 'data/institutions/updated_date=2022-12-21/'}, {'dataset': 'data/merged_ids/', 'newest': 'data/merged_ids/works/'}, {'dataset': 'data/venues/', 'newest': 'data/venues/updated_date=2022-12-21/'}, {'dataset': 'data/works/', 'newest': 'data/works/updated_date=2022-12-21/'}]


In [None]:
#class openAlex(self):
    #todo can probably just sync

client = boto3.client('s3')

results = client.list_objects_v2(Bucket="openalex", Prefix= "data/", Delimiter="/")
latestResults = []

for r in results["CommonPrefixes"]:
    for d in r["Prefix"]:
        results = client.list_objects_v2(Bucket="openalex", Prefix= d, Delimiter="/")
        latestResults.append(results)
    
print(latestResults)

In [None]:
s3_client = boto3.client('s3')

def sync_dir(prefix, local, bucket, client=s3_client):
    """
    params:
    - prefix: pattern to match in s3
    - local: local path to folder in which to place files
    - bucket: s3 bucket with target contents
    - client: initialized s3 client object
    """
    keys = []
    dirs = []
    next_token = ''
    base_kwargs = {
        'Bucket':bucket,
        'Prefix':prefix,
    }
    while next_token is not None:
        kwargs = base_kwargs.copy()
        if next_token != '':
            kwargs.update({'ContinuationToken': next_token})
        results = client.list_objects_v2(**kwargs)
        contents = results.get('Contents')
        for i in contents:
            k = i.get('Key')
            if k[-1] != '/':
                keys.append(k)
            else:
                dirs.append(k)
        next_token = results.get('NextContinuationToken')
    #for d in dirs:
    #    dest_pathname = os.path.join(local, d)
    #    if not os.path.exists(os.path.dirname(dest_pathname)):
    #        os.makedirs(os.path.dirname(dest_pathname))
    #for k in keys:
    #    dest_pathname = os.path.join(local, k)
    #    if not os.path.exists(os.path.dirname(dest_pathname)):
    #        os.makedirs(os.path.dirname(dest_pathname))
    #    client.download_file(bucket, k, dest_pathname)
    return keys, dirs

k, d = sync_dir(prefix="data/", local="", bucket="openalex")

print(d)
print(k)

In [170]:
s3_client = boto3.client('s3')

def read_s3_contents(bucket_name, key):
    
    response = s3_client.get_object(Bucket=bucket_name, Key=key)   
    return response['Body'].read()

def get_manifest(prefix, local, bucket, client=s3_client):

    keys = []
    dirs = []
    next_token = ''
    base_kwargs = {
        'Bucket':bucket,
        'Prefix':prefix,
    }
    
    while next_token is not None:
        kwargs = base_kwargs.copy()
        if next_token != '':
            kwargs.update({'ContinuationToken': next_token})
        results = client.list_objects_v2(**kwargs)
        contents = results.get('Contents')
        for i in contents:
            k = i.get('Key')
            if (k[-1] != '/') & (bool(re.search("\/manifest$",k))):
                keys.append(k)
            else:
                dirs.append(k)
        next_token = results.get('NextContinuationToken')
     
    manifest = [json.loads(read_s3_contents(bucket_name=bucket, key=k).decode("UTF-8")) for k in keys]
    return manifest

def build_download_dict(manifest):
    return [row["url"] for item in manifest for row in item["entries"]]
    

manifest = get_manifest(prefix="data/", local="", bucket="openalex")

In [172]:
test = [row["url"] for item in manifest for row in item["entries"]]

In [174]:
map(re.search("s3:\/\/(.+)").group(1),test)

TypeError: search() missing 1 required positional argument: 'string'

In [184]:
def regex_path(row, root_path="/var/data/"):
    path = re.compile("s3:\/\/(.+)")
    return root_path+re.search(path,row).group(1)

list(map(regex_path, test))

['/var/data/openalex/data/authors/updated_date=2022-07-19/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-15/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-16/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-18/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-19/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-19/part_001.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-20/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-20/part_001.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-20/part_002.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-21/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-21/part_001.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-22/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-22/part_001.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-22/part_0

In [183]:
list(result)

['/var/data/openalex/data/authors/updated_date=2022-07-19/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-15/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-16/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-18/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-19/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-19/part_001.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-20/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-20/part_001.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-20/part_002.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-21/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-21/part_001.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-22/part_000.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-22/part_001.gz',
 '/var/data/openalex/data/authors/updated_date=2022-08-22/part_0