Skip to content

Example: GunDB

Lorenzo Mangani edited this page Sep 24, 2017 · 4 revisions

Gun is a "new school" realtime, distributed, offline-first, graph database.

PaStash-gun branch integrates an experimental Gun filter example for read/writing of GunDB properties for inline distributed pairing and injection of "realtime" correlation vectors across different protocols, flows and events processed by a cluster of connected paStash intances.

In this basic example, we're going to use a pair of filters to extract and inject "al dente" fields across different event types from our beloved Janus WebRTC Gateway

Gun write optionally supports a ttl parameter to trigger self-destruction, expressed in seconds.

Example: ParmiJanus (Pastash + Janus)

input {
 http {
    host => 127.0.0.1
    port => 8080
 }
}

filter{
 if [type] == 2 {
  gun_write{
    field => session_id
    source => event.opaque_id
    ttl => 120
  }
 }
 if [type] == 32 {
  gun_read{
     target_field => correlation
     field => session_id
     source => event.opaque_id
   }
 }
}

output {
 elasticsearch{
   host => 127.0.0.1
   port => 9200
   bulk_limit => 1000
   bulk_timeout => 100
   index_prefix => janus
   data_type => event
   basic_auth_user => janus
   basic_auth_password => parmijanus
 }
}

Relational Flow:

In our example, "opaque_id" from Type 2 is leaked into Type 32 as a join-correlation vector:

Output Samples

Extraction from TYPE 2

  gun_write{
    field => session_id
    source => event.opaque_id
  }

{
  "_index": "fajanus-2017.07.08",
  "_type": "event",
  "_id": "AV0jV0h0s0iKvk_zTTRx",
  "_score": null,
  "_source": {
    "type": 2,
    "timestamp": 1499105804472212,
    "session_id": "715597540605813",
    "handle_id": 8796940787397620,
    "event": {
      "name": "attached",
      "plugin": "janus.plugin.videoroom",
      "opaque_id": "videoroomtest-MamUvDmUymu84N_"
    }
  }
}

Injection to TYPE 32

  gun_read{
     target_field => correlation
     field => session_id
     source => event.opaque_id
   }

{
  "_index": "janus-2017.07.08",
  "_type": "event",
  "_id": "AV0jF1O4s0iKvk_zTSM6",
  "_score": null,
  "_source": {
    "type": 32,
    "timestamp": 1499105804472714,
    "session_id": "715597540605813",
    "handle_id": 8796940787397620,
    "event": {
      "media": "video",
      "base": 90000,
      "lsr": 143368072,
      "lost": 26,
      "lost-by-remote": 0,
      "jitter-local": 565,
      "jitter-remote": 0,
      "packets-received": 100,
      "packets-sent": 0,
      "bytes-received": 78159,
      "bytes-sent": 0,
      "nacks-received": 0,
      "nacks-sent": 30
    }
    "correlation": "videoroomtest-MamUvDmUymu84N_"
  }
}

Experiment

Results from this example can be replicated using the latest Janus Gateways, or simulated using faJanus, a very dumb Janus Event simulator for CI.

Janus Mapping


PUT _template/janus
{
  "template": "janus*",
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "_default_": {
      "_all": {
        "enabled": false
      },
      "dynamic_templates": [
        {
          "strings": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "analyzed",
              "omit_norms": true,
              "fields": {
                "raw": {
                  "type": "string",
                  "index": "not_analyzed",
                  "ignore_above": 256
                }
              }
            }
          }
        }
      ]
    }
  }
}

Working Example

The following example simulates the shared Gun functionality with TTL expiration

Recipe

Save your Gun recipe to a file, ie: /tmp/pastash_gun.conf

input {
  http {
    port => 4444
    host => 127.0.0.1
  }
}

filter {
 json_fields {}

 if [type] == 1 {
   gun_write {
     gun_shared => true
     field => session
     source => event
     ttl => 10
   }
 }

 if [type] == 2 {
   gun_read {
     gun_shared => true
     target_field => correlation
     field => session
     source => event
   }
 }
}

output {
  stdout {}
}

Execute paStash

./bin/pastash --config_file=/tmp/pastash_gun.conf

Validate

Forge a JSON document with a session and event parameters:

curl -H "Content-Type: application/json" -d '{"event":"xyz","session":"a123", "type": 1}' http://localhost:4444

Immediately after, forge a JSON document with a session parameter and no events:

curl -H "Content-Type: application/json" -d '{"session":"a123", "type": 2}' http://localhost:4444

PaStash will use Gun to insert the desired correlation:

[STDOUT] {
  "message": "{\"session\":\"a123\", \"type\": 2}",
  "host": "localhost.localdomain",
  "http_port": "4444",
  "type": 2,
  "@timestamp": "2017-09-24T08:31:07.969Z",
  "@version": "1",
  "session": "a123",
  "correlation": {
    "event": "xyz"
  }
}

Next, wait 10 seconds for the TTL to expire, and send the same document again:

curl -H "Content-Type: application/json" -d '{"session":"a123", "type": 2}' http://localhost:4444

paStash will no longer find the expired key, and the original object is returned.

Clone this wiki locally