Skip to content

Commit 48e2638

Browse files
committed
Support streams
1 parent 121d1c6 commit 48e2638

File tree

13 files changed

+1297
-311
lines changed

13 files changed

+1297
-311
lines changed

Cargo.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,18 @@ path = "src/main.rs"
2222
[dependencies]
2323
bytes = "1.10.1"
2424
hostname = "0.4.1"
25-
ext-php-rs = { version = "0.14.0", features = ["embed"] }
26-
http-handler = { git = "https://github.com/platformatic/http-handler.git", branch = "core-async-trait" }
25+
ext-php-rs = { path = "../ext-php-rs", features = ["embed"] }
26+
http-body-util = "0.1"
27+
http-handler = { git = "https://github.com/platformatic/http-handler.git", branch = "streams-and-ws" }
2728
# http-handler = { path = "../http-handler" }
28-
http-rewriter = { git = "https://github.com/platformatic/http-rewriter.git" }
29+
http-rewriter = { git = "https://github.com/platformatic/http-rewriter.git", branch = "stream-and-ws" }
2930
# http-rewriter = { path = "../http-rewriter" }
3031
libc = "0.2.171"
3132
# Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
32-
napi = { version = "3", default-features = false, features = ["napi4"], optional = true }
33+
napi = { version = "3", default-features = false, features = ["napi4", "tokio_rt", "async"], optional = true }
3334
napi-derive = { version = "3", optional = true }
3435
once_cell = "1.21.0"
35-
tokio = { version = "1.45", features = ["rt", "macros", "rt-multi-thread"] }
36+
tokio = { version = "1.45", features = ["rt", "macros", "rt-multi-thread", "sync"] }
3637
regex = "1.0"
3738

3839
[dev-dependencies]

__test__/request.spec.mjs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ test('minimum construction requirements', (t) => {
1010

1111
t.is(req.method, 'GET')
1212
t.is(req.url, 'http://example.com/test.php')
13-
t.assert(req.body instanceof Buffer)
14-
t.is(req.body.length, 0)
1513
t.assert(req.headers instanceof Headers)
1614
t.is(req.headers.size, 0)
1715
})

