**2\. Анализ данных (Data Understanding)**

Цель шага – понять слабые и сильные стороны предоставленных данных, определить их достаточность, предложить идеи, как  
их использовать, и лучше понять процессы заказчика. Для этого мы строим графики, делаем выборки и рассчитываем  
статистики.

2.1 Сбор данных (Data collection)  
Выгружаем необходимые данные (или срез данных если их объем слишком велик) из источников.  
Версионируем данные средствами DVC.

2.2 Исследование данных (Data exploration)  
Исследуем данные, чтобы сформулировать гипотезы относительно того, как эти данные помогут решить задачу. Проверяем  
качество данных.

Ориентировочный список для проверки данных:

1. Загрузить репрезентативную выборку из набора данных  
2. Провести предварительный анализ всей выборки.  
* определить тип данных в каждом столбце  
* Категориальные данные (Номинальные, Порядковые)  
* Числовые данные (дискретные, непрерывные, интервальные, отношения)  
* При необходимости преобразовать данные к нужным типам  
* Проверить на выбросы, отсутствующие значения, невалидные значения(например в системе случился сбой и в поле с именемпопала длина просмотра).

(по результатам предварительного анализа сделать визуализацию, обычно это табличка с характеристиками или какой то из  
профайлеров)

3. На основе предыдущего анализа выполнить очистку данных (обработать выбросы, отсутствующие значения, удалить невалидные значения)  
4. Удалить из рассмотрения неинформативные данные. (лишние идентификаторы, служебные поля, поля с очень малым количеством значений)  
5. Провести статистический анализ оставшихся данных  
   1. рассчитать ключевые статистики для каждого типа данных  
   2. построить распределения (тип графика выбрать в зависимости от данных, часто полезно построить гистограмму, но иногда лучше воспользоваться линейным графиком или посмотреть распределение во времени с помощью scaterplot)  
6. Провести корреляционный анализ  
   1. Для количественных данных нормализовать данные и построить матрицу корреляции Пирсона  
   2. сделать выводы на основе матрицы (найти утечки данных, найти важные признаки линейно влияющие на целевой показатель, определить гипотезы по конструированию признаков)  
   3. Для количественных и порядковых данных \- построить матрицу корреляции Спирмена  сделать выводы аналогично предыдущему анализу, только учесть тип данных  
   4. Для всех данных построить матрицу корреляции Пфика  
   5. Сделать выводы на основе анализа, сделать оценку между всеми типами корреляции(на пересекающихся данных),  
   6. попытаться найти объяснения различиям.  
   7. Сделать выводы о наличии или отсутствии нелинейных связей.  
        
7.  Провести обработку данных, на основе выводов полученных в прошлых шагах.  
   1. провести дополнительную очистку  
   2. выполнить нужный тип энкодинга(если требуется)  
   3. Сконструировать новые признаки.

8. Построить Графики взаимодействия полученных данных с целевым показателем  
   1. Для количественных данных линейные графики на нормализованных данных  
   2. для категориальных данных с малым количеством категорий построить ScaterPlot во времени  
   3. для категориальных данных с большим количеством показателей построить heatmap во времени(обычно строят, только для ризнаков которые показывают высокие коэффициенты корреляции и потенциально интересны, пример такого графика спектрограмма в анализе звука)  
9. Сделать выводы на основе проведенного анализа и учитывая особенности планируемой архитектуры модели.  
   1. Какие данные и почему нельзя использовать в модели  
   2. какие данные можно использовать без преобразования  
   3. какие данные можно использовать выполнив преобразование  
   4. какие новые признаки нужно использовать и почему  
   5. есть ли смысл использовать один набор признаков или построить разные модели на подмножестве признаков и почему.  
   6. Выделить итоговый список необходимых данных  
10. Описать ожидания от модели на проанализированных данных

In [None]:
from lakefs_client import Configuration
from dotenv import load_dotenv
import os

load_dotenv()  # Загружает переменные из .env

def get_lakefs_config():
    config = Configuration()
    config.host = os.getenv("LAKEFS_HOST")
    config.username = os.getenv("LAKEFS_ACCESS_KEY")
    config.password = os.getenv("LAKEFS_SECRET_KEY")
    return config


In [None]:
from lakefs_client import Configuration
from lakefs_client import ApiClient
from lakefs_client.api import objects_api

def load_dataframe(configuration: Configuration, repository: str, ref: str, path: str):
    # Создаем клиента
    with ApiClient(configuration) as api_client:
        obj_api = objects_api.ObjectsApi(api_client)

        # Получаем объект
        #repository = "mfdp-fin-fraud-detection-data"
        #ref = "main"  # или другой commit/branch/tag
        #path = "ieee-fraud-detection/sample_submission.csv"

        response = obj_api.get_object(repository, ref, path)
        content = response.read()

        # Например, если CSV
        import pandas as pd
        from io import StringIO
        df = pd.read_csv(StringIO(content.decode('utf-8')))
        return df

