Skip to content

Commit

Permalink
Convert Hive errors in IcebergParquetFileWriter to Iceberg errors
Browse files Browse the repository at this point in the history
  • Loading branch information
takezoe committed May 11, 2024
1 parent f054a9c commit fa6dfb4
Showing 1 changed file with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@
import java.util.stream.Stream;

import static io.trino.parquet.reader.MetadataReader.createParquetMetadata;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_CLOSE_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITER_DATA_ERROR;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_WRITE_VALIDATION_FAILED;
import static io.trino.plugin.iceberg.util.ParquetUtil.footerMetrics;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_CLOSE_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_DATA_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITE_VALIDATION_FAILED;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -106,19 +112,46 @@ public long getMemoryUsage()
@Override
public void appendRows(Page dataPage)
{
parquetFileWriter.appendRows(dataPage);
try {
parquetFileWriter.appendRows(dataPage);
}
catch (TrinoException e) {
if (e.getErrorCode() == HIVE_WRITER_DATA_ERROR.toErrorCode()) {
throw new TrinoException(ICEBERG_WRITER_DATA_ERROR, e);
}
throw e;
}
}

@Override
public Closeable commit()
{
return parquetFileWriter.commit();
try {
return parquetFileWriter.commit();
}
catch (TrinoException e) {
if (e.getErrorCode() == HIVE_WRITER_CLOSE_ERROR.toErrorCode()) {
throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error committing write parquet to Iceberg", e);
}
else if (e.getErrorCode() == HIVE_WRITE_VALIDATION_FAILED.toErrorCode()) {
throw new TrinoException(ICEBERG_WRITE_VALIDATION_FAILED, e);
}
throw e;
}
}

@Override
public void rollback()
{
parquetFileWriter.rollback();
try {
parquetFileWriter.rollback();
}
catch (TrinoException e) {
if (e.getErrorCode() == HIVE_WRITER_CLOSE_ERROR.toErrorCode()) {
throw new TrinoException(ICEBERG_WRITER_CLOSE_ERROR, "Error rolling back write parquet to Iceberg", e);
}
throw e;
}
}

@Override
Expand Down

0 comments on commit fa6dfb4

Please sign in to comment.