Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/cloud_object_storage/cos_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
sink_config['region'] = region

# Run the sink
source = stream.add_sink(sink_config)
sink = stream.add_sink(sink_config)

# Send file contents to Cloud Object Storage:
stream << "File contents sample!"
Expand Down
8 changes: 5 additions & 3 deletions examples/cloud_object_storage/cos_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import ray
import rayvens
import sys
import time
import json

# This example demonstrates how to receive objects from AWS S3 or
# IBM Cloud Object Storage. It requires a bucket name, HMAC credentials,
Expand Down Expand Up @@ -62,12 +62,14 @@

def process_file(event):
print(f'received {len(event)} bytes')
json_event = json.loads(event)
print("Contents:")
print(event)
print(json_event['filename'])
print(json_event['body'])


# Log object sizes to the console
stream >> process_file

# Run for a while
time.sleep(10)
stream.disconnect_all(after_idle_for=2)
8 changes: 4 additions & 4 deletions examples/cloud_object_storage/cos_source_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import ray
import rayvens
import sys
import time
import json

# This example demonstrates how to receive events about data becoming available
# in Cloud Object Storage.
Expand Down Expand Up @@ -61,13 +61,13 @@

def process_file(event):
print(f'received {len(event)} bytes')
json_event = json.loads(event)
print("Contents:")
print(event)
print(json_event['filename'])


# Log object sizes to the console
stream >> process_file
print("Waiting for event...")

# Run for a while
time.sleep(10)
stream.disconnect_all(after_idle_for=2)
8 changes: 5 additions & 3 deletions examples/cloud_object_storage/cos_source_move_after_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import ray
import rayvens
import sys
import time
import json

# This example demonstrates how to receive objects from AWS S3 or
# IBM Cloud Object Storage. It requires a bucket name, HMAC credentials,
Expand Down Expand Up @@ -63,12 +63,14 @@

def process_file(event):
print(f'received {len(event)} bytes')
json_event = json.loads(event)
print("Contents:")
print(event)
print(json_event['filename'])
print(json_event['body'])


# Log object sizes to the console
stream >> process_file

# Run for a while
time.sleep(10)
stream.disconnect_all(after_idle_for=2)
22 changes: 15 additions & 7 deletions rayvens/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,25 @@ def _wait_for_timeout(self, after_idle_for, after):
while True:
time_elapsed_since_last_event = self._idle_time()

# Idle timeout exceeds the user-specified time limit:
if time_elapsed_since_last_event > after_idle_for:
break

# Check again after waiting for the rest of the timeout time:
time.sleep(after_idle_for - time_elapsed_since_last_event + 1)
if time_elapsed_since_last_event is not None:
# Idle timeout exceeds the user-specified time limit:
if time_elapsed_since_last_event > after_idle_for:
break

# Check again after waiting for the rest of the timeout
# time:
time.sleep(after_idle_for - time_elapsed_since_last_event +
1)
else:
time.sleep(after_idle_for)
if after is not None and after > 0:
time.sleep(after)

def _idle_time(self):
return time.time() - ray.get(self.actor._get_latest_timestamp.remote())
latest_timestamp = ray.get(self.actor._get_latest_timestamp.remote())
if latest_timestamp is None:
return None
return time.time() - latest_timestamp


@ray.remote(num_cpus=0)
Expand Down
74 changes: 74 additions & 0 deletions rayvens/core/FileQueueJson.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright IBM Corporation 2021
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.io.File;

import org.apache.camel.BindToRegistry;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.json.simple.JSONObject;

/**
 * Camel processor that wraps an incoming message in a JSON document and
 * places the serialized document on the shared queue.
 *
 * The JSON document has two entries: "body" (the message body read as a
 * String) and "filename" (the value of the "CamelAwsS3Key" header).
 */
class Recv implements Processor {
    // Queue that receives one serialized JSON string per processed exchange.
    BlockingQueue<Object> queue;

    public Recv(BlockingQueue<Object> queue) {
        this.queue = queue;
    }

    /**
     * Builds the JSON document for the exchange and enqueues it as a String.
     *
     * @param exchange the exchange whose inbound message is packaged
     * @throws Exception declared by the Processor interface
     */
    public void process(Exchange exchange) throws Exception {
        // Read both pieces of the inbound message before assembling the JSON.
        String contents = exchange.getIn().getBody(String.class);
        Object objectKey = exchange.getIn().getHeader("CamelAwsS3Key");

        JSONObject payload = new JSONObject();
        payload.put("body", contents);
        // NOTE(review): if the header is absent, toString() on the null
        // value throws NullPointerException and nothing is enqueued.
        payload.put("filename", objectKey.toString());

        queue.add(payload.toString());
    }
}

/**
 * Camel processor that takes the next item from the shared queue and uses
 * it as the body of the exchange's inbound message.
 */
class Send implements Processor {
    // Queue from which items are taken, one per processed exchange.
    BlockingQueue<Object> queue;

    public Send(BlockingQueue<Object> queue) {
        this.queue = queue;
    }

