/
PendingSplitsCheckpointSerializer.java
140 lines (111 loc) · 5.31 KB
/
PendingSplitsCheckpointSerializer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package com.tencent.cloud.oceanus.connector.file.enumerator;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import com.tencent.cloud.oceanus.connector.file.split.FileSourceSplit;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * A {@link SimpleVersionedSerializer} for {@link PendingSplitsCheckpoint}.
 *
 * <p>The serialized form is written in little-endian byte order and laid out as follows:
 *
 * <pre>
 * int     magic number
 * int     version of the split serializer
 * int     number of splits
 * int     number of already processed paths
 * repeated (number of splits):  int length, byte[length] serialized split
 * repeated (number of paths):   int length, byte[length] UTF-8 bytes of Path#toString()
 * </pre>
 *
 * <p>The serialized bytes are cached on the checkpoint object (field {@code
 * serializedFormCache}), so serializing the same checkpoint instance again returns the cached
 * array.
 */
public class PendingSplitsCheckpointSerializer
        implements SimpleVersionedSerializer<PendingSplitsCheckpoint> {

    private static final int VERSION = 1;

    /** Leading marker of the version-1 format; verified first during deserialization. */
    private static final int VERSION_1_MAGIC_NUMBER = 0xDEADBEEF;

    /** Four ints: magic, version of split serializer, count splits, count paths. */
    private static final int HEADER_LENGTH = 4 * Integer.BYTES;

    private final SimpleVersionedSerializer<FileSourceSplit> splitSerializer;

    /**
     * Creates a serializer that delegates the (de)serialization of individual splits.
     *
     * @param splitSerializer serializer used for each {@link FileSourceSplit}; must not be null
     */
    public PendingSplitsCheckpointSerializer(
            SimpleVersionedSerializer<FileSourceSplit> splitSerializer) {
        this.splitSerializer = checkNotNull(splitSerializer);
    }

    // ------------------------------------------------------------------------

    @Override
    public int getVersion() {
        return VERSION;
    }

    /**
     * Serializes the given checkpoint into the format described in the class documentation.
     *
     * @param checkpoint the checkpoint to serialize; must be exactly of type {@link
     *     PendingSplitsCheckpoint}, not a subclass
     * @return the serialized bytes; the same array is stored on the checkpoint and returned again
     *     by later calls for the same checkpoint instance
     * @throws IOException if the split serializer fails, or if the serialized form would not fit
     *     into a single byte array
     */
    @Override
    public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws IOException {
        checkArgument(
                checkpoint.getClass() == PendingSplitsCheckpoint.class,
                "Cannot serialize subclasses of PendingSplitsCheckpoint");

        // optimization: the checkpoint lazily caches its own serialized form
        if (checkpoint.serializedFormCache != null) {
            return checkpoint.serializedFormCache;
        }

        final SimpleVersionedSerializer<FileSourceSplit> splitSerializer =
                this.splitSerializer; // stack cache

        final Collection<FileSourceSplit> splits = checkpoint.getSplits();
        final Collection<Path> processedPaths = checkpoint.getAlreadyProcessedPaths();

        final ArrayList<byte[]> serializedSplits = new ArrayList<>(splits.size());
        final ArrayList<byte[]> serializedPaths = new ArrayList<>(processedPaths.size());

        // accumulate in a long so that an oversized checkpoint is detected instead of
        // silently wrapping around the int range
        long totalLen = HEADER_LENGTH;

        for (FileSourceSplit split : splits) {
            final byte[] serSplit = splitSerializer.serialize(split);
            serializedSplits.add(serSplit);
            totalLen += (long) serSplit.length + Integer.BYTES; // 4 bytes for the length field
        }

        for (Path path : processedPaths) {
            final byte[] serPath = path.toString().getBytes(StandardCharsets.UTF_8);
            serializedPaths.add(serPath);
            totalLen += (long) serPath.length + Integer.BYTES; // 4 bytes for the length field
        }

        if (totalLen > Integer.MAX_VALUE) {
            throw new IOException(
                    "Serialized PendingSplitsCheckpoint is too large: " + totalLen + " bytes");
        }

        final byte[] result = new byte[(int) totalLen];
        final ByteBuffer byteBuffer = ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);

        byteBuffer.putInt(VERSION_1_MAGIC_NUMBER);
        byteBuffer.putInt(splitSerializer.getVersion());
        byteBuffer.putInt(serializedSplits.size());
        byteBuffer.putInt(serializedPaths.size());

        for (byte[] splitBytes : serializedSplits) {
            byteBuffer.putInt(splitBytes.length);
            byteBuffer.put(splitBytes);
        }

        for (byte[] pathBytes : serializedPaths) {
            byteBuffer.putInt(pathBytes.length);
            byteBuffer.put(pathBytes);
        }

        assert byteBuffer.remaining() == 0;

        // optimization: cache the serialized form, so we avoid the byte work during repeated
        // serialization
        checkpoint.serializedFormCache = result;

        return result;
    }

    /**
     * Restores a checkpoint from bytes produced by {@link #serialize(PendingSplitsCheckpoint)}.
     *
     * <p>All count and length fields are validated against the number of bytes actually
     * available before anything is allocated, so truncated or corrupted input is reported as an
     * {@link IOException}. Bytes following the last path entry are ignored.
     *
     * @param version the version the bytes were written with; only the current version is
     *     accepted
     * @param serialized the serialized checkpoint
     * @return the restored checkpoint
     * @throws IOException if the version is unknown, the magic number does not match, the data is
     *     truncated or contains invalid counts/lengths, or the split serializer fails
     */
    @Override
    public PendingSplitsCheckpoint deserialize(int version, byte[] serialized) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unknown version: " + version);
        }

        final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);

        requireRemaining(bb, Integer.BYTES, "magic number");
        final int magic = bb.getInt();
        if (magic != VERSION_1_MAGIC_NUMBER) {
            throw new IOException(
                    String.format(
                            "Invalid magic number for PendingSplitsCheckpoint. "
                                    + "Expected: %X , found %X",
                            VERSION_1_MAGIC_NUMBER, magic));
        }

        requireRemaining(bb, HEADER_LENGTH - Integer.BYTES, "header");
        final int splitSerializerVersion = bb.getInt();
        final int numSplits = bb.getInt();
        final int numPaths = bb.getInt();

        if (numSplits < 0 || numPaths < 0) {
            throw new IOException(
                    "Corrupt PendingSplitsCheckpoint: negative element count (splits="
                            + numSplits
                            + ", paths="
                            + numPaths
                            + ")");
        }
        // every entry carries at least its 4 byte length field, which bounds how many entries
        // the remaining bytes can hold and keeps the pre-sized lists below reasonable
        final long minEntryBytes = ((long) numSplits + numPaths) * Integer.BYTES;
        if (minEntryBytes > bb.remaining()) {
            throw new IOException(
                    "Corrupt PendingSplitsCheckpoint: header announces "
                            + numSplits
                            + " splits and "
                            + numPaths
                            + " paths, but only "
                            + bb.remaining()
                            + " bytes follow");
        }

        final SimpleVersionedSerializer<FileSourceSplit> splitSerializer =
                this.splitSerializer; // stack cache
        final ArrayList<FileSourceSplit> splits = new ArrayList<>(numSplits);
        final ArrayList<Path> paths = new ArrayList<>(numPaths);

        for (int remaining = numSplits; remaining > 0; remaining--) {
            final byte[] bytes = readLengthPrefixed(bb, "split");
            final FileSourceSplit split =
                    splitSerializer.deserialize(splitSerializerVersion, bytes);
            splits.add(split);
        }

        for (int remaining = numPaths; remaining > 0; remaining--) {
            final byte[] bytes = readLengthPrefixed(bb, "path");
            final Path path = new Path(new String(bytes, StandardCharsets.UTF_8));
            paths.add(path);
        }

        return PendingSplitsCheckpoint.reusingCollection(splits, paths);
    }

    /** Reads one entry consisting of an int length followed by that many bytes. */
    private static byte[] readLengthPrefixed(ByteBuffer bb, String what) throws IOException {
        requireRemaining(bb, Integer.BYTES, what + " length");
        final int length = bb.getInt();
        if (length < 0 || length > bb.remaining()) {
            throw new IOException(
                    "Corrupt PendingSplitsCheckpoint: invalid "
                            + what
                            + " length "
                            + length
                            + " with "
                            + bb.remaining()
                            + " bytes remaining");
        }
        final byte[] bytes = new byte[length];
        bb.get(bytes);
        return bytes;
    }

    /** Fails with an IOException if fewer than {@code needed} bytes are left in the buffer. */
    private static void requireRemaining(ByteBuffer bb, int needed, String what)
            throws IOException {
        if (bb.remaining() < needed) {
            throw new IOException(
                    "Truncated PendingSplitsCheckpoint: need "
                            + needed
                            + " bytes for "
                            + what
                            + ", but only "
                            + bb.remaining()
                            + " remaining");
        }
    }
}