-
Notifications
You must be signed in to change notification settings - Fork 0
/
message.clj
1954 lines (1876 loc) · 93 KB
/
message.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
(ns frereth.cp.message
"Translation of curvecpmessage.c
This is really a generic buffer program
The \"parent\" child/server reads/writes to pipes that this provides,
in a specific (and apparently undocumented) communications protocol.
This, in turn, reads/writes data from/to a child that it spawns.
I keep wanting to think of this as a simple transducer and just
skip the buffering pieces, but they (and the flow control) are
really the main point."
(:require [clojure.pprint :refer (cl-format pprint)]
[clojure.spec.alpha :as s]
[frereth.cp.message
[constants :as K]
[flow-control :as flow-control]
[from-child :as from-child]
[from-parent :as from-parent]
[helpers :as help]
[specs :as specs]
[to-child :as to-child]
[to-parent :as to-parent]]
[frereth.cp.shared :as shared]
[frereth.cp.shared
[bit-twiddling :as b-t]
[crypto :as crypto]
[specs :as shared-specs]
[util :as utils]]
[frereth.weald
[logging :as log]
[specs :as weald]]
[manifold
[deferred :as dfrd]
[executor :as exec]
[stream :as strm]])
(:import [clojure.lang ExceptionInfo IDeref PersistentQueue]
[io.netty.buffer ByteBuf Unpooled]
[java.io IOException PipedInputStream PipedOutputStream]
java.util.concurrent.TimeoutException))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Magic constants
(def ^{:doc "How long we might block on data coming from the child (child->).
  NOTE(review): the unit is not stated; presumably milliseconds, like the
  other timeouts in this section -- confirm."}
  child-buffer-timeout
  ;; This is far too long. Unfortunately, my event loop
  ;; is currently very slow.
  1000)
(def ^{:doc "Upper bound on the number of message blocks from the parent that we
  hold for the child before new ones get dropped.
  Must be a power of 2 -- DJB"}
  max-child-buffer-size
  64)
(def ^{:doc "Milliseconds to wait while writing a packet from the parent to the
  child before giving up."}
  write-from-parent-timeout
  5000)
(def ^{:doc "Milliseconds to wait while writing a packet from the child to the
  parent before giving up."}
  write-from-child-timeout
  5000)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Specs
;; Timing bookkeeping passed to action-trigger!.
;; NOTE(review): none of these keys has an s/def in this part of the
;; file, so presumably only their presence is checked -- confirm.
(s/def ::action-timing-details (s/keys :req [::actual-next
                                             ::delta_f
                                             ::scheduling-time]))
;; Deadline (on the System/nanoTime scale) returned by the
;; choose-next-scheduled-time functions.
(s/def ::next-action-time nat-int?)
(s/def ::now nat-int?)
;; Where an ::input came from.
(s/def ::source-tags #{::child-> ::parent-> ::query-state})
;; A [source-tag byte-array] pair.
(s/def ::input (s/tuple ::source-tags bytes?))
;; Tag identifying what woke up the main ioloop.
;; NOTE(review): this uses ::specs/child-> while ::source-tags uses
;; ::child-> (this namespace) -- confirm that the difference is intended.
(s/def ::action-tag #{::specs/child->
                      ::drained
                      ::no-op
                      ::parent->
                      ::query-state
                      ::timed-out})
;; An action tag followed by 0, 1, or 2 arguments, e.g. the
;; [tag callback] pair that action-trigger! receives.
;; This used to be (s/tuple ::action-tag), which rejected every action
;; that carried an argument. Following the earlier note, s/or covers
;; each arity explicitly.
(s/def ::next-action (s/or :bare (s/tuple ::action-tag)
                           :unary (s/tuple ::action-tag any?)
                           :binary (s/tuple ::action-tag any? any?)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; Internal API
;; The function takes a single map argument, so the spec describes that
;; map. The old spec listed two positional args, which meant
;; instrumentation rejected every valid call.
(s/fdef build-un-ackd-blocks
  :args (s/cat :params (s/keys :req [::weald/logger
                                     ::weald/state]))
  :ret ::specs/un-ackd-blocks)
(defn build-un-ackd-blocks
  "Create an empty sorted set for outgoing blocks awaiting an ACK.

  Blocks are ordered by ::specs/time, earliest first. Two blocks whose
  ::specs/time values are equal compare as the same element, so only
  one of them can be in the set at a time. disj relies on that to find
  the block it is removing.

  If only one of the two blocks being compared has a ::specs/time, the
  numeric comparison throws a NullPointerException. That exception is
  logged (on a clean fork of log-state, flushed through logger) and
  then rethrown."
  [{:keys [::weald/logger]
    log-state ::weald/state}]
  (sorted-set-by
   (fn [x y]
     (try
       (let [x-time (::specs/time x)
             y-time (::specs/time y)]
         (cond
           ;; Assume that we never have multiple messages with the
           ;; same timestamp.
           ;; We do hit this case when disj is trying to
           ;; remove an existing block.
           (= x-time y-time) 0
           (< x-time y-time) -1
           :else 1))
       (catch NullPointerException ex
         (let [log-state (log/exception (log/clean-fork log-state ::failed)
                                        ex
                                        ::build-un-ackd-blocks
                                        "Comparing time"
                                        {::lhs x
                                         ::rhs y})]
           (log/flush-logs! logger log-state))
         (throw ex))))))
;;;; Q: what else is in the reference implementation?
;;;; A: Lots. Scattered around quite a bit now
;;;; FIXME: Move these to the functions where I've implemented them.
;;; 186-654 main
;;; 186-204 boilerplate
;;; 260-263 set up some globals
;;; 264-645 main event loop
;;; 645-651 wait on children to exit
;;; 652-653 exit codes
;;; 264-645 main event loop
;;; 263-269: exit if done
;;; 271-306: Decide what and when to poll, based on global state
;;; This next piece seems like it deserves a prominent place.
;;; But, really, it needs to be deeply hidden from the end-programmer.
;;; I want it to be easy for me to change out and swap around, but
;;; that means no one else should ever really even need to know that
;;; it happens (no matter how vital it obviously is)
;;; 307-318: Poll for incoming data
;;; 317 XXX: keepalives
;;; 444-609 handle this message if it's comprehensible: (DJB)
;;; (in more depth)
;;; 445-450: boilerplate
;;; 452-453: short-circuiting
;;; 455: extract ID
;;; 456-542: loop over range(blocknum)
;;; This looks like the "real" Chicago algorithm
;;; 544-560: Call acknowledged() several times
;;; This is for several byte ranges in the incoming message
;;; 0 to the 8-byte offset starting at byte 8
;;; Then we have pairs of start/stop offsets
;;; where each start is from the previous stop
;;; This seems a bit tedious/inefficient, esp. since
;;; lines 396-401 seem to ignore everything except
;;; the blockid[pos] at position 8 (aka 0) and then
;;; positions 46 and 48.
;;; Those are really the length at position 38 and initial
;;; offset at position 40, thanks to starting the send
;;; with length/16 at offset 7.
;;; 589-593: increment the receivebytes count
;;; 595: never acknowledge a pure acknowledgment (DJB)
;;; This short-circuits if something extracts to 0
;;; (looks like this is the first of the message structure,
;;; which would be the message ID)
;;; 597: /* XXX: delay acknowledgments */ (DJB)
;;; 598-608: Looks like this just ACKs everything
;;; Evidence:
;;; 606: /* XXX: incorporate selective acknowledgents */ (DJB)
;;; 634-643: try closing pipe to child: (DJB)
;;; Well, maybe. If we're done with it
(s/fdef trigger-output
  :args (s/cat :io-handle ::specs/io-handle
               :state ::specs/state)
  :ret ::specs/state)
(defn trigger-output
  "Give to-parent/maybe-send-block! a chance to send a block to the parent.

  Adds a debug entry to ::weald/state, stamps ::specs/recent with the
  current System/nanoTime, and returns whatever maybe-send-block!
  returns (a ::specs/state, according to this function's fdef)."
  [io-handle
   {:keys [::specs/message-loop-name]
    :as state}]
  ;; Design notes:
  ;; - At least one unit test receives input from the parent, forwards it
  ;;   to the child, and has it echoed back. This function then finds
  ;;   nothing ready, the loop goes back to polling, finds the message the
  ;;   child just queued, and starts over. Handling that here would take
  ;;   too many stateful contortions. It would help to peek into
  ;;   io-handle's stream, or to have one queue for the outer API (what
  ;;   schedule-next-event! loops around) plus separate queues for
  ;;   child, parent and resend timeouts. Maybe in a future version.
  ;; - Calling this after a message from the parent or the child often
  ;;   does nothing: even if their pending blocks are ready to send,
  ;;   outgoing messages are throttled by the flow-control logic.
  ;;   It mostly makes sense when the timer says it is OK to send.
  ;; - However, the timeout can change when the child schedules another
  ;;   send, or when a message from the parent updates the RTT. Example:
  ;;   the child sends two messages in quick succession; we hold the
  ;;   second until the first is ACK'd; once the ACK arrives, this can
  ;;   send the second without going through scheduling again. That is
  ;;   currently a noticeable win, because scheduling is slow.
  (let [state (update state
                      ::weald/state
                      #(log/debug %
                                  ::trigger-output
                                  "Possibly sending message to parent"
                                  {::specs/message-loop-name message-loop-name}))]
    (to-parent/maybe-send-block! io-handle
                                 (assoc state
                                        ::specs/recent
                                        (System/nanoTime)))))
(s/fdef trigger-from-child
  :args (s/cat :io-handle ::specs/io-handle
               :callback (s/fspec :args (s/cat :state ::specs/state)
                                  :ret ::specs/state)
               :accepted? dfrd/deferrable?
               :state ::specs/state)
  :ret ::specs/state)
(defn trigger-from-child
  "The child handed over outbound data.

  Steps, in order:
  1. log the current ::specs/strm-hwm at info level
  2. deliver true to `accepted?`
  3. let `callback` fold the new data into the state
     (asserting that ::specs/outgoing survives)
  4. hand the result to trigger-output and return what it returns"
  [io-handle
   callback
   ^IDeref accepted?
   {{hwm ::specs/strm-hwm} ::specs/outgoing
    :keys [::specs/message-loop-name]
    :as state}]
  {:pre [callback]}
  ;; TODO: check whether we can do output yet.
  ;; It's pointless to call trigger-output if we just have
  ;; to wait for the timer to expire.
  (let [logged-state (update state
                             ::weald/state
                             #(log/info %
                                        ::trigger-from-child
                                        "Sent stream address"
                                        {::specs/strm-hwm hwm
                                         ::specs/message-loop-name message-loop-name}))
        _ (deliver accepted? true)
        updated-state (callback logged-state)]
    (assert (::specs/outgoing updated-state) "Callback threw away outgoing")
    (trigger-output io-handle updated-state)))
(s/fdef trigger-from-parent
  :args (s/cat :io-handle ::specs/io-handle
               :array-o-bytes bytes?
               :state ::specs/state)
  :ret ::specs/state)
(defn trigger-from-parent
  "Message block arrived from parent. Work triggered by ioloop

  Returns the updated state:
  - If ::specs/client-waiting-on-response is not realized yet, false is
    delivered to it.
  - If ->child-buffer already holds max-child-buffer-size entries, or
    `message` is longer than K/max-msg-len, a warning is logged and
    `message` is not processed.
  - Otherwise `message` is stored under
    [::specs/incoming ::specs/parent->buffer] and run through
    from-parent/try-processing-message!, to-child/forward! and
    trigger-output. An ExceptionInfo or RuntimeException from that
    pipeline is logged and does not propagate.

  Throws AssertionError when `message` is empty, and ex-info when an
  entry already in ->child-buffer cannot be counted."
  ;; TODO: Move as much of this as possible into from-parent
  ;; The only reason I haven't already moved the whole thing
  ;; is that we need to use to-parent to send the ACK, and I'd
  ;; really rather not introduce dependencies between those namespaces
  [{:keys [::weald/logger
           ::specs/message-loop-name]
    :as io-handle}
   ^bytes message
   {{:keys [::specs/->child-buffer]} ::specs/incoming
    {:keys [::specs/client-waiting-on-response]} ::specs/flow-control
    log-state ::weald/state
    :as state}]
  ;; Report a nil state before anything else touches it.
  ;; This check used to sit after the realized? call below, which
  ;; throws a NullPointerException on nil, so the warning could never
  ;; be emitted.
  (when-not state
    ;; log-state is nil here as well, so start a fresh log.
    (let [logs (log/warn (log/init (::weald/context log-state)
                                   (::weald/lamport log-state))
                         ::trigger-from-parent
                         ;; They're about to get worse
                         "nil state. Things went sideways recently")]
      (log/flush-logs! logger logs)))
  (let [log-state (log/debug log-state
                             ::trigger-from-parent
                             "Incoming from parent")]
    ;; Important side-effect: this permanently converts the "mode" of
    ;; the i/o loop that pulls bytes from the child's output pipe.
    ;; Now that we've gotten a response back, it can switch from
    ;; initiate packets to message packets, which effectively doubles
    ;; the signal bandwidth.
    (when-not (realized? client-waiting-on-response)
      (deliver client-waiting-on-response false))
    ;;; From parent (over watch8)
    ;;; 417-433: for loop from 0-bytes read
    ;;; Copies bytes from incoming message buffer to message[][]
    (let [incoming-size (count message)]
      (when (zero? incoming-size)
        (log/flush-logs! logger log-state)
        ;; This is supposed to kill the entire process
        ;; TODO: Be more graceful
        (throw (AssertionError. "Bad Message")))
      ;; The reference implementation reads bytes from a stream: the
      ;; first byte gives the block length, and the bytes are copied
      ;; into the last slot of a global message array (unless that
      ;; buffer has filled up waiting for the client). This takes a
      ;; simpler approach: ->child-buffer is treated as a vector that we
      ;; can conj onto.
      (if (< (count ->child-buffer) max-child-buffer-size)
        ;; Q: Will ->child-buffer ever have more than one array?
        ;; It would be faster to skip the map/reduce
        (let [previously-buffered-message-bytes
              (reduce + 0
                      (map (fn [^bytes buf]
                             (try
                               (count buf)
                               (catch UnsupportedOperationException ex
                                 (let [prelog (utils/pre-log message-loop-name)]
                                   (throw (ex-info (str prelog
                                                        "Parent sent a "
                                                        (class buf)
                                                        " which isn't a B]")
                                                   {::cause ex}))))))
                           ->child-buffer))
              log-state (log/debug log-state
                                   ::trigger-from-parent
                                   "possibly processing"
                                   {::bytes-buffered previously-buffered-message-bytes
                                    ::buffer-count (count ->child-buffer)})]
          ;; previously-buffered-message-bytes is only logged for now.
          ;; The reference implementation doesn't check it either; its
          ;; buffer is effectively a self-enforcing 64K limit.
          (if (<= incoming-size K/max-msg-len)
            (let [log-state (log/debug log-state
                                       ::trigger-from-parent
                                       "Message is small enough. Look back here")
                  state (-> state
                            (assoc ::weald/state log-state)
                            (assoc-in [::specs/incoming ::specs/parent->buffer]
                                      message))]
              ;; One iteration of the reference implementation's main()
              ;; event loop, minus the child-reading parts: this was
              ;; triggered by data from the parent. Blocks the child has
              ;; buffered can't be sent from here; that's limited by the
              ;; flow-control timer and the child's callbacks.
              ;; TODO: We probably should cancel/reschedule, since
              ;; whatever ACK just arrived might adjust the RTT logic.
              (try
                ;; TODO: Split the side-effects out and queue them up.
                (as-> (from-parent/try-processing-message!
                       io-handle
                       state) state'
                  (to-child/forward! io-handle state')
                  ;; This updates recent. The reference implementation
                  ;; does that right after trying to read from the child.
                  ;; Q: Is it worth doing that at the top of the trigger
                  ;; functions instead?
                  (trigger-output io-handle state'))
                (catch ExceptionInfo ex
                  (let [log-state (log/exception log-state
                                                 ex
                                                 ::trigger-from-parent
                                                 "Forwarding failed"
                                                 (.getData ex))]
                    (assoc state ::weald/state log-state)))
                (catch RuntimeException ex
                  (let [msg "Trying to cope with a message arriving from parent"]
                    (update state
                            ::weald/state
                            #(log/exception %
                                            ex
                                            ::trigger-from-parent
                                            msg))))))
            ;; This is actually pretty serious: all sorts of things had
            ;; to go wrong for us to get here.
            ;; TODO: More extensive error handling, possibly through an
            ;; optional client-supplied error handler.
            (assoc state
                   ::weald/state
                   (log/warn log-state
                             ::trigger-from-parent
                             "Incoming message too large"
                             {::incoming-size incoming-size
                              ::maximum-allowed K/max-msg-len}))))
        ;; TODO: Need a way to apply back-pressure
        ;; to child
        (assoc state
               ::weald/state
               (log/warn log-state
                         ::trigger-from-parent
                         "Child buffer overflow\nWait!"
                         {::incoming-buffer-size (count ->child-buffer)
                          ::max-allowed max-child-buffer-size}))))))
(defn trigger-from-timer
  "The timer fired: record that in the log, then hand off to trigger-output.

  According to the original notes, the main point of this path is to
  resend an outbound block that hasn't been ACK'd yet. Returns whatever
  trigger-output returns."
  [io-handle
   {:keys [::specs/message-loop-name]
    :as state}]
  ;; It's tempting to move this to to-parent, but (at least in theory)
  ;; it could also trigger output to the child. So leave it here.
  (let [note-timer #(log/debug %
                               ::trigger-from-timer
                               "I/O triggered by timer"
                               {::specs/message-loop-name message-loop-name})
        logged-state (update state ::weald/state note-timer)]
    (trigger-output io-handle logged-state)))
;; The function takes exactly two args. The old spec listed a separate
;; :outgoing arg that the function never accepted.
(s/fdef condensed-choose-next-scheduled-time
  :args (s/cat :state ::specs/state
               :to-child-done? ::specs/to-child-done?)
  :ret (s/keys :req [::next-action-time
                     ::weald/state]))
(defn condensed-choose-next-scheduled-time
  "cond->-based version of choose-next-scheduled-time (DJB lines 286-305).

  Starts at ::specs/recent + 1 minute and pulls that earlier, in this
  order:
  - want-ping ::specs/second-1: recent + 1 second
  - want-ping ::specs/immediate: the minimum resend time
  - room in the outgoing queue and either an unprocessed EOF or unsent
    blocks: the minimum resend time
  - un-ACK'd blocks whose RTT deadline is after the minimum resend
    time: that RTT deadline
  - bytes waiting for a child whose pipe is still open: 0
  The result is never earlier than ::specs/recent.

  Unlike choose-next-scheduled-time, this adds no log entry (log-state
  is returned unchanged) and does not reject unknown want-ping values.

  Returns a map with ::next-action-time and ::weald/state."
  [{{:keys [::specs/n-sec-per-block
            ::specs/rtt-timeout]} ::specs/flow-control
    {:keys [::specs/->child-buffer
            ::specs/gap-buffer]} ::specs/incoming
    {:keys [::specs/earliest-time
            ::specs/last-block-time
            ::specs/send-eof
            ::specs/un-sent-blocks
            ::specs/un-ackd-blocks
            ::specs/want-ping]
     :as outgoing} ::specs/outgoing
    :keys [::specs/recent]
    log-state ::weald/state}
   to-child-done?]
  ;; TODO: Bypass this entirely when more new data is pending, and be
  ;; smarter about the timeout: fast-idling while waiting for the
  ;; child's first message is wasteful, and scheduling into the past
  ;; makes this trigger every millisecond.
  (let [min-resend-time (+ last-block-time n-sec-per-block)
        un-ackd-count (count un-ackd-blocks)
        un-sent-count (count un-sent-blocks)
        ;; by default, wait 1 minute
        default-next (+ recent (utils/seconds->nanos 60))
        send-eof-processed (to-parent/send-eof-buffered? outgoing)
        rtt-resend-time (+ earliest-time rtt-timeout)
        next-time
        (cond-> default-next
          ;; 1 second is always sooner than the 1 minute default; min
          ;; just keeps every clause in the same threading shape.
          (= want-ping ::specs/second-1) (min (+ recent (utils/seconds->nanos 1)))
          (= want-ping ::specs/immediate) (min min-resend-time)
          ;; Lines 290-292. Three moving parts:
          ;; 1. Are there unsent blocks that need to be sent?
          ;; 2. Do we have previously sent blocks that might need a re-send?
          ;; 3. Have we sent an un-ACK'd EOF?
          (and (< (+ un-ackd-count un-sent-count)
                  K/max-outgoing-blocks)
               (if (not= ::specs/false send-eof)
                 (not send-eof-processed)
                 (pos? un-sent-count)))
          (min min-resend-time)
          ;; Lines 293-296
          (and (not= 0 un-ackd-count)
               (> rtt-resend-time min-resend-time))
          (min rtt-resend-time)
          ;; Lines 298-300: if there are bytes to forward to the child
          ;; and its pipe is still open, wake up ASAP. (This looks
          ;; backward. It isn't.)
          ;; However, the gap-buffer check really should be based on
          ;; closed gaps.
          ;; This step has to replace the value with 0. A bare 0 here
          ;; would thread into (0 x) and throw ClassCastException.
          (and (not= 0 (+ (count gap-buffer)
                          (count ->child-buffer)))
               (not (realized? to-child-done?)))
          ((constantly 0)))]
    ;; Lines 302-305
    {::next-action-time (max recent next-time)
     ::weald/state log-state}))
(s/fdef choose-next-scheduled-time
  :args (s/cat :state ::specs/state
               :to-child-done? ::specs/to-child-done?)
  :ret (s/keys :req [::next-action-time
                     ::weald/state]))
(defn choose-next-scheduled-time
  "Decide when the ioloop should next wake up (DJB lines 286-305).

  Starts at ::specs/recent + 1 minute and adjusts that in stages:
  - want-ping: ::specs/immediate pulls it in to the minimum resend time
    (last-block-time + n-sec-per-block); ::specs/second-1 sets it to
    recent + 1 second
  - room in the outgoing queue and either an unprocessed EOF or unsent
    blocks: no later than the minimum resend time
  - un-ACK'd blocks whose RTT deadline (earliest-time + rtt-timeout) is
    after the minimum resend time: no later than that deadline
  - bytes waiting for a child whose pipe is still open: 0
  The result is never earlier than ::specs/recent.

  Returns a map with ::next-action-time and ::weald/state. The log
  state gains a debug entry that explains the calculation.

  Throws:
  - AssertionError (via :pre) when required pieces of state are missing
  - ex-info when state has no ::weald/state
  - IllegalArgumentException (from case) on an unrecognized want-ping"
  [{{:keys [::specs/n-sec-per-block
            ::specs/rtt-timeout]
     :as flow-control} ::specs/flow-control
    {:keys [::specs/->child-buffer
            ::specs/gap-buffer]} ::specs/incoming
    {:keys [::specs/earliest-time
            ::specs/last-block-time
            ::specs/send-eof
            ::specs/un-sent-blocks
            ::specs/un-ackd-blocks
            ::specs/want-ping]
     :as outgoing} ::specs/outgoing
    :keys [::specs/recent]
    log-state ::weald/state
    :as state}
   to-child-done?]
  {:pre [flow-control
         last-block-time
         n-sec-per-block
         outgoing
         state]}
  ;; A leftover debug println/pprint used to dump the entire state to
  ;; stdout on every scheduling pass. It was removed; the debug log
  ;; entry below covers the same ground.
  (when-not log-state
    (throw (ex-info (str "Missing " ::weald/state)
                    {::among (keys state)
                     ::full state})))
  ;; TODO: Bypass this entirely when more new data is pending.
  ;; TODO: Switch to the cond-> version once the performance difference
  ;; has been measured without the logging.
  (let [now (System/nanoTime)
        min-resend-time (+ last-block-time n-sec-per-block)
        ;; by default, wait 1 minute
        default-next (+ recent (utils/seconds->nanos 60))
        ;; Lines 286-289
        next-based-on-ping (case want-ping
                             ::specs/false default-next
                             ::specs/immediate (min default-next min-resend-time)
                             ;; Assumed to match wantping 1 in the original:
                             ;; give the server 1 second to start up.
                             ::specs/second-1 (+ recent (utils/seconds->nanos 1)))
        ;; Lines 290-292. Three moving parts:
        ;; 1. Are there unsent blocks that need to be sent?
        ;; 2. Do we have previously sent blocks that might need a re-send?
        ;; 3. Have we sent an un-ACK'd EOF?
        un-ackd-count (count un-ackd-blocks)
        un-sent-count (count un-sent-blocks)
        send-eof-processed (to-parent/send-eof-buffered? outgoing)
        ;; Once EOF gets set, this goes into a much tighter loop than
        ;; we can actually send messages in.
        ;; FIXME: Do a better job coordinating the scheduling.
        next-based-on-eof (if (and (< (+ un-ackd-count
                                         un-sent-count)
                                      K/max-outgoing-blocks)
                                   (if (not= ::specs/false send-eof)
                                     (not send-eof-processed)
                                     (pos? un-sent-count)))
                            (min next-based-on-ping min-resend-time)
                            next-based-on-ping)
        ;; Lines 293-296
        rtt-resend-time (+ earliest-time rtt-timeout)
        next-based-on-earliest-block-time (if (and (not= 0 un-ackd-count)
                                                   (> rtt-resend-time
                                                      min-resend-time))
                                            (min next-based-on-eof rtt-resend-time)
                                            next-based-on-eof)
        ;; Lines 298-300 (watchtochild, set up in 276-279).
        ;; This looks backward. It isn't: if there are bytes to forward
        ;; to the child and the pipe is still open, try to send them ASAP.
        ;; However, the gap-buffer check really should be based on
        ;; closed gaps.
        based-on-closed-child (if (and (not= 0 (+ (count gap-buffer)
                                                  (count ->child-buffer)))
                                       (not (realized? to-child-done?)))
                                0
                                next-based-on-earliest-block-time)
        ;; Lines 302-305
        actual-next (max based-on-closed-child recent)
        ;; Built in one pass; the text is the same as before.
        log-message (str (cl-format nil
                                    (str "Minimum resend time: ~:d\n"
                                         "which is ~:d nanoseconds\n"
                                         "after last block time ~:d.\n"
                                         "Recent was ~:d ns in the past\n"
                                         "rtt-timeout: ~:d\n"
                                         "earliest-time: ~:d")
                                    min-resend-time
                                    n-sec-per-block
                                    ;; last-block-time is being calculated
                                    ;; incorrectly, due to a misunderstanding
                                    ;; about the name. It should really be
                                    ;; the value of recent, set immediately
                                    ;; after sending a block to the parent.
                                    last-block-time
                                    (- now recent)
                                    rtt-timeout
                                    earliest-time)
                         (cl-format nil
                                    "\nDefault +1 minute: ~:d from recent: ~:d\nScheduling based on want-ping value ~a"
                                    default-next
                                    recent
                                    want-ping)
                         (cl-format nil
                                    "\nBased on ping settings, adjusted next time to: ~:d"
                                    next-based-on-ping)
                         "\nEOF/unsent criteria:\nun-ackd-count: "
                         un-ackd-count
                         "\nun-sent-count: "
                         un-sent-count
                         "\nsend-eof: "
                         send-eof
                         "\nsend-eof-processed: "
                         send-eof-processed
                         (cl-format nil
                                    "\nDue to EOF status: ~:d"
                                    next-based-on-eof)
                         (cl-format nil
                                    "\nAdjusted for RTT: ~:d"
                                    (long next-based-on-earliest-block-time))
                         (cl-format nil
                                    "\nAfter adjusting for closed/ignored child watcher: ~:d"
                                    (long based-on-closed-child)))
        log-state (log/debug log-state
                             ::choose-next-scheduled-time
                             (str "Scheduling considerations\n"
                                  log-message))]
    {::next-action-time actual-next
     ::weald/state log-state}))
;; Forward declaration, so that forms appearing before the real
;; definition can refer to schedule-next-timeout!.
;; NOTE(review): presumably defined further down in this namespace -- confirm.
(declare schedule-next-timeout!)
;; FIXME: This seems like it really should be debug only
(let [interrupted (atom nil)]
(defn interrupted?
  "Debug hook: current value of the closed-over `interrupted` atom.
  It is nil until interrupt! or clear-interrupt! is first called."
  []
  (deref interrupted))
(defn interrupt!
  "Debug aid: set the manual ioloop interrupt flag.
  Prints a notice to stdout first, then sets the flag to true.
  Returns true (the flag's new value)."
  []
  (let [notice "FIXME: Debug only\nInterrupting ioloop manually"]
    (println notice)
    (reset! interrupted true)))
(defn clear-interrupt!
  "Debug aid: clear the manual ioloop interrupt flag set by interrupt!.
  Prints a notice to stdout first, then sets the flag to false.
  Returns false (the flag's new value)."
  []
  (let [notice "Unblocking ioloop manually"]
    (println notice)
    (reset! interrupted false)))
;; REPL scratchpad (the comment macro never evaluates its body):
;; set the manual interrupt flag, inspect it, then clear it again.
(comment
(interrupt!)
(interrupted?)
(clear-interrupt!)
)
;; Instrumentation spec for action-trigger!.
;; The return value is unconstrained; the five positional arguments
;; are described, in call order, by the s/cat below.
(s/fdef action-trigger!
  :ret any?
  :args (s/cat :timing-details ::action-timing-details
               :io-handle ::specs/io-handle
               :state ::specs/state
               :log-state-atom ::shared-specs/atom
               :next-action ::next-action))
(defn action-trigger!
"Some action triggered the main ioloop"
;; This function is way too long.
;; FIXME: refactor it into smaller pieces
[{:keys [::actual-next
::delta_f
::scheduling-time]
:as timing-details}
{:keys [::specs/message-loop-name]
:as io-handle}
{:keys [::specs/outgoing]
:as state}
log-state-atom
;; This is a variant that consists of a [tag callback] pair
;; It's tempting to destructure this here.
;; That makes the code a little more concise,
;; and easier to read. But it also makes
;; error handling more difficult.
;; I've had enough trouble getting and keeping
;; this correct that I want to retain this more
;; verbose approach, at least until the entire
;; thing settles down a bit.
;; Besides, I don't actually know what's in
;; success until I check the tag.
;; So I could destructure it here as [tag & args],
;; then destructure args later. But that makes
;; it a less obvious win.
next-action]
{:pre [outgoing]}
(swap! log-state-atom log/do-sync-clock)
(let [now (System/nanoTime) ; It's tempting to just use millis, but that throws off recent
;; Line 337
;; Doing this now instead of after trying to receive data from the
;; child seems like a fairly significant change from the reference
;; implementation.
;; TODO: Compare with other higher-level implementations
;; TODO: Ask cryptographers and protocol experts whether this is
;; a choice I'll really regret
state (assoc state ::specs/recent now)
log-state @log-state-atom
;; This is used during exception handling
;; because that might happen on a different thread
prelog (utils/pre-log message-loop-name)
fmt (str "Awakening event loop that was sleeping for ~g ms "
"after ~:d at ~:d\n"
"at ~:d because: ~a")
log-state (try
(log/debug log-state
::action-trigger!
(cl-format nil
fmt
delta_f
scheduling-time
(or actual-next -1)
now
next-action)
{::specs/message-loop-name message-loop-name})
(catch NullPointerException ex
(log/exception log-state
ex
::action-trigger!
"Error building the event loop Awakening message"
{::delta_f delta_f
::scheduling-time scheduling-time
::actual-next actual-next
::now now
::next-action next-action
::trigger-details prelog
::specs/message-loop-name message-loop-name}))
(catch NumberFormatException ex
(log/exception log-state
ex
::action-trigger!
"Error formatting the event loop Awakening message"
{::delta_f delta_f
::scheduling-time scheduling-time
::actual-next actual-next
::now now
::next-action next-action
::trigger-details prelog
::specs/message-loop-name message-loop-name})))
[tag
log-state] (try
[(first next-action) log-state]
(catch IllegalArgumentException ex
[::no-op
(log/exception log-state
ex
::action-trigger!
"Should have been a variant"
{::trigger-details prelog
::specs/message-loop-name message-loop-name})]))
;; TODO: Really should add something like an action ID to the state
;; to assist in tracing how data flows. flow-control seems like a very
;; likely place to put it.
updater (case tag
;; Q: Is this worth switching to something like core.match or a multimethod?
::specs/child-> (let [[_ callback ack] next-action]
(partial trigger-from-child io-handle callback ack))
::drained (fn [{log-state ::weald/state
:as state}]
;; Actually, this seems like a strong argument for
;; having a pair of 1-way streams (as opposed to
;; a bi-directional one)
;; Child could still have
;; bytes to send to the parent after the latter's
;; stopped sending, or vice versa.
;; I'm pretty sure the complexity I haven't finished
;; translating stems from that case.
;; TODO: Another piece to revisit once the basics
;; work.
(update state
::weald/state
#(log/warn %
::action-trigger!
"Stream closed. Surely there's more to do"
{::trigger-details prelog
::specs/message-loop-name message-loop-name})))
::no-op identity
;; Q: Shouldn't this be from the specs ns?
;; Or maybe ::child-> should move here.
;; Either probably makes sense. It just needs
;; to be consistent.
::parent-> (partial trigger-from-parent
io-handle
(second next-action))
;; This can throw off the timer, since we're basing the delay on
;; the delta from recent (which doesn't change) rather than now.
;; But we're basing the actual delay from now, which does change.
;; e.g. If the scheduled delay is 980 ms, and someone triggers a
;; query-state that takes 20 ms after 20 ms, the new delay will
;; still be 980 ms rather than the 940 that would have been
;; appropriate.
;; Q: What's the best way to avoid this?
;; Updating recent seems obvious, but also dubious.
;; Decrementing the delay seems like something the scheduler
;; should handle.
::query-state (fn [state]
(if-let [dst (second next-action)]
(do
(deliver dst state)
state)
(update state
::weald/state
#(log/warn %
::action-trigger!
"state-query request missing required deferred"
{::trigger-details prelog
::specs/message-loop-name message-loop-name}))))
::reset-parent-callback (fn [state]
(-> state
(update ::weald/state
#(log/warn %
::action-trigger!
"Changing parent-callback"))
(assoc ::specs/->parent (second next-action))))
::timed-out (fn [state]
(trigger-from-timer io-handle
(update state
::weald/state
#(log/debug %
"Re-triggering Output due to timeout"
(assoc timing-details
::trigger-details prelog
::specs/message-loop-name message-loop-name))))))
state (assoc state
::weald/state
(log/debug log-state
::action-trigger!
"Processing event"
{::tag tag
::specs/message-loop-name message-loop-name}))
;; At the end of the main ioloop in the reference
;; implementation, there's a block that closes the pipe
;; to the child if we're done.
;; I think the point is that we'll quit polling on
;; that and start short-circuiting out of the blocks
;; that might do the send, once we've hit EOF
;; Q: What can I do here to
;; produce the same effect?
;; TODO: Worry about that once the basic idea works.
;; If the child sent a big batch of data to go out
;; all at once, don't waste time setting up a timeout
;; scheduler. The poll in the original would have
;; returned immediately anyway.
;; Except that n-sec-per-block puts a hard limit on how
;; fast we can send.
start (System/currentTimeMillis)
;; TODO: Break these pieces into something
;; like the interceptor-chain idea. They should
;; return a value that includes a key for a
;; seq of functions to run to perform the
;; side-effects.
;; I'd still have to call updater, get
;; that updating seq, and update the state
;; to recurse.
;; I'd prefer to do these next two
;; pieces in a single step.
;; TODO: Read up on Executors. I could wind up
;; with really nasty interactions now that I
;; don't have an agent to keep this single-
;; threaded.
;; Actually, it should be safe as written.
;; Just be sure to keep everything synchronized
;; around takes from the i/o handle. (Not
;; needing to do that manually is
;; a great reason to not introduce a second