forked from jan-hoelscher-mss/safmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ServiceThread.h
executable file
·110 lines (77 loc) · 3.12 KB
/
ServiceThread.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
Copyright 2004-2008 Matthew J. Battey
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
This software implements a platform independent Store and Forward Message Queue.
*/
#if !defined(_SERVICETHREAD_H_)
#define _SERVICETHREAD_H_
#pragma warning(disable: 4786)
#include "tcpsocket/socstream.h"
#include "safmq.h"
#include "QStorage.h"
#include "thdlib.h"
namespace safmq {
class TransactionManager;
class CommandFactory;
class ServiceThread : public Thread
{
public: // typedefs
typedef SAFMQ_INT32 QueueHandle;
public: // implementation
explicit ServiceThread(tcpsocket::socstream* client, bool trusted=false, const std::string& trustedUser="");
virtual ~ServiceThread();
bool isTrusted() const;
const std::string& getUsername() const;
void setUsername(const std::string& username);
const sockaddr_in* getPeername() const;
void getClientInfo(ClientInfo& ci);
QStorage* getOpenQueue(QueueHandle qh);
TransactionManager* getTransactionManager();
bool loggedIn();
void addCursor(QueueHandle queueH, QStorage::CursorHandle cursorH);
void removeCursor(QueueHandle queueH, QStorage::CursorHandle cursorH);
ServiceThread::QueueHandle registerQueue(QStorage* pqueue);
ErrorCode shutdownQueue(ServiceThread::QueueHandle queueID);
protected: // implementation
void* run();
void UnknownMessage(char command);
void Login();
protected: //implementation
ErrorCode commitRetrieve(ServiceThread::QueueHandle qid, const QStorage::CursorHandle& cursor);
ErrorCode rollbackRetrieve(ServiceThread::QueueHandle qid, const QStorage::CursorHandle& cursor);
ErrorCode commitEnqueue(ServiceThread::QueueHandle qid, const QStorage::CursorHandle& cursor);
ErrorCode rollbackEnqueue(ServiceThread::QueueHandle qid, const QStorage::CursorHandle& cursor);
protected: // data
typedef std::map<QueueHandle, QStorage*> QueueMap;
typedef std::set<std::pair<QueueHandle, QStorage::CursorHandle> > CursorSet;
tcpsocket::socstream* client;
std::string username;
struct ::sockaddr_in peername;
bool logged_in;
bool trusted;
QueueMap queues;
CursorSet cursors;
TransactionManager* transactionManager;
CommandFactory* commandFactory;
Mutex loginMtx;
friend class TransactionManager;
};
class ClientRegistry
{
public:
static void RegisterClient(ServiceThread* pService);
static void UnregisterClient(ServiceThread* pService);
static void GetClientInfo(std::list<ClientInfo>& info);
private:
static std::list<ServiceThread*> clients;
static Mutex mtx;
};
} // end of samfq namespace
#endif // !defined(_SERVICETHREAD_H_)