-
Notifications
You must be signed in to change notification settings - Fork 286
/
client.d
1290 lines (1096 loc) · 41.7 KB
/
client.d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
A simple HTTP/1.1 client implementation.
Copyright: © 2012-2014 Sönke Ludwig
License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
Authors: Sönke Ludwig, Jan Krüger
*/
module vibe.http.client;
public import vibe.core.net;
public import vibe.http.common;
public import vibe.inet.url;
import vibe.core.connectionpool;
import vibe.core.core;
import vibe.core.log;
import vibe.data.json;
import vibe.inet.message;
import vibe.inet.url;
import vibe.stream.counting;
import vibe.stream.tls;
import vibe.stream.operations;
import vibe.stream.wrapper : createConnectionProxyStream;
import vibe.stream.zlib;
import vibe.utils.array;
import vibe.utils.dictionarylist;
import vibe.internal.allocator;
import vibe.internal.freelistref;
import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy;
import core.exception : AssertError;
import std.algorithm : splitter;
import std.array;
import std.conv;
import std.encoding : sanitize;
import std.exception;
import std.format;
import std.string;
import std.typecons;
import std.datetime;
import std.socket : AddressFamily;
version(Posix)
{
version = UnixSocket;
}
/**************************************************************************************************/
/* Public functions */
/**************************************************************************************************/
@safe:
/**
    Performs a synchronous HTTP request on the specified URL.

    The `requester` callback can be used to customize the request and to write the
    request body for non-GET requests before the request is sent. The response object
    is then either returned or passed to the `responder` callback synchronously.

    This function is a low-level HTTP client facility. It will not perform automatic
    redirects, caching or similar tasks. For a high-level download facility (similar to
    cURL), see the `vibe.inet.urltransfer` module.

    Note that it is highly recommended to use one of the overloads that take a responder
    callback, as they can avoid some memory allocations and are safe against accidentally
    leaving stale response objects behind (objects whose response body wasn't fully read).
    When using one of the returning overloads, it is recommended to put a `scope(exit)`
    that calls `HTTPClientResponse.dropBody` right after the call to avoid this.

    See_also: `vibe.inet.urltransfer.download`
*/
HTTPClientResponse requestHTTP(string url, scope void delegate(scope HTTPClientRequest req) requester = null, const(HTTPClientSettings) settings = defaultSettings)
{
    // Parse the textual URL and hand over to the URL based overload.
    auto parsed_url = URL.parse(url);
    return requestHTTP(parsed_url, requester, settings);
}
/// ditto
HTTPClientResponse requestHTTP(URL url, scope void delegate(scope HTTPClientRequest req) requester = null, const(HTTPClientSettings) settings = defaultSettings)
{
    // Obtain a (possibly pooled) client for the target host and send the request.
    // `httpRequesterDg` fills in the request URL, Host header and URL credentials
    // before the user supplied `requester` gets to see the request.
    bool use_tls = isTLSRequired(url, settings);
    auto cli = connectHTTP(url.getFilteredHost, url.port, use_tls, settings);
    auto res = cli.request(
        (scope req){ httpRequesterDg(req, url, settings, requester); }
    );

    // make sure the connection stays locked if the body still needs to be read
    if (res.m_client) res.lockedConnection = cli;

    logTrace("Returning HTTPClientResponse for conn %s", () @trusted { return cast(void*)res.lockedConnection.__conn; } ());
    return res;
}
/// ditto
void requestHTTP(string url, scope void delegate(scope HTTPClientRequest req) requester, scope void delegate(scope HTTPClientResponse req) responder, const(HTTPClientSettings) settings = defaultSettings)
{
    // Convert the string once, then forward to the URL based overload.
    auto parsed_url = URL(url);
    requestHTTP(parsed_url, requester, responder, settings);
}
/// ditto
void requestHTTP(URL url, scope void delegate(scope HTTPClientRequest req) requester, scope void delegate(scope HTTPClientResponse req) responder, const(HTTPClientSettings) settings = defaultSettings)
{
    const tls_needed = isTLSRequired(url, settings);
    auto client = connectHTTP(url.getFilteredHost, url.port, tls_needed, settings);

    // The callback based `request` overload is expected to have fully processed
    // the response by the time it returns, which the assertions below verify.
    client.request(
        (scope req){ httpRequesterDg(req, url, settings, requester); },
        responder
    );

    assert(!client.m_requesting, "HTTP client still requesting after return!?");
    assert(!client.m_responding, "HTTP client still responding after return!?");
}
/*
    Validates the request URL and determines whether the outgoing connection
    has to be wrapped in TLS.

    Throws if the URL schema is not supported or if the URL has no host. If a
    proxy is configured (non-null proxy URL schema), the schema of the proxy URL
    decides; otherwise the schema of the request URL does.
*/
private bool isTLSRequired(in URL url, in HTTPClientSettings settings)
{
    const schema = url.schema;

    version(UnixSocket) {
        enforce(schema == "http" || schema == "https" || schema == "http+unix" || schema == "https+unix", "URL schema must be http(s) or http(s)+unix.");
    } else {
        enforce(schema == "http" || schema == "https", "URL schema must be http(s).");
    }
    enforce(url.host.length > 0, "URL must contain a host name.");

    // The connection is made to the proxy, so its schema takes precedence.
    const proxy_schema = settings.proxyURL.schema;
    if (proxy_schema !is null)
        return proxy_schema == "https";

    version(UnixSocket)
        return schema == "https" || schema == "https+unix";
    else
        return schema == "https";
}
/*
    Prepares a request for `url` before handing it to the user callback.

    Sets the request URL (the full URL when going through a proxy), the "Host"
    header and, if the URL carries a user name and no "authorization" header is
    present yet, HTTP basic authentication. `requester`, if non-null, is invoked
    last so that it can override any of these defaults.
*/
private void httpRequesterDg(scope HTTPClientRequest req, in URL url, in HTTPClientSettings settings, scope void delegate(scope HTTPClientRequest req) requester)
{
    import std.algorithm.searching : canFind;
    import vibe.http.internal.basic_auth_client: addBasicAuth;

    if (url.localURI.length) {
        assert(url.path.absolute, "Request URL path must be absolute.");
        req.requestURL = url.localURI;
    }

    // proxy exception to the URL representation
    if (settings.proxyURL.schema !is null)
        req.requestURL = url.toString();

    // IPv6 addresses need to be put into brackets
    string host_header = url.host.canFind(':') ? "["~url.host~"]" : url.host;

    // Provide port number when it is not the default one (RFC2616 section 14.23)
    const explicit_port = url.port && url.port != url.defaultPort;
    if (explicit_port)
        host_header = format("%s:%d", host_header, url.port);
    req.headers["Host"] = host_header;

    if ("authorization" !in req.headers && url.username != "")
        req.addBasicAuth(url.username, url.password);

    if (requester) () @trusted { requester(req); } ();
}
/** Posts a simple JSON request. Note that the server www.example.org does not
    exist, so there will be no meaningful result.
*/
unittest {
    import vibe.core.log;
    import vibe.http.client;
    import vibe.stream.operations;

    // Only compiled as a usage example; the function is never called.
    void test()
    {
        requestHTTP("http://www.example.org/",
            (scope req) {
                req.method = HTTPMethod.POST;
                //req.writeJsonBody(["name": "My Name"]);
            },
            (scope res) {
                auto text = res.bodyReader.readAllUTF8();
                logInfo("Response: %s", text);
            }
        );
    }
}
/**
    Returns a HTTPClient proxy object that is connected to the specified host.

    Internally, a connection pool is used to reuse already existing connections.
    Pools are keyed by host, port, TLS flag, proxy host/port and network interface;
    a port of `0` selects 443 for TLS and 80 otherwise. Note that usually
    requestHTTP should be used for making requests instead of manually using a
    HTTPClient to do so.
*/
auto connectHTTP(string host, ushort port = 0, bool use_tls = false, const(HTTPClientSettings) settings = null)
{
    auto effective_settings = settings ? settings : defaultSettings;

    if (port == 0) port = use_tls ? 443 : 80;
    auto key = ConnInfo(host, port, use_tls, effective_settings.proxyURL.host, effective_settings.proxyURL.port, effective_settings.networkInterface);

    // look for an existing pool that matches the connection parameters
    ConnectionPool!HTTPClient pool;
    s_connections.opApply((ref entry) @safe {
        if (entry[0] == key)
            pool = entry[1];
        return 0;
    });

    if (!pool) {
        logDebug("Create HTTP client pool %s:%s %s proxy %s:%d", host, port, use_tls, effective_settings.proxyURL.host, effective_settings.proxyURL.port);
        pool = new ConnectionPool!HTTPClient({
            auto client = new HTTPClient;
            client.connect(host, port, use_tls, effective_settings);
            return client;
        });

        // the pool cache has a fixed capacity - drop the oldest entry if necessary
        if (s_connections.full) s_connections.popFront();
        s_connections.put(tuple(key, pool));
    }

    return pool.lockConnection();
}
// Module destructor: disconnects all currently unused pooled HTTP clients.
static ~this()
{
    foreach (entry; s_connections) {
        auto pool = entry[1];
        pool.removeUnused((conn) {
            conn.disconnect();
        });
    }
}
// Key under which `connectHTTP` caches a connection pool: target host/port, TLS flag,
// proxy host/port and the local network interface to bind to.
private struct ConnInfo { string host; ushort port; bool useTLS; string proxyIP; ushort proxyPort; NetworkAddress bind_addr; }
// Cache of up to 16 (key, pool) pairs; `connectHTTP` evicts the oldest entry when it is full.
private static vibe.utils.array.FixedRingBuffer!(Tuple!(ConnInfo, ConnectionPool!HTTPClient), 16) s_connections;
/**************************************************************************************************/
/* Public types */
/**************************************************************************************************/
/**
    Defines an HTTP/HTTPS proxy request or a connection timeout for an HTTPClient.
*/
class HTTPClientSettings {
    /// URL of the proxy to route requests through; no proxy is used if the host/schema are `null`.
    URL proxyURL;

