In [8]:
from json_profile_generator import generate_profile
profile = generate_profile("data/raw/part1.json.trimmed.json") # передаем путь в json

In [9]:
prompt = """

Следующий профиль в формате json задает собой реляционную структуру в БД. Твоя задача дать названия бизнес-сущностям и всем полям.

Профиль:

""" + str(profile) + """
Ответ выдай строго чистым JSON без комментариев ни до не после. Структура json ответа строго такая:

{
  "entity_names": {
    "[<entity_path>]": "<new_entity_name>"
  },
  "entity_descriptions": {
    "[<entity_path>]": "<one-sentence-what-this-entity-represents>"
  },
  "column_names": {
    "[<entity_path>]::[<column_path>]": "<new_column_name>"
  },
  "column_descriptions": {
    "[<entity_path>]::[<column_path>]": "<short-meaning-and-units-if-any>"
  },
  "notes": "<optional-freeform-notes-about-naming-decisions>"
}


"""

In [10]:
import json 
from validate_rename_patch import validate_rename_patch


# patch - это  ответ от LLM - Просим нам LLM правильно назвать сущности и поля бизнесово.

with open("rename1.json", "r", encoding="utf-8") as f:
    patch = json.load(f)

result = validate_rename_patch(profile, patch)  # критик - он уже сам скажет какие ошибки в ответе и если все норм, то SUCCESS
print(result)

SUCCESS


In [11]:
from final_profile import build_final_profile_from_files

final_prof = build_final_profile_from_files("profile.json", "rename1.json")
# дальше можете сериализовать, генерировать DDL/DBML, и пр.
# import json; print(json.dumps(final_prof, ensure_ascii=False, indent=2))

In [12]:
final_prof

