-
Notifications
You must be signed in to change notification settings - Fork 42
/
chat.py
143 lines (114 loc) · 4.73 KB
/
chat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""Client for the CometD Chat Example"""
import asyncio
import argparse
from typing import Dict, Any
from aioconsole import ainput # type: ignore
from aiocometd import Client, ConnectionType
from aiocometd.exceptions import AiocometdException
async def chat_client(url: str, nickname: str,
                      connection_type: ConnectionType) -> None:
    """Join the "demo" chat room and print its traffic until canceled

    Opens a client connection, subscribes to the room's chat and member
    list channels, announces the user and registers them as a member. It
    then prints every incoming message while a background task publishes
    the lines typed by the user.

    An :class:`AiocometdException` is reported on stdout instead of being
    raised, cancellation is absorbed, and an exit notice is always printed.

    :param url: CometD server URL
    :param nickname: The user's nickname
    :param connection_type: Connection type
    """
    try:
        room_name = "demo"
        room_channel = "/chat/" + room_name
        members_changed_channel = "/members/" + room_name
        members_channel = "/service/members"

        # open the connection using the requested connection type
        async with Client(url, connection_type) as client:
            print(f"Connected to '{url}' using connection "
                  f"type '{connection_type.value}'\n")

            # receive the chat messages of the room, and the notifications
            # about changes of the room's member list
            await client.subscribe(room_channel)
            await client.subscribe(members_changed_channel)

            # announce in the room that the user has joined
            join_announcement = {
                "user": nickname,
                "membership": "join",
                "chat": nickname + " has joined"
            }
            await client.publish(room_channel, join_announcement)

            # register the user as a member of the room
            membership = {
                "user": nickname,
                "room": room_channel
            }
            await client.publish(members_channel, membership)

            # publish the user's input in the background
            publisher = input_publisher(client, nickname, room_channel)
            input_task = asyncio.ensure_future(publisher)

            # sender of the most recently printed chat message
            previous_sender = None
            try:
                async for incoming in client:
                    channel = incoming["channel"]

                    # the member list of the room has changed
                    if channel == members_changed_channel:
                        print("MEMBERS:", ", ".join(incoming["data"]))
                        # make the next chat line show the sender's name
                        previous_sender = None
                        continue

                    # anything other than a chat message is ignored
                    if channel != room_channel:
                        continue

                    payload = incoming["data"]
                    sender = payload["user"]
                    if sender == previous_sender:
                        # consecutive messages from the same user are
                        # shown with a continuation mark instead of a name
                        prefix = "..."
                    else:
                        previous_sender = sender
                        prefix = sender + ":"
                    print(f"{prefix} {payload['chat']}")
            finally:
                # the publisher must not outlive the message loop
                input_task.cancel()

    except AiocometdException as error:
        print("Encountered an error: " + str(error))
    except asyncio.CancelledError:
        pass
    finally:
        print("\nExiting...")
async def input_publisher(client: Client, nickname: str,
                          room_channel: str) -> None:
    """Publish every line read from stdin on the *room_channel*

    Loops indefinitely and returns quietly if it gets canceled while
    waiting for input.

    :param client: A client object
    :param nickname: The user's nickname
    :param room_channel: The chat room's channel
    """
    # terminal escape sequences for moving the cursor up and clearing a line
    cursor_up = "\033[F"
    erase_line = "\033[K"

    while True:
        try:
            # wait for the next line typed by the user
            typed_text = await ainput("")
        except asyncio.CancelledError:
            # canceled while waiting for input, stop publishing
            return

        # erase the line that was just entered: step up one line, clear it
        print(cursor_up, end="")
        print(erase_line, end="", flush=True)

        # send the text to the room under the user's nickname
        outgoing = {
            "user": nickname,
            "chat": typed_text
        }
        await client.publish(room_channel, outgoing)
def get_arguments() -> Dict[str, Any]:
    """Parse the command line and return the options as a dictionary

    :return: The parsed values under the keys ``url``, ``nickname`` and \
    ``connection_type``
    """
    arg_parser = argparse.ArgumentParser(
        description="CometD chat example client")

    # mandatory positional arguments
    arg_parser.add_argument("url", metavar="server_url", type=str,
                            help="CometD server URL")
    arg_parser.add_argument("nickname", type=str, help="Chat nickname")

    # optional connection type, restricted to the members of ConnectionType;
    # the default is specified by the raw value of the WEBSOCKET member
    arg_parser.add_argument(
        "-c", "--connection_type",
        type=ConnectionType,
        choices=list(ConnectionType),
        default=ConnectionType.WEBSOCKET.value,
        help="Connection type",
    )

    parsed = arg_parser.parse_args()
    return vars(parsed)
def main() -> None:
    """Starts the chat client application

    Parses the command line arguments and runs :func:`chat_client` on a
    dedicated event loop until it finishes. On a keyboard interrupt the
    chat task is canceled and the loop keeps running until the task has
    finished its cleanup. The event loop is always closed before returning.
    """
    arguments = get_arguments()

    # Create and install the loop explicitly. Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # fails on recent Python versions when no current loop is set.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        chat_task = loop.create_task(chat_client(**arguments))
        try:
            loop.run_until_complete(chat_task)
        except KeyboardInterrupt:
            # let the client leave gracefully instead of dropping the
            # connection in the middle of the interrupted iteration
            chat_task.cancel()
            loop.run_until_complete(chat_task)
    finally:
        # release the resources held by the loop
        asyncio.set_event_loop(None)
        loop.close()
# start the chat client only when the module is executed as a script
if __name__ == "__main__":
    main()