This repository has been archived by the owner on Jul 3, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathmod.rs
350 lines (307 loc) · 11.4 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
//! Generators for the models used in the Nexmark benchmark suite.
//!
//! Based on the equivalent [Nexmark Flink generator API](https://github.com/nexmark/nexmark/blob/v0.2.0/nexmark-flink/src/main/java/com/github/nexmark/flink/generator).
use self::config::Config;
use super::model::Event;
use anyhow::Result;
use arcstr::ArcStr;
use bids::CHANNELS_NUMBER;
use cached::SizedCache;
use rand::Rng;
mod auctions;
mod bids;
pub mod config;
mod people;
mod price;
mod strings;
pub struct NexmarkGenerator<R: Rng> {
/// Configuration to generate events against. Note that it may be replaced
/// by a call to `splitAtEventId`.
config: Config,
rng: R,
/// The memory cache used when creating bid channels.
bid_channel_cache: SizedCache<u32, (ArcStr, ArcStr)>,
/// Number of events generated by this generator.
/// Note that when there are multiple generators working in parallel, the
/// events count for each generator is less than the events generated by the
/// source.
events_count_so_far: u64,
/// Wallclock time at which we emit the first event (ms since epoch).
/// Set when generator created.
wallclock_base_time: u64,
}
impl<R: Rng> NexmarkGenerator<R> {
    /// Builds a generator that produces events according to `config`, drawing
    /// randomness from `rng`. `wallclock_base_time` (ms since epoch) is the
    /// wallclock time corresponding to the config's `base_time`.
    pub fn new(config: Config, rng: R, wallclock_base_time: u64) -> NexmarkGenerator<R> {
        let bid_channel_cache = SizedCache::with_size(CHANNELS_NUMBER as usize);
        Self {
            config,
            rng,
            bid_channel_cache,
            events_count_so_far: 0,
            wallclock_base_time,
        }
    }

    /// Reports whether at least one more event can be produced, i.e. whether
    /// the next event id is still below `config.max_events`.
    pub fn has_next(&self) -> bool {
        let upcoming_id = self.get_next_event_id();
        upcoming_id < self.config.max_events
    }

    /// Produces the next event together with its timestamps, or `Ok(None)`
    /// once the next event id has reached `config.max_events`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while generating an auction.
    pub fn next_event(&mut self) -> Result<Option<NextEvent>> {
        let event_id = self.get_next_event_id();
        if event_id >= self.config.max_events {
            return Ok(None);
        }

        let count = self.events_count_so_far;
        let cfg = &self.config;

        // Event time at which the event is generated; monotonic in `count`.
        let event_timestamp = cfg.timestamp_for_event(cfg.next_event_number(count));
        // Event time the event itself reports; depending on the out-of-order
        // group size this may carry local jitter.
        let adjusted_event_timestamp =
            cfg.timestamp_for_event(cfg.next_adjusted_event_number(count));
        // Lower bound on this and every later adjusted event timestamp, so it
        // accounts for the jitter above.
        let watermark = cfg.timestamp_for_event(cfg.next_event_number_for_watermark(count));
        // Wallclock time at which the event should be emitted.
        let wallclock_timestamp = self.wallclock_base_time + event_timestamp - cfg.base_time;

        let person_share = cfg.nexmark_config.person_proportion as u64;
        let auction_share = cfg.nexmark_config.auction_proportion as u64;
        let total_share = cfg.nexmark_config.total_proportion() as u64;

        // The kind of event is a pure function of its id: within every run of
        // `total_share` ids, the first slots are people, then auctions, and
        // the remainder bids.
        let event = match event_id % total_share {
            slot if slot < person_share => {
                Event::Person(self.next_person(event_id, adjusted_event_timestamp))
            }
            slot if slot < person_share + auction_share => Event::Auction(self.next_auction(
                count,
                event_id,
                adjusted_event_timestamp,
            )?),
            _ => Event::Bid(self.next_bid(event_id, adjusted_event_timestamp)),
        };

        self.events_count_so_far += 1;

        Ok(Some(NextEvent {
            wallclock_timestamp,
            event_timestamp,
            event,
            watermark,
        }))
    }

