forked from mithro/valentyusb
/
test-eptri.py
1473 lines (1219 loc) · 54.3 KB
/
test-eptri.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Tests for the Fomu Tri-Endpoint
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge, NullTrigger, Timer
from cocotb.result import TestFailure, TestSuccess, ReturnValue
from valentyusb.usbcore.utils.packet import *
from valentyusb.usbcore.endpoint import *
from valentyusb.usbcore.pid import *
from valentyusb.usbcore.utils.pprint import pp_packet
from wishbone import WishboneMaster, WBOp
import logging
import csv
def grouper_tofit(n, iterable):
    """Group iterable into lists of n items, without padding the last one.

    The items are taken in order; every returned list has exactly ``n``
    items except possibly the final one, which holds whatever is left over.
    An empty iterable (or ``n <= 0``) yields an empty list.

    Args:
        n: Maximum number of items per group.
        iterable: Any iterable of items to split up.

    Returns:
        A list of lists containing the items of ``iterable``.
    """
    from itertools import zip_longest

    # zip_longest always pads the final group.  Pad with a private sentinel
    # rather than None so that genuine None items in the input survive.
    padding = object()
    padded_groups = zip_longest(*[iter(iterable)] * n, fillvalue=padding)
    return [
        [item for item in group if item is not padding]
        for group in padded_groups
    ]
