-
Notifications
You must be signed in to change notification settings - Fork 0
/
data_models.py
149 lines (127 loc) · 5.28 KB
/
data_models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from pydantic import BaseModel, Field, PrivateAttr, computed_field
from pydantic.types import List, Annotated
from pydantic.networks import IPv4Address
from datetime import datetime
from uuid import uuid4, UUID
import shutil
import logging
log = logging.getLogger(__name__)
logging.basicConfig(filename="ssoh.log", format='%(asctime)s | %(name)s | %(levelname)s | %(message)s',
level=logging.INFO)
log.addHandler(logging.StreamHandler())
class PingEvent(BaseModel):
"""
PingEvent represents a single instance where the application pinged one target
Attributes:
event_uuid: A UUID4 representing one single event. Generated automatically
timestamp: The timestamp when the ping occurred
ping: ping in ms
failure: shows if ping failed
"""
event_uuid: str = Field(default_factory=lambda: uuid4().hex)
timestamp: datetime
ping: float = Field(ge=0)
failure: bool = Field(default=True)
class PingList(BaseModel):
"""
PingList is a collection of PingEvent objects for a specific ip address
Attributes:
pings: List of PingEvent objects
target: target ip address
"""
pings: List[PingEvent] = Field(default=[])
target: IPv4Address
def add_ping_event(self, ping: PingEvent):
"""
Adds a PingEvent to the PingList
Args:
ping: a fully instantiated PingEvent object
"""
# check if ping already exists in list
for existing_ping in self.pings:
if existing_ping.event_uuid == ping.event_uuid:
log.warning(f"Ping already exists for {ping.event_uuid}. Not adding Ping")
return
# sort pings based on timestamp
self.pings.sort(key=lambda sorted_ping: sorted_ping.timestamp)
# delete pings more than 4
while len(self.pings) >= 5:
self.pings.pop(0)
# append new ping
self.pings.append(ping)
def add_ping(self, failure: bool, latency: float = 9999999, timestamp=datetime.now()):
"""
Creates a PingEvent based on ping details and appends it to the PingList
Args:
failure: boolean indicating whether the ping failed
latency: latency of the ping
timestamp: timestamp of the ping
"""
self.add_ping_event(PingEvent(timestamp=timestamp, target=self.target, ping=latency, failure=failure))
def get_valid_percentage(self) -> float:
"""
Calculates the percentage of valid pings
Returns:
a simple float between 0 and 1 indicating the percentage of valid pings
if no pings were found the percentage is 1.0 (100%)
"""
# get count of PingEvents where failure is False
valid_events = sum(not ping.failure for ping in self.pings)
# prevent zero division
if len(self.pings) == 0:
return 1
return valid_events / len(self.pings)
def did_latest_fail(self) -> bool:
"""
Sorts array and checks if the latest ping failed
Returns: True if latest ping failed; False otherwise
"""
# sort pings based on timestamp
self.pings.sort(key=lambda sorted_ping: sorted_ping.timestamp)
return self.pings[-1].failure
@classmethod
def load(cls, ip: IPv4Address):
"""
Loads a PingList from a json File based on the ip address
automatically instantiates an empty instance of the PingList if it doesn't exist
Args:
ip: ip address of the PingList to load
Returns:
an instance of the PingList class representing the last ping events for the given ip
"""
filename = str(ip) + ".json"
try:
with open(filename, "r") as listfile:
return cls.model_validate_json(listfile.read())
except FileNotFoundError:
log.error("File %s not found or not readable. Creating empty pinglist instead.", filename)
return cls(target=ip)
except ValueError:
backup_file_name: str = filename + "." + str(int(datetime.timestamp(datetime.now())))
# backup old file to [oldfilename].[timestamp]
shutil.move(filename, backup_file_name)
log.error("Invalid JSON. Creating empty pinglist instead. Old file moved to %s", backup_file_name)
return cls(target=ip)
@classmethod
def clear(cls, ip: IPv4Address):
"""
Clears the PingList from a json File based on the ip and saves it
Args:
ip: ip address of the PingList to clear
"""
this_pinglist = cls.load(ip)
this_pinglist.pings = []
log.info(f"Cleared pings on {this_pinglist.target}")
filename = str(ip) + ".json"
backup_file_name: str = filename + "." + str(int(datetime.timestamp(datetime.now())))
log.debug(f"Moving {filename} to {backup_file_name}")
# backup old file to [oldfilename].[timestamp]
shutil.move(filename, backup_file_name)
this_pinglist.save()
def save(self):
"""
Saves the current instance of the PingList to a json file named with the target ip
"""
filename = str(self.target) + ".json"
with open(filename, "w+") as listfile:
listfile.write(self.model_dump_json())