# Caching the dataframe(s) in case of a pipeline failure

This notebook shows a couple of useful features to help the user to debug a broken pipeline.

Please note that this debugging process is intended for execution in a notebook, as it relies on nebula storage, which resides within the Python kernel and cannot be utilized in a recipe or Airflow.

There are two main situations in which a pipeline may break:
- a transformer fails
- In a split pipeline, the split dataframes become unmergeable due to varying schemas or columns.

In the first case, nebula stores the input dataframe of the failed transformer; in the second, all the dataframes that should have been merged are retained, allowing the user to retrieve them and address the issue.

In [4]:
import polars as pl

from nebula import nebula_storage as ns
from nebula.pipelines.pipelines import TransformerPipeline
from nebula.base import Transformer
from nebula.transformers import (
    AssertNotEmpty,
    DropColumns,
    RenameColumns,
    SelectColumns,
)

In [24]:
getattr(TransformerPipeline([]), "__name__", None) == "TransformerPipeline"

False

In [31]:
type(TransformerPipeline([])).__name__

'TransformerPipeline'

In [5]:
# Sample rows: one float followed by two string values that mix
# non-empty strings, empty strings and None.
data = [
    [0.1234, "a", "b"],
    [4.1234, "", ""],
    [5.1234, None, None],
    [6.1234, "", None],
    [8.1234, "a", None],
    [9.1234, "a", ""],
    [10.1234, "", "b"],
    [11.1234, "a", None],
    [12.1234, None, "b"],
    [14.1234, "", None],
]

# Build the polars dataframe row by row, naming the columns c1, c2 and c3.
df_input = pl.DataFrame(data, orient="row", schema=["c1", "c2", "c3"])
print(df_input.schema)
# Last expression of the cell: the notebook renders the dataframe.
df_input

Schema([('c1', Float64), ('c2', String), ('c3', String)])


c1,c2,c3
f64,str,str
0.1234,"""a""","""b"""
4.1234,"""""",""""""
5.1234,,
6.1234,"""""",
8.1234,"""a""",
9.1234,"""a""",""""""
10.1234,"""""","""b"""
11.1234,"""a""",
12.1234,,"""b"""
14.1234,"""""",


In [6]:
def _split_function_with_null(df: pl.DataFrame) -> dict[str, pl.DataFrame]:
    """Extend the splits produced by `_split_function` with a 'null' subset.

    The base splits (presumably 'low' and 'hi' -- `_split_function` is
    defined outside this cell) are kept, and a 'null' entry is added that
    holds the rows whose "c1" value is either null or NaN.
    """
    splits = {**_split_function(df)}
    c1 = pl.col("c1")
    # Treat both missing values (null) and NaN as belonging to the 'null' split.
    is_missing = c1.is_null() | c1.is_nan()
    splits["null"] = df.filter(is_missing)
    return splits


In [7]:
# Both splits are given an empty list of transformers.
# NOTE(review): `_split_function_with_null` also returns a "null" subset that
# has no entry here -- confirm this is intended.
dict_transformers = {"low": [], "hi": []}

# Split pipeline driven by the custom split function; the "hi" split is listed
# in `splits_no_merge` (presumably it is left out of the final merge -- verify
# against the nebula documentation).
pipe = TransformerPipeline(
    dict_transformers,
    split_function=_split_function_with_null,
    splits_no_merge={"hi"},
)


In [9]:
dir(pipe)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_config',
 '_ir',
 '_should_skip',
 'find_node',
 'get_node_ids',
 'get_number_transformers',
 'name',
 'plot',
 'run',
 'show',
 'to_string']

In [18]:
pipe._ir.steps