class UsbTest:
    """Test harness that plays both the USB host and the device-side CPU.

    Host traffic is driven directly onto the DUT's ``usb_d_p``/``usb_d_n``
    signals on the ``clk48`` clock, while the device-side registers are
    accessed through a Wishbone master using the addresses from ``csr.csv``.
    """

    def __init__(self, dut):
        """Build the harness around *dut*.

        Reads every ``csr_register`` row of ``csr.csv`` into ``self.csrs``
        (register name -> address), starts the ``clk48`` clock, creates the
        Wishbone master and writes the calling function's name to the DUT's
        ``test_name`` signal.
        """
        self.dut = dut
        self.csrs = dict()
        with open("csr.csv", newline='') as csr_csv_file:
            csr_csv = csv.reader(csr_csv_file)
            # csr_register format: csr_register, name, address, size, rw/ro
            for row in csr_csv:
                # Blank lines come back as empty rows; skip them instead of
                # failing on row[0].
                if row and row[0] == 'csr_register':
                    self.csrs[row[1]] = int(row[2], base=0)
        cocotb.fork(Clock(dut.clk48, 20800, 'ps').start())
        self.wb = WishboneMaster(dut, "wishbone", dut.clk12, timeout=20)

        # Set the signal "test_name" to match this test
        import inspect
        tn = cocotb.binary.BinaryValue(value=None, n_bits=4096)
        # stack()[1][3] is the name of the function that instantiated us.
        tn.buff = inspect.stack()[1][3]
        self.dut.test_name = tn

    @cocotb.coroutine
    def reset(self):
        """Pulse reset, idle the bus, disconnect and reinitialise the CSRs."""
        self.dut.reset = 1
        yield RisingEdge(self.dut.clk12)
        self.dut.reset = 0
        yield RisingEdge(self.dut.clk12)

        # Drive the same line state that _host_send_packet uses for idle.
        self.dut.usb_d_p = 1
        self.dut.usb_d_n = 0

        yield self.disconnect()

        # Enable endpoint 0
        yield self.write(self.csrs['usb_setup_ev_enable'], 0xff)
        yield self.write(self.csrs['usb_in_ev_enable'], 0xff)
        yield self.write(self.csrs['usb_out_ev_enable'], 0xff)
        yield self.write(self.csrs['usb_setup_ev_pending'], 0xff)
        yield self.write(self.csrs['usb_in_ev_pending'], 0xff)
        yield self.write(self.csrs['usb_out_ev_pending'], 0xff)
        yield self.write(self.csrs['usb_address'], 0)

    @cocotb.coroutine
    def write(self, addr, val):
        """Write *val* to Wishbone address *addr*."""
        yield self.wb.write(addr, val)

    @cocotb.coroutine
    def read(self, addr):
        """Read Wishbone address *addr* and return the value."""
        value = yield self.wb.read(addr)
        raise ReturnValue(value)

    @cocotb.coroutine
    def connect(self):
        """Write 1 to the ``usb_pullup_out`` CSR."""
        USB_PULLUP_OUT = self.csrs['usb_pullup_out']
        yield self.write(USB_PULLUP_OUT, 1)

    @cocotb.coroutine
    def clear_pending(self, _ep):
        """Placeholder that only yields a zero-length timer."""
        yield Timer(0)

    @cocotb.coroutine
    def disconnect(self):
        """Write 0 to the ``usb_pullup_out`` CSR."""
        USB_PULLUP_OUT = self.csrs['usb_pullup_out']
        yield self.write(USB_PULLUP_OUT, 0)

    def assertEqual(self, a, b, msg):
        """Raise TestFailure mentioning *msg* unless ``a == b``."""
        if a != b:
            raise TestFailure("{} != {} - {}".format(a, b, msg))

    def assertSequenceEqual(self, a, b, msg):
        """Raise TestFailure mentioning *msg* unless ``a == b``."""
        if a != b:
            raise TestFailure("{} vs {} - {}".format(a, b, msg))

    def print_ep(self, epaddr, msg, *args):
        """Log ``msg % args`` prefixed with the endpoint number and direction."""
        self.dut._log.info("ep(%i, %s): %s" % (
            EndpointType.epnum(epaddr),
            EndpointType.epdir(epaddr).name,
            msg) % args)

    # Host->Device
    @cocotb.coroutine
    def _host_send_packet(self, packet):
        """Send a USB packet."""
        # Packet gets multiplied by 4x so we can send using the
        # usb48 clock instead of the usb12 clock.
        packet = 'JJJJJJJJ' + wrap_packet(packet)
        self.assertEqual('J', packet[-1], "Packet didn't end in J: "+packet)

        # One symbol of the wrapped packet is driven per clk48 edge.
        for v in packet:
            if v == '0' or v == '_':
                # SE0 - both lines pulled low
                self.dut.usb_d_p <= 0
                self.dut.usb_d_n <= 0
            elif v == '1':
                # SE1 - illegal, should never occur
                self.dut.usb_d_p <= 1
                self.dut.usb_d_n <= 1
            elif v == '-' or v == 'I':
                # Idle
                self.dut.usb_d_p <= 1
                self.dut.usb_d_n <= 0
            elif v == 'J':
                self.dut.usb_d_p <= 1
                self.dut.usb_d_n <= 0
            elif v == 'K':
                self.dut.usb_d_p <= 0
                self.dut.usb_d_n <= 1
            else:
                raise TestFailure("Unknown value: %s" % v)
            yield RisingEdge(self.dut.clk48)

    @cocotb.coroutine
    def host_send_token_packet(self, pid, addr, ep):
        """Send a token packet of type *pid* to *addr* / endpoint of *ep*."""
        epnum = EndpointType.epnum(ep)
        yield self._host_send_packet(token_packet(pid, addr, epnum))

    @cocotb.coroutine
    def host_send_data_packet(self, pid, data):
        """Send a DATA0/DATA1 packet carrying *data*."""
        assert pid in (PID.DATA0, PID.DATA1), pid
        yield self._host_send_packet(data_packet(pid, data))

    @cocotb.coroutine
    def host_send_sof(self, time):
        """Send a start-of-frame packet built from *time*."""
        yield self._host_send_packet(sof_packet(time))

    @cocotb.coroutine
    def host_send_ack(self):
        """Send an ACK handshake packet."""
        yield self._host_send_packet(handshake_packet(PID.ACK))

    @cocotb.coroutine
    def host_send(self, data01, addr, epnum, data, expected=PID.ACK):
        """Send data out the virtual USB connection, including an OUT token"""
        yield self.host_send_token_packet(PID.OUT, addr, epnum)
        yield self.host_send_data_packet(data01, data)
        yield self.host_expect_packet(handshake_packet(expected), "Expected {} packet.".format(expected))

    @cocotb.coroutine
    def host_setup(self, addr, epnum, data):
        """Send data out the virtual USB connection, including a SETUP token"""
        yield self.host_send_token_packet(PID.SETUP, addr, epnum)
        yield self.host_send_data_packet(PID.DATA0, data)
        yield self.host_expect_ack()

    @cocotb.coroutine
    def host_recv(self, data01, addr, epnum, data):
        """Send an IN token, expect *data* in a *data01* packet, then ACK it."""
        yield self.host_send_token_packet(PID.IN, addr, epnum)
        yield self.host_expect_data_packet(data01, data)
        yield self.host_send_ack()

    # Device->Host
    @cocotb.coroutine
    def host_expect_packet(self, packet, msg=None):
        """Expect to receive the following USB packet.

        Waits up to 100 ``clk48`` edges for ``usb_tx_en`` to assert, records
        the line state on every edge until it deasserts, and compares the
        result with *packet*.  *msg* is appended to any failure message.

        Raises:
            TestFailure: no packet started, it started too late, it never
                finished, or its contents differ from *packet*.
        """
        # msg defaults to None; never let string concatenation on it hide
        # the real failure behind a TypeError.
        detail = "" if msg is None else msg

        def current():
            """Return the symbol for the current usb_d_p/usb_d_n state."""
            values = (self.dut.usb_d_p, self.dut.usb_d_n)

            if values == (0, 0):
                return '_'
            elif values == (1, 1):
                return '1'
            elif values == (1, 0):
                return 'J'
            elif values == (0, 1):
                return 'K'
            else:
                raise TestFailure("Unrecognized dut values: {}".format(values))

        # Wait for transmission to start
        tx = 0
        bit_times = 0
        for i in range(0, 100):
            tx = self.dut.usb_tx_en
            if tx == 1:
                break
            yield RisingEdge(self.dut.clk48)
            bit_times = bit_times + 1
        if tx != 1:
            raise TestFailure("No packet started, " + detail)

        # # USB specifies that the turn-around time is 7.5 bit times for the device
        # bit_times counts clk48 edges; there are four of them per bit time.
        bit_time_max = 12.5
        bit_time_acceptable = 7.5
        if (bit_times/4.0) > bit_time_max:
            raise TestFailure("Response came after {} bit times, which is more than {}".format(bit_times / 4.0, bit_time_max))
        if (bit_times/4.0) > bit_time_acceptable:
            self.dut._log.warn("Response came after {} bit times (> {})".format(bit_times / 4.0, bit_time_acceptable))
        else:
            self.dut._log.info("Response came after {} bit times".format(bit_times / 4.0))

        # Read in the transmission data
        result = ""
        for i in range(0, 4096):
            result += current()
            yield RisingEdge(self.dut.clk48)
            if self.dut.usb_tx_en != 1:
                break
        # NOTE(review): tx is the usb_tx_en signal object itself, so this
        # comparison presumably samples its present value - confirm.
        if tx == 1:
            raise TestFailure("Packet didn't finish, " + detail)
        # Put the bus back into the idle line state.
        self.dut.usb_d_p = 1
        self.dut.usb_d_n = 0

        # Check the packet received matches
        expected = pp_packet(wrap_packet(packet))
        actual = pp_packet(result)
        self.assertSequenceEqual(expected, actual, msg)

    @cocotb.coroutine
    def host_expect_ack(self):
        """Expect an ACK handshake from the device."""
        yield self.host_expect_packet(handshake_packet(PID.ACK), "Expected ACK packet.")

    @cocotb.coroutine
    def host_expect_nak(self):
        """Expect a NAK handshake from the device."""
        yield self.host_expect_packet(handshake_packet(PID.NAK), "Expected NAK packet.")

    @cocotb.coroutine
    def host_expect_stall(self):
        """Expect a STALL handshake from the device."""
        yield self.host_expect_packet(handshake_packet(PID.STALL), "Expected STALL packet.")

    @cocotb.coroutine
    def host_expect_data_packet(self, pid, data):
        """Expect a DATA0/DATA1 packet carrying exactly *data*."""
        assert pid in (PID.DATA0, PID.DATA1), pid
        yield self.host_expect_packet(data_packet(pid, data), "Expected %s packet with %r" % (pid.name, data))

    @cocotb.coroutine
    def pending(self, ep):
        """Return bit 0 of the IN or OUT status CSR, chosen by *ep*'s direction."""
        if EndpointType.epdir(ep) == EndpointType.IN:
            val = yield self.read(self.csrs['usb_in_status'])
        else:
            val = yield self.read(self.csrs['usb_out_status'])
        raise ReturnValue(val & 1)

    @cocotb.coroutine
    def expect_setup(self, epaddr, expected_data):
        """Read a SETUP packet from the device side and check it.

        Polls ``usb_setup_status`` until bit 0 is set, reads bytes from
        ``usb_setup_data`` while it stays set, then compares the payload and
        its trailing two CRC16 bytes against *expected_data*.

        Raises:
            TestFailure: fewer than two bytes arrived, or payload/CRC differ.
        """
        actual_data = []

        # wait for data to appear
        for i in range(128):
            self.dut._log.debug("Prime loop {}".format(i))
            status = yield self.read(self.csrs['usb_setup_status'])
            have = status & 1
            if have:
                break
            yield RisingEdge(self.dut.clk12)

        for i in range(48):
            self.dut._log.debug("Read loop {}".format(i))
            status = yield self.read(self.csrs['usb_setup_status'])
            have = status & 1
            if not have:
                break
            v = yield self.read(self.csrs['usb_setup_data'])
            # presumably advances to the next byte - confirm against the core
            yield self.write(self.csrs['usb_setup_ctrl'], 1)
            actual_data.append(v)
            yield RisingEdge(self.dut.clk12)

        if len(actual_data) < 2:
            # "got" is what was read back, "expected" is what the host sent.
            raise TestFailure("data was short (got {}, expected {})".format(actual_data, expected_data))
        actual_data, actual_crc16 = actual_data[:-2], actual_data[-2:]

        self.print_ep(epaddr, "Got: %r (expected: %r)", actual_data, expected_data)
        self.assertSequenceEqual(expected_data, actual_data, "SETUP packet not received")
        self.assertSequenceEqual(crc16(expected_data), actual_crc16, "CRC16 not valid")

        # Acknowledge that we've handled the setup packet
        yield self.write(self.csrs['usb_setup_ctrl'], 2)

    @cocotb.coroutine
    def drain_setup(self):
        """Read out all SETUP bytes (CRC included), clear pending, return them."""
        actual_data = []
        for i in range(48):
            status = yield self.read(self.csrs['usb_setup_status'])
            have = status & 1
            if not have:
                break
            v = yield self.read(self.csrs['usb_setup_data'])
            yield self.write(self.csrs['usb_setup_ctrl'], 1)
            actual_data.append(v)
            yield RisingEdge(self.dut.clk12)
        yield self.write(self.csrs['usb_setup_ctrl'], 2)
        # Drain the pending bit
        yield self.write(self.csrs['usb_setup_ev_pending'], 0xff)
        return actual_data

    @cocotb.coroutine
    def drain_out(self):
        """Read out all OUT bytes, clear pending, return them without the CRC."""
        actual_data = []
        for i in range(70):
            status = yield self.read(self.csrs['usb_out_status'])
            have = status & 1
            if not have:
                break
            v = yield self.read(self.csrs['usb_out_data'])
            yield self.write(self.csrs['usb_out_ctrl'], 1)
            actual_data.append(v)
            yield RisingEdge(self.dut.clk12)
        yield self.write(self.csrs['usb_out_ev_pending'], 0xff)
        yield self.write(self.csrs['usb_out_ctrl'], 0x02)
        return actual_data[:-2] # Strip off CRC16

    @cocotb.coroutine
    def expect_data(self, epaddr, expected_data, expected):
        """Read an OUT packet from the device side and, for ACK, check it.

        The bytes are always read out; payload and CRC16 are only compared
        against *expected_data* when *expected* is ``PID.ACK``.

        Raises:
            TestFailure: ACK was expected but the data was short or wrong.
        """
        actual_data = []
        # wait for data to appear
        for i in range(128):
            self.dut._log.debug("Prime loop {}".format(i))
            status = yield self.read(self.csrs['usb_out_status'])
            have = status & 1
            if have:
                break
            yield RisingEdge(self.dut.clk12)

        for i in range(256):
            self.dut._log.debug("Read loop {}".format(i))
            status = yield self.read(self.csrs['usb_out_status'])
            have = status & 1
            if not have:
                break
            v = yield self.read(self.csrs['usb_out_data'])
            yield self.write(self.csrs['usb_out_ctrl'], 3)
            actual_data.append(v)
            yield RisingEdge(self.dut.clk12)

        if expected == PID.ACK:
            if len(actual_data) < 2:
                raise TestFailure("data {} was short".format(actual_data))
            actual_data, actual_crc16 = actual_data[:-2], actual_data[-2:]

            self.print_ep(epaddr, "Got: %r (expected: %r)", actual_data, expected_data)
            self.assertSequenceEqual(expected_data, actual_data, "DATA packet not correctly received")
            self.assertSequenceEqual(crc16(expected_data), actual_crc16, "CRC16 not valid")

    @cocotb.coroutine
    def set_response(self, ep, response):
        """Write *ep*'s number to ``usb_in_ctrl`` for an IN endpoint and ACK.

        Every other direction/response combination does nothing.
        """
        if EndpointType.epdir(ep) == EndpointType.IN and response == EndpointResponse.ACK:
            yield self.write(self.csrs['usb_in_ctrl'], EndpointType.epnum(ep))

    @cocotb.coroutine
    def send_data(self, token, ep, data):
        """Write *data* to ``usb_in_data``, then *ep* to ``usb_in_ctrl``.

        *token* is accepted but not used.
        """
        for b in data:
            yield self.write(self.csrs['usb_in_data'], b)
        yield self.write(self.csrs['usb_in_ctrl'], ep)

    @cocotb.coroutine
    def transaction_setup(self, addr, data, epnum=0):
        """Send a SETUP transaction while checking it on the device side."""
        epaddr_out = EndpointType.epaddr(0, EndpointType.OUT)

        # The host side runs concurrently so the device side can poll.
        xmit = cocotb.fork(self.host_setup(addr, epnum, data))
        yield self.expect_setup(epaddr_out, data)
        yield xmit.join()

    @cocotb.coroutine
    def transaction_data_out(self, addr, ep, data, chunk_size=64, expected=PID.ACK):
        """Send *data* as OUT transactions of at most *chunk_size* bytes.

        The first chunk is sent as DATA1 and the PID alternates afterwards.
        *expected* is the handshake the device should answer with.
        """
        epnum = EndpointType.epnum(ep)
        datax = PID.DATA1

        # # Set it up so we ACK the final IN packet
        # yield self.write(self.csrs['usb_in_ctrl'], 0)
        for _i, chunk in enumerate(grouper_tofit(chunk_size, data)):
            self.dut._log.warning("Sening {} bytes to host".format(len(chunk)))
            # Enable receiving data
            yield self.write(self.csrs['usb_out_ctrl'], (1 << 1))
            xmit = cocotb.fork(self.host_send(datax, addr, epnum, chunk, expected))
            yield self.expect_data(epnum, list(chunk), expected)
            yield xmit.join()

            if datax == PID.DATA0:
                datax = PID.DATA1
            else:
                datax = PID.DATA0

    @cocotb.coroutine
    def transaction_data_in(self, addr, ep, data, chunk_size=64):
        """Queue *data* on the device and read it back as IN transactions.

        The first chunk is expected as DATA1 and the PID alternates
        afterwards.  Empty *data* produces one zero-length IN transaction.
        """
        epnum = EndpointType.epnum(ep)
        datax = PID.DATA1
        sent_data = 0
        for i, chunk in enumerate(grouper_tofit(chunk_size, data)):
            sent_data = 1
            self.dut._log.debug("Actual data we're expecting: {}".format(chunk))
            for b in chunk:
                yield self.write(self.csrs['usb_in_data'], b)
            yield self.write(self.csrs['usb_in_ctrl'], epnum)
            recv = cocotb.fork(self.host_recv(datax, addr, epnum, chunk))
            yield recv.join()

            if datax == PID.DATA0:
                datax = PID.DATA1
            else:
                datax = PID.DATA0
        if not sent_data:
            # No chunks at all: still perform a zero-length IN transaction.
            yield self.write(self.csrs['usb_in_ctrl'], epnum)
            recv = cocotb.fork(self.host_recv(datax, addr, epnum, []))
            yield self.send_data(datax, epnum, data)
            yield recv.join()

    @cocotb.coroutine
    def set_data(self, ep, data):
        """Write every byte of *data* to ``usb_in_data``."""
        _epnum = EndpointType.epnum(ep)
        for b in data:
            yield self.write(self.csrs['usb_in_data'], b)

    @cocotb.coroutine
    def transaction_status_in(self, addr, ep):
        """Perform a zero-length DATA1 IN transaction on IN endpoint *ep*."""
        epnum = EndpointType.epnum(ep)
        assert EndpointType.epdir(ep) == EndpointType.IN
        xmit = cocotb.fork(self.host_recv(PID.DATA1, addr, epnum, []))
        yield xmit.join()

    @cocotb.coroutine
    def transaction_status_out(self, addr, ep):
        """Perform a zero-length DATA1 OUT transaction on OUT endpoint *ep*."""
        epnum = EndpointType.epnum(ep)
        assert EndpointType.epdir(ep) == EndpointType.OUT
        xmit = cocotb.fork(self.host_send(PID.DATA1, addr, epnum, []))
        yield xmit.join()

    @cocotb.coroutine
    def control_transfer_out(self, addr, setup_data, descriptor_data=None):
        """Run an OUT control transfer on endpoint 0 and check the event CSRs.

        Performs the SETUP stage, an optional OUT data stage carrying
        *descriptor_data*, and an IN status stage.  Before each stage the
        matching ``*_ev_pending`` CSR must read 0; afterwards it must read 1
        and is written back with the value read.

        Raises:
            Exception: *setup_data* describes an IN transfer, or bytes 6/7 of
                *setup_data* disagree with whether *descriptor_data* is given.
            TestFailure: an event-pending CSR has an unexpected value.
        """
        epaddr_out = EndpointType.epaddr(0, EndpointType.OUT)
        epaddr_in = EndpointType.epaddr(0, EndpointType.IN)

        if (setup_data[0] & 0x80) == 0x80:
            raise Exception("setup_data indicated an IN transfer, but you requested an OUT transfer")

        setup_ev = yield self.read(self.csrs['usb_setup_ev_pending'])
        if setup_ev != 0:
            raise TestFailure("setup_ev should be 0 at the start of the test, was: {:02x}".format(setup_ev))

        # Setup stage
        self.dut._log.info("setup stage")
        yield self.transaction_setup(addr, setup_data)

        setup_ev = yield self.read(self.csrs['usb_setup_ev_pending'])
        if setup_ev != 1:
            raise TestFailure("setup_ev should be 1, was: {:02x}".format(setup_ev))
        yield self.write(self.csrs['usb_setup_ev_pending'], setup_ev)

        # Data stage
        if descriptor_data is not None:
            out_ev = yield self.read(self.csrs['usb_out_ev_pending'])
            if out_ev != 0:
                raise TestFailure("out_ev should be 0 at the start of the test, was: {:02x}".format(out_ev))
        if (setup_data[7] != 0 or setup_data[6] != 0) and descriptor_data is None:
            raise Exception("setup_data indicates data, but no descriptor data was specified")
        if (setup_data[7] == 0 and setup_data[6] == 0) and descriptor_data is not None:
            raise Exception("setup_data indicates no data, but descriptor data was specified")
        if descriptor_data is not None:
            self.dut._log.info("data stage")
            yield self.transaction_data_out(addr, epaddr_out, descriptor_data)

        if descriptor_data is not None:
            yield RisingEdge(self.dut.clk12)
            out_ev = yield self.read(self.csrs['usb_out_ev_pending'])
            if out_ev != 1:
                raise TestFailure("out_ev should be 1 at the end of the test, was: {:02x}".format(out_ev))
            yield self.write(self.csrs['usb_out_ev_pending'], out_ev)

        # Status stage
        self.dut._log.info("status stage")
        in_ev = yield self.read(self.csrs['usb_in_ev_pending'])
        if in_ev != 0:
            raise TestFailure("in_ev should be 0 at the start of the test, was: {:02x}".format(in_ev))
        yield self.transaction_status_in(addr, epaddr_in)
        yield RisingEdge(self.dut.clk12)
        in_ev = yield self.read(self.csrs['usb_in_ev_pending'])
        if in_ev != 1:
            raise TestFailure("in_ev should be 1 at the end of the test, was: {:02x}".format(in_ev))
        yield self.write(self.csrs['usb_in_ev_pending'], in_ev)

    @cocotb.coroutine
    def control_transfer_in(self, addr, setup_data, descriptor_data=None):
        """Run an IN control transfer on endpoint 0 and check the event CSRs.

        Performs the SETUP stage, an IN data stage returning
        *descriptor_data*, and an OUT status stage.  Before each stage the
        matching ``*_ev_pending`` CSR must read 0; afterwards it must read 1
        and is written back with the value read.

        Raises:
            Exception: *setup_data* describes an OUT transfer, or bytes 6/7
                of *setup_data* disagree with whether *descriptor_data* is
                given.
            TestFailure: an event-pending CSR has an unexpected value.
        """
        epaddr_out = EndpointType.epaddr(0, EndpointType.OUT)
        epaddr_in = EndpointType.epaddr(0, EndpointType.IN)

        if (setup_data[0] & 0x80) == 0x00:
            raise Exception("setup_data indicated an OUT transfer, but you requested an IN transfer")

        setup_ev = yield self.read(self.csrs['usb_setup_ev_pending'])
        if setup_ev != 0:
            raise TestFailure("setup_ev should be 0 at the start of the test, was: {:02x}".format(setup_ev))

        # Setup stage
        self.dut._log.info("setup stage")
        yield self.transaction_setup(addr, setup_data)

        setup_ev = yield self.read(self.csrs['usb_setup_ev_pending'])
        if setup_ev != 1:
            raise TestFailure("setup_ev should be 1, was: {:02x}".format(setup_ev))
        yield self.write(self.csrs['usb_setup_ev_pending'], setup_ev)

        # Data stage
        in_ev = yield self.read(self.csrs['usb_in_ev_pending'])
        if in_ev != 0:
            raise TestFailure("in_ev should be 0 at the start of the test, was: {:02x}".format(in_ev))
        if (setup_data[7] != 0 or setup_data[6] != 0) and descriptor_data is None:
            raise Exception("setup_data indicates data, but no descriptor data was specified")
        if (setup_data[7] == 0 and setup_data[6] == 0) and descriptor_data is not None:
            raise Exception("setup_data indicates no data, but descriptor data was specified")
        if descriptor_data is not None:
            self.dut._log.info("data stage")
            yield self.transaction_data_in(addr, epaddr_in, descriptor_data)

        # Give the signal one clock cycle to percolate through the event manager
        yield RisingEdge(self.dut.clk12)

        in_ev = yield self.read(self.csrs['usb_in_ev_pending'])
        if in_ev != 1:
            raise TestFailure("in_ev should be 1 at the end of the test, was: {:02x}".format(in_ev))
        yield self.write(self.csrs['usb_in_ev_pending'], in_ev)

        # Status stage
        self.dut._log.info("status stage")
        out_ev = yield self.read(self.csrs['usb_out_ev_pending'])
        if out_ev != 0:
            raise TestFailure("out_ev should be 0 at the start of the test, was: {:02x}".format(out_ev))
        yield self.transaction_status_out(addr, epaddr_out)
        yield RisingEdge(self.dut.clk12)
        out_ev = yield self.read(self.csrs['usb_out_ev_pending'])
        if out_ev != 1:
            raise TestFailure("out_ev should be 1 at the end of the test, was: {:02x}".format(out_ev))
        yield self.write(self.csrs['usb_out_ev_pending'], out_ev)
