Skip to content

Commit

Permalink
Add support for multirow
Browse files Browse the repository at this point in the history
  • Loading branch information
ohsayan committed Dec 3, 2023
1 parent 62e4f31 commit 091aa94
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 2 deletions.
113 changes: 113 additions & 0 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,40 @@ impl RowState {
}
}

#[derive(Debug, PartialEq)]
struct MultiRowState {
c_row: Option<RowState>,
rows: Vec<Row>,
md_state: u8,
md1_target: u64,
md2_col_cnt: u64,
}

impl Default for MultiRowState {
fn default() -> Self {
Self::new(None, vec![], 0, 0, 0)
}
}

impl MultiRowState {
fn new(c_row: Option<RowState>, rows: Vec<Row>, md_s: u8, md_cnt: u64, md_target: u64) -> Self {
Self {
c_row,
rows,
md_state: md_s,
md1_target: md_target,
md2_col_cnt: md_cnt,
}
}
}

#[derive(Debug, PartialEq)]
enum ResponseState {
Initial,
PValue(PendingValue),
PError,
PRow(RowState),
PMultiRow(MultiRowState),
}

#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -147,6 +175,7 @@ impl<'a> Decoder<'a> {
ResponseState::PError => self.resume_error(),
ResponseState::PValue(v) => self.resume_value(v),
ResponseState::PRow(r) => self.resume_row(r),
ResponseState::PMultiRow(mr) => self.resume_rows(mr),
}
}
pub fn position(&self) -> usize {
Expand All @@ -159,6 +188,7 @@ impl<'a> Decoder<'a> {
0x10 => self.resume_error(),
0x11 => self.resume_row(RowState::new(ValueStateMeta::zero(), vec![], None)),
0x12 => return DecodeState::Completed(Response::Empty),
0x13 => self.resume_rows(MultiRowState::default()),
code => match self.start_decode(true, code, vec![], None) {
Ok(ValueDecodeStateAny::Decoded(v)) => DecodeState::Completed(Response::Value(v)),
Ok(ValueDecodeStateAny::Pending(pv)) => {
Expand Down Expand Up @@ -198,6 +228,9 @@ impl<'a> Decoder<'a> {
Err(e) => return DecodeState::Error(e),
}
}
self._decode_row_core(row_state)
}
fn _decode_row_core(&mut self, mut row_state: RowState) -> DecodeState {
while row_state.row.len() as u64 != row_state.meta.md1 {
let r = match row_state.tmp.take() {
None => {
Expand Down Expand Up @@ -228,6 +261,55 @@ impl<'a> Decoder<'a> {
}
DecodeState::Completed(Response::Row(Row::new(row_state.row)))
}
fn resume_rows(&mut self, mut multirow: MultiRowState) -> DecodeState {
if multirow.md_state == 0 {
match self.__resume_decode(multirow.md1_target, ValueStateMeta::zero()) {
Ok(ValueDecodeStateAny::Pending(ValueState { v, .. })) => {
multirow.md1_target = v.u64();
return DecodeState::ChangeState(RState(ResponseState::PMultiRow(multirow)));
}
Ok(ValueDecodeStateAny::Decoded(v)) => {
multirow.md1_target = v.u64();
multirow.md_state += 1;
}
Err(e) => return DecodeState::Error(e),
}
}
if multirow.md_state == 1 {
match self.__resume_decode(multirow.md2_col_cnt, ValueStateMeta::zero()) {
Ok(ValueDecodeStateAny::Pending(ValueState { v, .. })) => {
multirow.md2_col_cnt = v.u64();
return DecodeState::ChangeState(RState(ResponseState::PMultiRow(multirow)));
}
Ok(ValueDecodeStateAny::Decoded(v)) => {
multirow.md2_col_cnt = v.u64();
multirow.md_state += 1;
}
Err(e) => return DecodeState::Error(e),
}
}
while multirow.rows.len() as u64 != multirow.md1_target {
let ret = match multirow.c_row.take() {
Some(r) => self._decode_row_core(r),
None => self._decode_row_core(RowState::new(
ValueStateMeta::new(0, multirow.md2_col_cnt, true),
vec![],
None,
)),
};
match ret {
DecodeState::Completed(Response::Row(r)) => multirow.rows.push(r),
DecodeState::Completed(_) => unreachable!(),
e @ DecodeState::Error(_) => return e,
DecodeState::ChangeState(RState(ResponseState::PRow(pr))) => {
multirow.c_row = Some(pr);
return DecodeState::ChangeState(RState(ResponseState::PMultiRow(multirow)));
}
DecodeState::ChangeState(_) => unreachable!(),
}
}
DecodeState::Completed(Response::Rows(multirow.rows))
}
}

impl<'a> Decoder<'a> {
Expand Down Expand Up @@ -634,3 +716,34 @@ fn t_row() {
])))
);
}

#[test]
fn t_mrow() {
let mut decoder = Decoder::new(b"\x133\n5\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n\x00\x01\x01\x0D5\nelana\x0221\n\x0E0\n\x00\x01\x01\x0D5\nemily\x0222\n\x0E0\n", 0);
assert_eq!(
decoder.validate_response(RState::default()),
DecodeState::Completed(Response::Rows(vec![
Row::new(vec![
Value::Null,
Value::Bool(true),
Value::String("sayan".into()),
Value::UInt8(20),
Value::List(vec![])
]),
Row::new(vec![
Value::Null,
Value::Bool(true),
Value::String("elana".into()),
Value::UInt8(21),
Value::List(vec![])
]),
Row::new(vec![
Value::Null,
Value::Bool(true),
Value::String("emily".into()),
Value::UInt8(22),
Value::List(vec![])
])
]))
);
}
6 changes: 4 additions & 2 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub enum Response {
Value(Value),
/// The server returned a row
Row(Row),
/// A list of rows
Rows(Vec<Row>),
/// The server returned an error code
Error(u16),
}
Expand Down Expand Up @@ -97,7 +99,7 @@ impl<V: FromValue> FromResponse for V {
fn from_response(resp: Response) -> ClientResult<Self> {
match resp {
Response::Value(v) => V::from_value(v),
Response::Row(_) | Response::Empty => {
Response::Row(_) | Response::Empty | Response::Rows(_) => {
Err(Error::ParseError(ParseError::ResponseMismatch))
}
Response::Error(e) => Err(Error::ServerError(e)),
Expand Down Expand Up @@ -144,7 +146,7 @@ macro_rules! from_response_row {
fn from_response(resp: Response) -> ClientResult<Self> {
let row = match resp {
Response::Row(r) => r.into_values(),
Response::Empty | Response::Value(_) => return Err(Error::ParseError(ParseError::ResponseMismatch)),
Response::Empty | Response::Value(_) | Response::Rows(_) => return Err(Error::ParseError(ParseError::ResponseMismatch)),
Response::Error(e) => return Err(Error::ServerError(e)),
};
if row.len() != $size {
Expand Down

0 comments on commit 091aa94

Please sign in to comment.