In [1]:
import streamz
import cudf

In [2]:
from gquant.dataframe_flow import Node
from gquant.dataframe_flow import NodePorts, PortsSpecSchema, TaskSpecSchema
from gquant.dataframe_flow import ConfSchema
from gquant.dataframe_flow import TaskGraph

## Double the Streaming Numbers with Streamz

### Define the Source Stream Node

In [3]:
class StreamNode(Node):
    """Source node that hands out a fresh ``streamz.Stream``.

    Values enter the graph when the caller invokes ``emit`` on the
    ``stream_out`` port of the run result.
    """

    def ports_setup(self):
        """No input ports; one ``stream_out`` port of type ``streamz.Stream``."""
        stream_port = {PortsSpecSchema.port_type: streamz.Stream}
        return NodePorts(inports={}, outports={'stream_out': stream_port})

    def conf_schema(self):
        """The node has no options, so the schema has no properties."""
        schema = {
            "title": "Source node configure",
            "type": "object",
            "properties": {},
        }
        return ConfSchema(json=schema, ui={})

    def init(self):
        """A source requires no input columns."""
        self.required = {}

    def columns_setup(self):
        """Declare that ``stream_out`` carries numeric elements."""
        return {'stream_out': {'element': 'number'}}

    def process(self, inputs):
        """Return a new, empty source stream; ``inputs`` is not used."""
        return {'stream_out': streamz.Stream()}


### Define the Double Element Node

In [4]:
class TransformNode(Node):
    """Doubles every element that flows through the stream."""

    def ports_setup(self):
        """One ``streamz.Stream`` in (``stream_in``), one out (``stream_out``)."""
        inports = {'stream_in': {PortsSpecSchema.port_type: streamz.Stream}}
        outports = {'stream_out': {PortsSpecSchema.port_type: streamz.Stream}}
        return NodePorts(inports=inports, outports=outports)

    def conf_schema(self):
        """The node has no options, so the schema has no properties."""
        schema = {
            "title": "Transform Node configure",
            "type": "object",
            "properties": {},
        }
        return ConfSchema(json=schema, ui={})

    def init(self):
        """Start with no requirements; ``columns_setup`` fills them in."""
        self.required = {}

    def columns_setup(self):
        """Require numeric input elements and declare numeric output elements."""
        self.required = {'stream_in': {'element': 'number'}}
        return {'stream_out': {'element': 'number'}}

    def process(self, inputs):
        """Return ``stream_in`` mapped element-wise through ``ele * 2``."""
        source = inputs['stream_in']

        def double(ele):
            return ele * 2

        return {'stream_out': source.map(double)}


### Define the Print Elements Node

In [5]:
class SinkNode(Node):
    """Prints every element it receives and exposes the resulting sink node."""

    def ports_setup(self):
        """One ``streamz.Stream`` in (``stream_in``), one out (``stream_out``)."""
        inports = {'stream_in': {PortsSpecSchema.port_type: streamz.Stream}}
        outports = {'stream_out': {PortsSpecSchema.port_type: streamz.Stream}}
        return NodePorts(inports=inports, outports=outports)

    def conf_schema(self):
        """The node has no options, so the schema has no properties."""
        schema = {
            "title": "Simple SinkNode configure",
            "type": "object",
            "properties": {},
        }
        return ConfSchema(json=schema, ui={})

    def init(self):
        """Start with no requirements; ``columns_setup`` fills them in."""
        self.required = {}

    def columns_setup(self):
        """Require numeric input elements and declare numeric output elements."""
        self.required = {'stream_in': {'element': 'number'}}
        return {'stream_out': {'element': 'number'}}

    def process(self, inputs):
        """Attach ``print`` as a sink on ``stream_in`` and return that sink."""
        printer = inputs['stream_in'].sink(print)
        return {'stream_out': printer}

### Define the Graph Programmatically 

