Skip to content

Commit

Permalink
Merge pull request #4 from openshift-integration/customizer-properties
Browse files Browse the repository at this point in the history
Use properties instead of customizers
  • Loading branch information
lburgazzoli committed Oct 15, 2020
2 parents eab3600 + 847404f commit 18fe6fd
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 156 deletions.
209 changes: 103 additions & 106 deletions Transformations.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
// camel-k: language=java property-file=transformation.properties dependency=camel:jacksonxml dependency=camel:http dependency=camel:gson
// camel-k: source=customizers/CSVCustomizer.java source=customizers/PostgreSQLCustomizer.java
// camel-k: language=java property-file=transformation.properties
// camel-k: dependency=camel:jacksonxml
// camel-k: dependency=camel:http
// camel-k: dependency=camel:gson
// camel-k: dependency=camel:jdbc
// camel-k: dependency=camel:csv
// camel-k: dependency=mvn:org.postgresql:postgresql:jar:42.2.13
// camel-k: dependency=mvn:org.apache.commons:commons-dbcp2:jar:2.7.0

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -15,148 +21,139 @@

public class Transformations extends RouteBuilder {

@Override
@Override
public void configure() throws Exception {

//The following processors store relevant info as properties
// The following processors store relevant info as properties
Processor processCsv = new CSVProcessor();
Processor processXML = new XMLProcessor();

//Preparing properties to build a GeoJSON Feature
// Preparing properties to build a GeoJSON Feature
Processor processDB = new DBProcessor();

//Just collects all features in a collection for the final GeoJSON
// Just collects all features in a collection for the final GeoJSON
Processor buildGeoJSON = new GeoJSONProcessor();

//Aggregate all messages into one message with the list of bodies
// Aggregate all messages into one message with the list of bodies
AggregationStrategy aggregationStrategy = new CollectToListStrategy();

//This is the actual route
// This is the actual route
from("timer:java?period=100000")
// Reference URL for air quality e-Reporting on EEA
// https://www.eea.europa.eu/data-and-maps/data/aqereporting-2

//We start by reading our data.csv file, looping on each row
.to("{{source.csv}}")
.unmarshal("customCSV")
.split(body()).streaming()
// We start by reading our data.csv file, looping on each row
.to("{{source.csv}}").unmarshal("customCSV").split(body()).streaming()

//we store on exchange properties all the data we are interested in
// we store on exchange properties all the data we are interested in
.process(processCsv)

//on each row, we query an XML API service
.setBody().constant("")
.setHeader(Exchange.HTTP_METHOD, constant("GET"))
// on each row, we query an XML API service
.setBody().constant("").setHeader(Exchange.HTTP_METHOD, constant("GET"))
.setHeader(Exchange.HTTP_QUERY, simple("lat=${exchangeProperty.lat}&lon=${exchangeProperty.lon}&format=xml"))
.to("https://nominatim.openstreetmap.org/reverse")
.unmarshal().jacksonxml()
.to("https://nominatim.openstreetmap.org/reverse").unmarshal().jacksonxml()

//we store on exchange properties all the data we are interested in
// we store on exchange properties all the data we are interested in
.process(processXML)

//now we query the postgres database for more data
// now we query the postgres database for more data
.setBody().simple("SELECT info FROM descriptions WHERE id like '${exchangeProperty.pollutant}'")
.to("jdbc:postgresBean?readSize=1")
//we store on exchange properties all the data we are interested in

// we store on exchange properties all the data we are interested in
.process(processDB)

//we collect all rows into one message
.aggregate(constant(true), aggregationStrategy)
.completionSize(5)
.process(buildGeoJSON)
.marshal().json(JsonLibrary.Gson)
// we collect all rows into one message
.aggregate(constant(true), aggregationStrategy).completionSize(5).process(buildGeoJSON).marshal()
.json(JsonLibrary.Gson)

.to("log:info?showBody=true")
//and finally store the result on the postgres database
.setBody(simple("INSERT INTO measurements (geojson) VALUES ('${body}')"))
.to("jdbc:postgresBean")

//Write some log to know it finishes properly
// and finally store the result on the postgres database
.setBody(simple("INSERT INTO measurements (geojson) VALUES ('${body}')")).to("jdbc:postgresBean")

// Write some log to know it finishes properly
.log("Information stored");
}

private final class CollectToListStrategy
extends AbstractListAggregationStrategy<Object> {
@Override
public Object getValue(Exchange exchange) {
return exchange.getMessage().getBody();
}
}
private final class CollectToListStrategy extends AbstractListAggregationStrategy<Object> {
@Override
public Object getValue(Exchange exchange) {
return exchange.getMessage().getBody();
}
}

private final class GeoJSONProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map<String, Object> res = new HashMap<String, Object>();
res.put("features", exchange.getMessage().getBody());
res.put("type", "FeatureCollection");
exchange.getIn().setBody(res);
private final class GeoJSONProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map<String, Object> res = new HashMap<String, Object>();
res.put("features", exchange.getMessage().getBody());
res.put("type", "FeatureCollection");
exchange.getIn().setBody(res);
}
}