@cocotb.test()
def iobuf_validate(dut):
    """Sanity test that the Wishbone bus actually works"""
    harness = UsbTest(dut)
    yield harness.reset()

    pullup_csr = harness.csrs['usb_pullup_out']

    # After reset the pullup register is read back and the pin must be low.
    initial = yield harness.read(pullup_csr)
    dut._log.info("Value at start: {}".format(initial))
    if dut.usb_pullup != 0:
        raise TestFailure("USB pullup didn't start at zero")

    # Setting the register must be visible when it is read back.
    yield harness.write(pullup_csr, 1)
    readback = yield harness.read(pullup_csr)
    dut._log.info("Memory value: {}".format(readback))
    if readback != 1:
        raise TestFailure("USB pullup is not set!")

    raise TestSuccess("iobuf validated")
@cocotb.test()
def test_control_setup(dut):
    """Send one SETUP to address 0, followed by a zero-length IN transaction."""
    harness = UsbTest(dut)
    yield harness.reset()
    yield harness.connect()

    # We write to address 0, because we just want to test that the control
    # circuitry works.  Normally you wouldn't do this.
    address_csr = harness.csrs['usb_address']
    yield harness.write(address_csr, 0)

    setup_packet = [0x80, 0x06, 0x00, 0x06, 0x00, 0x00, 0x00, 0x00]
    yield harness.transaction_setup(0, setup_packet)
    yield harness.transaction_data_in(0, 0, [])