    /// Keep-alive period that `HTTPClient.connect` uses to initialize the keep-alive deadline of a connection.
    Duration defaultKeepAliveTimeout = 10.seconds;

    /** Timeout for establishing a connection to the server

        Note that this setting is only supported when using the vibe-core
        module. If using one of the legacy drivers, any value other than
        `Duration.max` will emit a runtime warning and connects without a
        specific timeout.
    */
    Duration connectTimeout = Duration.max;

    /// Timeout during read operations on the underlying transport
    Duration readTimeout = Duration.max;

    /// Forces a specific network interface to use for outgoing connections.
    NetworkAddress networkInterface = anyAddress;

    /// Can be used to force looking up IPv4/IPv6 addresses for host names.
    AddressFamily dnsAddressFamily = AddressFamily.UNSPEC;

    /** Allows to customize the TLS context before connecting to a server.

        Note that this overrides a callback set with `HTTPClient.setTLSContextSetup`.
    */
    void delegate(TLSContext ctx) @safe nothrow tlsContextSetup;

    /** Creates a new settings object with all fields copied from this one.

        The copy is independent of the original with respect to the fields
        themselves; the `tlsContextSetup` delegate is shared.
    */
    @property HTTPClientSettings dup()
    const @safe {
        auto ret = new HTTPClientSettings;
        ret.proxyURL = this.proxyURL;
        // previously omitted, which silently reset copies to the default of 10 seconds
        ret.defaultKeepAliveTimeout = this.defaultKeepAliveTimeout;
        ret.connectTimeout = this.connectTimeout;
        ret.readTimeout = this.readTimeout;
        ret.networkInterface = this.networkInterface;
        ret.dnsAddressFamily = this.dnsAddressFamily;
        ret.tlsContextSetup = this.tlsContextSetup;
        return ret;
    }
}
///
unittest {
    // Only compiled as a usage example; the function is never called.
    void test() {
        auto settings = new HTTPClientSettings;
        settings.proxyURL = URL.parse("http://proxyuser:proxypass@192.168.2.50:3128");
        settings.defaultKeepAliveTimeout = 0.seconds; // closes connection immediately after receiving the data.

        requestHTTP("http://www.example.org",
            (scope req){
                req.method = HTTPMethod.GET;
            },
            (scope res){
                logInfo("Headers:");
                foreach (key, ref value; res.headers.byKeyValue)
                    logInfo("%s: %s", key, value);
                logInfo("Response: %s", res.bodyReader.readAllUTF8());
            }, settings);
    }
}
// Checks that `HTTPClientSettings.connectTimeout` makes a request to an unreachable
// address fail with an exception before the 500 ms watchdog timer fires.
// Only compiled with vibe-core.
version (Have_vibe_core)
unittest { // test connect timeout
import std.conv : to;
import vibe.core.stream : pipe, nullSink;
HTTPClientSettings settings = new HTTPClientSettings;
settings.connectTimeout = 50.msecs;
// Use an IP address that is guaranteed to be unassigned globally to force
// a timeout (see RFC 3330)
auto cli = connectHTTP("192.0.2.0", 80, false, settings);
// watchdog: the 50 ms connect timeout must trigger well before this fires
auto timer = setTimer(500.msecs, { assert(false, "Connect timeout occurred too late"); });
scope (exit) timer.stop();
try {
cli.request(
(scope req) { assert(false, "Expected no connection"); },
(scope res) { assert(false, "Expected no response"); }
);
assert(false, "Response read expected to fail due to timeout");
// an exception is the expected outcome here, so it is deliberately ignored
} catch(Exception e) {}
}
// Checks that `HTTPClientSettings.readTimeout` makes a request fail with an exception
// when the server accepts the connection but never sends a response. A 500 ms watchdog
// timer guards against the timeout triggering too late.
unittest { // test read timeout
import std.conv : to;
import vibe.core.stream : pipe, nullSink;
version (VibeLibasyncDriver) {
logInfo("Skipping HTTP client read timeout test due to buggy libasync driver.");
} else {
HTTPClientSettings settings = new HTTPClientSettings;
settings.readTimeout = 50.msecs;
// local server on an OS-assigned port that swallows all input and never answers
auto l = listenTCP(0, (conn) {
try conn.pipe(nullSink);
catch (Exception e) assert(false, e.msg);
conn.close();
}, "127.0.0.1");
auto cli = connectHTTP("127.0.0.1", l.bindAddress.port, false, settings);
auto timer = setTimer(500.msecs, { assert(false, "Read timeout occurred too late"); });
scope (exit) {
timer.stop();
l.stopListening();
cli.disconnect();
sleep(10.msecs); // allow the read connection end to fully close
}
try {
cli.request(
(scope req) { req.method = HTTPMethod.GET; },
(scope res) { assert(false, "Expected no response"); }
);
assert(false, "Response read expected to fail due to timeout");
// an exception is the expected outcome here, so it is deliberately ignored
} catch(Exception e) {}
}
}
/**
Implementation of a HTTP 1.0/1.1 client with keep-alive support.
Note that it is usually recommended to use requestHTTP for making requests as that will use a
pool of HTTPClient instances to keep the number of connection establishments low while not
blocking requests from different tasks.
*/
final class HTTPClient {
@safe:
// NOTE(review): not referenced in the visible part of this file - presumably the
// limit applied to a single response header line; confirm against the response parser.
enum maxHeaderLineLength = 4096;
private {
// settings passed to the last `connect` call
Rebindable!(const(HTTPClientSettings)) m_settings;
// target host name (or Unix socket path), port and TLS flag set by `connect`
string m_server;
ushort m_port;
bool m_useTLS;
// raw TCP connection; established lazily by `doRequest`
TCPConnection m_conn;
// stream used for the HTTP exchange: either `m_conn` itself or `m_tlsStream`
InterfaceProxy!Stream m_stream;
TLSStream m_tlsStream;
// TLS context created by `connect` when TLS is requested
TLSContext m_tls;
// process-wide defaults, set through `setUserAgentString` / `setTLSSetupCallback`
static __gshared m_userAgent = "vibe.d/"~vibeVersionString~" (HTTPClient, +http://vibed.org/)";
static __gshared void function(TLSContext) ms_tlsSetup;
// state flags used to detect interleaved requests/responses on one client
bool m_requesting = false, m_responding = false;
// point in time after which an idle connection is dropped before being reused
SysTime m_keepAliveLimit;
Duration m_keepAliveTimeout;
}
/** Get the current settings for the HTTP client. **/
@property const(HTTPClientSettings) settings()
const {
    // `m_settings` is a `Rebindable` wrapper that converts to the wrapped reference
    return m_settings;
}
/**
    Sets the default user agent string for new HTTP requests.

    The value is stored in a `__gshared` variable and is sent as the
    "User-Agent" header of subsequently created requests.
*/
static void setUserAgentString(string str)
@trusted {
    m_userAgent = str;
}
/**
    Sets a callback that will be called for every TLS context that is created.

    Setting such a callback is useful for adjusting the validation parameters
    of the TLS context. The callback is only invoked by `connect` if the
    settings object does not define its own `tlsContextSetup`.
*/
static void setTLSSetupCallback(void function(TLSContext) @safe func)
@trusted {
    ms_tlsSetup = func;
}
/**
    Sets up this HTTPClient to connect to a specific server.

    This method may only be called if any previous connection has been closed.

    The actual connection is deferred until a request is initiated (using `HTTPClient.request`).
    When `use_tls` is set, a client TLS context is created and customized using either
    `settings.tlsContextSetup` or the callback set through `setTLSSetupCallback`.
*/
void connect(string server, ushort port = 80, bool use_tls = false, const(HTTPClientSettings) settings = defaultSettings)
{
    assert(!m_conn);
    assert(port != 0);

    disconnect();
    m_conn = TCPConnection.init;

    // remember the parameters for the deferred connection attempt
    m_settings = settings;
    m_keepAliveTimeout = settings.defaultKeepAliveTimeout;
    m_keepAliveLimit = Clock.currTime(UTC()) + m_keepAliveTimeout;
    m_server = server;
    m_port = port;
    m_useTLS = use_tls;

    if (!use_tls) return;

    m_tls = createTLSContext(TLSContextKind.client);
    // this will be changed to trustedCert once a proper root CA store is available by default
    m_tls.peerValidationMode = TLSPeerValidationMode.none;

    // a per-settings callback takes precedence over the global one
    if (settings.tlsContextSetup) {
        settings.tlsContextSetup(m_tls);
    } else {
        () @trusted { if (ms_tlsSetup) ms_tlsSetup(m_tls); } ();
    }
}
/**
    Forcefully closes the TCP connection.

    Before calling this method, be sure that no request is currently being processed.
    Does nothing if no connection object is present.
*/
void disconnect()
nothrow {
    if (!m_conn) return;

    version (Have_vibe_core) {}
    else scope(failure) assert(false);

    if (m_conn.connected) {
        // failing to finalize the stream must not prevent closing the socket
        try m_stream.finalize();
        catch (Exception e) logDebug("Failed to finalize connection stream when closing HTTP client connection: %s", e.msg);
        m_conn.close();
    }

    // release the stream and connection objects and reset them to their initial state
    if (m_useTLS) () @trusted { return destroy(m_stream); } ();
    m_stream = InterfaceProxy!Stream.init;
    () @trusted { return destroy(m_conn); } ();
    m_conn = TCPConnection.init;
}
/*
Handles a response that carries a "Proxy-Authenticate" header by repeating
the request with proxy authentication enabled.

The body of the initial response is dropped. The request is only repeated if
the status code is 407, the proxy URL in the settings has a user name and the
proxy offers the "Basic" scheme; otherwise an HTTPStatusException is thrown.
On success `res` is replaced by the response to the repeated request and
`close_conn`/`has_body` are updated accordingly. `T` is either a
`HTTPClientResponse` reference or its `scoped!` wrapper.
*/
private void doProxyRequest(T, U)(ref T res, U requester, ref bool close_conn, ref bool has_body)
@trusted { // scope new
import std.conv : to;
import vibe.internal.utilallocator: RegionListAllocator;
// NOTE(review): the allocator is declared `scope` but is handed to the response object
// assigned to `res`, which is still used by the caller after this function returns - verify its lifetime.
version (VibeManualMemoryManagement)
scope request_allocator = new RegionListAllocator!(shared(Mallocator), false)(1024, Mallocator.instance);
else
scope request_allocator = new RegionListAllocator!(shared(GCAllocator), true)(1024, GCAllocator.instance);
// discard the body of the initial response before anything else is sent
res.dropBody();
scope(failure)
res.disconnect();
if (res.statusCode != 407) {
throw new HTTPStatusException(HTTPStatus.internalServerError, "Proxy returned Proxy-Authenticate without a 407 status code.");
}
// send the request again with the proxy authentication information if available
if (m_settings.proxyURL.username is null) {
throw new HTTPStatusException(HTTPStatus.proxyAuthenticationRequired, "Proxy Authentication Required.");
}
// reset the state so that a new request can be issued on this client
m_responding = false;
close_conn = false;
// only the "Basic" authentication scheme is supported
bool found_proxy_auth;
foreach (string proxyAuth; res.headers.getAll("Proxy-Authenticate"))
{
if (proxyAuth.length >= "Basic".length && proxyAuth[0.."Basic".length] == "Basic")
{
found_proxy_auth = true;
break;
}
}
if (!found_proxy_auth)
{
throw new HTTPStatusException(HTTPStatus.notAcceptable, "The Proxy Server didn't allow Basic Authentication");
}
SysTime connected_time;
// repeat the request, this time flagged as proxy-authenticated
has_body = doRequestWithRetry(requester, true, close_conn, connected_time);
m_responding = true;
// construct the same kind of response object (heap or scoped) that the caller passed in
static if (is(T == HTTPClientResponse))
res = new HTTPClientResponse(this, has_body, close_conn, request_allocator, connected_time);
else
res = scoped!HTTPClientResponse(this, has_body, close_conn, request_allocator, connected_time);
// a second challenge means that the credentials were rejected
if (res.headers.get("Proxy-Authenticate", null) !is null){
res.dropBody();
throw new HTTPStatusException(HTTPStatus.proxyAuthenticationRequired, "Proxy Authentication Failed.");
}
}
/**
Performs a HTTP request.
`requester` is called first to populate the request with headers and the desired
HTTP method and version. After a response has been received it is then passed
to the caller which can in turn read the response body. Any part of the body
that has not been processed will automatically be consumed and dropped.
Note that the `requester` callback might be invoked multiple times in the event
that a request has to be resent due to a connection failure.
Also note that the second form of this method (returning a `HTTPClientResponse`) is
not recommended to use as it may accidentally block a HTTP connection when
only part of the response body was read and also requires a heap allocation
for the response object. The callback based version on the other hand uses
a stack allocation and guarantees that the request has been fully processed
once it has returned.
*/
void request(scope void delegate(scope HTTPClientRequest req) requester, scope void delegate(scope HTTPClientResponse) responder)
@trusted { // scope new
import vibe.internal.utilallocator: RegionListAllocator;
version (VibeManualMemoryManagement)
scope request_allocator = new RegionListAllocator!(shared(Mallocator), false)(1024, Mallocator.instance);
else
scope request_allocator = new RegionListAllocator!(shared(GCAllocator), true)(1024, GCAllocator.instance);
// any failure leaves the connection in an undefined state, so drop it
scope (failure) {
m_responding = false;
disconnect();
}
bool close_conn;
SysTime connected_time;
bool has_body = doRequestWithRetry(requester, false, close_conn, connected_time);
m_responding = true;
auto res = scoped!HTTPClientResponse(this, has_body, close_conn, request_allocator, connected_time);
// proxy implementation
if (res.headers.get("Proxy-Authenticate", null) !is null) {
doProxyRequest(res, requester, close_conn, has_body);
}
// an exception thrown by `responder` is remembered and rethrown after the connection state has been cleaned up
Exception user_exception;
while (true)
{
try responder(res);
catch (Exception e) {
logDebug("Error while handling response: %s", e.toString().sanitize());
user_exception = e;
}
if (res.statusCode < 200) {
// just an informational status -> read and handle next response
if (m_responding) res.dropBody();
if (m_conn) {
res = scoped!HTTPClientResponse(this, has_body, close_conn, request_allocator, connected_time);
continue;
}
}
// still in responding state here means the response was not finalized; the connection is dropped
if (m_responding) {
logDebug("Failed to handle the complete response of the server - disconnecting.");
res.disconnect();
}
assert(!m_responding, "Still in responding state after finalizing the response!?");
if (user_exception || res.headers.get("Connection") == "close")
disconnect();
break;
}
if (user_exception) throw user_exception;
}
/// ditto
HTTPClientResponse request(scope void delegate(HTTPClientRequest) requester)
{
    // any failure leaves the connection in an undefined state, so drop it
    scope (failure) {
        m_responding = false;
        disconnect();
    }

    bool close_conn;
    SysTime connected_time;
    bool has_body = doRequestWithRetry(requester, false, close_conn, connected_time);
    m_responding = true;

    // the response outlives this call, so it is allocated on the heap
    auto allocator = () @trusted { return vibeThreadAllocator(); } ();
    auto response = new HTTPClientResponse(this, has_body, close_conn, allocator, connected_time);

    // proxy implementation
    const proxy_challenge = response.headers.get("Proxy-Authenticate", null) !is null;
    if (proxy_challenge)
        doProxyRequest(response, requester, close_conn, has_body);

    return response;
}
/*
    Sends a request using `doRequest`, retrying once if a reused (persistent)
    connection turns out to have been closed by the peer.

    Params:
        requester = Callback that populates the request; may be called twice
        confirmed_proxy_auth = Forwarded to `doRequest`, where it causes the
            "Proxy-Authorization" (Basic) header to be sent when a proxy is configured
        close_conn = Set to whether the connection must be closed after the response
        connected_time = Set to the time at which the (last) request attempt was started

    Returns: The value returned by the last `doRequest` call (`false` for HEAD requests).
*/
private bool doRequestWithRetry(scope void delegate(HTTPClientRequest req) requester, bool confirmed_proxy_auth /* basic only */, out bool close_conn, out SysTime connected_time)
{
    // drop idle connections that have exceeded their keep-alive deadline
    if (m_conn && m_conn.connected && Clock.currTime(UTC()) > m_keepAliveLimit){
        logDebug("Disconnected to avoid timeout");
        disconnect();
    }

    // check if this isn't the first request on a connection
    bool is_persistent_request = m_conn && m_conn.connected;

    // retry the request if the connection gets closed prematurely and this is a persistent request
    bool has_body;
    foreach (i; 0 .. is_persistent_request ? 2 : 1) {
        connected_time = Clock.currTime(UTC());
        close_conn = false;
        // pass the caller's flag on instead of a hard-coded `false`, which made
        // the proxy authentication retry send the request without credentials
        has_body = doRequest(requester, close_conn, confirmed_proxy_auth, connected_time);

        logTrace("HTTP client waiting for response");
        if (!m_stream.empty) break;
    }
    return has_body;
}
/*
Establishes the connection if necessary and writes a single request.

If there is no usable connection, a new one is opened - to the proxy if one is
configured, otherwise to the server (on Posix optionally via a Unix domain
socket when the server name starts with '/') - and wrapped in TLS if requested.
The request is then pre-populated with default headers, passed to `requester`
and finalized.

`close_conn` is set according to the HTTP version and the (Proxy-)Connection
header of the request. `confirmed_proxy_auth` adds a Basic "Proxy-Authorization"
header when a proxy is configured. Returns `false` for HEAD requests and `true`
otherwise.

NOTE(review): the `connected_time` parameter is not used within this function.
*/
private bool doRequest(scope void delegate(HTTPClientRequest req) requester, ref bool close_conn, bool confirmed_proxy_auth = false /* basic only */, SysTime connected_time = Clock.currTime(UTC()))
{
assert(!m_requesting, "Interleaved HTTP client requests detected!");
assert(!m_responding, "Interleaved HTTP client request/response detected!");
m_requesting = true;
scope(exit) m_requesting = false;
// (re)connect if there is no connection, it is closed, or no more data can arrive on it
if (!m_conn || !m_conn.connected || m_conn.waitForDataEx(0.seconds) == WaitForDataStatus.noMoreData) {
if (m_conn)
disconnect(); // make sure all resources are freed
if (m_settings.proxyURL.host !is null){
// connect to the proxy instead of the target server
enum AddressType {
IPv4,
IPv6,
Host
}
// classifies the proxy host string as IPv4 literal, IPv6 literal or host name
static AddressType getAddressType(string host){
import std.regex : regex, Captures, Regex, matchFirst;
static IPv4Regex = regex(`^\s*((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\s*$`, ``);
static IPv6Regex = regex(`^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$`, ``);
if (!matchFirst(host, IPv4Regex).empty)
{
return AddressType.IPv4;
}
else if (!matchFirst(host, IPv6Regex).empty)
{
return AddressType.IPv6;
}
else
{
return AddressType.Host;
}
}
import std.functional : memoize;
// cache the classification per host string to avoid re-running the regexes
alias findAddressType = memoize!getAddressType;
// DNS is only consulted if the proxy host is not an IP address literal
bool use_dns;
if (() @trusted { return findAddressType(m_settings.proxyURL.host); } () == AddressType.Host)
{
use_dns = true;
}
NetworkAddress proxyAddr = resolveHost(m_settings.proxyURL.host, m_settings.dnsAddressFamily, use_dns,
m_settings.connectTimeout);
proxyAddr.port = m_settings.proxyURL.port;
m_conn = connectTCPWithTimeout(proxyAddr, m_settings.networkInterface, m_settings.connectTimeout);
}
else {
version(UnixSocket)
{
import core.sys.posix.sys.un;
import core.sys.posix.sys.socket;
import std.regex : regex, Captures, Regex, matchFirst, ctRegex;
import core.stdc.string : strcpy;
NetworkAddress addr;
// a server name starting with '/' is treated as the path of a Unix domain socket
// NOTE(review): indexing assumes `m_server` is non-empty - confirm callers guarantee this
if (m_server[0] == '/')
{
addr.family = AF_UNIX;
sockaddr_un* s = addr.sockAddrUnix();
// the path plus its terminating zero byte must fit into `sun_path`
enforce(s.sun_path.length > m_server.length, "Unix sockets cannot have that long a name.");
s.sun_family = AF_UNIX;
() @trusted { strcpy(cast(char*)s.sun_path.ptr,m_server.toStringz()); } ();
} else
{
addr = resolveHost(m_server, m_settings.dnsAddressFamily, true, m_settings.connectTimeout);
addr.port = m_port;
}
m_conn = connectTCPWithTimeout(addr, m_settings.networkInterface, m_settings.connectTimeout);
} else
{
auto addr = resolveHost(m_server, m_settings.dnsAddressFamily, true, m_settings.connectTimeout);
addr.port = m_port;
m_conn = connectTCPWithTimeout(addr, m_settings.networkInterface, m_settings.connectTimeout);
}
}
// only override the connection's read timeout if one was configured
if (m_settings.readTimeout != Duration.max)
m_conn.readTimeout = m_settings.readTimeout;
m_stream = m_conn;
if (m_useTLS) {
// on a failed TLS handshake the raw connection is closed before rethrowing
try m_tlsStream = createTLSStream(m_conn, m_tls, TLSStreamState.connecting, m_server, m_conn.remoteAddress);
catch (Exception e) {
m_conn.close();
m_conn = TCPConnection.init;
throw e;
}
m_stream = m_tlsStream;
}
}
return () @trusted { // scoped
auto req = scoped!HTTPClientRequest(m_stream, m_conn);
if (m_useTLS)
req.m_peerCertificate = m_tlsStream.peerCertificate;
// default headers - `requester` is called afterwards and may override them
req.headers["User-Agent"] = m_userAgent;
if (m_settings.proxyURL.host !is null){
req.headers["Proxy-Connection"] = "keep-alive";
if (confirmed_proxy_auth)
{
import std.base64;
// Basic credentials: base64 of "user:password" taken from the proxy URL
ubyte[] user_pass = cast(ubyte[])(m_settings.proxyURL.username ~ ":" ~ m_settings.proxyURL.password);
req.headers["Proxy-Authorization"] = "Basic " ~ cast(string) Base64.encode(user_pass);
}
}
else {
req.headers["Connection"] = "keep-alive";
}
req.headers["Accept-Encoding"] = "gzip, deflate";
req.headers["Host"] = m_server;
requester(req);
// HTTP/1.0 connections are always closed; otherwise the (Proxy-)Connection header set by the requester decides
if (req.httpVersion == HTTPVersion.HTTP_1_0)
close_conn = true;
else if (m_settings.proxyURL.host !is null)
close_conn = req.headers.get("Proxy-Connection", "keep-alive") != "keep-alive";
else
close_conn = req.headers.get("Connection", "keep-alive") != "keep-alive";
req.finalize();
// reported to the caller as `has_body`: false only for HEAD requests
return req.method != HTTPMethod.HEAD;
} ();
}
}
/*
    Connects to `addr` from `bind_address`, applying `timeout` where supported.

    With vibe-core the timeout is passed to `connectTCP`. With the legacy core
    module it is ignored and a warning is logged if a timeout other than
    `Duration.max` was requested.
*/
private auto connectTCPWithTimeout(NetworkAddress addr, NetworkAddress bind_address, Duration timeout)
{
    version (Have_vibe_core) {
        return connectTCP(addr, bind_address, timeout);
    } else {
        const timeout_requested = timeout != Duration.max;
        if (timeout_requested)
            logWarn("HTTP client connect timeout is set, but not supported by the legacy vibe-d:core module.");
        return connectTCP(addr, bind_address);
    }
}
/**
    Represents a HTTP client request (as sent to the server).
*/
final class HTTPClientRequest : HTTPRequest {
    private {
        // stream that the body is written to (raw connection or chunked wrapper)
        InterfaceProxy!OutputStream m_bodyWriter;
        FreeListRef!ChunkedOutputStream m_chunkedStream;
        bool m_headerWritten = false;
        // scratch buffer used to format "Content-Length" values
        FixedAppender!(string, 22) m_contentLengthBuffer;
        TCPConnection m_rawConn;
        TLSCertificateInformation m_peerCertificate;
    }

