-
-
Notifications
You must be signed in to change notification settings - Fork 832
/
send.rs
137 lines (114 loc) · 2.86 KB
/
send.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use sd_core_sync::{SyncMessage, NTP64};
use sd_cloud_api::RequestConfigProvider;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tokio::{sync::Notify, time::sleep};
use tracing::debug;
use uuid::Uuid;
use super::{err_break, CompressedCRDTOperations};
pub async fn run_actor(
library_id: Uuid,
sync: Arc<sd_core_sync::Manager>,
cloud_api_config_provider: Arc<impl RequestConfigProvider>,
state: Arc<AtomicBool>,
state_notify: Arc<Notify>,
) {
loop {
state.store(true, Ordering::Relaxed);
state_notify.notify_waiters();
loop {
// all available instances will have a default timestamp from create_instance
let instances = sync
.timestamps
.read()
.await
.keys()
.cloned()
.collect::<Vec<_>>();
// obtains a lock on the timestamp collections for the instances we have
let req_adds = err_break!(
sd_cloud_api::library::message_collections::request_add(
cloud_api_config_provider.get_request_config().await,
library_id,
instances,
)
.await
);
let mut instances = vec![];
use sd_cloud_api::library::message_collections::do_add;
debug!(
"Preparing to send {} instances' operations to cloud",
req_adds.len()
);
// gets new operations for each instance to send to cloud
for req_add in req_adds {
let ops = err_break!(
sync.get_instance_ops(
1000,
req_add.instance_uuid,
NTP64(
req_add
.from_time
.unwrap_or_else(|| "0".to_string())
.parse()
.expect("couldn't parse ntp64 value"),
)
)
.await
);
if ops.is_empty() {
continue;
}
let start_time = ops[0].timestamp.0.to_string();
let end_time = ops[ops.len() - 1].timestamp.0.to_string();
let ops_len = ops.len();
use base64::prelude::*;
debug!(
"Instance {}: {} to {}",
req_add.instance_uuid, start_time, end_time
);
instances.push(do_add::Input {
uuid: req_add.instance_uuid,
key: req_add.key,
start_time,
end_time,
contents: BASE64_STANDARD.encode(
rmp_serde::to_vec_named(&CompressedCRDTOperations::new(ops))
.expect("CompressedCRDTOperation should serialize!"),
),
ops_count: ops_len,
})
}
if instances.is_empty() {
break;
}
// uses lock we acquired earlier to send the operations to the cloud
err_break!(
do_add(
cloud_api_config_provider.get_request_config().await,
library_id,
instances,
)
.await
);
}
state.store(false, Ordering::Relaxed);
state_notify.notify_waiters();
{
// recreate subscription each time so that existing messages are dropped
let mut rx = sync.subscribe();
// wait until Created message comes in
loop {
if let Ok(SyncMessage::Created) = rx.recv().await {
break;
};
}
}
sleep(Duration::from_millis(1000)).await;
}
}