Skip to content

Commit

Permalink
Add parsed_data
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsasha committed Nov 11, 2022
1 parent e8c55c5 commit 6ff0b93
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions irrd/server/http/event_stream.py
Expand Up @@ -17,6 +17,7 @@

from irrd.conf import get_setting
from irrd.rpki.status import RPKIStatus
from irrd.rpsl.rpsl_objects import rpsl_object_from_text
from irrd.scopefilter.status import ScopeFilterStatus
from irrd.storage.database_handler import DatabaseHandler
from irrd.storage.event_stream import AsyncEventStreamClient
Expand Down Expand Up @@ -57,7 +58,7 @@ async def stream_response(self, sources: List[str], object_classes: List[str]):
}) + '\n'

query = RPSLDatabaseQuery(column_names=[
'rpsl_pk', 'object_class', 'object_text', 'source', 'updated',
'rpsl_pk', 'object_class', 'object_text', 'source', 'updated', 'parsed_data'
]).rpki_status(
[RPKIStatus.not_found.name, RPKIStatus.valid.name]
).scopefilter_status(
Expand All @@ -80,12 +81,17 @@ async def stream_response(self, sources: List[str], object_classes: List[str]):
fp.seek(0)
csv.field_size_limit(sys.maxsize)
for row in csv.reader(fp):
pk, object_class, object_text, source, updated, parsed_data_text = row
parsed_data = ujson.decode(parsed_data_text)
if 'auth' in parsed_data:
parsed_data['auth'] = [remove_auth_hashes(p) for p in parsed_data['auth']]
yield ujson.encode({
'pk': row[0],
'object_class': row[1],
'object_text': remove_auth_hashes(row[2]),
'source': row[3],
'updated': row[4],
'pk': pk,
'object_class': object_class,
'object_text': remove_auth_hashes(object_text),
'source': source,
'updated': updated,
'parsed_data': parsed_data,
}) + '\n'
dh.close()

Expand Down Expand Up @@ -145,6 +151,8 @@ async def _send_new_journal_entries(self, websocket: WebSocket, after_journal_se
new_highest_serial_journal = 0

for entry in journal_entries:
object_text = remove_auth_hashes(entry['object_text'])
rpsl_obj = rpsl_object_from_text(object_text, strict_validation=False)
await websocket.send_text(ujson.encode({
'message_type': 'rpsl_journal',
'event_data': {
Expand All @@ -156,7 +164,8 @@ async def _send_new_journal_entries(self, websocket: WebSocket, after_journal_se
'serial_nrtm': entry['serial_nrtm'],
'origin': entry['origin'].name,
'timestamp': entry['timestamp'].isoformat(),
'object_text': remove_auth_hashes(entry['object_text']),
'object_text': object_text,
'parsed_data': rpsl_obj.parsed_data,
}
}))
new_highest_serial_journal = max([entry['serial_journal'], new_highest_serial_journal])
Expand Down

0 comments on commit 6ff0b93

Please sign in to comment.