    /// private
    this(InterfaceProxy!Stream conn, TCPConnection raw_conn)
    {
        super(conn);
        m_rawConn = raw_conn;
    }

    /// Local address of the underlying TCP connection.
    @property NetworkAddress localAddress()
    const {
        return m_rawConn.localAddress;
    }

    /// Remote address of the underlying TCP connection.
    @property NetworkAddress remoteAddress()
    const {
        return m_rawConn.remoteAddress;
    }

    /// Certificate information of the peer as stored for this request.
    @property ref inout(TLSCertificateInformation) peerCertificate()
    inout {
        return m_peerCertificate;
    }

    /**
        Accesses the Content-Length header of the request.

        Negative values correspond to an unset Content-Length header.
    */
    @property long contentLength()
    const {
        return headers.get("Content-Length", "-1").to!long();
    }

    /// ditto
    @property void contentLength(long value)
    {
        if (value < 0) {
            // a negative value removes the header, if present
            if ("Content-Length" in headers) headers.remove("Content-Length");
            return;
        }
        headers["Content-Length"] = clengthString(value);
    }

    /**
        Writes the whole request body at once using raw bytes.
    */
    void writeBody(RandomAccessStream data)
    {
        // only the part from the current stream position onwards is sent
        auto remaining = data.size - data.tell();
        writeBody(data, remaining);
    }

