Added option to only run the compressor on a range of state groups in a room #44

Merged · 14 commits · Aug 12, 2021
19 changes: 19 additions & 0 deletions src/compressor.rs
@@ -141,6 +141,23 @@ impl<'a> Compressor<'a> {
pb.enable_steady_tick(100);

for (&state_group, entry) in self.original_state_map {
// Check whether this entry is in_range or is just present in the map due to being
// a predecessor of a group that IS in_range for compression
if !entry.in_range {
let new_entry = StateGroupEntry {
// in_range is kept the same so that the new entry is equal to the old entry
// otherwise it might trigger a useless database transaction
in_range: entry.in_range,
prev_state_group: entry.prev_state_group,
state_map: entry.state_map.clone(),
};
// Paranoid assertion that no changes are being made to this entry
// (this check could probably be removed...)
assert!(new_entry == *entry);
self.new_state_group_map.insert(state_group, new_entry);

continue;
}
let mut prev_state_group = None;
for level in &mut self.levels {
if level.has_space() {
@@ -162,6 +179,7 @@ impl<'a> Compressor<'a> {
self.new_state_group_map.insert(
state_group,
StateGroupEntry {
in_range: true,
prev_state_group,
state_map: delta,
},
@@ -239,6 +257,7 @@ fn test_new_map() {
initial.insert(
i,
StateGroupEntry {
in_range: true,
prev_state_group: prev,
state_map: StateMap::new(),
},
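For reference, here is a minimal sketch of the StateGroupEntry shape this diff relies on. The real definition lives elsewhere in the crate and is not shown in this PR; the field types and derives below are assumptions inferred from how the diff uses the struct:

// Sketch only: assumed shape of StateGroupEntry after this PR.
// Clone, Default and PartialEq are implied by the diff's calls to
// .clone() and .or_default() and by the equality assertion above.
#[derive(Clone, Default, PartialEq, Debug)]
pub struct StateGroupEntry {
    // True if this group is inside the range selected for compression,
    // false if it was only fetched as a predecessor of an in-range group.
    pub in_range: bool,
    pub prev_state_group: Option<i64>,
    pub state_map: StateMap<Atom>, // (type, state_key) -> event id
}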
201 changes: 150 additions & 51 deletions src/database.rs

@@ -25,40 +25,62 @@ use super::StateGroupEntry;
/// specific room.
///
/// - Connects to the database
/// - Fetches rows with group id lower than max
/// - Fetches the first [groups_to_compress] rows with group id after [min]
/// - Recursively searches for missing predecessors and adds those
///
/// Returns the state_group map and the id of the last group that was used
///
/// # Arguments
///
/// * `room_id` - The ID of the room in the database
/// * `db_url` - The URL of a Postgres database. This should be of the
/// form: "postgresql://user:pass@domain:port/database"
/// * `max_state_group` - If specified, then only fetch the entries for state
/// groups lower than or equal to this number. (N.B. all
/// predecessors are also fetched)
/// * `room_id` - The ID of the room in the database
/// * `db_url` - The URL of a Postgres database. This should be of the
/// form: "postgresql://user:pass@domain:port/database"
/// * `min_state_group` - If specified, then only fetch the entries for state
/// groups greater than (but not equal) to this number. It
/// also requires groups_to_compress to be specified
/// * `groups_to_compress` - The number of groups to get from the database before stopping
/// * `max_state_group` - If specified, then only fetch the entries for state
/// groups lower than or equal to this number.
pub fn get_data_from_db(
db_url: &str,
room_id: &str,
min_state_group: Option<i64>,
groups_to_compress: Option<i64>,
max_state_group: Option<i64>,
) -> BTreeMap<i64, StateGroupEntry> {
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
// connect to the database
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
builder.set_verify(SslVerifyMode::NONE);
let connector = MakeTlsConnector::new(builder.build());

let mut client = Client::connect(db_url, connector).unwrap();

let mut state_group_map = get_initial_data_from_db(&mut client, room_id, max_state_group);
// Search for the group id of the groups_to_compress'th group after min_state_group.
// If this is saved, then the compressor can continue later by setting
// min_state_group to this maximum
let max_group_found = find_max_group(
&mut client,
room_id,
min_state_group,
groups_to_compress,
max_state_group,
);

let mut state_group_map =
get_initial_data_from_db(&mut client, room_id, min_state_group, max_group_found);

println!("Got initial state from database. Checking for any missing state groups...");

// Due to reasons some of the state groups appear in the edges table, but
// not in the state_groups_state table. This means they don't get included
// in our DB queries, so we have to fetch any missing groups explicitly.
// not in the state_groups_state table.
//
// Also it is likely that the predecessor of a node will not be within the
// chunk that was specified by min_state_group and groups_to_compress.
// This means they don't get included in our DB queries, so we have to fetch
// any missing groups explicitly.
//
// Since the returned groups may themselves reference groups we don't have,
// we need to do this recursively until we don't find any more missing.
//
// N.B. This does NOT currently fetch the deltas for the missing groups!
// Depending on how max_state_group is chosen, this might cause issues...?
loop {
let mut missing_sgs: Vec<_> = state_group_map
.iter()
@@ -76,41 +98,90 @@ pub fn get_data_from_db(
.collect();

if missing_sgs.is_empty() {
println!("No missing state groups");
// println!("No missing state groups");
break;
}

missing_sgs.sort_unstable();
missing_sgs.dedup();

println!("Missing {} state groups", missing_sgs.len());
// println!("Missing {} state groups", missing_sgs.len());

let map = get_missing_from_db(&mut client, &missing_sgs);
state_group_map.extend(map.into_iter());
// find state groups not picked up already and add them to the map
let map = get_missing_from_db(&mut client, &missing_sgs, min_state_group, max_group_found);
for (k, v) in map {
state_group_map.entry(k).or_insert(v);
}
}

state_group_map
(state_group_map, max_group_found)
}
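A hedged sketch of how a caller could use the new return value to work through a room in fixed-size chunks. Here compress_chunk and the chunk size of 1000 are illustrative only, error handling is elided, and find_max_group (below) will panic once no groups remain, so a real caller needs its own end-of-room check:

// Sketch: resumable chunked compression using the returned max group id.
fn compress_in_chunks(db_url: &str, room_id: &str, runs: usize) {
    let mut min_state_group: Option<i64> = None; // start from the beginning
    for _ in 0..runs {
        let (state_group_map, last_group) =
            get_data_from_db(db_url, room_id, min_state_group, Some(1000), None);
        compress_chunk(state_group_map); // hypothetical: compress + write back
        // Save last_group so that the next run continues after this chunk
        min_state_group = Some(last_group);
    }
}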

/// Returns the group ID of the last group to be compressed
///
/// This can be saved so that future runs of the compressor only
/// continue from after this point
///
/// # Arguments
///
/// * `client` - A Postgres client to make requests with
/// * `room_id` - The ID of the room in the database
/// * `min_state_group` - The lower limit (non-inclusive) of group ids to compress
/// * `groups_to_compress` - How many groups to compress
/// * `max_state_group` - The upper bound on what this method can return
fn find_max_group(
client: &mut Client,
room_id: &str,
min_state_group: Option<i64>,
groups_to_compress: Option<i64>,
max_state_group: Option<i64>,
) -> i64 {
// Get the list of state group ids in the room
let mut sql = "SELECT id FROM state_groups WHERE room_id = $1".to_string();

if let Some(max) = max_state_group {
sql = format!("{} AND id <= {}", sql, max)
}

// Adds an additional constraint if groups_to_compress has been specified,
// then sends the query to the database
let rows = if let (Some(min), Some(count)) = (min_state_group, groups_to_compress) {
let params: Vec<&dyn ToSql> = vec![&room_id, &min, &count];
client.query_raw(format!(r"{} AND id > $2 LIMIT $3", sql).as_str(), params)
} else {
client.query_raw(
format!(r"{} ORDER BY id DESC LIMIT 1", sql).as_str(),
&[room_id],
)
}
.unwrap();

let final_row = rows.last().unwrap().unwrap();
final_row.get(0)
}
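For concreteness, with max_state_group unset the two branches above boil down to these final query strings, where $2 and $3 are bound to min_state_group and groups_to_compress in the first form; a small sketch:

// Sketch: the two query shapes find_max_group can send.
let chunk_query =
    r"SELECT id FROM state_groups WHERE room_id = $1 AND id > $2 LIMIT $3";
let newest_query =
    r"SELECT id FROM state_groups WHERE room_id = $1 ORDER BY id DESC LIMIT 1";
// rows.last() is the last group of the chunk in the first case, and the
// room's newest group id in the second.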

/// Fetch the entries in state_groups_state and immediate predecessors for
/// a specific room.
///
/// - Fetches rows with group id lower than max
/// - Fetches first [groups_to_compress] rows with group id higher than min
/// - Stores the group id, predecessor id and deltas into a map
/// - Returns the map built from those rows
///
/// # Arguments
///
/// * `client` - A Postgres client to make requests with
/// * `room_id` - The ID of the room in the database
/// * `max_state_group` - If specified, then only fetch the entries for state
/// groups lower than or equal to this number. (N.B. doesn't
/// fetch IMMEDIATE predecessors if ID is above this number)
/// * `min_state_group` - If specified, then only fetch the entries for state
/// groups greater than (but not equal) to this number. It
/// also requires groups_to_compress to be specified
/// * `max_group_found` - The upper limit on state_group ids to get from the database
fn get_initial_data_from_db(
client: &mut Client,
room_id: &str,
max_state_group: Option<i64>,
min_state_group: Option<i64>,
max_group_found: i64,
) -> BTreeMap<i64, StateGroupEntry> {
// Query to get id, predecessor and delta for each state group
// Query to get id, predecessor and deltas for each state group
let sql = r#"
SELECT m.id, prev_state_group, type, state_key, s.event_id
FROM state_groups AS m
@@ -119,18 +190,21 @@ fn get_initial_data_from_db(
WHERE m.room_id = $1
"#;

// Adds additional constraint if a max_state_group has been specified
// Then sends query to the database
let mut rows = if let Some(s) = max_state_group {
let params: Vec<&dyn ToSql> = vec![&room_id, &s];
client.query_raw(format!(r"{} AND m.id <= $2", sql).as_str(), params)
// Adds an additional constraint if a minimum state_group has been specified.
// Note that the maximum group only affects queries if there is also a minimum;
// otherwise it is assumed that ALL groups should be fetched
let mut rows = if let Some(min) = min_state_group {
let params: Vec<&dyn ToSql> = vec![&room_id, &min, &max_group_found];
client.query_raw(
format!(r"{} AND m.id > $2 AND m.id <= $3", sql).as_str(),
params,
)
} else {
client.query_raw(sql, &[room_id])
}
.unwrap();

// Copy the data from the database into a map

let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();

let pb = ProgressBar::new_spinner();
Expand All @@ -143,8 +217,10 @@ fn get_initial_data_from_db(
// The row in the map to copy the data to
let entry = state_group_map.entry(row.get(0)).or_default();

// Save the predecessor (this may already be there)
// Save the predecessor and mark for compression (this may already be there)
// TODO: slightly fewer redundant rewrites
entry.prev_state_group = row.get(1);
entry.in_range = true;

// Copy the single delta from the predecessor stored in this row
if let Some(etype) = row.get::<_, Option<String>>(2) {
@@ -172,34 +248,57 @@
///
/// * `client` - A Postgres client to make requests with
/// * `missing_sgs` - An array of missing state_group ids
fn get_missing_from_db(client: &mut Client, missing_sgs: &[i64]) -> BTreeMap<i64, StateGroupEntry> {
let mut rows = client
.query_raw(
r#"
SELECT state_group, prev_state_group
FROM state_group_edges
WHERE state_group = ANY($1)
"#,
&[missing_sgs],
)
.unwrap();
/// * `min_state_group` - Minimum state_group id to mark as in range
/// * `max_group_found` - Maximum state_group id to mark as in range
fn get_missing_from_db(
client: &mut Client,
missing_sgs: &[i64],
min_state_group: Option<i64>,
max_group_found: i64,
) -> BTreeMap<i64, StateGroupEntry> {
// "Due to reasons" it is possible that some states only appear in edges table and not in state_groups table
// so since we know the IDs we're looking for as they are the missing predecessors, we can find them by
// left joining onto the edges table (instead of the state_group table!)
let sql = r#"
SELECT target.prev_state_group, source.prev_state_group, state.type, state.state_key, state.event_id
FROM state_group_edges AS target
LEFT JOIN state_group_edges AS source ON (target.prev_state_group = source.state_group)
LEFT JOIN state_groups_state AS state ON (target.prev_state_group = state.state_group)
WHERE target.prev_state_group = ANY($1)
"#;

let mut rows = client.query_raw(sql, &[missing_sgs]).unwrap();

// initialise the map with empty entries (the missing group may not
// have a prev_state_group either)
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = missing_sgs
.iter()
.map(|sg| (*sg, StateGroupEntry::default()))
.collect();
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();

while let Some(row) = rows.next().unwrap() {
let state_group = row.get(0);
let entry = state_group_map.get_mut(&state_group).unwrap();
let id = row.get(0);
// The row in the map to copy the data to
let entry = state_group_map.entry(id).or_default();

// Save the predecessor and mark for compression (this may already be there)
// Note that the predecessor may well not exist!
entry.prev_state_group = row.get(1);
if let Some(min) = min_state_group {
if min < id && id <= max_group_found {
entry.in_range = true
}
}

// Copy the single delta from the predecessor stored in this row
if let Some(etype) = row.get::<_, Option<String>>(2) {
entry.state_map.insert(
&etype,
&row.get::<_, String>(3),
row.get::<_, String>(4).into(),
);
}
}

state_group_map
}
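A small test-style sketch of the intended range behaviour, assuming StateGroupEntry's Default leaves in_range as false: predecessors fetched here pass through the compressor untouched unless they happen to fall inside the requested chunk (the ids below are hypothetical):

#[test]
fn missing_predecessors_stay_out_of_range() {
    // Group 5 precedes an in-range group but sits outside the chunk
    // (min = 10, max_group_found = 20), so in_range must stay false.
    let (id, min, max_group_found) = (5i64, 10i64, 20i64);
    let mut entry = StateGroupEntry::default();
    if min < id && id <= max_group_found {
        entry.in_range = true;
    }
    assert!(!entry.in_range);
}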

// TODO: find a library that has an existing safe postgres escape function
/// Helper function that escapes the wrapped text when writing SQL
pub struct PGEscape<'a>(pub &'a str);

6 changes: 3 additions & 3 deletions src/graphing.rs

@@ -57,15 +57,15 @@ fn output_csv(groups: &Graph, edges_output: &mut File, nodes_output: &mut File)
/// * `after` - A map from state group ids to StateGroupEntries
/// the information from this map goes into after_edges.csv
/// and after_nodes.csv
pub fn make_graphs(before: Graph, after: Graph) {
pub fn make_graphs(before: &Graph, after: &Graph) {
// Open all the files to output to
let mut before_edges_file = File::create("before_edges.csv").unwrap();
let mut before_nodes_file = File::create("before_nodes.csv").unwrap();
let mut after_edges_file = File::create("after_edges.csv").unwrap();
let mut after_nodes_file = File::create("after_nodes.csv").unwrap();

// Write before's information to before_edges and before_nodes
output_csv(&before, &mut before_edges_file, &mut before_nodes_file);
output_csv(before, &mut before_edges_file, &mut before_nodes_file);
// Write after's information to after_edges and after_nodes
output_csv(&after, &mut after_edges_file, &mut after_nodes_file);
output_csv(after, &mut after_edges_file, &mut after_nodes_file);
}
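Since make_graphs now only borrows the two maps, a caller can keep using them after the CSV files are written. A minimal sketch, where load_room_graph and run_compressor are hypothetical stand-ins for the surrounding code:

// Sketch: the graphs remain usable after being written out.
let before: Graph = load_room_graph();      // hypothetical
let after: Graph = run_compressor(&before); // hypothetical
make_graphs(&before, &after);
println!("{} groups before, {} groups after", before.len(), after.len());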