In [None]:
import logging
import os
from pathlib import Path

import imker
import nltk
import pandas as pd
import wandb
from hydra import compose, initialize
from hydra.core.hydra_config import HydraConfig
from hydra.utils import instantiate
from kaggle import KaggleApi
from lightning import seed_everything
from sklearn.model_selection import BaseCrossValidator

from src.imker.tasks.preprocess import ExtractTfIdfFeaturesTask, TextCleansingTask, ExtractRawFeaturesTask
from src.utils.kaggle_utils import download_kaggle_competition_dataset, download_kaggle_datasets

In [None]:
# Comma-separated Hydra overrides; falls back to the 004-tabular experiment when the variable is unset.
OVERRIDES: list[str] = os.environ.get("OVERRIDES", "experiment=004-tabular").split(",")
# wandb API key taken from the environment (None when WANDB_KEY is not exported).
WANDB_KEY = os.environ.get("WANDB_KEY")

In [None]:
# OVERRIDES comes from str.split(","), so it is always a list and can never be None.
# An exported-but-empty variable yields [""]; reject the case where no entry carries any text.
if not any(override.strip() for override in OVERRIDES):
    raise ValueError("OVERRIDES is not set")

# Compose the Hydra config inside the notebook (no @hydra.main entry point is available here).
with initialize(version_base=None, config_path="../../configs"):
    CFG = compose(
        config_name="config.yaml",
        return_hydra_config=True,
        overrides=OVERRIDES,
    )
    HydraConfig.instance().set_config(CFG)  # use HydraConfig for notebook to use hydra job

# Root logger at DEBUG so the logger.debug calls in later cells are emitted.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Only attach a stream handler once; re-running the cell must not duplicate log lines.
if not logger.handlers:
    handler = logging.StreamHandler()
    logger.addHandler(handler)

# Authenticated Kaggle client shared by the download helpers.
KAGGLE_CLIENT = KaggleApi()
KAGGLE_CLIENT.authenticate()

INPUT_DIR = Path(CFG.paths.input_dir)

logger.info(f"start {OVERRIDES} 🚀")
seed_everything(CFG.seed)
wandb.login(key=WANDB_KEY)

# Store the NLTK "punkt" data in the output directory and register that directory on NLTK's search path.
nltk.data.path.append(CFG.paths.output_dir)
nltk.download("punkt", download_dir=CFG.paths.output_dir)

# NOTE(review): presumably lowers cuML log verbosity for GPU tasks — confirm it is read before cuML is imported.
os.environ["CUML_LOG_LEVEL"] = "error"

### Load Data


In [None]:
# Fetch the competition data first, then the external Kaggle datasets, both into INPUT_DIR
# (INPUT_DIR was built from CFG.paths.input_dir in the setup cell).
download_kaggle_competition_dataset(client=KAGGLE_CLIENT, competition=CFG.meta.competition, out_dir=INPUT_DIR)
download_kaggle_datasets(client=KAGGLE_CLIENT, datasets=CFG.kaggle.external_datasets, out_dir=INPUT_DIR)

In [None]:
# Read the training CSV from the downloaded external dataset directory.
train_df = pd.read_csv(INPUT_DIR.joinpath("thedrcat/daigt-v2-train-dataset/train_v2_drcat_02.csv"))

if CFG.debug:
    # Debug runs work on a seeded 100-row sample with a fresh 0..n-1 index.
    train_df = train_df.sample(n=100, random_state=CFG.seed)
    train_df = train_df.reset_index(drop=True)
    # Route debug runs into a dedicated wandb group unless the group name already mentions "debug".
    if not ("debug" in CFG.lightning.logger.wandb.group):
        CFG.lightning.logger.wandb.group = CFG.experiment_name + "_debug"

# Log the frame dimensions and the sum of the `label` column.
logger.debug(f"train shape : {train_df.shape}")
logger.debug(f"train generated label : {train_df['label'].sum()}")

### CV Split


