In [2]:
db1_path='./nichego/baza1.sqlite3'
db2_path='./nichego/baza2.sqlite3'
output_db_path = 'result_database.sqlite3'


In [3]:
def deduplicate_merged_dfs(merged_dfs, pk_fk_info):
    """
    Удаляет дубликаты записей в каждой таблице:
    - записи считаются дубликатами, если совпадают все столбцы кроме PK
    - оставляет первую запись
    - обновляет foreign key в других таблицах, чтобы вместо удалённых ключей ссылались на оставшиеся

    Возвращает новый словарь датафреймов с устранёнными дубликатами и исправленными FK.
    """
    dfs = {table: df.copy() for table, df in merged_dfs.items()}

    for table, df in dfs.items():
        info = pk_fk_info.get(table, {})
        pk_col = info.get('pk')
        if pk_col is None:
            continue

        # Колонки для сравнения (все, кроме PK)
        compare_cols = [col for col in df.columns if col != pk_col]
        if not compare_cols:
            # Все столбцы — PK, пропускаем
            continue

        # Найдём дубликаты по значимым колонкам, сохранаем первую встреченную
        # keep='first' — оставляет первую, остальные отмечает.
        duplicate_mask = df.duplicated(subset=compare_cols, keep='first')
        if not duplicate_mask.any():
            continue

        # build map: копии PK -> оригинальный PK (первый встретившийся)
        original_pks_map = {}

        # для удобства сгруппируем по значимым колонкам
        grouped = df.groupby(compare_cols)

        for group_vals, group_df in grouped:
            # если в группе несколько записей - значит есть дубликаты
            if len(group_df) > 1:
                original_pk = group_df.iloc[0][pk_col]
                duplicate_pks = group_df.iloc[1:][pk_col].tolist()
                for dup_pk in duplicate_pks:
                    original_pks_map[dup_pk] = original_pk

        # Удаляем дубликаты — оставляем только первую запись на группу
        dfs[table] = df[~duplicate_mask].copy()

        # Для всех foreign keys, которые ссылаются на эту таблицу,
        # необходимо заменить FK значения dup_pk на original_pk
        for other_table, other_df in dfs.items():
            other_info = pk_fk_info.get(other_table, {})
            other_fks = other_info.get('fks', {})
            for fk_col, ref_table in other_fks.items():
                if ref_table == table:
                    # Обновим внешние ключи
                    # map заменит дубликаты на оригиналы, остальные оставит без изменений
                    dfs[other_table][fk_col] = other_df[fk_col].map(lambda x: original_pks_map.get(x, x))

    return dfs


In [7]:
import sqlite3
import pandas as pd
import base64
import os
import secrets

def load_dbs_as_dfs(db_path):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Получаем список таблиц
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
    tables = [row[0] for row in cursor.fetchall()]

    dfs = {}
    for table in tables:
        dfs[table] = pd.read_sql_query(f"SELECT * FROM {table}", conn)

    conn.close()
    return dfs

def generate_256bit_base64():
    rand_bytes = secrets.token_bytes(32)  # 256 бит = 32 байта
    return base64.urlsafe_b64encode(rand_bytes).rstrip(b'=').decode('ascii')

def replace_keys_base64(dfs, pk_fk_info):
    """
    dfs: dict таблица -> dataframe
    pk_fk_info: dict с описанием, где первичные ключи и внешние ключи, вида:
      {
        'table_name': {
           'pk': 'primary_key_column_name',
           'fks': { 'fk_column_name': 'referenced_table' }
        },
        ...
      }

    Возвращает новый словарь датафреймов с заменёнными ключами в Base64.
    """
    # Словарь для хранения маппинга старых pk -> новых base64 ключей для каждой таблицы
    pk_maps = {}

    dfs_new = {}

    # Сначала для каждой таблицы заменим PK на base64 значения и запомним сопоставление
    for table, df in dfs.items():
        info = pk_fk_info.get(table, {})
        pk_col = info.get('pk')
        if pk_col is None:
            # Нет PK — просто копируем таблицу без изменений
            dfs_new[table] = df.copy()
            continue

        # Создаём мапинг
        old_pks = df[pk_col].tolist()
        new_pks = [generate_256bit_base64() for _ in old_pks]

        pk_map = dict(zip(old_pks, new_pks))
        pk_maps[table] = pk_map

        df_copy = df.copy()
        df_copy[pk_col] = df_copy[pk_col].map(pk_map)
        dfs_new[table] = df_copy

    # Теперь обновим внешние ключи, если они есть
    for table, df in dfs_new.items():
        info = pk_fk_info.get(table, {})
        fks = info.get('fks', {})
        if not fks:
            continue

        df_copy = df.copy()

        for fk_col, ref_table in fks.items():
            if fk_col not in df_copy.columns:
                continue
            if ref_table not in pk_maps:
                continue
            fk_map = pk_maps[ref_table]
            # Для значений внешнего ключа, которые есть в fk_map, меняем, а если нет — оставляем как есть
            df_copy[fk_col] = df_copy[fk_col].map(lambda x: fk_map.get(x, x))
        dfs_new[table] = df_copy

    return dfs_new

