In [11]:
import boto3
from botocore.exceptions import ClientError
import os
from pyspark.sql import SparkSession
import pyspark 

In [12]:

print(f"Using PySpark version: {pyspark.__version__}") # Should print 3.1.3

# Standalone master to connect to. Override with the SPARK_MASTER_URL env var to
# target another cluster without editing the notebook (same pattern as the MinIO
# settings); the default is the master this notebook was written against.
spark_master_url = os.getenv('SPARK_MASTER_URL', 'spark://jupyter-spark-master:7077')

# NOTE(review): no fs.s3a.* options are set on this builder, so S3A access to MinIO
# (endpoint, credentials) must come from Spark configuration outside this notebook —
# presumably the cluster's spark-defaults; TODO confirm.
spark = (
    SparkSession.builder
    .appName("MinioS3AAutoConfTest")
    .master(spark_master_url)
    .getOrCreate()
)

Using PySpark version: 3.1.3


In [13]:

# MinIO connection settings, each read from an environment variable.
# The fallbacks are placeholders only: export the variables rather than
# typing real credentials into this notebook, where they could be committed.
# The bucket name is shared by the boto3 bucket check and the s3a:// paths.
minio_endpoint_url, minio_access_key, minio_secret_key, bucket_name = (
    os.getenv(env_var, fallback)
    for env_var, fallback in (
        ('MINIO_ENDPOINT_URL', 'http://minio:9000'),
        ('MINIO_ROOT_USER', 'YOUR_MINIO_ACCESS_KEY'),
        ('MINIO_ROOT_PASSWORD', 'YOUR_MINIO_SECRET_KEY'),
        ('MINIO_DEFAULT_BUCKET', 'your-bucket-name'),
    )
)

In [14]:
print(f"Configuring S3 client for endpoint: {minio_endpoint_url}")
s3_client = boto3.client(
    's3',
    endpoint_url=minio_endpoint_url,
    aws_access_key_id=minio_access_key,
    aws_secret_access_key=minio_secret_key,
)

# --- Check and Create Bucket ---
# Error codes that mean "bucket does not exist". head_bucket is a HEAD request
# (no response body), so the code is often just the HTTP status '404'; MinIO may
# also report 'NoSuchBucket' (or 'NotFound') depending on configuration/version.
# Matching on the code is more reliable than searching str(e) for a substring.
BUCKET_MISSING_CODES = {'404', 'NoSuchBucket', 'NotFound'}

print(f"Ensuring bucket '{bucket_name}' exists...")
try:
    # head_bucket raises ClientError when the bucket is missing or inaccessible.
    s3_client.head_bucket(Bucket=bucket_name)
    print(f"Bucket '{bucket_name}' already exists.")
except ClientError as e:
    error_code = e.response.get('Error', {}).get('Code')
    if error_code not in BUCKET_MISSING_CODES:
        # Anything else (permissions, bad credentials, network) can't be fixed here.
        print(f"Error checking bucket status for '{bucket_name}': {e}")
        raise  # bare raise keeps the original traceback intact
    print(f"Bucket '{bucket_name}' does not exist. Attempting to create...")
    try:
        s3_client.create_bucket(Bucket=bucket_name)
        print(f"Bucket '{bucket_name}' created successfully.")
    except ClientError as creation_error:
        creation_code = creation_error.response.get('Error', {}).get('Code')
        if creation_code == 'BucketAlreadyOwnedByYou':
            # Benign race: the bucket was created by us/another client between
            # head_bucket and create_bucket. The goal (bucket exists) is met.
            print(f"Bucket '{bucket_name}' already exists.")
        else:
            print(f"Error creating bucket '{bucket_name}': {creation_error}")
            raise  # stop the notebook: later cells need the bucket

print("Bucket check/creation process finished.")

Configuring S3 client for endpoint: http://minio:9000
Ensuring bucket 'test-bucket' exists...
Bucket 'test-bucket' already exists.
Bucket check/creation process finished.


In [15]:
# 1. Create Sample Data
# A tiny fixed dataset, so the round trip through MinIO is easy to check by eye.
columns = ["name", "id"]
data = [
    ("Alice", 1),
    ("Bob", 2),
    ("Charlie", 3),
]
df = spark.createDataFrame(data, schema=columns)

print("Sample DataFrame created:")
df.show()

Sample DataFrame created:
+-------+---+
|   name| id|
+-------+---+
|  Alice|  1|
|    Bob|  2|
|Charlie|  3|
+-------+---+



In [16]:
# 2. Define S3A Path
# Location inside the bucket, addressed through the s3a:// scheme; the write
# and read-back cells below both use this same path.
output_path = "s3a://{}/test-data".format(bucket_name)

In [17]:
# 3. Write DataFrame to MinIO
print(f"Attempting to write DataFrame to: {output_path}")
try:
    # "overwrite" replaces output from a previous run, so the cell can be re-run.
    df.write.mode("overwrite").parquet(output_path)
    print("Successfully wrote data to MinIO.")
except Exception as e:
    print(f"Error writing to MinIO: {e}")
    # Stop the session on failure (as before); re-run the session cell to retry.
    spark.stop()
    # Bare `raise` re-raises the original exception with its full traceback,
    # which Jupyter renders — no separate traceback.print_exc() needed.
    raise

Attempting to write DataFrame to: s3a://test-bucket/test-data


[Stage 2:>                                                          (0 + 4) / 4]

Successfully wrote data to MinIO.


                                                                                

In [18]:
# 4. Read Data back from MinIO
print(f"Attempting to read data back from: {output_path}")
try:
    df_read = spark.read.parquet(output_path)
    print("Successfully read data back from MinIO:")
    df_read.show()
except Exception as e:
    print(f"Error reading from MinIO: {e}")
    # Re-raise: swallowing the error let the notebook continue to "Test finished."
    # even though the round trip had failed.
    raise

# Verify the round trip actually preserved the data. Parquet part files are not
# read back in the written order, so compare rows order-insensitively.
assert df_read.columns == df.columns, (
    f"Column mismatch after round trip: {df_read.columns} != {df.columns}"
)
assert sorted(df_read.collect()) == sorted(df.collect()), (
    "Rows read back from MinIO do not match the rows written"
)



Attempting to read data back from: s3a://test-bucket/test-data
Successfully read data back from MinIO:
+-------+---+
|   name| id|
+-------+---+
|Charlie|  3|
|  Alice|  1|
|    Bob|  2|
+-------+---+



In [19]:
# 5. Stop SparkSession
# Stopping the session ends this notebook's Spark application; re-run the
# session-creation cell before executing any further Spark cells in this kernel.
spark.stop()
print("Test finished.")

Test finished.