private final class DBProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
List<Object> body = exchange.getMessage().getBody(List.class);

Map<String, Object> outputBody = new HashMap<String, Object>();
outputBody.put("unit", exchange.getProperty("unit"));
outputBody.put("level", exchange.getProperty("level"));
outputBody.put("pollutant", exchange.getProperty("pollutant"));
outputBody.put("address", exchange.getProperty("address"));

// If we got any response from the DB, add it
if (body.size() > 0) {
outputBody.put("info", body.get(0).toString());
}
}

private final class DBProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
List<Object> body = exchange.getMessage().getBody(List.class);

Map<String, Object> outputBody = new HashMap<String, Object>();
outputBody.put("unit", exchange.getProperty("unit"));
outputBody.put("level", exchange.getProperty("level"));
outputBody.put("pollutant", exchange.getProperty("pollutant"));
outputBody.put("address", exchange.getProperty("address"));

//If we got any response from the DB, add it
if(body.size() > 0) {
outputBody.put("info", body.get(0).toString());
}

List<String> coordinates = new ArrayList<String>();
coordinates.add(exchange.getProperty("lat", "").toString());
coordinates.add(exchange.getProperty("lon", "").toString());

Map<String, Object> geometry = new HashMap<String, Object>();
geometry.put("type", "Point");
geometry.put("coordinates", coordinates);

Map<String, Object> res = new HashMap<String, Object>();
res.put("geometry", geometry);
res.put("properties", outputBody);
res.put("type", "Feature");

exchange.getIn().setBody(res);
}
}

private final class XMLProcessor implements Processor {
@Override
List<String> coordinates = new ArrayList<String>();
coordinates.add(exchange.getProperty("lat", "").toString());
coordinates.add(exchange.getProperty("lon", "").toString());

Map<String, Object> geometry = new HashMap<String, Object>();
geometry.put("type", "Point");
geometry.put("coordinates", coordinates);

Map<String, Object> res = new HashMap<String, Object>();
res.put("geometry", geometry);
res.put("properties", outputBody);
res.put("type", "Feature");

exchange.getIn().setBody(res);
}
}

private final class XMLProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
Map<String, String> body = exchange.getIn().getBody(Map.class);
exchange.setProperty("address", body.get("addressparts"));
}
}
}

private final class CSVProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
Map<String, String> body = exchange.getIn().getBody(Map.class);

if (body != null) {
extractValue(exchange, body, "Latitude of station", "lat");
extractValue(exchange, body, "Longitude of station", "lon");
extractValue(exchange, body, "Unit", "unit");
extractValue(exchange, body, "Air pollution level", "level");
extractValue(exchange, body, "Air pollutant", "pollutant");
}
}

private final class CSVProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
Map<String, String> body = exchange.getIn().getBody(Map.class);

if (body != null) {
extractValue(exchange, body, "Latitude of station", "lat");
extractValue(exchange, body, "Longitude of station", "lon");
extractValue(exchange, body, "Unit", "unit");
extractValue(exchange, body, "Air pollution level", "level");
extractValue(exchange, body, "Air pollutant", "pollutant");
}
}

private void extractValue(Exchange exchange, Map<String, String> body,
String param, String keyName) {
if (body.containsKey(param)) {
exchange.setProperty(keyName, body.get(param));
}
}
}
private void extractValue(Exchange exchange, Map<String, String> body, String param, String keyName) {
if (body.containsKey(param)) {
exchange.setProperty(keyName, body.get(param));
}
}
}
}
18 changes: 0 additions & 18 deletions customizers/CSVCustomizer.java

This file was deleted.

30 changes: 0 additions & 30 deletions customizers/PostgreSQLCustomizer.java

This file was deleted.

12 changes: 10 additions & 2 deletions transformation.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,13 @@ postgresql.host=mypostgres
postgresql.database=example

# CSV dataformat settings
camel.dataformat.csv.allow-missing-column-names=true
camel.dataformat.csv.use-maps=true
camel.beans.customCSV = #class:org.apache.camel.model.dataformat.CsvDataFormat
camel.beans.customCSV.allow-missing-column-names = true
camel.beans.customCSV.use-maps = true

# JDBC dataformat settings
camel.beans.postgresBean = #class:org.apache.commons.dbcp2.BasicDataSource
camel.beans.postgresBean.url = jdbc:postgresql://{{postgresql.host}}:5432/{{postgresql.database}}
camel.beans.postgresBean.username = {{postgresql.user}}
camel.beans.postgresBean.password = {{postgresql.password}}
camel.beans.postgresBean.validation-query = SELECT 1

0 comments on commit 18fe6fd

Please sign in to comment.