-
Notifications
You must be signed in to change notification settings - Fork 149
/
XrdClPostMaster.hh
207 lines (183 loc) · 9.13 KB
/
XrdClPostMaster.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
//------------------------------------------------------------------------------
// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
// Author: Lukasz Janyst <ljanyst@cern.ch>
//------------------------------------------------------------------------------
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//------------------------------------------------------------------------------
#ifndef __XRD_CL_POST_MASTER_HH__
#define __XRD_CL_POST_MASTER_HH__
#include <stdint.h>
#include <map>
#include <vector>
#include "XrdCl/XrdClStatus.hh"
#include "XrdCl/XrdClURL.hh"
#include "XrdCl/XrdClPostMasterInterfaces.hh"
#include "XrdSys/XrdSysPthread.hh"
namespace XrdCl
{
class Poller;
class TaskManager;
class Channel;
class JobManager;
//----------------------------------------------------------------------------
//! A hub for dispaching and receiving messages
//----------------------------------------------------------------------------
class PostMaster
{
public:
//------------------------------------------------------------------------
//! Constructor
//------------------------------------------------------------------------
PostMaster();
//------------------------------------------------------------------------
//! Destructor
//------------------------------------------------------------------------
virtual ~PostMaster();
//------------------------------------------------------------------------
//! Initializer
//------------------------------------------------------------------------
bool Initialize();
//------------------------------------------------------------------------
//! Finalizer
//------------------------------------------------------------------------
bool Finalize();
//------------------------------------------------------------------------
//! Start the post master
//------------------------------------------------------------------------
bool Start();
//------------------------------------------------------------------------
//! Stop the postmaster
//------------------------------------------------------------------------
bool Stop();
//------------------------------------------------------------------------
//! Reinitialize after fork
//------------------------------------------------------------------------
bool Reinitialize();
//------------------------------------------------------------------------
//! Send a message synchronously - synchronously means that
//! it will block until the message is written to a socket
//!
//! DEADLOCK WARNING: no lock should be taken while calling this method
//! that are used in the callback as well.
//!
//! @param url recipient of the message
//! @param msg message to be sent
//! @param statful physical stream disconnection causes an error
//! @param expires unix timestamp after which a failure should be
//! reported if sending was unsuccessful
//! @return success if the message has been pushed through the wire,
//! failure otherwise
//------------------------------------------------------------------------
Status Send( const URL &url,
Message *msg,
bool stateful,
time_t expires );
//------------------------------------------------------------------------
//! Send the message asynchronously - the message is inserted into the
//! send queue and a listener is called when the message is successuly
//! pushed through the wire or when the timeout elapses
//!
//! DEADLOCK WARNING: no lock should be taken while calling this method
//! that are used in the callback as well.
//!
//! @param url recipient of the message
//! @param msg message to be sent
//! @param expires unix timestamp after which a failure is reported
//! to the handler
//! @param handler handler will be notified about the status
//! @param stateful physical stream disconnection causes an error
//! @return success if the message was successfuly inserted
//! into the send quees, failure otherwise
//------------------------------------------------------------------------
Status Send( const URL &url,
Message *msg,
OutgoingMsgHandler *handler,
bool stateful,
time_t expires );
//------------------------------------------------------------------------
//! Synchronously receive a message - blocks until a message maching
//! a filter is found in the incomming queue or the timout passes
//!
//! @param url sender of the message
//! @param msg reference to a message pointer, the pointer will
//! point to the received message
//! @param filter filter object defining what to look for
//! @param expires expiration timestamp
//! @return success when the message has been received
//! successfuly, failure otherwise
//------------------------------------------------------------------------
Status Receive( const URL &url,
Message *&msg,
MessageFilter *filter,
time_t expires );
//------------------------------------------------------------------------
//! Listen to incomming messages, the listener is notified when a new
//! message arrives and when the timeout passes
//!
//! @param url sender of the message
//! @param handler handler to be notified about new messages
//! @param expires expiration timestamp
//! @return success when the listener has been inserted correctly
//------------------------------------------------------------------------
Status Receive( const URL &url,
IncomingMsgHandler *handler,
time_t expires );
//------------------------------------------------------------------------
//! Query the transport handler for a given URL
//!
//! @param url the channel to be queried
//! @param query the query as defined in the TransportQuery struct or
//! others that may be recognized by the protocol transport
//! @param result the result of the query
//! @return status of the query
//------------------------------------------------------------------------
Status QueryTransport( const URL &url,
uint16_t query,
AnyObject &result );
//------------------------------------------------------------------------
//! Register channel event handler
//------------------------------------------------------------------------
Status RegisterEventHandler( const URL &url,
ChannelEventHandler *handler );
//------------------------------------------------------------------------
//! Remove a channel event handler
//------------------------------------------------------------------------
Status RemoveEventHandler( const URL &url,
ChannelEventHandler *handler );
//------------------------------------------------------------------------
//! Get the task manager object user by the post master
//------------------------------------------------------------------------
TaskManager *GetTaskManager()
{
return pTaskManager;
}
//------------------------------------------------------------------------
//! Get the job manager object user by the post master
//------------------------------------------------------------------------
JobManager *GetJobManager()
{
return pJobManager;
}
private:
Channel *GetChannel( const URL &url );
typedef std::map<std::string, Channel*> ChannelMap;
Poller *pPoller;
TaskManager *pTaskManager;
ChannelMap pChannelMap;
XrdSysMutex pChannelMapMutex;
bool pInitialized;
JobManager *pJobManager;
};
}
#endif // __XRD_CL_POST_MASTER_HH__