This project experiments with running the Debezium CDC engine without relying on Kafka.
- PostgreSQL connector documentation
While the standard deployment on Kafka Connect is the most robust CDC option, we capture changes for only two small tables. The first receives 20K inserts every 5 minutes, at most 24 times a day, on two days each month. The second sees very few CRUD operations after initialization (an estimated 200 per month). Since the major cloud providers bill on a pay-as-you-go basis, and since a Kafka-based setup would also require a separate consumer, we decided to embed the CDC engine in the consumer to reduce deployment cost. This is possible because the current load does not require parallelism: a single consumer is enough.
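As a rough check on the claim that a single consumer is enough, the figures above can be turned into a monthly event count and a burst rate. This is only a back-of-the-envelope sketch: the class and constant names are made up for illustration, and it assumes the worst case in which every burst on the first table reaches the 20K maximum.

```java
/** Back-of-the-envelope load estimate; all names here are illustrative. */
final class LoadEstimate {
    static final long INSERTS_PER_BURST = 20_000;          // first table: rows per burst
    static final long BURST_INTERVAL_SECONDS = 5 * 60;     // one burst every 5 minutes
    static final long BURSTS_PER_DAY = 24;                 // at most 24 bursts per active day
    static final long ACTIVE_DAYS_PER_MONTH = 2;           // two days each month
    static final long SECOND_TABLE_OPS_PER_MONTH = 200;    // estimated CRUD operations

    /** Upper bound on change events captured per month across both tables. */
    static long monthlyEvents() {
        return INSERTS_PER_BURST * BURSTS_PER_DAY * ACTIVE_DAYS_PER_MONTH
                + SECOND_TABLE_OPS_PER_MONTH;
    }

    /** Average rate if one burst is spread evenly over its 5-minute window. */
    static double burstEventsPerSecond() {
        return (double) INSERTS_PER_BURST / BURST_INTERVAL_SECONDS;
    }

    public static void main(String[] args) {
        System.out.println("events per month (upper bound): " + monthlyEvents());
        System.out.println("events per second during a burst: " + burstEventsPerSecond());
    }
}
```

Under these assumptions the total stays below one million events per month, with an average of well under a hundred events per second inside a burst window.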
Debezium acts as a consumer of the replication records provided by Postgres. To do so, it creates an entry in the pg_replication_slots view. If, for any reason, the Debezium process is no longer running but its entry is still listed in pg_replication_slots, Postgres keeps retaining WAL for that slot and the database's disk usage grows steadily. To fix this, run
select pg_drop_replication_slot(slot_name)
from pg_replication_slots
to drop all the slots, or
select pg_drop_replication_slot(${slot_name})
to drop only the unused one. Postgres then takes care of reclaiming the unused disk space.
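The cleanup above could also be run from the application over plain JDBC. The following is a minimal sketch, not code from this project: `ReplicationSlotCleaner` and its methods are hypothetical names, and the `WHERE NOT active` filter is an addition, included because Postgres refuses to drop a slot that a consumer is still attached to.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical helper for cleaning up replication slots; not part of the project. */
final class ReplicationSlotCleaner {

    /** Drops every slot that no consumer is currently attached to. */
    static final String DROP_INACTIVE_SLOTS_SQL =
            "SELECT pg_drop_replication_slot(slot_name) "
          + "FROM pg_replication_slots WHERE NOT active";

    /** Builds the statement for a single slot, doubling single quotes in the name. */
    static String dropSlotSql(String slotName) {
        return "SELECT pg_drop_replication_slot('" + slotName.replace("'", "''") + "')";
    }

    /** Runs the bulk cleanup and returns how many slots were dropped. */
    static int dropInactiveSlots(Connection connection) throws SQLException {
        int dropped = 0;
        try (Statement statement = connection.createStatement();
             ResultSet rows = statement.executeQuery(DROP_INACTIVE_SLOTS_SQL)) {
            while (rows.next()) {
                dropped++; // the query returns one row per dropped slot
            }
        }
        return dropped;
    }
}
```

`dropInactiveSlots` needs a connection opened with a role that is allowed to manage replication slots; `dropSlotSql` only builds the statement text for one named slot.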
- append
schema.table
to the comma-separated list of tables to track, in the .properties files under debezium.table.include.list
- create a package under
it.nicods.debeziumdemo.data
named schema.table_to_track
- under the new package create
- a class
TableName.java
with the following annotations:
@Entity
@Table(name = "table_name_tracking", schema = "tracking")
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Builder
and with all the fields of the table as properties, plus the following:
@Id
@GeneratedValue
private Integer Id;
// table fields...
private String operation;
private ZonedDateTime ts;
- create the relative repository:
@Repository
public interface TableNameRepository extends JpaRepository<TableName, Integer> {}
- Create a change event class:
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@ToString
public class TableNameChangeEvent {
    private TableName before;
    private TableName after;
    private String op;
    private Long ts_ms;
}
- Create a service class capable of handling the created change event:
@Slf4j
public class TableNameService extends TrackingService<TableNameChangeEvent> {
    public TableNameRepository repository;

    public TableNameService(TableNameRepository repository) {
        this.repository = repository;
    }

    @Override
    @Transactional
    public void handleEvent(TableNameChangeEvent record) {
        // DO STUFF
    }
}
- Integrate the new table change event into
DebeziumSourceEventListener:
case "cdc-demo-connector.schema.table_name":
    TableNameChangeEvent tEvent = tableNameService.deserialize(sourceRecordValue, TableNameChangeEvent.class);
    log.debug("JSON DESER {}", tEvent);
    tableNameService.handleEvent(tEvent);
    return true;
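The last step routes each record to the right service by its topic name, which follows the pattern connector name, schema, table. The sketch below illustrates that routing with a lookup map instead of a `switch`. It is an alternative shown for illustration only, not the project's listener: `TopicDispatcher`, `topicFor`, `register` and `dispatch` are hypothetical names, and the payload is kept as a plain `String` so the example needs nothing beyond the JDK.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for the per-table routing done in the listener. */
final class TopicDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    /** Builds a topic name such as "cdc-demo-connector.schema.table_name". */
    static String topicFor(String connectorName, String schema, String table) {
        return connectorName + "." + schema + "." + table;
    }

    /** Associates a topic with the handler for that table's change events. */
    void register(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
    }

    /** Returns true when a handler processed the payload, false for untracked topics. */
    boolean dispatch(String topic, String payload) {
        Consumer<String> handler = handlers.get(topic);
        if (handler == null) {
            return false; // no service registered for this table
        }
        handler.accept(payload);
        return true;
    }
}
```

With this shape, tracking a new table means one extra `register` call, where the handler would deserialize the payload and pass it to the service's `handleEvent`.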