forked from nivaldoh/beam
-
Notifications
You must be signed in to change notification settings - Fork 7
/
serialize.rs
122 lines (103 loc) · 3.59 KB
/
serialize.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use std::collections::HashMap;
use std::iter::Iterator;
use std::marker::PhantomData;
use std::any::Any;
use std::boxed::Box;
use std::sync::Mutex;
use once_cell::sync::Lazy;
static SERIALIZED_FNS: Lazy<Mutex<HashMap<String, Box<dyn Any + Sync + Send>>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
pub fn serialize_fn<T: Any + Sync + Send>(obj: Box<T>) -> String {
let name = format!("object{}", SERIALIZED_FNS.lock().unwrap().len());
SERIALIZED_FNS.lock().unwrap().insert(name.to_string(), obj);
name
}
pub fn deserialize_fn<T: Any + Sync + Send>(name: &String) -> Option<&'static T> {
let binding = SERIALIZED_FNS.lock().unwrap();
let untyped = binding.get(name);
let typed = match untyped {
Some(x) => x.downcast_ref::<T>(),
None => None,
};
unsafe { std::mem::transmute::<Option<&T>, Option<&'static T>>(typed) }
}
// ******* DoFn Wrappers, perhaps move elsewhere? *******
// TODO: Give these start/finish_bundles, etc.
pub type GenericDoFn =
Box<dyn Fn(&dyn Any) -> Box<dyn Iterator<Item = Box<dyn Any>>> + Send + Sync>;
struct GenericDoFnWrapper {
_func: GenericDoFn,
}
unsafe impl std::marker::Send for GenericDoFnWrapper {}
struct BoxedIter<O: Any, I: IntoIterator<Item = O>> {
typed_iter: I::IntoIter,
}
impl<O: Any, I: IntoIterator<Item = O>> Iterator for BoxedIter<O, I> {
type Item = Box<dyn Any>;
fn next(&mut self) -> Option<Box<dyn Any>> {
if let Some(x) = self.typed_iter.next() {
Some(Box::new(x))
} else {
None
}
}
}
pub fn to_generic_dofn<T: Any, O: Any, I: IntoIterator<Item = O> + 'static>(
func: fn(&T) -> I,
) -> GenericDoFn {
Box::new(
move |untyped_input: &dyn Any| -> Box<dyn Iterator<Item = Box<dyn Any>>> {
let typed_input: &T = untyped_input.downcast_ref::<T>().unwrap();
Box::new(BoxedIter::<O, I> {
typed_iter: func(typed_input).into_iter(),
})
},
)
}
pub fn to_generic_dofn_dyn<T: Any, O: Any, I: IntoIterator<Item = O> + 'static>(
func: Box<dyn Fn(&T) -> I + Send + Sync>,
) -> GenericDoFn {
Box::new(
move |untyped_input: &dyn Any| -> Box<dyn Iterator<Item = Box<dyn Any>>> {
let typed_input: &T = untyped_input.downcast_ref::<T>().unwrap();
Box::new(BoxedIter::<O, I> {
typed_iter: func(typed_input).into_iter(),
})
},
)
}
pub trait KeyExtractor: Sync + Send {
fn extract(&self, kv: &dyn Any) -> (String, Box<dyn Any + Sync + Send>);
fn recombine(
&self,
key: &str,
values: &Box<Vec<Box<dyn Any + Sync + Send>>>,
) -> Box<dyn Any + Sync + Send>;
}
pub struct TypedKeyExtractor<V: Clone + Sync + Send + 'static> {
phantom_data: PhantomData<V>,
}
impl<V: Clone + Sync + Send + 'static> Default for TypedKeyExtractor<V> {
fn default() -> Self {
Self {
phantom_data: PhantomData,
}
}
}
impl<V: Clone + Sync + Send + 'static> KeyExtractor for TypedKeyExtractor<V> {
fn extract(&self, kv: &dyn Any) -> (String, Box<dyn Any + Sync + Send>) {
let typed_kv = kv.downcast_ref::<(String, V)>().unwrap();
(typed_kv.0.clone(), Box::new(typed_kv.1.clone()))
}
fn recombine(
&self,
key: &str,
values: &Box<Vec<Box<dyn Any + Sync + Send>>>,
) -> Box<dyn Any + Sync + Send> {
let mut typed_values: Vec<V> = Vec::new();
for untyped_value in values.iter() {
typed_values.push(untyped_value.downcast_ref::<V>().unwrap().clone());
}
Box::new((key.to_string(), typed_values))
}
}