In [1]:
# Wavelet


import numpy as np
from sklearn.base import TransformerMixin
import pywt

class WaveletPackage(TransformerMixin):
    '''
    Extract wavelet-packet features from a batch of 1-D signals.

    Every signal is decomposed with ``pywt.WaveletPacket`` and each leaf node
    contributes one feature: the L2 norm of the leaf coefficients divided by
    the number of coefficients, i.e. ``sqrt(sum(c ** 2)) / len(c)``.
    The coefficients are used directly; no signal is reconstructed.

    Parameters
    ----------
    wavelet : str, default 'db4'
        Wavelet name forwarded to ``pywt.WaveletPacket``.
    mode : str, default 'symmetric'
        Signal extension mode forwarded to ``pywt.WaveletPacket``.
    maxlevel : int, default 4
        Decomposition depth. ``transform`` yields ``2 ** maxlevel`` features
        per signal (16 with the default).

    Notes
    -----
    Feature order is kept exactly as in the original implementation: feature 0
    is the first leaf returned by ``get_leaf_nodes`` and features 1..K-1 are
    the remaining leaves in *reverse* order (a consequence of indexing with
    ``-k``). It is preserved so previously exported CSVs stay comparable.
    '''
    def __init__(self, wavelet='db4', mode='symmetric', maxlevel=4):
        self.wavelet = wavelet
        self.mode = mode
        self.maxlevel = maxlevel

    def fit(self, X, y=None):
        '''Nothing is learned; returns ``self``.'''
        return self

    def transform(self, X, y=None):
        '''
        Compute the leaf-node features for every signal in ``X``.

        X : iterable of 1-D signals. ``y`` is ignored.
        Returns an array with one row per signal and ``2 ** maxlevel`` columns.
        '''
        def leaf_feature(coeffs, k):
            # L2 norm of one leaf's coefficients, scaled by the coefficient count.
            leaf = np.array(coeffs[-k])
            return np.sqrt(np.sum(leaf ** 2)) / len(coeffs[-k])

        def packet_features(wp):
            # get_leaf_nodes(True) decomposes the tree down to maxlevel first.
            coeffs = np.asarray([node.data for node in wp.get_leaf_nodes(True)])
            # k = 0 picks coeffs[0]; k >= 1 walks backwards from the last leaf.
            return np.asarray([leaf_feature(coeffs, k)
                               for k in range(2 ** wp.maxlevel)])

        return np.array([
            packet_features(pywt.WaveletPacket(data=x, wavelet=self.wavelet,
                                               mode=self.mode,
                                               maxlevel=self.maxlevel))
            for x in X
        ])

In [None]:
import numpy as np
import pandas as pd

from datasets import UORED

# Maps a domain id (as a string) to the four file basenames recorded for it.
# Basename prefixes are H, I, O and B (presumably healthy / inner race /
# outer race / ball -- TODO confirm against the dataset description).
domains = {
    '1': ('H_1_0', 'I_1_1', 'O_6_1', 'B_11_1'),
    '2': ('H_2_0', 'I_1_2', 'O_6_2', 'B_11_2'),
    '3': ('H_3_0', 'I_2_1', 'O_7_1', 'B_12_1'),
    '4': ('H_4_0', 'I_2_2', 'O_7_2', 'B_12_2'),
    '5': ('H_5_0', 'I_3_1', 'O_8_1', 'B_13_1'),
    '6': ('H_6_0', 'I_3_2', 'O_8_2', 'B_13_2'),
    '7': ('H_7_0', 'I_4_1', 'O_9_1', 'B_14_1'),
    '8': ('H_8_0', 'I_4_2', 'O_9_2', 'B_14_2'),
    '9': ('H_9_0', 'I_5_1', 'O_10_1', 'B_15_1'),
    '10': ('H_10_0', 'I_5_2', 'O_10_2', 'B_15_2'),
}

