In [2]:
!pip install sdv deap




In [3]:
!unrar x data.rar


UNRAR 6.11 beta 1 freeware      Copyright (c) 1993-2022 Alexander Roshal


Extracting from data.rar

Creating    data                                                      OK
Extracting  data/california_housing.csv                                   24%  OK 
Extracting  data/insurance.csv                                            31%  OK 
Extracting  data/my_classification.csv                                    55%  OK 
Extracting  data/my_regression.csv                                        80%  OK 
Extracting  data/titanic.csv                                              89%  OK 
Extracting  data/two_moons.csv                                            99%  OK 
All OK


In [5]:
import time
import pandas as pd
import numpy as np
import torch
import random
import warnings
from datetime import timedelta
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score
from hyperopt import hp
from deap import base, creator, tools, algorithms
from sdv.single_table import CTGANSynthesizer as CTGAN
from sdv.metadata import SingleTableMetadata
from xgboost import XGBClassifier
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import LabelEncoder
import tqdm

In [6]:
dir_datasets = 'data/'

# Загрузка реальных датасетов
real_data_1 = pd.read_csv(dir_datasets+'insurance.csv')
real_data_2 = pd.read_csv(dir_datasets+'my_classification.csv')
real_data_3 = pd.read_csv(dir_datasets+'my_regression.csv')
real_data_4 = pd.read_csv(dir_datasets+'two_moons.csv')
real_data_5 = pd.read_csv(dir_datasets+'california_housing.csv')
real_data_6 = pd.read_csv(dir_datasets+'titanic.csv')

# Словарь датасетов для удобства
datasets = {
               'insurance': {
                        "data": real_data_1,
                        "task": "regression",
                        "target": "expenses",
                        "num_columns":
                        ["age", "bmi", "children"],
                        "cat_columns":
                        ["sex", "smoker", "region"]
                    },
               'my_regression': {
                        "data": real_data_3,
                        "task": "regression",
                        "target": "target",
                        "num_columns":
                        ["feature_0", "feature_1", "feature_2", "feature_3"],
                        "cat_columns":
                        []
                    },
              'two_moons': {
                              "data": real_data_4,
                              "task": "classification",
                              "target": "label",
                              "num_columns":
                              ["x1", "x2"],
                              "cat_columns":
                              []
                          },

}

