Skip to content

Commit

Permalink
Add tests for heya, get, set and update queries
Browse files Browse the repository at this point in the history
  • Loading branch information
ohsayan committed Sep 9, 2020
1 parent 096a0c9 commit cab2ce7
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 37 deletions.
19 changes: 19 additions & 0 deletions examples/config-files/template.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# This is a complete TDB configuration template which is always kept updated
# to include all the configuration options. I encourage you to always use this
# when you use a configuration file
# Instead of deleting entire sections from this file, comment them out, so that you
# now what you've kept enabled and what you've kept disabled. This helps avoid
# configuration problems during production

# This is a *REQUIRED* key
[server]
host = "127.0.0.1" # The IP address to which you want TDB to bind to
port = 2003 # The port to which you want TDB to bind to
# Set `noart` to true if you want to disable terminal artwork
noart = false

# This key is *REQUIRED*
[bgsave]
# Run `BGSAVE` `every` seconds. For example, setting this to 60 will cause BGSAVE to run
# every 1 minute
every = 120
7 changes: 6 additions & 1 deletion libtdb/src/terrapipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,14 @@ impl RespCodes {
}
}

pub fn proc_query(querystr: String) -> Vec<u8> {
/// Prepare a query packet from a string of whitespace separated values
pub fn proc_query<T>(querystr: T) -> Vec<u8>
where
T: AsRef<str>,
{
// TODO(@ohsayan): Enable "" to be escaped
// let args: Vec<&str> = RE.find_iter(&querystr).map(|val| val.as_str()).collect();
let querystr = querystr.as_ref().to_owned();
let args: Vec<&str> = querystr.split_whitespace().collect();
let mut bytes = Vec::with_capacity(querystr.len());
bytes.extend(b"#2\n*1\n#");
Expand Down
7 changes: 3 additions & 4 deletions server/src/coredb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,10 @@ impl CoreDB {
Ok(())
}

#[cfg(test)]
/// **⚠⚠⚠ This deletes everything stored in the in-memory table**
pub fn finish_db(self, areyousure: bool, areyouverysure: bool, areyousupersure: bool) {
if areyousure && areyouverysure && areyousupersure {
self.acquire_write().coremap.clear()
}
pub fn finish_db(&self) {
self.acquire_write().coremap.clear()
}
}

Expand Down
31 changes: 31 additions & 0 deletions server/src/dbnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,34 @@ pub async fn run(listener: TcpListener, sig: impl Future) {
let _ = terminate_rx.recv().await;
terminal::write_info("Goodbye :)\n").unwrap();
}

/// This is a **test only** function
/// This takes a `CoreDB` object so that keys can be modified externally by
/// the testing suite. This will **not save any data to disk**!
/// > **This is not for release builds in any way!**
#[cfg(test)]
pub async fn test_run(listener: TcpListener, db: CoreDB, sig: impl Future) {
let (signal, _) = broadcast::channel(1);
let (terminate_tx, terminate_rx) = mpsc::channel(1);
let mut server = Listener {
listener,
db,
climit: Arc::new(Semaphore::new(50000)),
signal,
terminate_tx,
terminate_rx,
};
tokio::select! {
_ = server.run() => {}
_ = sig => {}
}
let Listener {
mut terminate_rx,
terminate_tx,
signal,
..
} = server;
drop(signal);
drop(terminate_tx);
let _ = terminate_rx.recv().await;
}
20 changes: 13 additions & 7 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,26 @@ async fn check_args_or_connect() -> TcpListener {
if cfg.is_artful() {
println!("{}\n{}", TEXT, MSG);
} else {
terminal::write_info("TerrabaseDB v0.4.1 | Protocol: Terrapipe 1.0").unwrap();
println!("{}", MSG);
}
log::info!("info: Using settings from config file");
TcpListener::bind(cfg.get_host_port_tuple()).await.unwrap()
log::info!("Using settings from config file");
TcpListener::bind(cfg.get_host_port_tuple()).await
}
Ok(config::ConfigType::Def(cfg)) => {
println!("{}\n{}", TEXT, MSG);
log::info!("info: No configuration file supplied. Using default settings");
TcpListener::bind(cfg.get_host_port_tuple()).await.unwrap()
log::warn!("No configuration file supplied. Using default settings");
TcpListener::bind(cfg.get_host_port_tuple()).await
}
Err(e) => {
terminal::write_error(e).unwrap();
log::error!("{}", e);
std::process::exit(0x100);
}
};
binding
match binding {
Ok(b) => b,
Err(e) => {
log::error!("Failed to bind to socket with error: '{}'", e);
std::process::exit(0x100);
}
}
}
162 changes: 137 additions & 25 deletions server/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,59 +21,171 @@

