Skip to content

Commit

Permalink
Merge pull request #14 from dahlbaek/remove-any
Browse files Browse the repository at this point in the history
Reduce use of Any
  • Loading branch information
laysakura authored Apr 25, 2023
2 parents f7df217 + 903c0c2 commit e5caac3
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 241 deletions.
175 changes: 79 additions & 96 deletions sdks/rust/src/internals/serialize.rs
Original file line number Diff line number Diff line change
@@ -1,131 +1,114 @@
use std::collections::HashMap;
use std::iter::Iterator;
use std::marker::PhantomData;

use std::any::Any;
use std::boxed::Box;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};

use once_cell::sync::Lazy;

use crate::elem_types::kv::KV;
use crate::elem_types::ElemType;
use crate::transforms::group_by_key::KeyExtractor;
use crate::transforms::pardo::DoFn;
use crate::worker::operators::{DynamicGroupedValues, DynamicWindowedValue, WindowedValue};
use crate::worker::Receiver;

static SERIALIZED_FNS: Lazy<Mutex<HashMap<String, Box<dyn Any + Sync + Send>>>> =
static DO_FNS: Lazy<Mutex<HashMap<String, &'static dyn DynamicDoFn>>> =
Lazy::new(|| Mutex::new(HashMap::new()));

pub fn serialize_fn<T: Any + Sync + Send>(obj: Box<T>) -> String {
let mut serialized_fns = SERIALIZED_FNS.lock().unwrap();
let name = format!("object{}", serialized_fns.len());
serialized_fns.insert(name.to_string(), obj);
static KEY_EXTRACTORS: Lazy<Mutex<HashMap<String, &'static dyn DynamicKeyExtractor>>> =
Lazy::new(|| Mutex::new(HashMap::new()));

pub fn store_do_fn(do_fn: impl DoFn + 'static) -> String {
let mut do_fns = DO_FNS.lock().unwrap();
let name = format!("object{}", do_fns.len());
do_fns.insert(name.to_string(), Box::leak(Box::new(do_fn)));
name
}

pub fn deserialize_fn<T: Any + Sync + Send>(name: &String) -> Option<&'static T> {
let binding = SERIALIZED_FNS.lock().unwrap();
let untyped = binding.get(name);
let typed = match untyped {
Some(x) => x.downcast_ref::<T>(),
None => None,
};

unsafe { std::mem::transmute::<Option<&T>, Option<&'static T>>(typed) }
/// Looks up a previously stored `DoFn` by the name `store_do_fn` returned.
///
/// Returns `None` when no entry exists under `name`.
///
/// # Panics
///
/// Panics if the `DO_FNS` mutex is poisoned.
pub fn get_do_fn(name: &str) -> Option<&'static dyn DynamicDoFn> {
    DO_FNS.lock().unwrap().get(name).copied()
}

// ******* DoFn Wrappers, perhaps move elsewhere? *******

// TODO: Give these start/finish_bundles, etc.
pub type GenericDoFn =
Box<dyn Fn(&dyn Any) -> Box<dyn Iterator<Item = Box<dyn Any>>> + Send + Sync>;

struct GenericDoFnWrapper {
_func: GenericDoFn,
/// Registers `ke` in the global `KEY_EXTRACTORS` table and returns the name
/// it was stored under.
///
/// The extractor is boxed and leaked so the table can hold a `'static`
/// reference to it. The name is `object{N}`, where `N` is the number of
/// entries already in the table when the lock was taken.
///
/// # Panics
///
/// Panics if the `KEY_EXTRACTORS` mutex is poisoned.
pub fn store_key_extractor(ke: impl DynamicKeyExtractor + 'static) -> String {
    let mut registry = KEY_EXTRACTORS.lock().unwrap();
    let key = format!("object{}", registry.len());
    // Leak on purpose: the registry stores `&'static` trait objects.
    let leaked: &'static dyn DynamicKeyExtractor = Box::leak(Box::new(ke));
    registry.insert(key.clone(), leaked);
    key
}

unsafe impl std::marker::Send for GenericDoFnWrapper {}

struct BoxedIter<O: Any, I: IntoIterator<Item = O>> {
typed_iter: I::IntoIter,
/// Looks up a previously stored key extractor by the name
/// `store_key_extractor` returned.
///
/// Returns `None` when no entry exists under `name`.
///
/// # Panics
///
/// Panics if the `KEY_EXTRACTORS` mutex is poisoned.
pub fn get_extractor(name: &str) -> Option<&'static dyn DynamicKeyExtractor> {
    let registry = KEY_EXTRACTORS.lock().unwrap();
    registry.get(name).copied()
}

impl<O: Any, I: IntoIterator<Item = O>> Iterator for BoxedIter<O, I> {
type Item = Box<dyn Any>;
/// Type-erased interface to a `DoFn`, used so that differently typed `DoFn`s
/// can be stored behind `&'static dyn DynamicDoFn` in `DO_FNS`.
///
/// Implementors must be `Send + Sync`.
pub trait DynamicDoFn: Send + Sync {
/// Processes one dynamically typed windowed element, passing any outputs to
/// `receivers`.
fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]);
/// Bundle-start hook.
fn start_bundle_dyn(&self);
/// Bundle-finish hook.
fn finish_bundle_dyn(&self);
}

fn next(&mut self) -> Option<Box<dyn Any>> {
if let Some(x) = self.typed_iter.next() {
Some(Box::new(x))
} else {
None
impl<D: DoFn> DynamicDoFn for D {
/// Downcasts `elem` to a windowed value of `D::In`, runs `self.process` on its
/// value, and hands every produced output to each receiver.
///
/// Each output is re-wrapped with `with_value` on the input element before
/// being passed on as a `DynamicWindowedValue`.
fn process_dyn(&self, elem: DynamicWindowedValue, receivers: &[Arc<Receiver>]) {
    let input = elem.downcast_ref::<D::In>();
    for output in self.process(&input.value) {
        let wrapped = input.with_value(output);
        receivers
            .iter()
            .for_each(|sink| sink.receive(DynamicWindowedValue::new(&wrapped)));
    }
}
}

pub fn to_generic_dofn<T: Any, O: Any, I: IntoIterator<Item = O> + 'static>(
func: fn(&T) -> I,
) -> GenericDoFn {
Box::new(
move |untyped_input: &dyn Any| -> Box<dyn Iterator<Item = Box<dyn Any>>> {
let typed_input: &T = untyped_input.downcast_ref::<T>().unwrap();
Box::new(BoxedIter::<O, I> {
typed_iter: func(typed_input).into_iter(),
})
},
)
}

pub fn to_generic_dofn_dyn<T: Any, O: Any, I: IntoIterator<Item = O> + 'static>(
func: Box<dyn Fn(&T) -> I + Send + Sync>,
) -> GenericDoFn {
Box::new(
move |untyped_input: &dyn Any| -> Box<dyn Iterator<Item = Box<dyn Any>>> {
let typed_input: &T = untyped_input.downcast_ref::<T>().unwrap();
Box::new(BoxedIter::<O, I> {
typed_iter: func(typed_input).into_iter(),
})
},
)
}
/// Delegates to `self.start_bundle()`.
fn start_bundle_dyn(&self) {
self.start_bundle()
}

pub trait KeyExtractor: Sync + Send {
fn extract(&self, kv: &dyn Any) -> KV<String, Box<dyn Any + Sync + Send>>;
fn recombine(
&self,
key: &str,
values: &Box<Vec<Box<dyn Any + Sync + Send>>>,
) -> Box<dyn Any + Sync + Send>;
/// Delegates to `self.finish_bundle()`.
fn finish_bundle_dyn(&self) {
self.finish_bundle()
}
}

pub struct TypedKeyExtractor<V: Clone + Sync + Send + 'static> {
phantom_data: PhantomData<V>,
/// Type-erased interface to a key extractor, used so that extractors for
/// different value types can be stored behind `&'static dyn DynamicKeyExtractor`
/// in `KEY_EXTRACTORS`.
///
/// Implementors must be `Sync + Send`.
pub trait DynamicKeyExtractor: Sync + Send {
/// Creates a new `DynamicGroupedValues` container.
fn new_grouped_values(&self) -> DynamicGroupedValues;
/// Clears the contents of `grouped_values`.
fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues);
/// Takes one dynamically typed windowed key/value element and records it in
/// `grouped_values`.
fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues);
/// Emits the contents of `grouped_values` to `receivers`.
fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]);
}