@cocotb.test()
def test_control_transfer_in(dut):
    """Run one complete IN control transfer against device address 20."""
    device_addr = 20

    harness = UsbTest(dut)
    yield harness.reset()
    yield harness.connect()
    yield harness.write(harness.csrs['usb_address'], device_addr)

    # Get descriptor, Index 0, Type 03, LangId 0000, wLength 10?
    setup_packet = [0x80, 0x06, 0x00, 0x06, 0x00, 0x00, 0x0A, 0x00]
    # 12 byte descriptor, max packet size 8 bytes
    descriptor = [
        0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
        0x08, 0x09, 0x0A, 0x0B,
    ]
    yield harness.control_transfer_in(device_addr, setup_packet, descriptor)
@cocotb.test()
def test_control_transfer_in_lazy(dut):
    """Test that we can transfer data in without immediately draining it"""
    epaddr_out = EndpointType.epaddr(0, EndpointType.OUT)
    epaddr_in = EndpointType.epaddr(0, EndpointType.IN)

    def check_setup_length(stage, setup_data):
        """Fail unless the drained SETUP data is 10 bytes (8 payload + CRC16).

        *stage* is a unique label so a failure identifies where it occurred.
        """
        if len(setup_data) != 10:
            raise TestFailure("{}. expected setup data to be 10 bytes, but was {} bytes: {}".format(stage, len(setup_data), setup_data))

    harness = UsbTest(dut)
    yield harness.reset()
    yield harness.connect()
    yield harness.write(harness.csrs['usb_address'], 0)

    ### SETUP packet
    harness.dut._log.info("sending initial SETUP packet")
    # Send a SETUP packet without draining it on the device side
    yield harness.host_send_token_packet(PID.SETUP, 0, epaddr_in)
    yield harness.host_send_data_packet(PID.DATA0, [0x80, 0x06, 0x00, 0x05, 0x00, 0x00, 0x0A, 0x00])
    yield harness.host_expect_ack()

    # Set it up so we ACK the final IN packet
    data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
            0x08, 0x09, 0x0A, 0x0B]
    for b in data:
        yield harness.write(harness.csrs['usb_in_data'], b)
    yield harness.write(harness.csrs['usb_in_ctrl'], 0)

    # Send a few packets while we "process" the data as a slow host
    for _ in range(10):
        yield harness.host_send_token_packet(PID.IN, 0, 0)
        yield harness.host_expect_nak()

    # Read the data, which should unblock the sending
    setup_data = yield harness.drain_setup()
    check_setup_length(1, setup_data)

    # Perform the final "read"
    yield harness.host_recv(PID.DATA1, 0, 0, data)

    # Status stage
    yield harness.transaction_status_out(0, epaddr_out)

    ### SET ADDRESS
    harness.dut._log.info("setting USB address")
    # Set the address.  Again, don't drain the device side yet.
    yield harness.host_send_token_packet(PID.SETUP, 0, epaddr_out)
    yield harness.host_send_data_packet(PID.DATA0, [0x00, 0x05, 11, 0x00, 0x00, 0x00, 0x00, 0x00])
    yield harness.host_expect_ack()

    # Send a few packets while we "process" the data as a slow host
    for _ in range(10):
        yield harness.host_send_token_packet(PID.IN, 0, 0)
        yield harness.host_expect_nak()

    setup_data = yield harness.drain_setup()
    check_setup_length(2, setup_data)

    # Note: the `out` buffer hasn't been drained yet
    yield harness.host_send_token_packet(PID.IN, 0, 0)
    yield harness.host_expect_data_packet(PID.DATA1, [])
    yield harness.host_send_ack()

    for frame in range(1532, 1541):
        yield harness.host_send_sof(frame)

    ### STALL TEST
    harness.dut._log.info("sending a STALL test")
    # Send a SETUP packet without draining it on the device side
    yield harness.host_send_token_packet(PID.SETUP, 0, epaddr_in)
    yield harness.host_send_data_packet(PID.DATA0, [0x80, 0x06, 0x00, 0x06, 0x00, 0x00, 0x0A, 0x00])
    yield harness.host_expect_ack()

    # Send a few packets while we "process" the data as a slow host
    for _ in range(10):
        yield harness.host_send_token_packet(PID.IN, 0, 0)
        yield harness.host_expect_nak()

    # Read the data, which should unblock the sending
    setup_data = yield harness.drain_setup()
    check_setup_length(3, setup_data)
    yield harness.write(harness.csrs['usb_in_ctrl'], 0x10) # Set STALL

    # Perform the final "read"
    yield harness.host_send_token_packet(PID.IN, 0, 0)
    yield harness.host_expect_stall()

    ### RESUMING
    # Send a SETUP packet addressed to 11 while the address CSR still holds
    # 0; no handshake is waited for before moving on.
    harness.dut._log.info("sending a packet to the wrong endpoint that should be ignored")
    yield harness.host_send_token_packet(PID.SETUP, 11, epaddr_in)
    yield harness.host_send_data_packet(PID.DATA0, [0x80, 0x06, 0x00, 0x06, 0x00, 0x00, 0x0A, 0x00])
    # yield harness.host_expect_ack()

    yield harness.write(harness.csrs['usb_address'], 11)

    ### SETUP packet without draining
    harness.dut._log.info("sending a packet without draining SETUP")
    # Send a SETUP packet without draining it on the device side
    yield harness.host_send_token_packet(PID.SETUP, 11, epaddr_in)
    yield harness.host_send_data_packet(PID.DATA0, [0x80, 0x06, 0x00, 0x06, 0x00, 0x00, 0x0A, 0x00])
    yield harness.host_expect_ack()

    # Set it up so we ACK the final IN packet
    data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
            0x08, 0x09, 0x0A, 0x0B]
    for b in data:
        yield harness.write(harness.csrs['usb_in_data'], b)
    yield harness.write(harness.csrs['usb_in_ctrl'], 0)

    # Send a few packets while we "process" the data as a slow host
    for _ in range(2):
        yield harness.host_send_token_packet(PID.IN, 11, 0)
        yield harness.host_expect_nak()

    # Read the data, which should unblock the sending
    harness.dut._log.info("draining SETUP which should unblock it")
    setup_data = yield harness.drain_setup()
    check_setup_length(4, setup_data)

    # Perform the final send
    yield harness.host_send_token_packet(PID.IN, 11, 0)
    yield harness.host_expect_data_packet(PID.DATA1, data)
    yield harness.host_send_ack()
