Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: postgres arrays #233

Merged
merged 19 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/synth-postgres.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ jobs:
working-directory: synth/testing_harness/postgres
- run: ./e2e.sh test-warning
working-directory: synth/testing_harness/postgres
- run: ./e2e.sh test-arrays
working-directory: synth/testing_harness/postgres
96 changes: 95 additions & 1 deletion core/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,98 @@ impl Type<MySql> for Value {
}
}

impl Value {
fn to_postgres_string(&self) -> String {
match self {
Self::Array(arr) => {
let (typ, _) = self.get_postgres_type();
let inner = arr
.iter()
.map(|v| v.to_postgres_string())
.collect::<Vec<String>>()
.join(", ");

if typ == "jsonb" {
format!("[{}]", inner)
} else {
format!("{{{}}}", inner)
}
}
Self::Null(_) => "NULL".to_string(),
Self::Bool(b) => b.to_string(),
Self::Number(num) => match num {
Number::F32(f32) => (*f32).to_string(),
Number::F64(f64) => (*f64).to_string(),
_ => num.to_string(),
},
Self::String(str) => format!("\"{}\"", str),
Self::DateTime(date) => date.format_to_string(),
Self::Object(_) => {
serde_json::to_string(&json::synth_val_to_json(self.clone())).unwrap()
}
}
}

pub fn get_postgres_type(&self) -> (&'static str, usize) {
let mut depth = 0;
let mut typ = "";

let mut current = Some(self);

// Based on https://docs.rs/sqlx-core/0.5.9/sqlx_core/postgres/types/index.html
while let Some(c) = current {
let pair = match c {
Value::Null(_) => (None, "unknown"),
Value::Bool(_) => (None, "bool"),
Value::Number(num) => match *num {
Number::I8(_) => (None, "char"),
Number::I16(_) => (None, "int2"),
Number::I32(_) => (None, "int4"),
Number::I64(_) => (None, "int8"),
Number::I128(_) => (None, "numeric"),
Number::U8(_) => (None, "char"),
Number::U16(_) => (None, "int2"),
Number::U32(_) => (None, "int4"),
Number::U64(_) => (None, "int8"),
Number::U128(_) => (None, "numeric"),
Number::F32(_) => (None, "float4"),
Number::F64(_) => (None, "float8"),
},
Value::String(_) => (None, "text"),
Value::DateTime(ChronoValueAndFormat { value, .. }) => match value {
ChronoValue::NaiveDate(_) => (None, "date"),
ChronoValue::NaiveTime(_) => (None, "time"),
ChronoValue::NaiveDateTime(_) => (None, "timestamp"),
ChronoValue::DateTime(_) => (None, "timestamptz"),
},
Value::Object(_) => (None, "jsonb"),
Value::Array(arr) => {
depth += 1;
if arr.is_empty() {
(None, "unknown")
} else {
(Some(&arr[0]), "")
}
}
};

current = pair.0;
typ = pair.1;
}

(typ, depth)
}
}

impl Encode<'_, Postgres> for Value {
fn produces(&self) -> Option<PgTypeInfo> {
// Only arrays needs a special type
match self {
Value::Array(_) => Some(PgTypeInfo::with_name("text")),
_ => None,
}
}

