As Seen on YouTube DataEngineerOne:
- Kedro Wings: It's almost too easy to write pipelines this way.
- Easy Stateful Pipelines with Chronocoding and Kedro Wings
Give your next kedro project Wings! The perfect plugin for brand-new pipelines and new kedro users. This plugin enables quick and easy creation of datasets so that you can get straight to coding your pipelines.
The following example is a recreation of the iris example pipeline.
Kedro Wings enables super-fast creation of pipelines by taking care of all the catalog work for you. Catalog entries are automatically created by parsing your nodes' input and output values.
This pipeline automatically creates a dataset that reads from iris.csv,
and then creates 12 more datasets, corresponding to the inputs and outputs of the other nodes.
wing_example = Pipeline([
    node(
        split_data,
        inputs=['01_raw/iris.csv', 'params:example_test_data_ratio'],
        outputs=dict(
            train_x="02_intermediate/example_train_x.csv",
            train_y="02_intermediate/example_train_y.csv",
            test_x="02_intermediate/example_test_x.csv",
            test_y="02_intermediate/example_test_y.csv",
        ),
    ),
    node(
        train_model,
        inputs=["02_intermediate/example_train_x.csv", "02_intermediate/example_train_y.csv", "parameters"],
        outputs="06_models/example_model.pkl",
    ),
    node(
        predict,
        inputs=dict(
            model="06_models/example_model.pkl",
            test_x="02_intermediate/example_test_x.csv",
        ),
        outputs="07_model_output/example_predictions.pkl",
    ),
    node(
        report_accuracy,
        inputs=["07_model_output/example_predictions.pkl", "02_intermediate/example_test_y.csv"],
        outputs=None,
    ),
])
Watch the video on Chronocoding here: Easy Stateful Pipelines with Chronocoding and Kedro Wings
Sometimes a need arises to rewrite data to the same path. This makes it easier to save state between kedro runs. Using KedroWings, you can automatically generate chronocoded datasets, which temporally separate a read and a write to the same dataset.
By adding an ! at the end of a dataset name, we signal to kedro that we wish to overwrite the data at that same filepath. Thus, we get around kedro's DAG requirement for datasets.
In Depth Breakdown on Chronocoding here: [KED-1667] Chronocoding: Solving the Problem of State Tracking with Temporally Sensitive DAGs
def state_modifier(state: str) -> str:
    current_value = int(state)
    new_value = current_value + 1
    return str(new_value)

def create_pipelines(**kwargs):
    return Pipeline([
        node(
            state_modifier,
            inputs="01_raw/state.txt",
            outputs="01_raw/state.txt!",
        ),
    ])
Kedro Wings is available on PyPI and is installed as a kedro hook.
pip install kedro-wings
Simply add a KedroWings instance to the ProjectContext hooks tuple.
from kedro_wings import KedroWings
class ProjectContext(KedroContext):
    hooks = (
        KedroWings(),
    )
Simply pass the kedro context into KedroWings, and it will automatically add all catalog entries from all available pipelines.
# Load the context if not using a kedro jupyter notebook
from kedro.framework.context import load_context
context = load_context('./')
# Pass the context into KedroWings
from kedro_wings import KedroWings
KedroWings(context=context)
# context catalog now has all wings datasets available.
context.catalog.list()
Catalog entries are created from dataset input and output strings. The API is simple:
inputs="[PATH]/[NAME].[EXT]"
The PATH portion determines the directory where the file will be saved.
The NAME portion determines the final output name of the file.
The EXT portion determines the dataset type used to save and load that particular data.
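To illustrate the rule above, here is a hypothetical sketch of how such a string could be split into its PATH, NAME, and EXT parts (not the plugin's actual parser; the function name is made up for illustration):

```python
import os

def parse_wing(dataset_name: str):
    """Split a dataset string like '01_raw/iris.csv' into (path, name, ext).

    Hypothetical sketch of the '[PATH]/[NAME].[EXT]' rule, not the
    plugin's actual implementation.
    """
    path, filename = os.path.split(dataset_name)
    name, ext = os.path.splitext(filename)
    return path, name, ext

print(parse_wing("01_raw/iris.csv"))  # ('01_raw', 'iris', '.csv')
```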
node(split_data, inputs='01_raw/iris.csv', outputs='split_data_output')
This will create a pandas.CSVDataSet pointing at the 01_raw/iris.csv file.
# pipeline.py
node(split_data, inputs='01_raw/iris.csv', outputs='split_data_output')
# catalog.yml
'01_raw/iris.csv':
type: pandas.CSVDataSet
filepath: data/01_raw/iris.csv
If a catalog entry already exists inside catalog.yml with a name that matches the wing catalog name, KedroWings will NOT create that catalog entry, and will instead defer to the catalog.yml entry.
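For example, a hand-written catalog.yml entry with the same name takes precedence over the auto-generated one, so you can customize loading for a single dataset (the load_args shown here are purely illustrative):

```yaml
# catalog.yml — this entry wins over the auto-generated wing dataset
'01_raw/iris.csv':
  type: pandas.CSVDataSet
  filepath: data/01_raw/iris.csv
  load_args:
    sep: ';'
```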
The following are the datasets available by default.
default_dataset_configs={
".csv": {"type": "pandas.CSVDataSet"},
".yml": {"type": "yaml.YAMLDataSet"},
".yaml": {"type": "yaml.YAMLDataSet"},
".xls": {"type": "pandas.ExcelDataSet"},
".txt": {"type": "text.TextDataSet"},
".png": {"type": "pillow.ImageDataSet"},
".jpg": {"type": "pillow.ImageDataSet"},
".jpeg": {"type": "pillow.ImageDataSet"},
".img": {"type": "pillow.ImageDataSet"},
".pkl": {"type": "pickle.PickleDataSet"},
".parquet": {"type": "pandas.ParquetDataSet"},
".json": {"type": "json.JSONDataSet"}, # Only available in kedro 0.16.3
}
Kedro Wings supports configuration on instantiation of the hook.
KedroWings(dataset_configs, paths, root, namespaces, enabled, context)
:param dataset_configs: A mapping of file name extensions to the type of dataset to be created.
This allows the default dataset configurations to be overridden, and the default extension-to-dataset mapping to be extended with other datasets.
Longer extensions are prioritized over shorter extensions, meaning multiple encoding methods can be configured for a single filetype.
KedroWings(dataset_configs={
'.csv': {'type': 'pandas.CSVDataSet', 'sep': '|'},
})
from kedro.extras.datasets import pandas
KedroWings(dataset_configs={
'.csv': pandas.CSVDataSet,
})
from kedro.extras.datasets import pandas
KedroWings(dataset_configs={
'.comma.csv': pandas.CSVDataSet,
'.pipe.csv': {'type': 'pandas.CSVDataSet', 'sep': '|'},
})
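The longest-match priority described above can be sketched as follows (a hypothetical illustration of the behavior, not the plugin's actual lookup code):

```python
def pick_config(filename: str, dataset_configs: dict) -> dict:
    """Return the config whose extension matches the end of the filename,
    preferring longer (more specific) extensions. Hypothetical sketch."""
    matches = [ext for ext in dataset_configs if filename.endswith(ext)]
    if not matches:
        raise KeyError(f"No dataset config matches {filename!r}")
    return dataset_configs[max(matches, key=len)]

configs = {
    '.csv': {'type': 'pandas.CSVDataSet'},
    '.pipe.csv': {'type': 'pandas.CSVDataSet', 'sep': '|'},
}
print(pick_config('01_raw/iris.pipe.csv', configs))  # the '.pipe.csv' config wins
```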
:param paths: A mapping of old path names to new path names.
This allows specified paths to be remapped.
KedroWings(paths={
'06_models': 'new_models',
})
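The effect of a remapping like the one above can be sketched as follows (hypothetical illustration, not the plugin's actual code):

```python
def remap_path(filepath: str, paths: dict) -> str:
    """Swap the leading directory of a dataset filepath according to an
    old-path -> new-path mapping. Hypothetical sketch of the behavior."""
    directory, _, filename = filepath.partition('/')
    return f"{paths.get(directory, directory)}/{filename}"

print(remap_path('06_models/example_model.pkl', {'06_models': 'new_models'}))
# new_models/example_model.pkl
```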
:param root: The root directory to save files to. Default: data
This setting is prepended to any parsed paths. This is useful if the dataset supports fsspec.
KedroWings(root='s3a://my-bucket/kedro-data')
KedroWings(root=None)
:param namespaces: A list of the modular-pipeline namespaces that KedroWings should account for. If a namespace is encountered, it is inserted into the output filepath just before the extension.
KedroWings(namespaces=['example1'])
pipeline(Pipeline([node(lambda x: x, inputs='iris.csv', outputs='iris2.csv')]), namespace="example1")
The determined file paths would be iris.example1.csv and iris2.example1.csv.
:param enabled: Convenience flag to easily enable or disable this plugin.
KedroWings(enabled=os.getenv('ENABLE_WINGS'))
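Note that os.getenv returns a string (or None), so any non-empty value, including 'false', is truthy. A small helper (hypothetical, not part of kedro-wings) makes the intent explicit:

```python
import os

def env_flag(name: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean flag.
    Hypothetical helper; kedro-wings simply receives the value you pass."""
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() in ('1', 'true', 'yes', 'on')

# Usage: KedroWings(enabled=env_flag('ENABLE_WINGS'))
```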