In [1]:
import boto3
from botocore.exceptions import ClientError
import configparser

In [2]:
# Load the AWS credentials and default region from the INI-style file 'config.env'.
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed; check it so a missing file fails with a clear message
# instead of a confusing KeyError on the first lookup below.
if not config.read('config.env'):
    raise FileNotFoundError("Could not read configuration file 'config.env'")

# All three values are expected in the [DEFAULT] section; a missing key raises KeyError.
access_id = config['DEFAULT']['AccessKeyID']
access_key = config['DEFAULT']['SecretAccessKey']
region = config['DEFAULT']['DefaultRegion']

In [3]:
# Default session factory.
def create_aws_session():
    """Return a boto3 Session constructed with no explicit arguments.

    Nothing is passed to the Session constructor, so credential and region
    resolution is left entirely to boto3's defaults.
    """
    return boto3.session.Session()

# Customized session factory.
def create_customized_session(aws_access_key, aws_secret_key, region_name=None, profile_name=None):
    """Return a boto3 Session built from explicit credentials.

    Args:
        aws_access_key: Value forwarded as ``aws_access_key_id``.
        aws_secret_key: Value forwarded as ``aws_secret_access_key``.
        region_name: Optional region for the session; defaults to None.
        profile_name: Optional profile name for the session; defaults to None.

    Returns:
        The newly created ``boto3.session.Session``.
    """
    session_options = {
        'aws_access_key_id': aws_access_key,
        'aws_secret_access_key': aws_secret_key,
        'region_name': region_name,
        'profile_name': profile_name,
    }
    return boto3.session.Session(**session_options)

In [4]:
def create_bucket(session, bucket_name, region=None):
    """Create an S3 bucket, treating "already owned by you" as success.

    Args:
        session: Object exposing ``client('s3', region_name=...)`` (a boto3 Session).
        bucket_name: Name of the bucket to create.
        region: Region to create the bucket in. When None, the client uses the
            session's own region and no location constraint is sent.

    Returns:
        True if the bucket was created or is already owned by the caller,
        False if S3 reported any other client error (the error is printed).
    """
    # Bind the client to the target region so the endpoint and the requested
    # location constraint agree; region_name=None falls back to the session's region.
    s3 = session.client('s3', region_name=region)

    request = {'Bucket': bucket_name}
    # 'us-east-1' is S3's default location and is rejected when sent as an
    # explicit LocationConstraint, so the configuration is omitted for it.
    if region and region != 'us-east-1':
        request['CreateBucketConfiguration'] = {'LocationConstraint': region}

    try:
        s3.create_bucket(**request)
    except ClientError as e:
        # Use the structured error code rather than matching on the message text.
        error_code = e.response.get('Error', {}).get('Code')
        if error_code == 'BucketAlreadyOwnedByYou':
            print(f"Bucket '{bucket_name}' already exists.")
            return True
        print(f"Error: {e}")
        return False

    print(f"Bucket '{bucket_name}' created successfully.")
    return True

def upload_file(session, bucket_name, file_name, object_name=None):
    """Upload a local file to an S3 bucket.

    Args:
        session: Object exposing ``client('s3')`` (a boto3 Session).
        bucket_name: Destination bucket.
        file_name: Path of the local file to upload.
        object_name: Destination key; when None, ``file_name`` is used as the key.

    Returns:
        True if the upload succeeded, False if the local file could not be read
        or the transfer failed (the error is printed).
    """
    if object_name is None:
        object_name = file_name

    s3 = session.client('s3')
    try:
        s3.upload_file(file_name, bucket_name, object_name)
    except OSError as e:
        # Missing or unreadable local file (FileNotFoundError is an OSError).
        print(f"Error: {e}")
        return False
    except (ClientError, boto3.exceptions.S3UploadFailedError) as e:
        # The managed transfer reports failed uploads as S3UploadFailedError,
        # which is not a ClientError subclass, so both must be caught.
        print(f"Error: {e}")
        return False

    print(f"File '{file_name}' uploaded successfully to '{bucket_name}/{object_name}'.")
    return True

In [5]:
if __name__ == '__main__':
    # Target bucket and local file path; edit both before running.
    bucket_name = "lam-s-bucket-1910"
    file_name = "file.txt"

    # Build one session from the loaded credentials and reuse it for both calls.
    my_session = create_customized_session(access_id, access_key, region)

    # Only attempt the upload when create_bucket reports success.
    bucket_ready = create_bucket(my_session, bucket_name, region)
    if bucket_ready:
        upload_file(my_session, bucket_name, file_name)

Bucket 'lam-s-bucket-1910' already exists.
File 'file.txt' uploaded successfully to 'lam-s-bucket-1910/file.txt'.
