Fix memory leak in gzip pool + add test for gzip'd data
Jarred-Sumner committed Nov 13, 2022
1 parent 21bf3dd commit 1cce9da
Showing 7 changed files with 1,530 additions and 62 deletions.
61 changes: 11 additions & 50 deletions src/http/zlib.zig
@@ -4,65 +4,26 @@ const MutableString = @import("../global.zig").MutableString;
 const getAllocator = @import("../http_client_async.zig").getAllocator;
 const ZlibPool = @This();
 const Zlib = @import("../zlib.zig");
+const bun = @import("../global.zig");
 
-lock: Lock = Lock.init(),
-items: std.ArrayList(*MutableString),
-allocator: std.mem.Allocator,
-
-pub var instance: ZlibPool = undefined;
-pub var loaded: bool = false;
-
-pub fn init(allocator: std.mem.Allocator) ZlibPool {
-    return ZlibPool{
-        .allocator = allocator,
-        .items = std.ArrayList(*MutableString).init(allocator),
-    };
+fn initMutableString(allocator: std.mem.Allocator) anyerror!MutableString {
+    return MutableString.initEmpty(allocator);
 }
 
-pub fn get(this: *ZlibPool) !*MutableString {
-    std.debug.assert(loaded);
-
-    switch (this.items.items.len) {
-        0 => {
-            var mutable = try getAllocator().create(MutableString);
-            mutable.* = try MutableString.init(getAllocator(), 0);
-            return mutable;
-        },
-        else => {
-            return this.items.pop();
-        },
-    }
+const BufferPool = bun.ObjectPool(MutableString, initMutableString, false, 4);
 
-    unreachable;
+pub fn get(allocator: std.mem.Allocator) *MutableString {
+    return &BufferPool.get(allocator).data;
 }
 
-pub fn put(this: *ZlibPool, mutable: *MutableString) !void {
-    std.debug.assert(loaded);
+pub fn put(mutable: *MutableString) void {
     mutable.reset();
-    try this.items.append(mutable);
+    var node = @fieldParentPtr(BufferPool.Node, "data", mutable);
+    node.release();
 }
 
 pub fn decompress(compressed_data: []const u8, output: *MutableString) Zlib.ZlibError!void {
-    // Heuristic: if we have more than 128 KB of data to decompress
-    // it may take 1ms or so
-    // We must keep the network thread unblocked as often as possible
-    // So if we have more than 50 KB of data to decompress, we do it off the network thread
-    // if (compressed_data.len < 50_000) {
-    var reader = try Zlib.ZlibReaderArrayList.init(compressed_data, &output.list, getAllocator());
+    var reader = try Zlib.ZlibReaderArrayList.init(compressed_data, &output.list, output.allocator);
     try reader.readAll();
-    return;
-    // }
-
-    // var task = try DecompressionTask.get(default_allocator);
-    // defer task.release();
-    // task.* = DecompressionTask{
-    //     .data = compressed_data,
-    //     .output = output,
-    //     .event_fd = AsyncIO.global.eventfd(),
-    // };
-    // task.scheduleAndWait();
-
-    // if (task.err) |err| {
-    //     return @errSetCast(Zlib.ZlibError, err);
-    // }
+    reader.deinit();
 }
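
The refactor above swaps the hand-rolled instance/loaded singleton for bun.ObjectPool, and decompress now calls reader.deinit() instead of returning early past it, which plausibly accounts for the leak named in the commit title. The pattern implied by the call sites: each pooled MutableString is embedded as the data field of a pool-owned Node, so put can recover the node from a bare *MutableString via @fieldParentPtr and recycle it. Below is a minimal sketch of that shape, not Bun's actual implementation; the real bun.ObjectPool takes two extra arguments (false, 4) whose meaning, presumably a thread-safety flag and a size hint, the diff does not show.

const std = @import("std");

// Sketch only: a free-list pool with intrusive nodes, mirroring how the
// diff's ZlibPool uses bun.ObjectPool. Not the real implementation.
fn ObjectPool(comptime T: type, comptime init_fn: fn (std.mem.Allocator) anyerror!T) type {
    return struct {
        const Self = @This();

        pub const Node = struct {
            data: T,
            next: ?*Node = null,

            // Mirrors `node.release()` in the diff: push the node back on
            // the free list instead of dropping the allocation on the floor.
            pub fn release(node: *Node) void {
                node.next = Self.head;
                Self.head = node;
            }
        };

        var head: ?*Node = null;

        // Pop a recycled node if one exists; otherwise heap-allocate one.
        pub fn get(allocator: std.mem.Allocator) *Node {
            if (head) |node| {
                head = node.next;
                node.next = null;
                return node;
            }
            var node = allocator.create(Node) catch unreachable;
            node.* = .{ .data = init_fn(allocator) catch unreachable };
            return node;
        }
    };
}

Because data sits at a fixed offset inside Node, @fieldParentPtr(BufferPool.Node, "data", mutable) recovers the owning node from the pointer that get() handed out, which is exactly how the new put() returns buffers to the pool without tracking them in a separate list.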
15 changes: 5 additions & 10 deletions src/http_client_async.zig
@@ -652,7 +652,7 @@ pub const InternalState = struct {
         }
 
         if (this.compressed_body) |body| {
-            ZlibPool.instance.put(body) catch unreachable;
+            ZlibPool.put(body);
             this.compressed_body = null;
         }
 
@@ -666,12 +666,7 @@
         switch (this.encoding) {
             Encoding.gzip, Encoding.deflate => {
                 if (this.compressed_body == null) {
-                    if (!ZlibPool.loaded) {
-                        ZlibPool.instance = ZlibPool.init(default_allocator);
-                        ZlibPool.loaded = true;
-                    }
-
-                    this.compressed_body = ZlibPool.instance.get() catch unreachable;
+                    this.compressed_body = ZlibPool.get(default_allocator);
                 }
 
                 return this.compressed_body.?;
@@ -695,8 +690,8 @@
     gzip_timer = std.time.Timer.start() catch @panic("Timer failure");
 
     body_out_str.list.expandToCapacity();
-    defer ZlibPool.instance.put(buffer_) catch unreachable;
-    ZlibPool.decompress(buffer.list.items, body_out_str) catch |err| {
+    defer ZlibPool.put(buffer_);
+    ZlibPool.decompress(buffer_.list.items, body_out_str) catch |err| {
         Output.prettyErrorln("<r><red>Zlib error<r>", .{});
         Output.flush();
         return err;
@@ -1725,7 +1720,7 @@ pub fn handleResponseBody(this: *HTTPClient, incoming_data: []const u8) !bool {
         buffer.list.ensureTotalCapacityPrecise(buffer.allocator, this.state.body_size) catch {};
     }
 
-    const remaining_content_length = this.state.body_size - buffer.list.items.len;
+    const remaining_content_length = this.state.body_size -| buffer.list.items.len;
     var remainder = incoming_data[0..@minimum(incoming_data.len, remaining_content_length)];
 
     _ = try buffer.write(remainder);
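
One subtle fix in handleResponseBody above: the subtraction computing remaining_content_length changed from - to -|, Zig's saturating subtraction. Both operands are usize, so a server that delivers more body bytes than its Content-Length advertised would make the plain - underflow and trip an integer-overflow safety panic in Debug/ReleaseSafe builds. A small illustrative test, not part of the commit:

const std = @import("std");

test "saturating subtraction clamps at zero instead of underflowing" {
    const body_size: usize = 100; // what Content-Length promised
    const received: usize = 150; // what actually arrived

    // `body_size - received` would trip the integer-overflow safety check;
    // `-|` clamps to 0, so @minimum(incoming_data.len, 0) in the hunk above
    // simply ignores the excess bytes.
    try std.testing.expectEqual(@as(usize, 0), body_size -| received);
}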
116 changes: 116 additions & 0 deletions test/bun.js/fetch-gzip.test.ts
@@ -0,0 +1,116 @@
import { concatArrayBuffers } from "bun";
import { it, describe, expect } from "bun:test";
import fs from "fs";
import { gc } from "./gc";

it("fetch() with a buffered gzip response works (one chunk)", async () => {
var server = Bun.serve({
port: 6025,

async fetch(req) {
return new Response(
await Bun.file(import.meta.dir + "/fixture.html.gz").arrayBuffer(),
{
headers: {
"Content-Encoding": "gzip",
"Content-Type": "text/html; charset=utf-8",
},
},
);
},
});

const res = await fetch(
`http://${server.hostname}:${server.port}`,
{},
{ verbose: true },
);
const arrayBuffer = await res.arrayBuffer();
expect(
new Buffer(arrayBuffer).equals(
new Buffer(
await Bun.file(import.meta.dir + "/fixture.html").arrayBuffer(),
),
),
).toBe(true);
server.stop();
});

it("fetch() with a gzip response works (one chunk)", async () => {
var server = Bun.serve({
port: 6023,

fetch(req) {
return new Response(Bun.file(import.meta.dir + "/fixture.html.gz"), {
headers: {
"Content-Encoding": "gzip",
"Content-Type": "text/html; charset=utf-8",
},
});
},
});

const res = await fetch(`http://${server.hostname}:${server.port}`);
const arrayBuffer = await res.arrayBuffer();
expect(
new Buffer(arrayBuffer).equals(
new Buffer(
await Bun.file(import.meta.dir + "/fixture.html").arrayBuffer(),
),
),
).toBe(true);
server.stop();
});

it("fetch() with a gzip response works (multiple chunks)", async () => {
var server = Bun.serve({
port: 6024,

fetch(req) {
return new Response(
new ReadableStream({
type: "direct",
async pull(controller) {
var chunks: ArrayBuffer[] = [];
const buffer = await Bun.file(
import.meta.dir + "/fixture.html.gz",
).arrayBuffer();
var remaining = buffer;
for (var i = 100; i < buffer.byteLength; i += 100) {
var chunk = remaining.slice(0, i);
remaining = remaining.slice(i);
controller.write(chunk);
chunks.push(chunk);
await controller.flush();
}

await controller.flush();
// sanity check
expect(
new Buffer(concatArrayBuffers(chunks)).equals(new Buffer(buffer)),
).toBe(true);

controller.end();
},
}),
{
headers: {
"Content-Encoding": "gzip",
"Content-Type": "text/html; charset=utf-8",
"Content-Length": "1",
},
},
);
},
});

const res = await fetch(`http://${server.hostname}:${server.port}`, {});
const arrayBuffer = await res.arrayBuffer();
expect(
new Buffer(arrayBuffer).equals(
new Buffer(
await Bun.file(import.meta.dir + "/fixture.html").arrayBuffer(),
),
),
).toBe(true);
});
2 changes: 1 addition & 1 deletion test/bun.js/filesink.test.ts
@@ -85,7 +85,7 @@ describe("FileSink", () => {
     } catch (e) {}
     mkfifo(path, 0o666);
     activeFIFO = (async function (stream: ReadableStream<Uint8Array>) {
-      var chunks = [];
+      var chunks: Uint8Array[] = [];
       for await (const chunk of stream) {
         chunks.push(chunk);
       }