## Разбор ДЗ

1) У нас есть master и slave (week02)

2) Как проверить репликацию:
    - На master `select * from pg_stat_replication;`
    - На replica `select * from pg_stat_wal_receiver;`

3) Хотим init из файла
```sql
  CREATE TABLE manufacturers (
    manufacturer_id SERIAL PRIMARY KEY,
    manufacturer_name VARCHAR(100) NOT NULL,
    manufacturer_legal_entity VARCHAR(100) NOT NULL
  );

  CREATE TABLE categories (
    category_id SERIAL PRIMARY KEY,
    category_name VARCHAR(100) NOT NULL
  );

  CREATE TABLE products (
    category_id BIGINT NOT NULL,
    manufacturer_id BIGINT NOT NULL,
    product_id SERIAL PRIMARY KEY,
    product_name VARCHAR(255) NOT NULL,
    product_picture_url VARCHAR(255) NOT NULL,
    product_description VARCHAR(255) NOT NULL,
    product_age_restriction INT NOT NULL,
    CONSTRAINT category_fk FOREIGN KEY (category_id) REFERENCES categories (category_id),
    CONSTRAINT manufacturer_fk FOREIGN KEY (manufacturer_id) REFERENCES manufacturers (manufacturer_id)
  );

  CREATE TABLE stores (
    store_id SERIAL PRIMARY KEY,
    store_name VARCHAR(100) NOT NULL,
    store_country VARCHAR(255) NOT NULL,
    store_city VARCHAR(255) NOT NULL,
    store_address VARCHAR(255) NOT NULL
  );

  CREATE TABLE customers (
    customer_id SERIAL PRIMARY KEY,
    customer_fname VARCHAR(100) NOT NULL,
    customer_lname VARCHAR(100) NOT NULL,
    customer_gender VARCHAR(100) NOT NULL,
    customer_phone VARCHAR(100) NOT NULL
  );

  CREATE TABLE price_change (
    product_id BIGINT NOT NULL,
    price_change_ts TIMESTAMP NOT NULL,
    new_price BIGINT NOT NULL,
    CONSTRAINT product_fk FOREIGN KEY (product_id) REFERENCES products (product_id),
    PRIMARY KEY (product_id, price_change_ts)
  );

  CREATE TABLE deliveries (
    delivery_id BIGINT PRIMARY KEY,
    store_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    delivery_date DATE NOT NULL,
    product_count INTEGER NOT NULL,
    CONSTRAINT store_fk FOREIGN KEY (store_id) REFERENCES stores (store_id),
    CONSTRAINT product_fk FOREIGN KEY (product_id) REFERENCES products (product_id)
  );

  CREATE TABLE purchases (
    store_id BIGINT NOT NULL,
    customer_id BIGINT NOT NULL,
    purchase_id SERIAL PRIMARY KEY,
    purchase_date TIMESTAMP NOT NULL,
    purchase_payment_type VARCHAR(100) NOT NULL,
    CONSTRAINT store_fk FOREIGN KEY (store_id) REFERENCES stores (store_id),
    CONSTRAINT customer_fk FOREIGN KEY (customer_id) REFERENCES customers (customer_id)
  );

  CREATE TABLE purchase_items (
    product_id BIGINT NOT NULL,
    purchase_id BIGINT NOT NULL,
    product_count BIGINT NOT NULL,
    product_price BIGINT NOT NULL,
    CONSTRAINT product_fk FOREIGN KEY (product_id) REFERENCES products (product_id),
    CONSTRAINT purchase_fk FOREIGN KEY (purchase_id) REFERENCES purchases (purchase_id),
    PRIMARY KEY (product_id, purchase_id)
  );
```
<br>
Для этого в docker-compose

```yml
  volumes:
    - ./db-init.sql:/docker-entrypoint-initdb.d/db-init.sql
```

