In [10]:
import os
import warnings

import boto3
from botocore.exceptions import ClientError
import pyspark 
from pyspark.sql import SparkSession

In [16]:
# --- Spark session for the MinIO (S3A) round-trip test ---
# The notebook expects this PySpark release; warn (rather than fail) on a mismatch.
EXPECTED_PYSPARK_VERSION = "3.1.3"
print(f"Using PySpark version: {pyspark.__version__}")
if pyspark.__version__ != EXPECTED_PYSPARK_VERSION:
    warnings.warn(
        f"This notebook expects PySpark {EXPECTED_PYSPARK_VERSION}, "
        f"but {pyspark.__version__} is installed."
    )

# Local scratch directories.
# NOTE: these paths must exist and be writable on EVERY host that runs a Spark
# process, executors included. If an executor cannot use S3A_BUFFER_DIR, writes fail
# with "DiskErrorException: Could not find any valid local directory for
# s3ablock-...". Spark also warns that spark.local.dir may be overridden by the
# cluster manager (SPARK_LOCAL_DIRS / LOCAL_DIRS).
SPARK_LOCAL_DIR = "/opt/workspace/spark-tmp"
S3A_BUFFER_DIR = "/opt/workspace/s3a-buffer"

# How S3A buffers data before uploading it:
# - "disk" (the Hadoop default) uses S3A_BUFFER_DIR.
# - "array" / "bytebuffer" buffer in memory and need no local directory. This is a
#   workaround when executors lack S3A_BUFFER_DIR.
S3A_UPLOAD_BUFFER = os.getenv("S3A_UPLOAD_BUFFER", "disk")

# getOrCreate() returns an already-running session if there is one; in that case
# context-level settings such as spark.local.dir do not take effect.
spark = (
    SparkSession.builder
    .appName("MyS3App")
    .config("spark.local.dir", SPARK_LOCAL_DIR)
    .config("spark.hadoop.fs.s3a.buffer.dir", S3A_BUFFER_DIR)
    .config("spark.hadoop.fs.s3a.fast.upload.buffer", S3A_UPLOAD_BUFFER)
    .getOrCreate()
)

Using PySpark version: 3.1.3


25/05/25 18:22:53 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [8]:
# --- MinIO connection settings ---
# All values come from environment variables. The fallbacks are placeholders only:
# replace them ONLY if you are not using environment variables.
minio_endpoint_url = os.getenv('MINIO_ENDPOINT_URL', 'http://minio:9000')
minio_access_key = os.getenv('MINIO_ROOT_USER', 'YOUR_MINIO_ACCESS_KEY')
minio_secret_key = os.getenv('MINIO_ROOT_PASSWORD', 'YOUR_MINIO_SECRET_KEY')
# Use consistent bucket naming
bucket_name = os.getenv('MINIO_DEFAULT_BUCKET', 'your-bucket-name')

# If a placeholder is still in use, boto3/S3A would fail later with an opaque auth
# error. Name the unset environment variables here instead.
placeholder_minio_settings = [
    env_var
    for env_var, value, placeholder in (
        ('MINIO_ROOT_USER', minio_access_key, 'YOUR_MINIO_ACCESS_KEY'),
        ('MINIO_ROOT_PASSWORD', minio_secret_key, 'YOUR_MINIO_SECRET_KEY'),
        ('MINIO_DEFAULT_BUCKET', bucket_name, 'your-bucket-name'),
    )
    if value == placeholder
]
if placeholder_minio_settings:
    warnings.warn(
        "Using placeholder MinIO settings; set these environment variables: "
        + ", ".join(placeholder_minio_settings)
    )

In [9]:
# --- S3 client for MinIO ---
print(f"Configuring S3 client for endpoint: {minio_endpoint_url}")
s3_client = boto3.client(
    's3',
    endpoint_url=minio_endpoint_url,
    aws_access_key_id=minio_access_key,
    aws_secret_access_key=minio_secret_key,
)

# Error codes from head_bucket that mean "the bucket is not there".
# - HEAD responses carry no error body, so botocore usually reports the bare status '404'.
# - Some MinIO versions/configurations report 'NoSuchBucket' instead.
# Matching on the code, not on str(e), avoids false positives from message text.
BUCKET_MISSING_CODES = {'404', 'NoSuchBucket'}
# create_bucket error codes that mean we already own the bucket, e.g. it was created
# between our check and our create. Treated as success.
BUCKET_OWNED_CODES = {'BucketAlreadyOwnedByYou'}

