Describe the bug
I'm trying to set up a local environment with MinIO, Unity Catalog and Spark to read and write Delta tables.
I can get it to work with Polars, but not with PySpark.
I'm using Spark installed manually on macOS, version 4.0.1.
My pyproject has the following dependencies:
dependencies = [
"delta-spark==4.0.0",
"ibis-framework[duckdb,polars,pyspark]>=10.5.0",
"minio>=7.2.15",
"pyiceberg[s3fs,sql-postgres,sql-sqlite,pyiceberg-core]>=0.9.1",
"polars>=1.37.0",
"deltalake>=1.3.1",
"loguru>=0.7.3",
"pydantic-settings>=2.9.1",
"duckdb>=1.4.3",
"setuptools>=80.9.0",
"marimo>=0.17.7",
"pysail==0.4.6",
"pyspark[connect,sql]==4.0.0",
"psycopg2-binary>=2.9.11",
]
Unity Catalog is spun up from a locally cloned repo, on branch main, by running docker compose up --build -d
Here's the server.properties:
## S3 Storage Config (Multiple configs can be added by incrementing the index)
s3.bucketPath.0=s3://warehouse
s3.region.0=
s3.awsRoleArn.0=
# Optional (If blank, it will use DefaultCredentialsProviderChain)
s3.accessKey.0=minioadmin
s3.secretKey.0=miniopassword
# Test Only (If you provide a session token, it will just use those session creds, no downscoping)
s3.sessionToken.0=
Here's the hibernate.properties. It points to a Postgres database that runs in a separate Docker Compose setup, together with MinIO.
hibernate.connection.driver_class=org.postgresql.Driver
hibernate.connection.url=jdbc:postgresql://postgres_db:5432/catalog_db
hibernate.connection.user=user
hibernate.connection.password=password
To Reproduce
Here's an MRE:
from datetime import UTC, datetime
import polars as pl
from pyspark.sql import DataFrame, SparkSession
# Minimal reproduction: create a schema through a Unity Catalog-backed Spark
# catalog, then write a small Delta table to an s3:// path.
catalog_name = "unity"

# Maven coordinates passed to spark.jars.packages as one comma-separated string.
maven_packages = ",".join(
    (
        "org.apache.hadoop:hadoop-aws:3.4.0",
        "io.delta:delta-spark_2.13:4.0.0",
        "io.unitycatalog:unitycatalog-spark_2.13:0.3.0",
    )
)

spark = (
    SparkSession.builder
    .appName("s3-uc-test")
    .master("local[*]")
    .config("spark.jars.packages", maven_packages)
    # Register the Delta SQL session extension.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    # Resolve s3:// URIs with the S3A filesystem implementation.
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Both spark_catalog and the named catalog use the Unity Catalog connector.
    .config("spark.sql.catalog.spark_catalog", "io.unitycatalog.spark.UCSingleCatalog")
    .config(f"spark.sql.catalog.{catalog_name}", "io.unitycatalog.spark.UCSingleCatalog")
    .config(f"spark.sql.catalog.{catalog_name}.uri", "http://localhost:8080")
    .config(f"spark.sql.catalog.{catalog_name}.token", "")
    .config(f"spark.sql.catalog.{catalog_name}.renewCredential.enabled", "true")
    # S3A client settings for the local, non-TLS endpoint on port 9000.
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    # NOTE(review): server.properties in this report sets
    # s3.secretKey.0=miniopassword, while the secret key below is "minioadmin";
    # confirm which value the MinIO instance actually uses.
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.sql.defaultCatalog", catalog_name)
    .getOrCreate()
)

# Make sure the target schema exists and select it as the current database.
spark.sql("CREATE SCHEMA IF NOT EXISTS demo")
spark.catalog.setCurrentDatabase("demo")

# Three sample rows; the last "details" entry carries a None value.
sample_columns = {
    "datetime": [
        datetime(2023, 1, 1, 12, 0, tzinfo=UTC),
        datetime(2023, 1, 2, 12, 0, tzinfo=UTC),
        datetime(2023, 1, 3, 12, 0, tzinfo=UTC),
    ],
    "symbol": ["AAPL", "GOOGL", "MSFT"],
    "bid": [150.0, 2800.0, 300.0],
    "ask": [151.0, 2805.0, 305.0],
    "details": [
        {"created_by": "user1"},
        {"created_by": "user2"},
        {"created_by": None},
    ],
}
data = pl.DataFrame(sample_columns)