for name, data in datasets.items():
    print(f" Информация о датасете {name}:")
    print(f" Количество строк: {data['data'].shape[0]}")
    print(f" Количество колонок: {data['data'].shape[1]}")
    print(f" Колонки: {list(data['data'].columns)}")
    print(f" Задача: {data['task']}")
    print(f" Целевая переменная: {data['target']}")
    print(f" Числовые признаки: {data['num_columns']}")
    print(f" Категориальные признаки: {data['cat_columns']}")
    print()

 Информация о датасете insurance:
 Количество строк: 1338
 Количество колонок: 7
 Колонки: ['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'expenses']
 Задача: regression
 Целевая переменная: expenses
 Числовые признаки: ['age', 'bmi', 'children']
 Категориальные признаки: ['sex', 'smoker', 'region']

 Информация о датасете my_regression:
 Количество строк: 900
 Количество колонок: 5
 Колонки: ['feature_0', 'feature_1', 'feature_2', 'feature_3', 'target']
 Задача: regression
 Целевая переменная: target
 Числовые признаки: ['feature_0', 'feature_1', 'feature_2', 'feature_3']
 Категориальные признаки: []

 Информация о датасете two_moons:
 Количество строк: 900
 Количество колонок: 3
 Колонки: ['x1', 'x2', 'label']
 Задача: classification
 Целевая переменная: label
 Числовые признаки: ['x1', 'x2']
 Категориальные признаки: []



In [7]:
RANDOM_SEED = 42

random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed(RANDOM_SEED)

In [8]:
def process_data(dataset, num_columns, cat_columns, transformation_num_type='None', transformation_cat_type='None'):

    df_processed = dataset.copy()

    # Обработка числовых признаков
    if transformation_num_type == 'CDF':
        # CDF трансформация: преобразует значения в их эмпирическую функцию распределения
        for col in num_columns:
            # Правильная формула для эмпирической CDF
            df_processed[col] = (df_processed[col].rank(method='average') - 0.5) / len(df_processed)

    elif transformation_num_type == 'PLE_CDF':
        # PLE_CDF (Probability Logit Envelope CDF) - более сложная трансформация
        for col in num_columns:
            # Сначала применяем CDF
            cdf_values = (df_processed[col].rank(method='average') - 0.5) / len(df_processed)

            # Затем применяем logit трансформацию с небольшим сглаживанием
            # Избегаем 0 и 1 для предотвращения бесконечности в logit
            epsilon = 1e-6
            cdf_values = np.clip(cdf_values, epsilon, 1 - epsilon)

            # Логит трансформация: ln(p/(1-p))
            df_processed[col] = np.log(cdf_values / (1 - cdf_values))

    # Обработка категориальных признаков
    if transformation_cat_type == 'OHE':
        # One Hot Encoding для категориальных признаков
        for col in cat_columns:
            # Создаем dummy переменные с префиксом имени колонки
            dummy_df = pd.get_dummies(df_processed[col], prefix=col, dtype=int)

            # Удаляем исходную категориальную колонку
            df_processed = df_processed.drop(columns=[col])

            # Добавляем новые dummy колонки
            df_processed = pd.concat([df_processed, dummy_df], axis=1)

    return df_processed

In [9]:
ctgan_space = {
    'discriminator_lr': (4e-4, 2.1e-3, 5e-5),
    'generator_lr': (5e-5, 5e-3, 5e-5),
    'batch_size': [100, 500, 1000],
    'embedding_dim': [32, 128],
    'generator_dim': [[128, 128, 128], [128, 128, 128, 128]],
    'discriminator_dim': [[256, 256], [256, 256, 256]],
    'generator_decay': (1e-6, 6.4e-6, 1e-7),
    'discriminator_decay': (1e-6, 8e-6, 1e-6),
    'log_frequency': [False, True],
    'transformation_num_type': ['CDF', 'PLE_CDF'],
    'transformation_cat_type': ['OHE']

}

In [10]:
import sys
import os
import builtins
import contextlib

@contextlib.contextmanager
def suppress_all_output():
    with open(os.devnull, "w") as devnull:
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_print = builtins.print
        builtins.print = lambda *args, **kwargs: None
        sys.stdout = devnull
        sys.stderr = devnull
        try:
            yield
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            builtins.print = old_print

In [11]:
import numpy as np
import random
from deap import base, creator, tools, algorithms
from hyperopt import hp
from sklearn.model_selection import KFold
import warnings
from contextlib import contextmanager
import sys
import os

@contextmanager
def suppress_all_output():
    """Контекстный менеджер для подавления всего вывода"""
    with open(os.devnull, "w") as devnull:
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        sys.stdout = devnull
        sys.stderr = devnull
        try:
            yield
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr

class UniversalGeneticOptimizer:
    def __init__(self,
                 param_space,
                 population_size=20,
                 n_generations=10,
                 mutation_prob=0.2,
                 crossover_prob=0.8,
                 tournament_size=3,
                 random_seed=42,
                 verbose=True):
        """
        Универсальный генетический алгоритм для оптимизации гиперпараметров

        Args:
            param_space: словарь с определением пространства поиска
            population_size: размер популяции
            n_generations: количество поколений
            mutation_prob: вероятность мутации
            crossover_prob: вероятность скрещивания
            tournament_size: размер турнира для отбора
            random_seed: seed для воспроизводимости
            verbose: вывод прогресса
        """
        self.param_space = param_space
        self.population_size = population_size
        self.n_generations = n_generations
        self.mutation_prob = mutation_prob
        self.crossover_prob = crossover_prob
        self.tournament_size = tournament_size
        self.verbose = verbose

        # Устанавливаем seed
        random.seed(random_seed)
        np.random.seed(random_seed)

        # Анализируем пространство параметров
        self.param_info = self._analyze_param_space()

        # Настройка DEAP
        self._setup_deap()

        # История оптимизации
        self.history = {
            'generations': [],
            'best_fitness': [],
            'avg_fitness': [],
            'best_params': []
        }

    def _analyze_param_space(self):
        """Анализ пространства параметров для определения их типов"""
        param_info = {}

        for param_name, param_def in self.param_space.items():
            if isinstance(param_def, tuple) and len(param_def) == 2:
                # Непрерывный параметр (min, max)
                param_info[param_name] = {
                    'type': 'continuous',
                    'min': param_def[0],
                    'max': param_def[1],
                    'quantization': None
                }
            elif isinstance(param_def, tuple) and len(param_def) == 3:
                # Непрерывный параметр с квантованием (min, max, step)
                param_info[param_name] = {
                    'type': 'continuous',
                    'min': param_def[0],
                    'max': param_def[1],
                    'quantization': param_def[2]
                }
            elif isinstance(param_def, list):
                # Дискретный параметр
                param_info[param_name] = {
                    'type': 'discrete',
                    'values': param_def
                }
            else:
                raise ValueError(f"Неподдерживаемый тип параметра: {param_name}")

        return param_info

    def _setup_deap(self):
        """Настройка DEAP для минимизации"""
        # Создаем класс для фитнеса (минимизация)
        if not hasattr(creator, "FitnessMin"):
            creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
        if not hasattr(creator, "Individual"):
            creator.create("Individual", list, fitness=creator.FitnessMin)

        self.toolbox = base.Toolbox()

        # Регистрируем функции создания индивидов
        self.toolbox.register("individual", self._create_individual)
        self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)

        # Регистрируем операторы
        self.toolbox.register("mate", self._crossover)
        self.toolbox.register("mutate", self._mutate)
        self.toolbox.register("select", tools.selTournament, tournsize=self.tournament_size)
        self.toolbox.register("evaluate", self._evaluate_individual)

    def _create_individual(self):
        """Создание случайного индивида"""
        individual = []

        for param_name in self.param_space.keys():
            info = self.param_info[param_name]

            if info['type'] == 'continuous':
                # Непрерывный параметр
                value = np.exp(np.random.uniform(np.log(info['min']), np.log(info['max'])))
                if info['quantization'] is not None:
                    # Применяем квантование
                    value = round(value / info['quantization']) * info['quantization']
                    value = np.clip(value, info['min'], info['max'])
                individual.append(value)

            elif info['type'] == 'discrete':
                # Дискретный параметр - сохраняем индекс
                individual.append(np.random.randint(len(info['values'])))

        return creator.Individual(individual)

    def _decode_individual(self, individual):
        """Декодирование индивида в параметры"""
        params = {}

        for i, param_name in enumerate(self.param_space.keys()):
            info = self.param_info[param_name]

            if info['type'] == 'continuous':
                params[param_name] = individual[i]
            elif info['type'] == 'discrete':
                params[param_name] = info['values'][int(individual[i])]

        return params

    def _crossover(self, ind1, ind2):
        """One-point crossover"""
        # Выбираем случайную точку разреза
        if len(ind1) > 1:
            crossover_point = random.randint(1, len(ind1) - 1)
        else:
            crossover_point = 1

        # Обмениваемся частями после точки разреза
        for i in range(crossover_point, len(ind1)):
            ind1[i], ind2[i] = ind2[i], ind1[i]

        return ind1, ind2

    def _mutate(self, individual):
        """Операция мутации - случайное значение из пространства поиска"""
        for i, param_name in enumerate(self.param_space.keys()):
            if random.random() < self.mutation_prob:
                info = self.param_info[param_name]

                if info['type'] == 'continuous':
                    # Генерируем новое случайное значение
                    new_value = np.exp(np.random.uniform(np.log(info['min']), np.log(info['max'])))
                    if info['quantization'] is not None:
                        new_value = round(new_value / info['quantization']) * info['quantization']
                        new_value = np.clip(new_value, info['min'], info['max'])
                    individual[i] = new_value

                elif info['type'] == 'discrete':
                    # Выбираем случайный индекс
                    individual[i] = np.random.randint(len(info['values']))

        return individual,

    def _evaluate_individual(self, individual):
        """Оценка индивида"""

        params = self._decode_individual(individual)
        pair_score = self.objective_function(params)

        return pair_score,

    def set_objective_function(self, objective_func):
        """Установка целевой функции"""
        self.objective_function = objective_func

    def optimize(self):
        """Запуск оптимизации"""
        if self.verbose:
            print(f"Пространство параметров:")
            for param_name, info in self.param_info.items():
                if info['type'] == 'continuous':
                    quant_info = f", квантование: {info['quantization']}" if info['quantization'] else ""
                    print(f"  {param_name}: непрерывный ({info['min']}, {info['max']}){quant_info}")
                else:
                    print(f"  {param_name}: дискретный {info['values']}")
            print(f"Размер популяции: {self.population_size}")
            print(f"Количество поколений: {self.n_generations}")
            print(f"Вероятность мутации: {self.mutation_prob}")
            print(f"Вероятность скрещивания: {self.crossover_prob}")
            print("="*60)

        # Создание начальной популяции
        population = self.toolbox.population(n=self.population_size)

        # Оценка начальной популяции
        fitnesses = list(map(self.toolbox.evaluate, population))
        for ind, fit in zip(population, fitnesses):
            ind.fitness.values = fit

        if self.verbose:
            self._print_population_info(population, 0, "Начальная популяция")

        # Эволюция
        for generation in range(1, self.n_generations + 1):
            if self.verbose:
                print(f"\n{'='*20} ПОКОЛЕНИЕ {generation} {'='*20}")

            # Отбор родителей
            parents = self.toolbox.select(population, len(population))


            # Создание потомков
            offspring = list(map(self.toolbox.clone, parents))

            # Скрещивание (one-point crossover)
            for child1, child2 in zip(offspring[::2], offspring[1::2]):
                if random.random() < self.crossover_prob:
                    self.toolbox.mate(child1, child2)
                    del child1.fitness.values
                    del child2.fitness.values

            # Мутация
            for mutant in offspring:
                if random.random() < self.mutation_prob:
                    self.toolbox.mutate(mutant)
                    del mutant.fitness.values

            # Оценка потомков
            invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
            fitnesses = map(self.toolbox.evaluate, invalid_ind)
            for ind, fit in zip(invalid_ind, fitnesses):
                ind.fitness.values = fit

            # ЭЛИТАРНАЯ СТРАТЕГИЯ: объединение родителей и потомков
            combined_population = population + offspring
            population = tools.selBest(combined_population, self.population_size)


            # Сохранение статистики
            self._save_generation_stats(population, generation)


        # Лучший индивид
        best_individual = tools.selBest(population, 1)[0]
        best_params = self._decode_individual(best_individual)
        best_fitness = best_individual.fitness.values[0]


        return {
            'best_params': best_params,
            'best_pair': best_fitness,
            'pair_diff_from_optimal': best_fitness,
            'history': self.history
        }

    def _format_params(self, params):
        """Форматирование параметров для вывода"""
        formatted = {}
        for key, value in params.items():
            if isinstance(value, float):
                formatted[key] = f"{value}"
            else:
                formatted[key] = value
        return formatted

    def _print_population_info(self, population, generation, title):
        """Вывод информации о популяции"""
        fitnesses = [ind.fitness.values[0] for ind in population]
        best_fitness = min(fitnesses)
        avg_fitness = np.mean(fitnesses)

        print(f"\n{title}:")
        print(f"  Лучший PAIR: {-best_fitness}")
        print(f"  Средний PAIR: {-avg_fitness}")


    def _save_generation_stats(self, population, generation):
        """Сохранение статистики поколения"""
        fitnesses = [ind.fitness.values[0] for ind in population]
        best_fitness = min(fitnesses)
        avg_fitness = np.mean(fitnesses)

        best_individual = min(population, key=lambda x: x.fitness.values[0])
        best_params = self._decode_individual(best_individual)

        self.history['generations'].append(generation)
        self.history['best_fitness'].append(best_fitness)
        self.history['avg_fitness'].append(avg_fitness)
        self.history['best_params'].append(best_params)