    /// ditto
    void writeBody(InputStream data)
    {
        data.pipe(bodyWriter);
        finalize();
    }

    /// ditto
    void writeBody(InputStream data, ulong length)
    {
        headers["Content-Length"] = clengthString(length);
        data.pipe(bodyWriter, length);
        finalize();
    }

    /// ditto
    void writeBody(in ubyte[] data, string content_type = null)
    {
        if (content_type.length) headers["Content-Type"] = content_type;
        headers["Content-Length"] = clengthString(data.length);
        bodyWriter.write(data);
        finalize();
    }

    /**
        Writes the request body as JSON data.

        Unless `allow_chunked` is set, the data is serialized twice: once to
        determine the value of the Content-Length header and once to write the body.
    */
    void writeJsonBody(T)(T data, bool allow_chunked = false)
    {
        import vibe.stream.wrapper : streamOutputRange;

        headers["Content-Type"] = "application/json; charset=UTF-8";

        // set an explicit content-length field if chunked encoding is not allowed
        if (!allow_chunked) {
            import vibe.internal.rangeutil;
            long json_size = 0;
            auto size_probe = () @trusted { return RangeCounter(&json_size); } ();
            () @trusted { serializeToJson(size_probe, data); } ();
            headers["Content-Length"] = clengthString(json_size);
        }

        auto sink = streamOutputRange!1024(bodyWriter);
        () @trusted { serializeToJson(&sink, data); } ();
        sink.flush();
        finalize();
    }