In [6]:
# Module name under which the custom node classes are registered (see the
# register_node cell below); each custom task spec points at it via
# TaskSpecSchema.module.
module_name = 'streamz'
# Source task: creates the stream that values are emitted into.
source_spec = {
    TaskSpecSchema.task_id: 'source',
    TaskSpecSchema.node_type: StreamNode,
    TaskSpecSchema.conf: {},
    TaskSpecSchema.inputs: {},
    TaskSpecSchema.module: module_name
}


# Doubling task, wired to the source task's stream_out port
# ('<task_id>.<port>' syntax).
task_spec = {
    TaskSpecSchema.task_id: 'double',
    TaskSpecSchema.node_type: TransformNode,
    TaskSpecSchema.conf: {},
    TaskSpecSchema.inputs: {
        'stream_in': 'source.stream_out'
    },
    TaskSpecSchema.module: module_name
}

# Printing sink, wired to the doubling task's output.
sink_spec = {
    TaskSpecSchema.task_id: 'sink',
    TaskSpecSchema.node_type: SinkNode,
    TaskSpecSchema.conf: {},
    TaskSpecSchema.inputs: {
        'stream_in': 'double.stream_out'
    },
    TaskSpecSchema.module: module_name
}

# Output collector: the ports listed here are what run() returns.
# source.stream_out is collected so the caller can emit() into it.
out_spec = {
    TaskSpecSchema.task_id: '',
    TaskSpecSchema.node_type: "Output_Collector",
    TaskSpecSchema.conf: {},
    TaskSpecSchema.inputs: {
        'in0': 'sink.stream_out',
        'in1': 'source.stream_out'
    }
}

task_list = [source_spec, task_spec, sink_spec, out_spec]

In [7]:
# An empty TaskGraph. draw() renders its (empty) widget, and the instance is
# reused in later cells to call register_node.
taskgraph = TaskGraph()
taskgraph.draw()

GQuantWidget(sub=HBox())

### Dynamically Register the GQuant Nodes

In [8]:
# Register the custom node classes under `module_name`. Presumably this is what
# lets the specs' TaskSpecSchema.module entry resolve them; confirm in gQuant.
taskgraph.register_node(module_name, StreamNode)
taskgraph.register_node(module_name, TransformNode)
taskgraph.register_node(module_name, SinkNode)
# Build the graph from the programmatic spec list and render it.
task_graph = TaskGraph(task_list)
task_graph.draw()

