Commit ba11dd5

mkusaka and ryoppippi authored
fix(ccusage): use streaming to handle large JSONL files (#706)
Co-authored-by: ryoppippi <1560508+ryoppippi@users.noreply.github.com>
1 parent 2235f2a commit ba11dd5
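
Why streaming: readFile(file, 'utf-8') has to materialize the whole file as a single JavaScript string, and V8 caps string length at roughly 512MB, so loading a 600MB JSONL log fails with a RangeError before a single line is parsed (#706). This commit replaces every per-file readFile with line-by-line streaming. As a minimal standalone sketch of the pattern, assuming only Node's built-in fs and readline modules (countValidJsonLines is a hypothetical name, not part of the commit):

import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

async function countValidJsonLines(filePath: string): Promise<number> {
	// readline pulls from the read stream, so only one line is held in memory at a time
	const rl = createInterface({
		input: createReadStream(filePath, { encoding: 'utf-8' }),
		crlfDelay: Number.POSITIVE_INFINITY, // treat \r\n as a single line break
	});

	let count = 0;
	for await (const line of rl) {
		if (line.trim().length === 0) {
			continue; // skip blank lines, as the loader does
		}
		try {
			JSON.parse(line);
			count++;
		}
		catch {
			// skip lines that are not valid JSON
		}
	}
	return count;
}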

File tree: 1 file changed (+179, −47)

apps/ccusage/src/data-loader.ts

Lines changed: 179 additions & 47 deletions
@@ -18,9 +18,11 @@ import type {
 	SortOrder,
 	Version,
 } from './_types.ts';
+import { createReadStream, createWriteStream } from 'node:fs';
 import { readFile } from 'node:fs/promises';
 import path from 'node:path';
 import process from 'node:process';
+import { createInterface } from 'node:readline';
 import { toArray } from '@antfu/utils';
 import { Result } from '@praha/byethrow';
 import { groupBy, uniq } from 'es-toolkit'; // TODO: after node20 is deprecated, switch to native Object.groupBy
@@ -529,22 +531,41 @@ export function createUniqueHash(data: UsageData): string | null {
 	return `${messageId}:${requestId}`;
 }
 
+/**
+ * Process a JSONL file line by line using streams to avoid memory issues with large files
+ * @param filePath - Path to the JSONL file
+ * @param processLine - Callback function to process each line
+ */
+async function processJSONLFileByLine(
+	filePath: string,
+	processLine: (line: string, lineNumber: number) => void | Promise<void>,
+): Promise<void> {
+	const fileStream = createReadStream(filePath, { encoding: 'utf-8' });
+	const rl = createInterface({
+		input: fileStream,
+		crlfDelay: Number.POSITIVE_INFINITY,
+	});
+
+	let lineNumber = 0;
+	for await (const line of rl) {
+		lineNumber++;
+		if (line.trim().length === 0) {
+			continue;
+		}
+		await processLine(line, lineNumber);
+	}
+}
+
 /**
  * Extract the earliest timestamp from a JSONL file
  * Scans through the file until it finds a valid timestamp
+ * Uses streaming to handle large files without loading entire content into memory
  */
 export async function getEarliestTimestamp(filePath: string): Promise<Date | null> {
 	try {
-		const content = await readFile(filePath, 'utf-8');
-		const lines = content.trim().split('\n');
-
 		let earliestDate: Date | null = null;
 
-		for (const line of lines) {
-			if (line.trim() === '') {
-				continue;
-			}
-
+		await processJSONLFileByLine(filePath, (line) => {
 			try {
 				const json = JSON.parse(line) as Record<string, unknown>;
 				if (json.timestamp != null && typeof json.timestamp === 'string') {
@@ -558,9 +579,8 @@ export async function getEarliestTimestamp(filePath: string): Promise<Date | null> {
 			}
 			catch {
 				// Skip invalid JSON lines
-				continue;
 			}
-		}
+		});
 
 		return earliestDate;
 	}
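
A mechanical consequence of the refactor, visible in every hunk below: loop bodies become callbacks, so each continue becomes a return, and returning from the callback skips only the current line while the stream moves on to the next one. A toy before/after (isValid and handle are hypothetical placeholders, not part of the commit):

// Before: skipping a line inside the read-everything loop
for (const line of lines) {
	if (!isValid(line)) {
		continue;
	}
	handle(line);
}

// After: the same skip inside the streaming callback
await processJSONLFileByLine(filePath, (line) => {
	if (!isValid(line)) {
		return; // ends this invocation only; the next line is still processed
	}
	handle(line);
});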
@@ -759,26 +779,23 @@ export async function loadDailyUsageData(
 	const allEntries: { data: UsageData; date: string; cost: number; model: string | undefined; project: string }[] = [];
 
 	for (const file of sortedFiles) {
-		const content = await readFile(file, 'utf-8');
-		const lines = content
-			.trim()
-			.split('\n')
-			.filter(line => line.length > 0);
+		// Extract project name from file path once per file
+		const project = extractProjectFromPath(file);
 
-		for (const line of lines) {
+		await processJSONLFileByLine(file, async (line) => {
 			try {
 				const parsed = JSON.parse(line) as unknown;
 				const result = v.safeParse(usageDataSchema, parsed);
 				if (!result.success) {
-					continue;
+					return;
 				}
 				const data = result.output;
 
 				// Check for duplicate message + request ID combination
 				const uniqueHash = createUniqueHash(data);
 				if (isDuplicateEntry(uniqueHash, processedHashes)) {
 					// Skip duplicate message
-					continue;
+					return;
 				}
 
 				// Mark this combination as processed
@@ -792,15 +809,12 @@ export async function loadDailyUsageData(
 					? await calculateCostForEntry(data, mode, fetcher)
 					: data.costUSD ?? 0;
 
-				// Extract project name from file path
-				const project = extractProjectFromPath(file);
-
 				allEntries.push({ data, date, cost, model: data.message.model, project });
 			}
 			catch {
 				// Skip invalid JSON lines
 			}
-		}
+		});
 	}
 
 	// Group by date, optionally including project
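
Note the hoisting above: extractProjectFromPath(file) depends only on the file path, so it now runs once per file before the stream starts instead of once per parsed line, keeping the per-line callback free of redundant recomputation.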
@@ -933,26 +947,20 @@ export async function loadSessionData(
 		const joinedPath = parts.slice(0, -2).join(path.sep);
 		const projectPath = joinedPath.length > 0 ? joinedPath : 'Unknown Project';
 
-		const content = await readFile(file, 'utf-8');
-		const lines = content
-			.trim()
-			.split('\n')
-			.filter(line => line.length > 0);
-
-		for (const line of lines) {
+		await processJSONLFileByLine(file, async (line) => {
 			try {
 				const parsed = JSON.parse(line) as unknown;
 				const result = v.safeParse(usageDataSchema, parsed);
 				if (!result.success) {
-					continue;
+					return;
 				}
 				const data = result.output;
 
 				// Check for duplicate message + request ID combination
 				const uniqueHash = createUniqueHash(data);
 				if (isDuplicateEntry(uniqueHash, processedHashes)) {
 					// Skip duplicate message
-					continue;
+					return;
 				}
 
 				// Mark this combination as processed
@@ -976,7 +984,7 @@ export async function loadSessionData(
 			catch {
 				// Skip invalid JSON lines
 			}
-		}
+		});
 	}
 
 	// Group by session using Object.groupBy
@@ -1103,21 +1111,19 @@ export async function loadSessionUsageById(
 	if (file == null) {
 		return null;
 	}
-	const content = await readFile(file, 'utf-8');
-	const lines = content.trim().split('\n').filter(line => line.length > 0);
 
 	const mode = options?.mode ?? 'auto';
 	using fetcher = mode === 'display' ? null : new PricingFetcher(options?.offline);
 
 	const entries: UsageData[] = [];
 	let totalCost = 0;
 
-	for (const line of lines) {
+	await processJSONLFileByLine(file, async (line) => {
 		try {
 			const parsed = JSON.parse(line) as unknown;
 			const result = v.safeParse(usageDataSchema, parsed);
 			if (!result.success) {
-				continue;
+				return;
 			}
 			const data = result.output;
 
@@ -1131,7 +1137,7 @@ export async function loadSessionUsageById(
 		catch {
 			// Skip invalid JSON lines
 		}
-	}
+	});
 
 	return { totalCost, entries };
 }
@@ -1353,26 +1359,20 @@ export async function loadSessionBlockData(
 	const allEntries: LoadedUsageEntry[] = [];
 
 	for (const file of sortedFiles) {
-		const content = await readFile(file, 'utf-8');
-		const lines = content
-			.trim()
-			.split('\n')
-			.filter(line => line.length > 0);
-
-		for (const line of lines) {
+		await processJSONLFileByLine(file, async (line) => {
 			try {
 				const parsed = JSON.parse(line) as unknown;
 				const result = v.safeParse(usageDataSchema, parsed);
 				if (!result.success) {
-					continue;
+					return;
 				}
 				const data = result.output;
 
 				// Check for duplicate message + request ID combination
 				const uniqueHash = createUniqueHash(data);
 				if (isDuplicateEntry(uniqueHash, processedHashes)) {
 					// Skip duplicate message
-					continue;
+					return;
 				}
 
 				// Mark this combination as processed
@@ -1403,7 +1403,7 @@ export async function loadSessionBlockData(
 				// Skip invalid JSON lines but log for debugging purposes
 				logger.debug(`Skipping invalid JSON line in 5-hour blocks: ${error instanceof Error ? error.message : String(error)}`);
 			}
-		}
+		});
 	}
 
 	// Identify session blocks
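
One property worth noting across all of these call sites: processJSONLFileByLine awaits the callback before reading the next line, so entries are still consumed strictly in file order, and the processedHashes duplicate detection behaves exactly as it did with the synchronous loops.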
@@ -4081,6 +4081,138 @@ invalid json line
 		expect(result).toHaveLength(1);
 		expect(result[0]?.entries).toHaveLength(1);
 	});
+
+	describe('processJSONLFileByLine', () => {
+		it('should process each non-empty line with correct line numbers', async () => {
+			await using fixture = await createFixture({
+				'test.jsonl': '{"line": 1}\n{"line": 2}\n{"line": 3}\n',
+			});
+
+			const lines: Array<{ content: string; lineNumber: number }> = [];
+			await processJSONLFileByLine(path.join(fixture.path, 'test.jsonl'), (line, lineNumber) => {
+				lines.push({ content: line, lineNumber });
+			});
+
+			expect(lines).toHaveLength(3);
+			expect(lines[0]).toEqual({ content: '{"line": 1}', lineNumber: 1 });
+			expect(lines[1]).toEqual({ content: '{"line": 2}', lineNumber: 2 });
+			expect(lines[2]).toEqual({ content: '{"line": 3}', lineNumber: 3 });
+		});
+
+		it('should skip empty lines', async () => {
+			await using fixture = await createFixture({
+				'test.jsonl': '{"line": 1}\n\n{"line": 2}\n \n{"line": 3}\n',
+			});
+
+			const lines: string[] = [];
+			await processJSONLFileByLine(path.join(fixture.path, 'test.jsonl'), (line) => {
+				lines.push(line);
+			});
+
+			expect(lines).toHaveLength(3);
+			expect(lines[0]).toBe('{"line": 1}');
+			expect(lines[1]).toBe('{"line": 2}');
+			expect(lines[2]).toBe('{"line": 3}');
+		});
+
+		it('should handle async processLine callback', async () => {
+			await using fixture = await createFixture({
+				'test.jsonl': '{"line": 1}\n{"line": 2}\n',
+			});
+
+			const results: string[] = [];
+			await processJSONLFileByLine(path.join(fixture.path, 'test.jsonl'), async (line) => {
+				// Simulate async operation
+				await new Promise(resolve => setTimeout(resolve, 1));
+				results.push(line);
+			});
+
+			expect(results).toHaveLength(2);
+			expect(results[0]).toBe('{"line": 1}');
+			expect(results[1]).toBe('{"line": 2}');
+		});
+
+		it('should throw error when file does not exist', async () => {
+			await expect(
+				processJSONLFileByLine('/nonexistent/file.jsonl', () => {}),
+			).rejects.toThrow();
+		});
+
+		it('should handle empty file', async () => {
+			await using fixture = await createFixture({
+				'empty.jsonl': '',
+			});
+
+			const lines: string[] = [];
+			await processJSONLFileByLine(path.join(fixture.path, 'empty.jsonl'), (line) => {
+				lines.push(line);
+			});
+
+			expect(lines).toHaveLength(0);
+		});
+
+		it('should handle file with only empty lines', async () => {
+			await using fixture = await createFixture({
+				'only-empty.jsonl': '\n\n \n\t\n',
+			});
+
+			const lines: string[] = [];
+			await processJSONLFileByLine(path.join(fixture.path, 'only-empty.jsonl'), (line) => {
+				lines.push(line);
+			});
+
+			expect(lines).toHaveLength(0);
+		});
+
+		it('should process large files (600MB+) without RangeError', async () => {
+			// Create a realistic JSONL entry similar to actual Claude data (~283 bytes per line)
+			const sampleEntry = JSON.stringify({
+				timestamp: '2025-01-10T10:00:00Z',
+				message: {
+					id: 'msg_01234567890123456789',
+					usage: { input_tokens: 1000, output_tokens: 500 },
+					model: 'claude-sonnet-4-20250514',
+				},
+				requestId: 'req_01234567890123456789',
+				costUSD: 0.01,
+			}) + '\n';
+
+			// Target 600MB file (this would cause RangeError with readFile in Node.js)
+			const targetMB = 600;
+			const lineSize = Buffer.byteLength(sampleEntry, 'utf-8');
+			const lineCount = Math.ceil((targetMB * 1024 * 1024) / lineSize);
+
+			// Create fixture directory first
+			await using fixture = await createFixture({});
+			const filePath = path.join(fixture.path, 'large.jsonl');
+
+			// Write file using streaming to avoid Node.js string length limit (~512MB)
+			// Creating a 600MB string directly would cause "RangeError: Invalid string length"
+			const writeStream = createWriteStream(filePath);
+
+			// Write lines and handle backpressure
+			for (let i = 0; i < lineCount; i++) {
+				const canContinue = writeStream.write(sampleEntry);
+				// Respect backpressure by waiting for drain event
+				if (!canContinue) {
+					await new Promise<void>(resolve => writeStream.once('drain', () => resolve()));
+				}
+			}
+
+			// Ensure all data is flushed
+			await new Promise<void>((resolve, reject) => {
+				writeStream.end((err?: Error | null) => err ? reject(err) : resolve());
+			});
+
+			// Test streaming processing
+			let processedCount = 0;
+			await processJSONLFileByLine(filePath, () => {
+				processedCount++;
+			});
+
+			expect(processedCount).toBe(lineCount);
+		});
+	});
 });
 }
 
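The large-file test also shows the write-side half of the streaming discipline: write() returns false when the stream's internal buffer is full, and the writer should pause until 'drain' fires before writing more. The same pattern as a reusable sketch (writeWithBackpressure is a hypothetical helper, not part of the commit):

import type { Writable } from 'node:stream';

// Write a chunk, waiting for 'drain' if the stream signals backpressure
async function writeWithBackpressure(stream: Writable, chunk: string): Promise<void> {
	if (!stream.write(chunk)) {
		await new Promise<void>(resolve => stream.once('drain', () => resolve()));
	}
}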