def merge_dfs(dfs1, dfs2):
    """
    Объединяем две словаря таблиц (dataframe) по ключу (названию таблиц)
    """
    merged = {}
    all_tables = set(dfs1.keys()).union(dfs2.keys())
    for table in all_tables:
        df1 = dfs1.get(table)
        df2 = dfs2.get(table)
        if df1 is not None and df2 is not None:
            merged_df = pd.concat([df1, df2], ignore_index=True)
        elif df1 is not None:
            merged_df = df1.copy()
        else:
            merged_df = df2.copy()
        merged[table] = merged_df
    return merged

def replace_base64_with_autoinc(dfs, pk_fk_info):
    """
    Заменяет base64 ключи в объединённом наборе dfs на autoincrement int ключи,
    поправив внешние ключи.

    Возвращает новый набор dfs.
    """
    pk_maps = {}  # table_name -> { old_base64_key: new_int_key }

    dfs_new = {}

    # Сначала для каждой таблицы создаём маппинг base64 pk -> int pk
    for table, df in dfs.items():
        info = pk_fk_info.get(table, {})
        pk_col = info.get('pk')
        if pk_col is None:
            dfs_new[table] = df.copy()
            continue

        old_pks = df[pk_col].tolist()
        # Создаем инт айдишки, нам надо уникальные:
        unique_old_pks = pd.Series(old_pks).unique()
        pk_map = dict(zip(unique_old_pks, range(1, len(unique_old_pks) + 1)))
        pk_maps[table] = pk_map

        # Заменяем pk в df
        df_copy = df.copy()
        df_copy[pk_col] = df_copy[pk_col].map(pk_map)
        dfs_new[table] = df_copy

    # Обновляем foreign key
    for table, df in dfs_new.items():
        info = pk_fk_info.get(table, {})
        fks = info.get('fks', {})
        if not fks:
            continue
        df_copy = df.copy()
        for fk_col, ref_table in fks.items():
            if fk_col not in df_copy.columns:
                continue
            if ref_table not in pk_maps:
                continue
            fk_map = pk_maps[ref_table]
            df_copy[fk_col] = df_copy[fk_col].map(lambda x: fk_map.get(x, x))
        dfs_new[table] = df_copy

    return dfs_new

def write_dfs_to_sqlite(dfs, pk_fk_info, output_db_path):
    """
    Записывает датафреймы в SQLite файл

    При этом для таблиц с PK ставит INTEGER PRIMARY KEY AUTOINCREMENT,
    для других просто создаёт таблицы.

    !! Внимание !! 
    Здесь упрощённая логика создания таблиц — для реальной сложной схемы 
    стоит использовать инспектор схемы или ORM

    Принимает:
      - dfs: dict table->DataFrame
      - pk_fk_info: dict с info по key-ам
      - output_db_path: файл для записи
    """
    if os.path.exists(output_db_path):
        os.remove(output_db_path)

    conn = sqlite3.connect(output_db_path)
    cursor = conn.cursor()

    for table, df in dfs.items():
        info = pk_fk_info.get(table, {})
        pk_col = info.get('pk')
        fks = info.get('fks', {})

        # Формируем схему
        columns = df.columns.tolist()
        col_defs = []

        for col in columns:
            dtype = df[col].dtype
            if pd.api.types.is_integer_dtype(dtype):
                col_type = 'INTEGER'
            elif pd.api.types.is_float_dtype(dtype):
                col_type = 'REAL'
            else:
                col_type = 'TEXT'

            if pk_col == col:
                col_def = f"{col} INTEGER PRIMARY KEY AUTOINCREMENT"
            else:
                col_def = col + ' ' + col_type
            col_defs.append(col_def)

        sql_create = f"CREATE TABLE {table} ({', '.join(col_defs)}"

        # Добавляем внешние ключи (если есть)
        if fks:
            fk_defs = []
            for fk_col, ref_table in fks.items():
                pk_ref_col = pk_fk_info[ref_table]['pk']
                fk_defs.append(f"FOREIGN KEY ({fk_col}) REFERENCES {ref_table}({pk_ref_col})")
            if fk_defs:
                sql_create += ', ' + ', '.join(fk_defs)

        sql_create += ');'

        cursor.execute(sql_create)

        # Вставляем данные
        placeholders = ', '.join(['?' for _ in columns])
        insert_sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"

        values = df.where(pd.notnull(df), None).values.tolist()
        cursor.executemany(insert_sql, values)

    conn.commit()
    conn.close()