@cocotb.test()
def test_control_transfer_in_large(dut):
    """Read a 78-byte payload back as 64-byte IN chunks, with NAKs in between."""
    device_addr = 11
    epaddr_in = EndpointType.epaddr(0, EndpointType.IN)
    csrs_of = lambda name: harness.csrs[name]

    harness = UsbTest(dut)
    yield harness.reset()
    yield harness.connect()
    yield harness.write(csrs_of('usb_address'), 0)

    # Set address (to 11); this request has no data stage.
    set_address = [0x00, 0x05, 11, 0x00, 0x00, 0x00, 0x00, 0x00]
    yield harness.control_transfer_out(0, set_address, None)
    yield harness.write(csrs_of('usb_address'), device_addr)

    ### Send a packet that's longer than 64 bytes
    string_data = [
        0x4e, 0x3, 0x46, 0x0, 0x6f, 0x0, 0x6d, 0x0,
        0x75, 0x0, 0x20, 0x0, 0x44, 0x0, 0x46, 0x0,
        0x55, 0x0, 0x20, 0x0, 0x42, 0x0, 0x6f, 0x0,
        0x6f, 0x0, 0x74, 0x0, 0x6c, 0x0, 0x6f, 0x0,
        0x61, 0x0, 0x64, 0x0, 0x65, 0x0, 0x72, 0x0,
        0x20, 0x0, 0x76, 0x0, 0x31, 0x0, 0x2e, 0x0,
        0x38, 0x0, 0x2e, 0x0, 0x37, 0x0, 0x2d, 0x0,
        0x38, 0x0, 0x2d, 0x0, 0x67, 0x0, 0x31, 0x0,
        0x36, 0x0, 0x36, 0x0, 0x34, 0x0, 0x66, 0x0,
        0x33, 0x0, 0x35, 0x0, 0x0, 0x0
    ]

    # Send the SETUP packet by hand, then drain it on the device side.
    yield harness.host_send_token_packet(PID.SETUP, device_addr, epaddr_in)
    yield harness.host_send_data_packet(PID.DATA0, [0x80, 0x06, 0x02, 0x03, 0x09, 0x04, 0xFF, 0x00])
    yield harness.host_expect_ack()
    yield harness.drain_setup()

    # Send a few packets while we "process" the data as a slow host
    for _ in range(10):
        yield harness.host_send_token_packet(PID.IN, device_addr, 0)
        yield harness.host_expect_nak()

    datax = PID.DATA1
    sent_any = False
    for chunk in grouper_tofit(64, string_data):
        sent_any = True
        harness.dut._log.debug("Actual data we're expecting: {}".format(chunk))

        # Queue this chunk on the device, then fetch it as the host.
        for byte in chunk:
            yield harness.write(csrs_of('usb_in_data'), byte)
        yield harness.write(csrs_of('usb_in_ctrl'), 0)
        recv = cocotb.fork(harness.host_recv(datax, device_addr, 0, chunk))
        yield recv.join()

        # Send a few packets while we "process" the data as a slow host
        for _ in range(10):
            yield harness.host_send_token_packet(PID.IN, device_addr, 0)
            yield harness.host_expect_nak()

        # Alternate the data PID for the next chunk.
        datax = PID.DATA1 if datax == PID.DATA0 else PID.DATA0

    if not sent_any:
        # No chunks were produced: do a single zero-length IN transaction.
        yield harness.write(csrs_of('usb_in_ctrl'), 0)
        recv = cocotb.fork(harness.host_recv(datax, device_addr, 0, []))
        yield harness.send_data(datax, 0, string_data)
        yield recv.join()

    # Finish with a zero-length OUT transaction that must be ACKed.
    yield harness.host_send_token_packet(PID.OUT, device_addr, 0)
    yield harness.host_send_data_packet(PID.DATA0, [])
    yield harness.host_expect_ack()
