In [None]:
from dept import *

# AWS: boto3

In [None]:
import boto3

# read the AWS settings file and open a session using the two keys it holds
aws_config = read_file('configs/aws.json')
credential_keys = ('aws_access_key_id', 'aws_secret_access_key')
aws_session = boto3.Session(
    **{key: aws_config.get(key) for key in credential_keys}
)

## boto3: S3 client

In [None]:
# build the low-level S3 client on top of the shared session
s3_client = aws_session.client(service_name='s3')

In [None]:
# create a new bucket
# S3 rejects create_bucket without a LocationConstraint whenever the client is
# bound to a region other than us-east-1, so pass one only when it is needed
bucket_region = s3_client.meta.region_name
location_config = {}
if bucket_region not in (None, 'us-east-1', 'aws-global'):
    location_config['CreateBucketConfiguration'] = {
        'LocationConstraint': bucket_region
    }
s3_client.create_bucket(Bucket=aws_config.get('bucket_name'), **location_config)

In [None]:
# list S3 buckets and print the response
buckets_response = s3_client.list_buckets()
print_dict(buckets_response)

In [None]:
# write a small local text file that the upload cells below will use
content = 'this is a test file.'
file_path = norm_path('sandbox/test_file.txt')
write_file(file_path=file_path, content=content)

In [None]:
# upload the local test file through the S3 client
file_key = 'test-upload-file'
upload_args = {
    'Filename': file_path,
    'Bucket': aws_config.get('bucket_name'),
    'Key': file_key,
}
s3_client.upload_file(**upload_args)

In [None]:
# fetch the uploaded object back from S3 into a separate local file
destination_file_path = norm_path('sandbox/test_file_download.txt')
s3_client.download_file(
    Bucket=aws_config.get('bucket_name'),
    Key=file_key,
    Filename=destination_file_path,
)

In [None]:
# remove the uploaded object from the bucket
object_location = dict(Bucket=aws_config.get('bucket_name'), Key=file_key)
s3_client.delete_object(**object_location)

## boto3: S3 resource

In [None]:
# build the higher-level S3 resource on top of the shared session
s3 = aws_session.resource(service_name='s3')

In [None]:
# create a bucket via S3 resource
# outside us-east-1 S3 requires an explicit LocationConstraint matching the
# region of the resource's underlying client, otherwise the request is rejected
resource_region = s3.meta.client.meta.region_name
location_kwargs = {}
if resource_region not in (None, 'us-east-1', 'aws-global'):
    location_kwargs['CreateBucketConfiguration'] = {
        'LocationConstraint': resource_region
    }
s3.create_bucket(Bucket=aws_config.get('bucket_name'), **location_kwargs)

In [None]:
# get a handle on the configured bucket
bucket = s3.Bucket(aws_config.get('bucket_name'))

# push the local test file into it under file_key
resource_upload_args = {'Filename': file_path, 'Key': file_key}
bucket.upload_file(**resource_upload_args)

In [None]:
# print every object currently in the bucket
all_objects = bucket.objects.all()
print(*all_objects)

In [None]:
# pull the object stored under file_key down to the local destination path
resource_download_args = {'Key': file_key, 'Filename': destination_file_path}
bucket.download_file(**resource_download_args)

In [None]:
# print only the objects whose key starts with 'test'
matching_objects = bucket.objects.filter(Prefix='test')
print(*matching_objects)

## boto3: deleting S3 buckets

In [21]:
# look up the bucket that is about to be removed
bucket = s3.Bucket(aws_config.get('bucket_name'))

# only an empty bucket can be deleted, so remove every object first
bucket_objects = bucket.objects.all()
bucket_objects.delete()

# now delete the empty bucket itself
bucket.delete()

{'ResponseMetadata': {'RequestId': 'R2W3NXQCMF3HJ7CC',
  'HostId': 'OHipU/RgXsy1C20D0mr66ZNo6ZtSMiHCx4YQqIruNy88GnrA1wqUUEZndtNdRK9kFbRPCke8H6E=',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'x-amz-id-2': 'OHipU/RgXsy1C20D0mr66ZNo6ZtSMiHCx4YQqIruNy88GnrA1wqUUEZndtNdRK9kFbRPCke8H6E=',
   'x-amz-request-id': 'R2W3NXQCMF3HJ7CC',
   'date': 'Thu, 28 Dec 2023 16:24:49 GMT',
   'server': 'AmazonS3'},
  'RetryAttempts': 0}}

## boto3: S3 paginators

- a paginator provides a means of listing objects in chunks (pages)

In [None]:
# paginators are obtained from the S3 client (an S3 client is required!)
paginator = s3_client.get_paginator(operation_name='list_objects_v2')
results = paginator.paginate(Bucket=aws_config.get('bucket_name'))

In [None]:
# iterate over results
# search() yields None for any page that has no 'Contents' key (e.g. an empty
# bucket), so drop those before numbering; the counter is called `index`
# because `id` would shadow the builtin id() for the rest of the session
listed_objects = (
    item for item in results.search('Contents') if item is not None
)
for index, item in enumerate(listed_objects):
    print(f"index: {index}")
    print_dict(item)

## boto3: waiters

- a waiter polls at regular intervals until a specific condition is met

In [None]:
# waiter that polls for a bucket, plus its polling settings
waiter = s3_client.get_waiter(waiter_name='bucket_exists')
wait_config = dict(
    Delay=10,        # check every 10 seconds
    MaxAttempts=6,   # exit after 6 unsuccessful attempts
)

# name of the bucket to wait for
test_bucket_name = 'very-weird-bucket-to-check'

In [None]:
# announce the bucket, then poll for it using the delay / attempt settings
print(f'waiting for a bucket: {test_bucket_name}')
waiter.wait(WaiterConfig=wait_config, Bucket=test_bucket_name)

## boto3: file sharing

- using presigned URLs

In [None]:
# generate a temporary download link for the object stored under file_key
url = s3_client.generate_presigned_url(
    ClientMethod='get_object',
    Params={
        'Bucket': aws_config.get('bucket_name'),
        'Key': file_key,
    },
    ExpiresIn=120,  # expires in 120 seconds
)

# show the link; without this the cell produces no output at all
url