Skip to content

Commit

Permalink
add elastic search log integration
Browse files Browse the repository at this point in the history
  • Loading branch information
galihlasahido committed Jul 6, 2020
1 parent 27b0f12 commit 63b619a
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 2 deletions.
7 changes: 6 additions & 1 deletion libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ ext {
postgresJDBCVersion = '42.2.11'
flywaydbVersion = '6.0.4'
liquibaseVersion = '3.8.0'
elkVersion = '7.8.0'
orgjsonVersion = '20200518'

libraries = [
//jUnit (Tests)
Expand Down Expand Up @@ -120,7 +122,10 @@ ext {
guava: "com.google.guava:guava:${guavaVersion}",
httpAsyncClient: "org.apache.httpcomponents:httpasyncclient:${httpAsyncClientVersion}",
flywaydb: "org.flywaydb:flyway-core:${flywaydbVersion}",
liquibase: "org.liquibase:liquibase-core:${liquibaseVersion}"
liquibase: "org.liquibase:liquibase-core:${liquibaseVersion}",

elk: "org.elasticsearch.client:elasticsearch-rest-high-level-client:${elkVersion}",
orgJson: "org.json:json:${orgjsonVersion}"
]

jsonSchemaValidatorLibs = [
Expand Down
11 changes: 11 additions & 0 deletions modules/elasticsearch/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
description = 'jPOS-EE :: Elastic search Support Module'
group = 'org.jpos.elk'

dependencies {
implementation libraries.orgJson
implementation libraries.jpos
implementation libraries.elk
}

apply from: "${rootProject.projectDir}/jpos-app.gradle"

Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.jpos.elasticsearch;

import org.jpos.core.Configurable;
import org.jpos.core.Configuration;
import org.jpos.core.ConfigurationException;
import org.jpos.space.Space;
import org.jpos.space.SpaceError;
import org.jpos.space.SpaceFactory;
import org.jpos.space.TSpace;
import org.jpos.util.FrozenLogEvent;
import org.jpos.util.LogEvent;
import org.jpos.util.LogListener;
import org.json.JSONObject;
import org.json.XML;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class ElasticSearchLogListener implements LogListener, Configurable {
String queueName;
String indexName;
Space sp;
Space buffer;
long timeout = 3000L;
boolean frozen = true;
Configuration cfg;

@SuppressWarnings("unused")
public ElasticSearchLogListener() {
super();
}

@SuppressWarnings("unchecked")
public synchronized LogEvent log(LogEvent ev) {
LogEvent e = frozen ? new FrozenLogEvent(ev) : ev;
ByteArrayOutputStream os = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(os);
e.dump(ps,"");
JSONObject jsonObject= XML.toJSONObject(os.toString(), true);

try {
getSpace().out(queueName, jsonObject, timeout);
return null;
} catch (Throwable t) {
ev.addMessage("Unable to log to " + queueName);
return ev;
}
}

/**
* @param cfg Configuration object
* @throws ConfigurationException
*/
@Override
public void setConfiguration(Configuration cfg) throws ConfigurationException {
this.cfg = cfg;
queueName = cfg.get("queue", null);
indexName = cfg.get("index-name", "jpos");
if (queueName == null)
throw new ConfigurationException("'queue' property not configured");

timeout = cfg.getLong("timeout", timeout);
frozen = cfg.getBoolean("frozen", true);
}

@SuppressWarnings("unchecked")
private Space getSpace() {
if (sp == null) {
try {
sp = SpaceFactory.getSpace(cfg.get("space"));
if (buffer != null) {
while (buffer.rdp(queueName) != null)
sp.out (queueName, buffer.inp(queueName));
buffer = null;
}
} catch (SpaceError e) {
return (buffer = new TSpace());
}
}
return sp;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* jPOS Project [http://jpos.org]
* Copyright (C) 2000-2019 jPOS Software SRL
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package org.jpos.elasticsearch;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.jpos.core.Configuration;
import org.jpos.core.ConfigurationException;
import org.jpos.q2.QBeanSupport;
import org.jpos.space.Space;
import org.jpos.space.SpaceFactory;
import org.jpos.util.NameRegistrar;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.TimeZone;
import java.util.UUID;

public class ElasticSearchService extends QBeanSupport implements Runnable {
private RestHighLevelClient client;
private String queue;
private Space sp;
private String indexname;

public RestHighLevelClient getClient() {
return client;
}

@Override
@SuppressWarnings("unchecked")
public void run() {
boolean stopping = false;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

while (running() && !stopping) {
try {
BulkRequest br = new BulkRequest();
Object obj = sp.rd (queue);
while ((obj = sp.inp (queue)) != null) {
SimpleDateFormat sdfat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
SimpleDateFormat sdfatufc = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
sdfatufc.setTimeZone(TimeZone.getTimeZone("UTC"));

if (obj instanceof JSONObject) {
JSONObject jsonObject = (JSONObject) obj;
JSONObject jsonLog = jsonObject.getJSONObject("log");
String direction = jsonLog.keySet().stream().filter(item->item.matches("receive|send")).findAny().orElse("");
if(!jsonLog.isNull("lifespan")) {
jsonLog.put("lifespan-int", Integer.valueOf(jsonLog.get("lifespan").toString().replaceAll("ms", "")));
}

if(!jsonLog.isNull("at")) {
jsonObject.put("time-utc", sdfatufc.format(sdfat.parse(jsonLog.get("at").toString())));
}

if(direction!="") {
JSONObject receiveLog = jsonLog.getJSONObject(direction);
if(!receiveLog.isNull("isomsg")) {
jsonLog.put("direction", direction);
JSONObject isomsgLog = receiveLog.getJSONObject("isomsg");
JSONArray fieldLog = isomsgLog.getJSONArray("field");
for (int i = 0; i < fieldLog.length(); i++) {
jsonLog.put("field" + fieldLog.getJSONObject(i).get("id"), fieldLog.getJSONObject(i).get("value"));
}
jsonLog.remove(direction);
jsonObject.put("action", direction + " message");
}
} else {
jsonObject.put("action", "logging");
}


IndexRequest req = new IndexRequest( indexname+"-"+sdf.format(new Date()))
.id(UUID.randomUUID().toString())
.source(jsonObject.toString(), XContentType.JSON)
.timeout(TimeValue.timeValueSeconds(5));

br.add(req);
} else {
stopping = true;
break;
}
}
if (br.estimatedSizeInBytes() > 0) {
BulkResponse bulkResponse = client.bulk(br, RequestOptions.DEFAULT);
if(bulkResponse.hasFailures()) {
getLog().warn (bulkResponse.buildFailureMessage());
}
}
} catch (IOException | ParseException e) {
getLog().warn (e);
}
}
}

@Override
protected void initService() {
// String[] urls = cfg.getAll("url");
// HttpHost[] hosts = new HttpHost[urls.length];
// for (int i=0; i<hosts.length; i++) {
// hosts[i] = HttpHost.create(urls[i]);
// }

HttpHost[] hosts = Arrays.stream(cfg.getAll("url")).map(HttpHost::create).toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(hosts);

String user = cfg.get("user", null);
String password = cfg.get("password", null);

if (user != null && password !=null) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider));
}
client = new RestHighLevelClient(builder);
NameRegistrar.register(getName(), this);
}

@Override
protected void startService() {
if (queue != null)
new Thread(this, getName()).start();
try {
index("start");
} catch (Exception e) {
getLog().warn(e);
}
}

@Override
protected void stopService() {
try {
if (queue != null)
sp.out (queue, Boolean.FALSE);

index("stop");
client.close();
} catch (IOException e) {
getLog().warn(e);
}
}

@Override
protected void destroyService() {
NameRegistrar.unregister(getName());
}

@Override
public void setConfiguration (Configuration cfg) throws ConfigurationException {
super.setConfiguration(cfg);
queue = cfg.get("queue", null);
this.indexname = cfg.get("index-name", "jpos");
if (queue != null) {
sp = SpaceFactory.getSpace(cfg.get("space"));
}
}

private void index(String action) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("instance", getServer().getInstanceId().toString());
builder.timeField("time", new Date());
builder.field("action", action);
}

builder.endObject();
IndexRequest req = new IndexRequest(this.indexname +"-"+sdf.format(new Date()))
.id(UUID.randomUUID().toString())
.source(builder)
.timeout(TimeValue.timeValueSeconds(5)
);
client.index(req, RequestOptions.DEFAULT);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<logger name="Q2" class="org.jpos.q2.qbean.LoggerAdaptor">
<!--
<log-listener class="org.jpos.util.SimpleLogListener" />
<log-listener class="org.jpos.util.BufferedLogListener">
<property name="max-size" value="100" />
<property name="name" value="logger.Q2.buffered" />
</log-listener>
-->
<log-listener class="org.jpos.elasticsearch.ElasticSearchLogListener">
<property name="queue" value="logger.elastic" />
<property name="space" value="tspace:elastic" />
<property name="index-name" value="jpos" />
<property name="frozen" value="true" />
</log-listener>
</logger>

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<logger name="DAEMON" class="org.jpos.q2.qbean.LoggerAdaptor">
<elastic class='org.jpos.elasticsearch.ElasticSearchService' logger='Q2'>
<property name="url" value="http://localhost:9200" />
<property name="queue" value="logger.elastic" />
<property name="space" value="tspace:elastic" />
<!--
<property name="user" value="user" />
<property name="password" value="password" />
<property name="index-name" value="jpos" />
-->
</elastic>
</logger>
3 changes: 2 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ include ':modules:core',
':modules:iso-http-servlet',
':modules:http-client',
':modules:seqno',
':modules:db-flyway'
':modules:db-flyway',
':modules:elasticsearch'

rootProject.name = 'jposee'

0 comments on commit 63b619a

Please sign in to comment.