In [89]:
import sys
import os
import re
from multiprocessing import Pool
from itertools import repeat
import warnings
from datetime import datetime
from time import sleep
import hashlib
import pandas as pd
import numpy as np
import json

In [90]:
# warnings is already imported in the first cell; the repeat is harmless.
import warnings
# Install a global filter that ignores every warning category.
warnings.filterwarnings('ignore')

In [91]:
import bucket_manager as bm

In [92]:
# Initiate timing
start = datetime.now()
# Set the source directory, bucket name, and destination directory
subdir = 'dmu3' # change subdir to sys.argv[1] in script
# Absolute local path of the tree that process_files walks
source_dir = f"/rds/project/rds-rPTGgs6He74/ras81/lsst-ir-fusion/{subdir}"
# CSV log filename: last three components of source_dir joined by '-', plus '-files.csv'
log = f"{'-'.join(source_dir.split('/')[-3:])}-files.csv"
# Key prefix placed in front of each file's path relative to source_dir
destination_dir = f"ip005-ras81-lsst-ir-fusion/{subdir}" 
# NOTE(review): folders and folder_files are not read by any visible cell
# (process_files builds its own local folder_files) - confirm they can be removed.
folders = []
folder_files = []
ncores = 1 # change to adjust number of CPUs (= number of concurrent connections)
# perform_checksum: compute an MD5 digest for every file
perform_checksum = True
# upload_checksum: additionally store the digest as a '<key>.checksum' object
upload_checksum = False
# dryrun: when True the upload and bucket-creation calls are skipped
dryrun = False

In [93]:
# Add titles to log file
with open(log, 'w') as logfile: # elsewhere open(log, 'a')
    logfile.write('LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,CHECKSUM_SIZE,CHECKSUM_KEY\n')

In [94]:
# Setup bucket
s3_host = 'echo.stfc.ac.uk'
keys = bm.get_keys(os.sep.join([os.environ['HOME'],'lsst_keys.json']))
access_key = keys['access_key']
secret_key = keys['secret_key']

In [95]:
# Build the S3 resource object for this endpoint and key pair (via bucket_manager.get_resource)
s3 = bm.get_resource(access_key, secret_key, s3_host)

In [96]:
# Display the resource object as a quick sanity check
s3

s3.ServiceResource()

In [97]:
# Name of the bucket that receives the upload
bucket_name = 'csd3-backup-test'
# NOTE(review): mybucket is not read by any visible cell, so a dry run still
# operates on bucket_name above - confirm whether bucket_name was meant here.
if dryrun:
    mybucket = 'dummy_bucket'

In [98]:
# Existing bucket names as returned by bucket_manager.bucket_list; used below to decide whether to create bucket_name
bucket_list = bm.bucket_list(s3)

In [99]:
# Display the bucket names
bucket_list

['LSST-IR-FUSION',
 'LSST-IR-FUSION_gen3_conversion',
 'dmu4',
 'lsst-dac',
 'lsst-drp-config',
 'lsst-test',
 'lsst-test-3',
 'lsst-test2']

In [100]:
# Create the target bucket when it is missing (never during a dry run);
# otherwise just report that it is already there.
if bucket_name in bucket_list:
    print(f'Bucket exists: {bucket_name}')
    if dryrun:
        print('dryrun = True, so continuing.')
elif not dryrun:
    s3.create_bucket(Bucket=bucket_name)
    print(f'Added bucket: {bucket_name}')

Added bucket: csd3-backup-test


In [101]:
# Handle to the target bucket, used by the listing cells below
bucket = s3.Bucket(bucket_name)

In [102]:
# Presumably prints the objects currently stored in the bucket - see bucket_manager.print_objects
bm.print_objects(bucket)

