Skip to content

Commit

Permalink
wellenvogel#324 discard messages if pipeline is not emptied within a …
Browse files Browse the repository at this point in the history
…certain time (WIP)
  • Loading branch information
quantenschaum committed Feb 19, 2024
1 parent 84e2c1c commit 0c034fa
Showing 1 changed file with 61 additions and 26 deletions.
87 changes: 61 additions & 26 deletions server/handler/feeder.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def __init__(self,cfgparam):
self.type=AVNWorker.Type.FEEDER
self.listlock=threading.Condition()
self.history=[]
self.sequence=1
self.sequence=0
self.readConfig()

def readConfig(self):
Expand All @@ -104,9 +104,6 @@ def addNMEA(self, entry,source=None,addCheckSum=False,omitDecode=False,sourcePri
@param addCheckSum: add the NMEA checksum
@return:
"""
rt=False
ll=0
hl=0
if len(entry) < 5:
AVNLog.debug("addNMEA: ignoring short data %s",entry)
return False
Expand All @@ -115,18 +112,16 @@ def addNMEA(self, entry,source=None,addCheckSum=False,omitDecode=False,sourcePri
entry+= "*" + NMEAParser.nmeaChecksum(entry) + "\r\n"
else:
if not entry[-2:]=="\r\n":
entry=entry+"\r\n"
entry+="\r\n"
self.listlock.acquire()
self.sequence+=1
if len(self.history) >= self.maxlist:
while len(self.history) >= self.maxlist:
self.history.pop(0)
self.sequence+=1
self.history.append(NmeaEntry(entry,source,omitDecode,sourcePriority))
hl=len(self.history)
rt=True
self.listlock.notify_all()
self.listlock.release()
AVNLog.debug("addNMEA history=%d data=%s",hl,entry)
return rt
AVNLog.debug("addNMEA history=%d data=%s",len(self.history), entry)
return True

def wakeUp(self):
super().wakeUp()
Expand All @@ -136,6 +131,53 @@ def wakeUp(self):
finally:
self.listlock.release()

def get_messages(self, chunk_size=10, nmea_filter=None, handler_name="unknown", timeout=1, discard_time=1):
"""yields chunks of unprocessed messages
- chunks may be shorter than requested or emtpy
- yield single messages if chunk_size==1
- discards messages if not all messages have be processed within discard_time"""
assert chunk_size>=0
seq=0 # sequence id of last processed message
t0=time.monotonic() # timestamp when pipeline was empty
while True:
with self.listlock:
history,sequence=self.history,self.sequence
start=seq+1 # first seq id of messages to yield
end=start+chunk_size # last seq id of msgs to yield, exclusive
unprocessed=sequence-seq # number of unprocessed messages
if unprocessed>len(history): # buffer overflow, too many massages
lost=unprocessed-len(history)
AVNLog.error("%s lost %d messages", handler_name, lost)
#print("OVERFLOW",lost)
start=sequence-len(history)
seq=start
unprocessed=sequence-seq
filled_since=time.monotonic()-t0 # time since pipeline has been emptied
#print("unprocessed",unprocessed,"seq",(seq,sequence),f"age {filled_since:.3f}","S/E",(start,end))
if filled_since>discard_time and unprocessed>chunk_size: # force empty pipeline, discard messages
end=sequence+1 # +1 because end is exclusive
start=max(seq+1,end-chunk_size) # yield newest msgs from buffer
AVNLog.error("%s discarded %d messages", handler_name, start-(seq+1))
#print("DISCARDED",start-(seq+1),"S/E",(start,end))
seq=min(start-1,sequence)
o=sequence-len(history)+1 # offset=sequence-array_index
start,end=start-o,end-o # seq --> history index
end=min(end,len(history)) # limit chunk to available data
assert 0<=start<=len(history) and 0<=end<=len(history), (start,end,len(history))
messages=history[start:end]
seq+=len(messages)
assert seq<=sequence,(seq,sequence)
empty=end==len(history) # pipeline is empty now
if not messages: # wait for new messages
self.listlock.wait(timeout)
if empty:
t0=time.monotonic() # reset time after waiting, it has been empty until now
#print("yield",len(messages),"empty" if empty else "")
assert len(messages)<=chunk_size
# filtering should better happen outside in the handler itself
messages=list(filter(lambda m:NMEAParser.checkFilter(m.data,nmea_filter), messages))
yield messages if chunk_size>1 else messages[0] if messages else None

#fetch entries from the history
#only return entries with higher sequence
#return a tuple (lastSequence,[listOfEntries])
Expand Down Expand Up @@ -202,25 +244,18 @@ def run(self):
AVNLog.info("standalone feeder started")
nmeaParser=NMEAParser(self.navdata)
self.setInfo('main', "running", WorkerStatus.RUNNING)
hasNmea=False
sequence=None
while not self.shouldStop():
try:
while True:
(numErrors,sequence,nmeaList)=self.fetchFromHistory(sequence,
nmeafilter=self.nmeaFilter,
includeSource=True,
waitTime=self.waitTime,
returnError=True)
if numErrors > 0:
AVNLog.error("decoder lost %d records",numErrors)
for data in nmeaList:
if not data is None and not data.omitDecode:
if nmeaParser.parseData(data.data,source=data.source,sourcePriority=data.sourcePriority):
if not hasNmea:
self.setInfo('main',"feeding NMEA",WorkerStatus.NMEA)
for chunk in self.get_messages(nmea_filter=self.nmeaFilter,name="decoder"):
if self.shouldStop(): break
self.setInfo('main',"feeding NMEA", WorkerStatus.NMEA)
for msg in chunk:
if not msg is None and not msg.omitDecode:
nmeaParser.parseData(msg.data,source=msg.source,sourcePriority=msg.sourcePriority)
except Exception as e:
AVNLog.warn("feeder exception - retrying %s",traceback.format_exc())
print(traceback.format_exc())
raise


class AVNGpsdFeeder(AVNFeeder):
Expand Down

0 comments on commit 0c034fa

Please sign in to comment.