
Commit

save
fix
xxhZs committed Apr 9, 2024
1 parent 4605ae0 commit 11f2ffb
Showing 2 changed files with 67 additions and 36 deletions.
99 changes: 63 additions & 36 deletions src/connector/src/sink/big_query.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::mem;
use core::time::Duration;
use std::collections::HashMap;
use std::sync::Arc;
@@ -32,7 +33,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::{
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use prost_reflect::MessageDescriptor;
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
field_descriptor_proto, DescriptorProto, FieldDescriptorProto, FileDescriptorProto,
FileDescriptorSet,
@@ -42,7 +43,7 @@ use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_with::serde_as;
use serde_with::{serde_as, DisplayFromStr};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
@@ -77,6 +78,9 @@ pub struct BigQueryCommon {
pub dataset: String,
#[serde(rename = "bigquery.table")]
pub table: String,
#[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
#[serde_as(as = "DisplayFromStr")]
pub max_batch_rows: usize,
}

fn default_max_batch_rows() -> usize {
Expand Down Expand Up @@ -311,6 +315,9 @@ pub struct BigQuerySinkWriter {
writer_pb_schema: ProtoSchema,
message_descriptor: MessageDescriptor,
write_stream: String,
proto_field: Option<FieldDescriptor>,
write_rows: Vec<AppendRowsRequestRows>,
write_rows_count: usize,
}

impl TryFrom<SinkParam> for BigQuerySink {
@@ -366,6 +373,16 @@ impl BigQuerySinkWriter {
&config.common.table
))
})?;
let proto_field = if !is_append_only {
let proto_field = message_descriptor
.get_field_by_name(CHANGE_TYPE)
.ok_or_else(|| {
SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
})?;
Some(proto_field)
} else {
None
};
let row_encoder = ProtoEncoder::new(
schema.clone(),
None,
@@ -384,74 +401,71 @@ impl BigQuerySinkWriter {
is_append_only,
row_encoder,
message_descriptor,
proto_field,
writer_pb_schema: ProtoSchema {
proto_descriptor: Some(descriptor_proto),
},
write_rows: vec![],
write_rows_count: 0,
})
}

async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
for (op, row) in chunk.rows() {
if op != Op::Insert {
continue;
}

serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
}
let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
writer_schema: Some(self.writer_pb_schema.clone()),
rows: Some(ProtoRows { serialized_rows }),
});
self.client
.append_rows(vec![rows], self.write_stream.clone())
.await?;
Ok(())
Ok(serialized_rows)
}

async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
for (op, row) in chunk.rows() {
if op == Op::UpdateDelete {
continue;
}
let mut pb_row = self.row_encoder.encode(row)?;
let proto_field = self
.message_descriptor
.get_field_by_name(CHANGE_TYPE)
.ok_or_else(|| {
SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
})?;
match op {
Op::Insert => pb_row
.message
.try_set_field(
&proto_field,
self.proto_field.as_ref().unwrap(),
prost_reflect::Value::String("INSERT".to_string()),
)
.map_err(|e| SinkError::BigQuery(e.into()))?,
Op::Delete => pb_row
.message
.try_set_field(
&proto_field,
self.proto_field.as_ref().unwrap(),
prost_reflect::Value::String("DELETE".to_string()),
)
.map_err(|e| SinkError::BigQuery(e.into()))?,
Op::UpdateDelete => continue,
Op::UpdateInsert => pb_row
.message
.try_set_field(
&proto_field,
self.proto_field.as_ref().unwrap(),
prost_reflect::Value::String("UPSERT".to_string()),
)
.map_err(|e| SinkError::BigQuery(e.into()))?,
};

serialized_rows.push(pb_row.ser_to()?)
}
let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
writer_schema: Some(self.writer_pb_schema.clone()),
rows: Some(ProtoRows { serialized_rows }),
});
Ok(serialized_rows)
}

async fn write_rows(&mut self) -> Result<()> {
if self.write_rows.is_empty() {
return Ok(());
}
let rows = mem::take(&mut self.write_rows);
self.write_rows_count = 0;
self.client
.append_rows(vec![rows], self.write_stream.clone())
.append_rows(rows, self.write_stream.clone())
.await?;
Ok(())
}
@@ -460,14 +474,27 @@ impl BigQuerySinkWriter {
#[async_trait]
impl SinkWriter for BigQuerySinkWriter {
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
if self.is_append_only {
self.append_only(chunk).await
let serialized_rows = if self.is_append_only {
self.append_only(chunk)?
} else {
self.upsert(chunk).await
self.upsert(chunk)?
};
self.write_rows_count += serialized_rows.len();
let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
writer_schema: Some(self.writer_pb_schema.clone()),
rows: Some(ProtoRows { serialized_rows }),
});
self.write_rows.push(rows);

if self.write_rows_count >= self.config.common.max_batch_rows {
self.write_rows().await?;
}

Ok(())
}

async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
self.write_rows().await?;
Ok(())
}

@@ -521,27 +548,27 @@ impl StorageWriterClient {
rows: Vec<AppendRowsRequestRows>,
write_stream: String,
) -> Result<()> {
let trace_id = Uuid::new_v4().hyphenated().to_string();
let append_req: Vec<AppendRowsRequest> = rows
.into_iter()
.map(|row| AppendRowsRequest {
write_stream: write_stream.clone(),
offset: None,
trace_id: trace_id.clone(),
trace_id: Uuid::new_v4().hyphenated().to_string(),
missing_value_interpretations: HashMap::default(),
rows: Some(row),
})
.collect();
let resp = self
let mut resp = self
.client
.append_rows(Request::new(tokio_stream::iter(append_req)))
.await
.map_err(|e| SinkError::BigQuery(e.into()))?
.into_inner()
.into_inner();
while let Some(i) = resp
.message()
.await
.map_err(|e| SinkError::BigQuery(e.into()))?;
if let Some(i) = resp {
.map_err(|e| SinkError::BigQuery(e.into()))?
{
if !i.row_errors.is_empty() {
return Err(SinkError::BigQuery(anyhow::anyhow!(
"Insert error {:?}",
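Taken together, the changes above make `append_only` and `upsert` return the encoded rows instead of sending them immediately: `write_batch` buffers them as `AppendRowsRequestRows`, flushes through `write_rows` once `write_rows_count` reaches `bigquery.max_batch_rows`, and `begin_epoch` flushes whatever is still pending. The following is a minimal, self-contained sketch of that accumulate-and-flush pattern only; the struct, the `flush_to_bigquery` stand-in, and the tiny threshold in `main` are illustrative assumptions, not the sink's real API.

```rust
// Simplified sketch of the batching pattern; the real writer buffers
// `AppendRowsRequestRows` values and flushes through
// `StorageWriterClient::append_rows`.
struct BatchingWriter {
    max_batch_rows: usize,
    buffered_batches: Vec<Vec<Vec<u8>>>,
    buffered_row_count: usize,
}

impl BatchingWriter {
    fn write_batch(&mut self, serialized_rows: Vec<Vec<u8>>) {
        // Buffer the encoded rows instead of sending them right away.
        self.buffered_row_count += serialized_rows.len();
        self.buffered_batches.push(serialized_rows);
        // Flush once the buffered row count reaches the configured threshold.
        if self.buffered_row_count >= self.max_batch_rows {
            self.flush();
        }
    }

    fn begin_epoch(&mut self) {
        // Flush at epoch boundaries so buffered rows never span a checkpoint.
        self.flush();
    }

    fn flush(&mut self) {
        if self.buffered_batches.is_empty() {
            return;
        }
        let batches = std::mem::take(&mut self.buffered_batches);
        self.buffered_row_count = 0;
        flush_to_bigquery(batches);
    }
}

// Stand-in for the real append_rows RPC; just reports what would be sent.
fn flush_to_bigquery(batches: Vec<Vec<Vec<u8>>>) {
    let rows: usize = batches.iter().map(|b| b.len()).sum();
    println!("flushing {} request(s) covering {} row(s)", batches.len(), rows);
}

fn main() {
    let mut writer = BatchingWriter {
        max_batch_rows: 3, // the sink's default is 1024; 3 keeps the demo short
        buffered_batches: vec![],
        buffered_row_count: 0,
    };
    writer.write_batch(vec![b"r1".to_vec(), b"r2".to_vec()]); // 2 rows buffered
    writer.write_batch(vec![b"r3".to_vec()]); // hits the threshold, flushes
    writer.begin_epoch(); // nothing left buffered, so this flush is a no-op
}
```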
4 changes: 4 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
@@ -17,6 +17,10 @@ BigQueryConfig:
- name: bigquery.table
field_type: String
required: true
- name: bigquery.max_batch_rows
field_type: usize
required: false
default: '1024'
- name: region
field_type: String
required: false
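For reference, a small sketch of how the new option's serde annotations behave: `bigquery.max_batch_rows` is parsed from a string via `DisplayFromStr` and falls back to `default_max_batch_rows` (1024, matching the registered default) when unset. The struct name and the use of `serde_json` to stand in for the option map are assumptions made for the demo, not the sink's real config path.

```rust
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

// Mirrors the default registered in with_options_sink.yaml.
fn default_max_batch_rows() -> usize {
    1024
}

// Hypothetical stand-in for the relevant part of `BigQueryCommon`.
#[serde_as]
#[derive(Debug, Deserialize)]
struct MaxBatchRowsOptions {
    #[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
    #[serde_as(as = "DisplayFromStr")]
    max_batch_rows: usize,
}

fn main() {
    // WITH options arrive as strings, so the value is parsed via FromStr.
    let set: MaxBatchRowsOptions =
        serde_json::from_str(r#"{ "bigquery.max_batch_rows": "2048" }"#).unwrap();
    assert_eq!(set.max_batch_rows, 2048);

    // When the option is omitted, the declared default of 1024 applies.
    let unset: MaxBatchRowsOptions = serde_json::from_str("{}").unwrap();
    assert_eq!(unset.max_batch_rows, 1024);
}
```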
