Skip to content

Conversation

@andresantoniuk
Copy link

This is a preliminary version of a new LogListener this goals:

  • Use Kafka as a way to "concentrate" logs from different jPOS modules/instances/servers
  • Events formated as jSON so it's is easy to ingest this events to Elastic via Logstash Kafka Input

Configuration is done via a new log-listener section on 00_logger.xml that forwards configuration options to the Kafka Producer API

  <log-listener class="org.jpos.util.KafkaLogListener">
    <property name="topic" value="jpos" />
    <property name="KafkaProducer.bootstrap.servers" value="localhost:9092" />
    <property name="KafkaProducer.acks" value="all" />
    <property name="KafkaProducer.key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
    <property name="KafkaProducer.value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
    <property name="KafkaProducer.retries" value="0" />
    <property name="KafkaProducer.batch.size" value="16384" />
    <property name="KafkaProducer.linger.ms" value="1" />
    <property name="KafkaProducer.buffer.memory" value="33554432" />
  </log-listener>

Testing environment

input {  
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["jpos"]
  }
}
filter {
  json {
    source => "message"
  }
  mutate {
    remove_field => [ "message" ]
  }
}
output {
  stdout { 
    codec => rubydebug
  }
  elasticsearch{
    hosts => ["localhost:9200"]
    index => "jposlog-%{+YYYY.MM.dd.hh}"
  }
}
  • Sample log data in elastic
GET jposlog-2018*/_search