4) Создадим представление `gmv` — сумма продаж в разрезе магазина и категории товара
```sql
  -- gmv: total sales amount (product_count * product_price) per store and product category.
  CREATE VIEW gmv AS
  SELECT
    purchase.store_id,
    product.category_id,
    SUM(item.product_count * item.product_price) AS sales_sum
  FROM purchase_items AS item
  JOIN purchases AS purchase
    ON purchase.purchase_id = item.purchase_id
  JOIN products AS product
    ON product.product_id = item.product_id
  GROUP BY
    purchase.store_id,
    product.category_id;
```
<br>

Для этого в docker-compose
```yml
  volumes:
    - ./db-init.sql:/docker-entrypoint-initdb.d/db-init.sql
    - ./view-init.sql:/docker-entrypoint-initdb.d/view-init.sql
```

---

5) Поднять debezium (week05)

```yml
version: '3'

services:
  # ZooKeeper node; the broker connects to it at zookeeper:2181.
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.1
    container_name: zookeeper
    hostname: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    healthcheck:
      # Sends the `srvr` command to the client port; fails if nc cannot connect.
      test: 'echo srvr | nc zookeeper 2181 || exit 1'
      interval: 10s
      retries: 20
      start_period: 10s

  # Single Kafka broker; started only after ZooKeeper reports healthy.
  broker:
    image: confluentinc/cp-kafka:7.3.1
    hostname: broker
    container_name: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - '9092:9092'
      - '9101:9101'
      - '29092:29092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Two listeners: PLAINTEXT is advertised as broker:29092 (for other containers),
      # PLAINTEXT_HOST is advertised as localhost:9092 (for clients on the host).
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      # Replication settings are 1 because there is only one broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
    healthcheck:
      # Exit with 1 (unhealthy). A negative status is not a valid POSIX exit code and,
      # depending on the shell, may turn into 2, which Docker reserves for healthchecks.
      test: nc -z localhost 9092 || exit 1
      start_period: 15s
      interval: 5s
      timeout: 10s
      retries: 10
      
  # Confluent REST proxy in front of the broker, published on host port 8082.
  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.3.1
    container_name: rest-proxy
    hostname: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: "broker:29092"
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
    ports:
      - "8082:8082"
    depends_on:
      broker:
        condition: service_healthy

  # Kafka Connect worker with Debezium connectors; REST API published on host port 8083.
  debezium:
    image: debezium/connect:latest
    hostname: debezium
    container_name: debezium
    restart: always
    depends_on:
      broker:
        condition: service_healthy
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: broker:29092
      GROUP_ID: 1
      # Topics where the Connect worker keeps its configs, statuses and offsets.
      CONFIG_STORAGE_TOPIC: connect_configs
      STATUS_STORAGE_TOPIC: connect_statuses
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      ENABLE_DEBEZIUM_SCRIPTING: "true"
    healthcheck:
      # Healthy once the connectors endpoint answers with a successful status.
      test:
        - 'CMD'
        - 'curl'
        - '--silent'
        - '--fail'
        - '-X'
        - 'GET'
        - 'http://localhost:8083/connectors'
      start_period: 10s
      interval: 10s
      timeout: 5s
      retries: 5

  # Debezium web UI on host port 8080; talks to the Connect worker at debezium:8083.
  debezium-ui:
    image: debezium/debezium-ui:latest
    hostname: debezium-ui
    container_name: debezium-ui
    restart: always
    environment:
      KAFKA_CONNECT_URIS: http://debezium:8083
    ports:
      - "8080:8080"
    depends_on:
      debezium:
        condition: service_healthy
```

6) Создадим connector (POST-запрос в REST API Kafka Connect на `http://localhost:8083/connectors`)

```json
{
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    
    "database.hostname": "host.docker.internal",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "postgres",
    "database.server.name": "pg-dev",
    
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
      
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.retention.ms": "604800000",
    "topic.creation.enable": "true",
    "topic.prefix": "postgres"
  }
}
```

In [1]:
import requests as rq

