Skip to content

Commit

Permalink
Merge pull request #756 from hbs/udptcp-clients
Browse files Browse the repository at this point in the history
Added retrieval of store/directory clients
  • Loading branch information
hbs authored May 10, 2020
2 parents 9df6239 + c5088b7 commit ff5bc16
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
5 changes: 3 additions & 2 deletions warp10/src/main/java/io/warp10/plugins/tcp/TCPClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2018 SenX S.A.S.
// Copyright 2018-2020 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import io.warp10.script.WarpScriptStack;
import io.warp10.script.WarpScriptStack.Macro;
import io.warp10.script.WarpScriptStackRegistry;
import io.warp10.warp.sdk.AbstractWarp10Plugin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,7 +59,7 @@ public class TCPClient implements Runnable {
remotePort = this.socket.getPort();

this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), charset));
this.stack = new MemoryWarpScriptStack(null, null, new Properties());
this.stack = new MemoryWarpScriptStack(AbstractWarp10Plugin.getExposedStoreClient(), AbstractWarp10Plugin.getExposedDirectoryClient(), new Properties());
this.stack.setAttribute(WarpScriptStack.ATTRIBUTE_NAME, "[Warp10TCPPlugin " + socket.getLocalPort() + "]");
stack.maxLimits();
}
Expand Down
8 changes: 5 additions & 3 deletions warp10/src/main/java/io/warp10/plugins/tcp/TCPManager.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2018 SenX S.A.S.
// Copyright 2018-2020 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@
import io.warp10.script.MemoryWarpScriptStack;
import io.warp10.script.WarpScriptStack.Macro;
import io.warp10.script.WarpScriptStopException;
import io.warp10.warp.sdk.AbstractWarp10Plugin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -117,7 +119,7 @@ public TCPManager(Path p) throws Exception {
in.close();

warpscript = new String(baos.toByteArray(), StandardCharsets.UTF_8);
stack = new MemoryWarpScriptStack(null, null, new Properties());
stack = new MemoryWarpScriptStack(AbstractWarp10Plugin.getExposedStoreClient(), AbstractWarp10Plugin.getExposedDirectoryClient(), new Properties());
stack.maxLimits();

try {
Expand Down Expand Up @@ -189,7 +191,7 @@ private void initExecutors() {

for (int i = 0; i < parallelism; i++) {

final MemoryWarpScriptStack stack = new MemoryWarpScriptStack(null, null, new Properties());
final MemoryWarpScriptStack stack = new MemoryWarpScriptStack(AbstractWarp10Plugin.getExposedStoreClient(), AbstractWarp10Plugin.getExposedDirectoryClient(), new Properties());
stack.maxLimits();

final LinkedBlockingQueue<List<Object>> queue = queues[Math.min(i, queues.length - 1)];
Expand Down
8 changes: 5 additions & 3 deletions warp10/src/main/java/io/warp10/plugins/udp/UDPConsumer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2018 SenX S.A.S.
// Copyright 2018-2020 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,8 @@
import io.warp10.script.WarpScriptStack.Macro;
import io.warp10.script.WarpScriptStackRegistry;
import io.warp10.script.WarpScriptStopException;
import io.warp10.warp.sdk.AbstractWarp10Plugin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -97,7 +99,7 @@ public UDPConsumer(Path p) throws Exception {
in.close();

this.warpscript = new String(baos.toByteArray(), StandardCharsets.UTF_8);
this.stack = new MemoryWarpScriptStack(null, null, new Properties());
this.stack = new MemoryWarpScriptStack(AbstractWarp10Plugin.getExposedStoreClient(), AbstractWarp10Plugin.getExposedDirectoryClient(), new Properties());
stack.maxLimits();

try {
Expand Down Expand Up @@ -157,7 +159,7 @@ public void run() {

for (int i = 0; i < this.parallelism; i++) {

final MemoryWarpScriptStack stack = new MemoryWarpScriptStack(null, null, new Properties());
final MemoryWarpScriptStack stack = new MemoryWarpScriptStack(AbstractWarp10Plugin.getExposedStoreClient(), AbstractWarp10Plugin.getExposedDirectoryClient(), new Properties());
stack.setAttribute(WarpScriptStack.ATTRIBUTE_NAME, "[Warp10UDPPlugin " + socket.getLocalPort() + " #" + i + "]");
stack.maxLimits();

Expand Down

0 comments on commit ff5bc16

Please sign in to comment.