/
backup.rs
139 lines (117 loc) · 4.5 KB
/
backup.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::Arc;
use std::sync::mpsc;
use std::time::Duration;
use backup_entity::BackupEntity;
use std::thread;
/// Backup scheduler that owns the list of backup entities.
///
/// Every entity is wrapped in `Arc<Mutex<_>>` so that the same entity can be reached both from
/// the polling loop in `start` and from the file-system watcher thread that `start` spawns.
pub struct Backuper {
/// Shared, lock-protected handles to the entities being backed up.
entities: Vec<Arc<Mutex<BackupEntity>>>
}
impl Backuper {
/// Creates a `Backuper` from the given entities.
///
/// Each entity is moved into its own `Arc<Mutex<_>>` so it can later be shared between threads.
/// The order of `entities` is preserved.
pub fn new(entities: Vec<BackupEntity>) -> Self {
    let mut shared = Vec::with_capacity(entities.len());
    for entity in entities {
        shared.push(Arc::new(Mutex::new(entity)));
    }
    Backuper { entities: shared }
}
/// Runs the backup scheduler on the calling thread.
///
/// If there are no entities, an error is logged and the function returns immediately.
/// Otherwise it spawns the file-system watcher thread and then loops forever: once every
/// 60 seconds each entity is locked and `backup()` is called on it when either trigger holds:
///
/// * time trigger: at least `trigger_timer` minutes have elapsed since `last_triggered`;
/// * change trigger: `changed` has reached `trigger_changes`.
///
/// A failed backup is logged and does not stop the loop.
///
/// # Panics
///
/// Panics if an entity mutex is poisoned.
pub fn start(self) {
    if self.entities.is_empty() {
        error!("no backup entities, nothing to be done.");
        return;
    }

    // Spawn fs watcher thread; it gets its own clones of the `Arc` handles.
    let cloned_entities = self.entities.clone();
    thread::spawn(move || Self::watcher_thread_main(cloned_entities));

    loop {
        for entity in &self.entities {
            // The lock is held for the whole check-and-backup step of this entity.
            let mut entity = entity.lock().unwrap();

            let time_trigger = match entity.last_triggered.elapsed() {
                // `trigger_timer` is in minutes. Saturate instead of overflowing: a wrapped
                // product would yield a tiny threshold and fire the trigger spuriously.
                Ok(duration) => {
                    duration >= Duration::from_secs(entity.trigger_timer.saturating_mul(60))
                }
                // The elapsed time could not be computed; do not trigger on time.
                Err(_) => false
            };
            let changes_trigger = entity.changed >= entity.trigger_changes;

            if time_trigger || changes_trigger {
                if let Err(error) = entity.backup() {
                    error!("fatal error for backup target {:?}: {}", entity.path, error);
                }
            }
        }

        thread::sleep(Duration::from_secs(60));
    }
}
fn watcher_thread_main(entities: Vec<Arc<Mutex<BackupEntity>>>) {
use notify::*;
let mut lookup_table = entities.iter()
.map(|entity| (
entity.lock().unwrap().path.clone(),
entity.clone()
))
.collect::<HashMap<_, _>>();
trace!("watching pathes:");
for (path, _) in &lookup_table {
trace!("{:?}", path);
}
// Create watcher
let (tx, rx) = mpsc::channel();
let mut watcher = watcher(tx, Duration::from_secs(60))
.map_err(|e| error!("{}", e))
.expect("failed to start file watcher");
// Register watch pathes
for (path, entity) in &lookup_table {
let recursive_mode = if entity.lock().unwrap().recursive {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
};
watcher.watch(&path, recursive_mode)
.unwrap_or_else(|e| error!("failed to start watching {}: {}", path.display(), e));
}
use notify::DebouncedEvent::*;
loop {
let path = match rx.recv() {
Ok(event) => {
trace!("received fs event: {:?}", event);
match event {
NoticeWrite(path) |
NoticeRemove(path) |
Create(path) |
Write(path) |
Chmod(path) |
Remove(path) => path,
Rename(path, new_path) => {
// remove old path from lookup table
lookup_table.remove(&path).map(|entity| {
// modify path in the entity
entity.lock().unwrap().path = new_path.clone();
// push entity back into the lookup table
lookup_table.insert(new_path.clone(), entity);
});
new_path
},
_ => continue
}
},
Err(e) => {
error!("watching error: {}", e);
continue
}
};
// Find path starting with one of watching pathes
for (watching_path, entity) in &lookup_table {
if path.starts_with(watching_path) {
let mut entity = entity.lock().unwrap();
entity.changed += 1;
trace!("incremented change counter for {:?}: {} -> {}",
entity.path.display(),
entity.changed - 1,
entity.changed);
}
}
}
}
}