def merge_sqlite_dbs_with_key_replacement(db1_path, db2_path, output_db_path, pk_fk_info):
    """
    Основная функция:
    1) Загружает обе базы в DF
    2) Заменяет PK в обеих базах на 256бит base64 ключи с обновлением FK
    3) Объединяет DF по таблицам
    4) Заменяет base64 ключи на int PK с обновлением FK
    5) Записывает в SQLite

    pk_fk_info: структура с info по первичным и внешним ключам

    Пример pk_fk_info:
    {
       'authors': {
           'pk': 'author_id',
           'fks': {}
       },
       'books': {
           'pk': 'book_id',
           'fks': {'author_id': 'authors'}
       }
    }
    """

    print("Загрузка баз...")
    dfs1 = load_dbs_as_dfs(db1_path)
    dfs2 = load_dbs_as_dfs(db2_path)

    print("Заменяем PK на base64 в первой базе...")
    dfs1_b64 = replace_keys_base64(dfs1, pk_fk_info)

    print("Заменяем PK на base64 во второй базе...")
    dfs2_b64 = replace_keys_base64(dfs2, pk_fk_info)

    print("Объединяем датафреймы...")
    merged_dfs = merge_dfs(dfs1_b64, dfs2_b64)

    print("Удаляем дубликаты с корректировкой foreign keys...")
    merged_deduped_dfs = deduplicate_merged_dfs(merged_dfs, pk_fk_info)

    print("Заменяем base64 PK на int PK с автоинкрементом...")
    final_dfs = replace_base64_with_autoinc(merged_deduped_dfs, pk_fk_info)

    print("Записываем в SQLite...")
    write_dfs_to_sqlite(final_dfs, pk_fk_info, output_db_path)

    print(f"Обработка завершена. Итоговая база: {output_db_path}")


# --- ПРИМЕР ВЫЗОВА ---
# Необходимо задать pk_fk_info для вашей схемы БД (пример ниже)
# pk_fk_info = {
#   'table1': {'pk': 'id', 'fks': {'parent_id': 'table1'}},
#   'table2': {'pk': 'id', 'fks': {'table1_id': 'table1'}},
# }

# merge_sqlite_dbs_with_key_replacement('db1.sqlite', 'db2.sqlite', 'merged.sqlite', pk_fk_info)


In [8]:
pk_fk_info = {
    'animal_docs': {
        'pk': 'pk',
        'fks': {
            'animal': 'animals_simple',
            'cattle': 'cattle'
        }
    },
    'animals_simple': {
        'pk': 'pk',
        'fks': {
            'belongs_to_household': 'household'
        }
    },
    'cattle': {
        'pk': 'pk',
        'fks': {
            'belongs_to_household': 'household'
        }
    },
    'city': {
        'pk': 'pk',
        'fks': {
            'belongs_to_settlement': 'settlement'
        }
    },
    'conducted_tests': {
        'pk': 'pk',
        'fks': {
            'animal': 'cattle'
        }
    },
    'household': {
        'pk': 'pk',
        'fks': {
            'belongs_to_city': 'city'
        }
    },
    'report_entries': {
        'pk': 'pk',
        'fks': {
            'belongs_to_report': 'reports',
            'household': 'household'
        }
    },
    'reports': {
        'pk': 'pk',
        'fks': {
            'city': 'city'
        }
    },
    'settlement': {
        'pk': 'pk',
        'fks': {}
    }
}