In [None]:
def assign_fold_index(train_df: pd.DataFrame, kfold: "BaseCrossValidator") -> pd.DataFrame:
    """Tag every row of ``train_df`` with the index of the fold in which it is a validation row.

    A ``fold`` column is added (or overwritten) on ``train_df`` itself; rows that never appear in
    a validation split keep the sentinel value ``-1``.

    Args:
        train_df: Frame containing at least a ``label`` column, which is passed to the splitter as ``y``.
        kfold: Splitter exposing ``split(X, y)`` that yields ``(train_index, valid_index)`` pairs
            of positional row indices.

    Returns:
        The same ``train_df`` object, now carrying the ``fold`` column.
    """
    train_df["fold"] = -1
    # The splitter yields positional indices, so write by position (.iloc) rather than by
    # label (.loc); label-based assignment is only correct when the index is 0..n-1.
    fold_column_position = train_df.columns.get_loc("fold")
    for fold_index, (_, valid_index) in enumerate(kfold.split(X=train_df, y=train_df["label"])):
        train_df.iloc[valid_index, fold_column_position] = fold_index
    return train_df


# Build the splitter described by the `cv` config node and tag each row with its validation fold.
kfold = instantiate(CFG.cv)
train_df = assign_fold_index(train_df=train_df, kfold=kfold)
# Preview the first rows only instead of rendering the whole training frame.
train_df.head()

### Feature Engineering


In [None]:
class Preprocessor(imker.BasePreProcessor):
    """Feature pipeline that joins pass-through columns with TF-IDF features of the cleansed text.

    Three imker tasks are registered: one extracting the ``label``/``fold`` base columns, one
    cleansing the ``text`` column, and one computing TF-IDF features on ``text_cleansed``.
    """

    def __init__(self):
        """Create the three imker tasks used by :meth:`forward`."""
        # Base columns carried through unchanged next to the engineered features.
        raw_features_config = imker.TaskConfig(
            task=ExtractRawFeaturesTask,
            init_params={"base_columns": ["label", "fold"]},
        )
        self.extract_raw_features_task = imker.Task(raw_features_config)

        # Text cleansing applied to the `text` column.
        text_cleansing_config = imker.TaskConfig(
            task=TextCleansingTask,
            init_params={"text_columns": ["text"]},
        )
        self.text_cleansing_task = imker.Task(text_cleansing_config)

        # TF-IDF over the cleansed text; this is the only task configured with cache=True.
        tfidf_params = {
            "text_columns": ["text_cleansed"],
            "max_features": 100,
            "ngram_range": (1, 1),
            "use_gpu": False,
        }
        tfidf_config = imker.TaskConfig(
            task=ExtractTfIdfFeaturesTask,
            init_params=tfidf_params,
            cache=True,
        )
        self.extract_tfidf_features_task = imker.Task(tfidf_config)

    def forward(self, X, y=None):
        """Run the tasks on ``X`` and return the column-wise concatenation of their outputs.

        Args:
            X: Input frame handed to the raw-feature and text-cleansing tasks.
            y: Unused by this preprocessor.

        Returns:
            The raw-feature output and the TF-IDF output concatenated along ``axis=1``.
        """
        base_features = self.extract_raw_features_task(X)

        # Text branch: cleanse first, then vectorise the cleansed output.
        cleansed = self.text_cleansing_task(X)
        tfidf_features = self.extract_tfidf_features_task(cleansed)

        feature_parts = [base_features, tfidf_features]
        return pd.concat(feature_parts, axis=1)


# Create the imker pipeline for this experiment and register the preprocessor class on it.
pipe = imker.Pipeline(
    repo_dir=CFG.paths.output_dir,
    exp_name=CFG.experiment_name,
    pipeline_name=CFG.meta.competition,
)
pipe.set_preprocessor(Preprocessor)

In [None]:
# Run the registered preprocessor on the training frame.
# NOTE(review): reset_identifier=False presumably keeps the pipeline's existing identifier/cache — confirm in imker.
pipe.test_preprocessing(X=train_df, reset_identifier=False)

In [None]:
# Persist the pipeline so it can be restored with imker.Pipeline.load
# (presumably written under the repo_dir given at construction — TODO confirm).
pipe.dump()

In [None]:
# Round-trip check: load a pipeline using the same repo_dir / exp_name / pipeline_name as `pipe`,
# then run the preprocessor again on the training frame.
# NOTE(review): presumably this verifies the dumped state reproduces the features — confirm intent.
pipe2 = imker.Pipeline.load(
    repo_dir=CFG.paths.output_dir,
    exp_name=CFG.experiment_name,
    pipeline_name=CFG.meta.competition,
    preprocessor=Preprocessor,
)
pipe2.test_preprocessing(X=train_df)