
Ingest service worker

A scalable job scheduler, written in Go, for long-running ingestion jobs. It is currently designed for parsing news feeds and Twitter and storing the results in Elasticsearch and Minio.

Just some experimentation with creating an ingest service.

Currently only RSS/news feeds and Twitter are supported. The hope is to expand to cover:

  • reddit
  • weather
  • stocks
  • hackernews


Designed to run in a containerised infrastructure. It needs connections to etcd, Elasticsearch and Minio clusters. etcd is used for config and data sharing between the workers.

In the current implementation there is no master-slave concept; all jobs are distributed evenly among the workers. The code that does this sharing is rather simplistic, though, and assumes that every worker can process every job.
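To make "evenly distributed" concrete, here is a minimal sketch of one way such sharing could work: a round-robin over sorted worker and job lists. The function name `assignJobs` and the worker/job IDs are assumptions made for illustration; the repository's own sharing code is not shown here and may work differently.

```go
package main

import (
	"fmt"
	"sort"
)

// assignJobs spreads job IDs across workers round-robin.
// Both lists are copied and sorted first, so every worker that runs this
// function on the same inputs arrives at the same assignment.
// (Hypothetical helper, not taken from the repository.)
func assignJobs(workers, jobs []string) map[string][]string {
	out := make(map[string][]string, len(workers))
	if len(workers) == 0 {
		return out
	}
	ws := append([]string(nil), workers...)
	js := append([]string(nil), jobs...)
	sort.Strings(ws)
	sort.Strings(js)
	for i, j := range js {
		w := ws[i%len(ws)]
		out[w] = append(out[w], j)
	}
	return out
}

func main() {
	got := assignJobs(
		[]string{"worker-b", "worker-a"},
		[]string{"job-3", "job-1", "job-2"},
	)
	fmt.Println(got["worker-a"], got["worker-b"]) // [job-1 job-3] [job-2]
}
```

Like the approach described above, this assumes any worker can run any job; it makes no attempt to weigh jobs by cost.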

Because etcd is used, when a new worker joins the cluster the other workers detect it, stop all their jobs and redistribute the current list of jobs.
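The stop-and-redistribute step can be sketched with the standard library alone. The example below leaves out etcd entirely and simulates a second worker joining; `runner`, `share` and `demo` are hypothetical names introduced for this illustration, and the real service would be driven by an etcd watch rather than by direct calls.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runner tracks the jobs currently running on one worker (hypothetical type).
type runner struct {
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	running int
}

// start launches one goroutine per job under a fresh, cancellable context.
func (r *runner) start(jobs []string, work func(context.Context, string)) {
	ctx, cancel := context.WithCancel(context.Background())
	r.cancel = cancel
	r.running = len(jobs)
	for _, id := range jobs {
		r.wg.Add(1)
		go func(id string) {
			defer r.wg.Done()
			work(ctx, id)
		}(id)
	}
}

// stop cancels the context and waits until every job goroutine has returned.
func (r *runner) stop() {
	if r.cancel != nil {
		r.cancel()
		r.wg.Wait()
		r.cancel = nil
	}
	r.running = 0
}

// share returns the jobs at positions pos, pos+n, pos+2n, ... where pos is
// this worker's index in the worker list and n is the number of workers.
func share(self string, workers, jobs []string) []string {
	pos := -1
	for i, w := range workers {
		if w == self {
			pos = i
		}
	}
	if pos < 0 {
		return nil
	}
	var mine []string
	for i := pos; i < len(jobs); i += len(workers) {
		mine = append(mine, jobs[i])
	}
	return mine
}

// demo simulates worker-b joining while worker-a owns all three jobs.
// It returns how many jobs had stopped before the restart and how many
// jobs worker-a runs afterwards.
func demo() (int, int) {
	jobs := []string{"job-1", "job-2", "job-3"}
	var stopped int64
	work := func(ctx context.Context, id string) {
		<-ctx.Done() // a real job would do work here until cancelled
		atomic.AddInt64(&stopped, 1)
	}
	r := &runner{}
	r.start(share("worker-a", []string{"worker-a"}, jobs), work)
	r.stop() // membership changed: stop everything first
	before := int(atomic.LoadInt64(&stopped))
	r.start(share("worker-a", []string{"worker-a", "worker-b"}, jobs), work)
	after := r.running
	r.stop()
	return before, after
}

func main() {
	before, after := demo()
	fmt.Println(before, after) // 3 2
}
```

Stopping everything before restarting is simple but means every job is interrupted on each membership change, which matches the behaviour described above.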



As noted above, several components are needed to get everything working: an Elasticsearch instance, an S3-compatible server and an etcd cluster. Below is a list of Docker containers that I have used.

# Minio: an S3-compatible server that you can self-host
docker run -p 9000:9000 --name minio1 -v /mnt/data:/data -v /mnt/config:/root/.minio minio/minio server /data

# etcd v3 Web UI
git clone
cd e3w
docker-compose up

# Elasticsearch (single node)
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node"


Clone this repo and get the Go requirements

go get
go get
go get
go get

Build the package with go build

Adding new jobs

Adding a job should be a fairly simple process: create a type that implements the following Job interface and register it with the job factory.

type Job interface {
  // Init sets up a job. This is where the params get parsed into the required struct.
  // The params variable is stringified JSON.
  // When the job gets triggered, Init receives pointers to the etcd client, the logger,
  // the SettingStore, and the input and output channels.
  Init(*clientv3.Client, *logrus.Logger, *settings.SettingStore, *chan string, *chan Model)
  // After initialisation, Run gets called inside a goroutine with a context and a
  // WaitGroup passed to it, so that the job can be stopped.
  Run(context.Context, *sync.WaitGroup)
  // GetCount returns how many items have been processed.
  GetCount() int
  // GetParams returns the params needed for this job.
  GetParams() *JobParams
}

// In an init function, register this job with the job factory.
// MyJob is a placeholder for your own job struct.
func init() {
  RegisterJob("JOBNAME", &MyJob{})
}

type JobJson struct {
  ID     string `json:"id"`
  Name   string `json:"name"`
  Type   string `json:"type"`
  Params string `json:"parmas"`
  Time   string `json:"time"`
}

type JobParams struct {
  Type   string   `json:"type"`
  Params []string `json:"params"`
}
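As an illustration of the pattern, here is a minimal sketch of a job that can be registered, started in a goroutine and stopped through its context. It uses a trimmed-down interface (`simpleJob`) so that it runs with the standard library only: the real `Init` also takes the etcd client, the logger and the SettingStore, and the output channel carries `Model` values rather than strings. `simpleJob`, `echoJob`, `registerJob` and `runEcho` are all hypothetical names, not part of the repository.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// simpleJob is a reduced stand-in for the Job interface described above.
type simpleJob interface {
	Init(in <-chan string, out chan<- string)
	Run(ctx context.Context, wg *sync.WaitGroup)
	GetCount() int
}

// registry maps a job type name to a job value (assumed shape; the real
// RegisterJob may store something different).
var registry = map[string]simpleJob{}

func registerJob(name string, j simpleJob) { registry[name] = j }

// echoJob forwards every string from in to out and counts what it forwards.
type echoJob struct {
	count int64
	in    <-chan string
	out   chan<- string
}

func init() { registerJob("echo", &echoJob{}) }

func (j *echoJob) Init(in <-chan string, out chan<- string) {
	j.in, j.out = in, out
	atomic.StoreInt64(&j.count, 0)
}

// Run processes input until the context is cancelled or the input closes.
func (j *echoJob) Run(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case s, ok := <-j.in:
			if !ok {
				return
			}
			j.out <- s
			atomic.AddInt64(&j.count, 1)
		}
	}
}

func (j *echoJob) GetCount() int { return int(atomic.LoadInt64(&j.count)) }

// runEcho looks the job up by name, feeds it the inputs, then stops it.
func runEcho(inputs []string) (int, []string) {
	in := make(chan string)
	out := make(chan string, len(inputs)) // buffered so the job never blocks
	job := registry["echo"]
	job.Init(in, out)

	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go job.Run(ctx, &wg)

	for _, s := range inputs {
		in <- s
	}
	cancel()  // ask the job to stop
	wg.Wait() // wait until Run has returned
	close(out)

	var got []string
	for s := range out {
		got = append(got, s)
	}
	return job.GetCount(), got
}

func main() {
	n, got := runEcho([]string{"a", "b", "c"})
	fmt.Println(n, got) // 3 [a b c]
}
```

Passing the WaitGroup into `Run` lets the caller block until the job has really exited, which is what makes a clean stop-and-restart possible.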


Build the docker containers

docker-compose build
docker stack deploy -c docker-compose.yml ingest_service


A small web app runs alongside the workers. It shows the current jobs and lets you add or delete jobs.


Adding a new job

The UI builds its form from the list of parameters returned by the job's GetParams() function. This means that, as long as the list of parameters for your job is simple, no UI coding is needed.
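For a sense of what the UI has to work with, the sketch below serialises a JobParams value to JSON. The struct is the one shown earlier; the `describe` helper and the values "rss" and "url" are made up for illustration, and how the web app actually fetches and renders the parameters is not documented here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JobParams mirrors the struct shown earlier in this README.
type JobParams struct {
	Type   string   `json:"type"`
	Params []string `json:"params"`
}

// describe renders the parameters as JSON, roughly the kind of payload a UI
// could use to decide which input fields to show (hypothetical helper).
func describe(p *JobParams) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// "rss" and "url" are example values, not taken from the repository.
	s, err := describe(&JobParams{Type: "rss", Params: []string{"url"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"type":"rss","params":["url"]}
}
```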



This project is licensed under the MIT License - see the file for details