//! This module contains automated tests for queries

use crate::coredb::CoreDB;
use crate::dbnet;
use crate::protocol::responses::fresp;
use libtdb::terrapipe;
use std::future::Future;
use std::net::{Shutdown, SocketAddr};
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;

static ADDR: &'static str = "127.0.0.1:2003";

/// Start the server as a background asynchronous task
async fn start_server() -> Option<SocketAddr> {
async fn start_server() -> (Option<SocketAddr>, CoreDB) {
// HACK(@ohsayan): Since we want to start the server if it is not already
// running, or use it if it is already running, we just return none if we failed
// to bind to the port, since this will _almost_ never happen on our CI
let listener = TcpListener::bind(ADDR).await.unwrap();
let db = CoreDB::new().unwrap();
let asyncdb = db.clone();
let addr = if let Ok(addr) = listener.local_addr() {
Some(addr)
} else {
None
};
tokio::spawn(async move { dbnet::run(listener, tokio::signal::ctrl_c()).await });
addr
tokio::spawn(async move { dbnet::test_run(listener, asyncdb, tokio::signal::ctrl_c()).await });
(addr, db)
}

struct QueryVec<'a> {
streams: Vec<TcpStream>,
db: &'a CoreDB,
}
impl<'a> QueryVec<'a> {
pub fn new<'b>(db: &'b CoreDB) -> Self
where
'b: 'a,
{
QueryVec {
streams: Vec::new(),
db,
}
}
pub async fn add<F, Fut>(&mut self, function: F)
where
F: FnOnce(TcpStream) -> Fut,
Fut: Future<Output = TcpStream>,
{
self.db.finish_db();
let stream = TcpStream::connect(ADDR).await.unwrap();
self.streams.push(function(stream).await);
}
pub fn run_queries_and_close_sockets(self) {
for socket in self.streams.into_iter() {
socket.shutdown(Shutdown::Both).unwrap();
}
self.db.finish_db();
}
}

#[tokio::test]
#[cfg(test)]
async fn test_queries() {
let server = start_server().await;
let mut stream = TcpStream::connect(ADDR).await.unwrap();
stream
.write_all(b"#2\n*1\n#2\n&1\n#4\nHEYA\n")
.await
.unwrap();
// Start the server
let (server, db) = start_server().await;
let mut queries = QueryVec::new(&db);
queries.add(test_heya).await;
queries.add(test_get_single_nil).await;
queries.add(test_get_single_okay).await;
queries.add(test_set_single_okay).await;
queries.add(test_set_single_overwrite_error).await;
queries.add(test_update_single_okay).await;
queries.add(test_update_single_nil).await;
queries.run_queries_and_close_sockets();

// Clean up everything else
drop(server);
drop(db);
}

#[cfg(test)]
/// Test a HEYA query: The server should return HEY!
async fn test_heya(mut stream: TcpStream) -> TcpStream {
let heya = terrapipe::proc_query("HEYA");
stream.write_all(&heya).await.unwrap();
let res_should_be = "#2\n*1\n#2\n&1\n+4\nHEY!\n".as_bytes().to_owned();
let mut response = vec![0; res_should_be.len()];
stream.read_exact(&mut response).await.unwrap();
assert_eq!(response.to_vec(), res_should_be, "HEYA FAILED!");
// Test single nil value
assert_eq!(response.to_vec(), res_should_be, "HEYA failed");
stream
.write_all(b"#2\n*1\n#2\n&2\n#3\nGET\n#1\nx\n")
.await
.unwrap();
let res_should_be = "#2\n*1\n#2\n&1\n!1\n1\n".as_bytes().to_owned();
}

