Skip to content
This repository has been archived by the owner on Feb 27, 2024. It is now read-only.

Commit

Permalink
pag-construction: Group summaries by crossing snapshot boundary
Browse files: browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <moritz.hoffmann@inf.ethz.ch>
  • Loading branch information
Moritz Hoffmann committed Dec 15, 2017
1 parent e9e7c07 commit ab6dc11
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pag-construction/config_heron.py
Expand Up @@ -107,7 +107,7 @@ def src_act_name(x):
# Join summary with `worker_to_operator` to add `operator_dst` column
summary_dst_operator = summaries.join(worker_to_operator, on='dst', rsuffix='_dst')

d = self.group_summaries(summaries, ['operator',], ['normalized_count_inside_epoch'], None, None, message_filter, False, op_name)
d = self.group_summaries(summaries, ['operator',], ['crosses'], None, None, message_filter, False, op_name)
self.plot(d, titles=["Rate"], normalized=False, kind='line', stacked=False, sharey=True)

d = self.group_summaries(summaries, ['src', 'dst', 'operator'], ['count'], None, None, message_filter, False, None)
Expand Down
1 change: 0 additions & 1 deletion pag-construction/run_benchmark.py
Expand Up @@ -228,7 +228,6 @@ def normalize_paths(row, name):
# Divide weight by window size
summaries['normalized_weight'] = summaries['weight'] / window_ns
summaries['normalized_count'] = summaries['count'] / float(configuration.window)
summaries['normalized_count_inside_epoch'] = summaries['count_inside_epoch'] / float(configuration.window)
# normalize epoch number
epochs['index'] -= epoch_offset
# rename index column to epoch
Expand Down
8 changes: 5 additions & 3 deletions pag-construction/run_test.py
@@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python3

import argparse
import inspect
Expand Down Expand Up @@ -65,8 +65,10 @@ def parse(self):
with open(self.filename(), 'r') as txt:
for line in txt:
if line.startswith("SUMMARY"):
# epoch,activity,operator,src,dst,bc,weighted_bc,count,weight
epoch, activity, operator, worker, dst, bc, weighted_bc, count, weight = map(int, line.split(' ')[1].split(',')[0:9])
# epoch,activity,operator,src,dst,crosses,bc,weighted_bc,count,weight
epoch, activity, operator, worker, dst, crosses, bc, weighted_bc, count, weight = line.split(' ')[1].split(',')[0:10]
epoch, activity, operator, worker, dst, bc, weighted_bc, count, weight = \
map(int, [epoch, activity, operator, worker, dst, bc, weighted_bc, count, weight])
epoch_data = data.setdefault(epoch, defaultdict(lambda: 0))
epoch_data[(ACTIVITIES[activity], operator)] += bc

Expand Down
36 changes: 17 additions & 19 deletions pag-construction/src/dataflow.rs
Expand Up @@ -68,7 +68,6 @@ struct Summary<T: Abomonation> {
weighted_bc: T,
weight: u64,
count: u64,
count_inside_epoch: u64,
}

/// Trait defining `from` similarly to `From` but allowed to lose precision.
Expand Down Expand Up @@ -107,7 +106,6 @@ impl<T: Abomonation + std::ops::Add<Output = T> + Copy> std::ops::AddAssign for
weighted_bc: self.weighted_bc + other.weighted_bc,
weight: self.weight + other.weight,
count: self.count + other.count,
count_inside_epoch: self.count_inside_epoch + other.count_inside_epoch,
}
}
}
Expand Down Expand Up @@ -517,6 +515,17 @@ pub fn build_dataflow<'a, A, S>
.give_iterator(data.drain(..)
.map(|(edge, bc)| {
let w = edge.weight();
let window_size_ns = config.window_size_ns;
let window_start_time = time.time().inner;
let crosses_start = edge.source_timestamp() == window_start_time * window_size_ns - 1;
let crosses_end = edge.destination_timestamp() ==
window_start_time * window_size_ns + window_size_ns;
let crosses = match (crosses_start, crosses_end) {
(true, true) => 'B',
(true, false) => 'S',
(false, true) => 'E',
(false, false) => 'N',
};
let edge_type = match edge {
PagOutput::Edge(ref e) => {
(e.edge_type as u8,
Expand All @@ -526,26 +535,16 @@ pub fn build_dataflow<'a, A, S>
} else {
ActivityWorkers::Remote(e.source.worker_id,
e.destination.worker_id)
})
},
crosses)
}
et => panic!("Unknown input: {:?}", et),
};
let window_size_ns = config.window_size_ns;
let window_start_time = time.time().inner;
let crosses_window_boundary =
if edge.source_timestamp() == window_start_time * window_size_ns - 1 ||
edge.destination_timestamp() ==
window_start_time * window_size_ns + window_size_ns {
1
} else {
0
};
let summary = Summary {
weight: w,
bc: bc,
weighted_bc: bc * bc.same_type(ImpreciseFrom::from(w)),
count: 1,
count_inside_epoch: crosses_window_boundary,
};
(edge_type, summary)
}));
Expand All @@ -557,11 +556,11 @@ pub fn build_dataflow<'a, A, S>
|key| hash_code(key));

if index == 0 {
println!("# SUMMARY epoch,activity,operator,src,dst,bc,weighted_bc,count,weight,count_inside_epoch",);
println!("# SUMMARY epoch,activity,operator,src,dst,crosses,bc,weighted_bc,count,weight",);
}
summary_triples
.exchange(|_| 0)
.inspect_batch(move |ts, output| for &((activity_type, operator_id, ref workers),
.inspect_batch(move |ts, output| for &((activity_type, operator_id, ref workers, crosses),
ref summary) in output {
let worker_csv = match *workers {
ActivityWorkers::Local(w_id) => format!("{},{}", w_id, w_id),
Expand All @@ -572,11 +571,11 @@ pub fn build_dataflow<'a, A, S>
activity_type,
operator_id,
worker_csv,
crosses,
summary.bc,
summary.weighted_bc,
summary.count,
summary.weight,
summary.count_inside_epoch);
summary.weight);

println!("SUMMARY {}", data.to_string());
})
Expand All @@ -596,7 +595,6 @@ pub fn build_dataflow<'a, A, S>
bc: From::from(1u8),
weighted_bc: w,
count: 1,
count_inside_epoch: 0,
})
});
let sp_summary =
Expand Down

0 comments on commit ab6dc11

Please sign in to comment.