fn encode_by_ref(&self, buf: &mut PgArgumentBuffer) -> IsNull {
match self {
Value::Null(_) => IsNull::Yes,
Expand Down Expand Up @@ -409,7 +500,10 @@ impl Encode<'_, Postgres> for Value {
json::synth_val_to_json(self.clone()),
buf,
),
Value::Array(arr) => arr.encode_by_ref(buf), //TODO special-case for BYTEA
Value::Array(_) => {
let s = self.to_postgres_string();
<String as Encode<'_, Postgres>>::encode_by_ref(&s, buf)
} //TODO special-case for BYTEA
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions synth/src/datasource/mysql_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl RelationalDataSource for MySqlDataSource {
async fn execute_query(
&self,
query: String,
query_params: Vec<&Value>,
query_params: Vec<Value>,
) -> Result<MySqlQueryResult> {
let mut query = sqlx::query(query.as_str());

Expand Down Expand Up @@ -199,7 +199,13 @@ impl RelationalDataSource for MySqlDataSource {
Ok(content)
}

fn extend_parameterised_query(query: &mut String, _curr_index: usize, extend: usize) {
fn extend_parameterised_query(
query: &mut String,
_curr_index: usize,
query_params: Vec<Value>,
) {
let extend = query_params.len();

query.push('(');
for i in 0..extend {
query.push('?');
Expand Down Expand Up @@ -299,7 +305,7 @@ fn try_match_value(row: &MySqlRow, column: &MySqlColumn) -> Result<Value> {
return Ok(Value::Number(Number::from(truncated)));
}

bail!("Failed to convert Postgresql numeric data type to 64 bit float")
bail!("Failed to convert Mysql numeric data type to 64 bit float")
}
"timestamp" => Value::String(row.try_get::<String, &str>(column.name())?),
"date" => Value::String(format!(
Expand Down
172 changes: 158 additions & 14 deletions synth/src/datasource/postgres_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use std::collections::BTreeMap;
use std::convert::TryFrom;
use synth_core::schema::number_content::{F32, F64, I32, I64};
use synth_core::schema::{
BoolContent, Categorical, ChronoValueType, DateTimeContent, NumberContent, RangeStep,
RegexContent, StringContent, Uuid,
ArrayContent, BoolContent, Categorical, ChronoValue, ChronoValueAndFormat, ChronoValueType,
DateTimeContent, NumberContent, ObjectContent, RangeStep, RegexContent, StringContent, Uuid,
};
use synth_core::{Content, Value};

Expand Down Expand Up @@ -120,7 +120,7 @@ impl RelationalDataSource for PostgresDataSource {
async fn execute_query(
&self,
query: String,
query_params: Vec<&Value>,
query_params: Vec<Value>,
) -> Result<PgQueryResult> {
let mut query = sqlx::query(query.as_str());

Expand Down Expand Up @@ -255,20 +255,20 @@ impl RelationalDataSource for PostgresDataSource {
RegexContent::pattern(pattern).context("pattern will always compile")?,
))
}
"int2" => Content::Number(NumberContent::I64(I64::Range(RangeStep::default()))),
"int2" => Content::Number(NumberContent::I32(I32::Range(RangeStep::default()))),
"int4" => Content::Number(NumberContent::I32(I32::Range(RangeStep::default()))),
"int8" => Content::Number(NumberContent::I64(I64::Range(RangeStep::default()))),
"float4" => Content::Number(NumberContent::F32(F32::Range(RangeStep::default()))),
"float8" => Content::Number(NumberContent::F64(F64::Range(RangeStep::default()))),
"numeric" => Content::Number(NumberContent::F64(F64::Range(RangeStep::default()))),
"timestamptz" => Content::DateTime(DateTimeContent {
format: "".to_string(), // todo
format: "%Y-%m-%dT%H:%M:%S%z".to_string(),
type_: ChronoValueType::DateTime,
begin: None,
end: None,
}),
"timestamp" => Content::DateTime(DateTimeContent {
format: "".to_string(), // todo
format: "%Y-%m-%dT%H:%M:%S".to_string(),
type_: ChronoValueType::NaiveDateTime,
begin: None,
end: None,
Expand All @@ -279,20 +279,56 @@ impl RelationalDataSource for PostgresDataSource {
begin: None,
end: None,
}),
"time" => Content::DateTime(DateTimeContent {
format: "%H:%M:%S".to_string(),
type_: ChronoValueType::NaiveTime,
begin: None,
end: None,
}),
"json" | "jsonb" => Content::Object(ObjectContent {
skip_when_null: false,
fields: BTreeMap::new(),
}),
"uuid" => Content::String(StringContent::Uuid(Uuid)),
_ => bail!(
"We haven't implemented a converter for {}",
column_info.data_type
),
_ => {
if let Some(data_type) = column_info.data_type.strip_prefix('_') {
let mut column_info = column_info.clone();
column_info.data_type = data_type.to_string();

Content::Array(ArrayContent::from_content_default_length(
self.decode_to_content(&column_info)?,
))
} else {
bail!(
"We haven't implemented a converter for {}",
column_info.data_type
)
}
}
};

Ok(content)
}

fn extend_parameterised_query(query: &mut String, curr_index: usize, extend: usize) {
fn extend_parameterised_query(query: &mut String, curr_index: usize, query_params: Vec<Value>) {
let extend = query_params.len();

query.push('(');
for i in 0..extend {
query.push_str(&format!("${}", curr_index + i + 1));
for (i, param) in query_params.iter().enumerate() {
let extra = if let Value::Array(_) = param {
let (typ, depth) = param.get_postgres_type();
if typ == "unknown" {
"".to_string() // This is currently not supported
} else if typ == "jsonb" {
"::jsonb".to_string() // Cannot have an array of jsonb - ie jsonb[]
} else {
format!("::{}{}", typ, "[]".repeat(depth))
}
} else {
"".to_string()
};

query.push_str(&format!("${}{}", curr_index + i + 1, extra));
if i != extend - 1 {
query.push(',');
}
Expand Down Expand Up @@ -347,7 +383,10 @@ impl TryFrom<PgRow> for ValueWrapper {
let mut kv = BTreeMap::new();

for column in row.columns() {
let value = try_match_value(&row, column).unwrap_or(Value::Null(()));
let value = try_match_value(&row, column).unwrap_or_else(|err| {
debug!("try_match_value failed: {}", err);
Value::Null(())
});
kv.insert(column.name().to_string(), value);
}

Expand Down Expand Up @@ -389,6 +428,111 @@ fn try_match_value(row: &PgRow, column: &PgColumn) -> Result<Value> {
"{}",
row.try_get::<chrono::NaiveDate, &str>(column.name())?
)),
"time" => Value::String(format!(
"{}",
row.try_get::<chrono::NaiveTime, &str>(column.name())?
)),
"json" | "jsonb" => {
let serde_value = row.try_get::<serde_json::Value, &str>(column.name())?;
serde_json::from_value(serde_value)?
}
"char[]" | "varchar[]" | "text[]" | "citext[]" | "bpchar[]" | "name[]" | "unknown[]" => {
Value::Array(
row.try_get::<Vec<String>, &str>(column.name())
.map(|vec| vec.iter().map(|s| Value::String(s.to_string())).collect())?,
)
}
"bool[]" => Value::Array(
row.try_get::<Vec<bool>, &str>(column.name())
.map(|vec| vec.into_iter().map(Value::Bool).collect())?,
),
"int2[]" => Value::Array(
row.try_get::<Vec<i16>, &str>(column.name())
.map(|vec| vec.into_iter().map(|i| Value::Number(i.into())).collect())?,
),
"int4[]" => Value::Array(
row.try_get::<Vec<i32>, &str>(column.name())
.map(|vec| vec.into_iter().map(|i| Value::Number(i.into())).collect())?,
),
"int8[]" => Value::Array(
row.try_get::<Vec<i64>, &str>(column.name())
.map(|vec| vec.into_iter().map(|i| Value::Number(i.into())).collect())?,
),
"float4[]" => Value::Array(
row.try_get::<Vec<f32>, &str>(column.name())
.map(|vec| vec.into_iter().map(|i| Value::Number(i.into())).collect())?,
),
"float8[]" => Value::Array(
row.try_get::<Vec<f64>, &str>(column.name())
.map(|vec| vec.into_iter().map(|i| Value::Number(i.into())).collect())?,
),
"numeric[]" => {
let vec = row.try_get::<Vec<Decimal>, &str>(column.name())?;
let result: Result<Vec<Value>, _> = vec
.into_iter()
.map(|d| {
if let Some(truncated) = d.to_f64() {
return Ok(Value::Number(truncated.into()));
}

bail!("Failed to convert Postgresql numeric data type to 64 bit float")
})
.collect();

Value::Array(result?)
}
"timestamp[]" => Value::Array(
row.try_get::<Vec<chrono::NaiveDateTime>, &str>(column.name())
.map(|vec| {
vec.into_iter()
.map(|d| {
Value::DateTime(ChronoValueAndFormat {
format: Arc::from("%Y-%m-%dT%H:%M:%S".to_owned()),
value: ChronoValue::NaiveDateTime(d),
})
})
.collect()
})?,
),
"timestamptz[]" => Value::Array(
row.try_get::<Vec<chrono::DateTime<chrono::FixedOffset>>, &str>(column.name())
.map(|vec| {
vec.into_iter()
.map(|d| {
Value::DateTime(ChronoValueAndFormat {
format: Arc::from("%Y-%m-%dT%H:%M:%S%z".to_owned()),
value: ChronoValue::DateTime(d),
})
})
.collect()
})?,
),
"date[]" => Value::Array(
row.try_get::<Vec<chrono::NaiveDate>, &str>(column.name())
.map(|vec| {
vec.into_iter()
.map(|d| {
Value::DateTime(ChronoValueAndFormat {
format: Arc::from("%Y-%m-%d".to_owned()),
value: ChronoValue::NaiveDate(d),
})
})
.collect()
})?,
),
"time[]" => Value::Array(
row.try_get::<Vec<chrono::NaiveTime>, &str>(column.name())
.map(|vec| {
vec.into_iter()
.map(|t| {
Value::DateTime(ChronoValueAndFormat {
format: Arc::from("%H:%M:%S".to_owned()),
value: ChronoValue::NaiveTime(t),
})
})
.collect()
})?,
),
_ => {
bail!(
"Could not convert value. Converter not implemented for {}",
Expand Down
Loading