# Polars -> pandas -> Spark DataFrame.
spark_df: DataFrame = spark.createDataFrame(data.to_pandas())

# Overwrite-mode Delta write to an explicit s3:// path, registered as a table.
# The saveAsTable call is the one that fails in the stack trace.
delta_writer = spark_df.write.format("delta").mode("overwrite")
delta_writer.option("path", "s3://warehouse/prova").saveAsTable("unity.demo.test_table")
Expected behavior
I expected the table to be written successfully. Instead, here's the stack trace I get:
Py4JJavaError: An error occurred while calling o91.saveAsTable.
: io.unitycatalog.client.ApiException: generateTemporaryPathCredentials call failed with: 400 - {"error_code":"FAILED_PRECONDITION","details":[{"reason":"FAILED_PRECONDITION","metadata":{},"@type":"google.rpc.ErrorInfo"}],"stack_trace":null,"message":"S3 bucket configuration not found."}
at io.unitycatalog.client.api.TemporaryCredentialsApi.getApiException(TemporaryCredentialsApi.java:78)
at io.unitycatalog.client.api.TemporaryCredentialsApi.generateTemporaryPathCredentialsWithHttpInfo(TemporaryCredentialsApi.java:192)
at io.unitycatalog.client.api.TemporaryCredentialsApi.generateTemporaryPathCredentials(TemporaryCredentialsApi.java:170)
at io.unitycatalog.spark.UCSingleCatalog.createTable(UCSingleCatalog.scala:113)
at org.apache.spark.sql.execution.datasources.v2.ReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:172)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:155)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
at scala.util.Try$.apply(Try.scala:217)
at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
at org.apache.spark.sql.classic.DataFrameWriter.saveAsTable(DataFrameWriter.scala:514)
at org.apache.spark.sql.classic.DataFrameWriter.saveAsTable(DataFrameWriter.scala:442)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
at java.base/java.lang.Thread.run(Thread.java:1583)
Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
at io.unitycatalog.client.api.TemporaryCredentialsApi.getApiException(TemporaryCredentialsApi.java:78)
at io.unitycatalog.client.api.TemporaryCredentialsApi.generateTemporaryPathCredentialsWithHttpInfo(TemporaryCredentialsApi.java:192)
at io.unitycatalog.client.api.TemporaryCredentialsApi.generateTemporaryPathCredentials(TemporaryCredentialsApi.java:170)
at io.unitycatalog.spark.UCSingleCatalog.createTable(UCSingleCatalog.scala:113)
at org.apache.spark.sql.execution.datasources.v2.ReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:172)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$2(QueryExecution.scala:155)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:155)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:654)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$eagerlyExecute$1(QueryExecution.scala:154)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:169)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$3.applyOrElse(QueryExecution.scala:164)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:470)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:360)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:356)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:37)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:446)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:164)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyCommandExecuted$1(QueryExecution.scala:126)
at scala.util.Try$.apply(Try.scala:217)
at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1378)
at org.apache.spark.util.LazyTry.tryT$lzycompute(LazyTry.scala:46)
at org.apache.spark.util.LazyTry.tryT(LazyTry.scala:46)
... 18 more
System [please complete the following information]:
macOS Tahoe.
Additional context
Describe the bug
I'm trying to set up a local environment with MinIO, Unity Catalog and Spark to read and write Delta tables.
I can get it to work with Polars, but not with PySpark.
I'm using Spark installed manually on macOS, version 4.0.1.
My pyproject has the following dependencies:
Unity Catalog is spun up from a locally cloned repo, on branch main, by running
docker compose up --build -d
Here's the server.properties:
Here's the hibernate.properties. It points to a Postgres database that runs in a separate Docker Compose setup, together with MinIO.
To Reproduce
Here's an MRE:
Expected behavior
Here's the stacktrace I get:
System [please complete the following information]:
macOS Tahoe.
Additional context