Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use properties instead of customizers #4

Merged
merged 1 commit into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
209 changes: 103 additions & 106 deletions Transformations.java
@@ -1,5 +1,11 @@
// camel-k: language=java property-file=transformation.properties dependency=camel:jacksonxml dependency=camel:http dependency=camel:gson
// camel-k: source=customizers/CSVCustomizer.java source=customizers/PostgreSQLCustomizer.java
// camel-k: language=java property-file=transformation.properties
// camel-k: dependency=camel:jacksonxml
// camel-k: dependency=camel:http
// camel-k: dependency=camel:gson
// camel-k: dependency=camel:jdbc
// camel-k: dependency=camel:csv
// camel-k: dependency=mvn:org.postgresql:postgresql:jar:42.2.13
// camel-k: dependency=mvn:org.apache.commons:commons-dbcp2:jar:2.7.0

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -15,148 +21,139 @@

public class Transformations extends RouteBuilder {

@Override
@Override
public void configure() throws Exception {

//The following processors store relevant info as properties
// The following processors store relevant info as properties
Processor processCsv = new CSVProcessor();
Processor processXML = new XMLProcessor();

//Preparing properties to build a GeoJSON Feature
// Preparing properties to build a GeoJSON Feature
Processor processDB = new DBProcessor();

//Just collects all features in a collection for the final GeoJSON
// Just collects all features in a collection for the final GeoJSON
Processor buildGeoJSON = new GeoJSONProcessor();

//Aggregate all messages into one message with the list of bodies
// Aggregate all messages into one message with the list of bodies
AggregationStrategy aggregationStrategy = new CollectToListStrategy();

//This is the actual route
// This is the actual route
from("timer:java?period=100000")
// Reference URL for air quality e-Reporting on EEA
// https://www.eea.europa.eu/data-and-maps/data/aqereporting-2

//We start by reading our data.csv file, looping on each row
.to("{{source.csv}}")
.unmarshal("customCSV")
.split(body()).streaming()
// We start by reading our data.csv file, looping on each row
.to("{{source.csv}}").unmarshal("customCSV").split(body()).streaming()

//we store on exchange properties all the data we are interested in
// we store on exchange properties all the data we are interested in
.process(processCsv)

//on each row, we query an XML API service
.setBody().constant("")
.setHeader(Exchange.HTTP_METHOD, constant("GET"))
// on each row, we query an XML API service
.setBody().constant("").setHeader(Exchange.HTTP_METHOD, constant("GET"))
.setHeader(Exchange.HTTP_QUERY, simple("lat=${exchangeProperty.lat}&lon=${exchangeProperty.lon}&format=xml"))
.to("https://nominatim.openstreetmap.org/reverse")
.unmarshal().jacksonxml()
.to("https://nominatim.openstreetmap.org/reverse").unmarshal().jacksonxml()

//we store on exchange properties all the data we are interested in
// we store on exchange properties all the data we are interested in
.process(processXML)

//now we query the postgres database for more data
// now we query the postgres database for more data
.setBody().simple("SELECT info FROM descriptions WHERE id like '${exchangeProperty.pollutant}'")
.to("jdbc:postgresBean?readSize=1")
//we store on exchange properties all the data we are interested in

// we store on exchange properties all the data we are interested in
.process(processDB)

//we collect all rows into one message
.aggregate(constant(true), aggregationStrategy)
.completionSize(5)
.process(buildGeoJSON)
.marshal().json(JsonLibrary.Gson)
// we collect all rows into one message
.aggregate(constant(true), aggregationStrategy).completionSize(5).process(buildGeoJSON).marshal()
.json(JsonLibrary.Gson)

.to("log:info?showBody=true")
//and finally store the result on the postgres database
.setBody(simple("INSERT INTO measurements (geojson) VALUES ('${body}')"))
.to("jdbc:postgresBean")

//Write some log to know it finishes properly
// and finally store the result on the postgres database
.setBody(simple("INSERT INTO measurements (geojson) VALUES ('${body}')")).to("jdbc:postgresBean")

// Write some log to know it finishes properly
.log("Information stored");
}

private final class CollectToListStrategy
extends AbstractListAggregationStrategy<Object> {
@Override
public Object getValue(Exchange exchange) {
return exchange.getMessage().getBody();
}
}
private final class CollectToListStrategy extends AbstractListAggregationStrategy<Object> {
@Override
public Object getValue(Exchange exchange) {
return exchange.getMessage().getBody();
}
}

private final class GeoJSONProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map<String, Object> res = new HashMap<String, Object>();
res.put("features", exchange.getMessage().getBody());
res.put("type", "FeatureCollection");
exchange.getIn().setBody(res);
private final class GeoJSONProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Map<String, Object> res = new HashMap<String, Object>();
res.put("features", exchange.getMessage().getBody());
res.put("type", "FeatureCollection");
exchange.getIn().setBody(res);
}
}