In [103]:
def upload_to_bucket(s3_host,access_key,secret_key,bucket_name,folder,filename,object_key,perform_checksum,upload_checksum,dryrun):
    """
    Upload one local file to an S3 bucket and return a CSV log row describing it.

    Parameters
        s3_host, access_key, secret_key: endpoint and credentials passed to
            bm.get_resource to build the S3 resource.
        bucket_name: name of the destination bucket.
        folder: local folder being processed (only written to the log row).
        filename: path of the local file to upload.
        object_key: key the file is stored under in the bucket.
        perform_checksum: when true, compute the MD5 hex digest of the file.
        upload_checksum: when true (and perform_checksum is true), also store
            the digest as an object named object_key + '.checksum'.
        dryrun: when true, nothing is sent to the bucket; the log row is
            still produced.

    Returns
        A comma-separated string matching the header
        LOCAL_FOLDER,LOCAL_PATH,FILE_SIZE,BUCKET_NAME,DESTINATION_KEY,CHECKSUM,CHECKSUM_SIZE,CHECKSUM_KEY
        with 'n/a' in the columns that do not apply.
    """
    s3 = bm.get_resource(access_key, secret_key, s3_host)
    bucket = s3.Bucket(bucket_name)
    checksum = None
    checksum_key = None
    # The context manager guarantees the handle is closed even if the upload raises.
    with open(filename, 'rb') as file_data:
        if perform_checksum:
            # Hash in 1 MiB chunks so large files are never held in memory whole.
            md5 = hashlib.md5()
            for chunk in iter(lambda: file_data.read(1024 * 1024), b''):
                md5.update(chunk)
            checksum = md5.hexdigest()
            # Hashing consumed the stream; rewind so the upload below sends the
            # whole file instead of starting at end-of-file.
            file_data.seek(0)
            if upload_checksum:
                # Defined for dry runs too, because the log row reports it.
                checksum_key = object_key + '.checksum'
                if not dryrun:
                    # create checksum object
                    bucket.put_object(Body=checksum.encode('utf-8'),ContentEncoding='utf-8',Key=checksum_key)
        # Upload the file to the bucket
        if not dryrun:
            bucket.upload_fileobj(file_data,object_key)

    # Report actions as one CSV row; the checksum is written as plain hex text.
    return_string = f'{folder},{filename},{os.stat(filename).st_size},{bucket_name},{object_key}'
    if perform_checksum and upload_checksum:
        return_string += f',{checksum},{len(checksum)},{checksum_key}'
    elif perform_checksum:
        return_string += f',{checksum},n/a,n/a'
    else:
        return_string += ',n/a,n/a,n/a'
    return return_string

In [104]:
def print_stats(log,folder,file_count,total_size,folder_start,folder_end,upload_checksum):
    """
    Print elapsed time and throughput figures for one uploaded folder.

    Parameters
        log: not used by this function; kept so existing callers keep working.
        folder: folder path shown in the summary line.
        file_count: number of data files uploaded from the folder.
        total_size: combined size of those files in bytes.
        folder_start, folder_end: datetime objects bracketing the upload.
        upload_checksum: when true, the totals also count one 32-byte
            checksum object per data file.

    Returns
        None; all results go to standard output.
    """
    elapsed = folder_end - folder_start
    print(f'Finished folder {folder}, elapsed time = {elapsed}')
    # total_seconds() includes whole days; .seconds alone wraps every 24 hours.
    elapsed_seconds = elapsed.total_seconds()
    if file_count <= 0:
        # Nothing to average over - avoid dividing by zero.
        print(f'0 files uploaded in {elapsed_seconds:.2f} seconds', flush=True)
        return

    # Average is taken over the data files only, before checksum objects are added.
    avg_file_size = total_size / file_count / 1024**2
    suffix = ''
    if upload_checksum:
        checksum_size = 32*file_count # checksum byte strings are 32 bytes
        total_size += checksum_size
        file_count *= 2
        suffix = ' (including checksum files)'

    total_mib = total_size / 1024**2
    # A zero-length interval would otherwise raise ZeroDivisionError.
    rate = total_mib / elapsed_seconds if elapsed_seconds > 0 else float('inf')
    print(f'{file_count} files (avg {avg_file_size:.2f} MiB/file) uploaded{suffix} in {elapsed_seconds:.2f} seconds, {elapsed_seconds/file_count:.2f} s/file',flush=True)
    print(f'{total_mib:.2f} MiB uploaded{suffix} in {elapsed_seconds:.2f} seconds, {rate:.2f} MiB/s',flush=True)

