Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: use Arc<Version> during pin and unpin #549

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/storage/secondary/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ impl Compactor {
loop {
{
let tables = self.storage.tables.read().clone();
let (epoch, snapshot) = self.storage.version.pin();
let version = self.storage.version.pin();
let snapshot = version.as_ref().snapshot.as_ref();
for (_, table) in tables {
if let Some(_guard) = self
.storage
Expand All @@ -189,7 +190,6 @@ impl Compactor {
Err(tokio::sync::oneshot::error::TryRecvError::Closed) => break,
_ => {}
}
self.storage.version.unpin(epoch);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
Expand Down
10 changes: 2 additions & 8 deletions src/storage/secondary/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ pub struct SecondaryTransaction {
/// Snapshot content
snapshot: Arc<Snapshot>,

/// Epoch of the snapshot
epoch: u64,

/// The rowsets produced in the txn.
to_be_committed_rowsets: Vec<DiskRowset>,

Expand All @@ -70,15 +67,15 @@ impl SecondaryTransaction {
update: bool,
) -> StorageResult<Self> {
// pin a snapshot at version manager
let (epoch, snapshot) = table.version.pin();
let version = table.version.pin();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should store version inside txn struct, otherwise it will get immediately unpinned once txn is created.

let snapshot = version.as_ref().snapshot.clone();

Ok(Self {
finished: false,
mem: None,
delete_buffer: vec![],
table: table.clone(),
version: table.version.clone(),
epoch,
snapshot,
delete_lock: if update {
Some(table.lock_for_deletion().await)
Expand Down Expand Up @@ -187,7 +184,6 @@ impl SecondaryTransaction {
self.version.commit_changes(changeset).await?;

self.finished = true;
self.version.unpin(self.epoch);

Ok(())
}
Expand Down Expand Up @@ -384,7 +380,6 @@ impl Transaction for SecondaryTransaction {
fn abort<'a>(mut self) -> Self::AbortResultFuture<'a> {
async move {
self.finished = true;
self.version.unpin(self.epoch);
Ok(())
}
}
Expand All @@ -394,7 +389,6 @@ impl Drop for SecondaryTransaction {
fn drop(&mut self) {
if !self.finished {
warn!("Transaction dropped without committing or aborting");
self.version.unpin(self.epoch);
}
}
}
60 changes: 38 additions & 22 deletions src/storage/secondary/version_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ pub struct VersionManagerInner {
pub struct VersionManager {
/// Inner structure of `VersionManager`. This structure is protected by a parking lot Mutex, so
/// as to support quick lock and unlock.
inner: PLMutex<VersionManagerInner>,
inner: Arc<PLMutex<VersionManagerInner>>,

/// Manifest file. We only allow one thread to commit changes, and `commit_changes` will hold
/// this lock until complete. As the commit procedure involves async waiting, we need to use an
Expand All @@ -159,7 +159,7 @@ impl VersionManager {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
Self {
manifest: Mutex::new(manifest),
inner: PLMutex::new(VersionManagerInner::default()),
inner: Arc::from(PLMutex::new(VersionManagerInner::default())),
tx,
rx: PLMutex::new(Some(rx)),
storage_options,
Expand Down Expand Up @@ -255,29 +255,18 @@ impl VersionManager {
}

/// Pin a snapshot of one epoch, so that all files at this epoch won't be deleted.
pub fn pin(&self) -> (u64, Arc<Snapshot>) {
/// Unpin will be done during Dropping Arc<Version>
pub fn pin(&self) -> Arc<Version> {
let mut inner = self.inner.lock();
let epoch = inner.epoch;
*inner.ref_cnt.entry(epoch).or_default() += 1;
(epoch, inner.status.get(&epoch).unwrap().clone())
}

/// Unpin a snapshot of one epoch. When reference counter becomes 0, files might be vacuumed.
pub fn unpin(&self, epoch: u64) {
let mut inner = self.inner.lock();
let ref_cnt = inner
.ref_cnt
.get_mut(&epoch)
.expect("epoch not registered!");
*ref_cnt -= 1;
if *ref_cnt == 0 {
inner.ref_cnt.remove(&epoch).unwrap();

if epoch != inner.epoch {
// TODO: precisely pass the epoch number that can be vacuum.
self.tx.send(()).unwrap();
}
}
let v = Version {
epoch,
snapshot: inner.status.get(&epoch).unwrap().clone(),
inner: self.inner.clone(),
tx: self.tx.clone(),
};
Arc::from(v)
}

pub fn get_rowset(&self, table_id: u32, rowset_id: u32) -> Arc<DiskRowset> {
Expand Down Expand Up @@ -348,3 +337,30 @@ impl VersionManager {
Ok(())
}
}

pub struct Version {
pub epoch: u64,
pub snapshot: Arc<Snapshot>,
inner: Arc<PLMutex<VersionManagerInner>>,
tx: tokio::sync::mpsc::UnboundedSender<()>,
}

impl Drop for Version {
/// Unpin a snapshot of one epoch. When reference counter becomes 0, files might be vacuumed.
fn drop(&mut self) {
let mut inner = self.inner.lock();
let ref_cnt = inner
.ref_cnt
.get_mut(&self.epoch)
.expect("epoch not registered!");
*ref_cnt -= 1;
if *ref_cnt == 0 {
inner.ref_cnt.remove(&self.epoch).unwrap();

if self.epoch != inner.epoch {
// TODO: precisely pass the epoch number that can be vacuum.
self.tx.send(()).unwrap();
}
}
}
}