# Leave-one-domain-out rounds: train_domain[i] holds the four training domains
# and test_domain[i] the held-out one. Rounds 1-5 use the odd domains, rounds
# 6-10 the even ones.
train_domain = [(1, 3, 5, 7), (1, 3, 5, 9), (1, 3, 7, 9),
                (1, 5, 7, 9), (3, 5, 7, 9), (2, 4, 6, 8),
                (2, 4, 6, 10), (2, 4, 8, 10), (2, 6, 8, 10),
                # Was (4, 5, 8, 10): domain 5 is an odd domain and broke the
                # even-only pattern of rounds 6-10 (held-out domain is 2).
                (4, 6, 8, 10)]
test_domain = [9, 7, 5, 3, 1, 10, 8, 6, 4, 2]

def get_signals(filepath, output_path, max_allowed_sample_size=420_000, segment_size=4096, transform=None, transform_segment=None):
    """
    Load one UORED file and cut it into consecutive, non-overlapping segments.

    Parameters
    ----------
    filepath : path handed to ``UORED().load_file``; element ``[0]`` of the
        returned value is used as the signal.
    output_path : accepted for signature compatibility but not used here;
        nothing is written to disk by this function.
    max_allowed_sample_size : int or None, default 420_000
        Upper bound on how many leading samples are segmented. ``None``
        disables the cap. (Previously this argument was silently overwritten
        with ``len(signal)`` and therefore had no effect.)
    segment_size : int, default 4096
        Number of samples per segment; a trailing remainder is dropped.
    transform : callable or None
        Applied once to the whole signal before segmentation.
    transform_segment : callable or None
        Applied to every segment after slicing.

    Returns
    -------
    list
        The (optionally transformed) segments, in signal order.
    """
    signal = UORED().load_file(filepath)[0]
    if transform is not None:
        signal = transform(signal)

    # Never read past the signal, and honor the caller-supplied cap.
    usable_samples = len(signal)
    if max_allowed_sample_size is not None:
        usable_samples = min(usable_samples, max_allowed_sample_size)

    signals = []
    for i in range(usable_samples // segment_size):
        segment = signal[i * segment_size:(i + 1) * segment_size]
        if transform_segment is not None:
            segment = transform_segment(segment)
        signals.append(segment)
    return signals

# Single-letter class codes. NOTE(review): `classes` is not referenced by any
# other code visible in this notebook.
classes = ("B", "I", "N", "O")
# Name substituted for 'uored' when create_wt_dataset builds the output folder.
dsname_target = "uored_wt"
def create_wt_dataset():
    """
    Build the train/test folder layout for every leave-one-domain-out round.

    For round ``i`` (1-based) the files of the domains in ``train_domain[i-1]``
    are processed with ``segment_size=2048`` into
    ``data/processed/<dsname_target>/setup_<i>/train/`` and the files of
    ``test_domain[i-1]`` with ``segment_size=1750`` into ``.../test/``.
    Basenames containing 'H' are written with 'N' instead.

    NOTE(review): ``get_segments`` is not defined in any cell visible in this
    notebook; it must be provided elsewhere before this function is called.
    """
    # Imported locally because this cell never imports `os`; relying on a
    # later cell's import fails on a fresh kernel.
    import os

    root_dir = "data/raw/uored"
    processed_root = root_dir.replace('raw', 'processed').replace('uored', dsname_target)

    for i, tr_domain in enumerate(train_domain):
        tr = [domains[str(t)] for t in tr_domain]
        ts = domains[str(test_domain[i])]
        print(f"Round: {i+1}")

        # TRAIN
        train_dir = processed_root + f'/setup_{i+1}/train/'
        os.makedirs(train_dir, exist_ok=True)  # once per round, not per domain
        for domain in tr:
            for basename in domain:
                path = f"{root_dir}/{basename}"
                output_path = os.path.join(train_dir, basename.replace('H', 'N'))
                get_segments(path, output_path, segment_size=2048)

        # TEST
        test_dir = processed_root + f'/setup_{i+1}/test/'
        os.makedirs(test_dir, exist_ok=True)
        for basename in ts:
            path = f"{root_dir}/{basename}"
            output_path = os.path.join(test_dir, basename if basename[0] != 'H' else basename.replace('H', 'N'))
            get_segments(path, output_path, segment_size=1750)

In [50]:
# wp_from_filelist.py
# ---------------------------------------------------------
# Apply your WaveletPackage to a list of .npy files and save a CSV
# No extra transforms/normalization. Exactly your class as provided.
# ---------------------------------------------------------

import numpy as np
import pandas as pd
from pathlib import Path
import pywt
from sklearn.base import TransformerMixin

# ---------- WaveletPackage (maxlevel=3 variant) ----------
class WaveletPackage(TransformerMixin):
    '''
    Extract wavelet-packet features from a batch of 1-D signals.

    Every signal is decomposed with ``pywt.WaveletPacket`` and each leaf node
    contributes one feature: the L2 norm of the leaf coefficients divided by
    the number of coefficients, i.e. ``sqrt(sum(c ** 2)) / len(c)``.
    The coefficients are used directly; no signal is reconstructed.

    Parameters
    ----------
    wavelet : str, default 'db4'
        Wavelet name forwarded to ``pywt.WaveletPacket``.
    mode : str, default 'symmetric'
        Signal extension mode forwarded to ``pywt.WaveletPacket``.
    maxlevel : int, default 3
        Decomposition depth. ``transform`` yields ``2 ** maxlevel`` features
        per signal (8 with this cell's default of 3).

    Notes
    -----
    Feature order is kept exactly as in the original implementation: feature 0
    is the first leaf returned by ``get_leaf_nodes`` and features 1..K-1 are
    the remaining leaves in *reverse* order (a consequence of indexing with
    ``-k``).
    '''
    def __init__(self, wavelet='db4', mode='symmetric', maxlevel=3):
        self.wavelet = wavelet
        self.mode = mode
        self.maxlevel = maxlevel

    def fit(self, X, y=None):
        '''Nothing is learned; returns ``self``.'''
        return self

    def transform(self, X, y=None):
        '''
        Compute the leaf-node features for every signal in ``X``.

        X : iterable of 1-D signals. ``y`` is ignored.
        Returns an array with one row per signal and ``2 ** maxlevel`` columns.
        '''
        def leaf_feature(coeffs, k):
            # L2 norm of one leaf's coefficients, scaled by the coefficient count.
            leaf = np.array(coeffs[-k])
            return np.sqrt(np.sum(leaf ** 2)) / len(coeffs[-k])

        def packet_features(wp):
            # get_leaf_nodes(True) decomposes the tree down to maxlevel first.
            coeffs = np.asarray([node.data for node in wp.get_leaf_nodes(True)])
            # k = 0 picks coeffs[0]; k >= 1 walks backwards from the last leaf.
            return np.asarray([leaf_feature(coeffs, k)
                               for k in range(2 ** wp.maxlevel)])

        return np.array([
            packet_features(pywt.WaveletPacket(data=x, wavelet=self.wavelet,
                                               mode=self.mode,
                                               maxlevel=self.maxlevel))
            for x in X
        ])
# ------------------------------------------------

def run(params: dict):
    """
    Extract WaveletPackage features from a list of .npy files and save a CSV.

    Keys read from ``params``:
      - file_paths: iterable of str/Path pointing at .npy files
      - out_csv: destination CSV path (str/Path); parent folders are created
      - add_path: optional bool, prepend a 'path' column. Default: True
      - add_label: optional bool, add a 'label' column holding the upper-cased
        first character of the file stem. Default: True

    Files that fail to load are reported with ``[ERROR]``; arrays that are not
    1-D after ``squeeze()`` are reported with ``[SKIP]``. All kept signals are
    stacked with ``np.stack``, so they must share one length. The number of
    feature columns is whatever ``WaveletPackage.transform`` returns.
    Returns None.
    """
    file_paths = [Path(p) for p in params["file_paths"]]
    out_csv = Path(params["out_csv"])
    add_path = params.get("add_path", True)
    add_label = params.get("add_label", True)

    # Collect every loadable 1-D signal together with the file it came from.
    signals = []
    kept_files = []
    for f in file_paths:
        try:
            x = np.asarray(np.load(f)).squeeze()
            if x.ndim == 1:
                signals.append(x)
                kept_files.append(f)
            else:
                print(f"[SKIP] {f} not 1D (shape={x.shape})")
        except Exception as e:
            print(f"[ERROR] {f}: {e}")

    if len(signals) == 0:
        print("[WARN] No valid signals to process.")
        return

    # (N, L) batch in, (N, n_features) matrix out.
    F = WaveletPackage().transform(np.stack(signals, axis=0))
    feature_names = [f"f{i}" for i in range(F.shape[1])]
    df = pd.DataFrame(F, columns=feature_names)

    # Metadata columns go in front of the features: path first, then label.
    next_slot = 0
    if add_path:
        df.insert(next_slot, "path", [p.as_posix() for p in kept_files])
        next_slot += 1
    if add_label:
        labels = []
        for p in kept_files:
            labels.append(p.stem[0].upper() if p.stem else "")
        df.insert(next_slot, "label", labels)

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(out_csv, index=False)
    print(f"[OK] Saved: {out_csv} | rows={len(df)}, cols={len(df.columns)}")


# ------------- Example usage -------------
import os
from pathlib import Path

if __name__ == "__main__":
    # Only the 'test' split of setups 4..10 is processed by this cell; change
    # `mode` / the range to export the other splits and setups.
    mode = 'test'
    for n_stp in range(4, 11):
        root = f"data/processed/uored_4096/setup_{n_stp}/{mode}"
        # List the folder once per setup. The previous nested loop rebuilt the
        # same list once per file and, for an empty folder, left `paths`
        # undefined (or stale from the previous setup). os.listdir returns
        # entries in arbitrary order, so sort for reproducible CSV rows.
        paths = [Path(f'{root}/{file}') for file in sorted(os.listdir(root))]
        params = {
            "file_paths": paths,
            "out_csv": f"wp_features/setup_{n_stp}/{mode}.csv",
            "add_path": True,
            "add_label": True
        }
        run(params)


[OK] Saved: wp_features/setup_4/test.csv | rows=408, cols=10
[OK] Saved: wp_features/setup_5/test.csv | rows=408, cols=10
[OK] Saved: wp_features/setup_6/test.csv | rows=408, cols=10
[OK] Saved: wp_features/setup_7/test.csv | rows=408, cols=10
[OK] Saved: wp_features/setup_8/test.csv | rows=408, cols=10
[OK] Saved: wp_features/setup_9/test.csv | rows=408, cols=10
[OK] Saved: wp_features/setup_10/test.csv | rows=408, cols=10


In [21]:
# train_rf_wp.py
# ---------------------------------------------------------
# Train a Random Forest on WaveletPackage features
# Train on:  wp_features/train.csv
# Test on:   wp_features/test.csv
# ---------------------------------------------------------

import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report

def load_xy(csv_path):
    """
    Read a feature CSV and split it into a feature matrix and a label vector.

    Feature columns are all columns whose name starts with 'f'. Labels come
    from the 'label' column when present, otherwise from 'label_str'; the
    values are returned as stored in the CSV.

    Returns ``(X, y)`` where X is a float32 array of the feature columns.
    Raises ValueError when no feature column or no label column exists.
    """
    df = pd.read_csv(csv_path)

    feat_cols = list(filter(lambda name: name.startswith('f'), df.columns))
    if len(feat_cols) == 0:
        raise ValueError(f"No feature columns starting with 'f' in {csv_path}")

    # First candidate present in the frame wins: 'label' before 'label_str'.
    label_col = next((name for name in ('label', 'label_str') if name in df.columns), None)
    if label_col is None:
        raise ValueError(f"No 'label' or 'label_str' column in {csv_path}")
    y = df[label_col].values

    X = df[feat_cols].to_numpy(dtype=np.float32)
    return X, y

def main(train_csv="wp_features/train.csv", test_csv="wp_features/test.csv"):
    """
    Train a RandomForest on one feature CSV and evaluate it on another.

    Parameters
    ----------
    train_csv : str, default "wp_features/train.csv"
        CSV read with ``load_xy`` for fitting.
    test_csv : str, default "wp_features/test.csv"
        CSV read with ``load_xy`` for evaluation. Both paths are parameters so
        per-setup files (e.g. ``wp_features/setup_3/train.csv``) can be
        evaluated without editing the function.

    Prints dataset shapes, accuracy, macro F1, the confusion matrix and the
    classification report. Returns None.
    """
    print("[INFO] Loading datasets…")
    X_train, y_train = load_xy(train_csv)
    X_test,  y_test  = load_xy(test_csv)

    print(f"[INFO] Train shape: X={X_train.shape}, y={y_train.shape}")
    print(f"[INFO] Test  shape: X={X_test.shape},  y={y_test.shape}")

    # Random Forest -- simple and straightforward; fixed seed for repeatability.
    clf = RandomForestClassifier(
        n_estimators=300,
        max_depth=None,
        random_state=42,
        n_jobs=-1
    )

    print("[INFO] Training RandomForest…")
    clf.fit(X_train, y_train)

    print("[INFO] Evaluating…")
    y_pred = clf.predict(X_test)

    acc = accuracy_score(y_test, y_pred)
    f1m = f1_score(y_test, y_pred, average='macro')
    cm  = confusion_matrix(y_test, y_pred)

    print("\n===== RESULTS =====")
    print(f"Accuracy   : {acc:.4f}")
    print(f"F1-macro   : {f1m:.4f}")
    print("Confusion Matrix (rows=true, cols=pred):")
    print(cm)
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred, digits=4))

    # (Optional) save the model:
    # import joblib
    # joblib.dump(clf, "rf_wp_model.joblib")
    # print("[INFO] Saved model to rf_wp_model.joblib")

