-
Notifications
You must be signed in to change notification settings - Fork 35
/
IcebergTableOperator.java
194 lines (168 loc) · 6.88 KB
/
IcebergTableOperator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/
package io.debezium.server.iceberg.tableoperator;
import io.debezium.DebeziumException;
import io.debezium.server.iceberg.IcebergChangeEvent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.iceberg.*;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Wrapper to perform operations in iceberg tables: optional per-batch deduplication, optional schema extension and
 * writing/committing change events.
 *
 * @author Rafael Acevedo
 */
@Dependent
public class IcebergTableOperator {
  /**
   * Rank of each operation code found in {@link #opColumn}. When two events of the same key have the same source
   * timestamp, the event with the higher rank is considered the later one (c=1 &lt; r=2 &lt; u=3 &lt; d=4).
   */
  static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
  private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
  /** Name of the event field holding the source timestamp used to order events of a key (default {@code __source_ts_ms}). */
  @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
  String sourceTsMsColumn;
  /** Name of the event field holding the operation code ranked by {@link #cdcOperations} (default {@code __op}). */
  @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
  String opColumn;
  /** When {@code true} (default), new fields found in event schemas are added to the target table. */
  @ConfigProperty(name = "debezium.sink.iceberg.allow-field-addition", defaultValue = "true")
  boolean allowFieldAddition;
  @Inject
  IcebergTableWriterFactory writerFactory;
  /** When {@code true} (default), batches written to tables with identifier fields are deduplicated by key. */
  @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
  boolean upsert;

