In [1]:
import pandas as pd
from axisfuzzy.analysis._pipeline import FuzzyPipeline
from axisfuzzy.analysis._components.basic import (
    NormalizationTool,
    StatisticsTool,
    SimpleAggregationTool
)
# Create sample data
data = pd.DataFrame({
    'Cost': [100, 200, 150, 180],
    'Quality': [8, 9, 7, 8.5],
    'Speed': [50, 60, 55, 58]
}, index=['Product_A', 'Product_B', 'Product_C', 'Product_D'])

print("✅ Setup complete. All _components are defined and data is ready.")

✅ Setup complete. All components are defined and data is ready.


### Pipeline iterator based on `FuzzyPipelineIterator` performs a single-step iteration

In [2]:
p = FuzzyPipeline()
data_input = p.input('data')

normalized = p.add(NormalizationTool(method='min_max').run, data=data_input)
stats = p.add(StatisticsTool().run, data=normalized)
aggregated = p.add(SimpleAggregationTool(operation='mean').run, data=normalized)

In [3]:
for step_result in p.step_by_step({'data': data}):
    print(f"Step {step_result['step_index'] + 1}/{step_result['total_steps']}: {step_result['step_name']}")
    print(f"Execution time: {step_result['execution_time']:.4f} seconds")

    if isinstance(step_result['result'], dict):
        for key, value in step_result['result'].items():
            if isinstance(value, pd.DataFrame):
                print(f"Result '{key}' shape: {value.shape}")
            elif isinstance(value, dict):
                print(f"Result '{key}': {value}")
            else:
                print(f"Result '{key}' type: {type(value)}")
    else:
        print(f"Result type: {type(step_result['result'])}")

    print("-" * 30)

print("Pipeline execution completed!")

Step 1/3: NormalizationTool.run
Execution time: 0.0035 seconds
Result 'normalized_data' shape: (4, 3)
------------------------------
Step 2/3: StatisticsTool.run
Execution time: 0.0005 seconds
Result 'statistics': {'mean': 0.4456526266008652, 'std': 0.41757354586690704, 'min': 0.0, 'max': 1.0, 'median': 0.3121470366368326, 'count': 12}
------------------------------
Step 3/3: SimpleAggregationTool.run
Execution time: 0.0005 seconds
Result 'aggregated_values' type: <class 'pandas.core.series.Series'>
------------------------------
Pipeline execution completed!


In [4]:
# You can also inspect the pipeline structure
# p.steps

### Step-by-step execution using interactive `.next()` and `.run_all()` methods of `ExecutionState`

In [5]:
# Build pipeline
p = FuzzyPipeline()
data_input = p.input("data")
normalized = p.add(NormalizationTool(method='min_max').run, data=data_input)
stats = p.add(StatisticsTool().run, data=normalized)
aggregated = p.add(SimpleAggregationTool(operation='mean').run, data=normalized)

In [6]:
print(f"Pipeline created with {len(p.steps) - len(p._input_nodes)} executable steps.")
print("Starting chainable step-by-step execution...\n")

try:
    # 1. Get the initial execution state
    state0 = p.start_execution({"data": data})
    print(f"Initial State: {state0}")
    print("Ready to execute first step.\n")

    # 2. Execute the first step
    print("--- Running Step 1 ---")
    state1 = state0.run_next()
    print(f"New State: {state1}")
    print(f"Result of Step 1 ('{state1.pipeline.get_step_info(state1.latest_step_id)['display_name']}'):")
    # The result is a dict, e.g., {'normalized_data': DataFrame}
    print(state1.latest_result['normalized_data'].head())
    print("-" * 30 + "\n")

    # 3. Execute the second step
    print("--- Running Step 2 ---")
    state2 = state1.run_next()
    print(f"New State: {state2}")
    print(f"Result of Step 2 ('{state2.pipeline.get_step_info(state2.latest_step_id)['display_name']}'):")
    print(state2.latest_result)
    print("-" * 30 + "\n")

    # 4. Run all remaining steps
    print("--- Running all remaining steps ---")
    final_state = state2.run_all()
    print(f"Final State: {final_state}")

    # Inspect final results
    print("\n--- Final Results ---")
    all_results = final_state.step_results
    for step_id, result in all_results.items():
        step_name = final_state.pipeline.get_step_info(step_id)['display_name']
        print(f"Result of '{step_name}': {type(result)}")

except StopIteration:
    print("Execution finished earlier than expected.")
except Exception as e:
    print(f"An error occurred: {e}")

print("\nPipeline execution finished.")

Pipeline created with 3 executable steps.
Starting chainable step-by-step execution...

Initial State: <ExecutionState (Next step 1/3), latest_step_completed='Initial'>
Ready to execute first step.

--- Running Step 1 ---
New State: <ExecutionState (Next step 2/3), latest_step_completed='NormalizationTool.run'>
An error occurred: 'StepMetadata' object is not subscriptable

Pipeline execution finished.