In [None]:
# Получаем объект
configuration = get_lakefs_config()
repository = "mfdp-fin-fraud-detection-data"
ref = "main"  # или другой commit/branch/tag
path_input = "ieee-fraud-detection/sample_submission.csv"

df = load_dataframe(configuration, repository, ref, path_input)
df.head()

In [None]:
!pip install lakefs

In [None]:
import lakefs
from lakefs.client import Client
import pandas as pd

clt = Client(
    host=os.getenv("LAKEFS_HOST"),
    username=os.getenv("LAKEFS_ACCESS_KEY"),
    password=os.getenv("LAKEFS_SECRET_KEY"),
)

# Укажите имя репозитория и ветки
repo_name = "mfdp-fin-fraud-detection-data"
branch_name = "main"
file_path = "ieee-fraud-detection/sample_submission.csv"

# Получите объект файла
obj = lakefs.repository(repo_name, client=clt).branch(branch_name).object(path=file_path)

# Прочитайте содержимое файла
with obj.reader(mode='r') as file:
    df = pd.read_csv(file)

df.head()


In [None]:
import pandas as pd

# Read into pandas directly by supplying the lakeFS URI...
sample_submission = pd.read_csv(f"lakefs://mfdp-fin-fraud-detection-data/main/ieee-fraud-detection/sample_submission.csv")
sample_submission.head()

In [None]:
from dotenv import load_dotenv

load_dotenv()  # Загружает переменные из .env

# 1. create_lakefs_branch.py

In [None]:
import os
import requests
from requests.auth import HTTPBasicAuth

# Переменные окружения
LAKEFS_ENDPOINT = os.getenv("LAKEFS_ENDPOINT")
LAKEFS_REPO = os.getenv("LAKEFS_REPO")
LAKEFS_BRANCH = os.getenv("LAKEFS_BRANCH", "processing_branch")
LAKEFS_ACCESS_KEY = os.getenv("LAKEFS_ACCESS_KEY")
LAKEFS_SECRET_KEY = os.getenv("LAKEFS_SECRET_KEY")

def create_lakefs_branch():
    auth = HTTPBasicAuth(LAKEFS_ACCESS_KEY, LAKEFS_SECRET_KEY)
    headers = {"Content-Type": "application/json"}

    branch_url = f"{LAKEFS_ENDPOINT}/api/v1/repositories/{LAKEFS_REPO}/branches"

    # Пытаемся создать ветку
    resp = requests.post(branch_url, json={"name": LAKEFS_BRANCH, "source": "main"}, auth=auth, headers=headers)

    if resp.status_code == 201:
        print(f"✅ Branch '{LAKEFS_BRANCH}' created successfully.")
    elif resp.status_code == 409:
        print(f"⚠️ Branch '{LAKEFS_BRANCH}' already exists.")
    else:
        print(f"❌ Failed to create branch: {resp.status_code}")
        print(resp.text)
        return

    # Получаем список веток
    list_resp = requests.get(branch_url, auth=auth)
    print("📂 Available branches:")
    for branch in list_resp.json().get("results", []):
        print(" -", branch['id'])

if __name__ == "__main__":
    create_lakefs_branch()


# 2. data_preprocessing.py

In [16]:
from lakefs_client import Configuration
from lakefs_client import ApiClient
from lakefs_client.api import objects_api

def load_dataframe(configuration: Configuration, repository: str, ref: str, path: str):
    # Создаем клиента
    with ApiClient(configuration) as api_client:
        obj_api = objects_api.ObjectsApi(api_client)

        response = obj_api.get_object(repository, ref, path)
        content = response.read()

        # Например, если CSV
        import pandas as pd
        from io import StringIO
        df = pd.read_csv(StringIO(content.decode('utf-8')))
        return df

In [17]:
from io import BytesIO
import pandas as pd
from lakefs_client import Configuration, ApiClient
from lakefs_client.api.objects_api import ObjectsApi
from lakefs_client.models import ObjectStats
from lakefs_client.rest import ApiException

def save_dataframe(df: pd.DataFrame, configuration: Configuration, repository: str, ref: str, path: str):
    try:
        with ApiClient(configuration) as api_client:
            obj_api = ObjectsApi(api_client)

            buffer = BytesIO()
            df.to_csv(buffer, index=False)
            buffer.seek(0)

            # Добавляем атрибут .name (имитация реального файла)
            buffer.name = path.split('/')[-1]

            obj_api.upload_object(
                repository=repository,
                branch=ref,
                path=path,
                content=buffer
            )

            print(f"✅ DataFrame saved to lakeFS at {repository}/{ref}/{path}")
    except ApiException as e:
        print(f"❌ Failed to upload object: {e}")


