From 62c955ffd5fd089923c6976ec5acfef59eeafb2a Mon Sep 17 00:00:00 2001 From: jrconlin Date: Mon, 19 Mar 2018 08:38:29 -0700 Subject: [PATCH] Initial Version This is a rust implementation of PushBox and is a work in progress. --- Cargo.toml | 39 +- README.md | 46 ++ Rocket.toml | 11 + .../down.sql | 2 + .../up.sql | 10 + setup_diesel.sh | 1 + src/auth.rs | 26 - src/auth/mod.rs | 445 ++++++++++++++ src/config.rs | 51 ++ src/db.rs | 51 -- src/db/mod.rs | 53 ++ src/db/models.rs | 167 ++++++ src/db/schema.rs | 10 + src/error.rs | 116 ++++ src/logging.rs | 34 ++ src/main.rs | 45 +- src/server.rs | 548 ++++++++++++++++++ src/store.rs | 48 -- 18 files changed, 1553 insertions(+), 150 deletions(-) create mode 100644 README.md create mode 100644 Rocket.toml create mode 100644 migrations/2018-03-28-211330_create_users_table/down.sql create mode 100644 migrations/2018-03-28-211330_create_users_table/up.sql create mode 100644 setup_diesel.sh delete mode 100644 src/auth.rs create mode 100644 src/auth/mod.rs create mode 100644 src/config.rs delete mode 100644 src/db.rs create mode 100644 src/db/mod.rs create mode 100644 src/db/models.rs create mode 100644 src/db/schema.rs create mode 100644 src/error.rs create mode 100644 src/logging.rs create mode 100644 src/server.rs delete mode 100644 src/store.rs diff --git a/Cargo.toml b/Cargo.toml index ee647e5..0b52a8e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,21 +1,36 @@ +# Remember to run `cargo update` to ensure that versions are updated. +# `cargo clean` does not change the Cargo.lock file. [package] name = "rustbox" version = "0.1.0" authors = ["jrconlin "] [dependencies] -rocket="0.3" -rocket_codegen="0.3" -serde="1.0" -serde_json="1.0" -serde_derive="1.0" -mysql="12.2" -reqwest="0.8" # for HTTP Client calls. - +failure="~0.1" +mysql="~12.2" +rand="~0.4" +reqwest="^0.8.5" +rocket="~0.3" +rocket_codegen="~0.3" +serde="~1.0" +serde_derive="~1.0" +serde_json="~1.0" +slog="~2.2" +slog-async="~2.2" +slog-term="~2.3" +slog-json="~2.2" +slog-scope="~4.0" +slog-stdlog="~3.0" +time="~0.1" [dependencies.rocket_contrib] -default-features=false +version="~0.3" +default-features=true + +[dependencies.diesel] +version="~1.1" +features=["mysql", "r2d2"] -[dependencies.mysql] -version="*" -#features=["ssl"] +[dependencies.diesel_migrations] +version="~1.1" +features=["mysql"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..2b19f70 --- /dev/null +++ b/README.md @@ -0,0 +1,46 @@ +# Rustbox - A rust implementation of PushBox long term storage + +## What is it? + +This is an internal project. mozilla needs the ability to store large data +chunks that may not fit perfectly within a standard WebPush message. PushBox +acts as an intermediary store for those chunks. + +Messages are created by Firefox Accounts (FxA), stored here, and then a +WebPush message containing a URL that points back to this storage is sent +to the User Agent. + +The User Agent can then fetch the data, decrypt it and do whatever it needs +to. + +This project, once completed, will eventually replace the AWS Severless +PushBox project. It's being developed here because serverless can be a bit +greedy about what it grabs, and since PushBox is a rapid prototype, it's +good to treat it in a clean room environment. + + +## Requirements + +The project requires Rust Nightly, a MySQL compliant data store, and +access to a [Firefox Accounts token verifier](https://github.com/mozilla/fxa-auth-server) system. + + +## Setting Up + +1) Install Rust Nightly. + +The rocket.rs [Getting Started](https://rocket.rs/guide/getting-started/) +document lists information on how to set that up. + +2) create the Rustbox MySQL user and database. + +Because I'm horribly creative and because this is a WIP, I use "`test:test@localhost/pushbox`". +This is not recommended for production use. You can set your preferred +MySQL access credential information as "database_url" in the `Rocket.toml` +settings file (See [Rocket Config](https://rocket.rs/guide/configuration/#rockettoml) +information.) + +3) Run the MySQL migrations `up.sql` file located in `./migrations/*/up.sql` + +4) Run `cargo run` to start the application, which (depending on the last commit) may actually start the program. YMMV. + diff --git a/Rocket.toml b/Rocket.toml new file mode 100644 index 0000000..ce94de1 --- /dev/null +++ b/Rocket.toml @@ -0,0 +1,11 @@ +[development] +database_url="mysql://test:test@localhost/pushbox" +fxa_host="oauth.stage.mozaws.net" +dryrun=true + +[staging] + +[production] + +[global.limits] +json = 1048576 diff --git a/migrations/2018-03-28-211330_create_users_table/down.sql b/migrations/2018-03-28-211330_create_users_table/down.sql new file mode 100644 index 0000000..ebc6286 --- /dev/null +++ b/migrations/2018-03-28-211330_create_users_table/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +Drop Table pushboxv1; diff --git a/migrations/2018-03-28-211330_create_users_table/up.sql b/migrations/2018-03-28-211330_create_users_table/up.sql new file mode 100644 index 0000000..336625b --- /dev/null +++ b/migrations/2018-03-28-211330_create_users_table/up.sql @@ -0,0 +1,10 @@ +CREATE TABLE pushboxv1 ( + user_id Varchar(200) Not Null, + device_id Varchar(200), + data Blob, + idx BigInt Auto_Increment, + ttl BigInt, + Primary Key(idx) +); +Create Index user_id_idx on pushboxv1 (user_id); +Create Index full_idx on pushboxv1 (user_id, device_id); diff --git a/setup_diesel.sh b/setup_diesel.sh new file mode 100644 index 0000000..7e59815 --- /dev/null +++ b/setup_diesel.sh @@ -0,0 +1 @@ +*REMEMBER* you need to hand edit the up.sql and down.sql files. diff --git a/src/auth.rs b/src/auth.rs deleted file mode 100644 index 73f9d1e..0000000 --- a/src/auth.rs +++ /dev/null @@ -1,26 +0,0 @@ -use std::collections::HashMap; -use serde::json; - -struct Validate { - fxa_url: String, // FxA Verifier URL - token: String, // Authorization Header token -} - -impl Validate { - fn new(auth_header:String, config: HashMap) -> Validate { - Validate { - fxa_url: "", - token: "" - } - } - - fn verify(auth_header:String) -> bool { - /// Verify a given action ("read", "write") - let mut splitter = auth_header.splitn(1, " "); - let schema = splitter.next().expect("Missing schema"); - let token = splitter.next().expect("Missing token"); - // Get the scopes from the verify server. - - return false - } -} diff --git a/src/auth/mod.rs b/src/auth/mod.rs new file mode 100644 index 0000000..0aa1e8c --- /dev/null +++ b/src/auth/mod.rs @@ -0,0 +1,445 @@ +use std::collections::HashMap; +use std::time::Duration; + +use reqwest; +use rocket::Outcome::{Failure, Success}; +use rocket::request::{self, FromRequest}; +use rocket::{Request, State}; + +use error::{HandlerError, HandlerErrorKind, VALIDATION_FAILED}; + +use config::ServerConfig; +use logging::RBLogger; + +/// Fetch FxA scopes for a requests Authentication header. +/// +/// The `Authentication` header should contain the FxA token as +/// `bearer `. This will attempt to query the FxA Authentication +/// server and will return the valid set of scopes. Due to various limitations, +/// your app will need to check the returned scopes for fine grain permissions. +/// +#[derive(Debug)] +pub struct FxAAuthenticator { + pub auth_type: AuthType, + pub scope: Vec, +} + +#[derive(Debug)] +pub enum AuthType { + FxAOauth, // Authorization: (Bearer | FxA-Oauth-Token) + FxAServer, // Authorization: (FxA-Server-Key) +} + +#[derive(Clone, Deserialize, Debug)] +pub struct FxAResp { + user: String, + client_id: String, + scope: Vec, +} + +impl FxAAuthenticator { + pub fn fxa_root(app: &str) -> String { + /// template for the FxA root. + /// + /// Rust requires that the first argument for `format!()` is a string literal, + /// thus this is a function. + format!("https://identity.mozilla.com/apps/{}/", app) + } + + /// parse the request header and extract the Authorization header, send to + /// FxA auth, and return the scope array. + /// + /// For `dryrun` settings: + /// this will pull `auth_app_name` from the managed memory configuration + /// object to spoof a response from FxA. + /// + /// For `test` configurations: + /// this will pull a HashTable from the managed memory configuration object + /// called '*fxa_response*` which contains the spoofed return data for + /// Thus currently pulls a managed memory configuration object that contains + /// an `.auth_app_name` String for `dryrun` and `test` configurations. + fn as_fxa_oauth( + token: String, + config: &ServerConfig, + logger: &RBLogger, + ) -> request::Outcome { + // Get the scopes from the verify server. + let fxa_host = &config.fxa_host; + let fxa_url = Self::fxa_root(fxa_host); + let mut body = HashMap::new(); + body.insert("token", token); + if &config.dryrun == &true { + slog_debug!(logger.log, "Dryrun, skipping auth"); + return Success(FxAAuthenticator { + auth_type: AuthType::FxAOauth, + scope: vec![Self::fxa_root(&config.auth_app_name)], + }); + } + let client = match reqwest::Client::builder() + .gzip(true) + .timeout(Duration::from_secs(3)) + .build() + { + Ok(client) => client, + Err(err) => { + slog_crit!(logger.log, "Reqwest failure"; "err" => format!("{:?}",err)); + return Failure(( + VALIDATION_FAILED, + HandlerErrorKind::Unauthorized(format!("Client error {:?}", err)).into(), + )); + } + }; + let resp: FxAResp = if cfg!(test) { + /* + Sadly, there doesn't seem to be a good way to do this. We can't add a trait for mocking + this because the FromRequest trait doesn't allow additional methods, we can't dummy + out the reqwest call, the only thing we can modify and access is the config info. + fortunately, the following are mostly boilerplate for calling out to the FxA server. + */ + let data = config + .test_data + .get("fxa_response") + .expect("Could not parse test fxa_response"); + let mut fscopes: Vec = Vec::new(); + for scope in data["scope"].as_array().expect("Invalid scope array") { + fscopes.push( + scope + .as_str() + .expect("Missing valid scope for test") + .to_string(), + ); + } + FxAResp { + user: data["user"] + .as_str() + .expect("Missing user info for test") + .to_string(), + client_id: data["client_id"] + .as_str() + .expect("Missing client_id for test") + .to_string(), + scope: fscopes, + } + } else { + // get the FxA Validiator response. + let mut raw_resp = match client.post(&fxa_url).json(&body).send() { + Ok(response) => response, + Err(err) => { + return Failure(( + VALIDATION_FAILED, + HandlerErrorKind::Unauthorized(format!("Pushbox Server Error: {:?}", err)) + .into(), + )) + } + }; + if raw_resp.status().is_success() == false { + // Log validation fail + return Failure(( + VALIDATION_FAILED, + HandlerErrorKind::Unauthorized("Missing Authorization Header".to_string()) + .into(), + )); + }; + match raw_resp.json() { + Ok(val) => val, + Err(e) => { + return Failure(( + VALIDATION_FAILED, + HandlerErrorKind::Unauthorized(format!("FxA Server error: {:?}", e)).into(), + )) + } + } + }; + return Success(FxAAuthenticator { + auth_type: AuthType::FxAOauth, + scope: resp.scope.clone(), + }); + } + + /// Minimal handshake security + fn as_server_token( + token: String, + config: &ServerConfig, + ) -> request::Outcome { + match config.server_token == Some(token) { + true => Success(FxAAuthenticator { + auth_type: AuthType::FxAServer, + scope: Vec::new(), + }), + false => Failure(( + VALIDATION_FAILED, + HandlerErrorKind::Unauthorized("Invalid Authorization token".to_string()).into(), + )), + } + } +} + +impl<'a, 'r> FromRequest<'a, 'r> for FxAAuthenticator { + type Error = HandlerError; + + /// Process the Authorization header and return the token. + fn from_request(request: &'a Request<'r>) -> request::Outcome { + let logger = request + .guard::>() + .expect("Logger missing") + .inner(); + if let Some(auth_header) = request.headers().get_one("Authorization") { + // Get a copy of the rocket config from the request's managed memory. + // There is no other way to get the rocket.config() from inside a request + // handler. + let config = request + .guard::>() + .expect("Application missing config") + .inner(); + let auth_bits: Vec<&str> = auth_header.splitn(2, " ").collect(); + slog_info!(logger.log, "Checking auth token"); + if auth_bits.len() != 2 { + return Failure(( + VALIDATION_FAILED, + HandlerErrorKind::Unauthorized( + "Incorrect Authorization Header Token".to_string(), + ).into(), + )); + }; + match auth_bits[0].to_lowercase().as_str() { + "bearer" | "fxa-oauth-token" => { + slog_info!(logger.log, "Found Oauth token"); + return Self::as_fxa_oauth(auth_bits[1].into(), config, logger); + } + "fxa-server-key" => return Self::as_server_token(auth_bits[1].into(), config), + _ => { + slog_info!(logger.log, "Found Server token"); + return Failure(( + VALIDATION_FAILED, + HandlerErrorKind::Unauthorized( + "Incorrect Authorization Header Schema".to_string(), + ).into(), + )); + } + } + } else { + // No Authorization header + slog_info!(logger.log, "No Authorization Header found"); + return Failure(( + VALIDATION_FAILED, + HandlerErrorKind::Unauthorized("Missing Authorization Header".to_string()).into(), + )); + } + } +} + +#[cfg(test)] +mod test { + // cargo test -- --no-capture + + use rocket; + use rocket::config::{Config, Environment, RocketConfig, Table}; + use rocket::fairing::AdHoc; + use rocket::http::Header; + use rocket::local::Client; + use rocket_contrib::json::Json; + + use super::FxAAuthenticator; + use config::ServerConfig; + use error::HandlerResult; + use logging::RBLogger; + + struct StubServer {} + impl StubServer { + pub fn start(rocket: rocket::Rocket) -> HandlerResult { + Ok(rocket + .attach(AdHoc::on_attach(|rocket| { + // Copy the config into a state manager. + let rbconfig = ServerConfig::new(rocket.config()); + let logger = RBLogger::new(rocket.config()); + Ok(rocket.manage(rbconfig).manage(logger)) + })) + .mount("", routes![auth_test_read_stub, auth_test_write_stub])) + } + } + + // The following stub function is used for testing only. + #[get("/test/")] + fn auth_test_read_stub( + token: HandlerResult, + device_id: String, + ) -> HandlerResult { + Ok(Json(json!({ + "status": 200, + "scope": token?.scope, + "device_id": device_id + }))) + } + + // The following stub function is used for testing only. + #[post("/test/")] + fn auth_test_write_stub( + token: HandlerResult, + device_id: String, + ) -> HandlerResult { + Ok(Json(json!({ + "status": 200, + "scope": token?.scope, + "device_id": device_id + }))) + } + + fn rocket_config(test_data: Table) -> Config { + let rconfig = RocketConfig::read().expect("failed to read config"); + let fxa_host = rconfig + .active() + .get_str("fxa_host") + .unwrap_or("oauth.stage.mozaws.net"); + + let config = Config::build(Environment::Development) + .extra("fxa_host", fxa_host) + .extra("dryrun", false) + .extra("auth_app_name", "test") + .extra("test_data", test_data) + .finalize() + .unwrap(); + config + } + + fn rocket_client(config: Config) -> Client { + let test_rocket = + StubServer::start(rocket::custom(config, true)).expect("test rocket failed"); + Client::new(test_rocket).expect("test rocket launch failed") + } + + #[test] + fn test_valid() { + let mut test_data = Table::new(); + let mut fxa_response = Table::new(); + fxa_response.insert("user".into(), "test".into()); + fxa_response.insert("client_id".into(), "test".into()); + fxa_response.insert( + "scope".into(), + vec![FxAAuthenticator::fxa_root("test")].into(), + ); + test_data.insert("fxa_response".into(), fxa_response.into()); + + test_data.insert("auth_only".into(), true.into()); + let client = rocket_client(rocket_config(test_data)); + let result = client + .post("/test/test") + .header(Header::new("Authorization", "bearer tokentoken")) + .header(Header::new("Content-Type", "application/json")) + .body(r#"{"ttl": 123, "data": "Some Data"}"#.to_string()) + .dispatch(); + assert!(result.status() == rocket::http::Status::raw(200)); + } + + #[test] + fn test_no_auth() { + let mut test_data = Table::new(); + let mut fxa_response = Table::new(); + fxa_response.insert("user".into(), "test".into()); + fxa_response.insert("client_id".into(), "test".into()); + fxa_response.insert( + "scope".into(), + vec![format!("{}send/bar", FxAAuthenticator::fxa_root("test"))].into(), + ); + test_data.insert("fxa_response".into(), fxa_response.into()); + let client = rocket_client(rocket_config(test_data)); + let result = client + .post("/test/test") + .header(Header::new("Content-Type", "application/json")) + .body(r#"{"ttl": 123, "data": "Some Data"}"#) + .dispatch(); + assert!(result.status() == rocket::http::Status::raw(401)) + } + + #[test] + fn test_bad_auth_schema() { + let mut test_data = Table::new(); + let mut fxa_response = Table::new(); + fxa_response.insert("user".into(), "test".into()); + fxa_response.insert("client_id".into(), "test".into()); + fxa_response.insert( + "scope".into(), + vec![FxAAuthenticator::fxa_root("test")].into(), + ); + test_data.insert("fxa_response".into(), fxa_response.into()); + + test_data.insert("auth_only".into(), true.into()); + let client = rocket_client(rocket_config(test_data)); + let result = client + .post("/test/test") + .header(Header::new("Authorization", "invalid tokentoken")) + .header(Header::new("Content-Type", "application/json")) + .body(r#"{"ttl": 123, "data": "Some Data"}"#.to_string()) + .dispatch(); + assert!(result.status() == rocket::http::Status::raw(401)) + } + + #[test] + fn test_bad_auth_no_schema() { + let mut test_data = Table::new(); + let mut fxa_response = Table::new(); + fxa_response.insert("user".into(), "test".into()); + fxa_response.insert("client_id".into(), "test".into()); + fxa_response.insert( + "scope".into(), + vec![FxAAuthenticator::fxa_root("test")].into(), + ); + test_data.insert("fxa_response".into(), fxa_response.into()); + + test_data.insert("auth_only".into(), true.into()); + let client = rocket_client(rocket_config(test_data)); + let result = client + .post("/test/test") + .header(Header::new("Authorization", "invalid")) + .header(Header::new("Content-Type", "application/json")) + .body(r#"{"ttl": 123, "data": "Some Data"}"#.to_string()) + .dispatch(); + assert!(result.status() == rocket::http::Status::raw(401)) + } + + #[test] + fn test_bad_auth_no_token() { + let mut test_data = Table::new(); + let mut fxa_response = Table::new(); + fxa_response.insert("user".into(), "test".into()); + fxa_response.insert("client_id".into(), "test".into()); + fxa_response.insert( + "scope".into(), + vec![FxAAuthenticator::fxa_root("test")].into(), + ); + test_data.insert("fxa_response".into(), fxa_response.into()); + + test_data.insert("auth_only".into(), true.into()); + let client = rocket_client(rocket_config(test_data)); + let result = client + .post("/test/test") + .header(Header::new("Authorization", "bearer")) + .header(Header::new("Content-Type", "application/json")) + .body(r#"{"ttl": 123, "data": "Some Data"}"#.to_string()) + .dispatch(); + assert!(result.status() == rocket::http::Status::raw(401)) + } + + #[test] + fn test_bad_auth_blank() { + let mut test_data = Table::new(); + let mut fxa_response = Table::new(); + fxa_response.insert("user".into(), "test".into()); + fxa_response.insert("client_id".into(), "test".into()); + fxa_response.insert( + "scope".into(), + vec![FxAAuthenticator::fxa_root("test")].into(), + ); + test_data.insert("fxa_response".into(), fxa_response.into()); + + test_data.insert("auth_only".into(), true.into()); + let client = rocket_client(rocket_config(test_data)); + let result = client + .post("/test/test") + .header(Header::new("Authorization", "")) + .header(Header::new("Content-Type", "application/json")) + .body(r#"{"ttl": 123, "data": "Some Data"}"#.to_string()) + .dispatch(); + assert!(result.status() == rocket::http::Status::raw(401)) + } + +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..4418417 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,51 @@ +use rocket::config::{Config, Table}; +use rocket::request::{self, FromRequest}; +use rocket::{Outcome, Request, State}; + +// Due to some private variables, this must be defined in the same module as rocket.manage() +#[derive(Debug, Clone)] +pub struct ServerConfig { + // Authorization Configuration block + pub fxa_host: String, + pub dryrun: bool, + pub default_ttl: u64, + pub auth_app_name: String, + pub test_data: Table, + pub server_token: Option, +} + +// Helper functions to pull values from the private config. +impl ServerConfig { + pub fn new(config: &Config) -> ServerConfig { + // Transcode rust Config values + ServerConfig { + fxa_host: String::from( + config + .get_str("fxa_host") + .unwrap_or("oauth.stage.mozaws.net"), + ), + dryrun: config.get_bool("dryrun").unwrap_or(false), + default_ttl: config.get_float("default_ttl").unwrap_or(3600.0) as u64, + auth_app_name: config + .get_str("auth_app_name") + .unwrap_or("pushbox") + .replace(" ", ""), + server_token: match config.get_str("server_token") { + Ok(token) => Some(token.to_string()), + Err(_) => None, + }, + test_data: config + .get_table("test_data") + .unwrap_or(&Table::new()) + .clone(), + } + } +} + +impl<'a, 'r> FromRequest<'a, 'r> for ServerConfig { + type Error = (); + + fn from_request(req: &'a Request<'r>) -> request::Outcome { + Outcome::Success(req.guard::>().unwrap().inner().clone()) + } +} diff --git a/src/db.rs b/src/db.rs deleted file mode 100644 index 69d8e4f..0000000 --- a/src/db.rs +++ /dev/null @@ -1,51 +0,0 @@ -/// Database functions - -use std::collections::HashMap; -use mysql; - -struct Record { - fxa_uid: String, - device_id: String, - service: String, - index: u64, - ttl: u64, - store_ref: String, - store_size: u64, -} - -struct Database { - pool:mysql::Pool -} - -impl Database { - fn new(config: HashMap) -> Database { - // build the table if not already present. - let pool = mysql::Pool::new(config.get("dsn")).expect("Could not initialize data pool"); - pool.prep_exec( - r"Create table if not exists :table_name ( - fxa_uid text not null, - device_id text not null, - service text not null, - ttl int not null, - store_ref text not null, - store_size int)", - params!{"table_name"=>config.get("db_tablename")} - ).expect("Could not create table"); - - Database{ - pool - } - } - - fn store(record:Record) -> Result{ - /// Store Record into the database - - return 0 - } - - fn fetch(fxa_uid:String, device_id:String, service:String, limit:u8, start_at: u64) -> Result { - /// Fetch a Record from the database - return Record{} - } - -} diff --git a/src/db/mod.rs b/src/db/mod.rs new file mode 100644 index 0000000..9d0518a --- /dev/null +++ b/src/db/mod.rs @@ -0,0 +1,53 @@ +pub mod models; +pub mod schema; + +use std::ops::Deref; + +use diesel::mysql::MysqlConnection; +use diesel::r2d2::{ConnectionManager, Pool, PooledConnection}; +use failure::err_msg; + +use rocket::http::Status; +use rocket::request::{self, FromRequest}; +use rocket::{Config, Outcome, Request, State}; + +use error::Result; + +pub type MysqlPool = Pool>; + +pub fn pool_from_config(config: &Config) -> Result { + let database_url = config + .get_str("database_url") + .map_err(|_| err_msg("ROCKET_DATABASE_URL undefined"))? + .to_string(); + let max_size = config.get_int("database_pool_max_size").unwrap_or(10) as u32; + let manager = ConnectionManager::::new(database_url); + let pman = Pool::builder() + .max_size(max_size) + .build(manager) + .expect("Could not build pool"); + Ok(pman) +} + +pub struct Conn(pub PooledConnection>); + +impl Deref for Conn { + type Target = MysqlConnection; + + #[inline(always)] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, 'r> FromRequest<'a, 'r> for Conn { + type Error = (); + + fn from_request(request: &'a Request<'r>) -> request::Outcome { + let pool = request.guard::>()?; + match pool.get() { + Ok(conn) => Outcome::Success(Conn(conn)), + Err(_) => Outcome::Failure((Status::ServiceUnavailable, ())), + } + } +} diff --git a/src/db/models.rs b/src/db/models.rs new file mode 100644 index 0000000..0d9fad7 --- /dev/null +++ b/src/db/models.rs @@ -0,0 +1,167 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use diesel::connection::TransactionManager; +use diesel::mysql::MysqlConnection; +use diesel::{self, insert_into, Connection, ExpressionMethods, QueryDsl, RunQueryDsl}; +use failure::ResultExt; +use serde::ser::{Serialize, SerializeStruct, Serializer}; + +use super::schema::pushboxv1; +use error::{HandlerErrorKind, HandlerResult}; + +#[derive(Debug, Queryable, Insertable)] +#[table_name = "pushboxv1"] +pub struct Record { + pub user_id: String, + pub device_id: String, + pub ttl: i64, // expiration date in UTC. + pub idx: i64, + pub data: Vec, +} + +impl Serialize for Record { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let data = &self.data.clone(); + let mut s = serializer.serialize_struct("Record", 2)?; + let index = &self.idx; + s.serialize_field("index", &(*index as u64))?; + s.serialize_field("data", &String::from_utf8(data.to_vec()).unwrap())?; + s.end() + } +} + +pub fn now_utc() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as u64 +} + +pub fn calc_ttl(seconds: u64) -> u64 { + now_utc() + seconds +} + +/// An authorized broadcaster + +pub struct DatabaseManager {} + +impl DatabaseManager { + pub fn max_index(conn: &MysqlConnection, user_id: &String, device_id: &String) -> u64 { + let mut max_index_sel: Vec = match pushboxv1::table + .select(pushboxv1::idx) + .filter(pushboxv1::user_id.eq(user_id)) + .filter(pushboxv1::device_id.eq(device_id)) + .order(pushboxv1::idx.desc()) + .limit(1) + .load::(conn) + { + Ok(val) => val, + Err(_) => vec![], + }; + max_index_sel.pop().unwrap_or(0) as u64 + } + + pub fn new_record( + conn: &MysqlConnection, + user_id: &String, + device_id: &String, + data: &String, + ttl: u64, + ) -> HandlerResult { + let t_manager = conn.transaction_manager(); + t_manager + .begin_transaction(conn) + .expect("Could not create transaction"); + insert_into(pushboxv1::table) + .values(( + pushboxv1::user_id.eq(user_id), + pushboxv1::device_id.eq(device_id), + pushboxv1::ttl.eq(ttl as i64), + pushboxv1::data.eq(data.clone().into_bytes()), + )) + .execute(conn) + .context(HandlerErrorKind::DBError)?; + let record_index = match pushboxv1::table + .select(pushboxv1::idx) + .order(pushboxv1::idx.desc()) + .limit(1) + .load::(conn) + { + Ok(val) => val[0], + Err(_) => return Err(HandlerErrorKind::DBError.into()), + }; + t_manager + .commit_transaction(conn) + .expect("Could not close transaction"); + + Ok(record_index as u64) + } + + pub fn read_records( + conn: &MysqlConnection, + user_id: &String, + device_id: &String, + index: &Option, + limit: &Option, + ) -> HandlerResult> { + // flatten into HashMap FromIterator<(K, V)> + let mut query = pushboxv1::table + .select(( + pushboxv1::user_id, // NOTE: load() does not order these, so you should + pushboxv1::device_id, // keep them in field order for Record{} + pushboxv1::ttl, + pushboxv1::idx, + pushboxv1::data, + )) + .into_boxed(); + query = query + .filter(pushboxv1::user_id.eq(user_id)) + .filter(pushboxv1::device_id.eq(device_id)) + .filter(pushboxv1::ttl.ge(now_utc() as i64)); + match index { + None => {} + Some(index) => { + query = query.filter(pushboxv1::idx.ge(index.clone() as i64)); + } + }; + match limit { + None => {} + Some(limit) => { + query = query.limit(limit.clone() as i64); + } + } + Ok(query + .order(pushboxv1::idx) + .load::(conn) + .context(HandlerErrorKind::DBError)? + .into_iter() + .collect()) + } + + pub fn delete( + conn: &MysqlConnection, + user_id: &String, + device_id: &String, + ) -> HandlerResult { + // boxed deletes are "coming soon" + // see https://github.com/diesel-rs/diesel/pull/1534 + if device_id.len() > 0 { + diesel::delete( + pushboxv1::table + .filter(pushboxv1::user_id.eq(user_id)) + .filter(pushboxv1::device_id.eq(device_id)), + ).execute(conn) + .context(HandlerErrorKind::DBError) + .unwrap(); + } else { + diesel::delete(pushboxv1::table.filter(pushboxv1::user_id.eq(user_id))) + .execute(conn) + .context(HandlerErrorKind::DBError) + .unwrap(); + } + Ok(true) + } +} diff --git a/src/db/schema.rs b/src/db/schema.rs new file mode 100644 index 0000000..365f566 --- /dev/null +++ b/src/db/schema.rs @@ -0,0 +1,10 @@ +table! { + pushboxv1 (user_id, device_id, service) { + user_id -> Varchar, + device_id -> Varchar, + service -> Varchar, + data -> Binary, + idx -> Bigint, + ttl -> Bigint, + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..741141c --- /dev/null +++ b/src/error.rs @@ -0,0 +1,116 @@ +/// Error handling based on the failure crate +/// +/// Only rocket's Handlers can render error responses w/ a contextual JSON +/// payload. So request guards should generally return VALIDATION_FAILED, +/// leaving error handling to the Handler (which in turn must take a Result of +/// request guards' fields). +/// +/// HandlerErrors are rocket Responders (render their own error responses). +use std::fmt; +use std::result; + +use failure::{Backtrace, Context, Error, Fail}; +use rocket::http::Status; +use rocket::response::{Responder, Response}; +use rocket::{self, response, Request}; +use rocket_contrib::Json; + +pub type Result = result::Result; + +pub type HandlerResult = result::Result; + +/// Signal a request guard failure, propagated up to the Handler to render an +/// error response +pub const VALIDATION_FAILED: Status = Status::InternalServerError; + +#[derive(Debug)] +pub struct HandlerError { + inner: Context, +} + +#[derive(Clone, Eq, PartialEq, Debug, Fail)] +pub enum HandlerErrorKind { + /// 401 Unauthorized + #[fail(display = "Missing authorization header")] + MissingAuth, + #[fail(display = "Invalid authorization header")] + InvalidAuth, + #[fail(display = "Unauthorized: {:?}", _0)] + Unauthorized(String), + /// 404 Not Found + #[fail(display = "Not Found")] + NotFound, + + #[fail(display = "A database error occurred")] + DBError, + + // Note: Make sure that if display has an argument, the label includes the argument, + // otherwise the process macro parser will fail on `derive(Fail)` + #[fail(display = "Unexpected rocket error: {:?}", _0)] + RocketError(rocket::Error), // rocket::Error isn't a std Error (so no #[cause]) + #[fail(display = "Unexpected application error: {:?}", _0)] + InternalError(String), + // Application Errors +} + +impl HandlerErrorKind { + /// Return a rocket response Status to be rendered for an error + pub fn http_status(&self) -> Status { + match *self { + HandlerErrorKind::MissingAuth | HandlerErrorKind::InvalidAuth => Status::Unauthorized, + HandlerErrorKind::Unauthorized(ref _msg) => Status::Unauthorized, + HandlerErrorKind::NotFound => Status::NotFound, + HandlerErrorKind::DBError => Status::ServiceUnavailable, + _ => Status::BadRequest, + } + } +} + +impl HandlerError { + pub fn kind(&self) -> &HandlerErrorKind { + self.inner.get_context() + } +} + +impl Fail for HandlerError { + fn cause(&self) -> Option<&Fail> { + self.inner.cause() + } + + fn backtrace(&self) -> Option<&Backtrace> { + self.inner.backtrace() + } +} + +impl fmt::Display for HandlerError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&self.inner, f) + } +} + +impl From for HandlerError { + fn from(kind: HandlerErrorKind) -> HandlerError { + Context::new(kind).into() + } +} + +impl From> for HandlerError { + fn from(inner: Context) -> HandlerError { + HandlerError { inner: inner } + } +} + +/// Generate HTTP error responses for HandlerErrors +impl<'r> Responder<'r> for HandlerError { + fn respond_to(self, request: &Request) -> response::Result<'r> { + let status = self.kind().http_status(); + let json = Json(json!({ + "code": status.code, + "error": format!("{}", self) + })); + // XXX: logging + Response::build_from(json.respond_to(request)?) + .status(status) + .ok() + } +} diff --git a/src/logging.rs b/src/logging.rs new file mode 100644 index 0000000..034fa91 --- /dev/null +++ b/src/logging.rs @@ -0,0 +1,34 @@ +use rocket::config::Config; +use rocket::request::{self, FromRequest}; +use rocket::{Outcome, Request, State}; +use slog; +use slog::Drain; +use slog_async; +use slog_scope; +use slog_stdlog; +use slog_term; + +#[derive(Clone, Debug)] +pub struct RBLogger { + pub log: slog::Logger, +} + +impl RBLogger { + pub fn new(config: &Config) -> RBLogger { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::CompactFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + RBLogger { + log: slog::Logger::root(drain, o!()).new(o!()), + } + } +} + +impl<'a, 'r> FromRequest<'a, 'r> for RBLogger { + type Error = (); + + fn from_request(req: &'a Request<'r>) -> request::Outcome { + Outcome::Success(req.guard::>().unwrap().inner().clone()) + } +} diff --git a/src/main.rs b/src/main.rs index b48a0a5..8562bc9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,20 +1,39 @@ -#![feature(plugin, decl_macro)] +#![feature(plugin, decl_macro, custom_derive, duration_extras)] #![plugin(rocket_codegen)] -extern crate rocket; -#[macro_use] extern crate rocket_contrib; -#[macro_use] extern crate serde_json; -#[macro_use] extern crate serde_derive; -#[macro_use] extern crate mysql; +#[macro_use] +extern crate diesel; +#[macro_use] +extern crate failure; +#[macro_use] +extern crate serde_derive; +#[macro_use] +extern crate serde_json; +#[macro_use] +extern crate slog; +#[macro_use] +extern crate slog_scope; -use rocket_contrib::{Json, JsonValue}; -use rocket::State; -use std::collections::HashMap; -// Add database +extern crate diesel_migrations; +extern crate mysql; +extern crate rand; +extern crate reqwest; +extern crate rocket; +extern crate rocket_contrib; +extern crate serde; +extern crate slog_async; +extern crate slog_json; +extern crate slog_stdlog; +extern crate slog_term; -mod store; +mod auth; +mod config; +mod db; +mod error; +mod logging; +mod server; fn main() { - println!("Hello, world!"); + let rocket_serv = server::Server::start(rocket::ignite()); + rocket_serv.unwrap().launch(); } - diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..5711f33 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,548 @@ +use std::cmp; +use std::str::Utf8Error; + +use rocket; +use rocket::config; +use rocket::fairing::AdHoc; +use rocket::http::Method; +use rocket::request::{FormItems, FromForm}; +use rocket_contrib::json::Json; +use slog::Logger; + +use auth::{AuthType, FxAAuthenticator}; +use config::ServerConfig; +use db::models::{calc_ttl, DatabaseManager}; +use db::{pool_from_config, Conn}; +use error::{HandlerError, HandlerErrorKind, HandlerResult}; +use logging::RBLogger; + +#[derive(Deserialize, Debug)] +pub struct DataRecord { + ttl: u64, + data: String, +} + +#[derive(Debug)] +pub struct Options { + pub index: Option, + pub limit: Option, + pub status: Option, +} + +// Convenience function to convert a Option result into a u64 value (or 0) +fn as_u64(opt: Result) -> u64 { + opt.unwrap_or("0".to_string()).parse::().unwrap_or(0) +} + +/// Valid GET options include: +/// +/// * *index* - Start from a given index number. +/// * *limit* - Only return **limit** number of items. +/// * *status* - Friendly format to specify the state of the client: +/// * *new* - This is a new UA, that needs all pending records. +/// * *lost* - The UA is lost and just needs to get the latest index record. +impl<'f> FromForm<'f> for Options { + type Error = (); + + fn from_form(items: &mut FormItems<'f>, _strict: bool) -> Result { + let mut opt = Options { + index: None, + limit: None, + status: None, + }; + + for (key, val) in items { + let decoded = val.url_decode(); + match key.to_lowercase().as_str() { + "index" => opt.index = Some(as_u64(decoded)), + "limit" => opt.limit = Some(as_u64(decoded)), + "status" => { + opt.status = match decoded { + Ok(status) => Some(status), + Err(_) => None, + } + } + _ => {} + } + } + Ok(opt) + } +} + +// Encapsulate the server. +pub struct Server {} + +impl Server { + pub fn start(rocket: rocket::Rocket) -> HandlerResult { + Ok(rocket + .attach(AdHoc::on_attach(|rocket| { + // Copy the config into a state manager. + let pool = pool_from_config(rocket.config()).expect("Could not get pool"); + let rbconfig = ServerConfig::new(rocket.config()); + let logger = RBLogger::new(rocket.config()); + slog_info!(logger.log, "sLogging initialized..."); + Ok(rocket.manage(rbconfig).manage(pool).manage(logger)) + })) + .mount( + "/v1/store", + routes![read, read_opt, write, delete, delete_user], + ) + .mount("/v1/", routes![status])) + } +} + +/// Check an Authorization token permission, defaulting to the proper schema check. +pub fn check_token( + config: &ServerConfig, + method: Method, + device_id: &String, + token: &HandlerResult, +) -> Result { + match token { + Ok(token) => match token.auth_type { + AuthType::FxAServer => check_server_token(config, method, device_id, token), + AuthType::FxAOauth => check_fxa_token(config, method, device_id, token), + }, + Err(_e) => Err(HandlerErrorKind::Unauthorized(String::from("Token invalid")).into()), + } +} + +/// Stub for FxA server token permission authentication. +pub fn check_server_token( + _config: &ServerConfig, + _method: Method, + _device_id: &String, + _token: &FxAAuthenticator, +) -> Result { + // currently a stub for the FxA server token auth. + // In theory, the auth mod already checks the token against config. + return Ok(true); +} + +/// Check the permissions of the FxA token to see if read/write access is provided. +pub fn check_fxa_token( + config: &ServerConfig, + method: Method, + device_id: &String, + token: &FxAAuthenticator, +) -> Result { + // call unwrap here because we already checked for instances. + let scope = &token.scope; + if scope.contains(&FxAAuthenticator::fxa_root(&config.auth_app_name)) { + return Ok(true); + } + // Otherwise check for explicit allowances + match method { + Method::Put | Method::Post | Method::Delete => { + if scope.contains(&format!( + "{}send/{}", + FxAAuthenticator::fxa_root(&config.auth_app_name), + device_id + )) + || scope.contains(&format!( + "{}send", + FxAAuthenticator::fxa_root(&config.auth_app_name) + )) { + return Ok(true); + } + } + Method::Get => { + if scope.contains(&format!( + "{}recv/{}", + FxAAuthenticator::fxa_root(&config.auth_app_name), + device_id + )) + || scope.contains(&format!( + "{}recv", + FxAAuthenticator::fxa_root(&config.auth_app_name) + )) { + return Ok(true); + } + } + _ => {} + } + return Err(HandlerErrorKind::Unauthorized("Access Token Unauthorized".to_string()).into()); +} + +// Method handlers::: +// Apparently you can't set these on impl methods, must be at top level. +// query string parameters for limit and index +#[get("//?")] +fn read_opt( + conn: Conn, + config: ServerConfig, + logger: RBLogger, + token: HandlerResult, + user_id: String, + device_id: String, + options: Options, +) -> HandlerResult { + // 👩🏫 note that the "token" var is a HandlerResult wrapped Validate struct. + // Validate::from_request extracts the token from the Authorization header, validates it + // against FxA and the method, and either returns OK or an error. We need to reraise it to the + // handler. + slog_info!(logger.log, "Handling Read"); + check_token(&config, Method::Get, &device_id, &token)?; + let max_index = DatabaseManager::max_index(&conn, &user_id, &device_id); + let mut index = options.index; + let mut limit = options.limit; + match options.status.unwrap_or("".into()).to_lowercase().as_str() { + "new" => { + // New entry, needs all data + index = None; + limit = None; + slog_debug!(logger.log, "Welcome new user"); + } + "lost" => { + // Just lost, needs just the next index. + index = None; + limit = Some(0); + slog_debug!(logger.log, "Sorry, you're lost."); + } + _ => {} + }; + let messages = + DatabaseManager::read_records(&conn, &user_id, &device_id, &index, &limit).unwrap(); + let mut msg_max: u64 = 0; + for message in &messages { + msg_max = cmp::max(msg_max, message.idx as u64); + } + slog_info!(logger.log, "Found messages"; "len" => messages.len()); + // returns json {"status":200, "index": max_index, "messages":[{"index": #, "data": String}, ...]} + let is_last = match limit { + None => true, + Some(0) => true, + Some(_) => { + let last = messages.last().unwrap(); + (last.idx as u64) == max_index + } + }; + Ok(Json(json!({ + "last": is_last, + "index": msg_max, + "status": 200, + "messages": messages + }))) +} + +#[get("//")] +fn read( + conn: Conn, + config: ServerConfig, + logger: RBLogger, + token: HandlerResult, + user_id: String, + device_id: String, +) -> HandlerResult { + read_opt( + conn, + config, + logger, + token, + user_id, + device_id, + Options { + index: None, + limit: None, + status: Some(String::from("start")), + }, + ) +} + +/// Write the user data to the database. +#[post("//", data = "")] +fn write( + conn: Conn, + config: ServerConfig, + logger: RBLogger, + token: HandlerResult, + user_id: String, + device_id: String, + data: Json, +) -> HandlerResult { + check_token(&config, Method::Post, &device_id, &token)?; + if config + .test_data + .get("auth_only") + .unwrap_or(&config::Value::from(false)) + .as_bool() + .unwrap_or(false) + { + // Auth testing, do not write to db. + slog_info!(logger.log, "Auth Skipping database check."); + return Ok(Json(json!({ + "status": 200, + "index": -1, + }))); + } + slog_debug!(logger.log, "Writing new record"); + let response = + DatabaseManager::new_record(&conn, &user_id, &device_id, &data.data, calc_ttl(data.ttl)); + if response.is_err() { + return Err(response.err().unwrap()); + } + // returns json {"status": 200, "index": #} + Ok(Json(json!({ + "status": 200, + "index": response.unwrap(), + }))) +} + +#[delete("//")] +fn delete( + conn: Conn, + config: ServerConfig, + token: HandlerResult, + user_id: String, + device_id: String, +) -> HandlerResult { + check_token(&config, Method::Delete, &device_id, &token)?; + DatabaseManager::delete(&conn, &user_id, &device_id)?; + // returns an empty object + Ok(Json(json!({}))) +} + +#[delete("/")] +fn delete_user( + conn: Conn, + config: ServerConfig, + token: HandlerResult, + user_id: String, +) -> HandlerResult { + check_token(&config, Method::Delete, &String::from(""), &token)?; + DatabaseManager::delete(&conn, &user_id, &String::from(""))?; + // returns an empty object + Ok(Json(json!({}))) +} + +#[get("/status")] +fn status(config: ServerConfig) -> HandlerResult { + let config = config; + + Ok(Json(json!({ + "status": "Ok", + "fxa_auth": config.fxa_host.clone(), + }))) +} + +#[cfg(test)] +mod test { + use rand::{thread_rng, Rng}; + use std::env; + + use rocket; + use rocket::config::{Config, Environment, RocketConfig, Table}; + use rocket::http::Header; + use rocket::local::Client; + use serde_json; + + use super::Server; + use auth::FxAAuthenticator; + + #[derive(Debug, Deserialize)] + struct WriteResp { + index: u64, + status: u32, + } + + #[derive(Debug, Deserialize)] + struct Msg { + index: u64, + data: String, + } + + #[derive(Debug, Deserialize)] + struct ReadResp { + status: u32, + index: u64, + last: bool, + messages: Vec, + } + + fn rocket_config(test_data: Table) -> Config { + let rconfig = RocketConfig::read().expect("failed to read config"); + let fxa_host = rconfig + .active() + .get_str("fxa_host") + .unwrap_or("oauth.stage.mozaws.net"); + + let db_url = env::var("ROCKET_DATABASE_URL") + .unwrap_or(String::from("mysql://test:test@localhost/pushbox")); + let config = Config::build(Environment::Development) + .extra("fxa_host", fxa_host) + .extra("database_url", db_url) + .extra("dryrun", true) + .extra("auth_app_name", "pushbox") + .extra("test_data", test_data) + .finalize() + .unwrap(); + config + } + + fn rocket_client(config: Config) -> Client { + let test_rocket = Server::start(rocket::custom(config, true)).expect("test rocket failed"); + Client::new(test_rocket).expect("test rocket launch failed") + } + + fn device_id() -> String { + thread_rng().gen_ascii_chars().take(8).collect() + } + + fn user_id() -> String { + format!("test-{}", device_id()) + } + + fn default_config_data() -> Table { + let mut test_data = Table::new(); + let mut fxa_response = Table::new(); + fxa_response.insert("user".into(), "test".into()); + fxa_response.insert("client_id".into(), "test".into()); + fxa_response.insert( + "scope".into(), + vec![format!("{}send/bar", FxAAuthenticator::fxa_root("pushbox"))].into(), + ); + test_data.insert("fxa_response".into(), fxa_response.into()); + test_data + } + + #[test] + fn test_valid_write() { + let test_data = default_config_data(); + println!("test_data: {:?}", &test_data); + let config = rocket_config(test_data); + let client = rocket_client(config); + let user_id = user_id(); + let url = format!("/v1/store/{}/{}", user_id, device_id()); + let mut result = client + .post(url.clone()) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .body(r#"{"ttl": 60, "data":"Some Data"}"#) + .dispatch(); + let body = &result.body_string().unwrap(); + assert!(result.status() == rocket::http::Status::raw(200)); + assert!(body.contains(r#""index":"#)); + assert!(body.contains(r#""status":200"#)); + + // cleanup + client + .delete(url.clone()) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .dispatch(); + } + + #[test] + fn test_valid_read() { + let config = rocket_config(default_config_data()); + let client = rocket_client(config); + let url = format!("/v1/store/{}/{}", user_id(), device_id()); + let mut write_result = client + .post(url.clone()) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .body(r#"{"ttl": 60, "data":"Some Data"}"#) + .dispatch(); + let write_json: WriteResp = + serde_json::from_str(&write_result + .body_string() + .expect("Empty body string for write")) + .expect("Could not parse write response body"); + let mut read_result = client + .get(url.clone()) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .dispatch(); + assert!(read_result.status() == rocket::http::Status::raw(200)); + let mut read_json: ReadResp = + serde_json::from_str(&read_result + .body_string() + .expect("Empty body for read response")) + .expect("Could not parse read response"); + + assert!(read_json.status == 200); + assert!(read_json.messages.len() > 0); + // a MySql race condition can cause this to fail. + assert!(write_json.index <= read_json.index); + + // return the message at index + read_result = client + .get(format!("{}?index={}&limit=1", url, write_json.index)) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .dispatch(); + + read_json = + serde_json::from_str(&read_result + .body_string() + .expect("Empty body for read query")) + .expect("Could not parse read query body"); + assert!(read_json.status == 200); + assert!(read_json.messages.len() == 1); + // a MySql race condition can cause these to fail. + assert!(&read_json.index == &write_json.index); + assert!(&read_json.messages[0].index == &write_json.index); + // cleanup + client + .delete(url.clone()) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .dispatch(); + } + + #[test] + fn test_valid_delete() { + let config = rocket_config(default_config_data()); + let client = rocket_client(config); + let user_id = user_id(); + let url = format!("/v1/store/{}/{}", user_id, device_id()); + client + .post(url.clone()) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .body(r#"{"ttl": 60, "data":"Some Data"}"#) + .dispatch(); + let mut del_result = client + .delete(url.clone()) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .dispatch(); + assert!(del_result.status() == rocket::http::Status::raw(200)); + let mut res_str = del_result.body_string().expect("Empty delete body string"); + assert!(res_str == "{}"); + let mut read_result = client + .get(url.clone()) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .dispatch(); + assert!(read_result.status() == rocket::http::Status::raw(200)); + res_str = read_result.body_string().expect("Empty read body string"); + let mut read_json: ReadResp = + serde_json::from_str(&res_str).expect("Could not parse ready body"); + assert!(read_json.messages.len() == 0); + + let read_result = client + .delete(format!("/v1/store/{}", user_id)) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .dispatch(); + assert!(read_result.status() == rocket::http::Status::raw(200)); + assert!(del_result.body_string() == None); + + let mut read_result = client + .get(url.clone()) + .header(Header::new("Authorization", "bearer token")) + .header(Header::new("Content-Type", "application/json")) + .dispatch(); + assert!(read_result.status() == rocket::http::Status::raw(200)); + read_json = serde_json::from_str(&read_result + .body_string() + .expect("Empty verification body string")) + .expect( + "Could not parse verification body string", + ); + assert!(read_json.messages.len() == 0); + } + + // TODO: add tests for servertoken checks. +} diff --git a/src/store.rs b/src/store.rs deleted file mode 100644 index c7a2acf..0000000 --- a/src/store.rs +++ /dev/null @@ -1,48 +0,0 @@ - -#[derive(Serialize, Deserialize, Debug)] -struct Credentials { - user: String, - password: String, - url: String, -} - -#[derive(Serialize, Deserialize, Debug)] -struct PushData { - uaid: String, - device_id: String, - service: String, - data: String, -} - -#[derive(Serialize, Deserialize, Debug)] -struct Storage { - filename: String, // Storage location identifier (s3 name, etc.) - content_type: String, // Type of data storage (text/plain, application/json, etc.) - ttl: u64, // UTC of expiration - _io: None, // TODO: Placeholder for IO handle. -} - - -impl Storage { - fn new(credentials:Credentials) -> Storage { - // TODO: use credential info to initialize IO Handle. - return Storage { - filename: "", - content_type: "", - ttl: 0, - _io: None - } - } - - fn read() -> Result{ - // TODO Return the content of the IO Handle - } - - fn write(data: String) -> Result{ - // TODO: Write the data (should be uBuffer) to IO Handle - } - - fn delete() -> Result{ - // TODO: Delete the buffer - } -}