In [105]:
def process_files(s3_host,access_key,secret_key, bucket_name, current_objects, source_dir, destination_dir, ncores, perform_checksum, upload_checksum, dryrun, log):
    """
    Walk source_dir and upload every file that is not already in the bucket.

    Folders are handled one at a time. Each file's object key is
    destination_dir joined with the file's path relative to source_dir.
    Keys present in current_objects are skipped. The remaining files are
    uploaded by a multiprocessing Pool of ncores workers running
    upload_to_bucket; every returned CSV row is appended to the log file and
    per-folder statistics are printed with print_stats. Unless dryrun is set,
    the log file itself is uploaded last under its basename.

    Parameters
        s3_host, access_key, secret_key, bucket_name: forwarded to
            upload_to_bucket.
        current_objects: keys already in the bucket (assumed to be an
            iterable of strings - TODO confirm against bm.object_list).
        source_dir: local root directory to walk.
        destination_dir: key prefix inside the bucket.
        ncores: number of worker processes.
        perform_checksum, upload_checksum, dryrun: forwarded to
            upload_to_bucket.
        log: path of the CSV log file to append to.

    Raises
        Exception: if a folder with files to upload contains a symbolic link.
    """
    # Set membership keeps the per-file "already uploaded?" test O(1).
    existing_keys = set(current_objects)
    with Pool(ncores) as pool: # very little speed-up observed, might drop multiprocessing and parallelise at shell level
        # recursive loop over local folder
        for folder,subfolders,files in os.walk(source_dir):
            # check folder isn't empty
            if len(files) == 0:
                print(f'Skipping subfoler - empty.')
                continue
            # all files within folder
            all_paths = [ os.sep.join([folder,filename]) for filename in files ]
            # keys to files on s3
            all_keys = [ os.sep.join([destination_dir, os.path.relpath(path, source_dir)]) for path in all_paths ]
            print(f'folder_files: {all_paths}')
            print(f'object_names: {all_keys}')
            init_len = len(all_keys)
            print(f'current_objects: {current_objects}')
            # Drop files whose key already exists - avoids reuploading.
            # Filtering (path, key) pairs into a new list keeps the two in step;
            # removing entries from a list while iterating over it skips items.
            pending = [ (path, key) for path, key in zip(all_paths, all_keys) if key not in existing_keys ]
            if not pending:
                # all files in this subfolder already in bucket
                print(f'Skipping subfoler - all files exist.')
                continue
            folder_files = [ path for path, _ in pending ]
            object_names = [ key for _, key in pending ]
            file_count = len(object_names)
            if init_len - file_count > 0:
                print(f'Skipping {init_len - file_count} existing files.')
            print(f'folder_files: {folder_files}')
            print(f'object_names: {object_names}')
            folder_start = datetime.now()

            print('check for symlinks')
            # Test the full path: a bare filename would be resolved against the
            # current working directory, not the folder being walked.
            for path in all_paths:
                if os.path.islink(path):
                    print(f'symlink found: {path}')
                    raise Exception("Not dealing with symlinks here yet.")
            # upload files in parallel and log output
            print(f'Uploading {file_count} files from {folder} using {ncores} processes.')
            upload_args = [
                (s3_host, access_key, secret_key, bucket_name, folder, path, key, perform_checksum, upload_checksum, dryrun)
                for path, key in pending
            ]
            with open(log, 'a') as logfile:
                for result in pool.starmap(upload_to_bucket, upload_args):
                    logfile.write(f'{result}\n')
            folder_end = datetime.now()
            folder_files_size = sum(os.path.getsize(path) for path in folder_files)
            print_stats(log, folder, file_count, folder_files_size, folder_start, folder_end, upload_checksum)
    # Upload log file
    if not dryrun:
        upload_to_bucket(s3_host,access_key,secret_key,bucket_name, '/', log, os.path.basename(log), False, False, False)

