Skip to content

Commit b4618cc

Browse files
authored
feat(bigquery): Handle arrays with null values (#251)
1 parent 037592a commit b4618cc

File tree

12 files changed

+583
-233
lines changed

12 files changed

+583
-233
lines changed

etl-destinations/src/bigquery/client.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,11 +210,12 @@ impl BigQueryClient {
210210
table_descriptor: &TableDescriptor,
211211
table_rows: Vec<TableRow>,
212212
) -> EtlResult<()> {
213-
// We have to map table rows into the new type due to the limitations of how Rust works.
213+
// We map the table rows into `BigQueryTableRow` instances, so that we also do a validation
214+
// of the cells.
214215
let table_rows = table_rows
215216
.into_iter()
216-
.map(BigQueryTableRow)
217-
.collect::<Vec<_>>();
217+
.map(BigQueryTableRow::try_from)
218+
.collect::<EtlResult<Vec<_>>>()?;
218219

219220
// We create a slice on table rows, which will be updated while the streaming progresses.
220221
//

etl-destinations/src/bigquery/encoding.rs

Lines changed: 208 additions & 223 deletions
Large diffs are not rendered by default.

etl-destinations/tests/integration/bigquery_test.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
22
use etl::config::BatchConfig;
3+
use etl::error::ErrorKind;
34
use etl::state::table::TableReplicationPhaseType;
45
use etl::test_utils::database::{spawn_source_database, test_table_name};
56
use etl::test_utils::notify::NotifyingStore;
@@ -11,6 +12,8 @@ use etl_destinations::bigquery::install_crypto_provider_for_bigquery;
1112
use etl_telemetry::init_test_tracing;
1213
use rand::random;
1314
use std::str::FromStr;
15+
use std::time::Duration;
16+
use tokio::time::sleep;
1417

1518
use crate::common::bigquery::{
1619
BigQueryOrder, BigQueryUser, NonNullableColsScalar, NullableColsArray, NullableColsScalar,
@@ -1474,3 +1477,151 @@ async fn table_non_nullable_array_columns() {
14741477

14751478
pipeline.shutdown_and_wait().await.unwrap();
14761479
}
1480+
1481+
#[tokio::test(flavor = "multi_thread")]
1482+
async fn table_array_with_null_values() {
1483+
init_test_tracing();
1484+
install_crypto_provider_for_bigquery();
1485+
1486+
let database = spawn_source_database().await;
1487+
let bigquery_database = setup_bigquery_connection().await;
1488+
let table_name = test_table_name("array_with_nulls");
1489+
let table_id = database
1490+
.create_table(table_name.clone(), true, &[("int_array", "int4[]")])
1491+
.await
1492+
.unwrap();
1493+
1494+
let store = NotifyingStore::new();
1495+
let raw_destination = bigquery_database.build_destination(store.clone()).await;
1496+
let destination = TestDestinationWrapper::wrap(raw_destination);
1497+
1498+
let publication_name = "test_pub_array_nulls".to_string();
1499+
database
1500+
.create_publication(&publication_name, std::slice::from_ref(&table_name))
1501+
.await
1502+
.expect("Failed to create publication");
1503+
1504+
let pipeline_id: PipelineId = random();
1505+
let mut pipeline = create_pipeline(
1506+
&database.config,
1507+
pipeline_id,
1508+
publication_name.clone(),
1509+
store.clone(),
1510+
destination.clone(),
1511+
);
1512+
1513+
let table_sync_done_notification = store
1514+
.notify_on_table_state(table_id, TableReplicationPhaseType::SyncDone)
1515+
.await;
1516+
1517+
pipeline.start().await.unwrap();
1518+
1519+
table_sync_done_notification.notified().await;
1520+
1521+
// Insert array with null value
1522+
database
1523+
.client
1524+
.as_ref()
1525+
.unwrap()
1526+
.execute(
1527+
&format!(
1528+
"INSERT INTO {} (int_array) VALUES (ARRAY[1, NULL])",
1529+
table_name.as_quoted_identifier()
1530+
),
1531+
&[],
1532+
)
1533+
.await
1534+
.unwrap();
1535+
1536+
// We sleep to wait for the event to be processed. This is not ideal, but if we wanted to do
1537+
// this better, we would have to also implement error handling within the apply worker to write
1538+
// to the state store.
1539+
sleep(Duration::from_secs(1)).await;
1540+
1541+
// Wait for the pipeline expecting an error to be returned.
1542+
let err = pipeline.shutdown_and_wait().await.err().unwrap();
1543+
assert_eq!(err.kinds().len(), 1);
1544+
assert_eq!(err.kinds()[0], ErrorKind::NullValuesNotSupportedInArray);
1545+
1546+
// Reset and try with valid array (no nulls)
1547+
database
1548+
.client
1549+
.as_ref()
1550+
.unwrap()
1551+
.execute(
1552+
&format!(
1553+
"DELETE FROM {} WHERE true",
1554+
table_name.as_quoted_identifier()
1555+
),
1556+
&[],
1557+
)
1558+
.await
1559+
.unwrap();
1560+
1561+
// We have to reset the state of the table and copy it from scratch, otherwise the CDC will contain
1562+
// the inserts and deletes, failing again.
1563+
store.reset_table_state(table_id).await.unwrap();
1564+
// We also clear the events so that it's more idiomatic to wait for them, since we don't have
1565+
// the insert from before.
1566+
destination.clear_events().await;
1567+
1568+
// We recreate the pipeline and try again.
1569+
let mut pipeline = create_pipeline(
1570+
&database.config,
1571+
pipeline_id,
1572+
publication_name,
1573+
store.clone(),
1574+
destination.clone(),
1575+
);
1576+
1577+
let table_sync_done_notification = store
1578+
.notify_on_table_state(table_id, TableReplicationPhaseType::SyncDone)
1579+
.await;
1580+
1581+
pipeline.start().await.unwrap();
1582+
1583+
table_sync_done_notification.notified().await;
1584+
1585+
let event_notify = destination
1586+
.wait_for_events_count(vec![(EventType::Insert, 1)])
1587+
.await;
1588+
1589+
// Insert array without null values
1590+
database
1591+
.client
1592+
.as_ref()
1593+
.unwrap()
1594+
.execute(
1595+
&format!(
1596+
"INSERT INTO {} (int_array) VALUES (ARRAY[1, 2, 3])",
1597+
table_name.as_quoted_identifier()
1598+
),
1599+
&[],
1600+
)
1601+
.await
1602+
.unwrap();
1603+
1604+
event_notify.notified().await;
1605+
1606+
pipeline.shutdown_and_wait().await.unwrap();
1607+
1608+
// Verify the valid array was successfully replicated to BigQuery
1609+
let table_rows = bigquery_database
1610+
.query_table(table_name.clone())
1611+
.await
1612+
.unwrap();
1613+
1614+
// Check that there is only the valid row in BigQuery
1615+
assert_eq!(table_rows.len(), 1);
1616+
1617+
// Check that the int array contains 3 elements, meaning it must be the second insert with all
1618+
// NON-NULL values
1619+
let row = &table_rows[0];
1620+
if let Some(columns) = &row.columns {
1621+
assert_eq!(columns.len(), 2);
1622+
assert_eq!(
1623+
columns[1].value.clone().unwrap().as_array().unwrap().len(),
1624+
3
1625+
);
1626+
}
1627+
}

etl/src/conversions/mod.rs

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ use numeric::PgNumeric;
33
use std::fmt::Debug;
44
use uuid::Uuid;
55

6+
use crate::bail;
7+
use crate::error::ErrorKind;
8+
use crate::error::EtlError;
9+
610
pub mod bool;
711
pub mod event;
812
pub mod hex;
@@ -103,3 +107,172 @@ impl ArrayCell {
103107
}
104108
}
105109
}
110+
111+
#[derive(Debug, Clone, PartialEq)]
112+
pub enum CellNonOptional {
113+
Null,
114+
Bool(bool),
115+
String(String),
116+
I16(i16),
117+
I32(i32),
118+
U32(u32),
119+
I64(i64),
120+
F32(f32),
121+
F64(f64),
122+
Numeric(PgNumeric),
123+
Date(NaiveDate),
124+
Time(NaiveTime),
125+
TimeStamp(NaiveDateTime),
126+
TimeStampTz(DateTime<Utc>),
127+
Uuid(Uuid),
128+
Json(serde_json::Value),
129+
Bytes(Vec<u8>),
130+
Array(ArrayCellNonOptional),
131+
}
132+
133+
impl TryFrom<Cell> for CellNonOptional {
134+
type Error = EtlError;
135+
136+
fn try_from(cell: Cell) -> Result<Self, Self::Error> {
137+
match cell {
138+
Cell::Null => Ok(CellNonOptional::Null),
139+
Cell::Bool(val) => Ok(CellNonOptional::Bool(val)),
140+
Cell::String(val) => Ok(CellNonOptional::String(val)),
141+
Cell::I16(val) => Ok(CellNonOptional::I16(val)),
142+
Cell::I32(val) => Ok(CellNonOptional::I32(val)),
143+
Cell::U32(val) => Ok(CellNonOptional::U32(val)),
144+
Cell::I64(val) => Ok(CellNonOptional::I64(val)),
145+
Cell::F32(val) => Ok(CellNonOptional::F32(val)),
146+
Cell::F64(val) => Ok(CellNonOptional::F64(val)),
147+
Cell::Numeric(val) => Ok(CellNonOptional::Numeric(val)),
148+
Cell::Date(val) => Ok(CellNonOptional::Date(val)),
149+
Cell::Time(val) => Ok(CellNonOptional::Time(val)),
150+
Cell::TimeStamp(val) => Ok(CellNonOptional::TimeStamp(val)),
151+
Cell::TimeStampTz(val) => Ok(CellNonOptional::TimeStampTz(val)),
152+
Cell::Uuid(val) => Ok(CellNonOptional::Uuid(val)),
153+
Cell::Json(val) => Ok(CellNonOptional::Json(val)),
154+
Cell::Bytes(val) => Ok(CellNonOptional::Bytes(val)),
155+
Cell::Array(array_cell) => {
156+
let array_cell_non_optional = ArrayCellNonOptional::try_from(array_cell)?;
157+
Ok(CellNonOptional::Array(array_cell_non_optional))
158+
}
159+
}
160+
}
161+
}
162+
163+
impl CellNonOptional {
164+
pub fn clear(&mut self) {
165+
match self {
166+
CellNonOptional::Null => {}
167+
CellNonOptional::Bool(b) => *b = false,
168+
CellNonOptional::String(s) => s.clear(),
169+
CellNonOptional::I16(i) => *i = 0,
170+
CellNonOptional::I32(i) => *i = 0,
171+
CellNonOptional::I64(i) => *i = 0,
172+
CellNonOptional::F32(i) => *i = 0.,
173+
CellNonOptional::F64(i) => *i = 0.,
174+
CellNonOptional::Numeric(n) => *n = PgNumeric::default(),
175+
CellNonOptional::Date(t) => *t = NaiveDate::default(),
176+
CellNonOptional::Time(t) => *t = NaiveTime::default(),
177+
CellNonOptional::TimeStamp(t) => *t = NaiveDateTime::default(),
178+
CellNonOptional::TimeStampTz(t) => *t = DateTime::<Utc>::default(),
179+
CellNonOptional::Uuid(u) => *u = Uuid::default(),
180+
CellNonOptional::Json(j) => *j = serde_json::Value::default(),
181+
CellNonOptional::U32(u) => *u = 0,
182+
CellNonOptional::Bytes(b) => b.clear(),
183+
CellNonOptional::Array(vec) => {
184+
vec.clear();
185+
}
186+
}
187+
}
188+
}
189+
190+
#[derive(Debug, Clone, PartialEq)]
191+
pub enum ArrayCellNonOptional {
192+
Null,
193+
Bool(Vec<bool>),
194+
String(Vec<String>),
195+
I16(Vec<i16>),
196+
I32(Vec<i32>),
197+
U32(Vec<u32>),
198+
I64(Vec<i64>),
199+
F32(Vec<f32>),
200+
F64(Vec<f64>),
201+
Numeric(Vec<PgNumeric>),
202+
Date(Vec<NaiveDate>),
203+
Time(Vec<NaiveTime>),
204+
TimeStamp(Vec<NaiveDateTime>),
205+
TimeStampTz(Vec<DateTime<Utc>>),
206+
Uuid(Vec<Uuid>),
207+
Json(Vec<serde_json::Value>),
208+
Bytes(Vec<Vec<u8>>),
209+
}
210+
211+
macro_rules! convert_array_variant {
212+
($variant:ident, $vec:expr) => {
213+
if $vec.iter().any(|v| v.is_none()) {
214+
bail!(
215+
ErrorKind::NullValuesNotSupportedInArray,
216+
"NULL values in arrays are not supported",
217+
format!(
218+
"Remove the NULL values from the array {:?} and try again",
219+
$vec
220+
)
221+
)
222+
} else {
223+
Ok(ArrayCellNonOptional::$variant(
224+
$vec.into_iter().flatten().collect(),
225+
))
226+
}
227+
};
228+
}
229+
230+
impl TryFrom<ArrayCell> for ArrayCellNonOptional {
231+
type Error = EtlError;
232+
233+
fn try_from(array_cell: ArrayCell) -> Result<Self, Self::Error> {
234+
match array_cell {
235+
ArrayCell::Null => Ok(ArrayCellNonOptional::Null),
236+
ArrayCell::Bool(vec) => convert_array_variant!(Bool, vec),
237+
ArrayCell::String(vec) => convert_array_variant!(String, vec),
238+
ArrayCell::I16(vec) => convert_array_variant!(I16, vec),
239+
ArrayCell::I32(vec) => convert_array_variant!(I32, vec),
240+
ArrayCell::U32(vec) => convert_array_variant!(U32, vec),
241+
ArrayCell::I64(vec) => convert_array_variant!(I64, vec),
242+
ArrayCell::F32(vec) => convert_array_variant!(F32, vec),
243+
ArrayCell::F64(vec) => convert_array_variant!(F64, vec),
244+
ArrayCell::Numeric(vec) => convert_array_variant!(Numeric, vec),
245+
ArrayCell::Date(vec) => convert_array_variant!(Date, vec),
246+
ArrayCell::Time(vec) => convert_array_variant!(Time, vec),
247+
ArrayCell::TimeStamp(vec) => convert_array_variant!(TimeStamp, vec),
248+
ArrayCell::TimeStampTz(vec) => convert_array_variant!(TimeStampTz, vec),
249+
ArrayCell::Uuid(vec) => convert_array_variant!(Uuid, vec),
250+
ArrayCell::Json(vec) => convert_array_variant!(Json, vec),
251+
ArrayCell::Bytes(vec) => convert_array_variant!(Bytes, vec),
252+
}
253+
}
254+
}
255+
256+
impl ArrayCellNonOptional {
257+
fn clear(&mut self) {
258+
match self {
259+
ArrayCellNonOptional::Null => {}
260+
ArrayCellNonOptional::Bool(vec) => vec.clear(),
261+
ArrayCellNonOptional::String(vec) => vec.clear(),
262+
ArrayCellNonOptional::I16(vec) => vec.clear(),
263+
ArrayCellNonOptional::I32(vec) => vec.clear(),
264+
ArrayCellNonOptional::U32(vec) => vec.clear(),
265+
ArrayCellNonOptional::I64(vec) => vec.clear(),
266+
ArrayCellNonOptional::F32(vec) => vec.clear(),
267+
ArrayCellNonOptional::F64(vec) => vec.clear(),
268+
ArrayCellNonOptional::Numeric(vec) => vec.clear(),
269+
ArrayCellNonOptional::Date(vec) => vec.clear(),
270+
ArrayCellNonOptional::Time(vec) => vec.clear(),
271+
ArrayCellNonOptional::TimeStamp(vec) => vec.clear(),
272+
ArrayCellNonOptional::TimeStampTz(vec) => vec.clear(),
273+
ArrayCellNonOptional::Uuid(vec) => vec.clear(),
274+
ArrayCellNonOptional::Json(vec) => vec.clear(),
275+
ArrayCellNonOptional::Bytes(vec) => vec.clear(),
276+
}
277+
}
278+
}

0 commit comments

Comments
 (0)