private final class DBProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
List<Object> body = exchange.getMessage().getBody(List.class);

Map<String, Object> outputBody = new HashMap<String, Object>();
outputBody.put("unit", exchange.getProperty("unit"));
outputBody.put("level", exchange.getProperty("level"));
outputBody.put("pollutant", exchange.getProperty("pollutant"));
outputBody.put("address", exchange.getProperty("address"));

// If we got any response from the DB, add it
if (body.size() > 0) {
outputBody.put("info", body.get(0).toString());
}
}

private final class DBProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
List<Object> body = exchange.getMessage().getBody(List.class);

Map<String, Object> outputBody = new HashMap<String, Object>();
outputBody.put("unit", exchange.getProperty("unit"));
outputBody.put("level", exchange.getProperty("level"));
outputBody.put("pollutant", exchange.getProperty("pollutant"));
outputBody.put("address", exchange.getProperty("address"));

//If we got any response from the DB, add it
if(body.size() > 0) {
outputBody.put("info", body.get(0).toString());
}

List<String> coordinates = new ArrayList<String>();
coordinates.add(exchange.getProperty("lat", "").toString());
coordinates.add(exchange.getProperty("lon", "").toString());

Map<String, Object> geometry = new HashMap<String, Object>();
geometry.put("type", "Point");
geometry.put("coordinates", coordinates);

Map<String, Object> res = new HashMap<String, Object>();
res.put("geometry", geometry);
res.put("properties", outputBody);
res.put("type", "Feature");

exchange.getIn().setBody(res);
}
}

private final class XMLProcessor implements Processor {
@Override
List<String> coordinates = new ArrayList<String>();
coordinates.add(exchange.getProperty("lat", "").toString());
coordinates.add(exchange.getProperty("lon", "").toString());

Map<String, Object> geometry = new HashMap<String, Object>();
geometry.put("type", "Point");
geometry.put("coordinates", coordinates);

Map<String, Object> res = new HashMap<String, Object>();
res.put("geometry", geometry);
res.put("properties", outputBody);
res.put("type", "Feature");

exchange.getIn().setBody(res);
}
}

private final class XMLProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
Map<String, String> body = exchange.getIn().getBody(Map.class);
exchange.setProperty("address", body.get("addressparts"));
}
}
}

private final class CSVProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
Map<String, String> body = exchange.getIn().getBody(Map.class);

if (body != null) {
extractValue(exchange, body, "Latitude of station", "lat");
extractValue(exchange, body, "Longitude of station", "lon");
extractValue(exchange, body, "Unit", "unit");
extractValue(exchange, body, "Air pollution level", "level");
extractValue(exchange, body, "Air pollutant", "pollutant");
}
}

private final class CSVProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
Map<String, String> body = exchange.getIn().getBody(Map.class);

if (body != null) {
extractValue(exchange, body, "Latitude of station", "lat");
extractValue(exchange, body, "Longitude of station", "lon");
extractValue(exchange, body, "Unit", "unit");
extractValue(exchange, body, "Air pollution level", "level");
extractValue(exchange, body, "Air pollutant", "pollutant");
}
}

private void extractValue(Exchange exchange, Map<String, String> body,
String param, String keyName) {
if (body.containsKey(param)) {
exchange.setProperty(keyName, body.get(param));
}
}
}
private void extractValue(Exchange exchange, Map<String, String> body, String param, String keyName) {
if (body.containsKey(param)) {
exchange.setProperty(keyName, body.get(param));
}
}
}
}
18 changes: 0 additions & 18 deletions customizers/CSVCustomizer.java

This file was deleted.

30 changes: 0 additions & 30 deletions customizers/PostgreSQLCustomizer.java

This file was deleted.

12 changes: 10 additions & 2 deletions transformation.properties
Expand Up @@ -8,5 +8,13 @@ postgresql.host=mypostgres
postgresql.database=example

# CSV dataformat settings
camel.dataformat.csv.allow-missing-column-names=true
camel.dataformat.csv.use-maps=true
camel.beans.customCSV = #class:org.apache.camel.model.dataformat.CsvDataFormat
camel.beans.customCSV.allow-missing-column-names = true
camel.beans.customCSV.use-maps = true

# JDBC dataformat settings
camel.beans.postgresBean = #class:org.apache.commons.dbcp2.BasicDataSource
camel.beans.postgresBean.url = jdbc:postgresql://{{postgresql.host}}:5432/{{postgresql.database}}
camel.beans.postgresBean.username = {{postgresql.user}}
camel.beans.postgresBean.password = {{postgresql.password}}
camel.beans.postgresBean.validation-query = SELECT 1