Skip to content

Commit

Permalink
Improve code test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
Symbianx committed May 20, 2020
1 parent 94b4fbf commit 04be206
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/source/policies.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ Simple Batch
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This policy's behaviour is similar to the Simple policy but it will process files in batches instead
of processing them all at once.
of processing all the files at once.

You can learn more about the properties of this policy :ref:`here<config_options-policies-simple_batch>`.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
Expand Down Expand Up @@ -48,19 +50,20 @@ public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOExcepti
Map<String, String> configMap = buildConfigMap(fsConfig.getDirectories());
configMap.put(SimpleBatchPolicy.BATCH_POLICY_BATCH_SIZE, "1");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(configMap);

Path dir = fsConfig.getDirectories().get(0);

fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt"));
//this file does not match the regexp
// this file does not match the regexp
fsConfig.getFs().createNewFile(new Path(dir, System.nanoTime() + ".txt"));

Policy policy = ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
.getClass(FsSourceTaskConfig.POLICY_CLASS), sourceTaskConfig);

Policy policy = ReflectionUtils.makePolicy(
(Class<? extends Policy>) fsConfig.getSourceTaskConfig().getClass(FsSourceTaskConfig.POLICY_CLASS),
sourceTaskConfig);
fsConfig.setPolicy(policy);

Iterator<FileMetadata> it = fsConfig.getPolicy().execute();

// First batch of files (1 file)
assertFalse(fsConfig.getPolicy().hasEnded());
assertTrue(it.hasNext());
Expand All @@ -72,10 +75,27 @@ public void execPolicyBatchesFiles(PolicyFsTestConfig fsConfig) throws IOExcepti
// Second batch of files (1 file)
it = fsConfig.getPolicy().execute();
assertTrue(it.hasNext());

assertNotEquals(firstPath, it.next().getPath());

assertFalse(it.hasNext());
assertTrue(fsConfig.getPolicy().hasEnded());
}

@ParameterizedTest
@MethodSource("fileSystemConfigProvider")
public void invalidBatchSize(PolicyFsTestConfig fsConfig) {
Map<String, String> configMap = buildConfigMap(fsConfig.getDirectories());
configMap.put(SimpleBatchPolicy.BATCH_POLICY_BATCH_SIZE, "one");
FsSourceTaskConfig sourceTaskConfig = new FsSourceTaskConfig(configMap);
assertThrows(ConfigException.class, () -> {
try {
ReflectionUtils.makePolicy((Class<? extends Policy>) fsConfig.getSourceTaskConfig()
.getClass(FsSourceTaskConfig.POLICY_CLASS), sourceTaskConfig);
} catch (Exception e) {
throw e.getCause();
}
});

}
}

0 comments on commit 04be206

Please sign in to comment.