Skip to content

Commit

Permalink
readyset-psql: Eliminate row module and associated inefficiencies
Browse files Browse the repository at this point in the history
The `row` module contained a bunch of custom iterator code, but it all
seems fairly extraneous since it's only used in one place, and that
place can be rewritten to use simple primitives like `map()` and `zip()`
without losing any clarity or even increasing the lines-of-code count.

Additionally, RowIterator called `clone()` on both the DfValue and the
Type in its `next()` implementation, which seems wasteful and
unnecessary. I'm not sure of the best way to measure the impact here,
but cloning every single value in every cached result seems likely to
make a significant difference in at least some cases. (Even if it makes
no performance difference, though, it still seems worth it to simplify
the code and eliminate the extra 100+ lines of boilerplate.)

In the case of DfValue, we now just move it into TypedDfValue without
cloning it; in the case of Type, we can't move it out of the Arc that
it's behind, but I was able to easily adapt the existing code to pass
the Type values by reference instead.

Release-Note-Core: Improved performance of Postgres cached query
  response encoding by avoiding an unnecessary copy of the results in
  memory.
Change-Id: I82e12796fcd395ecbb59073297c47057042cc8cc
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/5555
Tested-by: Buildkite CI
Reviewed-by: Aspen Smith <aspen@readyset.io>
  • Loading branch information
nickelization committed Jul 26, 2023
1 parent ca7ef7d commit e662a9d
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 177 deletions.
1 change: 0 additions & 1 deletion readyset-psql/src/lib.rs
Expand Up @@ -4,7 +4,6 @@ mod error;
mod query_handler;
mod response;
mod resultset;
mod row;
mod schema;
mod upstream;
mod value;
Expand Down
12 changes: 6 additions & 6 deletions readyset-psql/src/resultset.rs
Expand Up @@ -9,8 +9,8 @@ use readyset_client::results::ResultIterator;
use tokio_postgres::types::Type;
use tokio_postgres::{GenericResult, ResultStream};

use crate::row::Row;
use crate::schema::{type_to_pgsql, SelectSchema};
use crate::TypedDfValue;

