Skip to content

licaonfee/selina

Repository files navigation

selina

Test Go Report Card Coverage Status godoc

Simple Pipeline for go, inspired on ratchet https://github.com/dailyburn/ratchet

Unstable API, please use go modules

Installation

go get github.com/licaonfee/selina

Usage

package main

import (
    "fmt"
    "os"
    "strings"
    "context"

    "github.com/licaonfee/selina"
    "github.com/licaonfee/selina/workers/regex"
    "github.com/licaonfee/selina/workers/text"
)

const sample = `this is a sample text
this is second line
#lines started with # will be skipped
this line pass`

func main() {
    rd := strings.NewReader(sample)
    input := selina.NewNode("Read", text.NewReader(text.ReaderOptions{Reader: rd}))
    //https://regex101.com/r/7ZS3Uw/1
    filter := selina.NewNode("Filter", regex.NewFilter(regex.FilterOptions{Pattern: "^[^#].+"}))
    output := selina.NewNode("Write", text.NewWriter(text.WriterOptions{Writer: os.Stdout}))
    pipe := selina.NewSimplePipeline(input, filter, output)
    if err := pipe.Run(context.Background()); err != nil {
        fmt.Printf("ERR: %v\n", err)
    }
    for name, stat := range pipe.Stats(){
        fmt.Printf("Node:%s=%v\n", name, stat)
    }
}

Graphviz

Optionally you can render a graph of your pipeline

func main(){
    // here pipeline is built
    p := selina.FreePipeline(read,filter,write,store,custom,csv)
    selina.Graph(p, os.Stdout)
}

With te previous code you get a .dot graph

digraph {
  rankdir=LR;
  X0001FXACQQKSTZV3SGXZPF3C9C[label="Filter"];
  X0001FXACQQKSTZV3SGY4YSE6NX[label="Write"];
  X0001FXACQQKSTZV3SGY2XQMB1X[label="Custom"];
  X0001FXACQQKSTZV3SGY5QMFJ0Q[label="Store"];
  X0001FXACQQKSTZV3SGXTVX3MPP[label="Read"];
  X0001FXACQQKSTZV3SGXXXA43EF[label="CSV"];
  X0001FXACQQKSTZV3SGXTVX3MPP -> X0001FXACQQKSTZV3SGXXXA43EF [label="count=10000,bytes=1MiB"];
  X0001FXACQQKSTZV3SGXTVX3MPP -> X0001FXACQQKSTZV3SGXZPF3C9C [label="count=10000,bytes=1MiB"];
  X0001FXACQQKSTZV3SGXXXA43EF -> X0001FXACQQKSTZV3SGY4YSE6NX [label="count=10000,bytes=1MiB"];
  X0001FXACQQKSTZV3SGXZPF3C9C -> X0001FXACQQKSTZV3SGY2XQMB1X [label="count=10000,bytes=1MiB"];
  X0001FXACQQKSTZV3SGY2XQMB1X -> X0001FXACQQKSTZV3SGY5QMFJ0Q [label="count=10000,bytes=1MiB"];
}

Renderized with graphviz

graph

Builtin workers

By default selina has this workers implemented

  • csv.Encoder : Transform data from json to csv
  • csv.Decoder : Transform csv data into json
  • custom.Function : Allow to execute custom functions into a pipeline node
  • ops.Cron : Allow scheduled messages into a pipeline
  • ops.TimeSerie: Generate time series data
  • random.Random : Generate random byte slices
  • regex.Filter : Filter data using a regular expresion
  • remote.Server : Listen for remote data
  • remote.Client : Send data to a remote pipeline
  • sql.Reader : Execute a query against a database and return its rows as json objects
  • sql.Writer : Insert rows into a table from json objects
  • text.Reader : Use any io.Reader and read its contents as text
  • text.Writer : Write text data into any io.Writer
  • filesystem.Reader : Use afero.Fs to read arbitrary files
  • filesystem.Writer : Use afero.Fs to write to arbitrary files

Design

Selina have three main components

  • Pipeline
  • Node
  • Worker

Some utility functions are provided to build pipelines, LinealPipeline(n ... Node)*Pipeliner chain all nodes in same order as their are passed. FreePipeline(n ...Node)*Pipeliner Just runs all nodes without chain them so you can build any pipeline, including ciclic graphs or aciclic graphs

Pipeline

Start data processing and manage all chained nodes in a single object

Node

Contains methods to pass data from Worker to Worker and get metrics

Worker

All data Extraction/Transformation/Load logic is encapsulated in a Worker instance

Conventions for workers

  • A nil input channel is only for workers that produces data if a worker does not allow nil input channel it must returns selina.ErrNilUpstream
  • Workers must close its output channel when finish their job
  • A closed input channel must gracefully finalize worker
  • Pipeline finalization is triggered vía channel closing
  • All workers must handle context cancellation

Codec

Most of workers receive an optional configuration Codec that implements Marshaler/Unmarshaler interfaces, by default msgpack is used if no Codec is provided

Command line Usage

Binary

selina -file pipeline.yml -timeout 10h

Docker

#all paths are relatives to /app/selina
#you can also use absolute paths
docker run -v$PWD:/data/sample:ro licaonfee/selina:rolling -f /data/pipeline.yml

Where pipeline.yml is

---
nodes:
  - name: employes
    type: read_file
    args:
      filename: /data/employes.csv
  - name: filter_it
    type: regex
    args:
      pattern: '^.*,it,.*$'
    fetch:
      - employes
  - name: to_json
    type: csv
    args:
      mode: decode
      header: [name,role,department,id]
    fetch:
      - filter_it
  - name: it_employes
    type: write_file
    args:
      filename: it_employes.txt
      ifexists: overwrite
      mode: 0644
    fetch:
      - to_json

Autocompletion

Also yun can use any LSP compatible editor with to autocomplete selina pipelines

Get json-shema

selina -schema > /home/user/selina-schema.json

VIM

In .vim/coc-settings.json

{
  "yaml.schemas": {
  "/home/user/selina-schema.json": "*.selina.yaml"
  }
}

VSCode

  • Install redhat.vscode-yaml extension

In settings.json

{
  "yaml.schemas": {
  "/home/user/selina-schema.json": "*.selina.yaml"
  }
}