[InputNode(id='input_88f438@0', node_type=<NodeType.INPUT: 1>, position=(0,), metadata={'name': 'DF input'}, children=[], name='DF input'),
 ForkNode(id='fork_split_b4f1d2@1', node_type=<NodeType.FORK: 6>, position=(1,), metadata={}, children=[], fork_type='split', config={'splits_no_merge': {'hi'}, 'splits_skip_if_empty': set(), 'cast_subsets_to_input_schema': False, 'repartition_output_to_original': False, 'coalesce_output_to_original': False}, branches={'hi': [], 'low': []}, otherwise=None, split_function=<function _split_function_with_null at 0x0000019BFF2C1090>),
 MergeNode(id='merge_append_c29f49@2', node_type=<NodeType.MERGE: 7>, position=(2,), metadata={}, children=[], merge_type='append', config={'allow_missing_columns': False, 'cast_subsets_to_input_schema': False, 'repartition_output_to_original': False, 'coalesce_output_to_original': False, 'dead_end_splits': {'hi'}}),
 OutputNode(id='output_2a4e73@3', node_type=<NodeType.OUTPUT: 2>, position=(3,), metadata={'name': 'DF out

In [20]:
callable(lambda x: x)

True

In [None]:
# Explicit schema: c1 as float, c2 and c3 as strings; the third argument
# of each StructField is set to True.
# NOTE(review): StructField, FloatType, StringType, StructType and `spark`
# are not imported/defined in the visible cells -- confirm where they come from.
schema = [
    StructField("c1", FloatType(), True),
    StructField("c2", StringType(), True),
    StructField("c3", StringType(), True),
]

# Sample rows, including duplicates, strings with leading/trailing spaces,
# empty strings, whitespace-only strings and None values.
data = [
    [0.1234, "a", "b"],
    [0.1234, "a", "b"],
    [0.1234, "a", "b"],
    [1.1234, "a", "  b"],
    [2.1234, "  a  ", "  b  "],
    [3.1234, "", ""],
    [4.1234, "   ", "   "],
    [5.1234, None, None],
    [6.1234, " ", None],
    [7.1234, "", None],
    [8.1234, "a", None],
    [9.1234, "a", ""],
    [10.1234, "   ", "b"],
    [11.1234, "a", None],
    [12.1234, None, "b"],
    [13.1234, None, "b"],
    [14.1234, None, None],
]

# Create the dataframe with the schema above, cache it, and print it.
df_input = spark.createDataFrame(data, schema=StructType(schema)).cache()
df_input.show()

## Transformer failure

In [None]:
class ThisTransformerIsBroken:
    """Transformer without a parent class whose `transform` selects the column "wrong"."""

    @staticmethod
    def transform(df):
        """Public transform method w/o parent class.

        Returns the result of selecting the column "wrong" from `df`.
        """
        column_name = "wrong"
        return df.select(column_name)


# Clear the nebula storage so that no keys from previous runs are left over.
ns.clear()

# Flat pipeline whose second step is the broken transformer defined above.
# NOTE(review): NanToNull and Distinct are not part of the import cell at the
# top of the notebook -- confirm they are imported before running this cell.
pipe = TransformerPipeline([
    NanToNull(input_columns="*"),
    ThisTransformerIsBroken(),
    Distinct(),
])

# NOTE(review): the `dir(pipe)` output earlier in this notebook lists `show`
# but not `show_pipeline` -- confirm the method name for the installed version.
pipe.show_pipeline(add_transformer_params=True)

### Retrieve the input dataframe of the failed transformer as the pipe breaks.

The error message will contain the key(s) associated with storing the aforementioned dataframe. 

The original exception is reported a few lines above that error message.

In [None]:
pipe.run(df_input)

In [None]:
ns.get("FAIL_DF_ThisTransformerIsBroken").show()

## Unable to merge splits

In this example the transformers work properly, but they modify the dataframes in a way that makes it impossible to merge them back.

To address this issue, all the dataframes before the union process are stored, allowing the user to investigate the problem.

In this example one split drops the column `c2`, the other one the column `c3`, hence they cannot be merged.

In [None]:
def my_split_function(df):
    """Split `df` into a "low" and a "hi" subset based on column "c1".

    Rows satisfying ``c1 < 10`` go to "low"; rows satisfying the negated
    condition go to "hi".
    """
    is_low = F.col("c1") < 10
    low_df = df.filter(is_low)
    hi_df = df.filter(~is_low)
    return {"low": low_df, "hi": hi_df}


# Each split drops a different column ("c2" for "low", "c3" for "hi"),
# so the two resulting dataframes end up with different columns.
dict_transf = {
    "low": [DropColumns(columns="c2")],
    "hi": [DropColumns(columns="c3")],
}

# Clear the nebula storage so that no keys from previous runs are left over.
ns.clear()

# Split pipeline: `my_split_function` produces the "low" and "hi" subsets.
pipe = TransformerPipeline(dict_transf, split_function=my_split_function)

# NOTE(review): the `dir(pipe)` output earlier in this notebook lists `show`
# but not `show_pipeline` -- confirm the method name for the installed version.
pipe.show_pipeline(add_transformer_params=True)

In [None]:
_ = pipe.run(df_input)

In [None]:
ns.get("FAIL_DF_low").show()

In [None]:
ns.get("FAIL_DF_hi").show()

### Overwriting keys associated with the failed dataframes

The keys used for storing dataframes are generated with a method that prevents any form of overwriting by adding a numerical suffix to them, so the user does not need to worry about it.

When the same broken pipeline is rerun without clearing the cache, the keys associated with the newly failed dataframes do not overwrite the previous ones.

In [None]:
_ = pipe.run(df_input)

The keys associated with the failed dataframes are now:
- `FAIL_DF_low_0`
- `FAIL_DF_hi_0`