@cocotb.test()
def test_sof_stuffing(dut):
    """Send four start-of-frame packets with fixed frame values."""
    harness = UsbTest(dut)
    yield harness.reset()
    yield harness.connect()

    for frame in (0x04ff, 0x0512, 0x06e1, 0x0519):
        yield harness.host_send_sof(frame)
@cocotb.test()
def test_sof_is_ignored(dut):
    """Interleave SOF packets with a SETUP transaction and check the payload."""
    harness = UsbTest(dut)
    yield harness.reset()
    yield harness.connect()

    addr = 0x20
    epaddr_out = EndpointType.epaddr(0, EndpointType.OUT)
    epaddr_in = EndpointType.epaddr(0, EndpointType.IN)
    yield harness.write(harness.csrs['usb_address'], addr)

    data = [0, 1, 8, 0, 4, 3, 0, 0]

    @cocotb.coroutine
    def host_traffic():
        """Host side: SOF, SETUP token, SOF, DATA1, expect ACK, SOF."""
        yield harness.host_send_sof(2)

        # Setup stage: the token, with another SOF wedged in before the data.
        out_epnum = EndpointType.epnum(epaddr_out)
        yield harness.host_send_token_packet(PID.SETUP, addr, out_epnum)
        yield harness.host_send_sof(3)

        # Data stage: the payload, which must still be acknowledged.
        yield harness.host_send_data_packet(PID.DATA1, data)
        yield harness.host_expect_ack()

        # One more SOF after the transaction.
        yield harness.host_send_sof(4)

    # Indicate that we're ready to receive data to EP0
    # harness.write(harness.csrs['usb_in_ctrl'], 0)

    # Run the host side concurrently with the device-side check.
    host_side = cocotb.fork(host_traffic())
    yield harness.expect_setup(epaddr_out, data)
    yield host_side.join()

    # Status stage
    yield harness.set_response(epaddr_out, EndpointResponse.ACK)
    yield harness.transaction_status_out(addr, epaddr_out)
