diff --git a/src/azure.rs b/src/azure.rs index 9c30c33..90e1a7b 100644 --- a/src/azure.rs +++ b/src/azure.rs @@ -10,7 +10,7 @@ impl AzureDriver { } } -use crate::{get_config_content, ReceivedFile}; +use crate::{debug_log, get_config_content, ReceivedFile}; use axum::http::{HeaderName, HeaderValue}; use azure_storage::StorageCredentials; use azure_storage_blobs::container::operations::BlobItem; @@ -23,15 +23,6 @@ use serde::Deserialize; use std::fs::read_to_string; use std::fs::File; use std::io::Read; -use std::env; - -macro_rules! debug_log { - ($($arg:tt)*) => { - if env::var("STORAGE_DEBUG").is_ok() { - println!($($arg)*); - } - }; -} use std::io::Write; use std::sync::Arc; use tempfile::Builder; @@ -81,12 +72,17 @@ fn get_azure_credentials(name: &str) -> AzureConfig { fn calculate_checksum(filename: &String, data: &[u8]) { let hash = sha2_512::default().update(data).finalize(); let digest = hash.digest(); - println!("File: {} Checksum: {}", filename, digest.to_hex_lowercase()); + debug_log!("File: {} Checksum: {}", filename, digest.to_hex_lowercase()); } /// Write file to Azure blob storage /// TBD: Rework, do not keep whole file as Vec in memory!!! -async fn write_file_to_blob(filename: String, data: Vec, cont_type: String, owner_email: Option) -> &'static str { +async fn write_file_to_blob( + filename: String, + data: Vec, + cont_type: String, + owner_email: Option, +) -> &'static str { let azure_cfg = Arc::new(get_azure_credentials("azure")); let storage_account = azure_cfg.account.as_str(); @@ -130,7 +126,7 @@ async fn write_file_to_blob(filename: String, data: Vec, cont_type: String, match blob_client.put_block(block_id, buffer).await { Ok(_) => { total_bytes_uploaded += bytes_read; - println!("Uploaded {} bytes", total_bytes_uploaded); + debug_log!("Uploaded {} bytes", total_bytes_uploaded); } Err(e) => { eprintln!("Error uploading block: {:?}", e); @@ -150,17 +146,17 @@ async fn write_file_to_blob(filename: String, data: Vec, cont_type: String, .await { Ok(_) => { - println!("Block list uploaded"); + debug_log!("Block list uploaded"); let blob_url_res = blob_client.url(); match blob_url_res { Ok(blob_url) => { - println!("Blob URL: {}", blob_url); + debug_log!("Blob URL: {}", blob_url); } Err(e) => { eprintln!("Error getting blob URL: {:?}", e); } } - + // Set owner tag if email is provided if let Some(email) = owner_email { let mut tags = Tags::new(); @@ -168,11 +164,16 @@ async fn write_file_to_blob(filename: String, data: Vec, cont_type: String, if sanitized != email { debug_log!( "Sanitized owner tag value from '{}' to '{}'", - email, sanitized + email, + sanitized ); } // Ensure non-empty value - let final_value = if sanitized.is_empty() { "_".to_string() } else { sanitized }; + let final_value = if sanitized.is_empty() { + "_".to_string() + } else { + sanitized + }; tags.insert("owner".to_string(), final_value); match blob_client.set_tags(tags).await { Ok(_) => { @@ -275,9 +276,10 @@ async fn get_file_from_blob(filename: String) -> ReceivedFile { break; } std::thread::sleep(std::time::Duration::from_millis(1000)); - println!( + debug_log!( "Waiting for headers file {} to exist: {} seconds", - cache_filename_headers, seconds + cache_filename_headers, + seconds ); } @@ -320,7 +322,11 @@ async fn get_file_from_blob(filename: String) -> ReceivedFile { } } } - debug_log!("Downloading blob to cache file {} from {}", cache_filename, blob_url); + debug_log!( + "Downloading blob to cache file {} from {}", + cache_filename, + blob_url + ); let client = Client::new(); let response = client.get(blob_url).send().await; match response { @@ -382,17 +388,17 @@ async fn azure_set_filename_tags( debug_log!("Skipping tag with empty key after sanitization: '{}'", key); continue; } - let final_value = if sanitized_value.is_empty() { "_".to_string() } else { sanitized_value }; + let final_value = if sanitized_value.is_empty() { + "_".to_string() + } else { + sanitized_value + }; tags.insert(sanitized_key, final_value); } let res = blob_client.set_tags(tags).await; match res { - Ok(_) => { - Ok(String::from("OK")) - } - Err(e) => { - Err(e.to_string()) - } + Ok(_) => Ok(String::from("OK")), + Err(e) => Err(e.to_string()), } } @@ -424,7 +430,13 @@ async fn azure_list_files(directory: String) -> Vec { /// Implement Driver trait for AzureDriver impl super::Driver for AzureDriver { - fn write_file(&self, filename: String, data: Vec, cont_type: String, owner_email: Option) -> String { + fn write_file( + &self, + filename: String, + data: Vec, + cont_type: String, + owner_email: Option, + ) -> String { let filenameret = filename.clone(); /* Call async write_file_to_blob use tokio::task::block_in_place */ tokio::task::block_in_place(|| { diff --git a/src/local.rs b/src/local.rs index c345ba7..99371ca 100644 --- a/src/local.rs +++ b/src/local.rs @@ -2,25 +2,16 @@ // Copyright (C) 2024-2025 Collabora, Ltd. // Author: Denys Fedoryshchenko -use crate::{get_config_content, ReceivedFile}; +use crate::{debug_log, get_config_content, ReceivedFile}; use axum::http::{HeaderName, HeaderValue}; use chksum_hash_sha2_512 as sha2_512; use headers::HeaderMap; use serde::Deserialize; -use std::env; use std::fs::{self, File, OpenOptions}; use std::io::Write; use std::path::{Path, PathBuf}; use toml::Table; -macro_rules! debug_log { - ($($arg:tt)*) => { - if env::var("STORAGE_DEBUG").is_ok() { - println!($($arg)*); - } - }; -} - pub struct LocalDriver; impl LocalDriver { @@ -38,12 +29,12 @@ struct LocalConfig { fn get_local_config() -> LocalConfig { let cfg_content = get_config_content(); let cfg: Table = toml::from_str(&cfg_content).unwrap(); - + // Default to "./storage" if no local config section exists let default_config = LocalConfig { storage_path: "./storage".to_string(), }; - + let local_cfg = match cfg.get("local") { Some(local_cfg) => local_cfg, None => { @@ -51,13 +42,13 @@ fn get_local_config() -> LocalConfig { return default_config; } }; - + let storage_path = local_cfg .get("storage_path") .and_then(|v| v.as_str()) .unwrap_or("./storage") .to_string(); - + LocalConfig { storage_path } } @@ -81,12 +72,12 @@ fn get_metadata_file_path(filename: &str) -> PathBuf { let config = get_local_config(); let storage_path = Path::new(&config.storage_path); let metadata_path = storage_path.join(".metadata"); - + // Create metadata directory if it doesn't exist if !metadata_path.exists() { let _ = fs::create_dir_all(&metadata_path); } - + // Generate hash-based filename for metadata let hash = sha2_512::default().update(filename.as_bytes()).finalize(); let digest = hash.digest(); @@ -101,14 +92,19 @@ fn calculate_checksum(filename: &str, data: &[u8]) { } /// Write file to local storage -fn write_file_to_local(filename: String, data: Vec, cont_type: String, owner_email: Option) -> Result { +fn write_file_to_local( + filename: String, + data: Vec, + cont_type: String, + owner_email: Option, +) -> Result { let file_path = get_storage_file_path(&filename); - + // Ensure directory structure exists if let Err(e) = ensure_directory_exists(&file_path) { return Err(format!("Failed to create directory structure: {}", e)); } - + // Write the file match File::create(&file_path) { Ok(mut file) => { @@ -121,7 +117,7 @@ fn write_file_to_local(filename: String, data: Vec, cont_type: String, owner return Err(format!("Failed to create file: {}", e)); } } - + // Create and write metadata (headers and owner tag) let metadata_path = get_metadata_file_path(&filename); if let Ok(mut metadata_file) = File::create(&metadata_path) { @@ -131,7 +127,7 @@ fn write_file_to_local(filename: String, data: Vec, cont_type: String, owner } let _ = metadata_file.write_all(metadata_content.as_bytes()); } - + debug_log!("File written to local storage: {}", file_path.display()); Ok(filename) } @@ -140,13 +136,13 @@ fn write_file_to_local(filename: String, data: Vec, cont_type: String, owner fn get_headers_from_metadata_file(filename: &str) -> HeaderMap { let mut headers = HeaderMap::new(); let metadata_path = get_metadata_file_path(filename); - + if let Ok(content) = fs::read_to_string(&metadata_path) { for line in content.lines() { if let Some((name, value)) = line.split_once(':') { if let (Ok(key), Ok(val)) = ( HeaderName::from_bytes(name.trim().as_bytes()), - HeaderValue::from_str(value.trim()) + HeaderValue::from_str(value.trim()), ) { headers.insert(key, val); } @@ -158,32 +154,32 @@ fn get_headers_from_metadata_file(filename: &str) -> HeaderMap { headers.insert("content-type", val); } } - + headers } /// Get file from local storage fn get_file_from_local(filename: String) -> ReceivedFile { let file_path = get_storage_file_path(&filename); - + let mut received_file = ReceivedFile { original_filename: filename.clone(), cached_filename: String::new(), headers: HeaderMap::new(), valid: false, }; - + // Check if file exists if !file_path.exists() { debug_log!("File not found in local storage: {}", file_path.display()); return received_file; } - + // For local storage, we use the same file as both original and cached received_file.cached_filename = file_path.to_string_lossy().to_string(); received_file.headers = get_headers_from_metadata_file(&filename); received_file.valid = true; - + debug_log!("File found in local storage: {}", file_path.display()); received_file } @@ -197,20 +193,23 @@ fn list_files_in_local(directory: String) -> Vec { } else { storage_path.join(directory.trim_start_matches('/')) }; - + let mut files = Vec::new(); - + fn collect_files_recursive(path: &Path, base_path: &Path, files: &mut Vec) { if let Ok(entries) = fs::read_dir(path) { for entry in entries { if let Ok(entry) = entry { let entry_path = entry.path(); - + // Skip metadata directory - if entry_path.file_name().map_or(false, |name| name == ".metadata") { + if entry_path + .file_name() + .map_or(false, |name| name == ".metadata") + { continue; } - + if entry_path.is_file() { // Get relative path from storage root if let Ok(relative_path) = entry_path.strip_prefix(base_path) { @@ -223,27 +222,30 @@ fn list_files_in_local(directory: String) -> Vec { } } } - + if search_path.exists() && search_path.is_dir() { collect_files_recursive(&search_path, storage_path, &mut files); } - + files.sort(); debug_log!("Listed {} files from local storage", files.len()); files } /// Set tags for local storage (stored in metadata) -fn set_tags_for_local_file(filename: String, user_tags: Vec<(String, String)>) -> Result { +fn set_tags_for_local_file( + filename: String, + user_tags: Vec<(String, String)>, +) -> Result { let metadata_path = get_metadata_file_path(&filename); - + // Read existing metadata let mut existing_content = String::new(); if metadata_path.exists() { existing_content = fs::read_to_string(&metadata_path) .map_err(|e| format!("Failed to read metadata file: {}", e))?; } - + // Open metadata file for writing (create if doesn't exist) let mut metadata_file = OpenOptions::new() .create(true) @@ -251,25 +253,33 @@ fn set_tags_for_local_file(filename: String, user_tags: Vec<(String, String)>) - .truncate(true) .open(&metadata_path) .map_err(|e| format!("Failed to open metadata file: {}", e))?; - + // Write existing content first - metadata_file.write_all(existing_content.as_bytes()) + metadata_file + .write_all(existing_content.as_bytes()) .map_err(|e| format!("Failed to write existing metadata: {}", e))?; - + // Write tags for (tag, value) in user_tags { let tag_line = format!("tag-{}:{}\n", tag, value); - metadata_file.write_all(tag_line.as_bytes()) + metadata_file + .write_all(tag_line.as_bytes()) .map_err(|e| format!("Failed to write tag: {}", e))?; } - + debug_log!("Tags written to metadata file: {}", metadata_path.display()); Ok("OK".to_string()) } /// Implement Driver trait for LocalDriver impl super::Driver for LocalDriver { - fn write_file(&self, filename: String, data: Vec, cont_type: String, owner_email: Option) -> String { + fn write_file( + &self, + filename: String, + data: Vec, + cont_type: String, + owner_email: Option, + ) -> String { match write_file_to_local(filename.clone(), data, cont_type, owner_email) { Ok(_) => filename, Err(e) => { @@ -278,11 +288,11 @@ impl super::Driver for LocalDriver { } } } - + fn get_file(&self, filename: String) -> ReceivedFile { get_file_from_local(filename) } - + fn tag_file( &self, filename: String, @@ -290,7 +300,7 @@ impl super::Driver for LocalDriver { ) -> Result { set_tags_for_local_file(filename, user_tags) } - + fn list_files(&self, directory: String) -> Vec { list_files_in_local(directory) } diff --git a/src/logging.rs b/src/logging.rs new file mode 100644 index 0000000..d8a9765 --- /dev/null +++ b/src/logging.rs @@ -0,0 +1,37 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::OnceLock; + +static VERBOSE: AtomicBool = AtomicBool::new(false); +static INIT_GUARD: OnceLock<()> = OnceLock::new(); + +const ENV_DEBUG: &str = "STORAGE_DEBUG"; +const ENV_VERBOSE: &str = "STORAGE_VERBOSE"; + +pub fn init(cli_verbose: bool) { + INIT_GUARD.get_or_init(|| { + let flag = cli_verbose || env_verbose_enabled(); + VERBOSE.store(flag, Ordering::Relaxed); + }); +} + +pub fn verbose_enabled() -> bool { + if INIT_GUARD.get().is_some() { + VERBOSE.load(Ordering::Relaxed) + } else { + env_verbose_enabled() + } +} + +fn env_verbose_enabled() -> bool { + std::env::var(ENV_DEBUG).map_or(false, |v| !v.is_empty()) + || std::env::var(ENV_VERBOSE).map_or(false, |v| !v.is_empty()) +} + +#[macro_export] +macro_rules! debug_log { + ($($arg:tt)*) => { + if $crate::logging::verbose_enabled() { + println!($($arg)*); + } + }; +} diff --git a/src/main.rs b/src/main.rs index abb2342..4ef4cea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,12 +12,14 @@ mod azure; mod local; +#[macro_use] +mod logging; mod storcaching; mod storjwt; use axum::{ body::Body, - extract::{ConnectInfo, DefaultBodyLimit, Multipart, Path, State}, + extract::{ConnectInfo, DefaultBodyLimit, Multipart, OriginalUri, Path, State}, http::{header, Method, StatusCode}, response::IntoResponse, routing::{get, post}, @@ -27,22 +29,16 @@ use axum_server::tls_rustls::RustlsConfig; use clap::Parser; use headers::HeaderMap; use std::path; -use std::{env, net::SocketAddr, path::PathBuf}; +use std::sync::OnceLock; +use std::{net::SocketAddr, path::PathBuf}; use tokio::io::AsyncSeekExt; -macro_rules! debug_log { - ($($arg:tt)*) => { - if env::var("STORAGE_DEBUG").is_ok() { - println!($($arg)*); - } - }; -} +use std::{collections::HashMap, sync::Arc}; +use sysinfo::Disks; +use tokio::sync::{RwLock, Semaphore}; use tokio_util::io::ReaderStream; use toml::Table; use tower::ServiceBuilder; -use std::{collections::HashMap, sync::Arc}; -use tokio::sync::{RwLock, Semaphore}; -use sysinfo::Disks; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -71,6 +67,15 @@ struct Args { #[clap(long, default_value = "", help = "Generate JWT token for email")] generate_jwt_token: String, + + #[clap(short, long, action = clap::ArgAction::SetTrue, help = "Enable verbose logging")] + verbose: bool, +} + +static ARGS: OnceLock = OnceLock::new(); + +fn get_args() -> &'static Args { + ARGS.get_or_init(|| Args::parse()) } // const names for last-modified and etag in lowercase @@ -85,11 +90,7 @@ struct AppState { file_locks: FileSemaphores, } - -async fn get_or_create_semaphore( - locks: &FileSemaphores, - filename: &str, -) -> Arc { +async fn get_or_create_semaphore(locks: &FileSemaphores, filename: &str) -> Arc { let mut map = locks.write().await; map.entry(filename.to_string()) .or_insert_with(|| Arc::new(Semaphore::new(1))) @@ -104,7 +105,13 @@ struct ReceivedFile { } trait Driver { - fn write_file(&self, filename: String, data: Vec, cont_type: String, owner_email: Option) -> String; + fn write_file( + &self, + filename: String, + data: Vec, + cont_type: String, + owner_email: Option, + ) -> String; fn get_file(&self, filename: String) -> ReceivedFile; fn tag_file( &self, @@ -128,12 +135,12 @@ fn init_driver(driver_type: &str) -> Box { } pub fn get_config_content() -> String { - let args = Args::parse(); + let args = get_args(); let mut cfg_file = PathBuf::from(&args.config_file); if let Ok(cfg_file_env) = std::env::var("KCI_STORAGE_CONFIG") { cfg_file = PathBuf::from(&cfg_file_env); } - + std::fs::read_to_string(&cfg_file).unwrap() } @@ -141,18 +148,52 @@ pub fn get_config_content() -> String { fn get_driver_type() -> String { let cfg_content = get_config_content(); let cfg: Table = toml::from_str(&cfg_content).unwrap(); - + cfg.get("driver") .and_then(|v| v.as_str()) .unwrap_or("azure") .to_string() } +fn client_ip_from_headers(headers: &HeaderMap, fallback: SocketAddr) -> String { + if let Some(forwarded_for) = headers + .get("X-Forwarded-For") + .and_then(|value| value.to_str().ok()) + { + if let Some(first_ip) = forwarded_for + .split(',') + .map(|part| part.trim()) + .find(|part| !part.is_empty()) + { + return first_ip.to_string(); + } + } + + if let Some(forwarded) = headers + .get("Forwarded") + .and_then(|value| value.to_str().ok()) + { + for entry in forwarded.split(',') { + for directive in entry.split(';') { + let directive = directive.trim(); + if let Some(value) = directive.strip_prefix("for=") { + let cleaned = value.trim_matches('"'); + if !cleaned.is_empty() { + return cleaned.to_string(); + } + } + } + } + } + + fallback.ip().to_string() +} + /// Initial variables configuration and checks async fn initial_setup() -> Option { let cache_dir = "cache"; let download_dir = "download"; - let args = Args::parse(); + let args = get_args(); if args.generate_jwt_secret { storjwt::generate_jwt_secret(); @@ -191,10 +232,10 @@ async fn initial_setup() -> Option { // is ENV KCI_STORAGE_CONFIG set? if let Ok(cfg_file_env) = std::env::var("KCI_STORAGE_CONFIG") { cfg_file = PathBuf::from(&cfg_file_env); - println!("Using config file from ENV: {}", cfg_file.display()); + debug_log!("Using config file from ENV: {}", cfg_file.display()); } else { cfg_file = PathBuf::from(&args.config_file); - println!("Using config file from args: {}", cfg_file.display()); + debug_log!("Using config file from args: {}", cfg_file.display()); } if !cfg_file.exists() { @@ -209,7 +250,7 @@ async fn initial_setup() -> Option { .await; match config { Ok(tlsconf) => { - println!("TLS config loaded, HTTPS mode enabled"); + debug_log!("TLS config loaded, HTTPS mode enabled"); Some(tlsconf) } Err(e) => { @@ -245,22 +286,29 @@ async fn ax_metrics() -> (StatusCode, String) { let tag_diskname = disk.name().to_string_lossy(); let tag_total_space = disk.total_space(); let tag_available_space = disk.available_space(); - - metrics.push_str(&format!("storage_free_space {{hostname=\"{}\", diskname=\"{}\", mount_point=\"{}\"}} {}\n", hostname, tag_diskname, mount_point, tag_available_space)); - metrics.push_str(&format!("storage_total_space {{hostname=\"{}\", diskname=\"{}\", mount_point=\"{}\"}} {}\n", hostname, tag_diskname, mount_point, tag_total_space)); + + metrics.push_str(&format!( + "storage_free_space {{hostname=\"{}\", diskname=\"{}\", mount_point=\"{}\"}} {}\n", + hostname, tag_diskname, mount_point, tag_available_space + )); + metrics.push_str(&format!( + "storage_total_space {{hostname=\"{}\", diskname=\"{}\", mount_point=\"{}\"}} {}\n", + hostname, tag_diskname, mount_point, tag_total_space + )); } (StatusCode::OK, metrics) } #[tokio::main] async fn main() { + logging::init(get_args().verbose); tracing_subscriber::fmt::init(); let tlscfg = initial_setup().await; let port = 3000; let state = AppState { file_locks: Arc::new(RwLock::new(HashMap::new())), }; - println!("Starting server, tls: {:?}", tlscfg); + debug_log!("Starting server, tls: {:?}", tlscfg); // Supported endpoints: // GET / - root @@ -320,7 +368,7 @@ async fn ax_check_auth(headers: HeaderMap) -> (StatusCode, String) { match message { Ok(email) => { let message = format!("Authorized: {}", email); - println!("Authorized: {}", email); + debug_log!("Authorized: {}", email); (StatusCode::OK, message) } Err(_) => (StatusCode::UNAUTHORIZED, "Unauthorized".to_string()), @@ -366,7 +414,7 @@ fn verify_upload_permissions(owner: &str, path: &str) -> Result<(), String> { let users = match users_r { Some(users) => users, None => { - println!("No users section in config.toml, ignoring upload path restriction"); + debug_log!("No users section in config.toml, ignoring upload path restriction"); return Ok(()); } }; @@ -397,14 +445,20 @@ fn verify_upload_permissions(owner: &str, path: &str) -> Result<(), String> { This function will check if the Authorization header is present and if the token is correct If the token is correct, it will write the content of the file to the server */ -async fn ax_post_file(headers: HeaderMap, State(state): State, mut multipart: Multipart) -> (StatusCode, Vec) { +async fn ax_post_file( + ConnectInfo(remote_addr): ConnectInfo, + OriginalUri(original_uri): OriginalUri, + headers: HeaderMap, + State(state): State, + mut multipart: Multipart, +) -> (StatusCode, Vec) { // call check_auth let message = verify_auth_hdr(&headers); let owner = match message { Ok(owner) => owner, Err(_) => return (StatusCode::UNAUTHORIZED, Vec::new()), }; - println!("Authorized"); + debug_log!("Authorized"); /* 100-continue Expect is broken, quite hard to fix in axum */ /* @@ -416,11 +470,10 @@ async fn ax_post_file(headers: HeaderMap, State(state): State, mut mul } */ - println!("Uploading file"); + debug_log!("Uploading file"); let mut path: String = "".to_string(); let mut file0: Vec = Vec::new(); let mut file0_filename: String = "".to_string(); - // verify upload permissions, some users have upload permissions only for certain prefix(path) // check config.toml for upload_prefixes @@ -429,7 +482,6 @@ async fn ax_post_file(headers: HeaderMap, State(state): State, mut mul Err(e) => return (StatusCode::FORBIDDEN, e.to_string().into_bytes()), } - while let Some(field) = multipart.next_field().await.unwrap() { let name = field.name().unwrap().to_string(); //let filename = field.file_name(); @@ -438,7 +490,7 @@ async fn ax_post_file(headers: HeaderMap, State(state): State, mut mul match data { Ok(data) => { - println!("Field {}: {} bytes", name, data.len()); + debug_log!("Field {}: {} bytes", name, data.len()); if name == "path" { path = String::from_utf8(data.to_vec()).unwrap(); } else if name == "file0" { @@ -448,7 +500,7 @@ async fn ax_post_file(headers: HeaderMap, State(state): State, mut mul None => todo!(), } } else { - println!("Unknown field {}: {} bytes", name, data.len()); + debug_log!("Unknown field {}: {} bytes", name, data.len()); } } Err(e) => { @@ -460,46 +512,72 @@ async fn ax_post_file(headers: HeaderMap, State(state): State, mut mul } } } - println!("Upload: {} bytes, {}/{}", file0.len(), path, file0_filename); + debug_log!("Upload: {} bytes, {}/{}", file0.len(), path, file0_filename); // if path ends on /, remove it if path.ends_with("/") { // TBD: Fix it! - println!("Removing trailing /, workaround"); + debug_log!("Removing trailing /, workaround"); path.pop(); } - + let full_path = format!("{}/{}", path, file0_filename); let hdr_content_type = headers.get("Content-Type-Upstream"); let semaphore = get_or_create_semaphore(&state.file_locks, &full_path).await; - + // Try to acquire permit - fails immediately if upload in progress - let permit = match semaphore.try_acquire() { + let _permit = match semaphore.try_acquire() { Ok(permit) => permit, Err(_) => { - return (StatusCode::CONFLICT, "Upload already in progress".to_string().into_bytes()); + return ( + StatusCode::CONFLICT, + "Upload already in progress".to_string().into_bytes(), + ); } }; let content_type: String = match hdr_content_type { - Some(content_type) => { - content_type.to_str().unwrap().to_string() - } + Some(content_type) => content_type.to_str().unwrap().to_string(), None => { let heuristic_ctype = heuristic_filetype(file0_filename); - debug_log!("Content-Type not found, using heuristics: {}", heuristic_ctype); + debug_log!( + "Content-Type not found, using heuristics: {}", + heuristic_ctype + ); heuristic_ctype } }; // TBD - let message = write_file_driver(full_path, file0, content_type.to_string(), Some(owner)); + let upload_size = file0.len(); + let message = write_file_driver( + full_path.clone(), + file0, + content_type.to_string(), + Some(owner.clone()), + ); if !message.is_empty() { return (StatusCode::CONFLICT, Vec::new()); } + let status = StatusCode::OK; + let client_ip = client_ip_from_headers(&headers, remote_addr); + let timestamp = std::time::SystemTime::now(); + let human_time = chrono::DateTime::::from(timestamp); + let request_target = original_uri.to_string(); + println!( + "{} {} {} {} {} {} {} {}", + client_ip, + status.as_u16(), + upload_size, + human_time, + Method::POST, + request_target, + full_path, + owner + ); // write metadata file into cache directory //let metadata_filename = format!("{}/{}.metadata", path, file0_filename); //write_cache_metadata(metadata_filename, file0.len()); - (StatusCode::OK, Vec::new()) + (status, Vec::new()) } fn filename_from_fullpath(filepath: &str) -> String { @@ -540,18 +618,17 @@ async fn ax_get_file( let semaphore = get_or_create_semaphore(&state.file_locks, &filepath).await; // Wait for permit with timeout - let _permit = match tokio::time::timeout( - tokio::time::Duration::from_secs(30), - semaphore.acquire(), - ).await { - Ok(Ok(permit)) => permit, - Ok(Err(_)) => { - return (StatusCode::INTERNAL_SERVER_ERROR, "Semaphore closed").into_response(); - } - Err(_) => { - return (StatusCode::REQUEST_TIMEOUT, "Timeout waiting for upload").into_response(); - } - }; + let _permit = + match tokio::time::timeout(tokio::time::Duration::from_secs(30), semaphore.acquire()).await + { + Ok(Ok(permit)) => permit, + Ok(Err(_)) => { + return (StatusCode::INTERNAL_SERVER_ERROR, "Semaphore closed").into_response(); + } + Err(_) => { + return (StatusCode::REQUEST_TIMEOUT, "Timeout waiting for upload").into_response(); + } + }; // IMPORTANT! Headers in cache must be stored in lowercase let received_file = driver_get_file(filepath.clone()); @@ -711,7 +788,12 @@ fn driver_get_file(filepath: String) -> ReceivedFile { driver.get_file(filepath) } -fn write_file_driver(filename: String, data: Vec, cont_type: String, owner_email: Option) -> String { +fn write_file_driver( + filename: String, + data: Vec, + cont_type: String, + owner_email: Option, +) -> String { let driver_name = get_driver_type(); let driver = init_driver(&driver_name); driver.write_file(filename, data, cont_type, owner_email); @@ -738,7 +820,9 @@ fn parse_range(range: &str) -> (u64, u64) { /// Return error message + owner if the token is correct fn verify_auth_hdr(headers: &HeaderMap) -> Result> { let auth = headers.get("Authorization"); - if auth == None { return Err(None) } + if auth == None { + return Err(None); + } let token = auth.unwrap().to_str().unwrap().split_whitespace(); let token_parts: Vec<&str> = token.collect(); if token_parts.len() != 2 { diff --git a/src/storcaching.rs b/src/storcaching.rs index 9a0b8ad..ed72c3f 100644 --- a/src/storcaching.rs +++ b/src/storcaching.rs @@ -1,15 +1,7 @@ +use crate::debug_log; use std::fs; use std::time::SystemTime; use tokio::time::Duration; -use std::env; - -macro_rules! debug_log { - ($($arg:tt)*) => { - if env::var("STORAGE_DEBUG").is_ok() { - println!($($arg)*); - } - }; -} struct Files { file: String, @@ -67,7 +59,11 @@ fn delete_cache_file(file: String) { // Truncate from filename .content, and add .headers, delete both files let content_filename = file.clone(); let headers_filename = file.replace(".content", ".headers"); - debug_log!("Deleting files: {} {}", &content_filename, &headers_filename); + debug_log!( + "Deleting files: {} {}", + &content_filename, + &headers_filename + ); let res = fs::remove_file(&content_filename); match res { Ok(_) => {} @@ -99,7 +95,10 @@ async fn clean_disk(cache_dir: String) { if oldest_file.last_update.elapsed().unwrap() > Duration::from_secs(60 * 60) { delete_cache_file(oldest_file.file); } else { - debug_log!("File is less than 60 min old, skipping: {}, sleeping 60 seconds", oldest_file.file); + debug_log!( + "File is less than 60 min old, skipping: {}, sleeping 60 seconds", + oldest_file.file + ); // sleep 60 seconds tokio::time::sleep(Duration::from_secs(60)).await; } diff --git a/src/storjwt.rs b/src/storjwt.rs index feae1fb..7072c2a 100644 --- a/src/storjwt.rs +++ b/src/storjwt.rs @@ -1,18 +1,9 @@ -use crate::get_config_content; +use crate::{debug_log, get_config_content}; use hmac::{Hmac, Mac}; use jwt::{Header, SignWithKey, Token, VerifyWithKey}; use sha2::Sha256; use std::collections::BTreeMap; use toml::value::Table; -use std::env; - -macro_rules! debug_log { - ($($arg:tt)*) => { - if env::var("STORAGE_DEBUG").is_ok() { - println!($($arg)*); - } - }; -} pub fn verify_jwt_token(token_str: &str) -> Result, jwt::Error> { // config.toml, jwt_secret parameter let toml_cfg = get_config_content();