__test__/response.spec.mjs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ test('Minimal response construction', (t) => {
99

1010
t.is(res.status, 200)
1111
t.assert(res.headers instanceof Headers)
12-
t.assert(res.body instanceof Buffer)
13-
t.deepEqual(res.body.toString(), '')
1412
t.assert(res.log instanceof Buffer)
1513
t.deepEqual(res.log.toString(), '')
1614
t.is(res.exception, null)
@@ -37,8 +35,6 @@ test('Full Response construction', (t) => {
3735
t.assert(res.headers instanceof Headers)
3836
t.deepEqual(res.headers.get('Content-Type'), 'application/json')
3937
t.deepEqual(res.headers.getAll('Accept'), ['application/json', 'text/plain'])
40-
t.assert(res.body instanceof Buffer)
41-
t.deepEqual(res.body.toString(), json)
4238
t.assert(res.log instanceof Buffer)
4339
t.deepEqual(res.log.toString(), 'Hello, from error_log!')
4440
t.deepEqual(res.exception, 'Hello, from PHP!')

__test__/streaming.spec.mjs

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
import test from 'ava'
2+
3+
import { Php, Request } from '../index.js'
4+
import { MockRoot } from './util.mjs'
5+
6+
test('handleStream - basic response', async (t) => {
7+
const mockroot = await MockRoot.from({
8+
'index.php': `<?php
9+
echo 'Hello, from PHP!';
10+
?>`
11+
})
12+
t.teardown(() => mockroot.clean())
13+
14+
const php = new Php({
15+
docroot: mockroot.path
16+
})
17+
18+
const req = new Request({
19+
method: 'GET',
20+
url: 'http://example.com/index.php'
21+
})
22+
23+
const [res] = await Promise.all([
24+
php.handleStream(req),
25+
req.end()
26+
])
27+
28+
t.is(res.status, 200)
29+
30+
// Collect streaming body
31+
let body = ''
32+
for await (const chunk of res) {
33+
body += chunk.toString('utf8')
34+
}
35+
t.is(body, 'Hello, from PHP!')
36+
})
37+
38+
test('handleStream - chunked output', async (t) => {
39+
const mockroot = await MockRoot.from({
40+
'stream.php': `<?php
41+
echo 'Chunk 1';
42+
flush();
43+
echo 'Chunk 2';
44+
flush();
45+
echo 'Chunk 3';
46+
?>`
47+
})
48+
t.teardown(() => mockroot.clean())
49+
50+
const php = new Php({
51+
docroot: mockroot.path
52+
})
53+
54+
const req = new Request({
55+
method: 'GET',
56+
url: 'http://example.com/stream.php'
57+
})
58+
59+
const [res] = await Promise.all([
60+
php.handleStream(req),
61+
req.end()
62+
])
63+
64+
t.is(res.status, 200)
65+
66+
// Collect all chunks
67+
const chunks = []
68+
for await (const chunk of res) {
69+
chunks.push(chunk.toString('utf8'))
70+
}
71+
72+
// Should have received all chunks
73+
const body = chunks.join('')
74+
t.is(body, 'Chunk 1Chunk 2Chunk 3')
75+
})
76+
77+
test('handleStream - headers available immediately', async (t) => {
78+
const mockroot = await MockRoot.from({
79+
'headers.php': `<?php
80+
header('X-Custom-Header: test-value');
81+
header('Content-Type: application/json');
82+
echo '{"status": "ok"}';
83+
?>`
84+
})
85+
t.teardown(() => mockroot.clean())
86+
87+
const php = new Php({
88+
docroot: mockroot.path
89+
})
90+
91+
const req = new Request({
92+
method: 'GET',
93+
url: 'http://example.com/headers.php'
94+
})
95+
96+
const [res] = await Promise.all([
97+
php.handleStream(req),
98+
req.end()
99+
])
100+
101+
// Headers should be available immediately
102+
t.is(res.status, 200)
103+
t.is(res.headers.get('x-custom-header'), 'test-value')
104+
t.is(res.headers.get('content-type'), 'application/json')
105+
106+
// Body can be consumed after
107+
let body = ''
108+
for await (const chunk of res) {
109+
body += chunk.toString('utf8')
110+
}
111+
t.is(body, '{"status": "ok"}')
112+
})
113+
114+
test('handleStream - POST with buffered body', async (t) => {
115+
const mockroot = await MockRoot.from({
116+
'echo.php': `<?php
117+
$input = file_get_contents('php://input');
118+
echo "Received: " . $input;
119+
?>`
120+
})
121+
t.teardown(() => mockroot.clean())
122+
123+
const php = new Php({
124+
docroot: mockroot.path
125+
})
126+
127+
const req = new Request({
128+
method: 'POST',
129+
url: 'http://example.com/echo.php',
130+
headers: {
131+
'Content-Type': 'text/plain'
132+
},
133+
body: Buffer.from('Hello from client!')
134+
})
135+
136+
const res = await php.handleStream(req)
137+
t.is(res.status, 200)
138+
139+
let body = ''
140+
for await (const chunk of res) {
141+
body += chunk.toString('utf8')
142+
}
143+
t.is(body, 'Received: Hello from client!')
144+
})
145+
146+
test('handleStream - POST with streamed body', async (t) => {
147+
const mockroot = await MockRoot.from({
148+
'echo.php': `<?php
149+
$input = file_get_contents('php://input');
150+
echo "Received: " . $input;
151+
?>`
152+
})
153+
t.teardown(() => mockroot.clean())
154+
155+
const php = new Php({
156+
docroot: mockroot.path
157+
})
158+
159+
const req = new Request({
160+
method: 'POST',
161+
url: 'http://example.com/echo.php',
162+
headers: {
163+
'Content-Type': 'text/plain'
164+
}
165+
})
166+
167+
// Run handleStream and writes concurrently using Promise.all
168+
const [res] = await Promise.all([
169+
php.handleStream(req),
170+
(async () => {
171+
await req.write('Hello ')
172+
await req.write('from ')
173+
await req.write('streaming!')
174+
await req.end()
175+
})()
176+
])
177+
178+
t.is(res.status, 200)
179+
180+
let body = ''
181+
for await (const chunk of res) {
182+
body += chunk.toString('utf8')
183+
}
184+
t.is(body, 'Received: Hello from streaming!')
185+
})
186+
187+
test.skip('handleStream - exception handling', async (t) => {
188+
// TODO: Implement proper exception handling in streaming mode
189+
// See EXCEPTIONS.md for implementation approaches
190+
const mockroot = await MockRoot.from({
191+
'error.php': `<?php
192+
throw new Exception('Test exception');
193+
?>`
194+
})
195+
t.teardown(() => mockroot.clean())
196+
197+
const php = new Php({
198+
docroot: mockroot.path
199+
})
200+
201+
const req = new Request({
202+
method: 'GET',
203+
url: 'http://example.com/error.php'
204+
})
205+
206+
const res = await php.handleStream(req)
207+
208+
// Exception should be sent through the stream
209+
let errorOccurred = false
210+
try {
211+
for await (const chunk of res) {
212+
// Should not receive chunks, should throw
213+
}
214+
} catch (err) {
215+
errorOccurred = true
216+
t.true(err.message.includes('Exception'))
217+
}
218+
219+
t.true(errorOccurred, 'Exception should be thrown during iteration')
220+
})
221+
222+
test('handleStream - empty response', async (t) => {
223+
const mockroot = await MockRoot.from({
224+
'empty.php': `<?php
225+
// No output
226+
?>`
227+
})
228+
t.teardown(() => mockroot.clean())
229+
230+
const php = new Php({
231+
docroot: mockroot.path
232+
})
233+
234+
const req = new Request({
235+
method: 'GET',
236+
url: 'http://example.com/empty.php'
237+
})
238+
239+
const [res] = await Promise.all([
240+
php.handleStream(req),
241+
req.end()
242+
])
243+
244+
t.is(res.status, 200)
245+
246+
let body = ''
247+
for await (const chunk of res) {
248+
body += chunk.toString('utf8')
249+
}
250+
t.is(body, '')
251+
})

0 commit comments

Comments
 (0)