In [18]:
from sklearn.impute import SimpleImputer


def clean_data(df: pd.DataFrame):
    # Загрузка данных (предположим, что DataFrame называется df)
    #df = pd.read_csv('your_data.csv')
    df = df_cleaned

    # Рассчитываем процент пропущенных значений для каждого столбца
    missing_percentage = df.isnull().mean() * 100

    # Фильтруем столбцы, у которых меньше 90% пропусков
    df_cleaned = df.loc[:, missing_percentage < 90]

    # Разделяем данные на числовые и категориальные столбцы
    numerical_cols = df_cleaned.select_dtypes(include=['float64', 'int64']).columns
    categorical_cols = df_cleaned.select_dtypes(include=['object']).columns

    # Импутация для числовых столбцов (замена пропусков на медиану)
    numerical_imputer = SimpleImputer(strategy='median')
    df_cleaned[numerical_cols] = numerical_imputer.fit_transform(df_cleaned[numerical_cols])

    # Импутация для категориальных столбцов (замена пропусков на наиболее частое значение)
    categorical_imputer = SimpleImputer(strategy='most_frequent')
    df_cleaned[categorical_cols] = categorical_imputer.fit_transform(df_cleaned[categorical_cols])
    return df_cleaned

In [19]:
from sklearn.calibration import LabelEncoder


def encoder_data(df: pd.DataFrame):
    # Если в числовых столбцах есть текстовые значения, можно преобразовать их в числовые или удалить.
    # Например, если колонка 'category' - категориальная, можно ее закодировать.
    categorical_cols = df.select_dtypes(include=['object']).columns

    # Применяем LabelEncoder для категориальных признаков
    encoder = LabelEncoder()
    for col in categorical_cols:
        df[col] = encoder.fit_transform(df[col].astype(str))  # Преобразуем в строки перед кодированием

    return df

In [20]:
def impute_data(df: pd.DataFrame):
    # Например, добавим фичу как логарифм числового признака
    if 'feature' in df.columns:
        df['log_feature'] = df['feature'].apply(lambda x: np.log(x + 1))
    return df

In [None]:
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder


# Переменные окружения
LAKEFS_ENDPOINT = os.getenv("LAKEFS_ENDPOINT")
LAKEFS_REPO = os.getenv("LAKEFS_REPO")
LAKEFS_BRANCH = os.getenv("LAKEFS_BRANCH", "processing_branch")
LAKEFS_ACCESS_KEY = os.getenv("LAKEFS_ACCESS_KEY")
LAKEFS_SECRET_KEY = os.getenv("LAKEFS_SECRET_KEY")

path_train_transaction_input = "ieee-fraud-detection/train_transaction.csv"
path_train_identity_input = "ieee-fraud-detection/train_identity.csv"

path_output = "ieee-fraud-detection/clean_merge_dataset.csv"


def get_lakefs_config():
    config = Configuration()
    config.host = os.getenv("LAKEFS_HOST")
    config.username = os.getenv("LAKEFS_ACCESS_KEY")
    config.password = os.getenv("LAKEFS_SECRET_KEY")
    return config



def preprocess_data():
    # Чтение данных из CSV
    #df = pd.read_csv(INPUT_URI)
    configuration = get_lakefs_config()
    df_train_transaction = load_dataframe(configuration, LAKEFS_REPO, LAKEFS_BRANCH, path_train_transaction_input)
    print(f"✅ DataFrame transactions loaded frome lakeFS at {LAKEFS_REPO}/{LAKEFS_BRANCH}/{path_train_transaction_input}")

    df_train_identity = load_dataframe(configuration, LAKEFS_REPO, LAKEFS_BRANCH, path_train_identity_input)
    print(f"✅ DataFrame identity loaded frome lakeFS at {LAKEFS_REPO}/{LAKEFS_BRANCH}/{path_train_identity_input}")
    
    # Объединение по TransactionID
    df = df_train_transaction.merge(df_train_identity, on="TransactionID", how="left")
    print(f"✅ DataFrames transactions and identity merged")

    # Очистка данных (удаляем пропуски)
    df = clean_data(df)
    
    # Преобразование категориальных признаков (если есть)
    df = encoder_data(df)
    
    # Генерация фичей (если нужно)
    df = impute_data(df)
    
    # Сохранение обработанных данных
    save_dataframe(df, configuration, LAKEFS_REPO, LAKEFS_BRANCH, path_output)

    print(f"Processed data saved to {path_output}")

if __name__ == "__main__":
    preprocess_data()


# 3. validate_processed_data.py

In [5]:
from lakefs_client import Configuration
from lakefs_client import ApiClient
from lakefs_client.api import objects_api

