From 5df52bc2b56422f4510c06f89168f31b1dc998c7 Mon Sep 17 00:00:00 2001 From: Sayan Nandan Date: Thu, 7 Mar 2024 13:46:32 +0530 Subject: [PATCH] Refactor protocol modules --- src/io/aio.rs | 5 +- src/io/sync.rs | 5 +- src/protocol/handshake.rs | 55 +++++ src/{protocol.rs => protocol/mod.rs} | 340 ++------------------------- src/protocol/pipe.rs | 106 +++++++++ src/protocol/state.rs | 254 ++++++++++++++++++++ 6 files changed, 438 insertions(+), 327 deletions(-) create mode 100644 src/protocol/handshake.rs rename src/{protocol.rs => protocol/mod.rs} (70%) create mode 100644 src/protocol/pipe.rs create mode 100644 src/protocol/state.rs diff --git a/src/io/aio.rs b/src/io/aio.rs index e282ecc..34b4005 100644 --- a/src/io/aio.rs +++ b/src/io/aio.rs @@ -25,8 +25,9 @@ use { crate::{ error::{ClientResult, ConnectionSetupError, Error}, protocol::{ - ClientHandshake, DecodeState, Decoder, MRespState, PipelineResult, RState, - ServerHandshake, + handshake::{ClientHandshake, ServerHandshake}, + state_init::{DecodeState, MRespState, PipelineResult, RState}, + Decoder, }, query::Pipeline, response::{FromResponse, Response}, diff --git a/src/io/sync.rs b/src/io/sync.rs index d26523f..aae4647 100644 --- a/src/io/sync.rs +++ b/src/io/sync.rs @@ -27,8 +27,9 @@ use { config::Config, error::{ClientResult, ConnectionSetupError, Error}, protocol::{ - ClientHandshake, DecodeState, Decoder, MRespState, PipelineResult, RState, - ServerHandshake, + handshake::{ClientHandshake, ServerHandshake}, + state_init::{DecodeState, MRespState, PipelineResult, RState}, + Decoder, }, query::Pipeline, response::{FromResponse, Response}, diff --git a/src/protocol/handshake.rs b/src/protocol/handshake.rs new file mode 100644 index 0000000..d885aed --- /dev/null +++ b/src/protocol/handshake.rs @@ -0,0 +1,55 @@ +/* + * Copyright 2024, Sayan Nandan + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +use crate::{ + error::{ConnectionSetupError, Error}, + ClientResult, Config, +}; + +pub struct ClientHandshake(Box<[u8]>); +impl ClientHandshake { + pub(crate) fn new(cfg: &Config) -> Self { + let mut v = Vec::with_capacity(6 + cfg.username().len() + cfg.password().len() + 5); + v.extend(b"H\x00\x00\x00\x00\x00"); + pushlen!(v, cfg.username().len()); + pushlen!(v, cfg.password().len()); + v.extend(cfg.username().as_bytes()); + v.extend(cfg.password().as_bytes()); + Self(v.into_boxed_slice()) + } + pub(crate) fn inner(&self) -> &[u8] { + &self.0 + } +} + +#[derive(Debug)] +pub enum ServerHandshake { + Okay(u8), + Error(u8), +} +impl ServerHandshake { + pub fn parse(v: [u8; 4]) -> ClientResult { + Ok(match v { + [b'H', 0, 0, msg] => Self::Okay(msg), + [b'H', 0, 1, msg] => Self::Error(msg), + _ => { + return Err(Error::ConnectionSetupErr( + ConnectionSetupError::InvalidServerHandshake, + )) + } + }) + } +} diff --git a/src/protocol.rs b/src/protocol/mod.rs similarity index 70% rename from src/protocol.rs rename to src/protocol/mod.rs index e2f9050..d364c7b 100644 --- a/src/protocol.rs +++ b/src/protocol/mod.rs @@ -14,12 +14,25 @@ * limitations under the License. */ -use crate::{ - config::Config, - error::{ClientResult, ConnectionSetupError, Error}, - response::{Response, Row, Value}, +pub mod handshake; +mod pipe; +mod state; + +use { + self::state::{ + DecodeState, MetaState, MultiRowState, PendingValue, RState, ResponseState, RowState, + ValueDecodeState, ValueDecodeStateAny, ValueDecodeStateRaw, ValueState, ValueStateMeta, + }, + crate::response::{Response, Row, Value}, }; +pub mod state_init { + pub(crate) use super::{ + pipe::{MRespState, PipelineResult}, + state::{DecodeState, RState}, + }; +} + pub(crate) type ProtocolResult = Result; /// Errors that can happen when handling protocol level encoding and decoding @@ -42,191 +55,6 @@ impl Value { } } -/* - Decode state management -*/ - -type ValueDecodeStateRaw = ValueDecodeStateAny; -type ValueDecodeState = ValueDecodeStateAny; - -#[derive(Debug, Default, PartialEq)] -struct MetaState { - completed: bool, - val: u64, -} - -impl MetaState { - fn new(completed: bool, val: u64) -> Self { - Self { completed, val } - } - #[inline(always)] - fn finished(&mut self, decoder: &mut Decoder) -> ProtocolResult { - self.finish_or_continue(decoder, || Ok(true), || Ok(false), |e| Err(e)) - } - #[inline(always)] - fn finish_or_continue( - &mut self, - decoder: &mut Decoder, - if_completed: impl FnOnce() -> T, - if_pending: impl FnOnce() -> T, - if_err: impl FnOnce(ProtocolError) -> T, - ) -> T { - Self::try_finish_or_continue( - self.completed, - &mut self.val, - decoder, - if_completed, - if_pending, - if_err, - ) - } - #[inline(always)] - fn try_finish(decoder: &mut Decoder, completed: bool, val: &mut u64) -> ProtocolResult { - Self::try_finish_or_continue( - completed, - val, - decoder, - || Ok(true), - || Ok(false), - |e| Err(e), - ) - } - #[inline(always)] - fn try_finish_or_continue( - completed: bool, - val: &mut u64, - decoder: &mut Decoder, - if_completed: impl FnOnce() -> T, - if_pending: impl FnOnce() -> T, - if_err: impl FnOnce(ProtocolError) -> T, - ) -> T { - if completed { - if_completed() - } else { - match decoder.__resume_decode(*val, ValueStateMeta::zero()) { - Ok(vs) => match vs { - ValueDecodeStateAny::Pending(ValueState { v, .. }) => { - *val = v.u64(); - if_pending() - } - ValueDecodeStateAny::Decoded(v) => { - *val = v.u64(); - if_completed() - } - }, - Err(e) => if_err(e), - } - } - } - #[inline(always)] - fn val(&self) -> u64 { - self.val - } -} - -#[derive(Debug, PartialEq)] -enum ValueDecodeStateAny { - Pending(P), - Decoded(V), -} - -#[derive(Debug, PartialEq)] -struct ValueState { - v: Value, - meta: ValueStateMeta, -} - -impl ValueState { - fn new(v: Value, meta: ValueStateMeta) -> Self { - Self { v, meta } - } -} - -#[derive(Debug, PartialEq)] -struct ValueStateMeta { - start: usize, - md: MetaState, -} - -impl ValueStateMeta { - fn zero() -> Self { - Self { - start: 0, - md: MetaState::default(), - } - } - fn new(start: usize, md1: u64, md1_flag: bool) -> Self { - Self { - start, - md: MetaState::new(md1_flag, md1), - } - } -} - -#[derive(Debug, PartialEq)] -struct RowState { - meta: ValueStateMeta, - row: Vec, - tmp: Option, -} - -impl RowState { - fn new(meta: ValueStateMeta, row: Vec, tmp: Option) -> Self { - Self { meta, row, tmp } - } -} - -#[derive(Debug, PartialEq)] -struct MultiRowState { - c_row: Option, - rows: Vec, - md_state: u8, - md1_target: u64, - md2_col_cnt: u64, -} - -impl Default for MultiRowState { - fn default() -> Self { - Self::new(None, vec![], 0, 0, 0) - } -} - -impl MultiRowState { - fn new(c_row: Option, rows: Vec, md_s: u8, md_cnt: u64, md_target: u64) -> Self { - Self { - c_row, - rows, - md_state: md_s, - md1_target: md_target, - md2_col_cnt: md_cnt, - } - } -} - -#[derive(Debug, PartialEq)] -enum ResponseState { - Initial, - PValue(PendingValue), - PError, - PRow(RowState), - PMultiRow(MultiRowState), -} - -#[derive(Debug, PartialEq)] -pub enum DecodeState { - ChangeState(RState), - Completed(Response), - Error(ProtocolError), -} - -#[derive(Debug, PartialEq)] -pub struct RState(ResponseState); -impl Default for RState { - fn default() -> Self { - RState(ResponseState::Initial) - } -} - /* Decoder */ @@ -516,23 +344,6 @@ impl_fstr!( f64 as Float64 ); -#[derive(Debug, PartialEq)] -struct PendingValue { - state: ValueState, - tmp: Option, - stack: Vec<(Vec, ValueStateMeta)>, -} - -impl PendingValue { - fn new( - state: ValueState, - tmp: Option, - stack: Vec<(Vec, ValueStateMeta)>, - ) -> Self { - Self { state, tmp, stack } - } -} - impl<'a> Decoder<'a> { fn parse_list( &mut self, @@ -699,90 +510,6 @@ impl<'a> Decoder<'a> { } } -pub struct ClientHandshake(Box<[u8]>); -impl ClientHandshake { - pub(crate) fn new(cfg: &Config) -> Self { - let mut v = Vec::with_capacity(6 + cfg.username().len() + cfg.password().len() + 5); - v.extend(b"H\x00\x00\x00\x00\x00"); - pushlen!(v, cfg.username().len()); - pushlen!(v, cfg.password().len()); - v.extend(cfg.username().as_bytes()); - v.extend(cfg.password().as_bytes()); - Self(v.into_boxed_slice()) - } - pub(crate) fn inner(&self) -> &[u8] { - &self.0 - } -} - -#[derive(Debug)] -pub enum ServerHandshake { - Okay(u8), - Error(u8), -} -impl ServerHandshake { - pub fn parse(v: [u8; 4]) -> ClientResult { - Ok(match v { - [b'H', 0, 0, msg] => Self::Okay(msg), - [b'H', 0, 1, msg] => Self::Error(msg), - _ => { - return Err(Error::ConnectionSetupErr( - ConnectionSetupError::InvalidServerHandshake, - )) - } - }) - } -} - -#[derive(Debug, PartialEq, Default)] -pub(crate) struct MRespState { - processed: Vec, - pending: Option, - expected: MetaState, -} - -#[derive(Debug, PartialEq)] -pub(crate) enum PipelineResult { - Completed(Vec), - Pending(MRespState), - Error(ProtocolError), -} - -impl MRespState { - fn step(mut self, decoder: &mut Decoder) -> PipelineResult { - match self.expected.finished(decoder) { - Ok(true) => {} - Ok(false) => return PipelineResult::Pending(self), - Err(e) => return PipelineResult::Error(e), - } - loop { - if self.processed.len() as u64 == self.expected.val() { - return PipelineResult::Completed(self.processed); - } - match decoder.validate_response(RState( - self.pending.take().unwrap_or(ResponseState::Initial), - )) { - DecodeState::ChangeState(RState(s)) => { - self.pending = Some(s); - return PipelineResult::Pending(self); - } - DecodeState::Completed(c) => self.processed.push(c), - DecodeState::Error(e) => return PipelineResult::Error(e), - } - } - } -} - -impl<'a> Decoder<'a> { - pub fn validate_pipe(&mut self, first: bool, state: MRespState) -> PipelineResult { - if first && self._cursor_next() != b'P' { - PipelineResult::Error(ProtocolError::InvalidPacket) - } else { - state.step(self) - } - } -} - #[test] fn t_row() { let mut decoder = Decoder::new(b"\x115\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n", 0); @@ -828,36 +555,3 @@ fn t_mrow() { ])) ); } - -#[test] -fn t_pipe() { - let mut decoder = Decoder::new(b"P5\n\x12\x10\xFF\xFF\x115\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n\x115\n\x00\x01\x01\x0D5\nelana\x0221\n\x0E0\n\x115\n\x00\x01\x01\x0D5\nemily\x0222\n\x0E0\n", 0); - assert_eq!( - decoder.validate_pipe(true, MRespState::default()), - PipelineResult::Completed(vec![ - Response::Empty, - Response::Error(u16::MAX), - Response::Row(Row::new(vec![ - Value::Null, - Value::Bool(true), - Value::String("sayan".into()), - Value::UInt8(20), - Value::List(vec![]) - ])), - Response::Row(Row::new(vec![ - Value::Null, - Value::Bool(true), - Value::String("elana".into()), - Value::UInt8(21), - Value::List(vec![]) - ])), - Response::Row(Row::new(vec![ - Value::Null, - Value::Bool(true), - Value::String("emily".into()), - Value::UInt8(22), - Value::List(vec![]) - ])) - ]) - ); -} diff --git a/src/protocol/pipe.rs b/src/protocol/pipe.rs new file mode 100644 index 0000000..d0fef31 --- /dev/null +++ b/src/protocol/pipe.rs @@ -0,0 +1,106 @@ +/* + * Copyright 2024, Sayan Nandan + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +use { + super::{ + state::{DecodeState, MetaState, RState, ResponseState}, + Decoder, ProtocolError, + }, + crate::response::Response, +}; + +#[derive(Debug, PartialEq, Default)] +pub(crate) struct MRespState { + processed: Vec, + pending: Option, + expected: MetaState, +} + +#[derive(Debug, PartialEq)] +pub(crate) enum PipelineResult { + Completed(Vec), + Pending(MRespState), + Error(ProtocolError), +} + +impl MRespState { + fn step(mut self, decoder: &mut Decoder) -> PipelineResult { + match self.expected.finished(decoder) { + Ok(true) => {} + Ok(false) => return PipelineResult::Pending(self), + Err(e) => return PipelineResult::Error(e), + } + loop { + if self.processed.len() as u64 == self.expected.val() { + return PipelineResult::Completed(self.processed); + } + match decoder.validate_response(RState( + self.pending.take().unwrap_or(ResponseState::Initial), + )) { + DecodeState::ChangeState(RState(s)) => { + self.pending = Some(s); + return PipelineResult::Pending(self); + } + DecodeState::Completed(c) => self.processed.push(c), + DecodeState::Error(e) => return PipelineResult::Error(e), + } + } + } +} + +impl<'a> Decoder<'a> { + pub fn validate_pipe(&mut self, first: bool, state: MRespState) -> PipelineResult { + if first && self._cursor_next() != b'P' { + PipelineResult::Error(ProtocolError::InvalidPacket) + } else { + state.step(self) + } + } +} + +#[test] +fn t_pipe() { + use crate::response::{Response, Row, Value}; + let mut decoder = Decoder::new(b"P5\n\x12\x10\xFF\xFF\x115\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n\x115\n\x00\x01\x01\x0D5\nelana\x0221\n\x0E0\n\x115\n\x00\x01\x01\x0D5\nemily\x0222\n\x0E0\n", 0); + assert_eq!( + decoder.validate_pipe(true, MRespState::default()), + PipelineResult::Completed(vec![ + Response::Empty, + Response::Error(u16::MAX), + Response::Row(Row::new(vec![ + Value::Null, + Value::Bool(true), + Value::String("sayan".into()), + Value::UInt8(20), + Value::List(vec![]) + ])), + Response::Row(Row::new(vec![ + Value::Null, + Value::Bool(true), + Value::String("elana".into()), + Value::UInt8(21), + Value::List(vec![]) + ])), + Response::Row(Row::new(vec![ + Value::Null, + Value::Bool(true), + Value::String("emily".into()), + Value::UInt8(22), + Value::List(vec![]) + ])) + ]) + ); +} diff --git a/src/protocol/state.rs b/src/protocol/state.rs new file mode 100644 index 0000000..3e5753d --- /dev/null +++ b/src/protocol/state.rs @@ -0,0 +1,254 @@ +/* + * Copyright 2024, Sayan Nandan + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +use { + super::{Decoder, ProtocolError, ProtocolResult}, + crate::response::{Response, Row, Value}, +}; + +pub type ValueDecodeStateRaw = ValueDecodeStateAny; +pub type ValueDecodeState = ValueDecodeStateAny; + +/* + pending value + --- + a stack is useful for recursive types +*/ + +#[derive(Debug, PartialEq)] +pub struct PendingValue { + pub(super) state: ValueState, + pub(super) tmp: Option, + pub(super) stack: Vec<(Vec, ValueStateMeta)>, +} + +impl PendingValue { + pub fn new( + state: ValueState, + tmp: Option, + stack: Vec<(Vec, ValueStateMeta)>, + ) -> Self { + Self { state, tmp, stack } + } +} + +/* + value state +*/ + +#[derive(Debug, PartialEq)] +pub enum ValueDecodeStateAny { + Pending(P), + Decoded(V), +} + +#[derive(Debug, PartialEq)] +pub struct ValueState { + pub(super) v: Value, + pub(super) meta: ValueStateMeta, +} + +impl ValueState { + pub fn new(v: Value, meta: ValueStateMeta) -> Self { + Self { v, meta } + } +} + +#[derive(Debug, PartialEq)] +pub struct ValueStateMeta { + pub(super) start: usize, + pub(super) md: MetaState, +} + +impl ValueStateMeta { + pub fn zero() -> Self { + Self { + start: 0, + md: MetaState::default(), + } + } + pub fn new(start: usize, md1: u64, md1_flag: bool) -> Self { + Self { + start, + md: MetaState::new(md1_flag, md1), + } + } +} + +/* + metadata init state +*/ + +#[derive(Debug, Default, PartialEq)] +pub struct MetaState { + completed: bool, + val: u64, +} + +impl MetaState { + pub fn new(completed: bool, val: u64) -> Self { + Self { completed, val } + } + #[inline(always)] + pub fn finished(&mut self, decoder: &mut Decoder) -> ProtocolResult { + self.finish_or_continue(decoder, || Ok(true), || Ok(false), |e| Err(e)) + } + #[inline(always)] + pub fn finish_or_continue( + &mut self, + decoder: &mut Decoder, + if_completed: impl FnOnce() -> T, + if_pending: impl FnOnce() -> T, + if_err: impl FnOnce(ProtocolError) -> T, + ) -> T { + Self::try_finish_or_continue( + self.completed, + &mut self.val, + decoder, + if_completed, + if_pending, + if_err, + ) + } + #[inline(always)] + pub fn try_finish( + decoder: &mut Decoder, + completed: bool, + val: &mut u64, + ) -> ProtocolResult { + Self::try_finish_or_continue( + completed, + val, + decoder, + || Ok(true), + || Ok(false), + |e| Err(e), + ) + } + #[inline(always)] + pub fn try_finish_or_continue( + completed: bool, + val: &mut u64, + decoder: &mut Decoder, + if_completed: impl FnOnce() -> T, + if_pending: impl FnOnce() -> T, + if_err: impl FnOnce(ProtocolError) -> T, + ) -> T { + if completed { + if_completed() + } else { + match decoder.__resume_decode(*val, ValueStateMeta::zero()) { + Ok(vs) => match vs { + ValueDecodeStateAny::Pending(ValueState { v, .. }) => { + *val = v.u64(); + if_pending() + } + ValueDecodeStateAny::Decoded(v) => { + *val = v.u64(); + if_completed() + } + }, + Err(e) => if_err(e), + } + } + } + #[inline(always)] + pub fn val(&self) -> u64 { + self.val + } +} + +/* + row state +*/ + +#[derive(Debug, PartialEq)] +pub struct RowState { + pub(super) meta: ValueStateMeta, + pub(super) row: Vec, + pub(super) tmp: Option, +} + +impl RowState { + pub fn new(meta: ValueStateMeta, row: Vec, tmp: Option) -> Self { + Self { meta, row, tmp } + } +} + +/* + multi row state +*/ + +#[derive(Debug, PartialEq)] +pub struct MultiRowState { + pub(super) c_row: Option, + pub(super) rows: Vec, + pub(super) md_state: u8, + pub(super) md1_target: u64, + pub(super) md2_col_cnt: u64, +} + +impl Default for MultiRowState { + fn default() -> Self { + Self::new(None, vec![], 0, 0, 0) + } +} + +impl MultiRowState { + pub fn new( + c_row: Option, + rows: Vec, + md_s: u8, + md_cnt: u64, + md_target: u64, + ) -> Self { + Self { + c_row, + rows, + md_state: md_s, + md1_target: md_target, + md2_col_cnt: md_cnt, + } + } +} + +/* + response state +*/ + +#[derive(Debug, PartialEq)] +pub enum ResponseState { + Initial, + PValue(PendingValue), + PError, + PRow(RowState), + PMultiRow(MultiRowState), +} + +#[derive(Debug, PartialEq)] +pub enum DecodeState { + ChangeState(RState), + Completed(Response), + Error(ProtocolError), +} + +#[derive(Debug, PartialEq)] +pub struct RState(pub(super) ResponseState); +impl Default for RState { + fn default() -> Self { + RState(ResponseState::Initial) + } +}