    /** Writes the request body as form data.
    */
    void writeFormBody(T)(T key_value_map)
    {
        import vibe.inet.webform : formEncode;
        import vibe.stream.wrapper : streamOutputRange;
        import vibe.internal.rangeutil;

        // first pass: determine the encoded size for the Content-Length header
        long form_size = 0;
        auto size_probe = () @trusted { return RangeCounter(&form_size); } ();
        size_probe.formEncode(key_value_map);
        headers["Content-Length"] = clengthString(form_size);
        headers["Content-Type"] = "application/x-www-form-urlencoded";

        // second pass: write the encoded form to the body
        auto sink = streamOutputRange!1024(bodyWriter);
        () @trusted { return &sink; } ().formEncode(key_value_map);
    }

    ///
    unittest {
        void test(HTTPClientRequest req) {
            req.writeFormBody(["foo": "bar"]);
        }
    }

    /// Not implemented yet; always fails with an assertion.
    void writePart(MultiPart part)
    {
        assert(false, "TODO");
    }

    /**
        An output stream suitable for writing the request body.

        The first retrieval will cause the request header to be written, make sure
        that all headers are set up in advance.
    */
    @property InterfaceProxy!OutputStream bodyWriter()
    {
        if (m_bodyWriter) return m_bodyWriter;

        assert(!m_headerWritten, "Trying to write request body after body was already written.");

        // fall back to chunked encoding if the body length is unknown
        const needs_chunking = httpVersion != HTTPVersion.HTTP_1_0
            && "Content-Length" !in headers && "Transfer-Encoding" !in headers
            && headers.get("Connection", "") != "close";
        if (needs_chunking)
            headers["Transfer-Encoding"] = "chunked";

        writeHeader();
        m_bodyWriter = m_conn;

        if (headers.get("Transfer-Encoding", null) == "chunked") {
            m_chunkedStream = createChunkedOutputStreamFL(m_bodyWriter);
            m_bodyWriter = m_chunkedStream;
        }

        return m_bodyWriter;
    }

