Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: use Arc<Version> during pin and unpin #685

Merged
merged 1 commit into from
Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/storage/secondary/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,14 @@ impl Compactor {
loop {
{
let tables = self.storage.tables.read().clone();
let (epoch, snapshot) = self.storage.version.pin();
let pin_version = self.storage.version.pin();
for (_, table) in tables {
if let Some(_guard) = self
.storage
.txn_mgr
.try_lock_for_compaction(table.table_id())
{
if let Err(err) = self.compact_table(&snapshot, table).await {
if let Err(err) = self.compact_table(&pin_version.snapshot, table).await {
warn!("failed to compact: {:?}", err);
}
}
Expand All @@ -196,7 +196,6 @@ impl Compactor {
Err(tokio::sync::oneshot::error::TryRecvError::Closed) => break,
_ => {}
}
self.storage.version.unpin(epoch);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
Expand Down
11 changes: 6 additions & 5 deletions src/storage/secondary/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,16 +278,19 @@ impl SecondaryStorage {

changeset.push(EpochOp::DropTable(entry));

let (epoch, snapshot) = self.version.pin();
let pin_version = self.version.pin();

if let Some(rowsets) = snapshot.get_rowsets_of(table_id.table_id) {
if let Some(rowsets) = pin_version.snapshot.get_rowsets_of(table_id.table_id) {
for rowset_id in rowsets {
changeset.push(EpochOp::DeleteRowSet(DeleteRowsetEntry {
table_id,
rowset_id: *rowset_id,
}));

if let Some(dvs) = snapshot.get_dvs_of(table_id.table_id, *rowset_id) {
if let Some(dvs) = pin_version
.snapshot
.get_dvs_of(table_id.table_id, *rowset_id)
{
for dv_id in dvs {
changeset.push(EpochOp::DeleteDV(DeleteDVEntry {
table_id,
Expand All @@ -299,8 +302,6 @@ impl SecondaryStorage {
}
}

self.version.unpin(epoch);

// and then persist to manifest
self.version.commit_changes(changeset).await?;

Expand Down
18 changes: 7 additions & 11 deletions src/storage/secondary/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use risinglight_proto::rowset::DeleteRecord;
use tokio::sync::OwnedMutexGuard;
use tracing::{info, warn};

use super::version_manager::{Snapshot, VersionManager};
use super::version_manager::{Snapshot, Version, VersionManager};
use super::{
AddDVEntry, AddRowSetEntry, ColumnBuilderOptions, ConcatIterator, DeleteVector, DiskRowset,
EpochOp, MergeIterator, RowSetIterator, SecondaryMemRowsetImpl, SecondaryRowHandler,
Expand Down Expand Up @@ -45,9 +45,6 @@ pub struct SecondaryTransaction {
/// Snapshot content
snapshot: Arc<Snapshot>,

/// Epoch of the snapshot
epoch: u64,

/// The rowsets produced in the txn.
to_be_committed_rowsets: Vec<DiskRowset>,

Expand All @@ -59,6 +56,9 @@ pub struct SecondaryTransaction {
///
/// TODO: we only calculate batch insert here. Need to estimate delete vector size.
total_size: usize,

/// Reference version.
_pin_version: Arc<Version>,
}

impl SecondaryTransaction {
Expand All @@ -70,16 +70,14 @@ impl SecondaryTransaction {
update: bool,
) -> StorageResult<Self> {
// pin a snapshot at version manager
let (epoch, snapshot) = table.version.pin();

let pin_version = table.version.pin();
Ok(Self {
finished: false,
mem: None,
delete_buffer: vec![],
table: table.clone(),
version: table.version.clone(),
epoch,
snapshot,
snapshot: pin_version.snapshot.clone(),
delete_lock: if update {
Some(table.lock_for_deletion().await)
} else {
Expand All @@ -88,6 +86,7 @@ impl SecondaryTransaction {
to_be_committed_rowsets: vec![],
read_only,
total_size: 0,
_pin_version: pin_version,
})
}

Expand Down Expand Up @@ -214,7 +213,6 @@ impl SecondaryTransaction {
self.version.commit_changes(changeset).await?;

self.finished = true;
self.version.unpin(self.epoch);

Ok(())
}
Expand Down Expand Up @@ -409,7 +407,6 @@ impl Transaction for SecondaryTransaction {
fn abort<'a>(mut self) -> Self::AbortResultFuture<'a> {
async move {
self.finished = true;
self.version.unpin(self.epoch);
Ok(())
}
}
Expand All @@ -419,7 +416,6 @@ impl Drop for SecondaryTransaction {
fn drop(&mut self) {
if !self.finished {
warn!("Transaction dropped without committing or aborting");
self.version.unpin(self.epoch);
}
}
}
58 changes: 36 additions & 22 deletions src/storage/secondary/version_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub struct VersionManagerInner {
pub struct VersionManager {
/// Inner structure of `VersionManager`. This structure is protected by a parking lot Mutex, so
/// as to support quick lock and unlock.
inner: PLMutex<VersionManagerInner>,
inner: Arc<PLMutex<VersionManagerInner>>,

/// Manifest file. We only allow one thread to commit changes, and `commit_changes` will hold
/// this lock until complete. As the commit procedure involves async waiting, we need to use an
Expand All @@ -172,7 +172,7 @@ impl VersionManager {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
Self {
manifest: Mutex::new(manifest),
inner: PLMutex::new(VersionManagerInner::default()),
inner: Arc::new(PLMutex::new(VersionManagerInner::default())),
tx,
rx: PLMutex::new(Some(rx)),
storage_options,
Expand Down Expand Up @@ -268,29 +268,16 @@ impl VersionManager {
}

/// Pin a snapshot of one epoch, so that all files at this epoch won't be deleted.
pub fn pin(&self) -> (u64, Arc<Snapshot>) {
pub fn pin(&self) -> Arc<Version> {
let mut inner = self.inner.lock();
let epoch = inner.epoch;
*inner.ref_cnt.entry(epoch).or_default() += 1;
(epoch, inner.status.get(&epoch).unwrap().clone())
}

/// Unpin a snapshot of one epoch. When reference counter becomes 0, files might be vacuumed.
pub fn unpin(&self, epoch: u64) {
let mut inner = self.inner.lock();
let ref_cnt = inner
.ref_cnt
.get_mut(&epoch)
.expect("epoch not registered!");
*ref_cnt -= 1;
if *ref_cnt == 0 {
inner.ref_cnt.remove(&epoch).unwrap();

if epoch != inner.epoch {
// TODO: precisely pass the epoch number that can be vacuum.
self.tx.send(()).unwrap();
}
}
Arc::new(Version {
epoch,
snapshot: inner.status.get(&epoch).unwrap().clone(),
inner: self.inner.clone(),
tx: self.tx.clone(),
})
}

pub fn get_rowset(&self, table_id: u32, rowset_id: u32) -> Arc<DiskRowset> {
Expand Down Expand Up @@ -366,3 +353,30 @@ impl VersionManager {
Ok(())
}
}

pub struct Version {
pub epoch: u64,
pub snapshot: Arc<Snapshot>,
inner: Arc<PLMutex<VersionManagerInner>>,
tx: tokio::sync::mpsc::UnboundedSender<()>,
}

impl Drop for Version {
/// Unpin a snapshot of one epoch. When reference counter becomes 0, files might be vacuumed.
fn drop(&mut self) {
let mut inner = self.inner.lock();
let ref_cnt = inner
.ref_cnt
.get_mut(&self.epoch)
.expect("epoch not registered!");
*ref_cnt -= 1;
if *ref_cnt == 0 {
inner.ref_cnt.remove(&self.epoch).unwrap();

if self.epoch != inner.epoch {
// TODO: precisely pass the epoch number that can be vacuum.
self.tx.send(()).unwrap();
}
}
}
}