In [None]:
# Exploration notebook: load the data in ./data/ into S3

In [5]:
# Imports
import os
import boto3
import zipfile
import shutil
import progressbar

# Notebook specific
from IPython.display import clear_output

# Exploring data
import json
from pathlib import Path

In [25]:
# Create the S3 client that the rest of the notebook talks to
region = 'us-east-1'
s3_client = boto3.client(service_name='s3', region_name=region)

In [26]:
# Retrieve the list of existing buckets.
# The response is kept in `response` so later cells can read response['Buckets']
# without issuing another API call.
response = s3_client.list_buckets()

In [27]:
# Print the name of every bucket found in the list_buckets response
print('Existing buckets:')
bucket_names = (bucket["Name"] for bucket in response['Buckets'])
for name in bucket_names:
    print(f'  {name}')

Existing buckets:
  arxiv-etl
  jbma-data-engineering-us-east


In [28]:
# Check whether the target bucket is among the buckets returned by list_buckets
bucket_name = 'arxiv-etl'
bucket_exists = any(obj['Name'] == bucket_name for obj in response['Buckets'])
print(f"Bucket exists: {bucket_exists}")

Bucket exists: True


In [None]:
# Create bucket if it doesn't exist.
# A create_bucket call without a LocationConstraint is only valid for us-east-1;
# for any other region the constraint must be passed explicitly, so it is taken
# from the region the client was created with.
if not bucket_exists:
    client_region = s3_client.meta.region_name
    if client_region in (None, 'us-east-1'):
        # us-east-1 must NOT be sent as a LocationConstraint
        s3_client.create_bucket(Bucket=bucket_name)
    else:
        s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': client_region},
        )
    print(f"Bucket {bucket_name} created")
else:
    print(f"Bucket {bucket_name} already exists")

In [None]:
# Unzip and upload data to bucket - replace existing files as needed

In [20]:
# Extract the whole zip archive into the loading folder
# https://stackoverflow.com/questions/3451111/unzipping-files-in-python
path_to_zip_file = '../data/arxiv.zip'
directory_to_extract_to = '../data/loading/'
with zipfile.ZipFile(file=path_to_zip_file, mode='r') as archive:
    archive.extractall(path=directory_to_extract_to)

In [8]:
# Let's look at those files: only the first few lines of each file are parsed
# (with the json module), which avoids loading entire files into memory
folder_name = '../data/loading/'
directory = os.fsencode(folder_name)  # bytes path, so os.listdir() yields bytes names (decoded below with os.fsdecode)
max_lines_to_load = 2  # number of records loaded and printed per file

# https://www.aylakhan.tech/?p=27