In [10]:
# PRIMER BLIN
pk_fk_info = {
    'gay_clubs': {
        'pk': 'pk',
        'fks': {}
    },
    'porno': {
        'pk': 'pk',
        'fks': {
            'gay_club': 'gay_clubs'
        }
    }
}

merge_sqlite_dbs_with_key_replacement(db1_path, db2_path, output_db_path, pk_fk_info)

Загрузка баз...
Заменяем PK на base64 в первой базе...
Заменяем PK на base64 во второй базе...
Объединяем датафреймы...
Удаляем дубликаты с корректировкой foreign keys...
Заменяем base64 PK на int PK с автоинкрементом...
Записываем в SQLite...
Обработка завершена. Итоговая база: result_database.sqlite3


  for group_vals, group_df in grouped:


In [11]:
#
#
#
# ДАЛЬШЕ ИДЁТ АВТОГЕНЕРАТОР pk_fk_info ПРИ УСЛОВИИ ЧТО ИМЕНА В ТАБЛИЦЕ ПОДЧИНЯЮТСЯ ПРАВИЛАМ АЛЬФИРЫ МЕНЛИГУЛОВНЫ ИЛИ+- С ПРЕФИКСОМ И ПОСТФИКСОМ
#
#
#

In [15]:
import sqlite3
import os
import json

def generate_pk_fk_info_from_db(db_path, output_path, fk_prefix, fk_postfix):
    
    """
    Генерирует pk_fk_info из схемы SQLite базы данных,
    где foreign key колонки по шаблону: prefix + имя_таблицы_на_которую_ссылается + postfix

    Args:
        db_path (str): путь к SQLite базе
        output_path (str): путь к файлу для сохранения pk_fk_info
        fk_prefix (str): префикс для FK-колонок
        fk_postfix (str): постфикс для FK-колонок

    Результат:
        Распечатывает pk_fk_info и сохраняет в файл output_path
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Получаем все таблицы
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
    tables = [row[0] for row in cursor.fetchall()]

    pk_fk_info = {}

    for table in tables:
        # Получаем info о колонках - pragma table_info
        cursor.execute(f"PRAGMA table_info({table})")
        columns_info = cursor.fetchall()  # (cid, name, type, notnull, dflt_value, pk)

        # Определяем PK - предполагаем, что PK одна колонка с pk>0
        pk_cols = [col[1] for col in columns_info if col[5] > 0]
        pk = pk_cols[0] if pk_cols else None  # None, если нет PK

        # Подготовим словарь для FK колонок
        fk_dict = {}

        # Теперь пытаемся понять, на какие таблицы ссылаются FK колонки по шаблону имени:
        # FK-колонки имеют имя: fk_prefix + referenced_table + fk_postfix

        # Перебираем все таблицы как возможные referenced_table
        for ref_table in tables:
            expected_fk_col = f"{fk_prefix}{ref_table}{fk_postfix}"
            # Если такая колонка есть в текущей таблице - регистрируем FK
            col_names = [col[1] for col in columns_info]
            if expected_fk_col in col_names:
                fk_dict[expected_fk_col] = ref_table

        pk_fk_info[table] = {
            'pk': pk,
            'fks': fk_dict
        }

    conn.close()

    # Выведем результат красиво (JSON-подобном формате)
    import pprint
    print("Сгенерированный pk_fk_info:")
    pprint.pprint(pk_fk_info)
    output_path = output_path + " pk_fk_info.txt"
    # Запишем в файл
    with open(output_path, 'w', encoding='utf-8') as f_out:
        # Можно сохранить красиво в json с отступами
        import json
        json.dump(pk_fk_info, f_out, indent=4, ensure_ascii=False)

    print(f"\npk_fk_info сохранён в файл: {output_path}")


In [16]:
db_file = db1_path
output_file = db_file

generate_pk_fk_info_from_db(db_file, output_file, fk_prefix='', fk_postfix='')


Сгенерированный pk_fk_info:
{'gay_clubs': {'fks': {}, 'pk': 'pk'},
 'porno': {'fks': {'gay_clubs': 'gay_clubs'}, 'pk': 'pk'}}

pk_fk_info сохранён в файл: ./nichego/baza1.sqlite3 pk_fk_info.txt
