Skip to content

Commit

Permalink
make module refresh as background thread
Browse files Browse the repository at this point in the history
  • Loading branch information
yun-yeo committed Feb 21, 2022
1 parent 16eae76 commit 78026fd
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 79 deletions.
42 changes: 21 additions & 21 deletions packages/vm/src/cache.rs
Expand Up @@ -205,33 +205,28 @@ where
}

// Try to get module from the memory cache
let store = make_runtime_store(Some(cache.instance_memory_limit));
if let Some(module) = cache.memory_cache.load(checksum, &store)? {
let instance_memory_limit = cache.instance_memory_limit;
if let Some(module) = cache
.memory_cache
.load(checksum, Some(instance_memory_limit))?
{
cache.stats.hits_memory_cache += 1;
let module_size = loupe::size_of_val(&module);
return cache
.pinned_memory_cache
.store(checksum, module, module_size);
return cache.pinned_memory_cache.store(checksum, module);
}

// Try to get module from file system cache
let store = make_runtime_store(Some(cache.instance_memory_limit));
if let Some(module) = cache.fs_cache.load(checksum, &store)? {
cache.stats.hits_fs_cache += 1;
let module_size = loupe::size_of_val(&module);
return cache
.pinned_memory_cache
.store(checksum, module, module_size);
return cache.pinned_memory_cache.store(checksum, module);
}

// Re-compile from original Wasm bytecode
let code = self.load_wasm_with_path(&cache.wasm_path, checksum)?;
let module = compile(&code, Some(cache.instance_memory_limit))?;
// Store into the fs cache too
cache.fs_cache.store(checksum, &module)?;
let module_size = loupe::size_of_val(&module);
cache
.pinned_memory_cache
.store(checksum, module, module_size)
cache.pinned_memory_cache.store(checksum, module)
}

/// Unpins a Module, i.e. removes it from the pinned memory cache.
Expand All @@ -255,31 +250,37 @@ where
options: InstanceOptions,
) -> VmResult<Instance<A, S, Q>> {
let mut cache = self.inner.lock().unwrap();
let store = make_runtime_store(Some(cache.instance_memory_limit));
let instance_memory_limit = cache.instance_memory_limit;

// Try to get module from the pinned memory cache
if let Some(module) = cache.pinned_memory_cache.load(checksum, &store)? {
if let Some(module) = cache
.pinned_memory_cache
.load(checksum, Some(instance_memory_limit))?
{
cache.stats.hits_pinned_memory_cache += 1;
let instance =
Instance::from_module(&module, backend, options.gas_limit, options.print_debug)?;
return Ok(instance);
}

// Get module from memory cache
if let Some(module) = cache.memory_cache.load(checksum, &store)? {
if let Some(module) = cache
.memory_cache
.load(checksum, Some(instance_memory_limit))?
{
cache.stats.hits_memory_cache += 1;
let instance =
Instance::from_module(&module, backend, options.gas_limit, options.print_debug)?;
return Ok(instance);
}

// Get module from file system cache
let store = make_runtime_store(Some(cache.instance_memory_limit));
if let Some(module) = cache.fs_cache.load(checksum, &store)? {
cache.stats.hits_fs_cache += 1;
let instance =
Instance::from_module(&module, backend, options.gas_limit, options.print_debug)?;
let module_size = loupe::size_of_val(&module);
cache.memory_cache.store(checksum, module, module_size)?;
cache.memory_cache.store(checksum, module)?;
return Ok(instance);
}

Expand All @@ -294,8 +295,7 @@ where
let instance =
Instance::from_module(&module, backend, options.gas_limit, options.print_debug)?;
cache.fs_cache.store(checksum, &module)?;
let module_size = loupe::size_of_val(&module);
cache.memory_cache.store(checksum, module, module_size)?;
cache.memory_cache.store(checksum, module)?;
Ok(instance)
}
}
Expand Down
102 changes: 68 additions & 34 deletions packages/vm/src/modules/in_memory_cache.rs
@@ -1,9 +1,12 @@
use clru::{CLruCache, CLruCacheConfig, WeightScale};
use std::collections::hash_map::RandomState;
use std::num::NonZeroUsize;
use wasmer::{Module, Store};
use std::sync::{Arc, Mutex};
use std::thread;
use wasmer::Module;

use super::sized_artifact::SizedArtifact;
use super::sized_artifact::{SharedModule, SizedArtifact};
use crate::wasm_backend::make_runtime_store;
use crate::{Checksum, Size, VmError, VmResult};

// Minimum module size.
Expand Down Expand Up @@ -49,14 +52,20 @@ impl InMemoryCache {
}
}

