Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

ASMI simulation models

  • Loading branch information...
commit a5915101890ecf7052c058d860aad546170f792c 1 parent b7a84b3
Sébastien Bourdeauducq sbourdeauducq authored
6 doc/index.rst
View
@@ -590,7 +590,7 @@ A FIR filter
.. include:: ../examples/fir.py
:code: python
-Wishbone
-========
-.. include:: ../examples/wb_initiator.py
+Abstract bus transactions
+=========================
+.. include:: ../examples/abstract_transactions.py
:code: python
44 examples/wb_initiator.py → examples/abstract_transactions.py
View
@@ -6,7 +6,7 @@
from migen.fhdl.structure import *
from migen.fhdl import autofragment
from migen.bus.transactions import *
-from migen.bus import wishbone
+from migen.bus import wishbone, asmibus
from migen.sim.generic import Simulator
from migen.sim.icarus import Runner
@@ -33,23 +33,30 @@ def my_generator():
yield None
# Our bus slave.
-class MyModel(wishbone.TargetModel):
- def __init__(self):
- self.prng = Random(763627)
-
+class MyModel:
def read(self, address):
return address + 4
-
+
+class MyModelWB(MyModel, wishbone.TargetModel):
+ def __init__(self):
+ self.prng = Random(763627)
+
def can_ack(self, bus):
+ # Simulate variable latency.
return self.prng.randrange(0, 2)
-def main():
+class MyModelASMI(MyModel, asmibus.TargetModel):
+ pass
+
+def test_wishbone():
+ print("*** Wishbone test")
+
# The "wishbone.Initiator" library component runs our generator
# and manipulates the bus signals accordingly.
master = wishbone.Initiator(my_generator())
# The "wishbone.Target" library component examines the bus signals
# and calls into our model object.
- slave = wishbone.Target(MyModel())
+ slave = wishbone.Target(MyModelWB())
# The "wishbone.Tap" library component examines the bus at the slave port
# and displays the transactions on the console (<TRead...>/<TWrite...>).
tap = wishbone.Tap(slave.bus)
@@ -63,7 +70,26 @@ def end_simulation(s):
sim = Simulator(fragment, Runner())
sim.run()
-main()
+def test_asmi():
+ print("*** ASMI test")
+
+ # Create a hub with one port for our initiator.
+ hub = asmibus.Hub(32, 32)
+ port = hub.get_port()
+ hub.finalize()
+ # Create the initiator, target and tap (similar to the Wishbone case).
+ master = asmibus.Initiator(port, my_generator())
+ slave = asmibus.Target(hub, MyModelASMI())
+ tap = asmibus.Tap(hub)
+ # Run the simulation (same as the Wishbone case).
+ def end_simulation(s):
+ s.interrupt = master.done
+ fragment = autofragment.from_local() + Fragment(sim=[end_simulation])
+ sim = Simulator(fragment, Runner())
+ sim.run()
+
+test_wishbone()
+test_asmi()
# Output:
# <TWrite adr:0x0 dat:0x0>
115 migen/bus/asmibus.py
View
@@ -1,6 +1,7 @@
from migen.fhdl.structure import *
from migen.corelogic.misc import optree
from migen.bus.transactions import *
+from migen.sim.generic import Proxy
class FinalizeError(Exception):
pass
@@ -169,6 +170,49 @@ def get_fragment(self):
]
return ports + Fragment(comb)
+class Tap:
+ def __init__(self, hub, handler=print):
+ self.hub = hub
+ self.handler = handler
+ self.tag_to_transaction = dict()
+ self.transaction = None
+
+ def do_simulation(self, s):
+ hub = Proxy(s, self.hub)
+
+ # Pull any data announced in the previous cycle.
+ if isinstance(self.transaction, TWrite):
+ self.transaction.data = hub.dat_w
+ self.transaction.sel = ~hub.dat_wm
+ self.handler(self.transaction)
+ self.transaction = None
+ if isinstance(self.transaction, TRead):
+ self.transaction.data = hub.dat_r
+ self.handler(self.transaction)
+ self.transaction = None
+
+ # Tag issue. Transaction objects are created here
+ # and placed into the tag_to_transaction dictionary.
+ for tag, slot in enumerate(self.hub.get_slots()):
+ if s.rd(slot.allocate):
+ adr = s.rd(slot.allocate_adr)
+ we = s.rd(slot.allocate_we)
+ if we:
+ transaction = TWrite(adr)
+ else:
+ transaction = TRead(adr)
+ transaction.latency = s.cycle_counter
+ self.tag_to_transaction[tag] = transaction
+
+ # Tag call.
+ if hub.call:
+ transaction = self.tag_to_transaction[hub.tag_call]
+ transaction.latency = s.cycle_counter - transaction.latency + 1
+ self.transaction = transaction
+
+ def get_fragment(self):
+ return Fragment(sim=[self.do_simulation])
+
class Initiator:
def __init__(self, port, generator):
self.port = port
@@ -225,3 +269,74 @@ def do_simulation(self, s):
def get_fragment(self):
return Fragment(sim=[self.do_simulation])
+
+class TargetModel:
+ def __init__(self):
+ self.last_slot = 0
+
+ def read(self, address):
+ return 0
+
+ def write(self, address, data, mask):
+ pass
+
+ # Round-robin scheduling.
+ def select_slot(self, pending_slots):
+ if not pending_slots:
+ return -1
+ self.last_slot += 1
+ if self.last_slot > max(pending_slots):
+ self.last_slot = 0
+ while self.last_slot not in pending_slots:
+ self.last_slot += 1
+ return self.last_slot
+
+class Target:
+ def __init__(self, hub, model):
+ self.hub = hub
+ self.model = model
+ self._calling_tag = -1
+ self._write_request_d = -1
+ self._write_request = -1
+ self._read_request = -1
+
+ def do_simulation(self, s):
+ slots = self.hub.get_slots()
+
+ # Data I/O
+ if self._write_request >= 0:
+ self.model.write(self._write_request,
+ s.rd(self.hub.dat_w), s.rd(self.hub.dat_wm))
+ if self._read_request >= 0:
+ s.wr(self.hub.dat_r, self.model.read(self._read_request))
+
+ # Request pipeline
+ self._read_request = -1
+ self._write_request = self._write_request_d
+ self._write_request_d = -1
+
+ # Examine pending slots and possibly choose one.
+ # Note that we do not use the SLOT_PROCESSING state here.
+ # Selected slots are immediately called.
+ pending_slots = set()
+ for tag, slot in enumerate(slots):
+ if tag != self._calling_tag and s.rd(slot.state) == SLOT_PENDING:
+ pending_slots.add(tag)
+ slot_to_call = self.model.select_slot(pending_slots)
+
+ # Call slot.
+ if slot_to_call >= 0:
+ slot = slots[slot_to_call]
+ s.wr(self.hub.call, 1)
+ s.wr(self.hub.tag_call, slot_to_call)
+ self._calling_tag = slot_to_call
+ if s.rd(slot.we):
+ self._write_request_d = s.rd(slot.adr)
+ else:
+ self._read_request = s.rd(slot.adr)
+ else:
+ s.wr(self.hub.call, 0)
+ self._calling_tag = -1
+
+ def get_fragment(self):
+ return Fragment(sim=[self.do_simulation])
Please sign in to comment.
Something went wrong with that request. Please try again.