Skip to content

Commit

Permalink
Adding thread-safe message queue and updating test.
Browse files Browse the repository at this point in the history
  • Loading branch information
morphex committed Jul 18, 2022
1 parent 5e519cb commit 8c565c5
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
12 changes: 12 additions & 0 deletions client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@

print("From server:", ssl_connection.recv(1024).decode())

time.sleep(2)

ssl_connection.send("list".encode())

time.sleep(5)

messages = ssl_connection.recv(1024).decode()
print("From server:", messages)

messages = ssl_connection.recv(1024).decode()
print("From server:", messages)

ssl_connection.send("QUIT".encode())

time.sleep(5)
Expand Down
48 changes: 47 additions & 1 deletion smps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,64 @@
import socket
import time
import sys
import threading

class SimpleMessageStorage:

def __init__(self, expiry=600):
"""expiry is seconds a message is valid."""
self.messages = []
self.expiry = expiry
self._lock = threading.Lock()

def add_message(self, message):
self._acquire_lock()
self.messages.append((time.time(), message))
self._release_lock()

def _acquire_lock(self):
"""Acquires a lock on the message database."""
print("Locking message database")
self._lock.acquire()

def _release_lock(self):
"""Acquires a lock on the message database."""
print("Releasing message database")
self._lock.release()

def get_messages(self):
"""Returns a list of messages that are still valid, and
clears out old messages."""
new_messages = []
self._acquire_lock()
for timestamp, message in self.messages:
if timestamp < (time.time() + self.expiry):
new_messages.append((timestamp, message))
self.messages = new_messages
self._release_lock()
return self.messages

message_storage = SimpleMessageStorage()

class Handler(StreamRequestHandler):

def handle(self):
global message_storage
try:
while True:
input = self.request.recv(1024)
input = input.decode("utf-8")
print("From client:", input)
message_storage.add_message(input)
self.wfile.write("Message accepted".encode())
time.sleep(2)
if input.lower() == "quit":
if input.lower().strip() == "list":
print("Listing messages")
messages = ""
for timestamp, message in message_storage.get_messages():
messages += "%f,%s\n" % (timestamp, message)
self.wfile.write(messages.encode())
if input.lower().strip() == "quit":
print("Exiting")
self.server.shutdown()
except Exception as instance:
Expand Down

0 comments on commit 8c565c5

Please sign in to comment.