    /// Returns an id that is globally unique across parallel generators: the
    /// configured `first_event_id` offset by the next adjusted event number.
    /// The kind of the next event is derived deterministically from this id.
    fn get_next_event_id(&self) -> u64 {
        let offset = self
            .config
            .next_adjusted_event_number(self.events_count_so_far);
        self.config.first_event_id + offset
    }
}
/// The next event and its various timestamps.
///
/// The derived `Ord` compares fields lexicographically in declaration order,
/// so values sort by increasing `wallclock_timestamp`, then `event_timestamp`,
/// then `event` (using `Event`'s own ordering), and finally `watermark`.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct NextEvent {
    /// When, in wallclock time, should this event be emitted?
    pub wallclock_timestamp: u64,
    /// When, in event time, should this event be considered to have occurred?
    pub event_timestamp: u64,
    /// The event itself.
    pub event: Event,
    /// The minimum of this and all future adjusted event timestamps.
    pub watermark: u64,
}
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::nexmark::{
        config::Config as NexmarkConfig,
        model::{Auction, Bid, Person},
    };
    use rand::{rngs::mock::StepRng, thread_rng};
    use rstest::rstest;

    /// Builds a reproducible generator: a `StepRng` counting up from 0, a
    /// single event generator, otherwise default configuration, and a
    /// wallclock base time of 0.
    pub fn make_test_generator() -> NexmarkGenerator<StepRng> {
        NexmarkGenerator::new(
            Config {
                nexmark_config: NexmarkConfig {
                    num_event_generators: 1,
                    ..NexmarkConfig::default()
                },
                ..Config::default()
            },
            StepRng::new(0, 1),
            0,
        )
    }

    /// A fixed `Person` fixture with id 1.
    pub fn make_person() -> Person {
        Person {
            id: 1,
            name: String::from("AAA BBBB").into(),
            email_address: String::from("AAABBB@example.com").into(),
            credit_card: String::from("1111 2222 3333 4444").into(),
            city: String::from("Phoenix").into(),
            state: String::from("OR").into(),
            date_time: 0,
            extra: String::from("").into(),
        }
    }

    /// A fixed `Bid` fixture on auction 1 by bidder 1.
    pub fn make_bid() -> Bid {
        Bid {
            auction: 1,
            bidder: 1,
            price: 99,
            channel: String::from("my-channel").into(),
            url: String::from("https://example.com").into(),
            date_time: 0,
            extra: String::new().into(),
        }
    }

    /// A fixed `Auction` fixture with id 1, sold by seller 1.
    pub fn make_auction() -> Auction {
        Auction {
            id: 1,
            item_name: String::from("item-name").into(),
            description: String::from("description").into(),
            initial_bid: 5,
            reserve: 10,
            date_time: 0,
            expires: 2000,
            seller: 1,
            category: 1,
            extra: String::new().into(),
        }
    }

    /// A `NextEvent` wrapping the `make_bid` fixture with all timestamps 0.
    pub fn make_next_event() -> NextEvent {
        NextEvent {
            wallclock_timestamp: 0,
            event_timestamp: 0,
            event: Event::Bid(make_bid()),
            watermark: 0,
        }
    }

    /// Generates a specified number of next events using the default test
    /// generator, with its wallclock base time overridden.
    pub fn generate_expected_next_events(
        wallclock_base_time: u64,
        num_events: usize,
    ) -> Vec<Option<NextEvent>> {
        let mut ng = make_test_generator();
        ng.wallclock_base_time = wallclock_base_time;
        (0..num_events).map(|_| ng.next_event().unwrap()).collect()
    }

    /// Pulls the next event from `ng`, failing the test if generation returns
    /// an error or the generator is already exhausted.
    fn pull_event<R: Rng>(ng: &mut NexmarkGenerator<R>) -> NextEvent {
        ng.next_event()
            .expect("generating the next event failed")
            .expect("generator unexpectedly exhausted")
    }

    #[test]
    fn test_has_next() {
        let mut ng = make_test_generator();
        ng.config.max_events = 2;
        assert!(ng.has_next());
        ng.next_event().unwrap();
        assert!(ng.has_next());
        ng.next_event().unwrap();
        // Both permitted events have been produced.
        assert!(!ng.has_next());
    }

    #[rstest]
    #[case::single_generator_start_zero(1, 0, 0, vec![0, 1, 2])]
    #[case::single_generator_start_1000(1, 1000, 0, vec![1000, 1001, 1002])]
    #[case::first_of_three_generators_start_0(3, 0, 0, vec![0, 3, 6, 9])]
    #[case::third_of_three_generators_start_0(3, 0, 2, vec![2, 5, 8, 11])]
    #[case::third_of_three_generators_start_1000(3, 1000, 2, vec![1002, 1005, 1008, 1011])]
    fn test_next_event_id(
        #[case] num_event_generators: usize,
        #[case] first_event_id: u64,
        #[case] first_event_number: usize,
        #[case] expected_next_event_ids: Vec<u64>,
    ) {
        let config = Config {
            nexmark_config: NexmarkConfig {
                num_event_generators,
                ..NexmarkConfig::default()
            },
            first_event_id,
            first_event_number,
            ..Config::default()
        };
        let mut generator = NexmarkGenerator::new(config, StepRng::new(0, 1), 0);
        for expected_id in expected_next_event_ids {
            assert_eq!(generator.get_next_event_id(), expected_id);
            generator.next_event().unwrap();
        }
    }

    // Walks the first epoch (50 events with the default proportions) plus the
    // first event of the next epoch, checking the kind of each event without
    // relying on any test helper for the data.
    #[test]
    fn test_next_event() {
        let mut ng = NexmarkGenerator::new(
            Config {
                nexmark_config: NexmarkConfig {
                    num_event_generators: 1,
                    ..NexmarkConfig::default()
                },
                ..Config::default()
            },
            thread_rng(),
            0,
        );

        // The first event with the default config is a person.
        let next_event = pull_event(&mut ng);
        assert!(
            matches!(next_event.event, Event::Person(_)),
            "got: {:?}, want: Event::Person(_)",
            next_event.event
        );
        assert_eq!(next_event.event_timestamp, 0);

        // The next 3 events with the default config are auctions.
        for event_num in 1..=3 {
            let next_event = pull_event(&mut ng);
            assert!(
                matches!(next_event.event, Event::Auction(_)),
                "got: {:?}, want: Event::Auction(_)",
                next_event.event
            );
            assert_eq!(next_event.event_timestamp, event_num / 10);
        }

        // And the rest of the events in the first epoch are bids.
        for _ in 4..=49 {
            let next_event = pull_event(&mut ng);
            assert!(
                matches!(next_event.event, Event::Bid(_)),
                "got: {:?}, want: Event::Bid(_)",
                next_event.event
            );
        }

        // The next epoch begins with another person etc.
        let next_event = pull_event(&mut ng);
        assert!(
            matches!(next_event.event, Event::Person(_)),
            "got: {:?}, want: Event::Person(_)",
            next_event.event
        );
    }

    // Verifies that the `generate_expected_next_events()` test helper is
    // deterministic (an independently built test generator yields identical
    // events), and that its event kinds follow the same person/auction/bid
    // pattern checked manually in `test_next_event` above. Together these
    // ensure the helper's order is correct for external call-sites.
    #[test]
    fn test_generate_expected_next_events() {
        let mut ng = make_test_generator();
        ng.wallclock_base_time = 1_000_000;
        let expected_events = generate_expected_next_events(1_000_000, 100);
        assert_eq!(
            (0..100)
                .map(|_| ng.next_event().unwrap())
                .collect::<Vec<Option<NextEvent>>>(),
            expected_events
        );

        // The event kind depends only on the event id, so the first epoch plus
        // one event must match the pattern verified in `test_next_event`.
        for (index, next_event) in expected_events.iter().take(51).enumerate() {
            let event = &next_event
                .as_ref()
                .expect("generator unexpectedly exhausted")
                .event;
            match index % 50 {
                0 => assert!(
                    matches!(event, Event::Person(_)),
                    "event {}: got: {:?}, want: Event::Person(_)",
                    index,
                    event
                ),
                1..=3 => assert!(
                    matches!(event, Event::Auction(_)),
                    "event {}: got: {:?}, want: Event::Auction(_)",
                    index,
                    event
                ),
                _ => assert!(
                    matches!(event, Event::Bid(_)),
                    "event {}: got: {:?}, want: Event::Bid(_)",
                    index,
                    event
                ),
            }
        }
    }
}