    // Writes the request line and all headers to the connection (at most once).
    private void writeHeader()
    {
        import vibe.stream.wrapper;

        assert(!m_headerWritten, "HTTPClient tried to write headers twice.");
        m_headerWritten = true;

        auto output = streamOutputRange!1024(m_conn);

        formattedWrite(() @trusted { return &output; } (), "%s %s %s\r\n", httpMethodString(method), requestURL, getHTTPVersionString(httpVersion));
        logTrace("--------------------");
        logTrace("HTTP client request:");
        logTrace("--------------------");
        logTrace("%s", this);
        foreach (k, v; headers.byKeyValue) {
            () @trusted { formattedWrite(&output, "%s: %s\r\n", k, v); } ();
            logTrace("%s: %s", k, v);
        }
        output.put("\r\n");
        logTrace("--------------------");
    }

    // Makes sure that the request has been sent completely. Calling this on an
    // already finalized request has no effect.
    private void finalize()
    {
        // force the request to be sent
        if (!m_headerWritten) {
            writeHeader();
            return;
        }

        // test if already finalized
        if (!m_bodyWriter) return;

        bodyWriter.flush();
        if (m_chunkedStream) {
            m_bodyWriter.finalize();
            m_conn.flush();
        }
        m_bodyWriter = typeof(m_bodyWriter).init;
        m_conn = typeof(m_conn).init;
    }

    // Formats `len` as decimal string using the internal buffer; the result is
    // only valid until the next call.
    private string clengthString(ulong len)
    {
        m_contentLengthBuffer.clear();
        () @trusted { formattedWrite(&m_contentLengthBuffer, "%s", len); } ();
        return () @trusted { return m_contentLengthBuffer.data; } ();
    }
}
/**
Represents a HTTP client response (as received from the server).
*/
final class HTTPClientResponse : HTTPResponse {
@safe:
private {
HTTPClient m_client;
LockedConnection!HTTPClient lockedConnection;
FreeListRef!LimitedInputStream m_limitedInputStream;
FreeListRef!ChunkedInputStream m_chunkedInputStream;
FreeListRef!ZlibInputStream m_zlibInputStream;
FreeListRef!EndCallbackInputStream m_endCallback;
InterfaceProxy!InputStream m_bodyReader;
bool m_closeConn;
int m_maxRequests;
}