  /**
   * Deduplicates the events of a batch by their key.
   * <p>
   * A row can be changed several times in the source within one batch, for example an insert followed by an update
   * and a delete. Only the latest change per key is kept, as decided by {@link #compareByTsThenOp(JsonNode, JsonNode)}.
   * When two events compare equal, the one appearing later in {@code events} wins.
   * <p>
   * Used when the {@code upsert} feature is enabled (the consumer is not operating in append mode), so that duplicate
   * rows are not added to the target table.
   *
   * @param events change events of a single batch
   * @return one event per distinct key, in the order in which each key first appears in {@code events}
   * @throws NullPointerException if an event has a {@code null} key
   */
  protected List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> events) {
    // Events are merged sequentially here, so a concurrent map is not needed; LinkedHashMap keeps the result
    // order deterministic.
    Map<JsonNode, IcebergChangeEvent> deduplicatedEvents = new LinkedHashMap<>();
    for (IcebergChangeEvent e : events) {
      // A null key cannot identify a row: fail fast rather than collapsing all key-less events into one.
      JsonNode key = Objects.requireNonNull(e.key(), "Cannot deduplicate change event without a key");
      deduplicatedEvents.merge(key, e,
          (oldValue, newValue) -> compareByTsThenOp(oldValue.value(), newValue.value()) <= 0 ? newValue : oldValue);
    }
    return new ArrayList<>(deduplicatedEvents.values());
  }

  /**
   * Orders two event values, first by the {@link #sourceTsMsColumn} timestamp. For equal timestamps they are ordered
   * by the {@link #cdcOperations} rank of the code in {@link #opColumn}.
   * <p>
   * A missing timestamp field is read as {@code 0}, and a missing operation field as {@code "c"}. An operation code
   * not present in {@link #cdcOperations} ranks {@code -1}, below every known code.
   *
   * @param lhs value of the first event
   * @param rhs value of the second event
   * @return a negative number, zero or a positive number if {@code lhs} sorts before, equal to, or after {@code rhs}
   */
  private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {
    // path() (unlike get()) never returns null, so absent fields use the defaults below instead of throwing NPE.
    int result = Long.compare(lhs.path(sourceTsMsColumn).asLong(0), rhs.path(sourceTsMsColumn).asLong(0));
    if (result == 0) {
      result = Integer.compare(opRank(lhs), opRank(rhs));
    }
    return result;
  }

  /**
   * Returns the {@link #cdcOperations} rank of the operation code stored in {@code value}.
   *
   * @param value event value to read {@link #opColumn} from
   * @return the rank of the operation code, or {@code -1} if the code is unknown
   */
  private int opRank(JsonNode value) {
    return cdcOperations.getOrDefault(value.path(opColumn).asText("c"), -1);
  }

  /**
   * Adds fields from the given schema that are missing from the target table schema to the target table.
   * <p>
   * It is used when the allow-field-addition feature is enabled.
   *
   * @param icebergTable table whose schema may be extended
   * @param newSchema    schema to union (by field name) into the table schema; its identifier fields are also applied
   */
  private void applyFieldAddition(Table icebergTable, Schema newSchema) {
    UpdateSchema us = icebergTable.updateSchema().
        unionByNameWith(newSchema).
        setIdentifierFields(newSchema.identifierFieldNames());
    Schema newSchemaCombined = us.apply();
    // @NOTE avoid committing when there is no schema change. commit creates new commit even when there is no change!
    if (!icebergTable.schema().sameSchema(newSchemaCombined)) {
      LOGGER.warn("Extending schema of {}", icebergTable.name());
      us.commit();
    }
  }

  /**
   * Adds a list of events to an iceberg table.
   * <p>
   * When upsert is enabled and the table has identifier fields, the events are first deduplicated by key.
   * <p>
   * If field addition is enabled, the change events are grouped by their schema. For each group, any new fields are
   * first added to the iceberg table, and then the group's data is added to the table. Because events are grouped by
   * schema, the group key (a single schema) is used to detect new fields for the whole group.
   *
   * @param icebergTable target table
   * @param events       change events to write
   */
  public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
    // when operating in upsert mode deduplicate the events, so that a key is not written more than once per batch
    if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
      events = deduplicateBatch(events);
    }
    if (!allowFieldAddition) {
      // if field additions not enabled add set of events to table
      addToTablePerSchema(icebergTable, events);
    } else {
      Map<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> eventsGroupedBySchema =
          events.stream()
              .collect(Collectors.groupingBy(IcebergChangeEvent::jsonSchema));
      LOGGER.debug("Batch got {} records with {} different schema!!", events.size(), eventsGroupedBySchema.keySet().size());
      for (Map.Entry<IcebergChangeEvent.JsonSchema, List<IcebergChangeEvent>> schemaEvents : eventsGroupedBySchema.entrySet()) {
        // extend table schema if new fields found
        applyFieldAddition(icebergTable, schemaEvents.getKey().icebergSchema());
        // add set of events to table
        addToTablePerSchema(icebergTable, schemaEvents.getValue());
      }
    }
  }

  /**
   * Writes a list of change events, all sharing the same schema, to an iceberg table and commits the result.
   * <p>
   * If the writer produced delete files, data and delete files are committed together as a row delta. Otherwise the
   * data files are committed as an append. If the writer produced no files at all, no commit is made.
   *
   * @param icebergTable target table
   * @param events       change events to write
   * @throws DebeziumException wrapping any {@link IOException} raised while writing or closing the writer
   */
  private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
    // Initialize a task writer to write both INSERT and equality DELETE.
    try (BaseTaskWriter<Record> writer = writerFactory.create(icebergTable)) {
      for (IcebergChangeEvent e : events) {
        writer.write(e.asIcebergRecord(icebergTable.schema()));
      }
      writer.close();
      WriteResult files = writer.complete();
      if (files.dataFiles().length == 0 && files.deleteFiles().length == 0) {
        // Nothing was written; committing anyway would still create a new snapshot with no changes.
        LOGGER.debug("No files written for {} events, skipping commit to table {}", events.size(), icebergTable.location());
        return;
      }
      if (files.deleteFiles().length > 0) {
        RowDelta newRowDelta = icebergTable.newRowDelta();
        Arrays.stream(files.dataFiles()).forEach(newRowDelta::addRows);
        Arrays.stream(files.deleteFiles()).forEach(newRowDelta::addDeletes);
        newRowDelta.commit();
      } else {
        AppendFiles appendFiles = icebergTable.newAppend();
        Arrays.stream(files.dataFiles()).forEach(appendFiles::appendFile);
        appendFiles.commit();
      }
    } catch (IOException ex) {
      throw new DebeziumException(ex);
    }
    LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());
  }
}