@@ -42,17 +42,33 @@ my class IO::Socket::Async {
42
42
$ p
43
43
}
44
44
45
+ my class Datagram {
46
+ has $ . data ;
47
+ has str $ . hostname ;
48
+ has int $ . port ;
49
+
50
+ method decode (| c) {
51
+ die " Cannot decode a datagram with Str data" if $ ! data ~~ Str ;
52
+ return self . clone (data => $ ! data . decode(| c));
53
+ }
54
+ method encode (| c) {
55
+ die " Cannot encode a datagram with Blob data" if $ ! data ~~ Blob ;
56
+ return self . clone (data => $ ! data . encode(| c));
57
+ }
58
+ }
59
+
45
60
my class SocketReaderTappable does Tappable {
46
61
has $ ! VMIO ;
47
62
has $ ! scheduler ;
48
63
has $ ! buf ;
49
64
has $ ! close-promise ;
65
+ has $ ! udp ;
50
66
51
- method new (Mu : $ VMIO ! , : $ scheduler ! , : $ buf ! , : $ close-promise ! ) {
52
- self . CREATE! SET-SELF($ VMIO , $ scheduler , $ buf , $ close-promise )
67
+ method new (Mu : $ VMIO ! , : $ scheduler ! , : $ buf ! , : $ close-promise ! , : $ udp ! ) {
68
+ self . CREATE! SET-SELF($ VMIO , $ scheduler , $ buf , $ close-promise , $ udp )
53
69
}
54
70
55
- method ! SET-SELF (Mu $ ! VMIO , $ ! scheduler , $ ! buf , $ ! close-promise ) { self }
71
+ method ! SET-SELF (Mu $ ! VMIO , $ ! scheduler , $ ! buf , $ ! close-promise , $ ! udp ) { self }
56
72
57
73
method tap (& emit , & done , & quit , & tap ) {
58
74
my $ buffer := nqp ::list();
@@ -76,7 +92,7 @@ my class IO::Socket::Async {
76
92
$ lock . protect: {
77
93
my $ cancellation := nqp ::asyncreadbytes(nqp ::decont($ ! VMIO ),
78
94
$ ! scheduler . queue(: hint-affinity),
79
- -> Mu \seq, Mu \data, Mu \err {
95
+ -> Mu \seq, Mu \data, Mu \err, Mu \hostname = Str , Mu \port = Int {
80
96
$ lock . protect: {
81
97
unless $ finished {
82
98
if err {
@@ -85,7 +101,15 @@ my class IO::Socket::Async {
85
101
}
86
102
elsif nqp ::isconcrete(data) {
87
103
my int $ insert-pos = seq - $ buffer-start-seq ;
88
- nqp ::bindpos($ buffer , $ insert-pos , data);
104
+ if $ ! udp && nqp ::isconcrete(hostname) && nqp ::isconcrete(port) {
105
+ nqp ::bindpos($ buffer , $ insert-pos , Datagram. new (
106
+ data => data,
107
+ hostname => hostname,
108
+ port => port
109
+ ));
110
+ } else {
111
+ nqp ::bindpos($ buffer , $ insert-pos , data);
112
+ }
89
113
emit-events();
90
114
}
91
115
else {
@@ -116,13 +140,13 @@ my class IO::Socket::Async {
116
140
method serial (--> True ) { }
117
141
}
118
142
119
- multi method Supply (IO ::Socket::Async: D : : $ bin , : $ buf = buf8 . new , : $ enc , : $ scheduler = $ * SCHEDULER ) {
143
+ multi method Supply (IO ::Socket::Async: D : : $ bin , : $ buf = buf8 . new , : $ datagram , : $ enc , : $ scheduler = $ * SCHEDULER ) {
120
144
if $ bin {
121
145
Supply . new : SocketReaderTappable. new :
122
- : $ ! VMIO , : $ scheduler , : $ buf , : $ ! close-promise
146
+ : $ ! VMIO , : $ scheduler , : $ buf , : $ ! close-promise , udp => $ ! udp && $ datagram
123
147
}
124
148
else {
125
- my $ bin-supply = self . Supply (: bin);
149
+ my $ bin-supply = self . Supply (: bin, : $ datagram );
126
150
if $ ! udp {
127
151
supply {
128
152
whenever $ bin-supply {
0 commit comments