#[cfg(test)]
/// Test a GET query: for a non-existing key
async fn test_get_single_nil(mut stream: TcpStream) -> TcpStream {
let get_single_nil = terrapipe::proc_query("GET x");
stream.write_all(&get_single_nil).await.unwrap();
let mut response = vec![0; fresp::R_NIL.len()];
stream.read_exact(&mut response).await.unwrap();
assert_eq!(
response.to_vec(),
fresp::R_NIL.to_owned(),
"GET SINGLE NIL failed"
);
stream
}

#[cfg(test)]
/// Test a GET query: for an existing key
async fn test_get_single_okay(stream: TcpStream) -> TcpStream {
let mut stream = test_set_single_okay(stream).await;
let get_single_nil = terrapipe::proc_query("GET x");
stream.write_all(&get_single_nil).await.unwrap();
let res_should_be = "#2\n*1\n#2\n&1\n+3\n100\n".as_bytes().to_owned();
let mut response = vec![0; res_should_be.len()];
stream.read_exact(&mut response).await.unwrap();
assert_eq!(response, res_should_be, "SINGLE NIL FAILED!");
assert_eq!(response.to_vec(), res_should_be, "GET SINGLE NIL failed");
stream
}

#[cfg(test)]
/// Test a SET query: SET a non-existing key, which should return code: 0
async fn test_set_single_okay(mut stream: TcpStream) -> TcpStream {
let set_single_okay = terrapipe::proc_query("SET x 100");
stream.write_all(&set_single_okay).await.unwrap();
let mut response = vec![0; fresp::R_OKAY.len()];
stream.read_exact(&mut response).await.unwrap();
assert_eq!(
response.to_vec(),
fresp::R_OKAY.to_owned(),
"SET SINGLE OKAY failed"
);
stream
}

// Test multiple nil
#[cfg(test)]
/// Test a SET query: SET an existing key, which should return code: 2
async fn test_set_single_overwrite_error(stream: TcpStream) -> TcpStream {
let mut stream = test_set_single_okay(stream).await;
let set_single_code_2 = terrapipe::proc_query("SET x 200");
stream.write_all(&set_single_code_2).await.unwrap();
let mut response = vec![0; fresp::R_OVERWRITE_ERR.len()];
stream.read_exact(&mut response).await.unwrap();
assert_eq!(response.to_vec(), fresp::R_OVERWRITE_ERR.to_owned());
stream
.write_all(b"#2\n*1\n#2\n&3\n#4\nMGET\n#1\nx\n#2\nex\n")
.await
.unwrap();
let res_should_be = b"#2\n*1\n#2\n&2\n!1\n1\n!1\n1";
let mut response = vec![0; res_should_be.len()];
}

#[cfg(test)]
/// Test an UPDATE query: which should return code: 0
async fn test_update_single_okay(stream: TcpStream) -> TcpStream {
let mut stream = test_set_single_okay(stream).await;
let update_single_okay = terrapipe::proc_query("UPDATE x 200");
stream.write_all(&update_single_okay).await.unwrap();
let mut response = vec![0; fresp::R_OKAY.len()];
stream.read_exact(&mut response).await.unwrap();
assert_eq!(response, res_should_be.to_vec(), "MULTIPLE NIL FAILED!");
// Stop the server
stream.shutdown(Shutdown::Write).unwrap();
assert_eq!(response.to_vec(), fresp::R_OKAY.to_owned());
stream
}

#[cfg(test)]
/// Test an UPDATE query: which should return code: 1
async fn test_update_single_nil(mut stream: TcpStream) -> TcpStream {
let update_single_okay = terrapipe::proc_query("UPDATE x 200");
stream.write_all(&update_single_okay).await.unwrap();
let mut response = vec![0; fresp::R_NIL.len()];
stream.read_exact(&mut response).await.unwrap();
assert_eq!(response, fresp::R_NIL.to_owned());
stream
}

0 comments on commit cab2ce7

Please sign in to comment.