Skip to content

Commit

Permalink
Added Control Rate And Drop Groovy scripts for NiFi
Browse files Browse the repository at this point in the history
  • Loading branch information
pvillard31 committed May 11, 2017
1 parent 65a9278 commit cbe6824
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 0 deletions.
76 changes: 76 additions & 0 deletions hdf/nifi/InvokeScriptedProcessor/ControlRateAndDrop.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
class ControlRateAndDrop implements Processor {

def REL_SUCCESS = new Relationship.Builder()
.name('signal')
.description('One flow file will be routed to this relationship according to the given frequency')
.build();

def REL_DROP = new Relationship.Builder()
.name('drop')
.description('Every other flow file will be routed to this relationship')
.build();

def FREQUENCY = new PropertyDescriptor.Builder()
.name('Frequency')
.description('Frequency used to release one flow file in the success relationship')
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();

def lastSignal = 0L;

@Override
void initialize(ProcessorInitializationContext context) { }

@Override
Set<Relationship> getRelationships() { return [REL_SUCCESS, REL_DROP] as Set }

@Override
Collection<ValidationResult> validate(ValidationContext context) { return null }

@Override
PropertyDescriptor getPropertyDescriptor(String name) {
switch(name) {
case 'Frequency': return FREQUENCY
default: return null
}
}

@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

@Override
List<PropertyDescriptor> getPropertyDescriptors() { return [FREQUENCY] as List }

@Override
String getIdentifier() { return 'ControlRateAndDrop-InvokeScriptedProcessor' }

@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
try {

def session = sessionFactory.createSession()
def flowFile = session.get()
if (flowFile == null) {
return
}

def freq = context.getProperty(FREQUENCY).asTimePeriod(java.util.concurrent.TimeUnit.MILLISECONDS).longValue();

if(new Date().getTime() - lastSignal > freq) {
session.transfer(flowFile, REL_SUCCESS)
session.commit()
lastSignal = new Date().getTime()
} else {
session.transfer(flowFile, REL_DROP)
session.commit()
}

} catch(e) {
throw new ProcessException(e)
}
}

}

processor = new ControlRateAndDrop()
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
class ControlRateAndDropLineageMeanMax implements Processor {

def REL_SUCCESS = new Relationship.Builder()
.name('signal')
.description('One flow file will be routed to this relationship according to the given frequency')
.build();

def REL_DROP = new Relationship.Builder()
.name('drop')
.description('Every other flow file will be routed to this relationship')
.build();

def FREQUENCY = new PropertyDescriptor.Builder()
.name('Frequency')
.description('Frequency used to release one flow file in the success relationship')
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();

def lastSignal = 0L;
def eventCount = 0L;
def sum = 0L;
def max = 0L;

@Override
void initialize(ProcessorInitializationContext context) { }

@Override
Set<Relationship> getRelationships() { return [REL_SUCCESS, REL_DROP] as Set }

@Override
Collection<ValidationResult> validate(ValidationContext context) { return null }

@Override
PropertyDescriptor getPropertyDescriptor(String name) {
switch(name) {
case 'Frequency': return FREQUENCY
default: return null
}
}

@Override
void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

@Override
List<PropertyDescriptor> getPropertyDescriptors() { return [FREQUENCY] as List }

@Override
String getIdentifier() { return 'ControlRateAndDropLineageMeanMax-InvokeScriptedProcessor' }

@Override
void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
try {

def session = sessionFactory.createSession()
def flowFile = session.get()
if (flowFile == null) {
return
}

def freq = context.getProperty(FREQUENCY).asTimePeriod(java.util.concurrent.TimeUnit.MILLISECONDS).longValue()
def now = new Date().getTime()
def duration = now - flowFile.getLineageStartDate()

eventCount++
sum += duration
max = Math.max(max, duration)

if(now - lastSignal > freq) {
flowFile = session.putAttribute(flowFile, "mean", String.valueOf(Math.round(sum / eventCount)))
flowFile = session.putAttribute(flowFile, "max", String.valueOf(max))
session.transfer(flowFile, REL_SUCCESS)
session.commit()
lastSignal = now
sum = 0L
max = 0L
eventCount = 0L
} else {
session.transfer(flowFile, REL_DROP)
session.commit()
}

} catch(e) {
throw new ProcessException(e)
}
}

}

processor = new ControlRateAndDropLineageMeanMax()

0 comments on commit cbe6824

Please sign in to comment.