In [1]:
# function to write object and tags
from minio import Minio
from minio.commonconfig import Tags
from io import BytesIO
import os


def write_to_s3(
    bucket,
    filepath,
    tags=None,
    meta=None,
    content_type="application/csv",
):
    """
    Upload a local file to an S3 bucket together with object tags and metadata.

    The object is stored under a key equal to ``filepath``. Connection
    settings are read from the ``JUPYTERLAB_S3_ENDPOINT``,
    ``JUPYTERLAB_S3_ACCESS_KEY_ID`` and ``JUPYTERLAB_S3_SECRET_ACCESS_KEY``
    environment variables; the client is created with ``secure=False``.

    :param bucket: Name of the target bucket. It must already exist.
    :param filepath: Path of the local file to upload; also used as the object key.
    :param tags: Optional mapping of tag names to tag values for the object.
    :param meta: Optional mapping of metadata to attach to the object.
    :param content_type: Content type recorded for the object. Defaults to
        ``"application/csv"``, the value this function always used before
        the parameter existed.
    :return: True if the upload was issued, False if the bucket does not exist.
    """
    # None defaults instead of {} so no dict object is shared between calls.
    tags = {} if tags is None else tags
    meta = {} if meta is None else meta

    # The client expects a bare host[:port], so a leading "http://" is stripped.
    client = Minio(
        os.getenv("JUPYTERLAB_S3_ENDPOINT").replace("http://", ""),
        secure=False,
        access_key=os.getenv("JUPYTERLAB_S3_ACCESS_KEY_ID"),
        secret_key=os.getenv("JUPYTERLAB_S3_SECRET_ACCESS_KEY"),
    )
    if not client.bucket_exists(bucket):
        print("Bucket does not exist: %s" % (bucket))
        return False  # failure

    # Assign tags one item at a time (rather than a bulk update) so that any
    # per-item handling done by Tags on assignment still applies.
    object_tags = Tags(for_object=True)
    for key, value in tags.items():
        object_tags[key] = value

    # Stream straight from the open file instead of loading the whole file
    # into memory first; the size is taken from the open handle.
    with open(filepath, mode="rb") as source_file:
        client.put_object(
            bucket,
            filepath,
            data=source_file,
            length=os.fstat(source_file.fileno()).st_size,
            tags=object_tags,
            metadata=meta,
            content_type=content_type,
        )
    return True  # success

In [2]:
# Upload the local file with two object tags and three metadata entries.
# The bucket name is a placeholder and must be replaced with a real bucket.
# The call is left as the cell's last expression so its result is displayed.
write_to_s3(
    bucket="XXXX",  # Use correct bucket
    filepath="filtest.txt",
    tags=dict(color="red", size="small"),
    meta=dict(cat="meow", dog="woof", horse="neigh"),
)

True

In [4]:

def read_from_s3(bucket, filepath):
    """
    Read an object from an S3 bucket and return its raw contents.

    Connection settings are read from the ``JUPYTERLAB_S3_ENDPOINT``,
    ``JUPYTERLAB_S3_ACCESS_KEY_ID`` and ``JUPYTERLAB_S3_SECRET_ACCESS_KEY``
    environment variables; the client is created with ``secure=False``.

    :param bucket: The S3 bucket name.
    :param filepath: The key (path) of the object within the bucket.
    :return: Whatever the response's ``read()`` returns for the object, or
        None if the bucket does not exist or fetching/reading the object
        raised an error (the error is printed, not re-raised).
    """
    # The client expects a bare host[:port], so a leading "http://" is stripped.
    client = Minio(
        os.getenv("JUPYTERLAB_S3_ENDPOINT").replace("http://", ""),
        secure=False,
        access_key=os.getenv("JUPYTERLAB_S3_ACCESS_KEY_ID"),
        secret_key=os.getenv("JUPYTERLAB_S3_SECRET_ACCESS_KEY"),
    )

    if not client.bucket_exists(bucket):
        print("Bucket does not exist: %s" % (bucket))
        return None  # Failure

    # There is no separate existence check for the object: a missing object
    # surfaces as an exception from get_object and is reported below.
    response = None
    try:
        response = client.get_object(bucket, filepath)
        return response.read()
    except Exception as e:
        print(f"Error reading file {filepath} from bucket {bucket}: {str(e)}")
        return None  # Failure
    finally:
        # Always give the response back, on success and on a failed read,
        # so the underlying connection is not leaked.
        if response is not None:
            response.close()
            response.release_conn()


In [5]:

# Example usage: read the object back from the bucket and print it as text.
bucket_name = "demo-funktionalitet"
file_path_in_s3 = "filtest.txt"
contents = read_from_s3(bucket_name, file_path_in_s3)

# read_from_s3 signals failure with None. An empty object (b"") is a
# successful read, so test for None explicitly instead of truthiness.
if contents is not None:
    print(f"Contents of {file_path_in_s3}:\n")
    print(contents.decode())
else:
    print(f"Failed to read {file_path_in_s3} from {bucket_name}.")

Contents of filtest.txt:

här är innehållet