if __name__ == "__main__":
    main()


[INFO] Loading datasets…
[INFO] Train shape: X=(1632, 16), y=(1632,)
[INFO] Test  shape: X=(408, 16),  y=(408,)
[INFO] Training RandomForest…
[INFO] Evaluating…

===== RESULTS =====
Accuracy   : 0.6373
F1-macro   : 0.5704
Confusion Matrix (rows=true, cols=pred):
[[ 83  19   0   0]
 [  0 102   0   0]
 [  0  30  72   0]
 [  0   6  93   3]]

Classification Report:
              precision    recall  f1-score   support

           B     1.0000    0.8137    0.8973       102
           I     0.6497    1.0000    0.7876       102
           N     0.4364    0.7059    0.5393       102
           O     1.0000    0.0294    0.0571       102

    accuracy                         0.6373       408
   macro avg     0.7715    0.6373    0.5704       408
weighted avg     0.7715    0.6373    0.5704       408



In [None]:
# make_wp_csv_dict.py
# ---------------------------------------------------------
# Build a CSV of WaveletPackage features from 1D .npy signals
# Using parameters passed via a dictionary instead of argparse
# ---------------------------------------------------------

import numpy as np
import pandas as pd
from pathlib import Path
import pywt
from sklearn.base import TransformerMixin

