Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions examples/kafka/kafka_event_streaming_sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# Copyright IBM Corporation 2021
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import ray
import rayvens
import sys

# Send message to Slack sink using the kafka transport.

# Command line arguments and validation:
# Two invocation forms are supported:
#   <slack_channel> <slack_webhook> <run_mode>
#   <brokers> <password> <slack_channel> <slack_webhook> <run_mode>
# Any other argument count is rejected so that positional values are never
# silently assigned to the wrong variable.
if len(sys.argv) not in (4, 6):
    print(f'usage: {sys.argv[0]} <brokers> <password> <slack_channel> '
          '<slack_webhook> <run_mode> OR'
          f' {sys.argv[0]} <slack_channel> <slack_webhook> <run_mode>')
    sys.exit(1)

# Brokers and run mode:
# NOTE(review): brokers and password are parsed for the long invocation form
# but are not referenced again in this example.
brokers = None
password = None
if len(sys.argv) == 6:
    brokers, password, slack_channel, slack_webhook, run_mode = sys.argv[1:]
else:
    slack_channel, slack_webhook, run_mode = sys.argv[1:]

if run_mode not in ['local', 'mixed', 'operator']:
    raise RuntimeError(f'Invalid run mode provided: {run_mode}')

# The Kafka topic used for communication.
topic = "externalTopicSink"

# Initialize ray either on the cluster or locally otherwise.
if run_mode == 'operator':
    ray.init(address='auto')
else:
    ray.init()

# Start rayvens in the requested run mode, using Kafka as the transport.
rayvens.init(mode=run_mode, transport="kafka")

# Create stream.
stream = rayvens.Stream('slack')

# Event sink config. The kafka_transport_* entries name the Kafka transport
# topic and request that it be created with 3 partitions.
sink_config = dict(kind='slack-sink',
                   channel=slack_channel,
                   webhookUrl=slack_webhook,
                   kafka_transport_topic=topic,
                   kafka_transport_partitions=3)

# Add sink to stream.
sink = stream.add_sink(sink_config)

# Sends message to all sinks attached to this stream.
stream << f'Message to Slack sink in run mode {run_mode} and Kafka transport.'

# Disconnect any sources or sinks attached to the stream 2 seconds after
# the stream is idle (i.e. no events were propagated by the stream).
stream.disconnect_all(after_idle_for=2)
72 changes: 72 additions & 0 deletions examples/kafka/kafka_event_streaming_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#
# Copyright IBM Corporation 2021
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import ray
import rayvens
import sys

# Event streaming from a third-party external source using Kafka.

# Command line arguments and validation:
# Two invocation forms are supported:
#   <run_mode>
#   <brokers> <password> <run_mode>
# Any other argument count is rejected; otherwise a partial long-form call
# would treat <brokers> as the run mode and fail with a misleading error.
if len(sys.argv) not in (2, 4):
    print(f'usage: {sys.argv[0]} <brokers> <password> <run_mode> OR'
          f' {sys.argv[0]} <run_mode>')
    sys.exit(1)

# Brokers and run mode:
# NOTE(review): brokers and password are parsed for the long invocation form
# but are not referenced again in this example.
brokers = None
password = None
if len(sys.argv) == 4:
    brokers, password, run_mode = sys.argv[1:]
else:
    run_mode = sys.argv[1]

if run_mode not in ['local', 'mixed', 'operator']:
    raise RuntimeError(f'Invalid run mode provided: {run_mode}')

# The Kafka topic used for communication.
topic = "externalTopicSource"

# Initialize ray either on the cluster or locally otherwise.
if run_mode == 'operator':
    ray.init(address='auto')
else:
    ray.init()

# Start rayvens in the requested run mode, using Kafka as the transport.
rayvens.init(mode=run_mode, transport="kafka")

# Create stream.
stream = rayvens.Stream('http')

