diff --git a/Cargo.lock b/Cargo.lock
index 884c0fe16..2749b1cf5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3607,6 +3607,7 @@ dependencies = [
  "aws-smithy-runtime",
  "aws-smithy-runtime-api",
  "base",
+ "base_rt",
  "ctor",
  "deno",
  "deno_ast",
diff --git a/crates/base/src/runtime/mod.rs b/crates/base/src/runtime/mod.rs
index 8eccb3943..ced3efb92 100644
--- a/crates/base/src/runtime/mod.rs
+++ b/crates/base/src/runtime/mod.rs
@@ -651,15 +651,22 @@ where
   let tmp_fs = TmpFs::try_from(maybe_tmp_fs_config.unwrap_or_default())?;
   let tmp_fs_actual_path = tmp_fs.actual_path().to_path_buf();

-  let fs = PrefixFs::new("/tmp", tmp_fs.clone(), Some(base_fs))
+  let mut fs = PrefixFs::new("/tmp", tmp_fs.clone(), Some(base_fs))
     .tmp_dir("/tmp")
     .add_fs(tmp_fs_actual_path, tmp_fs);

+  fs.set_runtime_state(&runtime_state);
+
   Ok(
     if let Some(s3_fs) = maybe_s3_fs_config.map(S3Fs::new).transpose()? {
-      (Arc::new(fs.add_fs("/s3", s3_fs.clone())), Some(s3_fs))
+      let mut s3_prefix_fs = fs.add_fs("/s3", s3_fs.clone());
+
+      s3_prefix_fs.set_check_sync_api(is_user_worker);
+
+      (Arc::new(s3_prefix_fs), Some(s3_fs))
     } else {
       (Arc::new(fs), None)
     },
diff --git a/crates/fs/Cargo.toml b/crates/fs/Cargo.toml
index b07632392..d7aecf6b8 100644
--- a/crates/fs/Cargo.toml
+++ b/crates/fs/Cargo.toml
@@ -20,6 +20,7 @@
 deno_semver.workspace = true
 eszip_trait.workspace = true

+base_rt.workspace = true
 ext_node.workspace = true

 anyhow.workspace = true
diff --git a/crates/fs/impl/prefix_fs.rs b/crates/fs/impl/prefix_fs.rs
index b0f802222..b1e5332c5 100644
--- a/crates/fs/impl/prefix_fs.rs
+++ b/crates/fs/impl/prefix_fs.rs
@@ -5,6 +5,8 @@ use std::path::PathBuf;
 use std::rc::Rc;
 use std::sync::Arc;

+use anyhow::anyhow;
+use base_rt::RuntimeState;
 use deno_fs::AccessCheckCb;
 use deno_fs::FsDirEntry;
 use deno_fs::FsFileType;
@@ -21,6 +23,8 @@ pub struct PrefixFs<FileSystem> {
   tmp_dir: Option<PathBuf>,
   fs: Arc<FileSystem>,
   base_fs: Option<Arc<dyn deno_fs::FileSystem>>,
+  runtime_state: Option<Arc<RuntimeState>>,
+  check_sync_api: bool,
 }

 impl<FileSystem> PrefixFs<FileSystem>
 where
@@ -41,6 +45,8 @@ where
       tmp_dir: None,
       fs: Arc::new(fs),
       base_fs,
+      runtime_state: None,
+      check_sync_api: false,
     }
   }

@@ -75,6 +81,16 @@ where
     self.tmp_dir = Some(v.as_ref().to_path_buf());
     self
   }
+
+  pub fn set_runtime_state(&mut self, v: &Arc<RuntimeState>) -> &mut Self {
+    self.runtime_state = Some(v.clone());
+    self
+  }
+
+  pub fn set_check_sync_api(&mut self, v: bool) -> &mut Self {
+    self.check_sync_api = v;
+    self
+  }
 }

 impl<FileSystem> PrefixFs<FileSystem>
@@ -95,6 +111,8 @@ where
       fs: Arc::new(fs),
       cwd: self.cwd.take(),
       tmp_dir: self.tmp_dir.take(),
+      runtime_state: self.runtime_state.clone(),
+      check_sync_api: self.check_sync_api,
       base_fs: Some(Arc::new(self)),
     }
   }
@@ -149,6 +167,7 @@ where
     options: OpenOptions,
     access_check: Option<AccessCheckCb>,
   ) -> FsResult<Rc<dyn deno_io::fs::File>> {
+    self.check_sync_api_allowed("open_sync")?;
     if path.starts_with(&self.prefix) {
       self.fs.open_sync(
         path.strip_prefix(&self.prefix).unwrap(),
@@ -194,6 +213,7 @@ where
     recursive: bool,
     mode: Option<u32>,
   ) -> FsResult<()> {
+    self.check_sync_api_allowed("mkdir_sync")?;
     if path.starts_with(&self.prefix) {
       self.fs.mkdir_sync(
         path.strip_prefix(&self.prefix).unwrap(),
@@ -232,6 +252,7 @@ where
   }

   fn chmod_sync(&self, path: &Path, mode: u32) -> FsResult<()> {
+    self.check_sync_api_allowed("chmod_sync")?;
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -264,6 +285,7 @@ where
     uid: Option<u32>,
     gid: Option<u32>,
   ) -> FsResult<()> {
+    self.check_sync_api_allowed("chown_sync")?;
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -305,6 +327,7 @@ where
     uid: Option<u32>,
     gid: Option<u32>,
   ) -> FsResult<()> {
+    self.check_sync_api_allowed("lchown_sync")?;
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -341,6 +364,7 @@ where
   }

   fn remove_sync(&self, path: &Path, recursive: bool) -> FsResult<()> {
+    self.check_sync_api_allowed("remove_sync")?;
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -371,6 +395,8 @@ where
   }

   fn copy_file_sync(&self, oldpath: &Path, newpath: &Path) -> FsResult<()> {
+    self.check_sync_api_allowed("copy_file_sync")?;
+
     let oldpath_matches = oldpath.starts_with(&self.prefix);
     let newpath_matches = newpath.starts_with(&self.prefix);
     if oldpath_matches || newpath_matches {
@@ -426,6 +452,8 @@ where
   }

   fn cp_sync(&self, path: &Path, new_path: &Path) -> FsResult<()> {
+    self.check_sync_api_allowed("cp_sync")?;
+
     let path_matches = path.starts_with(&self.prefix);
     let new_path_matches = new_path.starts_with(&self.prefix);
     if path_matches || new_path_matches {
@@ -477,6 +505,7 @@ where
   }

   fn stat_sync(&self, path: &Path) -> FsResult<FsStat> {
+    self.check_sync_api_allowed("stat_sync")?;
     if path.starts_with(&self.prefix) {
       self.fs.stat_sync(path.strip_prefix(&self.prefix).unwrap())
     } else {
@@ -502,6 +531,7 @@ where
   }

   fn lstat_sync(&self, path: &Path) -> FsResult<FsStat> {
+    self.check_sync_api_allowed("lstat_sync")?;
     if path.starts_with(&self.prefix) {
       self.fs.lstat_sync(path.strip_prefix(&self.prefix).unwrap())
     } else {
@@ -527,6 +557,7 @@ where
   }

   fn realpath_sync(&self, path: &Path) -> FsResult<PathBuf> {
+    self.check_sync_api_allowed("realpath_sync")?;
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -554,6 +585,7 @@ where
   }

   fn read_dir_sync(&self, path: &Path) -> FsResult<Vec<FsDirEntry>> {
+    self.check_sync_api_allowed("read_dir_sync")?;
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -581,6 +613,8 @@ where
   }

   fn rename_sync(&self, oldpath: &Path, newpath: &Path) -> FsResult<()> {
+    self.check_sync_api_allowed("rename_sync")?;
+
     let oldpath_matches = oldpath.starts_with(&self.prefix);
     let newpath_matches = newpath.starts_with(&self.prefix);
     if oldpath_matches || newpath_matches {
@@ -636,6 +670,8 @@ where
   }

   fn link_sync(&self, oldpath: &Path, newpath: &Path) -> FsResult<()> {
+    self.check_sync_api_allowed("link_sync")?;
+
     let oldpath_matches = oldpath.starts_with(&self.prefix);
     let newpath_matches = newpath.starts_with(&self.prefix);
     if oldpath_matches || newpath_matches {
@@ -696,6 +732,8 @@ where
     newpath: &Path,
     file_type: Option<FsFileType>,
   ) -> FsResult<()> {
+    self.check_sync_api_allowed("symlink_sync")?;
+
     let oldpath_matches = oldpath.starts_with(&self.prefix);
     let newpath_matches = newpath.starts_with(&self.prefix);
     if oldpath_matches || newpath_matches {
@@ -754,6 +792,7 @@ where
   }

   fn read_link_sync(&self, path: &Path) -> FsResult<PathBuf> {
+    self.check_sync_api_allowed("read_link_sync")?;
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -781,6 +820,7 @@ where
   }

   fn truncate_sync(&self, path: &Path, len: u64) -> FsResult<()> {
+    self.check_sync_api_allowed("truncate_sync")?;
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -818,6 +858,7 @@ where
     mtime_secs: i64,
     mtime_nanos: u32,
   ) -> FsResult<()> {
+    self.check_sync_api_allowed("utime_sync")?;
     if path.starts_with(&self.prefix) {
       self.fs.utime_sync(
         path.strip_prefix(&self.prefix).unwrap(),
@@ -872,6 +913,7 @@ where
     mtime_secs: i64,
     mtime_nanos: u32,
   ) -> FsResult<()> {
+    self.check_sync_api_allowed("lutime_sync")?;
     if path.starts_with(&self.prefix) {
       self.fs.lutime_sync(
         path.strip_prefix(&self.prefix).unwrap(),
@@ -925,6 +967,7 @@ where
     access_check: Option<AccessCheckCb>,
     data: &[u8],
   ) -> FsResult<()> {
+    self.check_sync_api_allowed("write_file_sync")?;
     if path.starts_with(&self.prefix) {
       self.fs.write_file_sync(
         path.strip_prefix(&self.prefix).unwrap(),
@@ -970,6 +1013,7 @@ where
     path: &Path,
     access_check: Option<AccessCheckCb>,
   ) -> FsResult<Vec<u8>> {
+    self.check_sync_api_allowed("read_file_sync")?;
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -1004,6 +1048,9 @@ where
   }

   fn is_file_sync(&self, path: &Path) -> bool {
+    if self.check_sync_api_allowed("is_file_sync").is_err() {
+      return false;
+    }
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -1018,6 +1065,9 @@ where
   }

   fn is_dir_sync(&self, path: &Path) -> bool {
+    if self.check_sync_api_allowed("is_dir_sync").is_err() {
+      return false;
+    }
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -1032,6 +1082,9 @@ where
   }

   fn exists_sync(&self, path: &Path) -> bool {
+    if self.check_sync_api_allowed("exists_sync").is_err() {
+      return false;
+    }
     if path.starts_with(&self.prefix) {
       self
         .fs
@@ -1050,6 +1103,7 @@ where
     path: &Path,
     access_check: Option<AccessCheckCb>,
   ) -> FsResult<String> {
+    self.check_sync_api_allowed("read_text_file_lossy_sync")?;
     if path.starts_with(&self.prefix) {
       self.fs.read_text_file_lossy_sync(
         path.strip_prefix(&self.prefix).unwrap(),
@@ -1084,3 +1138,22 @@ where
     }
   }
 }
+
+impl<FileSystem> PrefixFs<FileSystem> {
+  fn check_sync_api_allowed(&self, name: &'static str) -> FsResult<()> {
+    if !self.check_sync_api {
+      return Ok(());
+    }
+    let Some(state) = self.runtime_state.as_ref() else {
+      return Ok(());
+    };
+
+    if state.is_init() {
+      Ok(())
+    } else {
+      Err(FsError::Io(io::Error::other(anyhow!(
+        "invoking {name} is not allowed in the current context"
+      ))))
+    }
+  }
+}
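Note on the `check_sync_api_allowed` guard introduced above: it is a no-op unless the wrapper was armed via `set_check_sync_api(true)` (which `crates/base` does only for user workers) and a `RuntimeState` handle is attached; it then rejects synchronous fs calls once the worker has left its init phase, so code that runs during main-module evaluation keeps working. A minimal standalone sketch of the same gating pattern — `RuntimeState`/`is_init` mirror the names in the patch, while the free function and the `AtomicBool` are purely illustrative:

```rust
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Illustrative stand-in for `base_rt::RuntimeState`; assume `init` is true
/// only while the worker is still evaluating its main module.
pub struct RuntimeState {
  init: AtomicBool,
}

impl RuntimeState {
  pub fn is_init(&self) -> bool {
    self.init.load(Ordering::Acquire)
  }
}

/// Free-function version of `PrefixFs::check_sync_api_allowed`: permissive
/// when the check is disarmed, when no state is attached, or during init.
pub fn check_sync_api_allowed(
  check_sync_api: bool,
  runtime_state: Option<&Arc<RuntimeState>>,
  name: &'static str,
) -> io::Result<()> {
  if !check_sync_api {
    return Ok(());
  }
  let Some(state) = runtime_state else {
    return Ok(());
  };
  if state.is_init() {
    Ok(())
  } else {
    Err(io::Error::other(format!(
      "invoking {name} is not allowed in the current context"
    )))
  }
}
```

The boolean-returning predicates (`is_file_sync`, `is_dir_sync`, `exists_sync`) cannot propagate an `FsError`, which is why the hunks above map a guard failure to `false` instead of using `?`.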
} + } + + let file = Rc::new(S3Object { + bucket_name, + key, + fs: self.clone(), + op_slot: AsyncRefCell::default(), + }); + + if not_found || options.truncate { + file.clone().write(BufView::empty()).await?; + } else if options.create_new { + return Err(FsError::Io(io::Error::from(io::ErrorKind::AlreadyExists))); + } + + Ok(file) + } } #[derive(Deserialize, Serialize, Debug, Clone, Default)] @@ -387,82 +446,58 @@ impl deno_fs::FileSystem for S3Fs { fn open_sync( &self, - _path: &Path, - _options: OpenOptions, + path: &Path, + options: OpenOptions, _access_check: Option, ) -> FsResult> { - Err(FsError::NotSupported) + let file_ptr = std::thread::scope(|s| { + let path = path.to_path_buf(); + s.spawn(move || { + rt::IO_RT.block_on(async move { + self.open_inner(path, options, None).await.map(|it| unsafe { + // SATEFY: parent thread is blocked by join() + it.into_ptr() + }) + }) + }) + .join() + .unwrap() + })?; + + Ok(unsafe { S3Object::from_ptr(file_ptr) }) } #[instrument( - level = "trace", - skip(self, options, _access_check), - fields(?options, has_access_check = _access_check.is_some()), - err(Debug) - )] + level = "trace", + skip(self, options, access_check), + fields(?options, has_access_check = access_check.is_some()), + err(Debug) + )] async fn open_async<'a>( &'a self, path: PathBuf, options: OpenOptions, - _access_check: Option>, + access_check: Option>, ) -> FsResult> { - self.flush_background_tasks().await; - - let (bucket_name, key) = - try_get_bucket_name_and_key(path.try_normalize()?)?; - - if key.is_empty() { - return Err(FsError::Io(io::Error::from(io::ErrorKind::InvalidInput))); - } - - let resp = self - .client - .head_object() - .bucket(&bucket_name) - .key(&key) - .send() - .await; - - let mut not_found = false; - - if let Some(err) = resp.err() { - not_found = err - .as_service_error() - .map(|it| it.is_not_found()) - .unwrap_or_default(); - - if not_found { - if !(options.create || options.create_new) { - return Err(FsError::Io(io::Error::from(io::ErrorKind::NotFound))); - } - } else { - return Err(FsError::Io(io::Error::other(err))); - } - } - - let file = Rc::new(S3Object { - bucket_name, - key, - fs: self.clone(), - op_slot: AsyncRefCell::default(), - }); - - if not_found || options.truncate { - file.clone().write(BufView::empty()).await?; - } else if options.create_new { - return Err(FsError::Io(io::Error::from(io::ErrorKind::AlreadyExists))); - } - - Ok(file) + Ok(self.open_inner(path, options, access_check).await?) 
   }

   fn mkdir_sync(
     &self,
-    _path: &Path,
-    _recursive: bool,
-    _mode: Option<u32>,
+    path: &Path,
+    recursive: bool,
+    mode: Option<u32>,
   ) -> FsResult<()> {
-    Err(FsError::NotSupported)
+    std::thread::scope(|s| {
+      let path = path.to_path_buf();
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move {
+          self.mkdir_async(path, recursive, mode).await
+        })
+      })
+      .join()
+      .unwrap()
+    })
   }

   #[instrument(level = "trace", skip(self, _mode), fields(mode = _mode), ret, err(Debug))]
@@ -623,8 +658,16 @@ impl deno_fs::FileSystem for S3Fs {
     Err(FsError::NotSupported)
   }

-  fn remove_sync(&self, _path: &Path, _recursive: bool) -> FsResult<()> {
-    Err(FsError::NotSupported)
+  fn remove_sync(&self, path: &Path, recursive: bool) -> FsResult<()> {
+    std::thread::scope(|s| {
+      let path = path.to_path_buf();
+      s.spawn(move || {
+        rt::IO_RT
+          .block_on(async move { self.remove_async(path, recursive).await })
+      })
+      .join()
+      .unwrap()
+    })
   }

   #[instrument(level = "trace", skip(self), ret, err(Debug))]
@@ -764,8 +807,15 @@ impl deno_fs::FileSystem for S3Fs {
     Err(FsError::NotSupported)
   }

-  fn stat_sync(&self, _path: &Path) -> FsResult<FsStat> {
-    Err(FsError::NotSupported)
+  fn stat_sync(&self, path: &Path) -> FsResult<FsStat> {
+    std::thread::scope(|s| {
+      let path = path.to_path_buf();
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move { self.stat_async(path).await })
+      })
+      .join()
+      .unwrap()
+    })
   }

   #[instrument(level = "trace", skip(self), err(Debug))]
@@ -822,8 +872,15 @@ impl deno_fs::FileSystem for S3Fs {
       .await
   }

-  fn lstat_sync(&self, _path: &Path) -> FsResult<FsStat> {
-    Err(FsError::NotSupported)
+  fn lstat_sync(&self, path: &Path) -> FsResult<FsStat> {
+    std::thread::scope(|s| {
+      let path = path.to_path_buf();
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move { self.lstat_async(path).await })
+      })
+      .join()
+      .unwrap()
+    })
   }

   #[instrument(level = "trace", skip(self), err(Debug))]
@@ -839,8 +896,15 @@ impl deno_fs::FileSystem for S3Fs {
     Err(FsError::NotSupported)
   }

-  fn read_dir_sync(&self, _path: &Path) -> FsResult<Vec<FsDirEntry>> {
-    Err(FsError::NotSupported)
+  fn read_dir_sync(&self, path: &Path) -> FsResult<Vec<FsDirEntry>> {
+    std::thread::scope(|s| {
+      let path = path.to_path_buf();
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move { self.read_dir_async(path).await })
+      })
+      .join()
+      .unwrap()
+    })
   }

   #[instrument(level = "trace", skip(self), err(Debug))]
@@ -1364,6 +1428,10 @@ impl S3WriteUploadMethod {
   }
 }

+pub struct S3ObjectPtr(*const S3Object);
+
+unsafe impl Send for S3ObjectPtr {}
+
 pub struct S3Object {
   fs: S3Fs,
   op_slot: AsyncRefCell<Option<S3ObjectOpSlot>>,
@@ -1417,211 +1485,288 @@ impl Drop for S3Object {
   }
 }

-#[async_trait::async_trait(?Send)]
-impl deno_io::fs::File for S3Object {
-  fn read_sync(self: Rc<Self>, _buf: &mut [u8]) -> FsResult<usize> {
-    Err(FsError::NotSupported)
+impl S3Object {
+  unsafe fn into_ptr(self: Rc<Self>) -> S3ObjectPtr {
+    S3ObjectPtr(Rc::into_raw(self))
   }

-  #[instrument(level = "trace", skip_all, fields(self.bucket_name, self.key), err(Debug))]
-  async fn read_byob(
+  unsafe fn from_ptr(ptr: S3ObjectPtr) -> Rc<Self> {
+    unsafe { Rc::from_raw(ptr.0) }
+  }
+
+  fn read_byob_inner(
     self: Rc<Self>,
-    mut buf: BufMutView,
-  ) -> FsResult<(usize, BufMutView)> {
-    let mut op_slot = RcRef::map(&self, |r| &r.op_slot).borrow_mut().await;
-    let Some(op_slot_mut) = op_slot.as_mut() else {
-      let resp = self
-        .fs
-        .client
-        .get_object()
-        .bucket(&self.bucket_name)
-        .key(&self.key)
-        .send()
-        .await;
+    buf: &mut [u8],
+  ) -> LocalBoxFuture<'_, FsResult<usize>> {
+    async move {
+      let mut op_slot = RcRef::map(&self, |r| &r.op_slot).borrow_mut().await;
+      let Some(op_slot_mut) = op_slot.as_mut() else {
+        let resp = self
+          .fs
+          .client
+          .get_object()
+          .bucket(&self.bucket_name)
+          .key(&self.key)
+          .send()
+          .await;

-      let Ok(resp) = resp else {
-        return Ok((0, buf));
-      };
+        let Ok(resp) = resp else {
+          return Ok(0);
+        };

-      let mut body_buf = resp.body.into_async_read();
-      let nread = body_buf.read(&mut buf).await?;
+        let mut body_buf = resp.body.into_async_read();
+        let nread = body_buf.read(buf).await?;

-      *op_slot = Some(S3ObjectOpSlot::Read(S3ObjectReadState(
-        Box::pin(body_buf),
-        nread,
-      )));
+        *op_slot = Some(S3ObjectOpSlot::Read(S3ObjectReadState(
+          Box::pin(body_buf),
+          nread,
+        )));

-      trace!(nread);
-      return Ok((nread, buf));
-    };
+        trace!(nread);
+        return Ok(nread);
+      };

-    let Some(state) = op_slot_mut.as_read_mut() else {
-      return Err(
-        io::Error::other("read operation was blocked by another operation")
-          .into(),
-      );
-    };
+      let Some(state) = op_slot_mut.as_read_mut() else {
+        return Err(
+          io::Error::other("read operation was blocked by another operation")
+            .into(),
+        );
+      };

-    let err = match state.0.read(&mut buf).await {
-      Ok(nread) => {
-        state.1 += nread;
+      let err = match state.0.read(buf).await {
+        Ok(nread) => {
+          state.1 += nread;

-        if nread == 0 {
-          op_slot.take();
-        }
-
-        trace!(nread);
-        return Ok((nread, buf));
-      }
+          if nread == 0 {
+            op_slot.take();
+          }

-      Err(err) => err,
-    };
+          trace!(nread);
+          return Ok(nread);
+        }

-    let is_retryable = {
-      use io::ErrorKind as E;
-      matches!(
-        err.kind(),
-        E::ConnectionRefused
-          | E::ConnectionReset
-          | E::ConnectionAborted
-          | E::BrokenPipe
-          | E::TimedOut
-          | E::NotConnected
-      )
-    };
+        Err(err) => err,
+      };

-    warn!(kind = %err.kind(), reason = ?err, "stream closed abnormally");
-    debug!(is_retryable);
+      let is_retryable = {
+        use io::ErrorKind as E;
+        matches!(
+          err.kind(),
+          E::ConnectionRefused
+            | E::ConnectionReset
+            | E::ConnectionAborted
+            | E::BrokenPipe
+            | E::TimedOut
+            | E::NotConnected
+        )
+      };

-    if is_retryable {
-      let resp = self
-        .fs
-        .client
-        .get_object()
-        .bucket(&self.bucket_name)
-        .key(&self.key)
-        .range(format!("{}-", state.1))
-        .send()
-        .await
-        .map_err(io::Error::other)?;
+      warn!(kind = %err.kind(), reason = ?err, "stream closed abnormally");
+      debug!(is_retryable);
+
+      if is_retryable {
+        let resp = self
+          .fs
+          .client
+          .get_object()
+          .bucket(&self.bucket_name)
+          .key(&self.key)
+          .range(format!("{}-", state.1))
+          .send()
+          .await
+          .map_err(io::Error::other)?;

-      let mut body_buf = resp.body.into_async_read();
-      let nread = body_buf.read(&mut buf).await?;
+        let mut body_buf = resp.body.into_async_read();
+        let nread = body_buf.read(buf).await?;

-      state.1 += nread;
-      state.0 = Box::pin(body_buf);
+        state.1 += nread;
+        state.0 = Box::pin(body_buf);

-      trace!(nread);
-      Ok((nread, buf))
-    } else {
-      op_slot.take();
-      Err(io::Error::other(err).into())
+        trace!(nread);
+        Ok(nread)
+      } else {
+        op_slot.take();
+        Err(io::Error::other(err).into())
+      }
     }
+    .boxed_local()
   }

-  fn write_sync(self: Rc<Self>, _buf: &[u8]) -> FsResult<usize> {
-    Err(FsError::NotSupported)
-  }
+  fn write_inner(
+    self: Rc<Self>,
+    buf: &[u8],
+  ) -> LocalBoxFuture<'_, FsResult<(usize, bool)>> {
+    async move {
+      let mut op_slot = RcRef::map(&self, |r| &r.op_slot).borrow_mut().await;
+      let Some(op_slot_mut) = op_slot.as_mut() else {
+        let mut state = S3ObjectWriteState::new().map_err(io::Error::other)?;
+        let size = buf.len();
+        let nwritten = state.cursor.write(buf).await?;
+        let written = if size == nwritten {
+          (nwritten, true)
+        } else {
+          assert!(size > nwritten);
+          (nwritten, false)
+        };

-  #[instrument(level = "trace", skip_all, fields(self.bucket_name, self.key, len = buf.len()), err(Debug))]
-  async fn write(self: Rc<Self>, buf: BufView) -> FsResult<WriteOutcome> {
-    let mut op_slot = RcRef::map(&self, |r| &r.op_slot).borrow_mut().await;
-    let Some(op_slot_mut) = op_slot.as_mut() else {
-      let mut state = S3ObjectWriteState::new().map_err(io::Error::other)?;
-      let size = buf.len();
-      let nwritten = state.cursor.write(&buf).await?;
-      let written = if size == nwritten {
-        WriteOutcome::Full { nwritten }
-      } else {
-        assert!(size > nwritten);
-        WriteOutcome::Partial {
-          nwritten,
-          view: buf,
-        }
+        state.total_written += nwritten;
+        state.method =
+          (nwritten < MIN_PART_SIZE).then_some(S3WriteUploadMethod::PutObject);
+
+        *op_slot = Some(S3ObjectOpSlot::Write(state));
+
+        trace!(nwritten);
+        return Ok(written);
+      };
+
+      let Some(state) = op_slot_mut.as_write_mut() else {
+        return Err(
+          io::Error::other("write operation was blocked by another operation")
+            .into(),
+        );
       };
+      let size = buf.len();
+      let nwritten = state.cursor.write(buf).await?;

-    state.total_written += nwritten;
+      state.total_written += nwritten;

-    state.method =
-      (nwritten < MIN_PART_SIZE).then_some(S3WriteUploadMethod::PutObject);
-    *op_slot = Some(S3ObjectOpSlot::Write(state));
+      if nwritten < size {
+        // Now that the tmp buffer is full and we still have bytes to write, we must switch
+        // to a multi-part upload.
+        let mmap_buf = state.try_swap_buffer().map_err(io::Error::other)?;
+
+        mmap_buf.raw.flush_async()?;
+        assert_eq!(state.buf.cursor.get_ref().position(), 0);
+
+        let method = match &mut state.method {
+          Some(S3WriteUploadMethod::MultiPartUpload(method)) => method,
+          method @ (Some(S3WriteUploadMethod::PutObject) | None) => {
+            method.take();
+            method
+              .insert(S3WriteUploadMethod::MultiPartUpload(
+                S3MultiPartUploadMethod {
+                  upload_id: self
+                    .fs
+                    .client
+                    .create_multipart_upload()
+                    .bucket(&self.bucket_name)
+                    .key(&self.key)
+                    .send()
+                    .await
+                    .map_err(io::Error::other)?
+                    .upload_id
+                    .ok_or(io::Error::other(concat!(
+                      "upload id could not be found in ",
+                      "the multipart upload response"
+                    )))?,
+
+                  ..Default::default()
+                },
+              ))
+              .as_multi_part_upload_mut()
+              .unwrap()
+          }
+        };
+
+        method.add_upload_part_task(
+          self.fs.client.clone(),
+          &self.bucket_name,
+          &self.key,
+          Either::Right(mmap_buf),
+          false,
+        );
+
+        trace!(nwritten);
+        return Ok((nwritten, false));
+      }
+
+      assert_eq!(nwritten, size);

       trace!(nwritten);
-      return Ok(written);
-    };
+      Ok((nwritten, true))
+    }
+    .boxed_local()
+  }

-    let Some(state) = op_slot_mut.as_write_mut() else {
-      return Err(
-        io::Error::other("write operation was blocked by another operation")
-          .into(),
-      );
-    };
+  async fn write_all_inner(self: Rc<Self>, mut buf: &[u8]) -> FsResult<()> {
+    loop {
+      let (nwritten, is_full) = self.clone().write_inner(buf).await?;
+      if is_full {
+        return Ok(());
+      }
+      buf = &buf[nwritten..];
+    }
+  }
+}

-    let size = buf.len();
-    let nwritten = state.cursor.write(&buf).await?;
-
-    state.total_written += nwritten;
-
-    if nwritten < size {
-      // Now that the tmp buffer is full and we still have bytes to write, we must switch
-      // to a multi-part upload.
-      let mmap_buf = state.try_swap_buffer().map_err(io::Error::other)?;
-
-      mmap_buf.raw.flush_async()?;
-      assert_eq!(state.buf.cursor.get_ref().position(), 0);
-
-      let method = match &mut state.method {
-        Some(S3WriteUploadMethod::MultiPartUpload(method)) => method,
-        method @ (Some(S3WriteUploadMethod::PutObject) | None) => {
-          method.take();
-          method
-            .insert(S3WriteUploadMethod::MultiPartUpload(
-              S3MultiPartUploadMethod {
-                upload_id: self
-                  .fs
-                  .client
-                  .create_multipart_upload()
-                  .bucket(&self.bucket_name)
-                  .key(&self.key)
-                  .send()
-                  .await
-                  .map_err(io::Error::other)?
-                  .upload_id
-                  .ok_or(io::Error::other(concat!(
-                    "upload id could not be found in ",
-                    "the multipart upload response"
-                  )))?,
-
-                ..Default::default()
-              },
-            ))
-            .as_multi_part_upload_mut()
-            .unwrap()
-        }
-      };
+#[async_trait::async_trait(?Send)]
+impl deno_io::fs::File for S3Object {
+  fn read_sync(self: Rc<Self>, buf: &mut [u8]) -> FsResult<usize> {
+    // SAFETY: current thread will be blocked by join()
+    let ptr = unsafe { self.into_ptr() };
+    std::thread::scope(|s| {
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move {
+          unsafe { S3Object::from_ptr(ptr) }
+            .read_byob_inner(buf)
+            .await
+        })
+      })
+      .join()
+      .unwrap()
+    })
+  }

-      method.add_upload_part_task(
-        self.fs.client.clone(),
-        &self.bucket_name,
-        &self.key,
-        Either::Right(mmap_buf),
-        false,
-      );
+  #[instrument(level = "trace", skip_all, fields(self.bucket_name, self.key), err(Debug))]
+  async fn read_byob(
+    self: Rc<Self>,
+    mut buf: BufMutView,
+  ) -> FsResult<(usize, BufMutView)> {
+    self.read_byob_inner(&mut buf).await.map(|it| (it, buf))
+  }

-      trace!(nwritten);
-      return Ok(WriteOutcome::Partial {
+  fn write_sync(self: Rc<Self>, buf: &[u8]) -> FsResult<usize> {
+    // SAFETY: current thread will be blocked by join()
+    let ptr = unsafe { self.into_ptr() };
+    std::thread::scope(|s| {
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move {
+          unsafe { S3Object::from_ptr(ptr) }.write_inner(buf).await
+        })
+      })
+      .join()
+      .unwrap()
+    })
+    .map(|it| it.0)
+  }
+
+  #[instrument(level = "trace", skip_all, fields(self.bucket_name, self.key, len = buf.len()), err(Debug))]
+  async fn write(self: Rc<Self>, buf: BufView) -> FsResult<WriteOutcome> {
+    let (nwritten, is_full) = self.write_inner(&buf).await?;
+    if is_full {
+      Ok(WriteOutcome::Full { nwritten })
+    } else {
+      Ok(WriteOutcome::Partial {
         nwritten,
         view: buf,
-      });
+      })
     }
-
-    assert_eq!(nwritten, size);
-    trace!(nwritten);
-    Ok(WriteOutcome::Full { nwritten })
   }

-  fn write_all_sync(self: Rc<Self>, _buf: &[u8]) -> FsResult<()> {
-    Err(FsError::NotSupported)
+  fn write_all_sync(self: Rc<Self>, buf: &[u8]) -> FsResult<()> {
+    // SAFETY: current thread will be blocked by join()
+    let ptr = unsafe { self.into_ptr() };
+    std::thread::scope(|s| {
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move {
+          unsafe { S3Object::from_ptr(ptr) }
+            .write_all_inner(buf)
+            .await
+        })
+      })
+      .join()
+      .unwrap()
+    })
   }

   #[instrument(level = "trace", skip_all, fields(self.bucket_name, self.key), err(Debug))]
@@ -1638,7 +1783,17 @@ impl deno_io::fs::File for S3Object {
   }

   fn read_all_sync(self: Rc<Self>) -> FsResult<Vec<u8>> {
-    Err(FsError::NotSupported)
+    // SAFETY: current thread will be blocked by join()
+    let ptr = unsafe { self.into_ptr() };
+    std::thread::scope(|s| {
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move {
+          unsafe { S3Object::from_ptr(ptr) }.read_all_async().await
+        })
+      })
+      .join()
+      .unwrap()
+    })
   }

   #[instrument(level = "trace", skip_all, fields(self.bucket_name, self.key), err(Debug))]
@@ -1680,15 +1835,39 @@ impl deno_io::fs::File for S3Object {
   }

   fn datasync_sync(self: Rc<Self>) -> FsResult<()> {
-    Err(FsError::NotSupported)
+    // SAFETY: current thread will be blocked by join()
+    let ptr = unsafe { self.into_ptr() };
+    std::thread::scope(|s| {
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move {
+          unsafe { S3Object::from_ptr(ptr) }.datasync_async().await
+        })
+      })
+      .join()
+      .unwrap()
+    })
   }

+  /// ## Note
+  /// This function should sync only the data (not metadata), but that
+  /// distinction is meaningless on S3, so it's just an alias of
+  /// [sync_async](S3Object::sync_async).
   async fn datasync_async(self: Rc<Self>) -> FsResult<()> {
-    Err(FsError::NotSupported)
+    self.sync_async().await
   }

   fn sync_sync(self: Rc<Self>) -> FsResult<()> {
-    Err(FsError::NotSupported)
+    // SAFETY: current thread will be blocked by join()
+    let ptr = unsafe { self.into_ptr() };
+    std::thread::scope(|s| {
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move {
+          unsafe { S3Object::from_ptr(ptr) }.sync_async().await
+        })
+      })
+      .join()
+      .unwrap()
+    })
   }

   async fn sync_async(self: Rc<Self>) -> FsResult<()> {
@@ -1714,7 +1893,17 @@ impl deno_io::fs::File for S3Object {
   }

   fn stat_sync(self: Rc<Self>) -> FsResult<FsStat> {
-    Err(FsError::NotSupported)
+    // SAFETY: current thread will be blocked by join()
+    let ptr = unsafe { self.into_ptr() };
+    std::thread::scope(|s| {
+      s.spawn(move || {
+        rt::IO_RT.block_on(async move {
+          unsafe { S3Object::from_ptr(ptr) }.stat_async().await
+        })
+      })
+      .join()
+      .unwrap()
+    })
   }

   async fn stat_async(self: Rc<Self>) -> FsResult<FsStat> {
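All of the `*_sync` implementations above use the same bridge: spawn a scoped thread, drive the async body to completion on the dedicated `rt::IO_RT` runtime, and smuggle the non-`Send` `Rc<S3Object>` across the thread boundary as a raw pointer (`S3ObjectPtr`). That is sound only because the calling thread stays parked in `join()` for the entire operation, so the `Rc` is never alive on two threads at once; the extra thread also sidesteps the panic `Runtime::block_on` raises when invoked from a thread that is already inside an async runtime. A self-contained sketch of the pattern — the lazy `IO_RT` definition is an assumption (the patch only references `crate::rt::IO_RT`), and `block_on_len` stands in for a real async S3 call:

```rust
use std::rc::Rc;

use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

// Assumed shape of `crate::rt::IO_RT`: a process-wide runtime for these bridges.
static IO_RT: Lazy<Runtime> =
  Lazy::new(|| Runtime::new().expect("failed to build io runtime"));

/// Raw-pointer wrapper that asserts `Send`, like `S3ObjectPtr` in the patch.
struct SendPtr<T>(*const T);

// SAFETY (of the example): the owning thread is parked in `join()` while the
// worker thread holds the pointer, so the value is never aliased across threads.
unsafe impl<T> Send for SendPtr<T> {}

/// Runs an async operation against an `Rc` to completion from a sync context.
fn block_on_len(value: Rc<String>) -> usize {
  let ptr = SendPtr(Rc::into_raw(value));
  std::thread::scope(|s| {
    s.spawn(move || {
      // `block_on` accepts a non-`Send` future, which is what makes the
      // `Rc` round trip below possible at all.
      IO_RT.block_on(async move {
        // SAFETY: reconstructed exactly once; parent is blocked in `join()`.
        let value = unsafe { Rc::from_raw(ptr.0) };
        value.len() // stand-in for the real `head_object`/`get_object` call
      })
    })
    .join()
    .unwrap()
  })
}

fn main() {
  assert_eq!(block_on_len(Rc::new("meow".into())), 4);
}
```

Using `std::thread::scope` rather than `std::thread::spawn` is what lets `open_sync` borrow `&self` and `read_sync` borrow `buf` without `'static` bounds.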
diff --git a/crates/fs/tests/fixture/get/index.ts b/crates/fs/tests/fixture/get/index.ts
index 25b1696f0..8ba07d6be 100644
--- a/crates/fs/tests/fixture/get/index.ts
+++ b/crates/fs/tests/fixture/get/index.ts
@@ -1,10 +1,22 @@
 export default {
   async fetch(req: Request) {
     const url = new URL(req.url);
+    const sync = url.searchParams.get("sync") == "true";
     const bucketName = Deno.env.get("S3FS_TEST_BUCKET_NAME")!;
     const key = url.pathname.split("/").slice(2).join("/");

-    const f = await Deno.open(`/s3/${bucketName}/${key}`);
-    return new Response(f.readable, { status: 200 });
+    if (sync) {
+      try {
+        return new Response(Deno.readFileSync(`/s3/${bucketName}/${key}`), {
+          status: 200,
+        });
+      } catch (err) {
+        console.error(err);
+        return new Response(null, { status: 500 });
+      }
+    } else {
+      const f = await Deno.open(`/s3/${bucketName}/${key}`);
+      return new Response(f.readable, { status: 200 });
+    }
   },
 };
diff --git a/crates/fs/tests/fixture/main_with_s3fs/index.ts b/crates/fs/tests/fixture/main_with_s3fs/index.ts
index 333bd1191..2dc98572c 100644
--- a/crates/fs/tests/fixture/main_with_s3fs/index.ts
+++ b/crates/fs/tests/fixture/main_with_s3fs/index.ts
@@ -66,6 +66,9 @@ export default {
       noModuleCache,
       envVars,
       s3FsConfig,
+      context: {
+        useReadSyncFileAPI: true,
+      },
     });
   };
diff --git a/crates/fs/tests/integration_tests.rs b/crates/fs/tests/integration_tests.rs
index 2bcb72216..f8b18579c 100644
--- a/crates/fs/tests/integration_tests.rs
+++ b/crates/fs/tests/integration_tests.rs
@@ -584,3 +584,54 @@
     tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
   }
 }
+
+#[cfg_attr(not(dotenv), ignore)]
+#[tokio::test]
+#[serial]
+async fn test_ensure_using_sync_api_in_async_callback_is_not_allowed() {
+  remove("", true).await;
+
+  let mut arr = vec![0u8; 1024];
+
+  {
+    let tb = get_tb_builder()
+      .with_server_flags(ServerFlags {
+        request_buffer_size: Some(64 * 1024),
+        ..Default::default()
+      })
+      .build()
+      .await;
+
+    rand::thread_rng().fill_bytes(&mut arr);
+
+    let resp = tb
+      .request(|b| {
+        b.uri(format!("/write/{}", get_path("meow.bin")))
+          .method("POST")
+          .body(arr.clone().into())
+          .context("can't make request")
+      })
+      .await
+      .unwrap();
+
+    assert_eq!(resp.status().as_u16(), StatusCode::OK);
+    tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+  }
+
+  {
+    let tb = get_tb_builder().build().await;
+    let resp = tb
+      .request(|b| {
+        b.uri(format!("/get/{}?sync=true", get_path("meow.bin")))
+          .method("GET")
+          .body(Body::empty())
+          .context("can't make request")
+      })
+      .await
+      .unwrap();
+
+    assert_eq!(resp.status().as_u16(), StatusCode::INTERNAL_SERVER_ERROR);
+
+    tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await;
+  }
+}