---
title: PARAFAC2 Pipeline Orchestrator Demonstration


description: A prototype of an sklearn-based PARAFAC2 pipeline orchestrator, demonstrating preprocessing, the pipeline itself, postprocessing and the display of results


project: parafac2


status: closed


conclusion: "the orchestrator successfully completes the PARAFAC2 pipeline and displays the results, storing them in a database in a datamart-like model"
---

# Introduction

Multistage preprocessing of data prior to modeling can quickly become difficult to manage: keeping track of the order in which the individual stages execute, modifying stages, and inspecting the data between stages are all requirements that need to be handled. One popular solution is the sklearn `Pipeline` object, which organises the individual stages and allows each of them to be inspected. However, its most powerful feature is that a pipeline can be passed to sklearn's grid search (`GridSearchCV`), allowing the user to **test** different hyperparameter combinations across the different stages. Say you wanted to see how a more aggressive baseline correction strategy would affect the binning of the signal, and thus the decomposition result. Setting this up requires a modicum of work, but the payoff is large. In this notebook we will set up a pipeline framework that organises the stages of the pipeline and lets us inspect it when an error is encountered.
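A minimal, self-contained sketch of these features, using a toy scikit-learn pipeline (standard scaling followed by PCA on random data) rather than this project's preprocessing stages. Parameters of a stage are addressed as `<step>__<param>`, the same convention used by `pipeline.set_params(...)` later in this notebook; slicing a pipeline gives access to the output of earlier stages; and `GridSearchCV` tries parameter combinations that span several stages. Because PCA defines a `score` method (average log-likelihood), the search can run without labels. The step names `scale` and `pca` and the data are illustrative only.

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(0)
X = rng.normal(size=(60, 5))  # toy data: 60 samples, 5 features

# Each stage is a named (name, estimator) pair, executed in order.
pipe = Pipeline([("scale", StandardScaler()), ("pca", PCA())])

# Parameters of individual stages are addressed as <step>__<param>.
pipe.set_params(pca__n_components=2)

# Slicing returns a sub-pipeline: here, only the scaling stage,
# which lets us inspect the data as it enters the PCA stage.
scaled = pipe[:1].fit_transform(X)

# Grid search over hyperparameters belonging to *different* stages.
# With no y and scoring=None, each candidate is scored with pipe.score,
# which delegates to PCA.score (average log-likelihood).
param_grid = {
    "scale__with_std": [True, False],
    "pca__n_components": [1, 2, 3],
}
search = GridSearchCV(pipe, param_grid, cv=3)
search.fit(X)

print(search.best_params_)
print(search.best_estimator_.named_steps["pca"].n_components_)
```

In a decomposition pipeline the same mechanism would let, for example, a baseline-correction parameter and the decomposition rank be searched jointly, provided the final stage exposes a `score` method or a custom `scoring` callable is supplied.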

# Setup


```python
%reload_ext autoreload
%autoreload 2

# list the run ids present in the test data (the `inc_chm` table)

import logging

import duckdb as db
import polars as pl

from tests.test_definitions import TEST_DB_PATH

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# the context manager closes the connection on exit
with db.connect(TEST_DB_PATH) as conn:
    ids = [x[0] for x in conn.execute("select distinct runid from inc_chm").fetchall()]

ids
```

```
['54', '0131', '61', '89']
```

# Define Test Data

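We want a representative dataset that is small enough to enable quick iterations, so the data is restricted to the first 2 minutes and to wavelengths between 230 and 270 nm.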

```python
testdata_filter_expr = pl.col("mins").is_between(0, 2) & pl.col("nm").is_between(
    230, 270
)
```


# Decomposition


Visual inspection suggests 8 peaks, which translates to a rank of 8 + 1 = 9: one component per peak plus one for noise.
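For reference, the rank is the number of components $R$ in the PARAFAC2 model. In one common notation (stated generally here, not taken from this project's code), each sample $k$ is a time × wavelength matrix $X_k$ approximated as

$$
X_k \approx B_k \, \operatorname{diag}(\mathbf{c}_k) \, A^{\top}, \qquad B_k^{\top} B_k = \Phi \quad (k = 1, \dots, K)
$$

where $X_k \in \mathbb{R}^{I_k \times J}$, $B_k \in \mathbb{R}^{I_k \times R}$ holds the elution profiles of sample $k$, $A \in \mathbb{R}^{J \times R}$ the component spectra, and $\mathbf{c}_k \in \mathbb{R}^{R}$ the sample weights. The constraint that $B_k^{\top} B_k = \Phi$ is the same for every $k$ is what distinguishes PARAFAC2 from PARAFAC, allowing elution profiles to shift between samples. With 8 peaks plus one noise component, $R = 8 + 1 = 9$.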




```python
import logging
from copy import deepcopy
from pathlib import Path

import polars as pl
from sklearn.pipeline import Pipeline
from sqlalchemy import Engine, create_engine

from pca_analysis.notebooks.experiments.parafac2_pipeline.data import Data
from pca_analysis.notebooks.experiments.parafac2_pipeline.input_data import (
    InputDataGetter,
)
from pca_analysis.notebooks.experiments.parafac2_pipeline.pipeline import (
    create_pipeline,
)
from pca_analysis.notebooks.experiments.parafac2_pipeline.pipeline_defs import DCols
from pca_analysis.notebooks.experiments.parafac2_pipeline.results_db import (
    load_new_results,
)
```


```python
import contextlib

from pca_analysis.definitions import ROOT

exec_id = "1"
input_db_path = TEST_DB_PATH
runids = ids
filter_expr = testdata_filter_expr

# load the raw data for the selected runs and restrict it to the test window
raw_data_extractor = InputDataGetter(input_db_path=input_db_path, ids=runids)
input_data = Data(
    time_col=str(DCols.TIME),
    runid_col=str(DCols.RUNID),
    nm_col=str(DCols.NM),
    abs_col=str(DCols.ABS),
    scalar_cols=[str(DCols.PATH), str(DCols.ID)],
)  # ignore
input_data = input_data.load_data(raw_data_extractor)
input_data = input_data.filter_nm_tbl(expr=filter_expr)
logfile = Path(ROOT) / "pipeline_log"

# build the pipeline and set stage parameters as <step>__<param>
pipeline = create_pipeline()
pipeline.set_params(
    parafac2__nn_modes="all",
    bcorr__lam=1e5,
    parafac2__linesearch=False,
    parafac2__rank=12,
)
X = input_data.to_X()
runids = X.runids.get_column("runid").to_list()
wavelength_labels = X.wavelength_labels.get_column("wavelength").to_list()
time_labels = X.time_labels

# to capture print for logs. See <https://johnpaton.net/posts/redirect-logging/>
with open(logfile, "w") as h, contextlib.redirect_stdout(h):
    pipeline.fit_transform(X.data)

# display the last two lines of the fit report (PARAFAC2);
# readlines() keeps each trailing newline, so join with "" rather than "\n"
with open(logfile, "r") as f:
    logger.info("".join(f.readlines()[-2:]))
```
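The log capture above relies only on the standard library: `contextlib.redirect_stdout` temporarily points `sys.stdout` at the open file, so anything the fit `print`s lands in the log rather than in the notebook. A self-contained sketch, with a hypothetical `noisy_fit` standing in for `pipeline.fit_transform` and a temporary directory in place of `ROOT`; it tails the file with `collections.deque(f, maxlen=2)`, which streams through the file keeping only the last two lines.

```python
import contextlib
import logging
import tempfile
from collections import deque
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def noisy_fit(n_iter: int) -> None:
    """Hypothetical stand-in for a fit routine that reports progress via print()."""
    for i in range(n_iter):
        print(f"iteration {i}: loss={1.0 / (i + 1):.3f}")
    print("converged")


logfile = Path(tempfile.mkdtemp()) / "pipeline_log"

# Everything printed inside the block goes to the file, not the notebook.
with open(logfile, "w") as h, contextlib.redirect_stdout(h):
    noisy_fit(5)

# Keep only the last two lines; each line still ends with "\n".
with open(logfile) as f:
    tail = "".join(deque(f, maxlen=2))

logger.info(tail)
```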




The following demonstrates how the decomposition results can be visualised by overlaying the fitted components on the input signal.


## Postprocessing


```python
from pca_analysis.notebooks.experiments.parafac2_pipeline.parafac2_viz import (
    Parafac2Viz,
)
from pca_analysis.notebooks.experiments.parafac2_pipeline.parafac2postprocessing import (
    Parafac2PostProcessor,
)

pp = Parafac2PostProcessor(decomp=pipeline.named_steps["parafac2"].decomp_)


rectified_df = pp.combine_parafac2_results_input_signal(
    runid_order=X.runids, input_signal=input_data._nm_tbl
)

wavelength = 256
runid = rectified_df.get_column("runid")[0]

pv = Parafac2Viz()

pv.overlay_components_input_signal(
    rectified_df=rectified_df, wavelength=wavelength, runid=runid
)
```
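What such an overlay shows can be sketched directly from the factor matrices. Under the PARAFAC2 model $X_k \approx B_k \operatorname{diag}(\mathbf{c}_k) A^{\top}$, the contribution of component $r$ to sample $k$ at wavelength index $j$ is the elution profile $B_k[:, r]$ scaled by $c_k[r] \, A[j, r]$, and summing over components gives the model's reconstruction of the signal at that wavelength. The sketch below uses random factors in place of a fitted decomposition, and all names are hypothetical; it is not the implementation of `Parafac2PostProcessor` or `Parafac2Viz`, whose internals are not shown here.

```python
import numpy as np

rng = np.random.default_rng(42)

n_times, n_wavelengths, n_samples, rank = 50, 41, 4, 3

# Random nonnegative stand-ins for fitted PARAFAC2 factors (hypothetical).
A = rng.random((n_wavelengths, rank))  # component spectra, J x R
C = rng.random((n_samples, rank))  # sample weights, K x R
B = [rng.random((n_times, rank)) for _ in range(n_samples)]  # elution profiles per sample


def component_traces(k: int, j: int) -> np.ndarray:
    """Per-component signal of sample k at wavelength index j, shape (n_times, rank)."""
    # Broadcast the R weights c_k[r] * A[j, r] across the time axis.
    return B[k] * (C[k] * A[j])


# Hypothetical choice: sample 0 and wavelength index 26 (256 nm would sit at
# index 26 on an assumed 230..270 nm grid with 1 nm spacing).
k, j = 0, 26
traces = component_traces(k, j)
reconstruction = traces.sum(axis=1)

# The summed components equal the matching column of the full model slice.
X_k_hat = B[k] @ np.diag(C[k]) @ A.T
print(np.allclose(reconstruction, X_k_hat[:, j]))
```

Plotting each column of `traces` alongside the measured signal at the same wavelength gives the kind of component overlay produced above; the gap between `reconstruction` and the measured trace is the residual at that wavelength.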