GQuantWidget(sub=HBox(), value=[OrderedDict([('id', 'source'), ('type', 'StreamNode'), ('conf', {}), ('inputs'…

In [9]:
# Wire up the stream pipeline; `r` holds the ports gathered by the Output_Collector task.
r = task_graph.run()

In [10]:
# Port names available in the result (the Output_Collector inputs).
r.get_keys()

('sink.stream_out', 'source.stream_out')

In [11]:
# Push 0..9 into the source stream; the sink prints each doubled value.
for i in range(10):
    r['source.stream_out'].emit(i)

0
2
4
6
8
10
12
14
16
18


## Show the Doubling Results in a Sliding Window

### Define the Plot Node

In [12]:
import bqplot
import numpy as np
import bqplot.pyplot as plt

class PlotSinkNode(Node):
    """Sink node that draws each incoming stream element as a bqplot line chart.

    ``process`` creates the figure once and returns it on ``fig_out``; every
    element that later arrives on ``stream_in`` replaces the plotted data.
    """

    def ports_setup(self):
        input_ports = {
            'stream_in': {
                PortsSpecSchema.port_type: streamz.Stream
            },
        }
        output_ports = {
            'fig_out': {
                PortsSpecSchema.port_type: bqplot.Figure
            },
        }
        return NodePorts(inports=input_ports, outports=output_ports)

    def conf_schema(self):
        json = {
            "title": "Plot configure",
            "type": "object",
            "properties": {
            }
        }
        ui = {}
        return ConfSchema(json=json, ui=ui)

    def init(self):
        self.required = {}

    def columns_setup(self):
        self.required = {
            'stream_in': {'element': 'number'}
        }
        columns_out = {
            'fig_out': {}
        }
        return columns_out

    def process(self, inputs):
        """Create the figure and subscribe a redraw callback to ``stream_in``.

        Returns:
            dict: ``{'fig_out': bqplot.Figure}``.
        """
        in_stream = inputs['stream_in']
        axes_options = {'x': {'label': 'x'}, 'y': {'label': 'y'}}
        fig = plt.figure(animation_duration=10)
        # Two colours, presumably so a pair of zipped branches shows as two lines.
        lines = plt.plot(x=[], y=[], colors=['red', 'green'],
                         axes_options=axes_options)

        def update(numbers):
            """Replace the line data with the values in ``numbers``."""
            # Handle cuDF before indexing: ``numbers[0]`` on a cudf.Series is
            # a label-based lookup and can raise KeyError when the index has
            # no 0 label. ``to_pandas`` replaces ``to_array``, which newer
            # cuDF releases no longer provide.
            if isinstance(numbers, cudf.Series):
                n_points = len(numbers)
                y_values = numbers.to_pandas().values
            else:
                if len(numbers) == 0:
                    # Nothing to draw (and numbers[0] would raise IndexError).
                    return
                if isinstance(numbers[0], tuple):
                    # A tuple of tuples (e.g. zipped windows): the x axis
                    # spans the length of one inner tuple.
                    n_points = len(numbers[0])
                else:
                    n_points = len(numbers)
                y_values = np.array(numbers)
            lines.x = np.arange(n_points)
            lines.y = y_values

        in_stream.sink(update)
        return {'fig_out': fig}

### Define the Sliding Window Node

In [13]:
class SlideWindowNode(Node):
    """Groups the stream into overlapping windows of ``conf['window']`` elements.

    Only full windows are emitted (``return_partial=False``).
    """

    def ports_setup(self):
        input_ports = {
            'stream_in': {
                PortsSpecSchema.port_type: streamz.Stream
            },
        }
        output_ports = {
            'stream_out': {
                PortsSpecSchema.port_type: streamz.Stream
            },
        }
        return NodePorts(inports=input_ports, outports=output_ports)

    def conf_schema(self):
        """Schema with a single required, positive integer ``window`` option."""
        json = {
            "title": "SlideWindow Node configure",
            "type": "object",
            "properties": {
                "window": {
                    "type": "integer",
                    "minimum": 1
                }
            },
            "required": ["window"]
        }
        ui = {}
        return ConfSchema(json=json, ui=ui)

    def init(self):
        self.required = {}

    def columns_setup(self):
        self.required = {
            'stream_in': {'element': 'number'}
        }
        columns_out = {
            'stream_out': {'element': 'number'
            },
        }
        return columns_out

    def process(self, inputs):
        """Return ``stream_in`` windowed into tuples of ``conf['window']`` items.

        Raises:
            ValueError: if ``window`` is missing or not a positive integer.
                A zero window would otherwise emit empty tuples forever, which
                downstream consumers cannot index.
        """
        window = self.conf.get('window')
        if isinstance(window, bool) or not isinstance(window, int) \
                or window < 1:
            raise ValueError(
                "SlideWindowNode requires conf['window'] to be a positive "
                "integer, got {!r}".format(window))
        in_stream = inputs['stream_in']
        windowed = in_stream.sliding_window(window, return_partial=False)
        return {'stream_out': windowed}

In [14]:
# Register the plotting and sliding-window nodes for the YAML-defined graphs below.
taskgraph.register_node(module_name, PlotSinkNode)
taskgraph.register_node(module_name, SlideWindowNode)

In [15]:
# Load the plotting graph from YAML (path relative to the notebook's working
# directory) and render it.
task_graph = TaskGraph.load_taskgraph('../taskgraphs/streamz/plot.gq.yaml')
task_graph.draw()

GQuantWidget(sub=HBox(), value=[OrderedDict([('id', 'source'), ('type', 'StreamNode'), ('conf', {}), ('inputs'…

In [16]:
# Wire up the plotting pipeline; values only flow once they are emitted below.
r = task_graph.run()

In [17]:
# Port names available in the result.
r.get_keys()

('plot.fig_out', 'source.stream_out')

In [18]:
# Display the live bqplot figure; it redraws as elements reach the plot node.
r['plot.fig_out']

Figure(animation_duration=10, axes=[Axis(label='x', scale=LinearScale(), side='bottom'), Axis(label='y', orien…

In [19]:
# Stream 1000 samples of sin(i / 3.14) into the source; watch the figure above update.
for i in range(1000):
    r['source.stream_out'].emit(np.sin(i/3.14))

## Process Two Branches of the Stream

### Define the Zip Node to Combine Branches

In [20]:
class ZipNode(Node):
    """Pairs up elements from two streams into tuples via ``Stream.zip``."""

    def ports_setup(self):
        """Two ``streamz.Stream`` inputs and one ``streamz.Stream`` output."""
        stream_type = {PortsSpecSchema.port_type: streamz.Stream}
        inports = {
            'stream1_in': dict(stream_type),
            'stream2_in': dict(stream_type),
        }
        outports = {'stream_out': dict(stream_type)}
        return NodePorts(inports=inports, outports=outports)

    def conf_schema(self):
        """The node has no options, so the schema has no properties."""
        schema = {
            "title": "Zip Node configure",
            "type": "object",
            "properties": {},
        }
        return ConfSchema(json=schema, ui={})

    def init(self):
        """Start with no requirements; ``columns_setup`` fills them in."""
        self.required = {}

    def columns_setup(self):
        """Both inputs must carry numbers; the output is declared numeric."""
        self.required = {
            'stream1_in': {'element': 'number'},
            'stream2_in': {'element': 'number'},
        }
        return {'stream_out': {'element': 'number'}}

    def process(self, inputs):
        """Return ``stream1_in`` zipped with ``stream2_in``."""
        zipped = inputs['stream1_in'].zip(inputs['stream2_in'])
        return {'stream_out': zipped}

In [21]:
# Register the zip node used by the two-branch graph.
taskgraph.register_node(module_name, ZipNode)

In [22]:
# Load the two-branch graph from YAML (path relative to the notebook's working
# directory) and render it.
task_graph = TaskGraph.load_taskgraph('../taskgraphs/streamz/two_branches.gq.yaml')
task_graph.draw()

GQuantWidget(sub=HBox(), value=[OrderedDict([('id', 'source'), ('type', 'StreamNode'), ('conf', {}), ('inputs'…

In [23]:
# Wire up the two-branch pipeline.
r = task_graph.run()

In [24]:
# Port names available in the result.
r.get_keys()

('source.stream_out', 'plot.fig_out')

In [25]:
# Display the live figure for the two-branch pipeline.
r['plot.fig_out']

Figure(animation_duration=10, axes=[Axis(label='x', scale=LinearScale(), side='bottom'), Axis(label='y', orien…

In [26]:
# Stream the same sine samples through both branches.
for i in range(1000):
    r['source.stream_out'].emit(np.sin(i/3.14))

## Use GPU to Accelerate

### Define the Node to Convert a Tuple of Numbers to a cuDF DataFrame

In [27]:
class TupleToCudf(Node):
    """Turns each stream element (a sequence of numbers) into a cuDF frame."""

    def ports_setup(self):
        """One ``streamz.Stream`` in (``stream_in``), one out (``stream_out``)."""
        inports = {'stream_in': {PortsSpecSchema.port_type: streamz.Stream}}
        outports = {'stream_out': {PortsSpecSchema.port_type: streamz.Stream}}
        return NodePorts(inports=inports, outports=outports)

    def conf_schema(self):
        """The node has no options, so the schema has no properties."""
        schema = {
            "title": "Tuple of data to Cudf dataframe Node configure",
            "type": "object",
            "properties": {},
        }
        return ConfSchema(json=schema, ui={})

    def init(self):
        """Start with no requirements; ``columns_setup`` fills them in."""
        self.required = {}

    def columns_setup(self):
        """Require numeric input elements and declare numeric output elements."""
        self.required = {'stream_in': {'element': 'number'}}
        return {'stream_out': {'element': 'number'}}

    def process(self, inputs):
        """Map every element to a single-column ``cudf.DataFrame`` named 'x'."""
        source = inputs['stream_in']

        def convert(ele):
            return cudf.DataFrame({'x': ele})

        return {'stream_out': source.map(convert)}

### Define the Node to Convert a cuDF Stream to a streamz.DataFrame

In [28]:


# Wrap a stream of cuDF frames in a streamz DataFrame so that the
# dataframe-style API (column access, arithmetic) can be used downstream.
class ToDataFrame(Node):
    """Adapts a stream of ``cudf.DataFrame`` objects into a streamz DataFrame."""

    def ports_setup(self):
        """Stream in (``stream_in``); streamz DataFrame out (``df_out``)."""
        inports = {'stream_in': {PortsSpecSchema.port_type: streamz.Stream}}
        outports = {
            'df_out': {
                PortsSpecSchema.port_type: streamz.dataframe.DataFrame
            },
        }
        return NodePorts(inports=inports, outports=outports)

    def conf_schema(self):
        """The node has no options, so the schema has no properties."""
        schema = {
            "title": "streamz Dataframe Node configure",
            "type": "object",
            "properties": {},
        }
        return ConfSchema(json=schema, ui={})

    def init(self):
        """Load the ``streamz.dataframe`` submodule and clear requirements."""
        import streamz.dataframe
        self.required = {}

    def columns_setup(self):
        """Require numeric input elements and declare numeric output elements."""
        self.required = {'stream_in': {'element': 'number'}}
        return {'df_out': {'element': 'number'}}

    def process(self, inputs):
        """Return ``stream_in`` wrapped as a streamz DataFrame with column 'x'."""
        source = inputs['stream_in']
        # The empty frame is the `example` that tells streamz what the
        # incoming partitions look like (one cuDF column named 'x').
        template = cudf.DataFrame({'x': []})
        sdf = streamz.dataframe.DataFrame(source, example=template)
        return {'df_out': sdf}

### Define the Node to Convert a streamz.DataFrame to a cudf.Series Stream

In [29]:
class ToStream(Node):
    """Unwraps a streamz DataFrame back into a plain stream of its 'x' column."""

    def ports_setup(self):
        """streamz DataFrame in (``df_in``); ``streamz.Stream`` out (``stream_out``)."""
        inports = {
            'df_in': {
                PortsSpecSchema.port_type: streamz.dataframe.DataFrame
            },
        }
        outports = {'stream_out': {PortsSpecSchema.port_type: streamz.Stream}}
        return NodePorts(inports=inports, outports=outports)

    def conf_schema(self):
        """The node has no options, so the schema has no properties."""
        schema = {
            "title": "df to stream Node configure",
            "type": "object",
            "properties": {},
        }
        return ConfSchema(json=schema, ui={})

    def init(self):
        """Load the ``streamz.dataframe`` submodule and clear requirements."""
        import streamz.dataframe
        self.required = {}

    def columns_setup(self):
        """Require numeric input elements and declare numeric output elements."""
        self.required = {'df_in': {'element': 'number'}}
        return {'stream_out': {'element': 'number'}}

    def process(self, inputs):
        """Take the DataFrame's underlying stream and pluck column 'x' per element."""
        frames = inputs['df_in'].stream
        return {'stream_out': frames.pluck('x')}

### Define the Node to Double the numbers via streamz.DataFrame API

In [30]:
class GPUDouble(Node):
    """Doubles column 'x' of a (cuDF-backed) streamz DataFrame."""

    def ports_setup(self):
        input_ports = {
            'df_in': {
                PortsSpecSchema.port_type: streamz.dataframe.DataFrame
            },
        }
        output_ports = {
            'df_out': {
                PortsSpecSchema.port_type: streamz.dataframe.DataFrame
            },
        }
        return NodePorts(inports=input_ports, outports=output_ports)

    def conf_schema(self):
        json = {
            "title": "GPU double Node configure",
            "type": "object",
            "properties": {
            }
        }
        ui = {}
        return ConfSchema(json=json, ui=ui)

    def init(self):
        # Make sure the streamz.dataframe submodule is loaded.
        import streamz.dataframe
        self.required = {}

    def columns_setup(self):
        self.required = {
            'df_in': {'element': 'number'}
        }
        columns_out = {
            'df_out': {'element': 'number'
            },
        }
        return columns_out

    def process(self, inputs):
        """Return a new streamz DataFrame whose column 'x' is doubled.

        The input object is left untouched. Assigning with
        ``in_df['x'] = ...`` would rebind the upstream node's output
        DataFrame in place, so any other consumer of that output would see
        doubled values as well. ``assign`` builds the same result as a new
        object.
        """
        in_df = inputs['df_in']
        doubled = in_df.assign(x=in_df['x'] * 2)
        return {'df_out': doubled}

In [31]:
# Register the cuDF / streamz.DataFrame nodes used by the GPU graph.
taskgraph.register_node(module_name, TupleToCudf)
taskgraph.register_node(module_name, ToDataFrame)
taskgraph.register_node(module_name, GPUDouble)
taskgraph.register_node(module_name, ToStream)

In [32]:
# Load the GPU doubling graph from YAML (path relative to the notebook's
# working directory) and render it. The exact wiring of the nodes registered
# above is defined in the YAML file.
task_graph = TaskGraph.load_taskgraph('../taskgraphs/streamz/gpu_double.gq.yaml')
task_graph.draw()

GQuantWidget(sub=HBox(), value=[OrderedDict([('id', 'source'), ('type', 'StreamNode'), ('conf', {}), ('inputs'…

In [33]:
# Wire up the GPU pipeline.
r = task_graph.run()

In [34]:
# Display the live figure for the GPU pipeline.
r['plot.fig_out']

Figure(animation_duration=10, axes=[Axis(label='x', scale=LinearScale(), side='bottom'), Axis(label='y', orien…

In [35]:
# Stream the same sine samples through the GPU pipeline.
for i in range(1000):
    r['source.stream_out'].emit(np.sin(i/3.14))

0     0.000000
1     0.626230
2     1.189481
3     1.633106
4     1.912490
5     1.999535
6     1.885488
7     1.581818
8     1.119066
9     0.543769
10   -0.086215
11   -0.707528
12   -1.257685
13   -1.681357
14   -1.935935
15   -1.995818
16   -1.854981
17   -1.527590
18   -1.046570
19   -0.460296
20    0.172269
21    0.787510
22    1.323550
23    1.726482
24    1.955782
25    1.988390
26    1.821026
27    1.470522
28    0.972128
29    0.375968
30   -0.258003
31   -0.866027
32   -1.386955
33   -1.768398
34   -1.971993
35   -1.977266
36   -1.783685
37   -1.410720
38   -0.895880
39   -0.290941
40    0.343258
41    0.942935
42    1.447782
43    1.807026
44    1.984538
45    1.962465
46    1.743028
47    1.348296
48    0.817966
49    0.205373
Name: x, dtype: float64
0     0.626230
1     1.189481
2     1.633106
3     1.912490
4     1.999535
5     1.885488
6     1.581818
7     1.119066
8     0.543769
9    -0.086215
10   -0.707528
11   -1.257685
12   -1.681357
13   -1.935935
14   -1.995818
1