In [12]:
def evaluate_pair(real_data, synthetic_data, cat_columns):
    # One-hot энкодинг для категориальных признаков
    real_data_encoded = pd.get_dummies(real_data, columns=cat_columns, drop_first=True)

    if hasattr(synthetic_data, "dataframe"):
        synthetic = synthetic_data.dataframe()
    else:
        synthetic = synthetic_data

    synthetic_encoded = pd.get_dummies(synthetic, columns=cat_columns, drop_first=True)

    # После get_dummies
    real_data_encoded = real_data_encoded.astype(float)
    synthetic_encoded = synthetic_encoded.astype(float)

    # Корреляции
    corr_real = real_data_encoded.corr()
    corr_synth = synthetic_encoded.corr()

    # Абсолютное отклонение корреляций
    pairwise_diff = np.abs(corr_real - corr_synth)

    # Усреднённое отклонение
    mean_diff = pairwise_diff.mean().mean()

    # Проверяем на NaN
    if np.isnan(mean_diff):
        print("Warning: mean_diff is NaN, returning 0")
        return 0.0

    return 1 - mean_diff

In [13]:
from sklearn.model_selection import KFold

def generate_and_evaluate_pair_scores(params, dataset_info, n_splits=3):
    warnings.filterwarnings("ignore", category=FutureWarning)
    warnings.filterwarnings("ignore", category=UserWarning)

    columns_dataset = dataset_info['num_columns'] + dataset_info['cat_columns'] + [dataset_info['target']]
    data = process_data(dataset_info['data'][columns_dataset], dataset_info['num_columns'], dataset_info['cat_columns'],
                        'None', 'None')

    kf = KFold(n_splits=n_splits, shuffle=False)
    pair_scores = []

    for train_index, test_index in kf.split(data):
        train_data = data.iloc[train_index].reset_index(drop=True)
        test_data = data.iloc[test_index].reset_index(drop=True)

        metadata = SingleTableMetadata()
        metadata.detect_from_dataframe(data=train_data)

        cat_columns = dataset_info['cat_columns'].copy()
        if dataset_info["task"] == 'classification':
            cat_columns.append(dataset_info['target'])
        for col in dataset_info['cat_columns']:
            metadata.update_column(col, sdtype='categorical')

        # Создание и обучение CTGAN с заданными параметрами
        ctgan = CTGAN(
            metadata=metadata,
            discriminator_lr=params['discriminator_lr'],
            generator_lr=params['generator_lr'],
            batch_size=params['batch_size'],
            embedding_dim=params['embedding_dim'],
            generator_dim=params['generator_dim'],
            generator_decay=params['generator_decay'],
            discriminator_decay=params['discriminator_decay'],
            log_frequency=params['log_frequency'],
        )

        ctgan.fit(train_data)

        synthetic_data = ctgan.sample(len(test_data))

        # Вычисляем Pair-score
        score = evaluate_pair(
            test_data,
            synthetic_data,
            cat_columns
        )
        pair_scores.append(score)

    return -np.mean(pair_scores) #Возращаем *(-1) для минимизации

