-
Notifications
You must be signed in to change notification settings - Fork 219
/
CompressOutputStream.java
194 lines (177 loc) · 7.57 KB
/
CompressOutputStream.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// SPDX-License-Identifier: LGPL-2.1-or-later
// Copyright (c) 2012-2014 Monty Program Ab
// Copyright (c) 2015-2021 MariaDB Corporation Ab
package org.mariadb.jdbc.client.socket.impl;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.DeflaterOutputStream;
import org.mariadb.jdbc.client.util.MutableInt;
/**
 * Output stream that frames every write with the 7-byte compressed-protocol header before handing
 * it to the underlying stream.
 *
 * <p>Header layout as produced by this class: bytes 0-2 hold the length of the payload as it is
 * sent on the wire (little-endian), byte 3 holds the next compression sequence number, and bytes
 * 4-6 hold the length of the data before compression (little-endian), or zero when the payload is
 * sent uncompressed.
 *
 * <p>Data is deflated only when at least {@code MIN_COMPRESSION_SIZE} (1536) bytes are available
 * and deflating actually makes the payload smaller; otherwise the payload is sent as is.
 */
public class CompressOutputStream extends OutputStream {
  private static final int MIN_COMPRESSION_SIZE = 1536; // TCP-IP single packet

  /** Largest payload length that fits in the 3-byte length fields of the header. */
  private static final int MAX_PAYLOAD_LENGTH = 0x00ffffff;

  private final OutputStream out;
  private final MutableInt sequence;
  private final byte[] header = new byte[7];

  /** Bytes that did not fit in the previous compressed packet and still have to be sent. */
  private byte[] longPacketBuffer = null;

  /**
   * Constructor.
   *
   * @param out socket output stream
   * @param compressionSequence compression sequence
   */
  public CompressOutputStream(OutputStream out, MutableInt compressionSequence) {
    this.out = out;
    this.sequence = compressionSequence;
  }

  /**
   * Sends {@code len} bytes of {@code b}, starting at {@code off}, as one compressed-protocol
   * packet, preceded by any bytes carried over from a previous call.
   *
   * <p>When fewer than {@code MIN_COMPRESSION_SIZE} bytes are available (carried-over bytes
   * included) the payload is written uncompressed and the underlying stream is not flushed.
   * Otherwise at most 0x00ffffff bytes are deflated and written, the underlying stream is flushed,
   * and any bytes beyond that limit are kept until the next {@code write} or {@link #flush()}. If
   * deflating does not make the payload smaller, it is written uncompressed so that the on-wire
   * length always fits in the 3-byte header field.
   *
   * <p>Arguments are validated before anything is written, so an invalid call neither emits a
   * header nor advances the sequence.
   *
   * @param b the data.
   * @param off the start offset in the data.
   * @param len the number of bytes to write.
   * @throws NullPointerException if {@code b} is {@code null}.
   * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or {@code off +
   *     len} is greater than {@code b.length}.
   * @throws IOException if the underlying stream fails.
   */
  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    if (b == null) {
      throw new NullPointerException("b");
    }
    // written as "len > b.length - off" so that off + len cannot overflow
    if (off < 0 || len < 0 || len > b.length - off) {
      throw new IndexOutOfBoundsException(
          "off=" + off + ", len=" + len + ", array length=" + b.length);
    }

    final byte[] pending = longPacketBuffer;
    final int pendingLen = (pending != null) ? pending.length : 0;

    if (len + pendingLen < MIN_COMPRESSION_SIZE) {
      // *******************************************************************************
      // small packet, no compression
      // *******************************************************************************
      writeHeader(len + pendingLen, 0);
      if (pending != null) {
        out.write(pending, 0, pendingLen);
      }
      out.write(b, off, len);
      longPacketBuffer = null;
      return;
    }

    // *******************************************************************************
    // compressing packet
    // *******************************************************************************
    // For multi packet, len will be 0x00ffffff + 4 bytes for header, but the header length fields
    // are only 3 bytes wide, so at most 0x00ffffff bytes go into this packet and the remainder is
    // kept in longPacketBuffer for the next write/flush.
    final int chunkLen = Math.min(MAX_PAYLOAD_LENGTH - pendingLen, len);
    final int uncompressedLen = pendingLen + chunkLen;

    final byte[] compressedBytes;
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
      try (DeflaterOutputStream deflater = new DeflaterOutputStream(baos)) {
        if (pending != null) {
          deflater.write(pending, 0, pendingLen);
        }
        deflater.write(b, off, chunkLen);
        deflater.finish();
      }
      compressedBytes = baos.toByteArray();
    }

    if (len > chunkLen) {
      final int remaining = len - chunkLen;
      longPacketBuffer = new byte[remaining];
      System.arraycopy(b, off + chunkLen, longPacketBuffer, 0, remaining);
    } else {
      longPacketBuffer = null;
    }

    if (compressedBytes.length < uncompressedLen) {
      writeHeader(compressedBytes.length, uncompressedLen);
      out.write(compressedBytes, 0, compressedBytes.length);
    } else {
      // Deflating did not help (incompressible data can even grow past 0x00ffffff bytes, which
      // would not fit in the header), so send the raw bytes with an uncompressed length of 0.
      writeHeader(uncompressedLen, 0);
      if (pending != null) {
        out.write(pending, 0, pendingLen);
      }
      out.write(b, off, chunkLen);
    }
    out.flush();
  }

  /**
   * Fills the 7-byte header, taking the next sequence number, and writes it to the underlying
   * stream.
   *
   * @param wireLength length of the payload that follows the header
   * @param uncompressedLength length before compression, or 0 for an uncompressed payload
   * @throws IOException if the underlying stream fails.
   */
  private void writeHeader(int wireLength, int uncompressedLength) throws IOException {
    header[0] = (byte) wireLength;
    header[1] = (byte) (wireLength >>> 8);
    header[2] = (byte) (wireLength >>> 16);
    header[3] = sequence.incrementAndGet();
    header[4] = (byte) uncompressedLength;
    header[5] = (byte) (uncompressedLength >>> 8);
    header[6] = (byte) (uncompressedLength >>> 16);
    out.write(header, 0, 7);
  }

  /**
   * Sends any bytes carried over from an oversized write as their own packet, flushes the
   * underlying stream, then resets the compression sequence to -1.
   *
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public void flush() throws IOException {
    if (longPacketBuffer != null) {
      // detach the buffer first so that write() does not prepend it a second time
      byte[] b = longPacketBuffer;
      longPacketBuffer = null;
      write(b, 0, b.length);
    }
    out.flush();
    sequence.set((byte) -1);
  }

  /**
   * Closes the underlying stream. Bytes still held back from an oversized write are not sent.
   *
   * @throws IOException if an I/O error occurs.
   */
  @Override
  public void close() throws IOException {
    out.close();
  }

  /**
   * Single-byte writes are not supported by this stream; this method always fails.
   *
   * @param b the <code>byte</code>.
   * @throws IOException always.
   */
  @Override
  public void write(int b) throws IOException {
    throw new IOException("NOT EXPECTED !");
  }
}