# AWS S3 CRUD w. Boto3 & Python

In [None]:
import os, boto3

In [None]:
# Retrieve the AWS credentials and region from environment variables
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
aws_session_token = os.getenv("AWS_SESSION_TOKEN")
region_name = os.getenv("AWS_DEFAULT_REGION")

print(f"AWS Access Key ID: {aws_access_key_id}")
print(f"AWS Secret Access Key: {aws_secret_access_key}")
print(f"AWS Session Token: {os.getenv('AWS_SESSION_TOKEN')}")
print(f"AWS Default Region: {region_name}")
print(f"Region: {region_name}")

endpoint_url = f"https://s3.{region_name}.amazonaws.com"
# instantiate boto3 resource for AWS S3 and name bucket

s3 = None

if aws_session_token:
    s3 = boto3.resource(
        "s3",
        region_name=region_name,
        endpoint_url=endpoint_url,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
    )
else:
    s3 = boto3.resource(
        "s3",
        region_name=region_name,
        endpoint_url=endpoint_url,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )         

bucket_name = "aws-s3-crud-bucket-test"

In [None]:
# check if bucket exists
# create the bucket if it does not exists
all_my_buckets = [bucket.name for bucket in s3.buckets.all()]
print({"all_my_buckets": all_my_buckets})

if bucket_name not in all_my_buckets:
    bucket = s3.create_bucket(Bucket=bucket_name) 
    print(f"Bucket {bucket_name} created successfully")
else:
    print(f"Bucket {bucket_name} already exists")

In [None]:
# upload a file to the bucket
file_path = "sample.txt"
file_name = os.path.basename(file_path)
s3.Bucket(bucket_name).upload_file(file_path, file_name)
print(f"File {file_name} uploaded to bucket {bucket_name}")

In [None]:
# read and print the contents of the file
obj = s3.Object(bucket_name, file_name)
response = obj.get()
data = response["Body"].read().decode("utf-8")
print(f"Contents of file {file_name}: {data}")

In [None]:
# update content of sample.txt with new content from update-sample.txt
new_content_file_path = "update-sample.txt"
s3.Object(bucket_name, file_name).put(Body=open(new_content_file_path, "rb"))
print(f"Contents of file {file_name} updated successfully")

In [None]:
# read and print the contents of the file
response = obj.get()
data = response["Body"].read().decode("utf-8")
print(f"Contents of file {file_name}: {data}")

In [None]:
# delete the file from the bucket
obj.delete()
print(f"File {file_name} deleted successfully")

In [None]:
# delete the bucket
bucket = s3.Bucket(bucket_name)
bucket.objects.all().delete()
bucket.delete()
print(f"Bucket {bucket_name} deleted successfully")