pub fn store(&mut self, checksum: &Checksum, module: Module, size: usize) -> VmResult<()> {
pub fn store(&mut self, checksum: &Checksum, module: Module) -> VmResult<()> {
if let Some(artifacts) = &mut self.artifacts {
let artifact = module.serialize()?;
let size = loupe::size_of_val(&module) + loupe::size_of_val(&artifact);
artifacts
.put_with_weight(
*checksum,
SizedArtifact {
artifact: module.serialize()?,
size,
artifact: Arc::new(artifact),
shared_module: Arc::new(Mutex::new(SharedModule {
module,
refreshing: false,
})),
},
)
.map_err(|e| VmError::cache_err(format!("{:?}", e)))?;
Expand All @@ -65,12 +74,35 @@ impl InMemoryCache {
}

/// Looks up a module in the cache and creates a new module
pub fn load(&mut self, checksum: &Checksum, store: &Store) -> VmResult<Option<Module>> {
pub fn load(
&mut self,
checksum: &Checksum,
instance_memory_limit: Option<Size>,
) -> VmResult<Option<Module>> {
if let Some(artifacts) = &mut self.artifacts {
match artifacts.get(checksum) {
Some(sized_artifact) => Ok(Some(unsafe {
Module::deserialize(store, &sized_artifact.artifact)
}?)),
Some(sized_artifact) => {
let mut shared_module = sized_artifact.shared_module.lock().unwrap();
let module = shared_module.module.clone();
if !shared_module.refreshing {
(*shared_module).refreshing = true;

// make background tread to recreate module from artifact
let artifact = sized_artifact.artifact.clone();
let shared_module = sized_artifact.shared_module.clone();
thread::spawn(move || {
let store = make_runtime_store(instance_memory_limit);
let module = unsafe { Module::deserialize(&store, &artifact) }.unwrap();

// hold lock to replace shared module
let mut shared_module = shared_module.lock().unwrap();
(*shared_module).refreshing = false;
(*shared_module).module = module;
});
}

Ok(Some(module))
}
None => Ok(None),
}
} else {
Expand Down Expand Up @@ -102,15 +134,14 @@ impl InMemoryCache {
mod tests {
use super::*;
use crate::size::Size;
use crate::wasm_backend::{compile, make_runtime_store};
use crate::wasm_backend::compile;
use std::mem;
use wasmer::{imports, Instance as WasmerInstance};
use wasmer_middlewares::metering::set_remaining_points;

const TESTING_MEMORY_LIMIT: Option<Size> = Some(Size::mebi(16));
const TESTING_GAS_LIMIT: u64 = 5_000;
// Based on `examples/module_size.sh`
const TESTING_WASM_SIZE_FACTOR: usize = 18;

#[test]
fn check_element_sizes() {
Expand Down Expand Up @@ -147,8 +178,7 @@ mod tests {
let checksum = Checksum::generate(&wasm);

// Module does not exist
let store = make_runtime_store(TESTING_MEMORY_LIMIT);
let cache_entry = cache.load(&checksum, &store).unwrap();
let cache_entry = cache.load(&checksum, TESTING_MEMORY_LIMIT).unwrap();
assert!(cache_entry.is_none());

// Compile module
Expand All @@ -164,12 +194,13 @@ mod tests {
}

// Store module
let size = wasm.len() * TESTING_WASM_SIZE_FACTOR;
cache.store(&checksum, original, size).unwrap();
cache.store(&checksum, original).unwrap();

// Load module
let store = make_runtime_store(TESTING_MEMORY_LIMIT);
let cached = cache.load(&checksum, &store).unwrap().unwrap();
let cached = cache
.load(&checksum, TESTING_MEMORY_LIMIT)
.unwrap()
.unwrap();

// Ensure cached module can be executed
{
Expand All @@ -183,7 +214,7 @@ mod tests {

#[test]
fn len_works() {
let mut cache = InMemoryCache::new(Size::mebi(2));
let mut cache = InMemoryCache::new(Size::kilo(8));

// Create module
let wasm1 = wat::parse_str(
Expand Down Expand Up @@ -224,26 +255,26 @@ mod tests {

// Add 1
cache
.store(&checksum1, compile(&wasm1, None).unwrap(), 900_000)
.store(&checksum1, compile(&wasm1, None).unwrap())
.unwrap();
assert_eq!(cache.len(), 1);

// Add 2
cache
.store(&checksum2, compile(&wasm2, None).unwrap(), 900_000)
.store(&checksum2, compile(&wasm2, None).unwrap())
.unwrap();
assert_eq!(cache.len(), 2);
assert_eq!(cache.len(), 1);

// Add 3 (pushes out the previous two)
cache
.store(&checksum3, compile(&wasm3, None).unwrap(), 1_500_000)
.store(&checksum3, compile(&wasm3, None).unwrap())
.unwrap();
assert_eq!(cache.len(), 1);
}

#[test]
fn size_works() {
let mut cache = InMemoryCache::new(Size::mebi(2));
let mut cache = InMemoryCache::new(Size::mebi(6));

// Create module
let wasm1 = wat::parse_str(
Expand Down Expand Up @@ -283,21 +314,24 @@ mod tests {
assert_eq!(cache.size(), 0);

// Add 1
cache
.store(&checksum1, compile(&wasm1, None).unwrap(), 900_000)
.unwrap();
assert_eq!(cache.size(), 900_000);
let module1 = compile(&wasm1, None).unwrap();
let module1_size =
loupe::size_of_val(&module1) + loupe::size_of_val(&module1.serialize().unwrap());
cache.store(&checksum1, module1).unwrap();
assert_eq!(cache.size(), module1_size);

// Add 2
cache
.store(&checksum2, compile(&wasm2, None).unwrap(), 800_000)
.unwrap();
assert_eq!(cache.size(), 1_700_000);
let module2 = compile(&wasm2, None).unwrap();
let module2_size =
loupe::size_of_val(&module2) + loupe::size_of_val(&module2.serialize().unwrap());
cache.store(&checksum2, module2).unwrap();
assert_eq!(cache.size(), module2_size + module1_size);

// Add 3 (pushes out the previous two)
cache
.store(&checksum3, compile(&wasm3, None).unwrap(), 1_500_000)
.unwrap();
assert_eq!(cache.size(), 1_500_000);
let module3 = compile(&wasm3, None).unwrap();
let module3_size =
loupe::size_of_val(&module3) + loupe::size_of_val(&module3.serialize().unwrap());
cache.store(&checksum3, module3).unwrap();
assert_eq!(cache.size(), module3_size + module2_size + module1_size);
}
}

0 comments on commit 78026fd

Please sign in to comment.