In [14]:
def optimize_dataset_genetic(dataset_name, dataset_info, param_space,
                           population_size=20, n_generations=10,
                           mutation_prob=0.2, crossover_prob=1.0,
                           tournament_size=3, random_seed=None, verbose=True):
    """
    Универсальная оптимизация датасета с использованием генетического алгоритма

    Args:
        dataset_name: название датасета
        dataset: информация о датасете
        param_space: пространство поиска параметров
        population_size: размер популяции
        n_generations: количество поколений
        mutation_prob: вероятность мутации
        crossover_prob: вероятность скрещивания
        tournament_size: размер турнира
        random_seed: seed для воспроизводимости
        verbose: вывод прогресса
    """
    print(f"Генетическая оптимизация для датасета: {dataset_name}")

    # Создание оптимизатора
    optimizer = UniversalGeneticOptimizer(
        param_space=param_space,
        population_size=population_size,
        n_generations=n_generations,
        mutation_prob=mutation_prob,
        crossover_prob=crossover_prob,
        tournament_size=tournament_size,
        random_seed=random_seed,
        verbose=verbose
    )

    # Определение целевой функции
    def objective(params):
        if verbose:
            print(f"Оценка параметров: {optimizer._format_params(params)}")

        pair_score = generate_and_evaluate_pair_scores(params, dataset_info)

        if verbose:
            print(f"PAIR score: {-pair_score}")

        return pair_score

    optimizer.set_objective_function(objective)

    # Запуск оптимизации
    start_time = time.time()
    results = optimizer.optimize()
    end_time = time.time()  # время окончания

    elapsed_time = end_time - start_time
    formatted_time = str(timedelta(seconds=int(elapsed_time)))

    print(f"{'='*60}")
    print("РЕЗУЛЬТАТЫ ОПТИМИЗАЦИИ:")
    print(f"Лучший PAIR для {dataset_name}: {-results['best_pair']}")
    print(f"Лучшие параметры: {results['best_params']}")
    print(f"[INFO] Время работы алгоритма: {formatted_time}")
    print(f"{'='*60}")

    return results