# Event source config. The kafka_transport_* entries name the Kafka transport
# topic and request that it be created with 3 partitions.
source_config = dict(
    kind='http-source',
    url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL',
    route='/from-http',
    period=3000,
    kafka_transport_topic=topic,
    kafka_transport_partitions=3)

# Attach source to stream.
source = stream.add_source(source_config)

# Log all events from stream-attached sources.
stream >> (lambda event: print('LOG:', event))

# Disconnect source after 10 seconds.
stream.disconnect_all(after=10)
113 changes: 113 additions & 0 deletions examples/kafka/kafka_partitioned_source_sink.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#
# Copyright IBM Corporation 2021
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import ray
import rayvens
import sys
import time

# An artificial example of using Kafka sources and sinks.
# Typically the user application will interact with an external Kafka
# service to either subscribe or publish data to other services:
#
# EXT. SERVICE => KAFKA => RAYVENS KAFKA SOURCE
# or
# RAYVENS KAFKA SINK => KAFKA => EXT. SERVICE
#
# In this artificial example, to demonstrate Kafka sources and sinks at
# the same time, we set up a Kafka sink that publishes to a test topic
# and a Kafka source that reads from that same topic:
#
# RAYVENS KAFKA SINK => KAFKA => RAYVENS KAFKA SOURCE
#

# Command line arguments and validation:
# Two invocation forms are supported:
#   <run_mode>
#   <brokers> <password> <run_mode>
# Any other argument count is rejected; otherwise a partial long-form call
# would treat <brokers> as the run mode and fail with a misleading error.
if len(sys.argv) not in (2, 4):
    print(f'usage: {sys.argv[0]} <brokers> <password> <run_mode> OR'
          f' {sys.argv[0]} <run_mode>')
    sys.exit(1)

# Brokers and run mode:
brokers = None
password = None
if len(sys.argv) == 4:
    brokers, password, run_mode = sys.argv[1:]
else:
    run_mode = sys.argv[1]

if run_mode not in ['local', 'mixed', 'operator']:
    raise RuntimeError(f'Invalid run mode provided: {run_mode}')

# The Kafka topic used for communication.
topic = "externalTopic"

# If using the Kafka broker started by Rayvens the following brokers
# are possible:
# - from inside the cluster: kafka:9092
# - from outside the cluster: localhost:31093
# If using a different Kafka service please provide the brokers in the
# form of host:port,host1:port1, ... .
# Only fall back to a default when no brokers were given on the command line,
# so that user-supplied brokers are never overwritten.
if brokers is None:
    brokers = "kafka:9092" if run_mode == 'operator' else 'localhost:31093'

# Initialize ray either on the cluster or locally otherwise.
if run_mode == 'operator':
    ray.init(address='auto')
else:
    ray.init()

# Start rayvens in the requested run mode.
rayvens.init(mode=run_mode)

# Create source stream and configuration.
source_stream = rayvens.Stream('kafka-source-stream')

source_config = dict(kind='kafka-source',
                     route='/fromkafka',
                     topic=topic,
                     brokers=brokers,
                     partitions=3)
# The SASL password is only added when one was supplied.
if password is not None:
    source_config['SASL_password'] = password
source = source_stream.add_source(source_config)
# Log all events from stream-attached sources.
source_stream >> (lambda event: print('KAFKA SOURCE:', event))

# Create sink stream and configuration.
sink_stream = rayvens.Stream('kafka-sink-stream')
sink_config = dict(kind='kafka-sink',
                   route='/tokafka',
                   topic=topic,
                   brokers=brokers,
                   partitions=3)
if password is not None:
    sink_config['SASL_password'] = password
sink = sink_stream.add_sink(sink_config)

# Wait before publishing.
# NOTE(review): presumably this gives the source and sink time to start -
# confirm.
time.sleep(10)

# Sends message to all sinks attached to this stream.
sink_stream << f'Sending message to Kafka sink in run mode {run_mode}.'

