In [1]:
from __future__ import annotations

import argparse
import logging
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

In [2]:
class BaseHandler(ABC):
    """Abstract link of a chain-of-responsibility pipeline.

    A concrete handler implements `_process`; `handle` runs it and, when a
    successor has been attached with `set_next`, forwards the result to it.
    """

    def __init__(self) -> None:
        # Successor in the chain; stays None for the last handler.
        self._next: Optional[BaseHandler] = None

    def set_next(self, handler: "BaseHandler") -> "BaseHandler":
        """Attach `handler` as the successor and return it so calls can be chained."""
        self._next = handler
        return handler

    def handle(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Process `data` here, then let the rest of the chain (if any) continue."""
        result = self._process(data)
        successor = self._next
        if successor is None:
            return result
        return successor.handle(result)

    @abstractmethod
    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Transform `data`; every concrete handler must provide this."""
        raise NotImplementedError

In [3]:
class RawCSVLoader(BaseHandler):
    """Load a CSV file into a DataFrame.

    Notes:
    - Several encodings are tried in order (utf-8, cp1251, latin1 by default).
    - `pandas.read_csv` is called with `engine='python'` by default so that
      quoted fields, including ones with embedded newlines, are parsed.
    - Any option in `read_csv_kwargs` overrides the built-in default of the
      same name. `encoding` is the exception: it is always taken from
      `encodings`, because the loader iterates over them.
    - Returns a DataFrame whose column names are stripped strings.
    """

    _DEFAULT_ENCODINGS: Sequence[str] = ("utf-8", "cp1251", "latin1")
    # Parser defaults; entries in `read_csv_kwargs` take precedence over these.
    _DEFAULT_READ_OPTIONS = {
        "sep": ",",
        "quotechar": '"',
        "engine": "python",
        "index_col": 0,
    }

    def __init__(
        self,
        csv_path: Path,
        encodings: Optional[Sequence[str]] = None,
        **read_csv_kwargs,
    ) -> None:
        """
        :param csv_path: location of the CSV file
        :param encodings: encodings to try, in order; defaults to
            `_DEFAULT_ENCODINGS`
        :param read_csv_kwargs: extra keyword arguments for `pandas.read_csv`
        """
        super().__init__()
        self.csv_path = Path(csv_path)
        self.encodings = (
            tuple(encodings) if encodings is not None else self._DEFAULT_ENCODINGS
        )
        self.read_csv_kwargs = read_csv_kwargs

    def _process(self, data: Optional[pd.DataFrame]) -> pd.DataFrame:
        """Read the CSV; the incoming `data` is ignored (this is the chain head).

        :raises FileNotFoundError: when `csv_path` does not exist
        :raises RuntimeError: when no encoding produced a DataFrame; the last
            underlying error is attached as `__cause__`
        """
        if not self.csv_path.exists():
            raise FileNotFoundError(f"CSV not found: {self.csv_path}")

        # Merge instead of passing both sets of keywords to read_csv: a
        # duplicated keyword would raise TypeError, which the best-effort
        # loop below would misreport as an unreadable file.
        options = {**self._DEFAULT_READ_OPTIONS, **self.read_csv_kwargs}

        last_error: Optional[Exception] = None
        for enc in self.encodings:
            options["encoding"] = enc
            try:
                logger.info("Reading %s with encoding=%s", self.csv_path, enc)
                df = pd.read_csv(self.csv_path, **options)
            except Exception as exc:  # best effort: fall through to the next encoding
                last_error = exc
                logger.debug("Failed to read with encoding %s: %s", enc, exc)
                continue

            # Light post-processing of the column names.
            df.columns = [str(c).strip() for c in df.columns]
            logger.info("Loaded dataframe shape=%s (encoding=%s)", df.shape, enc)
            return df

        # Keep the root cause visible instead of discarding it.
        raise RuntimeError(
            f"Unable to parse CSV with encodings {self.encodings}"
        ) from last_error

In [4]:
class TextClean(BaseHandler):
    """Strip junk Unicode and special characters from all text in a DataFrame."""

    @staticmethod
    def _clean_text(s: str) -> str:
        """Normalise one cell value; non-string values are returned untouched."""
        if not isinstance(s, str):
            return s
        # BOM marks and non-breaking spaces are removed outright.
        text = s.replace("\ufeff", "").replace("\xa0", "")
        # Tabs and line breaks become ordinary spaces.
        text = re.sub(r"[\t\n\r]", " ", text)
        # Drop whatever is still non-printable.
        text = "".join(filter(str.isprintable, text))
        # Collapse whitespace runs, then trim both ends.
        return re.sub(r"\s+", " ", text).strip()

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Return a copy of `data` with every object/category column cleaned."""
        if data is None:
            return None

        cleaned = data.copy()
        for name in cleaned.select_dtypes(include=["object", "category"]).columns:
            cleaned[name] = cleaned[name].map(self._clean_text)
        return cleaned

In [5]:
class TargetSalary(BaseHandler):
    """Convert the salary column (default 'ЗП') to a numeric amount in rubles.

    Each cell is expected to hold an amount followed by a currency token,
    e.g. ``"27000 руб."`` or ``"1 500 USD"``. The amount is multiplied by the
    matching entry of `EXCHANGE_RATES`. A missing or unrecognised currency
    token is treated as rubles (rate 1.0); unrecognised tokens are logged.
    Cells without any digits become NaN.
    """

    EXCHANGE_RATES = {
        "RUB": 1.0,
        "руб": 1.0,
        "руб.": 1.0,
        "rub": 1.0,
        "USD": 77.8332,
        "EUR": 90.5366,
        "KZT": 0.15233,
        "UAH": 1.79369,
        "KGS": 0.890031,
        "BYN": 26.8381,
        "AZN": 45.7842,
        "GBP": 104.1953,
        "CNY": 11.151,
    }

    # First number in the cell, allowing whitespace-separated groups of exactly
    # three digits ("100 000"), so grouped amounts are not cut at the first gap.
    _AMOUNT_PATTERN = r"(\d+(?:\s\d{3}(?!\d))*)"
    # First upper-case token, or the literal lower-case "руб".
    _CURRENCY_PATTERN = r"([A-ZА-Я]+|руб)"

    def __init__(self, column: str = "ЗП") -> None:
        """
        :param column: name of the salary column to convert in place
        """
        super().__init__()
        self.column = column

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Return a copy of `data` with `column` replaced by float rubles.

        The frame is returned unchanged when it is None, lacks the column, or
        the column is already numeric (nothing to parse).
        """
        if data is None or self.column not in data.columns:
            return data

        raw = data[self.column]
        if pd.api.types.is_numeric_dtype(raw):
            # `.str` would fail on a numeric column; there is nothing to parse.
            return data

        df = data.copy()

        # Work on standalone Series rather than scratch columns, so columns
        # the caller may already have are never overwritten or dropped.
        amount_text = raw.str.extract(self._AMOUNT_PATTERN, expand=False)
        amount = pd.to_numeric(
            amount_text.str.replace(r"\s", "", regex=True), errors="coerce"
        ).astype(float)

        currency = raw.str.extract(self._CURRENCY_PATTERN, expand=False)
        rate = currency.map(self.EXCHANGE_RATES)

        unknown = currency[rate.isna() & currency.notna()].unique()
        if len(unknown):
            logger.warning(
                "Unrecognised currency token(s) %s in column %r; using rate 1.0",
                sorted(str(token) for token in unknown),
                self.column,
            )

        # NaN amounts stay NaN; a missing or unknown currency falls back to 1.0.
        df[self.column] = amount * rate.fillna(1.0).astype(float)
        return df

In [6]:
class TargetOutlier(BaseHandler):
    """IQR-based outlier handling for the target column."""

    def __init__(
        self,
        target: str = "ЗП",
        iqr_factor: float = 1.5,
        strategy: str = "clip",  # 'clip' | 'remove' | 'nan'
    ) -> None:
        """
        :param target: name of the target column
        :param iqr_factor: multiplier applied to the inter-quartile range
        :param strategy:
            - 'clip'   → clamp values to the computed bounds
            - 'remove' → drop the offending rows
            - 'nan'    → replace the offending values with NaN
        """
        super().__init__()
        self.target = target
        self.iqr_factor = iqr_factor
        self.strategy = strategy

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Apply the configured strategy; pass `data` through if the target is absent.

        :raises ValueError: when `strategy` is not one of the supported names
        """
        if data is None or self.target not in data.columns:
            return data

        frame = data.copy()
        values = frame[self.target]

        first_q = values.quantile(0.25)
        third_q = values.quantile(0.75)
        margin = self.iqr_factor * (third_q - first_q)
        low, high = first_q - margin, third_q + margin

        outliers = (values < low) | (values > high)

        if self.strategy == "clip":
            frame[self.target] = values.clip(low, high)
            return frame
        if self.strategy == "remove":
            return frame.loc[~outliers]
        if self.strategy == "nan":
            frame.loc[outliers, self.target] = None
            return frame
        raise ValueError(f"Unknown strategy: {self.strategy}")

In [7]:
class Completeness(BaseHandler):
    """Data completeness: duplicates, sparse columns and missing values."""

    # Placeholder written into missing text/categorical cells.
    _MISSING_LABEL = "__missing__"

    def __init__(
        self,
        drop_duplicates: bool = True,
        drop_threshold: float = 0.5,
        skip_dedup_up_to: int = 100,
    ) -> None:
        """
        :param drop_duplicates: whether duplicate rows are removed
        :param drop_threshold: a column is kept only if it has at least
            ``int(drop_threshold * n_rows)`` non-missing values
        :param skip_dedup_up_to: frames with at most this many rows are never
            de-duplicated (the default 100 keeps the previous behaviour)
        """
        super().__init__()
        self.drop_duplicates = drop_duplicates
        self.drop_threshold = drop_threshold
        self.skip_dedup_up_to = skip_dedup_up_to

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Return a cleaned copy of `data` (None is passed through)."""
        if data is None:
            return None

        df = data.copy()

        # 1. Duplicates (only for frames larger than the configured floor).
        if self.drop_duplicates and len(df) > self.skip_dedup_up_to:
            df = df.drop_duplicates()

        # 2. Columns with too few non-missing values.
        thresh = int(self.drop_threshold * len(df))
        df = df.dropna(axis=1, thresh=thresh)

        # 3. Numeric NaN → column median.
        num_cols = df.select_dtypes(include=["number"]).columns
        for c in num_cols:
            df[c] = df[c].fillna(df[c].median())

        # 4. Text / categorical NaN → placeholder label.
        cat_cols = df.select_dtypes(include=["object", "category"]).columns
        for c in cat_cols:
            column = df[c]
            if isinstance(column.dtype, pd.CategoricalDtype):
                # fillna() rejects a value that is not an existing category,
                # so register the placeholder first, and only when needed.
                if not column.isna().any():
                    continue
                if self._MISSING_LABEL not in column.cat.categories:
                    column = column.cat.add_categories([self._MISSING_LABEL])
            df[c] = column.fillna(self._MISSING_LABEL)
        return df

In [8]:
class EncoderHelpers:
    """Stateless parsing helpers shared by the encoder handlers.

    Every helper takes one raw cell value and returns a normalised value.
    Non-string input never raises: each helper returns its documented
    fallback instead.
    """

    # Ordered (group, keywords) rules for `_map_position`; the first group
    # with a keyword contained in the title wins.
    _TARGET_POSITION_RULES = (
        (
            "dev",
            [
                "программист",
                "разработчик",
                "developer",
                "frontend",
                "backend",
                "web",
                "java",
                "python",
                "php",
                "1с",
                "1c",
                "qa",
                "тестировщик",
                "верстальщик",
            ],
        ),
        (
            "admin",
            [
                "системн",
                "администратор",
                "it-",
                "ит-",
                "it ",
                "сетев",
                "devops",
                "безопасност",
                "информационн",
                "баз данных",
                "dba",
            ],
        ),
        (
            "manager",
            [
                "менеджер",
                "manager",
                "руководител",
                "начальник",
                "project",
                "product",
                "координатор",
                "lead",
                "тимлид",
            ],
        ),
        ("analyst", ["аналитик", "analysis", "data", "bi"]),
        ("support", ["поддерж", "support", "helpdesk", "оператор", "клиент"]),
        ("marketing", ["маркет", "seo", "контент", "дизайн"]),
        ("engineer", ["инженер", "монтаж", "техник", "электрик", "слаботоч"]),
    )

    # Experience header followed by optional "<N> years" and "<M> months".
    # Longer year forms come first: with "год" before "года" the engine stops
    # after "год", the leftover "а" blocks the months group, and the months
    # are silently lost.
    _EXPERIENCE_RE = re.compile(
        r"(?:опыт\s*работ[а-я]*|work\s*experience|exp)"
        r"(?:\s*(\d+)\s*(?:лет|года|год|г\.|years?))?"
        r"(?:\s*(\d+)\s*(?:месяц[а-я]*|мес\.|months?))?",
        flags=re.IGNORECASE,
    )

    @staticmethod
    def _parse_sex(s: str) -> Optional[int]:
        """Return 1 for male, 0 for female, None when undetermined."""
        if not isinstance(s, str):
            return None
        s = s.lower()
        # "female" contains "male", so the female markers are tested first.
        if "жен" in s or "female" in s:
            return 0
        if "муж" in s or "male" in s:
            return 1
        return None

    @staticmethod
    def _parse_age(s: str) -> Optional[float]:
        """Return the two-digit age preceding a year word, or None."""
        if not isinstance(s, str):
            return None
        match = re.search(r"(\d{2})\s*(лет|года|год|years?)", s.lower())
        if match:
            return float(match.group(1))
        return None

    @staticmethod
    def _map_position(title: str) -> str:
        """Map a desired job title to a coarse group ('other' when unmatched)."""
        if not isinstance(title, str):
            return "other"
        t = title.lower()
        for group, keywords in EncoderHelpers._TARGET_POSITION_RULES:
            if any(x in t for x in keywords):
                return group
        return "other"

    @staticmethod
    def _extract_city(s: str) -> str:
        """Return the lower-cased text before the first comma ('unknown' for non-strings)."""
        if not isinstance(s, str):
            return "unknown"
        return s.split(",")[0].strip().lower()

    @staticmethod
    def _categorize_city(city: str) -> str:
        """Bucket a city name into moscow / spb / large_city / medium_city / small_city."""
        if not isinstance(city, str):
            return "unknown"
        c = city.lower()
        if any(
            x in c
            for x in ["москва", "zelenograd", "подольск", "люберцы", "домодедово"]
        ):
            return "moscow"
        if any(
            x in c
            for x in ["санкт-петербург", "saint petersburg", "st. petersburg", "spb"]
        ):
            return "spb"
        large_cities = {
            "новосибирск",
            "екатеринбург",
            "казань",
            "нижний новгород",
            "челябинск",
            "красноярск",
            "омск",
            "самара",
            "уфа",
            "воронеж",
            "пермь",
            "ростов-на-дону",
            "краснодар",
        }
        if c in large_cities:
            return "large_city"
        medium_cities = {
            "волгоград",
            "тюмень",
            "саратов",
            "тольятти",
            "ижевск",
            "иркутск",
            "хабаровск",
            "барнаул",
            "ульяновск",
            "ярославль",
            "томск",
            "владивосток",
            "махачкала",
            "балашиха",
            "кемерово",
            "оренбург",
            "новокузнецк",
            "астрахань",
            "киров",
            "пенза",
            "рязань",
            "чебоксары",
            "набережные челны",
            "калининский",
            "липецк",
        }
        if c in medium_cities:
            return "medium_city"
        return "small_city"

    @staticmethod
    def _extract_business_trip(s: str) -> str:
        """Return 'no', 'rare' or 'yes'; NaN when no business-trip phrase is found."""
        if not isinstance(s, str):
            return np.nan
        s = s.lower()
        # Negative and "rare" phrasings are checked before the plain one,
        # because the plain phrase is a substring of the negative phrasing.
        if re.search(
            r"не готов[а]? к командировкам|not prepared for business trips", s
        ):
            return "no"
        if re.search(
            r"готов[а]? к редким командировкам|prepared for rare business trips", s
        ):
            return "rare"
        if re.search(r"готов[а]? к командировкам|prepared for business trips", s):
            return "yes"
        return np.nan

    employment_map = {
        "полная занятость": "full_time",
        "full time": "full_time",
        "частичная занятость": "part_time",
        "part time": "part_time",
        "проектная работа": "project",
        "project work": "project",
        "стажировка": "project",
        "work placement": "project",
        "волонтерство": "project",
        "volunteering": "project",
    }
    schedule_map = {
        "полный день": "full_day",
        "full day": "full_day",
        "гибкий график": "flexible",
        "flexible schedule": "flexible",
        "сменный график": "shift_or_rotation",
        "вахтовый метод": "shift_or_rotation",
        "удаленная работа": "remote",
        "remote working": "remote",
        "rotation based work": "shift_or_rotation",
    }
    position_map = {
        "dev": [
            "программист",
            "разработчик",
            "developer",
            "frontend",
            "backend",
            "web",
            "java",
            "python",
            "php",
            "1с",
            "1c",
            "qa",
            "тестировщик",
            "верстальщик",
            "инженер-программист",
            "ведущий программист",
            "ведущий инженер-программист",
            "специалист по it",
            "web-разработчик",
            "frontend-разработчик",
        ],
        "sysadmin": [
            "системн",
            "администратор",
            "it-",
            "ит-",
            "it ",
            "сетев",
            "devops",
            "безопасност",
            "информационн",
            "баз данных",
            "dba",
            "системный администратор",
            "ведущий системный администратор",
            "системный инженер",
        ],
        "manager": [
            "руководител",
            "начальник",
            "директор",
            "генеральный директор",
            "начальник отдела информационных технологий",
            "руководитель направления",
            "менеджер",
            "lead",
            "тимлид",
            "project",
            "product",
            "координатор",
        ],
        "analyst": ["аналитик", "analysis", "data", "bi"],
        "support": [
            "поддерж",
            "support",
            "helpdesk",
            "оператор",
            "клиент",
            "специалист технической поддержки",
            "инженер технической поддержки",
        ],
        "marketing_sales": [
            "маркет",
            "seo",
            "контент",
            "дизайн",
            "контент-менеджер",
            "интернет-маркетолог",
            "менеджер интернет-магазина",
            "продаж",
            "продавец",
        ],
        "engineer": [
            "инженер",
            "монтаж",
            "техник",
            "электрик",
            "слаботоч",
            "сервис",
            "монтажник слаботочных систем",
            "сервисный инженер",
            "технический специалист",
            "ведущий инженер",
            "старший инженер",
        ],
        "other": [
            "специалист",
            "ведущий специалист",
            "главный специалист",
            "не указано",
            "freelance",
            "фриланс",
        ],
    }
    edu_map = {
        "высшее": [
            "высшее",
            "higher education",
            "бакалавр",
            "bachelor",
            "магистр",
            "master",
            "магистратура",
            "diploma",
            "graduate",
            "университет",
            "academy",
            "институт",
            "college",
            "school of",
        ],
        "среднее специальное": [
            "среднее специальное",
            "special",
            "колледж",
            "technical college",
            "техникум",
            "vocational",
            "профессиональный лицей",
            "пту",
            "профессиональное училище",
            "school",
            "vocational school",
        ],
        "неоконченное высшее": [
            "неоконченное высшее",
            "incomplete",
            "н/высшее",
            "unfinished",
            "не закончено",
            "non-complete",
        ],
        "среднее образование": [
            "среднее образование",
            "secondary education",
            "high school",
            "школа",
            "general education",
        ],
    }

    @staticmethod
    def _parse_employment(s: str) -> list:
        """Return the distinct employment codes named in a comma-separated cell."""
        if not isinstance(s, str):
            return []
        items = [x.strip() for x in s.lower().split(",")]
        normalized = [
            EncoderHelpers.employment_map[x]
            for x in items
            if x in EncoderHelpers.employment_map
        ]
        return list(set(normalized))

    @staticmethod
    def _parse_schedule(s: str) -> list:
        """Return the distinct schedule codes named in a comma-separated cell."""
        if not isinstance(s, str):
            return []
        items = [x.strip() for x in re.split(r",|\s*,\s*", s.lower())]
        normalized = [
            EncoderHelpers.schedule_map[x]
            for x in items
            if x in EncoderHelpers.schedule_map
        ]
        return list(set(normalized))

    @staticmethod
    def _parse_experience(text: str) -> Optional[float]:
        """Return total experience in years (months as a fraction), or None.

        None is returned for non-strings, for text containing 'Не указано',
        and when no experience header is found. A header without numbers
        yields 0.
        """
        if not isinstance(text, str) or "Не указано" in text:
            return None
        match = EncoderHelpers._EXPERIENCE_RE.search(text)
        if match:
            years = int(match.group(1)) if match.group(1) else 0
            months = int(match.group(2)) if match.group(2) else 0
            return years + months / 12
        return None

    @staticmethod
    def _group_position(position: str) -> str:
        """Map a job title to the first `position_map` group with a matching keyword."""
        if not isinstance(position, str):
            return "other"
        pos = position.lower()
        for group, keywords in EncoderHelpers.position_map.items():
            if any(k in pos for k in keywords):
                return group
        return "other"

    @staticmethod
    def _parse_education_levels(edu: str) -> list:
        """Return every `edu_map` level whose keyword occurs in the text (['other'] if none)."""
        if not isinstance(edu, str):
            return ["other"]
        e = edu.lower()
        categories = set()
        for group, keywords in EncoderHelpers.edu_map.items():
            if any(k in e for k in keywords):
                categories.add(group)
        if not categories:
            categories.add("other")
        return list(categories)

In [9]:
class EncodeSexAge(BaseHandler, EncoderHelpers):
    """Split the combined 'Пол, возраст' column into numeric sex and age columns."""

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Add 'Пол' (1/0, -1 if unknown) and 'Возраст'; drop the source column."""
        if data is None:
            return None

        source = "Пол, возраст"
        if source not in data.columns:
            return data

        out = data.copy()
        combined = out[source]

        out["Пол"] = combined.map(self._parse_sex).fillna(-1)

        parsed_age = combined.map(self._parse_age)
        # The median is taken from the unclipped ages; gaps are filled after clipping.
        fallback_age = parsed_age.median()
        out["Возраст"] = parsed_age.clip(lower=18, upper=75).fillna(fallback_age)

        return out.drop(columns=[source])

In [10]:
class EncodeTargetPosition(BaseHandler, EncoderHelpers):
    """One-hot encode the desired position after grouping it with `_map_position`."""

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Replace the desired-position column with `position_group_*` dummies."""
        if data is None:
            return None

        source = "Ищет работу на должность:"
        if source not in data.columns:
            return data

        groups = data[source].map(self._map_position)
        staged = data.drop(columns=[source]).assign(position_group=groups)
        # drop_first=True omits the dummy of the first category.
        return pd.get_dummies(staged, columns=["position_group"], drop_first=True)

In [11]:
class EncodeCityTrip(BaseHandler, EncoderHelpers):
    """Encode the 'Город' column as a city-size bucket and business-trip readiness."""

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Replace 'Город' with one-hot `city_*` and `business_trip_*` columns.

        Missing business-trip values are filled with the most frequent value.
        When no row contains a recognised phrase there is no mode; the values
        then stay missing and no `business_trip_*` column is produced.
        """
        if data is None:
            return None

        col = "Город"
        if col not in data.columns:
            return data

        df = data.copy()
        df["city"] = df[col].map(self._extract_city).map(self._categorize_city)

        trips = df[col].map(self._extract_business_trip)
        modes = trips.mode()
        # mode() is empty when every value is missing; indexing it would raise.
        if not modes.empty:
            trips = trips.fillna(modes.iloc[0])
        df["business_trip"] = trips

        df = pd.get_dummies(df, columns=["city", "business_trip"], drop_first=True)
        return df.drop(columns=[col])

In [12]:
class EncodeEmployment(BaseHandler, EncoderHelpers):
    """Turn the 'Занятость' column into three 0/1 employment flags."""

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Add `full_time`, `part_time`, `project_like`; drop the source column."""
        if data is None:
            return None

        source = "Занятость"
        if source not in data.columns:
            return data

        kinds_per_row = data[source].map(self._parse_employment)

        out = data.copy()
        flag_to_code = (
            ("full_time", "full_time"),
            ("part_time", "part_time"),
            ("project_like", "project"),
        )
        for flag, code in flag_to_code:
            out[flag] = kinds_per_row.apply(lambda kinds, code=code: int(code in kinds))

        # "employment_list" is the scratch name this step reserves; remove it if present.
        return out.drop(columns=[source, "employment_list"], errors="ignore")

In [13]:
class EncodeSchedule(BaseHandler, EncoderHelpers):
    """Turn the 'График' column into 0/1 work-schedule flags."""

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Add one flag per schedule code and drop the source column."""
        if data is None:
            return None

        source = "График"
        if source not in data.columns:
            return data

        kinds_per_row = data[source].map(self._parse_schedule)

        out = data.copy()
        for flag in ("full_day", "flexible", "shift_or_rotation", "remote"):
            out[flag] = kinds_per_row.apply(lambda kinds, flag=flag: int(flag in kinds))

        # "schedule_list" is the scratch name this step reserves; remove it if present.
        return out.drop(columns=[source, "schedule_list"], errors="ignore")

In [14]:
class EncodeExperience(BaseHandler, EncoderHelpers):
    """Extract total work experience in years from the free-text experience column."""

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Add `years_experience` (median-filled, clipped to 0..45); drop the source."""
        if data is None:
            return None

        source = "Опыт (двойное нажатие для полной версии)"
        if source not in data.columns:
            return data

        out = data.copy()
        years = out[source].map(self._parse_experience)
        out["years_experience"] = years.fillna(years.median()).clip(0, 45)
        return out.drop(columns=[source])

In [15]:
class EncodeAuto(BaseHandler):
    """Encode the 'Авто' column as a 0/1 car-ownership flag."""

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Add 'Авто_наличие' (1 only on an exact phrase match); drop the source."""
        if data is None:
            return None

        source = "Авто"
        if source not in data.columns:
            return data

        owns_car = data[source].eq("Имеется собственный автомобиль").astype(int)
        return data.drop(columns=[source]).assign(**{"Авто_наличие": owns_car})

In [16]:
class DropUnusedColumns(BaseHandler):
    """Remove the columns listed in `COLUMNS` that are present in the frame."""

    COLUMNS = ["Последенее/нынешнее место работы", "Обновление резюме"]

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Return `data` without the listed columns; absent ones are skipped."""
        if data is None:
            return None
        return data.drop(columns=self.COLUMNS, errors="ignore")

In [17]:
class EncodeLastPosition(BaseHandler, EncoderHelpers):
    """One-hot encode the last/current position after grouping it with `_group_position`."""

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Replace the last-position column with `position_group_last_*` dummies."""
        if data is None:
            return None

        source = "Последеняя/нынешняя должность"
        if source not in data.columns:
            return data

        groups = data[source].map(self._group_position)
        staged = data.drop(columns=[source]).assign(position_group_last=groups)
        # drop_first=True omits the dummy of the first category.
        return pd.get_dummies(
            staged, columns=["position_group_last"], drop_first=True
        )

In [18]:
class EncodeEducation(BaseHandler, EncoderHelpers):
    """Reduce the 'Образование и ВУЗ' column to a single higher-education flag."""

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[pd.DataFrame]:
        """Add `edu_has_higher` (0/1) and drop the source column."""
        if data is None:
            return None

        source = "Образование и ВУЗ"
        if source not in data.columns:
            return data

        levels_per_row = data[source].map(self._parse_education_levels)

        out = data.copy()
        out["edu_has_higher"] = levels_per_row.apply(
            lambda levels: int("высшее" in levels)
        )
        # "education_levels" is the scratch name this step reserves; remove it if present.
        return out.drop(columns=[source, "education_levels"], errors="ignore")

In [19]:
@dataclass
class SplitData:
    """Feature matrix and target vector obtained by splitting one DataFrame."""

    # Every column of the source frame except the target.
    X: pd.DataFrame
    # The target column.
    y: pd.Series

In [20]:
class TargetSplitter(BaseHandler):
    """Split a DataFrame into features `X` and target `y`."""

    def __init__(self, target: Optional[str] = None) -> None:
        """
        :param target: target column name; when None the column is auto-detected
        """
        super().__init__()
        self.target = target
        # Common target names, checked in this order during auto-detection.
        self._candidates = ["target", "y", "label", "salary", "ЗП", "Зарплата"]

    def _process(self, data: Optional[pd.DataFrame]) -> Optional[SplitData]:
        """Return `SplitData(X, y)`.

        With no explicit target, the first candidate name found in the frame
        is used, falling back to the last column.

        :raises KeyError: when an explicit target is not a column of `data`
        """
        if data is None:
            return None

        if self.target is not None:
            if self.target not in data.columns:
                raise KeyError(f"Target '{self.target}' not found")
            chosen = self.target
        else:
            # The default is the last column, used when no candidate is present.
            chosen = next(
                (name for name in self._candidates if name in data.columns),
                data.columns[-1],
            )

        return SplitData(X=data.drop(columns=[chosen]), y=data[chosen])

In [21]:
class NpySaver(BaseHandler):
    """Save the split features and target as `.npy` files and pass the data on."""

    def __init__(
        self,
        output_dir: Path,
        x_name: str = "x_data.npy",
        y_name: str = "y_data.npy",
    ) -> None:
        """
        :param output_dir: directory that receives both files (str or Path)
        :param x_name: file name for the feature matrix
        :param y_name: file name for the target vector
        """
        super().__init__()
        # Coerce so a plain string works with the `/` joins below.
        self.output_dir = Path(output_dir)
        self.x_name = x_name
        self.y_name = y_name

    def _process(self, data: Optional[SplitData]) -> Optional[SplitData]:
        """Write `data.X` and `data.y` as float arrays; return `data` unchanged.

        The output directory is created when missing. Conversion to float
        raises if any remaining column is non-numeric.
        """
        if data is None:
            return None

        out_dir = Path(self.output_dir)
        # np.save does not create missing parent directories.
        out_dir.mkdir(parents=True, exist_ok=True)

        x_path = out_dir / self.x_name
        y_path = out_dir / self.y_name

        logger.info("Saving X -> %s, y -> %s", x_path, y_path)

        np.save(x_path, data.X.to_numpy(dtype=float))
        np.save(y_path, data.y.to_numpy(dtype=float))

        return data

In [22]:
def build_pipeline(csv_path: Path, target: Optional[str]):
    """Assemble the full preprocessing chain and return its first handler.

    Stage order: load → clean text → parse salary → treat salary outliers →
    completeness → feature encoders → drop unused columns → split X/y →
    save `.npy` files next to the CSV.

    :param csv_path: path of the input CSV; outputs go to its parent directory
    :param target: target column for the splitter (None = auto-detect)
    """
    stages = [
        RawCSVLoader(csv_path),
        TextClean(),
        TargetSalary("ЗП"),
        TargetOutlier(target="ЗП", strategy="clip"),
        Completeness(),
        EncodeSexAge(),
        EncodeTargetPosition(),
        EncodeCityTrip(),
        EncodeEmployment(),
        EncodeSchedule(),
        EncodeExperience(),
        EncodeAuto(),
        EncodeLastPosition(),
        EncodeEducation(),
        DropUnusedColumns(),
        TargetSplitter(target=target),
        NpySaver(csv_path.parent),
    ]

    # Link each stage to the one that follows it.
    for current, following in zip(stages, stages[1:]):
        current.set_next(following)

    return stages[0]


def run_pipeline(csv_path: Path, target: Optional[str]) -> "Optional[SplitData]":
    """Build the preprocessing chain for `csv_path` and execute it.

    :param csv_path: path of the input CSV
    :param target: target column name (None = auto-detect)
    :return: whatever the last handler of the chain returns. The chain built
        by `build_pipeline` ends with the saver, so this is the X/y split
        rather than a DataFrame.
    """
    pipeline = build_pipeline(csv_path, target)
    # The loader at the head of the chain ignores its input, so start with None.
    return pipeline.handle(None)

In [23]:
# Input location and target column for this run.
input_path = Path("../data/hh.csv")
target = "ЗП"

# Validate the input up front: it must exist and be a .csv file.
if not input_path.exists():
    raise FileNotFoundError(input_path)
if input_path.suffix != ".csv":
    raise ValueError("Unsupported file type. Use .csv")

# Runs the whole chain, including writing the .npy files next to the CSV.
df = run_pipeline(input_path, target)