# --- Check and Create Bucket ---
print(f"Ensuring bucket '{bucket_name}' exists...")
try:
    # head_bucket raises ClientError when the bucket is missing or inaccessible.
    s3_client.head_bucket(Bucket=bucket_name)
    print(f"Bucket '{bucket_name}' already exists.")
except ClientError as e:
    error_code = e.response.get('Error', {}).get('Code')
    if error_code not in BUCKET_MISSING_CODES:
        # Permissions, network issues, etc.: not something creating a bucket fixes.
        print(f"Error checking bucket status for '{bucket_name}': {e}")
        raise
    print(f"Bucket '{bucket_name}' does not exist. Attempting to create...")
    try:
        s3_client.create_bucket(Bucket=bucket_name)
        print(f"Bucket '{bucket_name}' created successfully.")
    except ClientError as creation_error:
        creation_code = creation_error.response.get('Error', {}).get('Code')
        if creation_code in BUCKET_OWNED_CODES:
            print(f"Bucket '{bucket_name}' already exists (created concurrently).")
        else:
            print(f"Error creating bucket '{bucket_name}': {creation_error}")
            # Bare raise keeps the original traceback of creation_error.
            raise

print("Bucket check/creation process finished.")

Configuring S3 client for endpoint: http://minio:9000
Ensuring bucket 'test-bucket' exists...
Bucket 'test-bucket' already exists.
Bucket check/creation process finished.


In [11]:
# 1. Create Sample Data
# Only column names are supplied, so Spark infers each column's type from the
# Python values in the rows.
columns = ["name", "id"]
data = [
    ("Alice", 1),
    ("Bob", 2),
    ("Charlie", 3),
]
df = spark.createDataFrame(data, schema=columns)

print("Sample DataFrame created:")
df.show()

Sample DataFrame created:
+-------+---+
|   name| id|
+-------+---+
|  Alice|  1|
|    Bob|  2|
|Charlie|  3|
+-------+---+



In [12]:
# 2. Define S3A Path
# The s3a:// scheme routes Spark's reads/writes through Hadoop's S3A connector;
# the data lands under the "test-data" prefix of the configured bucket.
output_path = "s3a://{}/test-data".format(bucket_name)

In [13]:
# 3. Write DataFrame to MinIO
# On failure, print a one-line summary and re-raise so the cell visibly fails.
# - The Java stack is NOT printed separately: Py4J already embeds it in the
#   exception message, so the re-raised traceback shows it once.
# - The SparkSession is deliberately left running. Stopping it here would make every
#   later cell fail with a "stopped SparkContext" error that hides the real cause.
print(f"Attempting to write DataFrame to: {output_path}")
try:
    df.write.mode("overwrite").parquet(output_path)
except Exception as e:
    print(f"Error writing to MinIO ({type(e).__name__}); full traceback follows.")
    raise
print("Successfully wrote data to MinIO.")

Attempting to write DataFrame to: s3a://test-bucket/test-data


25/05/25 18:22:25 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

Successfully wrote data to MinIO.


In [14]:
# 4. Read Data back from MinIO
print(f"Attempting to read data back from: {output_path}")
try:
    df_read = spark.read.parquet(output_path)
    print("Successfully read data back from MinIO:")
    df_read.show()
except Exception as e:
    # Re-raise instead of swallowing, so a failed read stops "Run All" rather than
    # letting later cells report that the test finished.
    print(f"Error reading from MinIO ({type(e).__name__}); full traceback follows.")
    raise

# Verify the round trip. Row order after reading Parquet back is not guaranteed
# (rows can come back in a different order than written), so compare order-insensitively.
assert sorted(df_read.collect()) == sorted(df.collect()), (
    "Data read back from MinIO does not match the DataFrame that was written"
)



Attempting to read data back from: s3a://test-bucket/test-data
Successfully read data back from MinIO:
+-------+---+
|   name| id|
+-------+---+
|Charlie|  3|
|  Alice|  1|
|    Bob|  2|
+-------+---+



In [15]:
# 5. Stop SparkSession
# Shut down this session's SparkContext. Any later cell that needs Spark has to
# build a fresh session first.
spark.stop()
print("Test finished.")