response = rq.post(
    url='http://localhost:8083/connectors',
    json={
        "name": "pg-connector",
        "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",

        "database.hostname": "host.docker.internal",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "pg-dev",

        "table.include.list": "public.(.*)",
        "heartbeat.interval.ms": "5000",
        "slot.name": "dbname_debezium",
        "publication.name": "dbname_publication",

        "topic.creation.default.cleanup.policy": "delete",
        "topic.creation.default.partitions": "1",
        "topic.creation.default.replication.factor": "1",
        "topic.creation.default.retention.ms": "604800000",
        "topic.creation.enable": "true",
        "topic.prefix": "postgres"
        }
    }
)
response.status_code, response.text

(201,
 '{"name":"pg-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","plugin.name":"pgoutput","database.hostname":"host.docker.internal","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","database.server.name":"pg-dev","table.include.list":"public.(.*)","heartbeat.interval.ms":"5000","slot.name":"dbname_debezium","publication.name":"dbname_publication","topic.creation.default.cleanup.policy":"delete","topic.creation.default.partitions":"1","topic.creation.default.replication.factor":"1","topic.creation.default.retention.ms":"604800000","topic.creation.enable":"true","topic.prefix":"postgres","name":"pg-connector"},"tasks":[],"type":"source"}')

7) Смотрим, что все создалось http://localhost:8080

8) Создадим postgres_dwh

```yml
  postgres_dwh:
    container_name: postgres_dwh
    image: postgres:14.5
    restart: always
    ports:
      - "5434:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
```

9) Пульнем тестовые данные

```sql
INSERT INTO manufacturers (manufacturer_name, manufacturer_legal_entity) VALUES
('Manufacturer A', 'OOO A'), ('Manufacturer B', 'OAO B'), ('Manufacturer C', 'IP C');

INSERT INTO categories (category_name) VALUES
('Electronics'), ('Clothing'), ('Groceries');

-- product_id is SERIAL: let the sequence assign it. Inserting explicit ids does not
-- advance the sequence, so the next insert without an id would collide on the primary key.
-- On a freshly initialised table the rows below still receive ids 1, 2 and 3, which the
-- price_change, deliveries and purchase_items rows rely on.
INSERT INTO products (
    product_name, product_picture_url, product_description, product_age_restriction,
    category_id, manufacturer_id
) VALUES
('Smartphone', 'data:image/jpeg;base64,/9j/4AAQSk', 'lol', 12, 1, 1),
('T-Shirt', 'data:image/jpeg;base64,/9j/IJHDSUD', 'kek', 14, 2, 2),
('Bread', 'data:image/jpeg;base64,/9j/I6SSYA', 'puk', 0, 3, 3);

INSERT INTO stores (store_name, store_country, store_city, store_address) VALUES
('Store X', 'Russia', 'Moscow', '1 Chapel Hill'),
('Store Y', 'Belarus', 'Minsk', '438 DARK SPURT'),
('Store Z', 'Ukraine', 'Kiev', 'ul. Kosmonavtov 35-11');

INSERT INTO price_change (product_id, price_change_ts, new_price) VALUES
(1, '2023-10-15 12:00:00', 520.00), (2, '2023-10-15 13:00:00', 14.00);

INSERT INTO deliveries (delivery_id, store_id, product_id, delivery_date, product_count) VALUES
(1, 1, 1, '2023-10-15', 100), (2, 2, 2, '2023-10-14', 200);

INSERT INTO customers (customer_fname, customer_lname, customer_gender, customer_phone) VALUES
('John', 'Doe', 'man', '88005553535'),
('Jane', 'Smith', 'woman', '88006664548'),
('Alice', 'Johnson', 'woman', '89155673451');

INSERT INTO purchases (store_id, customer_id, purchase_date, purchase_payment_type) VALUES
(1, 1, '2023-10-15', 'cash'), (2, 2, '2023-10-15', 'card'), (3, 3, '2023-10-14', 'cash');

INSERT INTO purchase_items (product_id, purchase_id, product_count, product_price) VALUES
(1, 1, 2, 500.00), (2, 2, 3, 15.00), (3, 3, 5, 2.00);
```

In [45]:
!curl http://localhost:8082/v3/clusters