    /**
     * Replaces the inbound message body with the next queued item.
     *
     * @param exchange the exchange whose inbound message body is set
     * @throws Exception declared by the Processor interface; take() may
     *                   throw InterruptedException while waiting
     */
    public void process(Exchange exchange) throws Exception {
        // take() waits until an element becomes available.
        final Object nextItem = queue.take();
        exchange.getIn().setBody(nextItem);
    }
}

/**
 * Route builder that owns a single in-memory queue and exposes a Recv and a
 * Send processor, both operating on that same queue, as registry beans.
 */
public class FileQueueJson extends RouteBuilder {
    // Shared by the Recv and Send processors created below.
    BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    // NOTE(review): no explicit bean name is given, so the beans are
    // presumably registered under the method names — confirm against the
    // route definitions that reference them.
    @BindToRegistry
    public Recv addToFileJsonQueue() {
        return new Recv(queue);
    }

    @BindToRegistry
    public Send takeFromFileJsonQueue() {
        return new Send(queue);
    }

    @Override
    public void configure() throws Exception {
        // Intentionally empty: this class defines no routes of its own.
    }
}
6 changes: 4 additions & 2 deletions rayvens/core/MetaEventQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.json.simple.JSONObject;

class Recv implements Processor {
BlockingQueue<Object> queue;
Expand All @@ -33,9 +34,10 @@ public Recv(BlockingQueue<Object> queue) {
}

public void process(Exchange exchange) throws Exception {
JSONObject returnJsonObject = new JSONObject();
Object key = exchange.getIn().getHeader("CamelAwsS3Key");
exchange.getIn().setBody("None");
queue.add(key.toString());
returnJsonObject.put("filename", key.toString());
queue.add(returnJsonObject.toString());
}
}

Expand Down
9 changes: 3 additions & 6 deletions rayvens/core/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ def construct_source(config, endpoint, inverted=False):
'meta_event_only' in config and config['meta_event_only']:
take_from_queue = 'takeFromMetaEventQueue'
add_to_queue = 'addToMetaEventQueue'
elif config['kind'] == 'cloud-object-storage-source':
take_from_queue = 'takeFromFileJsonQueue'
add_to_queue = 'addToFileJsonQueue'

# Multi-source integration with several routes:
if isinstance(spec, list):
Expand Down Expand Up @@ -538,12 +541,6 @@ def cos_sink(config):
# Create new file route for final spec:
file_spec = {'steps': []}
file_spec['steps'].append({'bean': 'processFile'})
# file_spec['steps'].append({
# 'set-header': {
# 'name': 'CamelAwsS3Key',
# 'simple': uploaded_file_name
# }
# })
file_spec['steps'].append({'to': uri})
spec_list.append((file_spec, from_uri))
elif 'from_directory' in config:
Expand Down
30 changes: 29 additions & 1 deletion rayvens/core/kamel.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ def run(integration_content,
queue = os.path.join(os.path.dirname(__file__), 'FileQueue.java')
command.append(queue)

if _integration_requires_file_queue_json(integration_content):
queue = os.path.join(os.path.dirname(__file__),
'FileQueueJson.java')
command.append(queue)

command.append("-d")
command.append("mvn:com.googlecode.json-simple:json-simple:1.1.1")

if _integration_requires_file_watch_queue(integration_content):
queue = os.path.join(os.path.dirname(__file__),
'FileWatchQueue.java')
Expand Down Expand Up @@ -167,14 +175,23 @@ def local_run(integration_content,
'ProcessFile.java')
command.append(process_file)

# Use appropriate Queue type(s).
if mode.transport == 'http':
# Use appropriate Queue type(s).
if integration_type == 'source':
if _integration_requires_file_queue(integration_content):
queue = os.path.join(os.path.dirname(__file__),
'FileQueue.java')
command.append(queue)

if _integration_requires_file_queue_json(integration_content):
queue = os.path.join(os.path.dirname(__file__),
'FileQueueJson.java')
command.append(queue)

command.append("-d")
command.append(
"mvn:com.googlecode.json-simple:json-simple:1.1.1")

if _integration_requires_file_watch_queue(integration_content):
queue = os.path.join(os.path.dirname(__file__),
'FileWatchQueue.java')
Expand All @@ -185,6 +202,10 @@ def local_run(integration_content,
'MetaEventQueue.java')
command.append(queue)

command.append("-d")
command.append(
"mvn:com.googlecode.json-simple:json-simple:1.1.1")

if _integration_requires_queue(integration_content):
queue = os.path.join(os.path.dirname(__file__), 'Queue.java')
command.append(queue)
Expand Down Expand Up @@ -262,6 +283,13 @@ def _integration_requires_file_queue(integration_content):
return False


def _integration_requires_file_queue_json(integration_content):
    """Return True if the integration uses the ``addToFileJsonQueue`` bean.

    The integration content is dumped to YAML text and searched for the
    literal ``bean: addToFileJsonQueue`` step.
    """
    return 'bean: addToFileJsonQueue' in yaml.dump(integration_content)


def _integration_requires_file_watch_queue(integration_content):
configuration = yaml.dump(integration_content)
if 'bean: addToFileWatchQueue' in configuration:
Expand Down