Skip to content
Permalink
Fetching contributors…
Cannot retrieve contributors at this time
119 lines (100 sloc) 4.48 KB
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package rocks.nifi.examples.processors;
import com.jayway.jsonpath.JsonPath;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
/**
*
* @author phillip
*/
@SideEffectFree
@Tags({"JSON", "NIFI ROCKS"})
@CapabilityDescription("Fetch value from json path.")
public class JsonProcessor extends AbstractProcessor {
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
public static final String MATCH_ATTR = "match";
public static final PropertyDescriptor JSON_PATH = new PropertyDescriptor.Builder()
.name("Json Path")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship SUCCESS = new Relationship.Builder()
.name("SUCCESS")
.description("Succes relationship")
.build();
@Override
public void init(final ProcessorInitializationContext context){
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(JSON_PATH);
this.properties = Collections.unmodifiableList(properties);
Set<Relationship> relationships = new HashSet<>();
relationships.add(SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final AtomicReference<String> value = new AtomicReference<>();
FlowFile flowfile = session.get();
session.read(flowfile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try{
String json = IOUtils.toString(in);
String result = JsonPath.read(json, context.getProperty(JSON_PATH).getValue());
value.set(result);
}catch(Exception ex){
ex.printStackTrace();
getLogger().error("Failed to read json string.");
}
}
});
// Write the results to an attribute
String results = value.get();
if(results != null && !results.isEmpty()){
flowfile = session.putAttribute(flowfile, "match", results);
}
// To write the results back out ot flow file
flowfile = session.write(flowfile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(value.get().getBytes());
}
});
session.transfer(flowfile, SUCCESS);
}
@Override
public Set<Relationship> getRelationships(){
return relationships;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors(){
return properties;
}
}
You can’t perform that action at this time.