Skip to content

Commit

Permalink
[wip] paged serialization
Browse files: browse the repository at this point in the history
  • Loading branch information
michaelwoerister committed Sep 15, 2020
1 parent ce63252 commit 35dde74
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 125 deletions.
43 changes: 33 additions & 10 deletions analyzeme/src/profiling_data.rs
Expand Up @@ -6,7 +6,10 @@ use measureme::file_header::{
read_file_header, write_file_header, CURRENT_FILE_FORMAT_VERSION, FILE_HEADER_SIZE,
FILE_MAGIC_EVENT_STREAM,
};
use measureme::{EventId, ProfilerFiles, RawEvent, SerializationSink, StringTableBuilder};
use measureme::{
EventId, ProfilerFiles, RawEvent, SerializationSink, Shared, StringTableBuilder,
PAGE_TAG_EVENTS, PAGE_TAG_STRINGTABLE_DATA, PAGE_TAG_STRINGTABLE_INDEX,
};
use serde::{Deserialize, Deserializer};
use std::error::Error;
use std::fs;
Expand Down Expand Up @@ -46,12 +49,30 @@ impl ProfilingData {
pub fn new(path_stem: &Path) -> Result<ProfilingData, Box<dyn Error>> {
let paths = ProfilerFiles::new(path_stem);

let string_data = fs::read(paths.string_data_file).expect("couldn't read string_data file");
let index_data =
fs::read(paths.string_index_file).expect("couldn't read string_index file");
let event_data = fs::read(paths.events_file).expect("couldn't read events file");

ProfilingData::from_buffers(string_data, index_data, event_data)
let paged_path = path_stem.with_extension("mm_raw");

if paged_path.exists() {
let data = fs::read(paged_path).expect("couldn't read paged file");
let mut split_data = measureme::split_streams(&data[..]);

let string_data = split_data
.remove(&measureme::PAGE_TAG_STRINGTABLE_DATA)
.unwrap();
let index_data = split_data
.remove(&measureme::PAGE_TAG_STRINGTABLE_INDEX)
.unwrap();
let event_data = split_data.remove(&measureme::PAGE_TAG_EVENTS).unwrap();

ProfilingData::from_buffers(string_data, index_data, event_data)
} else {
let string_data =
fs::read(paths.string_data_file).expect("couldn't read string_data file");
let index_data =
fs::read(paths.string_index_file).expect("couldn't read string_index file");
let event_data = fs::read(paths.events_file).expect("couldn't read events file");

ProfilingData::from_buffers(string_data, index_data, event_data)
}
}

pub fn from_buffers(
Expand Down Expand Up @@ -207,9 +228,11 @@ pub struct ProfilingDataBuilder {

impl ProfilingDataBuilder {
pub fn new() -> ProfilingDataBuilder {
let event_sink = SerializationSink::new_in_memory();
let string_table_data_sink = Arc::new(SerializationSink::new_in_memory());
let string_table_index_sink = Arc::new(SerializationSink::new_in_memory());
let sink_shared = Shared::new_in_memory();

let event_sink = sink_shared.new_sink(PAGE_TAG_EVENTS);
let string_table_data_sink = Arc::new(sink_shared.new_sink(PAGE_TAG_STRINGTABLE_DATA));
let string_table_index_sink = Arc::new(sink_shared.new_sink(PAGE_TAG_STRINGTABLE_INDEX));

// The first thing in every file we generate must be the file header.
write_file_header(&event_sink, FILE_MAGIC_EVENT_STREAM);
Expand Down
12 changes: 7 additions & 5 deletions analyzeme/src/testing_common.rs
Expand Up @@ -96,18 +96,20 @@ fn generate_profiling_data(
})
.collect();

let expected_events: Vec<_> = threads
.into_iter()
.flat_map(|t| t.join().unwrap())
.collect();

// An example of allocating the string contents of an event id that has
// already been used
profiler.map_virtual_to_concrete_string(
event_id_virtual.to_string_id(),
profiler.alloc_string("SomeQuery"),
);

drop(profiler);

let expected_events: Vec<_> = threads
.into_iter()
.flat_map(|t| t.join().unwrap())
.collect();

expected_events
}

Expand Down
2 changes: 1 addition & 1 deletion measureme/src/file_header.rs
Expand Up @@ -5,7 +5,7 @@ use crate::SerializationSink;
use std::convert::TryInto;
use std::error::Error;

pub const CURRENT_FILE_FORMAT_VERSION: u32 = 5;
pub const CURRENT_FILE_FORMAT_VERSION: u32 = 6;
pub const FILE_MAGIC_EVENT_STREAM: &[u8; 4] = b"MMES";
pub const FILE_MAGIC_STRINGTABLE_DATA: &[u8; 4] = b"MMSD";
pub const FILE_MAGIC_STRINGTABLE_INDEX: &[u8; 4] = b"MMSI";
Expand Down
5 changes: 4 additions & 1 deletion measureme/src/lib.rs
Expand Up @@ -50,5 +50,8 @@ pub mod rustc;
pub use crate::event_id::{EventId, EventIdBuilder};
pub use crate::profiler::{Profiler, ProfilerFiles, TimingGuard};
pub use crate::raw_event::{RawEvent, MAX_INSTANT_TIMESTAMP, MAX_INTERVAL_TIMESTAMP};
pub use crate::serialization::{Addr, SerializationSink};
pub use crate::serialization::{
split_streams, Addr, SerializationSink, Shared, PAGE_SIZE, PAGE_TAG_EVENTS,
PAGE_TAG_STRINGTABLE_DATA, PAGE_TAG_STRINGTABLE_INDEX,
};
pub use crate::stringtable::{SerializableString, StringComponent, StringId, StringTableBuilder};
17 changes: 12 additions & 5 deletions measureme/src/profiler.rs
@@ -1,7 +1,7 @@
use crate::event_id::EventId;
use crate::file_header::{write_file_header, FILE_MAGIC_EVENT_STREAM};
use crate::raw_event::RawEvent;
use crate::serialization::SerializationSink;
use crate::serialization::{self, SerializationSink};
use crate::stringtable::{SerializableString, StringId, StringTableBuilder};
use std::error::Error;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -32,15 +32,22 @@ pub struct Profiler {

impl Profiler {
pub fn new<P: AsRef<Path>>(path_stem: P) -> Result<Profiler, Box<dyn Error + Send + Sync>> {
let paths = ProfilerFiles::new(path_stem.as_ref());
let event_sink = Arc::new(SerializationSink::from_path(&paths.events_file)?);
// let paths = ProfilerFiles::new(path_stem.as_ref());

let path = path_stem.as_ref().with_extension("mm_raw");

let sink_shared = serialization::Shared::from_path(&path)?;

let event_sink = Arc::new(sink_shared.new_sink(serialization::PAGE_TAG_EVENTS));

// let event_sink = Arc::new(SerializationSink::from_path(&paths.events_file)?);

// The first thing in every file we generate must be the file header.
write_file_header(&*event_sink, FILE_MAGIC_EVENT_STREAM);

let string_table = StringTableBuilder::new(
Arc::new(SerializationSink::from_path(&paths.string_data_file)?),
Arc::new(SerializationSink::from_path(&paths.string_index_file)?),
Arc::new(sink_shared.new_sink(serialization::PAGE_TAG_STRINGTABLE_DATA)),
Arc::new(sink_shared.new_sink(serialization::PAGE_TAG_STRINGTABLE_INDEX)),
);

let profiler = Profiler {
Expand Down

0 comments on commit 35dde74

Please sign in to comment.