Skip to content

Commit

Permalink
restore previous stream handling (leaking sockets)
Browse files Browse the repository at this point in the history
  • Loading branch information
majodev committed May 25, 2023
1 parent 7200211 commit 831e96b
Showing 1 changed file with 52 additions and 46 deletions.
98 changes: 52 additions & 46 deletions server/logic/fetchFontSubsetArchive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { finished } from "stream/promises";
import { config } from "../config";
import { asyncRetry } from "../utils/asyncRetry";
import { IVariantItem } from "./fetchFontURLs";
import { Readable, pipeline } from "stream";
import { Readable } from "stream";

const RETRIES = 2;

Expand Down Expand Up @@ -35,63 +35,69 @@ export async function fetchFontSubsetArchive(

const archive = new JSZip();

// const streams: (Readable | fs.WriteStream)[] = _.compact(
// _.flatten(
await Bluebird.map(variants, (variant) => {
return Bluebird.map(variant.urls, async (variantUrl) => {
const filename = `${fontID}-${fontVersion}-${subsets.join("_")}-${variant.id}.${variantUrl.format}`;

// download the file for type (filename now known)
let readable: Readable;
try {
// console.log("fetchFontSubsetArchive...", variantUrl.format, filename, variantUrl.url);
readable = await fetchFontSubsetArchiveStream(variantUrl.url, filename, variantUrl.format);
archive.file(filename, readable);
} catch (e) {
// if a specific format does not work, silently discard it.
console.error("fetchFontSubsetArchive discarding", variantUrl.format, filename, variantUrl.url);
return null;
}

subsetFontArchive.files.push({
variant: variant.id, // variants and format are used to filter them out later!
format: variantUrl.format,
path: filename,
});

// return stream;
});
});
// )
// );
const streams: (Readable | fs.WriteStream)[] = _.compact(
_.flatten(
await Bluebird.map(variants, (variant) => {
return Bluebird.map(variant.urls, async (variantUrl) => {
const filename = `${fontID}-${fontVersion}-${subsets.join("_")}-${variant.id}.${variantUrl.format}`;

// download the file for type (filename now known)
let readable: Readable;
try {
// console.log("fetchFontSubsetArchive...", variantUrl.format, filename, variantUrl.url);
readable = await fetchFontSubsetArchiveStream(variantUrl.url, filename, variantUrl.format);
archive.file(filename, readable);
} catch (e) {
// if a specific format does not work, silently discard it.
console.error("fetchFontSubsetArchive discarding", variantUrl.format, filename, variantUrl.url);
return null;
}

subsetFontArchive.files.push({
variant: variant.id, // variants and format are used to filter them out later!
format: variantUrl.format,
path: filename,
});

return readable;
});
})
)
);

const target = fs.createWriteStream(subsetFontArchive.zipPath);
// streams.push(target);
streams.push(target);

console.info(`fetchFontSubsetArchive create archive... file=${subsetFontArchive.zipPath}`);

try {
await finished(pipeline(archive.generateNodeStream({
await finished(archive.generateNodeStream({
compression: "DEFLATE",
streamFiles: true,
}), target, (err) => {
if (err) {
console.error("fetchFontSubsetArchive archive.generateNodeStream pipe failed file=${subsetFontArchive.zipPath}", err);
}
}));
// streamFiles: true,
}).pipe(target));

// await finished(pipeline(archive.generateNodeStream({
// compression: "DEFLATE",
// streamFiles: true,
// }), target, (err) => {
// if (err) {
// console.error("fetchFontSubsetArchive archive.generateNodeStream pipe failed file=${subsetFontArchive.zipPath}", err);
// }
// }));
console.info(`fetchFontSubsetArchive create archive done! file=${subsetFontArchive.zipPath}`);
} catch (e) {
console.error("fetchFontSubsetArchive archive.generateNodeStream pipe failed", e);
// ensure all fs streams into the archive and the actual zip file are destroyed
// _.each(streams, (stream) => {
// try {
// stream.destroy();
// } catch (err) {
// console.error("fetchFontSubsetArchive archive.generateNodeStream pipe failed, stream.destroy failed (catched)", fontID, subsets, err);
// }
// });
_.each(streams, (stream, index) => {
try {
console.warn(`fetchFontSubsetArchive archive.generateNodeStream destroy stream ${index}/${streams.length}...`)
stream.destroy();
} catch (err) {
console.error("fetchFontSubsetArchive archive.generateNodeStream pipe failed, stream.destroy failed (catched)", fontID, subsets, err);
}
});

// console.error("fetchFontSubsetArchive archive.generateNodeStream pipe failed, streams destroyed. Rethrowing err...", fontID, subsets, e);
console.error("fetchFontSubsetArchive archive.generateNodeStream pipe failed, streams destroyed. Rethrowing err...", fontID, subsets, e);
throw e;
}

Expand Down

0 comments on commit 831e96b

Please sign in to comment.