# Go!

In [106]:
# Scratch lists used by the next two cells to check membership tests combined with all()
a = [1,2,3,4,5]
b = [1,2,3,4,5,6,7]
c = [5,6,7]

In [107]:
# Element-wise membership: one boolean per item of a, telling whether it is in b
[x in b for x in a]

[True, True, True, True, True]

In [108]:
# all() is True only when every item of a is also in c
all([x in c for x in a])

False

In [109]:
# Delete one object from the bucket using a hard-coded key.
# NOTE(review): destructive and not guarded by dryrun; presumably here so the next
# cell exercises the partial-skip path by re-uploading this file - confirm before re-running.
s3.Object(bucket_name,'ip005-ras81-lsst-ir-fusion/dmu3/data/XMMFULL_DR2_MASKVISTA_HSC-I_2.0as_IRAC2.8as_2020_05_26.fits').delete()

{'ResponseMetadata': {'RequestId': 'tx0000000000000077cbeb5-0065dc830b-2d5dc2782-default',
  'HostId': '',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-request-id': 'tx0000000000000077cbeb5-0065dc830b-2d5dc2782-default',
   'date': 'Mon, 26 Feb 2024 12:24:43 GMT'},
  'RetryAttempts': 0}}

In [110]:
# Process the files in parallel
current_objects = bm.object_list(bucket)
current_objects
print(f'Starting processing at {datetime.now()}, elapsed time = {datetime.now() - start}')
with warnings.catch_warnings():
    warnings.filterwarnings('ignore')
    process_files(s3_host,access_key,secret_key, bucket_name, current_objects, source_dir, destination_dir, ncores, perform_checksum, upload_checksum, dryrun, log)

# Complete
print(f'Finished at {datetime.now()}, elapsed time = {datetime.now() - start}')

Starting processing at 2024-02-26 12:24:43.792610, elapsed time = 0:00:01.686462
folder_files: ['/rds/project/rds-rPTGgs6He74/ras81/lsst-ir-fusion/dmu3/readme.md']
object_names: ['ip005-ras81-lsst-ir-fusion/dmu3/readme.md']
current_objects: []
folder_files: ['/rds/project/rds-rPTGgs6He74/ras81/lsst-ir-fusion/dmu3/readme.md']
object_names: ['ip005-ras81-lsst-ir-fusion/dmu3/readme.md']
check for symlinks
Uploading 1 files from /rds/project/rds-rPTGgs6He74/ras81/lsst-ir-fusion/dmu3 using 1 processes.
Finished folder /rds/project/rds-rPTGgs6He74/ras81/lsst-ir-fusion/dmu3, elapsed time = 0:00:00.494064
1 files (avg 0.00 MiB/file) uploaded in 0.49 seconds, 0.49 s/file
0.00 MiB uploaded in 0.49 seconds, 0.00 MiB/s
folder_files: ['/rds/project/rds-rPTGgs6He74/ras81/lsst-ir-fusion/dmu3/data/XMMFULL_DR2_MASKVISTA_HSC-I_2.0as_IRAC2.8as_2020_05_26.fits', '/rds/project/rds-rPTGgs6He74/ras81/lsst-ir-fusion/dmu3/data/XMMFULL_DR2_MASKVISTA_Ks_2.0as_IRAC2.8as_2020_06_01.fits']
object_names: ['ip005-ras