# ---------- WaveletPackage (original) ----------
class WaveletPackage(TransformerMixin):
    '''
    Extract wavelet-packet features from a batch of 1-D signals.

    Every signal is decomposed with ``pywt.WaveletPacket`` and each leaf node
    contributes one feature: the L2 norm of the leaf coefficients divided by
    the number of coefficients, i.e. ``sqrt(sum(c ** 2)) / len(c)``.
    The coefficients are used directly; no signal is reconstructed.

    Parameters
    ----------
    wavelet : str, default 'db4'
        Wavelet name forwarded to ``pywt.WaveletPacket``.
    mode : str, default 'symmetric'
        Signal extension mode forwarded to ``pywt.WaveletPacket``.
    maxlevel : int, default 4
        Decomposition depth. ``transform`` yields ``2 ** maxlevel`` features
        per signal (16 with the default).

    Notes
    -----
    Feature order is kept exactly as in the original implementation: feature 0
    is the first leaf returned by ``get_leaf_nodes`` and features 1..K-1 are
    the remaining leaves in *reverse* order (a consequence of indexing with
    ``-k``).
    '''
    def __init__(self, wavelet='db4', mode='symmetric', maxlevel=4):
        self.wavelet = wavelet
        self.mode = mode
        self.maxlevel = maxlevel

    def fit(self, X, y=None):
        '''Nothing is learned; returns ``self``.'''
        return self

    def transform(self, X, y=None):
        '''
        Compute the leaf-node features for every signal in ``X``.

        X : iterable of 1-D signals. ``y`` is ignored.
        Returns an array with one row per signal and ``2 ** maxlevel`` columns.
        '''
        def leaf_feature(coeffs, k):
            # L2 norm of one leaf's coefficients, scaled by the coefficient count.
            leaf = np.array(coeffs[-k])
            return np.sqrt(np.sum(leaf ** 2)) / len(coeffs[-k])

        def packet_features(wp):
            # get_leaf_nodes(True) decomposes the tree down to maxlevel first.
            coeffs = np.asarray([node.data for node in wp.get_leaf_nodes(True)])
            # k = 0 picks coeffs[0]; k >= 1 walks backwards from the last leaf.
            return np.asarray([leaf_feature(coeffs, k)
                               for k in range(2 ** wp.maxlevel)])

        return np.array([
            packet_features(pywt.WaveletPacket(data=x, wavelet=self.wavelet,
                                               mode=self.mode,
                                               maxlevel=self.maxlevel))
            for x in X
        ])