{'version': '1.0',
 'root': '__root__',
 'entities': [{'name': 'IndividualEntrepreneur',
   'path': [],
   'depth': 0,
   'parent': None,
   'primary_key': ['rec_id'],
   'columns': [{'name': 'CitizenshipTypeCode',
     'path': ['citizenship_kind'],
     'type': 'string',
     'nullable': True,
     'examples': ['1', '2', '1'],
     'types_seen': ['string'],
     'description': 'Код типа гражданства (например, гражданин РФ, иностранец и т.п.).'},
    {'name': 'CitizenshipCountry',
     'path': ['citizenship_name'],
     'type': 'string',
     'nullable': True,
     'examples': ['ТАДЖИКИСТАН', 'УЗБЕКИСТАН', 'ТАДЖИКИСТАН'],
     'types_seen': ['string'],
     'description': 'Страна гражданства предпринимателя.'},
    {'name': 'EntrepreneurFormCode',
     'path': ['code_form_ind_entrep'],
     'type': 'string',
     'nullable': True,
     'examples': ['1', '1', '1'],
     'types_seen': ['string'],
     'description': 'Код организационно-правовой формы ИП.'},
    {'name': 'RecordProcessing

In [13]:
from ddl_postgres import emit_ddl_pg
from ddl_clickhouse import emit_ddl_ch

# profile — это итоговый профиль (после применения патча)
ddl_pg = emit_ddl_pg(final_prof, types_yaml_path="config/types.yaml")
ddl_ch = emit_ddl_ch(final_prof, types_yaml_path="config/types.yaml", database="mydb")  # database опционально

print(ddl_pg)
print(ddl_ch)

CREATE TABLE IF NOT EXISTS "IndividualEntrepreneur" (
  "rec_id" BIGINT NOT NULL,
  "CitizenshipTypeCode" text,
  "CitizenshipCountry" text,
  "EntrepreneurFormCode" text,
  "RecordProcessingDate" date,
  "OGRNIPAssignmentDate" date,
  "DateOfBirth" date,
  "Email" text,
  "ErrorCode" integer,
  "NationalIDCard" text,
  "ForeignCitizenID" text,
  "RegistrationAuthorityCode" text,
  "RegistrationAuthorityName" text,
  "PrimaryOKVEDCode" text,
  "PrimaryOKVEDName" text,
  "InitialRegistrationDate" date,
  "InitialRegistrationNumber" text,
  "TaxAuthorityName" text,
  "StatusCode" text,
  "StatusDescription" text,
  "TerminationReasonCode" text,
  "TerminationReasonDescription" text,
  "FirstName" text,
  "Patronymic" text,
  "GenderCode" text,
  "LastName" text,
  "BirthCertificateFirstName" text,
  "BirthCertificatePatronymic" text,
  "BirthCertificateLastName" text,
  "INN" text,
  "PensionFundNumber" text,
  "SocialInsuranceNumber" text,
  "EntrepreneurFormName" text,
  "OGRNIP" text,

# Создадим DBML

In [14]:
import json
from dbml_from_profile import emit_dbml, save_dbml

dbml_text = emit_dbml(final_prof, project_database_type="Generic")
print(dbml_text[:500], "...")

# или сохранить в файл
save_dbml(final_prof, "schema.dbml", project_database_type="Generic")

Project {
  database_type: 'Generic'
}

Table "IndividualEntrepreneur" {
  "rec_id" bigint [pk, not null]
  "CitizenshipTypeCode" text [null, note: 'Код типа гражданства (например, гражданин РФ, иностранец и т.п.).']
  "CitizenshipCountry" text [null, note: 'Страна гражданства предпринимателя.']
  "EntrepreneurFormCode" text [null, note: 'Код организационно-правовой формы ИП.']
  "RecordProcessingDate" date [null, note: 'Дата формирования или обработки записи.']
  "OGRNIPAssignmentDate" date [nu ...


# Итератор для ETL в будущем

In [15]:
from row_iterator import iter_rows, get_table_columns


import json

with open("data/raw/part1.json.trimmed.json", "r", encoding="utf-8") as f:
    records = json.load(f)  # список dict'ов

# records: это iterable JSON-объектов (например, list(json.load(f)))
cols = get_table_columns(final_prof)

for table, row in iter_rows(final_prof, records):
    # row содержит PK и все данные по колонкам этого table
    # Можно буферизовать и грузить батчами:
    #   - PostgreSQL: COPY table (cols[table]) FROM STDIN
    #   - ClickHouse: INSERT INTO table FORMAT JSONEachRow (нужно сериализовать row в JSON)
    pass


# Удалим все таблицы для начала

In [16]:
import json
from ch_reset import drop_ch_tables_for_profile, recreate_and_load_ch
http = "http://127.0.0.1:8123"
db = "analytics"

drop_ch_tables_for_profile(http, final_prof, db)
# recreate_and_load_ch(http, final_prof, records, database=db)


In [17]:
import psycopg2, json
from pg_reset import drop_pg_tables_for_profile, recreate_and_load_pg

conn = psycopg2.connect("postgresql://postgres:postgres@localhost:5432/analytics")
drop_pg_tables_for_profile(conn, final_prof, schema="public")
# recreate_and_load_pg(conn, final_prof, records, schema="public")
conn.close()


# Загрузка в Постгрес

In [18]:
import json, psycopg2
from ddl_postgres import emit_ddl_pg
from load_postgres import copy_into_pg

In [19]:
with open("data/raw/part1.json.trimmed.json", "r", encoding="utf-8") as f:
    records = json.load(f)

In [20]:
conn = psycopg2.connect("postgresql://postgres:postgres@localhost:5432/analytics")

In [21]:
ddl = emit_ddl_pg(final_prof, types_yaml_path="config/types.yaml")
with conn.cursor() as cur:
    cur.execute("SET client_min_messages = WARNING")
    # psycopg2 нормально выполняет несколько CREATE в одном execute,
    # но если вдруг драйвер ругнётся — можно разбить по ';'
    cur.execute(ddl)
conn.commit()

In [22]:
copy_into_pg(conn, final_prof, records, schema="public", batch_size=50_000)
conn.close()

In [23]:
import psycopg2, json
from pg_introspect import (
    table_exists, list_tables, describe_columns,
    primary_key, foreign_keys, row_count,
    validate_schema_against_profile
)

# подключение
conn = psycopg2.connect("postgresql://postgres:postgres@localhost:5432/analytics")

print(list_tables(conn, "public"))
print(table_exists(conn, "public", "IndividualEntrepreneur"))          # True/False
print(primary_key(conn, "public", "IndividualEntrepreneur"))           # ['rec_id']
print(describe_columns(conn, "public", "IndividualEntrepreneur")[:5])  # первые 5 колонок
print(foreign_keys(conn, "public", "EntrepreneurOKVEDOptional"))       # FK -> parent
print(row_count(conn, "public", "IndividualEntrepreneur"))             # количество строк
print(row_count(conn, "public", "EntrepreneurOKVEDOptional"))

print(validate_schema_against_profile(conn, final_prof, schema="public"))

conn.close()


['EntrepreneurOKVEDOptional', 'IndividualEntrepreneur', 'individual_entrepreneur', 'okved_main', 'okved_optional']
True
['rec_id']
[{'name': 'rec_id', 'nullable': False, 'data_type': 'bigint', 'udt_name': 'int8', 'char_len': None, 'numeric_precision': 64, 'numeric_scale': 0}, {'name': 'CitizenshipTypeCode', 'nullable': True, 'data_type': 'text', 'udt_name': 'text', 'char_len': None, 'numeric_precision': None, 'numeric_scale': None}, {'name': 'CitizenshipCountry', 'nullable': True, 'data_type': 'text', 'udt_name': 'text', 'char_len': None, 'numeric_precision': None, 'numeric_scale': None}, {'name': 'EntrepreneurFormCode', 'nullable': True, 'data_type': 'text', 'udt_name': 'text', 'char_len': None, 'numeric_precision': None, 'numeric_scale': None}, {'name': 'RecordProcessingDate', 'nullable': True, 'data_type': 'date', 'udt_name': 'date', 'char_len': None, 'numeric_precision': None, 'numeric_scale': None}]
[{'name': 'fk_EntrepreneurOKVEDOptional_to_IndividualEntrepreneur', 'columns': ['r

# Загрузка в КЛикхаус

In [None]:
from ddl_clickhouse import emit_ddl_ch
from ch_exec import ch_exec_many

ddl_ch = emit_ddl_ch(final_prof, types_yaml_path="config/types.yaml", database="analytics")

# c хоста:
ch_exec_many("http://localhost:8123", ddl_ch, database="analytics")

# если вызываешь из контейнера Airflow — меняй хост:
# ch_exec_many("http://clickhouse:8123", ddl_ch, database="analytics")


In [None]:
from load_clickhouse import insert_into_ch

insert_into_ch(
    "http://127.0.0.1:8123",
    final_prof,
    records,
    database="analytics",     # таблицы уже созданы в analytics
    batch_size=100_000,
    cast=True,                # важный момент из-за timestamp64
    trust_env=False           # игнорируем системные HTTP(S)_PROXY
)



In [None]:
import json
from ch_introspect import (
    ch_ping, list_tables, table_exists, describe_columns,
    table_engine_and_keys, row_count, validate_schema_against_profile_ch
)

http = "http://127.0.0.1:8123"
db = "analytics"

# 0) проверка соединения
ch_ping(http)

# 1) список таблиц
print(list_tables(http, db))

# 2) существует ли таблица
print(table_exists(http, db, "IndividualEntrepreneur"))

# 3) описание колонок
print(describe_columns(http, db, "IndividualEntrepreneur")[:5])

# 4) ключи / движок
print(table_engine_and_keys(http, db, "EntrepreneurOKVEDOptional"))

# 5) количество строк
print(row_count(http, "analytics", "IndividualEntrepreneur"))
print(row_count(http, "analytics", "EntrepreneurOKVEDOptional"))


# 6) валидация против итогового профиля
print(validate_schema_against_profile_ch(http, final_prof, db, types_yaml_path="config/types.yaml"))
