diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java index 48ed4b72781f2..92e04eb03f4c2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java @@ -35,6 +35,12 @@ import java.util.stream.Stream; import static io.trino.parquet.reader.MetadataReader.createParquetMetadata; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITE_VALIDATION_FAILED; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_CLOSE_ERROR; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_DATA_ERROR; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED; import static io.trino.plugin.iceberg.util.ParquetUtil.footerMetrics; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static java.lang.String.format; @@ -106,19 +112,46 @@ public long getMemoryUsage() @Override public void appendRows(Page dataPage) { - parquetFileWriter.appendRows(dataPage); + try { + parquetFileWriter.appendRows(dataPage); + } + catch (TrinoException e) { + if (e.getErrorCode() == HIVE_WRITER_DATA_ERROR.toErrorCode()) { + throw new TrinoException(ICEBERG_WRITER_DATA_ERROR, e); + } + throw e; + } } @Override public Closeable commit() { - return parquetFileWriter.commit(); + try { + return parquetFileWriter.commit(); + } + catch (TrinoException e) { + if (e.getErrorCode() == HIVE_WRITER_CLOSE_ERROR.toErrorCode()) { + throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error committing write parquet to Iceberg", e); + } + else if (e.getErrorCode() == HIVE_WRITE_VALIDATION_FAILED.toErrorCode()) { + throw new TrinoException(ICEBERG_WRITE_VALIDATION_FAILED, e); + } + throw e; + } } @Override public void rollback() { - parquetFileWriter.rollback(); + try { + parquetFileWriter.rollback(); + } + catch (TrinoException e) { + if (e.getErrorCode() == HIVE_WRITER_CLOSE_ERROR.toErrorCode()) { + throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error rolling back write parquet to Iceberg", e); + } + throw e; + } } @Override