{
  "took": 1134,
  "timed_out": false,
  "_shards": {
    "total": 10,
    "successful": 10,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 206,
    "max_score": 1,
    "hits": [
      {
        "_index": "jposlog-2018.07.19.01",
        "_type": "doc",
        "_id": "nT8QsGQBU4jSE_wmPlfZ",
        "_score": 1,
        "_source": {
          "IP": "52.7.83.125",
          "ElementType_0": "java.lang.String",
          "ElementContent_0": "Try 0 isobridge.jpos.org:9000",
          "@version": "1",
          "@timestamp": "2018-07-19T01:03:31.940Z",
          "Port": "9000",
          "Tag": "connect",
          "Realm": "channel"
        }
      },
      {
        "_index": "jposlog-2018.07.19.12",
        "_type": "doc",
        "_id": "Bz_cr2QBU4jSE_wmNlcw",
        "_score": 1,
        "_source": {
          "IP": "52.7.83.125",
          "ElementType_0": "org.jpos.iso.ISOMsg",
          "ISOMsgPackager_0": "org.jpos.iso.packager.XMLPackager",
          "@version": "1",
          "@timestamp": "2018-07-19T00:06:40.665Z",
          "Port": "9000",
          "Tag": "receive",
          "ISOMsg_org.jpos.iso.packager.XMLPackager_0": {
            "0": "1810",
            "7": "0718210640",
            "11": "800376",
            "12": "800048",
            "37": "152634",
            "38": "249453",
            "39": "00",
            "63b": "576564204A756C2031382032313A30363A343020474D542D30333A30302032303138"
          },
          "Realm": "channel"
        }
      },
      {
        "_index": "jposlog-2018.07.19.12",
        "_type": "doc",
        "_id": "Cj_cr2QBU4jSE_wmNlcw",
        "_score": 1,
        "_source": {
          "ISOMsg_org.jpos.iso.packager.Base1Packager_0": {
            "0": {
              "description": "MESSAGE TYPE INDICATOR",
              "class": "org.jpos.iso.IFB_NUMERIC",
              "value": 1800
            },
            "7": {
              "description": "TRANSMISSION DATE AND TIME",
              "class": "org.jpos.iso.IFB_NUMERIC",
              "value": 718210640
            },
            "11": {
              "description": "SYSTEM TRACE AUDIT NUMBER",
              "class": "org.jpos.iso.IFB_NUMERIC",
              "value": 800698
            },
            "12": {
              "description": "TIME, LOCAL TRANSACTION",
              "class": "org.jpos.iso.IFB_NUMERIC",
              "value": 800376
            },
            "63b": "576564204A756C2031382032313A30363A343020474D542D30333A30302032303138"
          },
          "IP": "127.0.0.1",
          "ElementType_0": "org.jpos.iso.ISOMsg",
          "ISOMsgPackager_0": "org.jpos.iso.packager.Base1Packager",
          "@version": "1",
          "@timestamp": "2018-07-19T00:06:40.719Z",
          "Port": "49476",
          "Tag": "receive",
          "Realm": "channel"
        }
      },
      {
        "_index": "jposlog-2018.07.19.12",
        "_type": "doc",
        "_id": "Ez_cr2QBU4jSE_wmNlcw",
        "_score": 1,
        "_source": {
          "EventDump": """
<log realm="org.jpos.transaction.TransactionManager" at="2018-07-18T21:06:41.288" lifespan="303ms">
  <commit>
    txnmgr-0:idle:5
    <context>
      TIMESTAMP: Wed Jul 18 21:06:40 UYT 2018
      SOURCE: org.jpos.iso.channel.VAPChannel@15ddcd75
      REQUEST: 
       <isomsg>
         <!-- org.jpos.iso.packager.XMLPackager -->
         <header>160102004E0000000000000000000000000000000000</header>
         <field id="0" value="1800"/>
         <field id="7" value="0718210640"/>
         <field id="11" value="800977"/>
         <field id="12" value="800698"/>
         <field id="63" value="576564204A756C2031382032313A30363A343020474D542D30333A30302032303138" type="binary"/>
       </isomsg>
      
      DESTINATION: jPOS-AUTORESPONDER
      RESULT: 
       <result/>
      
      :paused_transaction: 
       id: 5
      
      RESPONSE: 
       <isomsg direction="outgoing">
         <!-- org.jpos.iso.packager.Base1Packager -->
         <header>160102004E0000000000000000000000000000000000</header>
         <field id="0" value="1810"/>
         <field id="7" value="0718210640"/>
         <field id="11" value="800977"/>
         <field id="12" value="800698"/>
         <field id="37" value="431490"/>
         <field id="38" value="484293"/>
         <field id="39" value="00"/>
         <field id="63" value="576564204A756C2031382032313A30363A343020474D542D30333A30302032303138" type="binary"/>
       </isomsg>
      
    </context>
            prepare: o.j.t.p.QueryHost PREPARED PAUSE READONLY NO_JOIN
            prepare: o.j.t.p.SendResponse PREPARED READONLY
             commit: o.j.t.p.SendResponse
     in-transit=0, head=6, tail=6, paused=0, outstanding=0, active-sessions=2/128, tps=3, peak=3, avg=0.18, elapsed=302ms
    <profiler>
      prepare: o.j.t.p.QueryHost [4.6/4.6]
      resume [292.0/296.7]
      prepare: o.j.t.p.SendResponse [0.7/297.5]
       commit: o.j.t.p.SendResponse [4.6/302.1]
      end [3.9/306.1]
    </profiler>
  </commit>
</log>

""",
          "@version": "1",
          "@timestamp": "2018-07-19T00:06:41.291Z",
          "Tag": "info",
          "Realm": "org.jpos.transaction.TransactionManager"
        }
      },

A nice view from kibana here


Notes:

  • Originally I plan to add this LogListener as a new JPOS-EE module, but as JSON is needed I consider better to enable JSON packager that was on hold here.

  • Need to continue working on LogEvents that do not contain payload, specially on TransactionManager

  • This is part of my journey with jPOS and Elastic for monitoring and transactional data analysis.

map.put(prefix + c.getKey(), ((ISOField)c).getValue());
}else {
Map field = new LinkedHashMap();
if((((ISOBasePackager) packager).getFieldPackager((int) c.getKey()).getClass().getName()).contains("NUMERIC")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're assuming ISOBasePackager and this could throw a class cast exception.

map.put(prefix + c.getKey(), ((ISOField)c).getValue());
}else {
Map field = new LinkedHashMap();
if((((ISOBasePackager) packager).getFieldPackager((int) c.getKey()).getClass().getName()).contains("NUMERIC")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't go by name containing NUMERIC to determine the type - need something better - isn't there some packager patterns JPOS uses already?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Vishu, I didn't merge this PR just yet because I'm not sure which JSON dependency we want to add. In jPOS-EE we are using Jackson, I believe this one uses JSON Simple (kinda dead), we have some others using GSON (kinda dead). @vsalaman suggested https://bolerio.github.io/mjson which looks very nice. So we'll see which one to use in jPOS, and then rework this PR with @andresantoniuk

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ar thanks for the note. I didn't know GSON is considered obsolete. I see commits in Aug 2018. I haven't used mjson but will try it out some time.

<property name="name" value="logger.Q2.buffered" />
</log-listener>

<log-listener class="org.jpos.util.KafkaLogListener">
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this logger file should not add Kafka listener - perhaps provide this as an example elsewhere?

@ar
Copy link
Member

ar commented Dec 9, 2020

Great PR, but I think it's better suited as a jPOS-EE module

@ar ar closed this Dec 9, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants