Commit bd90f11
feat(KDP): Enhance Dynamic Preprocessing Pipeline (#24)
2 parents ad91096 + 1d7e48a commit bd90f11

2 files changed: +130 additions, −12 deletions


kdp/dynamic_pipeline.py

Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
import tensorflow as tf


class DynamicPreprocessingPipeline:
    """
    Dynamically initializes and manages a sequence of Keras preprocessing layers,
    with selective retention of outputs based on dependencies among layers, and
    supports streaming data through the pipeline.
    """

    def __init__(self, layers):
        """
        Initializes the pipeline with a list of preprocessing layers.

        Args:
            layers (list): A list of TensorFlow preprocessing layers.
        """
        self.layers = layers
        self.dependency_map = self._analyze_dependencies()

    def _analyze_dependencies(self):
        """
        Analyzes and determines the dependencies of each layer on the outputs
        of previous layers.

        Returns:
            dict: A dictionary mapping each layer's name to the set of layer
                outputs it depends on.
        """
        dependencies = {}
        all_outputs = set()
        for layer in self.layers:
            # If the layer has an input_spec (common in Keras layers), inspect it.
            if hasattr(layer, "input_spec") and layer.input_spec is not None:
                # Use a safe getter so that elements without a 'name' attribute
                # yield None, then filter the Nones out.
                required_inputs = {
                    name
                    for name in tf.nest.flatten(
                        tf.nest.map_structure(
                            lambda x: getattr(x, "name", None), layer.input_spec
                        )
                    )
                    if name is not None
                }
            else:
                # Otherwise, assume the layer depends on all outputs seen so far.
                # Copy the set: aliasing all_outputs directly would let the
                # updates below silently grow this layer's dependency set as
                # later layers are registered.
                required_inputs = set(all_outputs)
            dependencies[layer.name] = required_inputs
            all_outputs.update(required_inputs)
            all_outputs.add(layer.name)
        return dependencies

    def process(self, dataset):
        """
        Processes the dataset through the pipeline using the tf.data API.

        Args:
            dataset (tf.data.Dataset): The dataset where each element is a
                dictionary of features.

        Returns:
            tf.data.Dataset: The processed dataset, with the output of each
                layer stored under that layer's name.
        """

        def _apply_transformations(features):
            current_data = features
            for layer in self.layers:
                # Get the required input keys for the current layer.
                required_keys = self.dependency_map[layer.name]
                # Prepare the input by selecting the keys that exist in the
                # current data.
                current_input = {
                    k: current_data[k] for k in required_keys if k in current_data
                }
                # Process each required input through the layer; the layer is
                # assumed to accept one tensor per key. All results share the
                # key layer.name, so if a layer has several dependencies only
                # the last one's output is retained.
                transformed_output = {
                    layer.name: layer(current_input[k])
                    for k in required_keys
                    if k in current_input
                }
                # Merge the transformed output into the working data dictionary.
                current_data.update(transformed_output)
            return current_data

        return dataset.map(_apply_transformations)
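
For a concrete picture of how the dependency analysis and the tf.data mapping fit together, here is a minimal usage sketch. The Rescaling layers, their names, and the manually assigned InputSpec(name=...) are illustrative assumptions, not part of the commit; naming a layer's input_spec gives _analyze_dependencies an explicit key to pick up, while layers without one fall back to depending on every output seen so far.

import tensorflow as tf

from kdp.dynamic_pipeline import DynamicPreprocessingPipeline

# Hypothetical layers for illustration. Naming the InputSpec tells
# _analyze_dependencies which dictionary key each layer reads from.
scale = tf.keras.layers.Rescaling(1.0 / 255.0, name="scale")
scale.input_spec = tf.keras.layers.InputSpec(name="raw")
center = tf.keras.layers.Rescaling(1.0, offset=-0.5, name="center")
center.input_spec = tf.keras.layers.InputSpec(name="scale")

pipe = DynamicPreprocessingPipeline([scale, center])
# pipe.dependency_map == {"scale": {"raw"}, "center": {"scale"}}

# Each dataset element is a dict of features, keyed to match those dependencies.
ds = tf.data.Dataset.from_tensor_slices({"raw": [[255.0], [0.0]]})
for element in pipe.process(ds):
    print(element["center"].numpy())  # [0.5], then [-0.5]

Because each layer's output is stored under the layer's own name, downstream layers reference upstream ones by layer name, and untouched features such as "raw" pass through unchanged.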

kdp/pipeline.py

Lines changed: 46 additions & 12 deletions
@@ -4,6 +4,7 @@
 from loguru import logger
 
 from kdp.layers_factory import PreprocessorLayerFactory
+from kdp.dynamic_pipeline import DynamicPreprocessingPipeline
 
 
 class ProcessingStep:
@@ -87,44 +88,77 @@ def transform(self, input_data: tf.Tensor) -> tf.Tensor:
 
 
 class FeaturePreprocessor:
-    def __init__(self, name: str) -> None:
-        """Initialize a feature preprocessor.
+    def __init__(self, name: str, use_dynamic: bool = False) -> None:
+        """
+        Initializes a feature preprocessor.
 
         Args:
             name (str): The name of the feature preprocessor.
+            use_dynamic (bool): Whether to use the dynamic preprocessing pipeline.
         """
         self.name = name
-        self.pipeline = Pipeline(name=name)
+        self.use_dynamic = use_dynamic
+        if not self.use_dynamic:
+            self.pipeline = Pipeline(name=name)
+        else:
+            self.layers = []  # for dynamic pipeline
 
     def add_processing_step(
         self, layer_creator: Callable[..., tf.keras.layers.Layer] = None, **layer_kwargs
     ) -> None:
-        """Add a processing step to the feature preprocessor.
+        """
+        Add a preprocessing layer to the feature preprocessor pipeline.
+        If using the standard pipeline, a ProcessingStep is added;
+        otherwise, the layer is added to a list for dynamic handling.
 
         Args:
             layer_creator (Callable[..., tf.keras.layers.Layer]): A callable that creates a layer.
                 If not provided, the default layer creator is used.
            **layer_kwargs: Additional keyword arguments for the layer creator.
         """
         layer_creator = layer_creator or PreprocessorLayerFactory.create_layer
-        step = ProcessingStep(layer_creator=layer_creator, **layer_kwargs)
-        self.pipeline.add_step(step=step)
+        if self.use_dynamic:
+            layer = layer_creator(**layer_kwargs)
+            logger.info(f"Adding {layer.name} to dynamic preprocessing pipeline")
+            self.layers.append(layer)
+        else:
+            step = ProcessingStep(layer_creator=layer_creator, **layer_kwargs)
+            self.pipeline.add_step(step=step)
 
     def chain(self, input_layer) -> tf.keras.layers.Layer:
-        """Chain the preprocessor's pipeline steps starting from the input layer.
+        """
+        Chains the processing steps starting from the given input_layer.
 
-        Args:
-            input_layer: The input layer to start the chain from.
+        For a static pipeline, this delegates to the internal Pipeline's chain() method.
+        For the dynamic pipeline, it constructs the dynamic pipeline on the fly.
         """
-        return self.pipeline.chain(input_layer)
+        if not self.use_dynamic:
+            return self.pipeline.chain(input_layer)
+        else:
+            dynamic_pipeline = DynamicPreprocessingPipeline(self.layers)
+            # In the dynamic case, we use a dict for the input.
+            output_dict = dynamic_pipeline.initialize_and_transform(
+                {"input": input_layer}
+            )
+            # Return the transformed data at key "input" (or adjust as needed).
+            return output_dict.get("input", input_layer)
 
     def transform(self, input_data: tf.Tensor) -> tf.Tensor:
-        """Apply the feature preprocessor to the input data.
+        """
+        Process the input data through the pipeline.
+        For the dynamic pipeline, wrap the input in a dictionary and extract the final output.
 
         Args:
             input_data: The input data to process.
 
         Returns:
             tf.Tensor: The processed data.
         """
-        return self.pipeline.transform(input_data)
+        if not self.use_dynamic:
+            return self.pipeline.transform(input_data)
+        else:
+            dynamic_pipeline = DynamicPreprocessingPipeline(self.layers)
+            output_dict = dynamic_pipeline.initialize_and_transform(
+                {"input": input_data}
+            )
+            return output_dict.get("input", input_data)