@cocotb.test()
def test_control_setup_clears_stall(dut):
    """Exercise OUT transfers on endpoint 0 around a stalled endpoint.

    Sequence: two normal OUT data transactions, write 0x10 to
    ``usb_out_stall`` and expect the next OUT transaction to answer STALL,
    write 0 to ``usb_out_stall``, run a control OUT transfer, then one more
    OUT data transaction.
    """
    harness = UsbTest(dut)
    yield harness.reset()
    yield harness.connect()

    addr = 28
    epaddr_out = EndpointType.epaddr(0, EndpointType.OUT)
    yield harness.write(harness.csrs['usb_address'], addr)

    yield harness.host_send_sof(0)

    payload = [0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0, 0]

    # Send the data once to ensure that things are working, then a second
    # time to ensure we can re-queue things.
    for _ in range(2):
        yield harness.transaction_data_out(addr, epaddr_out, payload)

    # STALL the endpoint now
    yield harness.write(harness.csrs['usb_out_stall'], 0x10)

    # Do another receive, which should fail
    harness.dut._log.info("next transaction should stall")
    yield harness.transaction_data_out(
        addr, epaddr_out, payload, expected=PID.STALL)

    # Do a SETUP, which should pass.
    # NOTE(review): the stall register is cleared by hand right before the
    # SETUP, so the SETUP itself is not what removes the stall here --
    # confirm this matches the intent expressed by the test name.
    yield harness.write(harness.csrs['usb_out_stall'], 0)
    yield harness.control_transfer_out(addr, payload)

    # Finally, do one last transfer, which should succeed now
    # that the endpoint is unstalled.
    yield harness.transaction_data_out(addr, epaddr_out, payload)
