Skip to content

Commit

Permalink
fixup! Use Storage write API in BigQuery connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Nov 21, 2023
1 parent ad81a55 commit f38227b
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class BigQueryPageSink
implements ConnectorPageSink
{
private final BigQueryWriteClient client;
private final TableName tableName;
private final WriteStream writeStream;
private final List<String> columnNames;
private final List<Type> columnTypes;
private final ConnectorPageSinkId pageSinkId;
Expand All @@ -73,9 +73,13 @@ public BigQueryPageSink(
this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null");
checkArgument(temporaryTableName.isPresent() == pageSinkIdColumnName.isPresent(),
"temporaryTableName.isPresent is not equal to pageSinkIdColumn.isPresent");
this.tableName = temporaryTableName
.map(tableName -> TableName.of(remoteTableName.getProjectId(), remoteTableName.getDatasetName(), tableName))
TableName tableName = temporaryTableName
.map(table -> TableName.of(remoteTableName.getProjectId(), remoteTableName.getDatasetName(), table))
.orElseGet(remoteTableName::toTableName);
// TODO: Consider using PENDING mode
WriteStream stream = WriteStream.newBuilder().setType(COMMITTED).build();
CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder().setParent(tableName.toString()).setWriteStream(stream).build();
this.writeStream = client.createWriteStream(createWriteStreamRequest);
}

@Override
Expand All @@ -97,10 +101,6 @@ public CompletableFuture<?> appendPage(Page page)

private void insertWithCommitted(JSONArray batch)
{
WriteStream stream = WriteStream.newBuilder().setType(COMMITTED).build();
CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder().setParent(tableName.toString()).setWriteStream(stream).build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client).build()) {
ApiFuture<AppendRowsResponse> future = writer.append(batch);
AppendRowsResponse response = future.get(); // Throw error
Expand Down

0 comments on commit f38227b

Please sign in to comment.