In [17]:
results = {}
N_GENERATIONS = 4 # без учета начальной популяции
POPULATION_SIZE = 10
MUTATION_PROB = 0.4
TOURNAMENT_SIZE = 3


for dataset_name in datasets:
    data = datasets[dataset_name]
    results[dataset_name] = optimize_dataset_genetic(
        dataset_name=dataset_name,
        dataset_info=datasets[dataset_name],
        param_space=ctgan_space,  # или ctgan_space, или любое другое
        population_size=POPULATION_SIZE,
        n_generations=N_GENERATIONS,
        mutation_prob=MUTATION_PROB,
        tournament_size=TOURNAMENT_SIZE,
        random_seed=RANDOM_SEED,
        verbose=True,
    )

Универсальная генетическая оптимизация для датасета: insurance
Пространство параметров:
  discriminator_lr: непрерывный (0.0004, 0.0021), квантование: 5e-05
  generator_lr: непрерывный (5e-05, 0.005), квантование: 5e-05
  batch_size: дискретный [100, 500, 1000]
  embedding_dim: дискретный [32, 128]
  generator_dim: дискретный [[128, 128, 128], [128, 128, 128, 128]]
  discriminator_dim: дискретный [[256, 256], [256, 256, 256]]
  generator_decay: непрерывный (1e-06, 6.4e-06), квантование: 1e-07
  discriminator_decay: непрерывный (1e-06, 8e-06), квантование: 1e-06
  log_frequency: дискретный [False, True]
  transformation_num_type: дискретный ['CDF', 'PLE_CDF']
  transformation_cat_type: дискретный ['OHE']
Размер популяции: 10
Количество поколений: 4
Вероятность мутации: 0.4
Вероятность скрещивания: 1.0
Оценка параметров: {'discriminator_lr': '0.00075', 'generator_lr': '0.004', 'batch_size': 1000, 'embedding_dim': 128, 'generator_dim': [128, 128, 128], 'discriminator_dim': [256, 256], 'ge

In [18]:
for dataset_name in datasets:
    print(f"{dataset_name}: {-results[dataset_name]['best_pair']}")

insurance: 0.9255184478937171
my_regression: 0.9136558784355948
two_moons: 0.9761737723601446