@cocotb.test()
def test_control_transfer_in_nak_data(dut):
    """Control IN transfer whose data stage is NAKed once before succeeding.

    After the SETUP transaction the IN endpoint is set to NAK and an IN token
    is expected to be NAKed.  The endpoint is then loaded with data, switched
    to ACK, and a second IN token is expected to return that data as DATA1.
    """
    harness = UsbTest(dut)
    yield harness.reset()
    yield harness.connect()

    addr = 22
    address_csr = harness.csrs['usb_address']
    yield harness.write(address_csr, addr)

    # Get descriptor, Index 0, Type 03, LangId 0000, wLength 64
    setup_data = [0x80, 0x06, 0x00, 0x03, 0x00, 0x00, 0x40, 0x00]
    # Four-byte reply the device is loaded with for the data stage.
    in_data = [0x04, 0x03, 0x09, 0x04]

    epaddr_in = EndpointType.epaddr(0, EndpointType.IN)
    # yield harness.clear_pending(epaddr_in)

    # NOTE(review): the address CSR was already written with the same value
    # above; this second write looks redundant -- confirm before removing.
    yield harness.write(address_csr, addr)

    def send_in_token():
        # NOTE(review): the endpoint *address* is passed as the token's
        # endpoint argument, whereas test_sof_is_ignored passes
        # EndpointType.epnum(...) -- verify against host_send_token_packet.
        return harness.host_send_token_packet(PID.IN, addr, epaddr_in)

    # Setup stage
    # -----------
    yield harness.transaction_setup(addr, setup_data)

    # Data stage
    # -----------
    # First attempt: endpoint answers NAK.
    yield harness.set_response(epaddr_in, EndpointResponse.NAK)
    yield send_in_token()
    yield harness.host_expect_nak()

    # Second attempt: data is queued and the endpoint answers with it.
    yield harness.set_data(epaddr_in, in_data)
    yield harness.set_response(epaddr_in, EndpointResponse.ACK)
    yield send_in_token()
    yield harness.host_expect_data_packet(PID.DATA1, in_data)
    yield harness.host_send_ack()
# @cocotb.test()
# def test_control_transfer_in_nak_status(dut):
# harness = UsbTest(dut)
# yield harness.reset()
# yield harness.connect()
# addr = 20
# setup_data = [0x00, 0x06, 0x00, 0x06, 0x00, 0x00, 0x0A, 0x00]
# out_data = [0x00, 0x01]
# epaddr_out = EndpointType.epaddr(0, EndpointType.OUT)
# epaddr_in = EndpointType.epaddr(0, EndpointType.IN)
# yield harness.clear_pending(epaddr_out)
# yield harness.clear_pending(epaddr_in)
# # Setup stage
# # -----------
# yield harness.transaction_setup(addr, setup_data)
# # Data stage
# # ----------
# yield harness.set_response(epaddr_out, EndpointResponse.ACK)
# yield harness.transaction_data_out(addr, epaddr_out, out_data)
# # Status stage
# # ----------
# yield harness.set_response(epaddr_in, EndpointResponse.NAK)
# yield harness.host_send_token_packet(PID.IN, addr, epaddr_in)
# yield harness.host_expect_nak()
# yield harness.host_send_token_packet(PID.IN, addr, epaddr_in)
# yield harness.host_expect_nak()
# yield harness.set_response(epaddr_in, EndpointResponse.ACK)
# yield harness.host_send_token_packet(PID.IN, addr, epaddr_in)
# yield harness.host_expect_data_packet(PID.DATA1, [])
# yield harness.host_send_ack()
# yield harness.clear_pending(epaddr_in)
@cocotb.test()
def test_control_transfer_in(dut):
    """Run a complete control IN transfer against device address 20.

    Pending state is cleared on both directions of endpoint 0, the device
    address is programmed, one SOF is sent, and then
    ``harness.control_transfer_in`` is driven with an 8-byte SETUP payload
    and 12 bytes of IN data.
    """
    harness = UsbTest(dut)
    yield harness.reset()
    yield harness.connect()

    # Clear endpoint 0, OUT direction first and then IN.
    for direction in (EndpointType.OUT, EndpointType.IN):
        yield harness.clear_pending(EndpointType.epaddr(0, direction))

    addr = 20
    yield harness.write(harness.csrs['usb_address'], addr)

    yield harness.host_send_sof(0)

    # SETUP payload bytes: bmRequestType=0x80, bRequest=0x06 (get descriptor),
    # wValue=0x0600, wIndex=0x0000, wLength=0x000A (10).
    setup_data = [0x80, 0x06, 0x00, 0x06, 0x00, 0x00, 0x0A, 0x00]

    # 12 byte descriptor, max packet size 8 bytes.
    # NOTE(review): presumably the harness splits this across several IN
    # packets; the 12-byte length also exceeds the wLength of 10 -- confirm.
    descriptor = [
        0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
        0x08, 0x09, 0x0A, 0x0B,
    ]

    yield harness.control_transfer_in(addr, setup_data, descriptor)
@cocotb.test()
def test_control_transfer_in_out(dut):
harness = UsbTest(dut)
yield harness.reset()
yield harness.connect()