Test finished.


In [1]:
# Self-contained Spark SQL smoke test: build a small employee DataFrame in memory,
# query it through a temp view, check the result, and stop the session.
import os
from datetime import date

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# --- Configuration ---
# Standalone master URL. Set SPARK_MASTER_URL to target a different cluster.
SPARK_MASTER_URL = os.getenv("SPARK_MASTER_URL", "spark://jupyter-spark-master:7077")

# --- Spark Session ---
print("Creating SparkSession...")
spark = (
    SparkSession.builder
    .appName("SparkSQL_InMemory_Test")
    .master(SPARK_MASTER_URL)
    .getOrCreate()
)

print("SparkSession created.")

# --- Create In-Memory Data ---
# Sample data representing employees: (emp_id, first_name, last_name, hire_date)
data = [
    (1001, "Alice", "Smith", date(2020, 1, 15)),
    (1002, "Bob", "Johnson", date(2019, 5, 20)),
    (1003, "Charlie", "Williams", date(2021, 8, 1)),
    (1004, "David", "Brown", date(2020, 1, 10)),
    (1005, "Eve", "Davis", date(2022, 3, 25))
]

# Explicit schema, so emp_id is an int and hire_date is a date (no type inference).
schema = StructType([
    StructField("emp_id", IntegerType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("hire_date", DateType(), True)
])

# --- Create DataFrame ---
print("Creating DataFrame from in-memory data...")
employee_df = spark.createDataFrame(data=data, schema=schema)

print("Original DataFrame:")
employee_df.show()
employee_df.printSchema()

# --- Register DataFrame as SQL View ---
view_name = "employees_mem_view"
employee_df.createOrReplaceTempView(view_name)
print(f"DataFrame registered as temporary view: '{view_name}'")

# --- Run Spark SQL Query ---
# Example: Select employees hired in 2020.
# view_name is a fixed constant, so interpolating it into the SQL text is safe.
print("Running Spark SQL query...")
sql_query = f"""
SELECT emp_id, first_name, last_name
FROM {view_name}
WHERE year(hire_date) = 2020
ORDER BY emp_id
"""
result_df = spark.sql(sql_query)

print("Query results (Employees hired in 2020):")
result_df.show()

# Collect the ids before stopping the session so the check below can run after cleanup.
hired_2020_ids = [row.emp_id for row in result_df.collect()]

# --- Stop SparkSession ---
spark.stop()

# Inline check: of the sample rows above, only 1001 and 1004 were hired in 2020.
assert hired_2020_ids == [1001, 1004], f"Unexpected 2020 hires: {hired_2020_ids}"
print("\nSpark SQL in-memory test finished.")

Creating SparkSession...


25/04/28 17:59:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


SparkSession created.
Creating DataFrame from in-memory data...
Original DataFrame:


                                                                                

+------+----------+---------+----------+
|emp_id|first_name|last_name| hire_date|
+------+----------+---------+----------+
|  1001|     Alice|    Smith|2020-01-15|
|  1002|       Bob|  Johnson|2019-05-20|
|  1003|   Charlie| Williams|2021-08-01|
|  1004|     David|    Brown|2020-01-10|
|  1005|       Eve|    Davis|2022-03-25|
+------+----------+---------+----------+

root
 |-- emp_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- hire_date: date (nullable = true)

DataFrame registered as temporary view: 'employees_mem_view'
Running Spark SQL query...
Query results (Employees hired in 2020):
+------+----------+---------+
|emp_id|first_name|last_name|
+------+----------+---------+
|  1001|     Alice|    Smith|
|  1004|     David|    Brown|
+------+----------+---------+


Spark SQL in-memory test finished.


In [None]:
Error writing to MinIO: An error occurred while calling o85.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:874)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 4 times, most recent failure: Lost task 1.3 in stage 2.0 (TID 15) (172.19.0.6 executor 0): org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:463)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:477)
	at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:213)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:575)
	at org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
	at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
	at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:781)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:149)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2303)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2252)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2251)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2251)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2490)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2432)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2421)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
	... 33 more
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for s3ablock-0001-
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:463)
	at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:477)
	at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:213)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:575)
	at org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811)
	at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190)
	at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:781)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:149)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:111)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)