def load_data(full_name, lines_to_load = 2):
    """Parse the first records of a JSON Lines file.

    Each non-blank line of the file is decoded with ``json.loads``; reading
    stops as soon as ``lines_to_load`` records have been collected, so the
    rest of the file is never read.

    Args:
        full_name: Path of the file to read (one JSON document per line).
        lines_to_load: Maximum number of records to return.

    Returns:
        A list with at most ``lines_to_load`` parsed records. The list is
        shorter when the file holds fewer records, and empty when
        ``lines_to_load`` is zero or negative.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    data = []
    if lines_to_load <= 0:
        return data

    # JSON text is UTF-8; do not depend on the platform's default encoding
    with open(full_name, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at end of file)
                continue
            data.append(json.loads(line))
            if len(data) >= lines_to_load:
                break

    # Always return the list, also when the file ran out before the limit was hit
    return data
            
# Print the first records of every large JSON file in the loading folder
for entry in os.listdir(directory):
    file_name = os.fsdecode(entry)
    full_name = os.path.join(folder_name, file_name)
    file_size = Path(full_name).stat().st_size / 1024 / 1024 # Mb
    is_large_json = file_name.endswith(".json") and file_size > 500
    if not is_large_json:
        # Small files and non-JSON files are not previewed
        continue
    print("Filename: ", file_name, "\n")
    data = load_data(full_name, max_lines_to_load)
    for line in data:
        print(line)
    print('\n')

Filename:  arxiv-metadata-oai-snapshot.json 

{'id': '0704.0001', 'submitter': 'Pavel Nadolsky', 'authors': "C. Bal\\'azs, E. L. Berger, P. M. Nadolsky, C.-P. Yuan", 'title': 'Calculation of prompt diphoton production cross sections at Tevatron and\n  LHC energies', 'comments': '37 pages, 15 figures; published version', 'journal-ref': 'Phys.Rev.D76:013009,2007', 'doi': '10.1103/PhysRevD.76.013009', 'abstract': '  A fully differential calculation in perturbative quantum chromodynamics is\npresented for the production of massive photon pairs at hadron colliders. All\nnext-to-leading order perturbative contributions from quark-antiquark,\ngluon-(anti)quark, and gluon-gluon subprocesses are included, as well as\nall-orders resummation of initial-state gluon radiation valid at\nnext-to-next-to-leading logarithmic accuracy. The region of phase space is\nspecified in which the calculation is most reliable. Good agreement is\ndemonstrated with data from the Fermilab Tevatron, and predictions a

In [21]:
# Create sample for large file to work on subset initially for testing.
# The sample overwrites the file inside folder_name in place.
for file in os.listdir(directory):
    file_name = os.fsdecode(file)
    full_name = os.path.join(folder_name, file_name)
    file_size = Path(full_name).stat().st_size / 1024 / 1024 # Mb
    if file_name.endswith(".json") and file_size > 500: # Only sample large files
        print("Filename: ", file_name)
        data = load_data(full_name, 150000) # Roughly 10 % of original
        if data is None:
            # No records came back: leave the file untouched rather than
            # truncating it to nothing
            print(f'No sample created for {file_name}')
            continue
        with open(full_name, "w") as outfile:
            for item in data:
                # json.dumps keeps every line valid JSON; str(item) would write
                # a Python repr (single quotes, None/True/False) instead
                outfile.write(f"{json.dumps(item)}\n")
        print(f'Sample created for {file_name}')

Filename:  arxiv-metadata-oai-snapshot.json
Sample created for {file_name}


In [22]:
# Stage the classification CSV in the loading folder next to the extracted data,
# so the whole folder can be uploaded in one pass and deleted afterwards to save space
classification_src = '../data/subject-classifications.csv'
classification_dst = '../data/loading/subject-classifications.csv'
shutil.copyfile(src=classification_src, dst=classification_dst)

'../data/loading/subject-classifications.csv'

In [23]:
def upload_file(s3_client, folder_name, file_name, bucket_name):
    """Upload one local file to ``staging/<file_name>`` in an S3 bucket, showing a progress bar.

    Args:
        s3_client: boto3 S3 client used for the transfer.
        folder_name: Local folder that contains the file.
        file_name: Name of the file inside ``folder_name``; also used as the
            object name under the ``staging/`` prefix.
        bucket_name: Name of the destination bucket.

    Returns:
        Whatever ``s3_client.upload_file`` returns. boto3 documents that
        method as returning ``None``, so callers should not expect response
        metadata here; failures surface as exceptions instead.
    """
    # https://stackoverflow.com/questions/41827963/track-download-progress-of-s3-file-using-boto3-and-callbacks
    import threading  # local import keeps this cell self-contained

    full_name = os.path.join(folder_name, file_name)
    s3_path = f'staging/{file_name}'

    statinfo = os.stat(full_name)

    # The bar counts bytes, so its maximum is the file size
    up_progress = progressbar.progressbar.ProgressBar(maxval=statinfo.st_size)

    up_progress.start()

    # boto3 may call the callback from several transfer threads at once;
    # without a lock the read-modify-write on currval can lose updates
    progress_lock = threading.Lock()

    def upload_progress(chunk):
        # `chunk` is the number of bytes transferred since the previous call
        with progress_lock:
            clear_output(wait = True) # Only for IPython (Notebook)
            up_progress.update(up_progress.currval + chunk)

    response = s3_client.upload_file(full_name, bucket_name, s3_path, Callback=upload_progress)

    up_progress.finish()

    return response

In [30]:
# Upload every JSON and CSV file in the data/loading folder to S3
# https://dev.to/razcodes/how-to-copy-files-to-s3-using-boto3-41fp

folder_name = '../data/loading/'
directory = os.fsencode(folder_name)

# https://stackoverflow.com/questions/10377998/how-can-i-iterate-over-files-in-a-given-directory

uploadable_suffixes = (".json", ".csv")
for entry in os.listdir(directory):
    file_name = os.fsdecode(entry)
    if not file_name.endswith(uploadable_suffixes):
        # Anything that is not a data file stays local
        continue
    print("Uploading", file_name)
    response = upload_file(s3_client, folder_name, file_name, bucket_name)
    if response is None:
        continue
    # Only reached when upload_file hands back a response object
    print("HTTPStatusCode:", response['ResponseMetadata']['HTTPStatusCode'])

100% |########################################################################|100% |########################################################################|


In [31]:
# Delete the data/loading folder (folder_name) and everything in it,
# now that the previous cell has uploaded its files
shutil.rmtree(folder_name)