This repository has been archived by the owner on Feb 12, 2022. It is now read-only.
/
MessageBuffer.java
79 lines (69 loc) · 3.46 KB
/
MessageBuffer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/*
* Copyright (c) 2017, 2018, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
* following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this list of conditions and the following
* disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
*
* * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.salesforce.storm.spout.dynamic.buffer;
import com.salesforce.storm.spout.dynamic.Message;
import com.salesforce.storm.spout.dynamic.VirtualSpoutIdentifier;
import java.util.Map;
/**
* This interface defines an abstraction around essentially a concurrent queue.
* Abstracting this instead of using directly a queue object allows us to do things like
* implement a "fairness" algorithm on the poll() method for pulling off of the queue.
* Using a straight ConcurrentQueue would give us FIFO semantics (see {@link FifoBuffer})
* but with an abstraction we could implement round robin (see {@link RoundRobinBuffer}) across
* VirtualSpouts or any scheduling algorithm that we'd like.
*/
public interface MessageBuffer {
/**
* Called prior to utilizing the instance.
* @param spoutConfig copy of the storm topology config.
*/
void open(Map spoutConfig);
/**
* Let the Implementation know that we're adding a new VirtualSpoutId.
* @param virtualSpoutId identifier of new Virtual Spout
*/
void addVirtualSpoutId(final VirtualSpoutIdentifier virtualSpoutId);
/**
* Let the Implementation know that we're removing/cleaning up from closing a VirtualSpout.
* @param virtualSpoutId identifier of Virtual Spout to be cleaned up
*/
void removeVirtualSpoutId(final VirtualSpoutIdentifier virtualSpoutId);
/**
* Put a new message onto the queue. This method is blocking if the queue buffer is full.
* @param message message to be added to the queue
* @throws InterruptedException thrown if a thread is interrupted while blocked adding to the queue
*/
void put(final Message message) throws InterruptedException;
/**
* Get the size of the buffer.
* @return size of the buffer
*/
int size();
/**
* Get the next message to be processed out of the queue.
* @return next message to be processed out of the queue
*/
Message poll();
}