Skip to content

Commit

Permalink
PublishersCompiler: namespace publishers to avoid conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre-Alexandre Meyer <pierre@ning.com>
  • Loading branch information
Pierre-Alexandre Meyer committed Feb 17, 2012
1 parent 3f0f57a commit 7a6b305
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
Expand Up @@ -116,9 +116,10 @@ public void addStream(final StreamConfig streamConfig) throws ClassNotFoundExcep
// Instantiate its publisher(s)
final LinkedHashMap<String, UpdateListener> publishers = new LinkedHashMap<String, UpdateListener>();
for (final PublisherConfig route : streamConfig.getPublishers()) {
publishers.put(route.getType(), instantiateUpdateListener(route));
final UpdateListener updateListener = instantiateUpdateListener(route);
publishers.put(route.getType(), updateListener);
publisherInstances.put(streamConfig.getName(), updateListener);
}
publisherInstances.putAll(publishers);

// Add the stream in the Esper engine
for (final String sqlStatement : streamConfig.getSql()) {
Expand Down
Expand Up @@ -55,36 +55,38 @@ public class StreamResource
private static final Logger log = LoggerFactory.getLogger(StreamResource.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private final ResourceListener resourceListener;
private final PublishersCompiler compiler;

@Inject
public StreamResource(final PublishersCompiler compiler)
{
this.compiler = compiler;
this.resourceListener = (ResourceListener) this.compiler.getPublisherInstances().get(ResourceListener.class.getName());
}

/**
* Get the data points associated with a field in an Esper query.
* For this to work, you need to add ResourceListener as a publisher:
* For this to work, you need to add ResourceListener as a publisher and configure it in a stream:
* {
* "name": "Jansky",
* "type": "com.ning.metrics.meteo.publishers.ResourceListener",
* "@class": "com.ning.metrics.meteo.publishers.ResourcePublisherConfig"
* }
*
* @param callback Javascript callback
* @param stream Stream name
* @param attribute the SQL alias of an Esper query
* @return jsonp representation of the data points in memory
*/
@GET
@Path("/{attribute}")
@Path("/{stream}/{attribute}")
@Produces(MediaType.APPLICATION_JSON)
public Response getSamplesByRoute(@QueryParam("callback") @DefaultValue("callback") final String callback,
@PathParam("stream") final String stream,
@PathParam("attribute") final String attribute)
{
Cache<Object, Object> samplesCache = null;

final ResourceListener resourceListener = (ResourceListener) compiler.getPublisherInstances().get(stream);
if (resourceListener != null) {
final Map<String, Cache<Object, Object>> samples = resourceListener.getSamplesCache();
samplesCache = samples.get(attribute);
Expand All @@ -103,6 +105,10 @@ public Response getSamplesByRoute(@QueryParam("callback") @DefaultValue("callbac
@Consumes(MediaType.APPLICATION_JSON)
public Response addStream(final StreamConfig streamConfig)
{
if (compiler.getPublisherInstances().get(streamConfig.getName()) != null) {
return Response.status(Response.Status.CONFLICT).header("Warning", "199 Stream already exists").build();
}

try {
compiler.addStream(streamConfig);
}
Expand Down

0 comments on commit 7a6b305

Please sign in to comment.