{"kind":"KafkaClusterList","metadata":{"self":"http://rest-proxy:8082/v3/clusters","next":null},"data":[{"kind":"KafkaCluster","metadata":{"self":"http://rest-proxy:8082/v3/clusters/zjy3h7GlTPCAIcDuLZ2cGg","resource_name":"crn:///kafka=zjy3h7GlTPCAIcDuLZ2cGg"},"cluster_id":"zjy3h7GlTPCAIcDuLZ2cGg","controller":{"related":"http://rest-proxy:8082/v3/clusters/zjy3h7GlTPCAIcDuLZ2cGg/brokers/1"},"acls":{"related":"http://rest-proxy:8082/v3/clusters/zjy3h7GlTPCAIcDuLZ2cGg/acls"},"brokers":{"related":"http://rest-proxy:8082/v3/clusters/zjy3h7GlTPCAIcDuLZ2cGg/brokers"},"broker_configs":{"related":"http://rest-proxy:8082/v3/clusters/zjy3h7GlTPCAIcDuLZ2cGg/broker-configs"},"consumer_groups":{"related":"http://rest-proxy:8082/v3/clusters/zjy3h7GlTPCAIcDuLZ2cGg/consumer-groups"},"topics":{"related":"http://rest-proxy:8082/v3/clusters/zjy3h7GlTPCAIcDuLZ2cGg/topics"},"partition_reassignments":{"related":"http://rest-proxy:8082/v3/clusters/zjy3h7GlTPCAIcDuLZ2cGg/topics/-/partitions/-/reassignment"}}]

In [6]:
import json

with open('topics.json', 'r') as f:
    topics = json.load(f)
    for i in topics['data']:
        print(i['topic_name'])

__debezium-heartbeat.postgres
connect_configs
connect_offsets
connect_statuses
postgres.public.categories
postgres.public.customers
postgres.public.deliveries
postgres.public.manufacturers
postgres.public.price_change
postgres.public.products
postgres.public.purchase_items
postgres.public.purchases
postgres.public.stores


In [7]:
from kafka import KafkaConsumer

kafka_server = 'localhost:9092'
kafka_topics = [
    'postgres.public.categories',
    'postgres.public.customers',
    'postgres.public.deliveries',
    'postgres.public.manufacturers',
    'postgres.public.price_change',
    'postgres.public.products',
    'postgres.public.purchase_items',
    'postgres.public.purchases',
    'postgres.public.stores',
]
kafka_consumer_group = 'backend'
dwh_conn_string = 'postgresql+psycopg2://postgres:postgres@localhost:5434/postgres'

def read_single_message(topic=None):
    """Read one message from ``topic`` and return its deserialized value.

    A new consumer is created in the ``kafka_consumer_group`` group against
    ``kafka_server``, starting from the earliest offset when the group has no
    committed position. The consumer is always closed before returning.

    Args:
        topic: Topic name to subscribe to; must not be None.

    Returns:
        The JSON-decoded value of the first message received, or None when the
        message has no value or the consumer yields nothing.

    Raises:
        AssertionError: If ``topic`` is None.
    """
    assert topic is not None, 'You must specify topic name'

    def _decode(raw):
        # Messages without a value are passed through as None.
        return raw if raw is None else json.loads( raw.decode("utf-8") )

    consumer = KafkaConsumer(
        group_id=kafka_consumer_group,
        auto_offset_reset="earliest",
        value_deserializer=_decode,
        bootstrap_servers=kafka_server,
    )
    consumer.subscribe(topics=topic)

    try:
        for record in consumer:
            # Only the first message is needed.
            return record.value
    except Exception as err:
        print("Closing consumer due to error\n")
        consumer.close()
        raise err
    finally:
        print("Closing consumer due to finish\n")
        consumer.close()

In [8]:
test = read_single_message('postgres.public.purchases')
test

Closing consumer due to finish



{'schema': {'type': 'struct',
  'fields': [{'type': 'struct',
    'fields': [{'type': 'int64', 'optional': False, 'field': 'store_id'},
     {'type': 'int64', 'optional': False, 'field': 'customer_id'},
     {'type': 'int32',
      'optional': False,
      'default': 0,
      'field': 'purchase_id'},
     {'type': 'int64',
      'optional': False,
      'name': 'io.debezium.time.MicroTimestamp',
      'version': 1,
      'field': 'purchase_date'},
     {'type': 'string', 'optional': False, 'field': 'purchase_payment_type'}],
    'optional': True,
    'name': 'postgres.public.purchases.Value',
    'field': 'before'},
   {'type': 'struct',
    'fields': [{'type': 'int64', 'optional': False, 'field': 'store_id'},
     {'type': 'int64', 'optional': False, 'field': 'customer_id'},
     {'type': 'int32',
      'optional': False,
      'default': 0,
      'field': 'purchase_id'},
     {'type': 'int64',
      'optional': False,
      'name': 'io.debezium.time.MicroTimestamp',
      'version': 

In [11]:
test['payload']

{'before': None,
 'after': {'store_id': 1,
  'customer_id': 1,
  'purchase_id': 1,
  'purchase_date': 1697328000000000,
  'purchase_payment_type': 'cash'},
 'source': {'version': '2.2.0.Alpha3',
  'connector': 'postgresql',
  'name': 'postgres',
  'ts_ms': 1702396316890,
  'snapshot': 'false',
  'db': 'postgres',
  'sequence': '["50856440","50856544"]',
  'schema': 'public',
  'table': 'purchases',
  'txId': 758,
  'lsn': 50856544,
  'xmin': None},
 'op': 'c',
 'ts_ms': 1702396317133,
 'transaction': None}

10) Посмотрим на stores и придумаем, что с ним делать

```sql
CREATE TABLE stores (
    store_id SERIAL PRIMARY KEY,
    store_name VARCHAR(100) NOT NULL,
    store_country VARCHAR(255) NOT NULL,
    store_city VARCHAR(255) NOT NULL,
    store_address VARCHAR(255) NOT NULL
  );
```

```sql
create schema if not exists dwh_detailed;

create table if not exists dwh_detailed.anc_sourceSystem (
    sourceSystemID    VARCHAR(128) PRIMARY KEY,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW()
);

create table if not exists dwh_detailed.attr_sourceSystem_name (
    sourceSystemID    VARCHAR(128) UNIQUE NOT NULL,
    name              VARCHAR(255) NOT NULL,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW()
);

create table if not exists dwh_detailed.anc_store (
    storeId           VARCHAR(128) PRIMARY KEY,
    sourceSystemID    VARCHAR(128) NOT NULL,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW()
);

create table if not exists dwh_detailed.attr_store_businessKey (
    storeId           VARCHAR(128) UNIQUE NOT NULL,
    businessKey       BIGINT NOT NULL,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW()
);

create table if not exists dwh_detailed.attr_store_isActive (
    storeId           VARCHAR(128) UNIQUE NOT NULL,
    isActive          BOOL NOT NULL,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW()
);

create table if not exists dwh_detailed.attr_store_name (
    storeId           VARCHAR(128) UNIQUE NOT NULL,
    storeName         VARCHAR(255) NOT NULL,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW()
);

create table if not exists dwh_detailed.attr_store_address (
    storeId           VARCHAR(128) UNIQUE NOT NULL,
    storeAddress      VARCHAR(255) NOT NULL,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW()
);

create table if not exists dwh_detailed.attr_store_city (
    storeId           VARCHAR(128) UNIQUE NOT NULL,
    storeCity         VARCHAR(255) NOT NULL,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW()
);

create table if not exists dwh_detailed.anc_country (
    countryId         VARCHAR(128) PRIMARY KEY,
    sourceSystemID    VARCHAR(128) NOT NULL,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW()
);

create table if not exists dwh_detailed.attr_country_name (
    countryId         VARCHAR(128) UNIQUE NOT NULL,
    countryName       VARCHAR(255) NOT NULL,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Tie between a store and a country.
-- The unique constraint is declared inline so that the whole script can be re-run:
-- a separate ALTER TABLE ... ADD CONSTRAINT fails on the second run because the
-- constraint already exists, whereas CREATE TABLE IF NOT EXISTS is simply skipped.
create table if not exists dwh_detailed.tie_store_country (
    storeId           VARCHAR(128) NOT NULL,
    countryId         VARCHAR(128) NOT NULL,
    created_at        TIMESTAMP NOT NULL DEFAULT NOW(),
    CONSTRAINT uq_tie_store_country UNIQUE (storeId, countryId)
);
```

In [12]:
import hashlib
from sqlalchemy import create_engine, text

class StoreHandler:
    """Apply Debezium change events for ``stores`` to the ``dwh_detailed`` tables.

    ``consume()`` reads messages from Kafka and hands every value to
    ``process_row()``, which turns the event into inserts/upserts on the anchor,
    attribute and tie tables and runs them in one transaction.
    """

    # Both are replaced with live objects in __init__.
    dwh_engine = None
    kafka_engine = None

    def __init__(self, dwh_conn_string, kafka_host, kafka_consumer_group, kafka_topic):
        """Create the DWH engine and a Kafka consumer subscribed to ``kafka_topic``.

        Args:
            dwh_conn_string: SQLAlchemy connection string of the DWH database.
            kafka_host: Kafka bootstrap server(s).
            kafka_consumer_group: Consumer group id used for offset tracking.
            kafka_topic: Topic (or topics) to subscribe to.
        """
        self.dwh_engine = create_engine(dwh_conn_string)
        self.kafka_engine = KafkaConsumer(
            bootstrap_servers=kafka_host,
            # Messages without a value (tombstones) are passed through as None.
            value_deserializer=lambda v: v if v is None else json.loads( v.decode("utf-8") ),
            auto_offset_reset="earliest",
            group_id=kafka_consumer_group
        )
        self.kafka_engine.subscribe(topics=kafka_topic)

    def compute_hash(self, obj):
        """Return the hexadecimal MD5 digest of the string ``obj`` (used as a surrogate key)."""
        return hashlib.md5(obj.encode()).hexdigest()

    @staticmethod
    def _statement(table, values, conflict_columns, update_on_conflict=False):
        """Build one ``insert ... on conflict`` statement with bind parameters.

        Only the hard-coded table and column names are formatted into the SQL text;
        every value is passed separately as a bind parameter, so quotes inside
        store names or addresses cannot break or alter the statement.

        Returns:
            A ``(sql, params)`` tuple.
        """
        columns = list(values)
        if update_on_conflict:
            assignments = ', '.join(
                f"{column} = EXCLUDED.{column}"
                for column in columns
                if column not in conflict_columns
            )
            action = f"do update set {assignments}"
        else:
            action = "do nothing"
        sql = (
            f"insert into dwh_detailed.{table} ({', '.join(columns)}) "
            f"values ({', '.join(':' + column for column in columns)}) "
            f"on conflict ({', '.join(conflict_columns)}) {action};"
        )
        return sql, dict(values)

    def _source_system_statements(self, source_system_hash, source_system_name):
        """Statements that register the source system anchor and its name."""
        return [
            self._statement(
                'anc_sourceSystem',
                {'sourceSystemID': source_system_hash},
                ('sourceSystemID',),
            ),
            self._statement(
                'attr_sourceSystem_name',
                {'sourceSystemID': source_system_hash, 'name': source_system_name},
                ('sourceSystemID',),
            ),
        ]

    def _country_statements(self, store_hash, country_name, source_system_hash):
        """Statements that register the country and tie it to the store."""
        country_hash = self.compute_hash(country_name)
        return [
            self._statement(
                'anc_country',
                {'countryId': country_hash, 'sourceSystemID': source_system_hash},
                ('countryId',),
            ),
            self._statement(
                'attr_country_name',
                {'countryId': country_hash, 'countryName': country_name},
                ('countryId',),
            ),
            self._statement(
                'tie_store_country',
                {'storeId': store_hash, 'countryId': country_hash},
                ('storeId', 'countryId'),
            ),
        ]

    def _build_statements(self, row):
        """Translate one change event into a list of ``(sql, params)`` tuples.

        Returns an empty list for events that carry no row change to apply
        (a ``None`` value, or an event without the row image that is needed).
        """
        if row is None:
            return []

        payload = row['payload']
        before = payload.get('before')
        after = payload.get('after')

        # Prefer the explicit operation code of the event ('c' create, 'r' snapshot
        # read, 'u' update, 'd' delete). NOTE(review): with PostgreSQL's default
        # REPLICA IDENTITY an update may arrive with before = None, so looking only
        # at before/after would mistake it for a create - confirm against the
        # Debezium connector docs. Fall back to before/after when 'op' is absent.
        op = payload.get('op')
        if op is None:
            if after is not None:
                op = 'c' if before is None else 'u'
            elif before is not None:
                op = 'd'

        if op == 'd' and before is not None:
            # Deletes only flip the activity flag; nothing is removed from the DWH.
            store_hash = self.compute_hash( str(before['store_id']) )
            return [
                self._statement(
                    'attr_store_isActive',
                    {'storeId': store_hash, 'isActive': False},
                    ('storeId',),
                    update_on_conflict=True,
                )
            ]

        if op not in ('c', 'r', 'u') or after is None:
            return []

        is_update = op == 'u'
        source_system_name = payload['source']['name']
        source_system_hash = self.compute_hash(source_system_name)
        store_id = after['store_id']
        store_hash = self.compute_hash( str(store_id) )

        # 1 - source system
        statements = self._source_system_statements(source_system_hash, source_system_name)

        # 2 - store: the anchor and business key are written on create only,
        # the descriptive attributes are overwritten on update.
        if not is_update:
            statements.append(self._statement(
                'anc_store',
                {'storeId': store_hash, 'sourceSystemID': source_system_hash},
                ('storeId',),
            ))
            statements.append(self._statement(
                'attr_store_businessKey',
                {'storeId': store_hash, 'businessKey': store_id},
                ('storeId',),
            ))
        for table, column, field in (
            ('attr_store_name', 'storeName', 'store_name'),
            ('attr_store_address', 'storeAddress', 'store_address'),
            ('attr_store_city', 'storeCity', 'store_city'),
        ):
            statements.append(self._statement(
                table,
                {'storeId': store_hash, column: after[field]},
                ('storeId',),
                update_on_conflict=is_update,
            ))
        if not is_update:
            statements.append(self._statement(
                'attr_store_isActive',
                {'storeId': store_hash, 'isActive': True},
                ('storeId',),
            ))

        # 3 - country
        statements.extend(
            self._country_statements(store_hash, after['store_country'], source_system_hash)
        )
        return statements

    def process_row(self, row):
        """Apply one deserialized change event to the DWH in a single transaction.

        Args:
            row: Deserialized message value (dict with a ``payload`` key) or None.
        """
        statements = self._build_statements(row)
        if not statements:
            print('--- skipped: no row change to apply')
            return

        with self.dwh_engine.connect() as con:
            for sql, params in statements:
                print(sql, params)
                con.execute(text(sql), params)
            con.commit()
        print('---')

    def consume(self):
        """Process messages until the consumer stops or an error occurs; always close the consumer."""
        try:
            for message in self.kafka_engine:
                print(message.value)
                print('+++')
                self.process_row(message.value)
        except Exception:
            print("Closing consumer due to error\n")
            raise
        finally:
            # Runs on every exit path, so the consumer is closed exactly once.
            print("Closing consumer due to finish\n")
            self.kafka_engine.close()

In [14]:
a = StoreHandler(dwh_conn_string, kafka_server, kafka_consumer_group, 'postgres.public.stores')
a.consume()

{'schema': {'type': 'struct', 'fields': [{'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'default': 0, 'field': 'store_id'}, {'type': 'string', 'optional': False, 'field': 'store_name'}, {'type': 'string', 'optional': False, 'field': 'store_country'}, {'type': 'string', 'optional': False, 'field': 'store_city'}, {'type': 'string', 'optional': False, 'field': 'store_address'}], 'optional': True, 'name': 'postgres.public.stores.Value', 'field': 'before'}, {'type': 'struct', 'fields': [{'type': 'int32', 'optional': False, 'default': 0, 'field': 'store_id'}, {'type': 'string', 'optional': False, 'field': 'store_name'}, {'type': 'string', 'optional': False, 'field': 'store_country'}, {'type': 'string', 'optional': False, 'field': 'store_city'}, {'type': 'string', 'optional': False, 'field': 'store_address'}], 'optional': True, 'name': 'postgres.public.stores.Value', 'field': 'after'}, {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'version'},

Closing consumer due to finish



KeyboardInterrupt: 

In [18]:
""" Добавление коннекторов Debezium. """

import requests
from typing import Dict, Any

DEBEZIUM_URL = 'http://localhost:8083/connectors'


def add_connector(connector_dict: Dict[str, Any], timeout: float = 30.0) -> None:
    """Register one connector by POSTing its definition to ``DEBEZIUM_URL``.

    Args:
        connector_dict: Connector definition with ``name`` and ``config`` keys.
        timeout: Seconds to wait for the HTTP request, so an unreachable
            Connect worker fails fast instead of hanging the notebook cell.

    Raises:
        AssertionError: If the response status is 400 or higher; the message is
            the response body.
    """
    response = requests.post(
        DEBEZIUM_URL,
        json=connector_dict,
        timeout=timeout,
    )
    # Raised explicitly rather than via `assert`, which is stripped under `python -O`;
    # the exception type is kept so existing callers are unaffected.
    if response.status_code >= 400:
        raise AssertionError(response.text)


pg_connector_config = {
  "name": "pg-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    
    "database.hostname": "host.docker.internal",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "postgres",
    "database.server.name": "pg-dev",
    
    "table.include.list": "public.(.*)",
    "heartbeat.interval.ms": "5000",
    "slot.name": "dbname_debezium",
    "publication.name": "dbname_publication",
      
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.retention.ms": "604800000",
    "topic.creation.enable": "true",
    "topic.prefix": "postgres"
  }
}


sink_connectors_info = {
    'postgres.public.categories': 'category_id',
    'postgres.public.customers': 'customer_id',
    'postgres.public.manufacturers': 'manufacturer_id',
    'postgres.public.deliveries': 'delivery_id',
    'postgres.public.price_change': 'product_id,price_change_ts',
    'postgres.public.products': 'product_id',
    'postgres.public.purchase_items': 'product_id,purchase_id',
    'postgres.public.purchases': 'purchase_id',
    'postgres.public.stores': 'store_id'
}


def add_pg_connector() -> None:
    """Register the PostgreSQL source connector defined in ``pg_connector_config``."""
    source_definition = pg_connector_config
    add_connector(source_definition)


def add_sink_connectors() -> None:
    """Register one JDBC sink connector for every topic in ``sink_connectors_info``.

    The connector is named after the topic with dots replaced by dashes, and the
    mapped value is passed as the primary key field list.
    """
    for topic_name in sink_connectors_info:
        key_fields = sink_connectors_info[topic_name]
        connector_name = topic_name.replace('.', '-')
        sink_settings = {
            "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
            "topics": topic_name,
            "connection.url": "jdbc:postgresql://host.docker.internal:5434/postgres",
            "connection.username": "postgres",
            "connection.password": "postgres",
            "tasks.max":"1",
            # Upsert by record key; delete events are not applied to the target.
            "insert.mode": "upsert",
            "delete.enabled": "false",
            "primary.key.mode": "record_key",
            "primary.key.fields": key_fields,
            "schema.evolution": "basic"
        }
        add_connector({"name": connector_name, "config": sink_settings})

add_pg_connector()
add_sink_connectors()