def load_dataframe(configuration: Configuration, repository: str, ref: str, path: str):
    # Создаем клиента
    with ApiClient(configuration) as api_client:
        obj_api = objects_api.ObjectsApi(api_client)

        response = obj_api.get_object(repository, ref, path)
        content = response.read()

        # Например, если CSV
        import pandas as pd
        from io import StringIO
        df = pd.read_csv(StringIO(content.decode('utf-8')))
        return df

In [None]:
import os
import pandas as pd

path_input = "ieee-fraud-detection/clean_merge_dataset.csv"

def validate_data():
    # Загрузка обработанных данных
    configuration = get_lakefs_config()
    df = load_dataframe(configuration, LAKEFS_REPO, LAKEFS_BRANCH, path_input)
    
    # Проверка на пропуски
    if df.isnull().sum().any():
        print("Warning: Data contains missing values")
    else:
        print("No missing values in data")
    
    # Проверка типов данных
    print(f"Data types: \n{df.dtypes}")
    
    # Вывод статистики
    print(f"Data statistics: \n{df.describe()}")

if __name__ == "__main__":
    validate_data()


No missing values in data
Data types: 
TransactionID      int64
isFraud          float64
dtype: object
Data statistics: 
       TransactionID   isFraud
count   5.066910e+05  506691.0
mean    3.916894e+06       0.5
std     1.462692e+05       0.0
min     3.663549e+06       0.5
25%     3.790222e+06       0.5
50%     3.916894e+06       0.5
75%     4.043566e+06       0.5
max     4.170239e+06       0.5


## 4. commit_preprocessed_data.py

In [7]:
from lakefs_client import Configuration, ApiClient
from lakefs_client.api.commits_api import CommitsApi
from lakefs_client.models import CommitCreation

import os

# Переменные окружения
LAKEFS_REPO = os.getenv("LAKEFS_REPO")
LAKEFS_BRANCH = os.getenv("LAKEFS_BRANCH", "processing_branch")
LAKEFS_ACCESS_KEY = os.getenv("LAKEFS_ACCESS_KEY")
LAKEFS_SECRET_KEY = os.getenv("LAKEFS_SECRET_KEY")
LAKEFS_HOST = os.getenv("LAKEFS_HOST")


def get_lakefs_config():
    config = Configuration()
    config.host = LAKEFS_HOST
    config.username = LAKEFS_ACCESS_KEY
    config.password = LAKEFS_SECRET_KEY
    return config


def commit_processed_data():
    configuration = get_lakefs_config()
    
    with ApiClient(configuration) as api_client:
        commit_api = CommitsApi(api_client)

        commit = CommitCreation(
            message="💾 Added cleaned sample_submission.csv after preprocessing"
        )

        response = commit_api.commit(
            repository=LAKEFS_REPO,
            branch=LAKEFS_BRANCH,
            commit_creation=commit
        )

        print(f"✅ Commit completed. ID: {response.id}")

if __name__ == "__main__":
    commit_processed_data()


✅ Commit completed. ID: d16d6c4809525b8a12e4700fcbc2e51d3578407b6ab4f49d207ce7ffdd0eda2e


## 5. train_model.py

In [8]:
from lakefs_client import Configuration
from lakefs_client import ApiClient
from lakefs_client.api import objects_api

def load_dataframe(configuration: Configuration, repository: str, ref: str, path: str):
    # Создаем клиента
    with ApiClient(configuration) as api_client:
        obj_api = objects_api.ObjectsApi(api_client)

        response = obj_api.get_object(repository, ref, path)
        content = response.read()

        # Например, если CSV
        import pandas as pd
        from io import StringIO
        df = pd.read_csv(StringIO(content.decode('utf-8')))
        return df

In [None]:
import os
import mlflow
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, roc_auc_score

#OUTPUT_URI = os.getenv("OUTPUT_URI")
MLFLOW_TRACKING_URI = os.getenv("MLFLOW_TRACKING_URI")

path_input = "ieee-fraud-detection/clean_merge_dataset.csv"

def train_model():
    # Загрузка обработанных данных
    configuration = get_lakefs_config()
    df = load_dataframe(configuration, LAKEFS_REPO, LAKEFS_BRANCH, path_input)
    
    X = df.drop("target", axis=1)
    y = df["target"]
    
    # Инициализация MLflow
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    mlflow.start_run()

    # Обучение модели
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)
    
    # Предсказания
    y_pred = model.predict(X)
    accuracy = accuracy_score(y, y_pred)
    auc = roc_auc_score(y, model.predict_proba(X)[:, 1])

    # Логирование метрик и модели
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("auc", auc)
    mlflow.sklearn.log_model(model, "model")
    
    # Завершаем запуск
    mlflow.end_run()

if __name__ == "__main__":
    train_model()


KeyError: "['target'] not found in axis"