enum ResultsetInner {
Empty,
Expand Down Expand Up @@ -82,11 +82,11 @@ impl Stream for Resultset {
let next = match &mut self.get_mut().results {
ResultsetInner::Empty => None,
ResultsetInner::ReadySet(i) => i.next().map(|values| {
let row = Row {
values,
project_field_types,
};
row.into_iter().map(PsqlValue::try_from).collect()
values
.into_iter()
.zip(project_field_types.iter())
.map(|(value, col_type)| PsqlValue::try_from(TypedDfValue { value, col_type }))
.collect()
}),
ResultsetInner::Stream { first_row, stream } => {
let row = match first_row.take() {
Expand Down
112 changes: 0 additions & 112 deletions readyset-psql/src/row.rs

This file was deleted.

116 changes: 58 additions & 58 deletions readyset-psql/src/value.rs
Expand Up @@ -14,16 +14,16 @@ use uuid::Uuid;

/// An encapsulation of a ReadySet `DfValue` value that facilitates conversion of this `DfValue`
/// into a `psql_srv::PsqlValue`.
pub struct TypedDfValue {
pub struct TypedDfValue<'a> {
/// A type attribute used to determine which variant of `psql_srv::PsqlValue` the `value`
/// attribute should be converted to.
pub col_type: Type,
pub col_type: &'a Type,

/// The data value itself.
pub value: DfValue,
}

impl TryFrom<TypedDfValue> for PsqlValue {
impl<'a> TryFrom<TypedDfValue<'a>> for PsqlValue {
type Error = ps::Error;

fn try_from(v: TypedDfValue) -> Result<Self, Self::Error> {
Expand All @@ -41,76 +41,76 @@ impl TryFrom<TypedDfValue> for PsqlValue {

match (v.col_type, v.value) {
(_, DfValue::None) => Ok(PsqlValue::Null),
(Type::CHAR, DfValue::Int(v)) => Ok(PsqlValue::Char(v.try_into()?)),
(Type::VARCHAR, DfValue::Text(v)) => Ok(PsqlValue::VarChar(v)),
(Type::VARCHAR, DfValue::TinyText(t)) => Ok(PsqlValue::VarChar(t.as_str().into())),
(Type::NAME, DfValue::Text(t)) => Ok(PsqlValue::Name(t)),
(Type::NAME, DfValue::TinyText(t)) => Ok(PsqlValue::Name(t.as_str().into())),
(Type::BPCHAR, DfValue::Text(v)) => Ok(PsqlValue::BpChar(v)),
(Type::BPCHAR, DfValue::TinyText(t)) => Ok(PsqlValue::BpChar(t.as_str().into())),

(Type::INT2, DfValue::Int(v)) => Ok(PsqlValue::SmallInt(v as _)),
(Type::INT4, DfValue::Int(v)) => Ok(PsqlValue::Int(v as _)),
(Type::INT8, DfValue::Int(v)) => Ok(PsqlValue::BigInt(v as _)),

(Type::INT2, DfValue::UnsignedInt(v)) => Ok(PsqlValue::SmallInt(v as _)),
(Type::INT4, DfValue::UnsignedInt(v)) => Ok(PsqlValue::Int(v as _)),
(Type::INT8, DfValue::UnsignedInt(v)) => Ok(PsqlValue::BigInt(v as _)),

(ref ty, DfValue::UnsignedInt(v)) if type_is_oid(ty) => {
Ok(PsqlValue::Oid(v.try_into()?))
}
(ref ty, DfValue::Int(v)) if type_is_oid(ty) => Ok(PsqlValue::Oid(v.try_into()?)),

(Type::FLOAT4 | Type::FLOAT8, DfValue::Float(f)) => Ok(PsqlValue::Float(f)),
(Type::FLOAT8, DfValue::Double(f)) => Ok(PsqlValue::Double(f)),
(Type::NUMERIC, DfValue::Double(f)) => Ok(PsqlValue::Numeric(
(&Type::CHAR, DfValue::Int(v)) => Ok(PsqlValue::Char(v.try_into()?)),
(&Type::VARCHAR, DfValue::Text(v)) => Ok(PsqlValue::VarChar(v)),
(&Type::VARCHAR, DfValue::TinyText(t)) => Ok(PsqlValue::VarChar(t.as_str().into())),
(&Type::NAME, DfValue::Text(t)) => Ok(PsqlValue::Name(t)),
(&Type::NAME, DfValue::TinyText(t)) => Ok(PsqlValue::Name(t.as_str().into())),
(&Type::BPCHAR, DfValue::Text(v)) => Ok(PsqlValue::BpChar(v)),
(&Type::BPCHAR, DfValue::TinyText(t)) => Ok(PsqlValue::BpChar(t.as_str().into())),

(&Type::INT2, DfValue::Int(v)) => Ok(PsqlValue::SmallInt(v as _)),
(&Type::INT4, DfValue::Int(v)) => Ok(PsqlValue::Int(v as _)),
(&Type::INT8, DfValue::Int(v)) => Ok(PsqlValue::BigInt(v as _)),

(&Type::INT2, DfValue::UnsignedInt(v)) => Ok(PsqlValue::SmallInt(v as _)),
(&Type::INT4, DfValue::UnsignedInt(v)) => Ok(PsqlValue::Int(v as _)),
(&Type::INT8, DfValue::UnsignedInt(v)) => Ok(PsqlValue::BigInt(v as _)),

(ty, DfValue::UnsignedInt(v)) if type_is_oid(ty) => Ok(PsqlValue::Oid(v.try_into()?)),
(ty, DfValue::Int(v)) if type_is_oid(ty) => Ok(PsqlValue::Oid(v.try_into()?)),

(&Type::FLOAT4 | &Type::FLOAT8, DfValue::Float(f)) => Ok(PsqlValue::Float(f)),
(&Type::FLOAT8, DfValue::Double(f)) => Ok(PsqlValue::Double(f)),
(&Type::NUMERIC, DfValue::Double(f)) => Ok(PsqlValue::Numeric(
<Decimal>::try_from(f).map_err(|e| ps::Error::InternalError(e.to_string()))?,
)),
(Type::NUMERIC, DfValue::Numeric(ref d)) => Ok(PsqlValue::Numeric(*d.as_ref())),
(Type::TEXT, DfValue::Text(v)) => Ok(PsqlValue::Text(v)),
(Type::TEXT, DfValue::TinyText(t)) => Ok(PsqlValue::Text(t.as_str().into())),
(ref ty, DfValue::Text(v)) if ty.name() == "citext" => Ok(PsqlValue::Text(v)),
(ref ty, DfValue::TinyText(t)) if ty.name() == "citext" => {
(&Type::NUMERIC, DfValue::Numeric(ref d)) => Ok(PsqlValue::Numeric(*d.as_ref())),
(&Type::TEXT, DfValue::Text(v)) => Ok(PsqlValue::Text(v)),
(&Type::TEXT, DfValue::TinyText(t)) => Ok(PsqlValue::Text(t.as_str().into())),
(ty, DfValue::Text(v)) if ty.name() == "citext" => Ok(PsqlValue::Text(v)),
(ty, DfValue::TinyText(t)) if ty.name() == "citext" => {
Ok(PsqlValue::Text(t.as_str().into()))
}
(Type::TIMESTAMP, DfValue::TimestampTz(v)) => {
(&Type::TIMESTAMP, DfValue::TimestampTz(v)) => {
Ok(PsqlValue::Timestamp(v.to_chrono().naive_local()))
}
(Type::TIMESTAMPTZ, DfValue::TimestampTz(v)) => {
(&Type::TIMESTAMPTZ, DfValue::TimestampTz(v)) => {
Ok(PsqlValue::TimestampTz(v.to_chrono()))
}
(Type::DATE, DfValue::TimestampTz(v)) => {
(&Type::DATE, DfValue::TimestampTz(v)) => {
Ok(PsqlValue::Date(v.to_chrono().naive_local().date()))
}
(Type::TIME, DfValue::Time(t)) => Ok(PsqlValue::Time((t).into())),
(Type::BOOL, DfValue::UnsignedInt(v)) => Ok(PsqlValue::Bool(v != 0)),
(Type::BOOL, DfValue::Int(v)) => Ok(PsqlValue::Bool(v != 0)),
(Type::BYTEA, DfValue::ByteArray(b)) => Ok(PsqlValue::ByteArray(
(&Type::TIME, DfValue::Time(t)) => Ok(PsqlValue::Time((t).into())),
(&Type::BOOL, DfValue::UnsignedInt(v)) => Ok(PsqlValue::Bool(v != 0)),
(&Type::BOOL, DfValue::Int(v)) => Ok(PsqlValue::Bool(v != 0)),
(&Type::BYTEA, DfValue::ByteArray(b)) => Ok(PsqlValue::ByteArray(
std::sync::Arc::try_unwrap(b).unwrap_or_else(|v| v.as_ref().to_vec()),
)),
(Type::MACADDR, DfValue::Text(m)) => Ok(PsqlValue::MacAddress(
(&Type::MACADDR, DfValue::Text(m)) => Ok(PsqlValue::MacAddress(
MacAddress::parse_str(m.as_str())
.map_err(|e| ps::Error::ParseError(e.to_string()))?,
)),
(Type::INET, dt @ (DfValue::Text(_) | DfValue::TinyText(_))) => Ok(PsqlValue::Inet(
(&Type::INET, dt @ (DfValue::Text(_) | DfValue::TinyText(_))) => Ok(PsqlValue::Inet(
<&str>::try_from(&dt)
.unwrap()
.parse::<IpInet>()
.map_err(|e| ps::Error::ParseError(e.to_string()))?,
)),
(Type::UUID, DfValue::Text(u)) => Ok(PsqlValue::Uuid(
(&Type::UUID, DfValue::Text(u)) => Ok(PsqlValue::Uuid(
Uuid::parse_str(u.as_str()).map_err(|e| ps::Error::ParseError(e.to_string()))?,
)),
(Type::JSON, ref d @ (DfValue::Text(_) | DfValue::TinyText(_))) => Ok(PsqlValue::Json(
<&str>::try_from(d)
.map_err(|e| ps::Error::InternalError(e.to_string()))
.and_then(|s| {
serde_json::from_str::<serde_json::Value>(s)
.map_err(|e| ps::Error::ParseError(e.to_string()))
})?,
)),
(Type::JSONB, ref d @ (DfValue::Text(_) | DfValue::TinyText(_))) => {
(&Type::JSON, ref d @ (DfValue::Text(_) | DfValue::TinyText(_))) => {
Ok(PsqlValue::Json(
<&str>::try_from(d)
.map_err(|e| ps::Error::InternalError(e.to_string()))
.and_then(|s| {
serde_json::from_str::<serde_json::Value>(s)
.map_err(|e| ps::Error::ParseError(e.to_string()))
})?,
))
}
(&Type::JSONB, ref d @ (DfValue::Text(_) | DfValue::TinyText(_))) => {
Ok(PsqlValue::Jsonb(
<&str>::try_from(d)
.map_err(|e| ps::Error::InternalError(e.to_string()))
Expand All @@ -120,8 +120,8 @@ impl TryFrom<TypedDfValue> for PsqlValue {
})?,
))
}
(Type::BIT, DfValue::BitVector(ref b)) => Ok(PsqlValue::Bit(b.as_ref().clone())),
(Type::VARBIT, DfValue::BitVector(ref b)) => Ok(PsqlValue::VarBit(b.as_ref().clone())),
(&Type::BIT, DfValue::BitVector(ref b)) => Ok(PsqlValue::Bit(b.as_ref().clone())),
(&Type::VARBIT, DfValue::BitVector(ref b)) => Ok(PsqlValue::VarBit(b.as_ref().clone())),
(t, DfValue::Array(ref arr)) => {
if let Kind::Array(member) = t.kind() {
let mut arr = (**arr).clone();
Expand All @@ -130,7 +130,7 @@ impl TryFrom<TypedDfValue> for PsqlValue {
*val = convert_enum_value(vs, val.clone())?.clone().into();
}
}
Ok(PsqlValue::Array(arr, t))
Ok(PsqlValue::Array(arr, t.clone()))
} else {
Err(ps::Error::InternalError(format!(
"Mismatched type for value: expected array type, but got {t}"
Expand All @@ -150,7 +150,7 @@ impl TryFrom<TypedDfValue> for PsqlValue {
data_type = ?val.sql_type(),
"Tried to serialize value to postgres with unsupported type"
);
Err(ps::Error::UnsupportedType(t))
Err(ps::Error::UnsupportedType(t.clone()))
}
}
}
Expand All @@ -167,7 +167,7 @@ mod tests {
#[test]
fn i8_char() {
let val = TypedDfValue {
col_type: Type::CHAR,
col_type: &Type::CHAR,
value: DfValue::Int(8i8 as _),
};
assert_eq!(PsqlValue::try_from(val).unwrap(), PsqlValue::Char(8));
Expand All @@ -176,7 +176,7 @@ mod tests {
#[test]
fn tiny_text_varchar() {
let val = TypedDfValue {
col_type: Type::VARCHAR,
col_type: &Type::VARCHAR,
value: DfValue::TinyText(TinyText::from_arr(b"aaaaaaaaaaaaaa")),
};
assert_eq!(
Expand All @@ -188,7 +188,7 @@ mod tests {
#[test]
fn tiny_text_text() {
let val = TypedDfValue {
col_type: Type::TEXT,
col_type: &Type::TEXT,
value: DfValue::TinyText(TinyText::from_arr(b"aaaaaaaaaaaaaa")),
};
assert_eq!(
Expand Down

0 comments on commit e662a9d

Please sign in to comment.