1
1
# -*- coding: utf-8 -*-
2
+ import asyncio
3
+ import logging
2
4
3
- from storeclient import MessageLogStore
5
+ from ircb . storeclient import MessageLogStore
4
6
from collections import deque
5
7
8
+ logger = logging .getLogger ('publisher' )
9
+
6
10
7
11
class MessageLogPublisher (object ):
12
+ """
13
+ Publish latest logs in a room in realtime.
14
+
15
+ It fetches latest chat logs for a room from the store, initially.
16
+ Then, it keeps on listening to the store for WRITE events on
17
+ MessageLog model and keeps the record of latest chat logs fetched
18
+ always updated.
19
+
20
+ This can be used to push latest chat logs in a room to a client
21
+ in realtime.
22
+ """
8
23
name = 'latest_message_logs'
9
24
10
- def __init__ (self , roomname , user_id , limit = 30 ):
25
+ def __init__ (self , hostname , roomname , user_id , limit = 30 ):
26
+ self .hostname = hostname
11
27
self .roomname = roomname
12
28
self .user_id = user_id
13
29
self .limit = limit
14
- self .logs = deque (maxlen = self .limit )
30
+ self .results = deque (maxlen = self .limit )
15
31
self .index = {}
16
- MessageLogStore .on_update (self .handle_update )
17
- MessageLogStore .on_delete (self .handle_delete )
18
- MessageLogStore .on_create (self .handle_create )
32
+ self .fields = []
33
+ self .fetched = False
34
+ MessageLogStore .on ('create' , self .handle_create ,
35
+ raw = True )
36
+ asyncio .Task (self .fetch ())
19
37
20
38
@property
21
39
def signature (self ):
@@ -24,23 +42,83 @@ def signature(self):
24
42
user_id = self .user_id , limit = self .limit )
25
43
26
44
def fetch (self ):
27
- results = MessageLogStore .get ({
28
- 'filter' : {},
29
- 'order_by' : '' ,
30
- 'limit' : self .n
31
- })
45
+ """
46
+ Fetch initial latest chat logs for a room.
47
+ """
48
+ results = yield from MessageLogStore .get ({
49
+ 'filter' : {
50
+ 'hostname' : self .hostname ,
51
+ 'roomname' : self .roomname ,
52
+ 'user_id' : self .user_id
53
+ },
54
+ 'order_by' : ('-timestamp' ,),
55
+ 'limit' : self .limit ,
56
+ # 'fields': self.fields,
57
+ 'sort' : 'timestamp'
58
+ }, raw = True )
59
+ logger .debug ('fetched' , results )
32
60
self .normalize (results )
61
+ self .fetched = True
33
62
34
63
def normalize (self , results ):
35
64
for result in results :
36
- self .index [result .id ] = result
37
- self .logs .append (result .id )
65
+ self .index [result ['id' ]] = result
66
+ self .results .append (result ['id' ])
67
+ logger .debug ('normalized index: %s' , self .index )
68
+ logger .debug ('normalized results: %s' , self .results )
69
+
70
+ def handle_update (self , data ):
71
+ """
72
+ Check if an update operation on a row of message_logs table
73
+ affects our data, and update it if needed.
74
+ """
75
+ if self .skip (data ):
76
+ logger .debug ('skipping update data' , data )
77
+ return
78
+ if data ['id' ] in self .index :
79
+ self .index [data ['id' ]] = data
80
+
81
+ def handle_create (self , data ):
82
+ """
83
+ Check if an insert operation in message_logs table affects our
84
+ results. If yes, append it to results.
85
+ """
86
+ if self .skip (data ):
87
+ return
88
+ if self .results and data ['timestamp' ] < self .index [
89
+ self .results [- 1 ]]['timestamp' ]:
90
+ return
91
+ if not self .fetched :
92
+ return
93
+
94
+ logger .debug ('skip created data' , data )
95
+ self .index [data ['id' ]] = data
96
+ self .results .append (data ['id' ])
97
+ logger .debug ('updated results: %s, %s' , self .results , self .index )
38
98
39
- def handle_update (self ):
40
- pass
99
+ def skip (self , data ):
100
+ """
101
+ We'll skip updating our results if the insert/update event
102
+ is not relevant to us.
103
+ """
104
+ return data ['user_id' ] != self .user_id or \
105
+ data ['hostname' ] != self .hostname or \
106
+ data ['roomname' ] != self .roomname
41
107
42
- def handle_create (self ):
43
- pass
44
108
45
- def handle_delete (self ):
46
- pass
109
+ if __name__ == '__main__' :
110
+ import sys
111
+ from ircb .storeclient import initialize
112
+ from ircb .utils .config import load_config
113
+ load_config ()
114
+ initialize ()
115
+ try :
116
+ hostname = sys .argv [1 ]
117
+ roomname = sys .argv [2 ]
118
+ user_id = sys .argv [3 ]
119
+ except :
120
+ print ("Usage: __init__.py '<hostname>' '<roomname>' '<user_id>'" )
121
+ sys .exit (1 )
122
+ MessageLogPublisher (hostname , roomname , int (user_id ))
123
+ loop = asyncio .get_event_loop ()
124
+ loop .run_forever ()
0 commit comments