# ------------------------------------------------

def list_npy_files(root: Path):
    """
    Return every regular ``*.npy`` file below ``root`` (recursive), sorted.

    Directories whose name happens to end in ``.npy`` are excluded. The result
    is sorted because the traversal order of ``rglob`` is not guaranteed;
    a stable order keeps the rows of the exported CSV reproducible.
    """
    return sorted(p for p in root.rglob("*.npy") if p.is_file())

def run(params: dict):
    """
    Export WaveletPackage features for every .npy file below a folder.

    Keys read from ``params``:
      - in_root: folder searched recursively with ``list_npy_files``
      - out_csv: destination CSV; parent folders are created
      - add_path: optional bool, prepend a 'path' column. Default: False
      - infer_label: optional bool, add an integer 'label' column
        (B=0, I=1, N=2, O=3). Default: False

    Example:
        params = {
            "in_root": "data/uored",
            "out_csv": "out/wp_features.csv",
            "add_path": True,
            "infer_label": True
        }

    Label inference looks at the path components from last to first and uses
    the first one that, upper-cased, is exactly one of the class letters; if
    none matches, the first character of the file stem is tried. Files for
    which neither rule matches get ``None``.
    Returns None.
    """
    in_root = Path(params["in_root"]).resolve()
    out_csv = Path(params["out_csv"]).resolve()
    add_path = params.get("add_path", False)
    infer_label = params.get("infer_label", False)

    files = list_npy_files(in_root)
    if len(files) == 0:
        print(f"[WARN] No .npy found under {in_root}")
        return

    # Keep only arrays that are 1-D after squeezing; remember their files.
    signals = []
    valid_files = []
    for f in files:
        try:
            x = np.asarray(np.load(f)).squeeze()
            if x.ndim == 1:
                signals.append(x)
                valid_files.append(f)
            else:
                print(f"[SKIP] {f} not 1D (shape={x.shape})")
        except Exception as e:
            print(f"[ERROR] {f}: {e}")

    if len(signals) == 0:
        print("[WARN] No valid signals loaded.")
        return

    # One feature row per signal.
    wp = WaveletPackage()
    F = wp.transform(np.stack(signals, axis=0))
    df = pd.DataFrame(F, columns=[f"f{i}" for i in range(F.shape[1])])

    if add_path:
        df.insert(0, "path", [p.as_posix() for p in valid_files])

    if infer_label:
        LABELS = {"B": 0, "I": 1, "N": 2, "O": 3}

        def label_of(p):
            # Deepest path component that is exactly a class letter wins.
            lbl = next((LABELS[part.upper()] for part in reversed(p.parts)
                        if part.upper() in LABELS), None)
            # `is None` (not falsiness) because class B maps to 0.
            if lbl is None and p.stem:
                lbl = LABELS.get(p.stem[0].upper(), None)
            return lbl

        labels = [label_of(p) for p in valid_files]
        df.insert(1 if add_path else 0, "label", labels)

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(out_csv, index=False)
    print(f"[OK] Saved {out_csv} | rows={len(df)}, cols={len(df.columns)}")


