In [1]:
import os
import boto3
import logging
from botocore.exceptions import NoCredentialsError, PartialCredentialsError

In [2]:
AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_S3_ENDPOINT = os.getenv("AWS_S3_ENDPOINT")
AWS_VERIFY_SSL = False

In [11]:
file_name = "test_file.txt"
file_content = "Hello this is a test file for S3 read and write operations"
s3_key = file_name

In [12]:
with open(file_name, 'w') as f:
    f.write(file_content)

In [13]:
s3_client = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    endpoint_url=AWS_S3_ENDPOINT,
    verify=AWS_VERIFY_SSL
)

In [14]:
def upload_file():
    try:
        s3_client.upload_file(file_name, AWS_S3_BUCKET, s3_key)
        print(f"Successfully uploaded {file_name} to {AWS_S3_BUCKET}/{s3_key}.")
    except (NoCredentialsError, PartialCredentialsError) as e:
        logging.error("AWS Credentials not found or incomplete.", exc_info=True)
    except Exception as e:
        logging.error("Error uploading file", exc_info=True)

In [15]:
def read_file():
    try:
        response = s3_client.get_object(Bucket=AWS_S3_BUCKET, Key=s3_key)
        content = response['Body'].read().decode("utf-8")
        print(f"Read from {AWS_S3_BUCKET}/{s3_key}: {content}")
    except s3_client.exceptions.NoSuchKey:
        logging.error("The object does not exist.", exc_info=True)
    except Exception as e:
        logging.error("Error reading file", exc_info=True)

In [16]:
upload_file()



Successfully uploaded test_file.txt to parko-wb-bucket/test_file.txt.


In [17]:
read_file()



Read from parko-wb-bucket/test_file.txt: Hello this is a test file for S3 read and write operations