impl<V: Clone + Sync + Send + 'static> Default for TypedKeyExtractor<V> {
fn default() -> Self {
Self {
phantom_data: PhantomData,
}
impl<V: ElemType> DynamicKeyExtractor for KeyExtractor<V> {
/// Builds the container with `DynamicGroupedValues::new::<V>()`, i.e. for this
/// extractor's value type `V`.
fn new_grouped_values(&self) -> DynamicGroupedValues {
DynamicGroupedValues::new::<V>()
}
}
/// Downcasts `grouped_values` to its `V`-typed form and calls `clear()` on it.
fn clear_grouped_values(&self, grouped_values: &mut DynamicGroupedValues) {
grouped_values.downcast_mut::<V>().clear()
}
fn extract(&self, kv: DynamicWindowedValue, grouped_values: &mut DynamicGroupedValues) {
let KV { k, v } = &kv.downcast_ref::<KV<String, V>>().value;
let grouped_values = grouped_values.downcast_mut::<V>();

impl<V: Clone + Sync + Send + 'static> KeyExtractor for TypedKeyExtractor<V> {
fn extract(&self, kv: &dyn Any) -> KV<String, Box<dyn Any + Sync + Send>> {
let typed_kv = kv.downcast_ref::<KV<String, V>>().unwrap();
KV {
k: typed_kv.k.clone(),
v: Box::new(typed_kv.v.clone()),
if !grouped_values.contains_key(k) {
grouped_values.insert(k.clone(), Vec::new());
}
grouped_values.get_mut(k).unwrap().push(v.clone());
}
fn recombine(
&self,
key: &str,
values: &Box<Vec<Box<dyn Any + Sync + Send>>>,
) -> Box<dyn Any + Sync + Send> {
let mut typed_values: Vec<V> = Vec::new();
for untyped_value in values.iter() {
typed_values.push(untyped_value.downcast_ref::<V>().unwrap().clone());

fn recombine(&self, grouped_values: &DynamicGroupedValues, receivers: &[Arc<Receiver>]) {
let typed_grouped_values = grouped_values.downcast_ref::<V>();
for (key, values) in typed_grouped_values.iter() {
// TODO: timestamp and pane info are wrong
for receiver in receivers.iter() {
// TODO: End-of-window timestamp, only firing pane.
let mut typed_values: Vec<V> = Vec::new();
for value in values.iter() {
typed_values.push(value.clone());
}
let res = KV {
k: key.to_string(),
v: typed_values,
};
receiver.receive(DynamicWindowedValue::new(&WindowedValue::in_global_window(
res,
)));
}
}
Box::new(KV {
k: key.to_string(),
v: typed_values,
})
}
}
18 changes: 9 additions & 9 deletions sdks/rust/src/transforms/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use crate::{
internals::pvalue::{PTransform, PValue},
};

pub struct Create<T> {
elements: Vec<T>,
/// Transform that owns a fixed list of `Out` elements.
pub struct Create<Out> {
// The elements this transform was constructed with.
elements: Vec<Out>,
}

impl<T: Clone> Create<T> {
pub fn new(elements: &[T]) -> Self {
impl<Out: ElemType> Create<Out> {
pub fn new(elements: &[Out]) -> Self {
Self {
elements: elements.to_vec(),
}
Expand All @@ -39,15 +39,15 @@ impl<T: Clone> Create<T> {
// https://github.com/rust-lang/rust/issues/35121
pub type Never = ();

impl<E: ElemType> PTransform<Never, E> for Create<E> {
fn expand(&self, input: &PValue<Never>) -> PValue<E> {
impl<Out: ElemType> PTransform<Never, Out> for Create<Out> {
fn expand(&self, input: &PValue<Never>) -> PValue<Out> {
let elements = self.elements.to_vec();
// TODO: Consider reshuffling.
input
.clone()
.apply(Impulse::new())
.apply(ParDo::from_flatmap_with_context(Box::new(
move |_x| -> Vec<E> { elements.to_vec() },
)))
.apply(ParDo::from_flat_map(move |_x| -> Vec<Out> {
elements.to_vec()
}))
}
}
12 changes: 5 additions & 7 deletions sdks/rust/src/transforms/group_by_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,17 @@ use crate::proto::beam_api::pipeline as proto_pipeline;

pub struct GroupByKey<K, V> {
payload: String,
phantom_k: PhantomData<K>,
phantom_v: PhantomData<V>,
phantom: PhantomData<(K, V)>,
}

/// Marker type parameterised by the value type `V`; it holds only a
/// `PhantomData<V>` and no runtime data.
pub struct KeyExtractor<V: ElemType>(PhantomData<V>);

// TODO: Use coders to allow arbitrary keys.
impl<V: ElemType> Default for GroupByKey<String, V> {
fn default() -> Self {
Self {
payload: serialize::serialize_fn::<Box<dyn serialize::KeyExtractor>>(Box::new(
Box::new(serialize::TypedKeyExtractor::<V>::default()),
)),
phantom_k: PhantomData,
phantom_v: PhantomData,
payload: serialize::store_key_extractor(KeyExtractor::<V>(PhantomData)),
phantom: PhantomData,
}
}
}
Expand Down
Loading

0 comments on commit e5caac3

Please sign in to comment.