# ---------------- EXAMPLE USAGE ----------------
if __name__ == "__main__":
    # Export features for everything under data/uored, keeping the source
    # path and the inferred integer label next to each feature row.
    params = dict(
        in_root="data/uored",
        out_csv="out/wp_features.csv",
        add_path=True,
        infer_label=True,
    )
    run(params)


In [3]:
import pandas as pd
import numpy as np

def load_xy(csv_path):
    """
    Read a feature CSV and split it into a feature matrix and a label vector.

    Feature columns are all columns whose name starts with 'f'.

    Labels:
      - if a 'label' column exists it is returned as stored (it may hold
        strings such as 'B'/'I'/'N'/'O'; no conversion is applied);
      - otherwise 'label_str' is converted to integer codes, numbered in the
        sorted order of its distinct values taken as strings.

    Returns ``(X, y)`` where X is a float32 array of the feature columns.
    Raises ValueError when no feature column or no label column exists.
    """
    df = pd.read_csv(csv_path)
    feat_cols = [c for c in df.columns if c.startswith('f')]
    if not feat_cols:
        raise ValueError(f"No feature columns starting with 'f' in {csv_path}")

    if 'label' in df.columns:
        y = df['label'].values
    elif 'label_str' in df.columns:
        # Build the mapping and apply it on the SAME string view of the
        # column. Mapping the raw column with string keys turned every
        # non-string value (e.g. integers parsed by read_csv) into NaN.
        label_str = df['label_str'].astype(str)
        mapping = {s: i for i, s in enumerate(sorted(label_str.unique()))}
        y = label_str.map(mapping).values
    else:
        raise ValueError(f"No 'label' or 'label_str' column in {csv_path}")

    X = df[feat_cols].to_numpy(dtype=np.float32)
    return X, y

In [4]:
# Sanity check: load the setup-3 training features; the cell displays the
# returned (X, y) tuple.
load_xy("wp_features/setup_3/train.csv")

(array([[1.5009004 , 0.6179552 , 0.42690998, ..., 0.45193693, 0.35042167,
         1.3690679 ],
        [0.6017406 , 0.15788439, 0.10962422, ..., 0.20720935, 0.13360715,
         0.7689403 ],
        [1.2230427 , 0.6502544 , 0.46886766, ..., 0.338925  , 0.3303898 ,
         1.3351505 ],
        ...,
        [9.006909  , 1.2786608 , 1.7502707 , ..., 2.1276722 , 1.2145617 ,
         4.35925   ],
        [1.7720951 , 1.2276839 , 0.48793954, ..., 0.3615816 , 0.29286718,
         0.5441611 ],
        [0.85376006, 0.9950107 , 0.86190337, ..., 0.4059483 , 0.59895235,
         0.6419706 ]], dtype=float32),
 array(['O', 'B', 'O', ..., 'I', 'O', 'O'], dtype=object))