# Give a grace period to the message to propagate then disconnect source
# and sink.
time.sleep(30)
source_stream.disconnect_all()
sink_stream.disconnect_all()
8 changes: 4 additions & 4 deletions rayvens/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,15 @@ def produce(self, data):
self.producer.produce(self.name, data.encode('utf-8'))


def kafka_send_to(integration_name, handle):
def kafka_send_to(kafka_transport_topic, handle):
# use kafka consumer thread to push from camel source to rayvens stream
consumer = Consumer({
'bootstrap.servers': brokers(),
'group.id': 'ray',
'auto.offset.reset': 'latest'
})

consumer.subscribe([integration_name])
consumer.subscribe([kafka_transport_topic])

def append():
while True:
Expand All @@ -168,9 +168,9 @@ def append():
threading.Thread(target=append).start()


def kafka_recv_from(integration_name, kafka_transport_topic, handle):
    """Route events from a rayvens stream to a camel sink through Kafka.

    Args:
        integration_name: Name passed to ``handle.send_to`` together with
            the producer actor.
        kafka_transport_topic: Value used to construct the
            ``KafkaProducerActor``; presumably the Kafka topic the actor
            produces to (verify against the actor definition).
        handle: Stream handle whose ``send_to`` method is invoked remotely.
    """
    # use kafka producer actor to push from rayvens stream to camel sink
    helper = KafkaProducerActor.remote(kafka_transport_topic)
    handle.send_to.remote(helper, integration_name)


Expand Down
35 changes: 34 additions & 1 deletion rayvens/core/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
# limitations under the License.
#

from rayvens.core.utils import random_port
from rayvens.core.common import brokers
from rayvens.core.utils import random_port, create_partitioned_topic
from rayvens.core.name import name_integration
from rayvens.core import catalog
from rayvens.core import kamel
Expand All @@ -39,10 +40,17 @@ def __init__(self, stream_name, source_sink_name, config):

self.integration_name = name_integration(self.stream_name,
self.source_sink_name)

# Establish kafka transport topic name:
self.kafka_transport_topic = self.integration_name
if "kafka_transport_topic" in config:
self.kafka_transport_topic = config["kafka_transport_topic"]

self.port = random_port()
self.invocation = None
self.service_name = None
self.server_address = None
self.environment_preparators = []

def invoke_local_run(self, mode, integration_content):
self.invocation = kamel.local_run(
Expand Down Expand Up @@ -107,6 +115,31 @@ def accepts_data_type(self, data):
return True
return False

# Method that checks if, based on the configuration, the integration
# requires something to be run or created before the integration is run.
def prepare_environment(self, mode):
    """Create any multi-partition Kafka topics the configuration asks for.

    Two independent cases are handled, in this order:

    * a ``kafka-source`` / ``kafka-sink`` integration whose config requests
      more than one partition gets its ``topic`` created on the configured
      ``brokers``;
    * when the run mode uses the Kafka transport and the config requests
      more than one ``kafka_transport_partitions``, the transport topic is
      created on the brokers returned by ``brokers()``.

    Args:
        mode: Run mode object; only its ``transport`` attribute is read.
    """
    config = self.config

    # Kafka source/sink topic: only needed for more than one partition.
    is_kafka_endpoint = config['kind'] in ('kafka-source', 'kafka-sink')
    if is_kafka_endpoint and 'partitions' in config \
            and config['partitions'] > 1:
        create_partitioned_topic(config['topic'], config['partitions'],
                                 config['brokers'])

    # Kafka transport topic: nothing to do unless the transport is Kafka
    # and a partition count was explicitly configured.
    if mode.transport != "kafka":
        return
    if 'kafka_transport_partitions' not in config:
        return

    transport_partitions = config['kafka_transport_partitions']
    if transport_partitions > 1:
        create_partitioned_topic(self.kafka_transport_topic,
                                 transport_partitions, brokers())

def route(self, default=None):
if 'route' in self.config and self.config['route'] is not None:
return self.config['route']
Expand Down
Loading