
initial commit

commit 2b953fe2ebf34561d73fdf14e65788fcc6a39b6d by @jt6211, May 22, 2012
Showing 3,054 additions and 0 deletions.
  1. +5 −0 .gitignore
  2. +107 −0 README.md
  3. +105 −0 pom.xml
  4. +38 −0 src/main/assembly.xml
  5. +106 −0 src/main/java/io/covert/dns/collection/CollectionJob.java
  6. +207 −0 src/main/java/io/covert/dns/collection/CollectionMapper.java
  7. +140 −0 src/main/java/io/covert/dns/collection/DnsRequest.java
  8. +62 −0 src/main/java/io/covert/dns/collection/DnsRequestInputFormat.java
  9. +131 −0 src/main/java/io/covert/dns/collection/DnsRequestRecordReader.java
  10. +229 −0 src/main/java/io/covert/dns/collection/ResolverThread.java
  11. +41 −0 src/main/java/io/covert/dns/extract/Extractor.java
  12. +88 −0 src/main/java/io/covert/dns/extract/ExtractorJob.java
  13. +62 −0 src/main/java/io/covert/dns/extract/ExtractorMapper.java
  14. +57 −0 src/main/java/io/covert/dns/filtering/Filter.java
  15. +87 −0 src/main/java/io/covert/dns/filtering/FilterJob.java
  16. +54 −0 src/main/java/io/covert/dns/filtering/FilterMapper.java
  17. +86 −0 src/main/java/io/covert/dns/geo/GeoJob.java
  18. +65 −0 src/main/java/io/covert/dns/geo/GeoMapper.java
  19. +159 −0 src/main/java/io/covert/dns/geo/IpGeo.java
  20. +72 −0 src/main/java/io/covert/dns/parse/ParseJob.java
  21. +60 −0 src/main/java/io/covert/dns/parse/ParseMapper.java
  22. +62 −0 src/main/java/io/covert/dns/storage/StorageJob.java
  23. +74 −0 src/main/java/io/covert/dns/storage/StorageMapper.java
  24. +28 −0 src/main/java/io/covert/dns/storage/StorageModule.java
  25. +23 −0 src/main/java/io/covert/dns/storage/StorageModuleFactory.java
  26. +95 −0 src/main/java/io/covert/dns/storage/accumulo/AccumuloStorageModule.java
  27. +85 −0 src/main/java/io/covert/dns/storage/accumulo/AccumuloStorageModuleFactory.java
  28. +111 −0 src/main/java/io/covert/dns/storage/accumulo/mutgen/EdgeMutationGenerator.java
  29. +74 −0 src/main/java/io/covert/dns/storage/accumulo/mutgen/EdgeMutationGeneratorFactory.java
  30. +57 −0 src/main/java/io/covert/dns/storage/accumulo/mutgen/EventMutuationGenerator.java
  31. +39 −0 src/main/java/io/covert/dns/storage/accumulo/mutgen/EventMutuationGeneratorFactory.java
  32. +26 −0 src/main/java/io/covert/dns/storage/accumulo/mutgen/MutationGenerator.java
  33. +23 −0 src/main/java/io/covert/dns/storage/accumulo/mutgen/MutationGeneratorFactory.java
  34. +70 −0 src/main/java/io/covert/dns/util/DumpResponses.java
  35. +171 −0 src/main/java/io/covert/dns/util/JsonUtils.java
  36. +73 −0 src/main/java/io/covert/util/Pair.java
  37. +30 −0 src/main/java/io/covert/util/UniqueKeyOnlyReducer.java
  38. +52 −0 src/main/java/io/covert/util/Utils.java
5 .gitignore
@@ -0,0 +1,5 @@
+.classpath
+.project
+.settings
+*~
+/target
107 README.md
@@ -0,0 +1,107 @@
+# hadoop-dns-mining
+
+This is a small framework for performing large numbers of DNS lookups using Hadoop. It is a work in progress; pull requests are welcome.
+
+## Steps to get it working
+
+### Download, compile and install the Maxmind JAR into maven
+
+
+ wget http://geolite.maxmind.com/download/geoip/api/java/GeoIPJava-1.2.5.zip
+ unzip GeoIPJava-1.2.5.zip
+ cd GeoIPJava-1.2.5/source/com/maxmind/geoip/
+ javac *.java
+ cd ../../../
+ zip -r maxmind.jar com/
+ mvn install:install-file -Dfile=maxmind.jar -DgroupId=com.maxmind -DartifactId=geo-ip -Dversion=1.2.5 -Dpackaging=jar
+
+
+### Obtain the Maxmind IP Geo Database
+
+
+ wget http://geolite.maxmind.com/download/geoip/database/GeoLiteCity.dat.gz
+ gzip -d GeoLiteCity.dat.gz
+
+
+### Obtain the Maxmind ASN Database
+
+
+ wget http://www.maxmind.com/download/geoip/database/asnum/GeoIPASNum.dat.gz
+ gzip -d GeoIPASNum.dat.gz
+
+
+Deploy these decompressed database files to the same location on each machine in your Hadoop cluster
+(e.g. ```/usr/local/lib/maxmind/```).
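+
+Before pushing the files out to every node, you may want a quick local check that both databases
+load. Here is a minimal, hypothetical test using the same ```LookupService``` API this project
+uses (the class name, paths, and test IP are illustrative):
+
+    import com.maxmind.geoip.Location;
+    import com.maxmind.geoip.LookupService;
+
+    public class MaxmindCheck {
+        public static void main(String[] args) throws Exception {
+            // open both databases the way the geo enrichment code does
+            LookupService geo = new LookupService(
+                    "/usr/local/lib/maxmind/GeoLiteCity.dat", LookupService.GEOIP_MEMORY_CACHE);
+            LookupService asn = new LookupService(
+                    "/usr/local/lib/maxmind/GeoIPASNum.dat", LookupService.GEOIP_MEMORY_CACHE);
+
+            Location loc = geo.getLocation("8.8.8.8");    // city/country record
+            System.out.println(loc.countryCode + " / " + loc.city);
+            System.out.println(asn.getOrg("8.8.8.8"));    // ASN string, e.g. "AS15169 ..."
+
+            geo.close();
+            asn.close();
+        }
+    }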
+
+### Create/obtain large lists of domain names (e.g. domains.txt) and copy them into HDFS
+
+ # you may want to split these domain files before placing in HDFS in order to use more mappers
+ split -a 5 -d -l 100000 domains.txt domains_
+ hadoop fs -put domains_* /data/domains/
+
+
+### Download and build this project
+
+
+ git clone https://jt6211@github.com/jt6211/hadoop-dns-mining.git
+ cd hadoop-dns-mining
+ mvn package assembly:assembly
+
+
+### Run the various MapReduce jobs
+
+
+ # These are the record types that will be requested
+ REC_TYPES=A,MX,NS,TXT
+
+ JAR=target/hadoop-dns-mining-1.0-SNAPSHOT-job.jar
+
+ # performs A, MX, NS, and TXT record lookups (the types in REC_TYPES) on each domain
+ # provided, using 50 resolver threads per Mapper and the nameserver 8.8.8.8, and stores
+ # the results in HDFS in /data/dns-mining/01_raw
+ # Note: choose the nameserver wisely, otherwise you may overload it. In testing I mainly
+ # used a BIND server deployed on each Hadoop node, so my nameserver was 127.0.0.1
+ time hadoop jar $JAR io.covert.dns.collection.CollectionJob \
+ -D dns.collection.num.resolvers=50 \
+ -D dns.collection.nameservers=8.8.8.8 \
+ IN \
+ "$REC_TYPES" \
+ /data/domains/ \
+ /data/dns-mining/01_raw
+
+ # parse the raw responses into JSON (one record per RR in the DNS responses)
+ time hadoop jar $JAR io.covert.dns.parse.ParseJob \
+ /data/dns-mining/01_raw \
+ /data/dns-mining/02_parsed
+
+ # lookup any IP addresses in the results in the maxmind DBs and enrich the records
+ time hadoop jar $JAR io.covert.dns.geo.GeoJob \
+ /usr/local/lib/maxmind/GeoLiteCity.dat \
+ /usr/local/lib/maxmind/GeoIPASNum.dat \
+ /data/dns-mining/02_parsed \
+ /data/dns-mining/03_enriched
+
+ # run a filter job for each record type requested, as well as for record types that commonly
+ # occur in the results as part of normal queries. This separates the various DNS records into
+ # their own directories in HDFS
+ for REC in `echo "$REC_TYPES,SOA,NS,CNAME" | sed 's/,/\n/g'| sort -u`;
+ do
+ time hadoop jar $JAR io.covert.dns.filtering.FilterJob \
+ "type == '$REC'" \
+ /data/dns-mining/03_enriched \
+ /data/dns-mining/04_filtered-type=$REC;
+ done
+
+ # This is a JEXL expression that filters out target fields that are IP addresses
+ # and returns the target field lowercased
+ TARGET_EXPR='if(target !~ "^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\.$")return target.toLowerCase()'
+
+ # extract the 'target' field from the MX records
+ time hadoop jar $JAR io.covert.dns.extract.ExtractorJob "$TARGET_EXPR" \
+ /data/dns-mining/04_filtered-type=MX /data/dns-mining/05_extracted-mailservers
+
+ # extract the 'target' field from the NS records
+ time hadoop jar $JAR io.covert.dns.extract.ExtractorJob "$TARGET_EXPR" \
+ /data/dns-mining/04_filtered-type=NS /data/dns-mining/05_extracted-nameservers
+
+
105 pom.xml
@@ -0,0 +1,105 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>io.covert</groupId>
+ <artifactId>hadoop-dns-mining</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0-SNAPSHOT</version>
+ <name>hadoop-dns-mining</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>3.8.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-core</artifactId>
+ <version>1.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>cloudtrace</artifactId>
+ <version>1.4.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.6.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.3</version>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>dnsjava</groupId>
+ <artifactId>dnsjava</artifactId>
+ <version>2.0.6</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.collections</groupId>
+ <artifactId>google-collections</artifactId>
+ <version>1.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.maxmind</groupId>
+ <artifactId>geo-ip</artifactId>
+ <version>1.2.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-jexl</artifactId>
+ <version>2.1.1</version>
+ </dependency>
+
+ </dependencies>
+
+<build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <finalName>${project.name}-${project.version}</finalName>
+ <appendAssemblyId>true</appendAssemblyId>
+ <descriptors>
+ <descriptor>src/main/assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </plugin>
+ </plugins>
+</build>
+
+</project>
38 src/main/assembly.xml
@@ -0,0 +1,38 @@
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>job</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <unpack>false</unpack>
+ <scope>runtime</scope>
+ <outputDirectory>lib</outputDirectory>
+ <excludes>
+ <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
+ </excludes>
+ </dependencySet>
+ <dependencySet>
+ <unpack>false</unpack>
+ <scope>system</scope>
+ <outputDirectory>lib</outputDirectory>
+ <excludes>
+ <exclude>${artifact.groupId}:${artifact.artifactId}</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+ <fileSets>
+ <fileSet>
+ <directory>${basedir}/target/classes</directory>
+ <outputDirectory>/</outputDirectory>
+ <excludes>
+ <exclude>*.jar</exclude>
+ </excludes>
+ </fileSet>
+ </fileSets>
+</assembly>
+
106 src/main/java/io/covert/dns/collection/CollectionJob.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.collection;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.mortbay.log.Log;
+
+public class CollectionJob extends Configured implements Tool {
+
+ private static void usage(String msg)
+ {
+ System.err.println("Usage: hadoop jar JARFILE.jar "+CollectionJob.class.getName()+" <requestClass> <requestTypes> <inDir> <outDir>");
+ System.err.println(" requestClass - request class, e.g. IN, CH, etc");
+ System.err.println(" requestTypes - resource record types (comma delim), e.g. A,MX,NS etc");
+ System.err.println(" inDir - HDFS input dir");
+ System.err.println(" outDir - HDFS output dir");
+ System.exit(-1);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if(args.length != 4)
+ {
+ usage("");
+ }
+
+ String dclass = args[0];
+ String types = args[1];
+ String inDir = args[2];
+ String outDir = args[3];
+
+ Configuration conf = getConf();
+
+ if(conf.get("dns.collection.num.resolvers") == null)
+ conf.setInt("dns.collection.num.resolvers", 50);
+ if(conf.get("dns.collection.nameservers") == null)
+ conf.set("dns.collection.nameservers", "127.0.0.1");
+
+ Job job = new Job(conf);
+ job.setJobName(CollectionJob.class.getSimpleName()+": types="+types+", dclass="+dclass+
+ " inDir="+inDir+", outDir="+outDir+", resolvers="+conf.get("dns.collection.nameservers"));
+ job.setJarByClass(getClass());
+
+ job.setMapperClass(CollectionMapper.class);
+ job.setNumReduceTasks(0);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(BytesWritable.class);
+
+ job.setInputFormatClass(DnsRequestInputFormat.class);
+ DnsRequestInputFormat.setInputPaths(job, new Path(inDir));
+ DnsRequestInputFormat.configure(job, dclass.toUpperCase(), Arrays.asList(types.split(",")), Arrays.asList(""));
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ SequenceFileOutputFormat.setOutputPath(job, new Path(outDir));
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ job.submit();
+
+ int retVal = job.waitForCompletion(true)?0:1;
+
+ CounterGroup counters = job.getCounters().getGroup(CollectionMapper.RESOLVER_GROUP);
+ Counter constructMessageMS = counters.findCounter(CollectionMapper.CONSTRUCT_MESSAGE_MS);
+ Counter parseResponseMS = counters.findCounter(CollectionMapper.PARSE_RESPONSE_MS);
+ Counter performRequestMS = counters.findCounter(CollectionMapper.PERFORM_REQUEST_MS);
+ Counter totalRequestHandlingMS = counters.findCounter(CollectionMapper.TOTAL_REQUEST_HANDLING_MS);
+
+ Log.info("Total ConstructMessage percent: "+ (double)(constructMessageMS.getValue()*100L)/((double)totalRequestHandlingMS.getValue()));
+ Log.info("Total ParseResponse percent: "+ (double)(parseResponseMS.getValue()*100L)/((double)totalRequestHandlingMS.getValue()));
+ Log.info("Total PerformRequest percent: "+ (double)(performRequestMS.getValue()*100L)/((double)totalRequestHandlingMS.getValue()));
+
+ return retVal;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new CollectionJob(), args);
+ }
+}
207 src/main/java/io/covert/dns/collection/CollectionMapper.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.collection;
+
+import io.covert.util.Pair;
+import io.covert.util.Utils;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.mortbay.log.Log;
+import org.xbill.DNS.DClass;
+import org.xbill.DNS.Message;
+import org.xbill.DNS.Record;
+import org.xbill.DNS.Type;
+
+public class CollectionMapper extends Mapper<Text, DnsRequest, Text, BytesWritable>{
+
+ private static final String REQUEST_TIMEOUTS = "REQUEST_TIMEOUTS";
+ private static final String RCODES_GROUP = "RCODES";
+ public static final String COUNTER_GROUP = CollectionMapper.class.getSimpleName();
+ public static final String RESOLVER_GROUP = ResolverThread.class.getSimpleName();
+
+ public static final String QUEUE_FULL = "QUEUE_FULL";
+ public static final String GOOD_NAME = "GOOD_NAME";
+ public static final String BAD_NAME = "BAD_NAME";
+ public static final String PERFORM_REQUEST_MS = "PerformRequestMS";
+ public static final String TOTAL_REQUEST_HANDLING_MS = "TotalRequestHandlingMS";
+ public static final String PARSE_RESPONSE_MS = "ParseResponseMS";
+ public static final String CONSTRUCT_MESSAGE_MS = "ConstructMessageMS";
+ public static final String NUM_REQUESTS = "NUM_REQUESTS";
+ public static final String LOOKUP_FAILURES = "LOOKUP_FAILURES";
+ public static final String REQUEST_PARSE_FAILURES = "REQUEST_PARSE_FAILURES";
+
+ final ConcurrentLinkedQueue<DnsRequest> inQueue = new ConcurrentLinkedQueue<DnsRequest>();
+ final ConcurrentLinkedQueue<Pair<Record, Message>> outQueue = new ConcurrentLinkedQueue<Pair<Record, Message>>();
+ final AtomicLong inQueueSize = new AtomicLong(0);
+ List<ResolverThread> threads = new LinkedList<ResolverThread>();
+ WriterThread writer;
+ long maxOutstandingRequests;
+
+ protected void setup(Context context) throws java.io.IOException ,InterruptedException
+ {
+ Configuration conf = context.getConfiguration();
+
+ writer = new WriterThread(outQueue, context);
+ writer.start();
+
+ int numThreads = conf.getInt("dns.collection.num.resolvers", 50);
+ String[] nameservers = conf.get("dns.collection.nameservers").split(",");
+ maxOutstandingRequests = conf.getLong("dns.collection.max.outstanding.requests", 5000);
+ int timeoutSecs = conf.getInt("dns.collection.timeout.secs", 5);
+
+ if(nameservers.length == 0)
+ {
+ throw new IOException("dns.collection.nameservers was not defined correctly");
+ }
+
+ for(int i = 0; i < numThreads; ++i)
+ {
+ ResolverThread res = new ResolverThread(inQueue, inQueueSize, outQueue, nameservers, timeoutSecs);
+ res.start();
+ threads.add(res);
+ }
+ }
+
+ // parse input query request
+ // put request on queue if queue isn't too big
+ //
+
+ @Override
+ protected void map(Text domain, DnsRequest request, org.apache.hadoop.mapreduce.Mapper<Text,DnsRequest,Text, BytesWritable>.Context context)
+ throws java.io.IOException ,InterruptedException
+ {
+ inQueue.add(request);
+ long queueSize = inQueueSize.incrementAndGet();
+ context.setStatus("Queue size: "+queueSize);
+ while(queueSize >= maxOutstandingRequests)
+ {
+ context.getCounter(COUNTER_GROUP, QUEUE_FULL).increment(1);
+ context.setStatus("Queue size: "+queueSize);
+ Utils.sleep(100);
+ queueSize = inQueueSize.get();
+ }
+ }
+
+ // wait until in queue is empty
+ // notify all threads to finish
+ // wait for outQueue to go to zero, sleep a few seconds, stop writer thread
+
+ protected void cleanup(Context context)
+ throws java.io.IOException ,InterruptedException
+ {
+ context.setStatus("Cleanup: Queue size: "+ inQueueSize.get());
+ Log.info("Stopping Resolver Threads ...");
+ for(ResolverThread res : threads)
+ {
+ res.stopRunning();
+ }
+
+ while(inQueueSize.get() > 0)
+ {
+ context.setStatus("Cleanup: Queue size: "+ inQueueSize.get());
+ Log.info("Cleanup: Queue size: "+ inQueueSize.get());
+ Utils.sleep(1000);
+ }
+
+ Log.info("Joining Resolver Threads ...");
+ for(ResolverThread res : threads)
+ {
+ res.join();
+ context.getCounter(RESOLVER_GROUP, CONSTRUCT_MESSAGE_MS).increment(res.getConstructMessageMS());
+ context.getCounter(RESOLVER_GROUP, PARSE_RESPONSE_MS).increment(res.getParseResponseMS());
+ context.getCounter(RESOLVER_GROUP, TOTAL_REQUEST_HANDLING_MS).increment(res.getTotalRequestHandlingMS());
+ context.getCounter(RESOLVER_GROUP, PERFORM_REQUEST_MS).increment(res.getPerformRequestMS());
+
+ context.getCounter(RESOLVER_GROUP, NUM_REQUESTS).increment(res.getNumRequests());
+ context.getCounter(RESOLVER_GROUP, LOOKUP_FAILURES).increment(res.getLookupsFailures());
+ context.getCounter(RESOLVER_GROUP, REQUEST_TIMEOUTS).increment(res.getRequestTimeouts());
+ context.getCounter(RESOLVER_GROUP, REQUEST_PARSE_FAILURES).increment(res.getRequestParseFailures());
+
+ for(Object rcode : res.getRcodes().uniqueSet())
+ {
+ int count = res.getRcodes().getCount(rcode);
+ context.getCounter(RCODES_GROUP, rcode.toString()).increment(count);
+ }
+
+ Log.info("This thread perfomed: "+res.getNumRequests()+" DNS requests");
+ Log.info("ConstructMessage percent: "+ (double)(res.getConstructMessageMS()*100L)/((double)res.getTotalRequestHandlingMS()));
+ Log.info("ParseResponse percent: "+ (double)(res.getParseResponseMS()*100L)/((double)res.getTotalRequestHandlingMS()));
+ Log.info("PerformRequest percent: "+ (double)(res.getPerformRequestMS()*100L)/((double)res.getTotalRequestHandlingMS()));
+ Log.info("---");
+ }
+
+ Log.info("Stopping Writer ...");
+ writer.stopRunning();
+ writer.join();
+ Log.info("Writer Joined");
+ }
+
+
+
+ private static class WriterThread extends Thread
+ {
+ Context context;
+ ConcurrentLinkedQueue<Pair<Record, Message>> outQueue;
+ volatile boolean keepRunning = true;
+
+ StringBuilder buffer = new StringBuilder();
+ Text outKey = new Text();
+ BytesWritable outVal = new BytesWritable();
+
+ public WriterThread(ConcurrentLinkedQueue<Pair<Record, Message>> outQueue, Context context)
+ {
+ this.context = context;
+ this.outQueue = outQueue;
+ }
+
+ private void stopRunning() {
+ keepRunning = false;
+ }
+
+ @Override
+ public void run() {
+ while(keepRunning || !outQueue.isEmpty())
+ {
+ try {
+ Pair<Record, Message> value = outQueue.remove();
+
+ buffer.setLength(0);
+ buffer.append(value.getKey().getName().toString()).append("\t");
+ buffer.append(DClass.string(value.getKey().getDClass())).append("\t");
+ buffer.append(Type.string(value.getKey().getType()));
+ outKey.set(buffer.toString());
+
+ byte[] result = value.getValue().toWire();
+ outVal.set(result, 0, result.length);
+
+ context.write(outKey, outVal);
+ } catch (Exception e) {
+ Utils.sleep(100);
+ }
+ }
+ }
+ }
+}
140 src/main/java/io/covert/dns/collection/DnsRequest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.collection;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.xbill.DNS.DClass;
+import org.xbill.DNS.Type;
+
+public class DnsRequest implements WritableComparable<DnsRequest> {
+
+ String name = "";
+ int requestType = 0;
+ int dclass = 0;
+
+ public DnsRequest(){}
+
+ public DnsRequest(String name, int requestType, int dclass) {
+ super();
+ this.name = name;
+ this.requestType = requestType;
+ this.dclass = dclass;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ name = in.readUTF();
+ requestType = in.readInt();
+ dclass = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(name);
+ out.writeInt(requestType);
+ out.writeInt(dclass);
+ }
+
+ protected String getName() {
+ return name;
+ }
+
+ protected void setName(String name) {
+ this.name = name;
+ }
+
+ protected int getRequestType() {
+ return requestType;
+ }
+
+ protected void setRequestType(int requestType) {
+ this.requestType = requestType;
+ }
+
+ protected int getDclass() {
+ return dclass;
+ }
+
+ protected void setDclass(int dclass) {
+ this.dclass = dclass;
+ }
+
+ @Override
+ public int compareTo(DnsRequest o) {
+
+ int ret = name.compareTo(o.name);
+ if(ret == 0)
+ {
+ if(requestType > o.requestType)
+ return 1;
+ else if(requestType < o.requestType)
+ return -1;
+ else
+ {
+ if(dclass > o.dclass)
+ return 1;
+ else if(dclass < o.dclass)
+ return -1;
+ else
+ return 0;
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + dclass;
+ result = prime * result + ((name == null) ? 0 : name.hashCode());
+ result = prime * result + requestType;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ DnsRequest other = (DnsRequest) obj;
+ if (dclass != other.dclass)
+ return false;
+ if (name == null) {
+ if (other.name != null)
+ return false;
+ } else if (!name.equals(other.name))
+ return false;
+ if (requestType != other.requestType)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+
+ return "[name="+name+",dclass="+DClass.string(dclass)+",type="+Type.string(requestType)+"]";
+ }
+}
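
A quick serialization round trip shows the Writable contract above in action. This is a sketch
(the buffer classes are standard Hadoop I/O; the request values are illustrative):

    import io.covert.dns.collection.DnsRequest;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.xbill.DNS.DClass;
    import org.xbill.DNS.Type;

    public class DnsRequestRoundTrip {
        public static void main(String[] args) throws Exception {
            DnsRequest out = new DnsRequest("example.com.", Type.MX, DClass.IN);

            // serialize via write(), then deserialize via readFields()
            DataOutputBuffer outBuf = new DataOutputBuffer();
            out.write(outBuf);

            DataInputBuffer inBuf = new DataInputBuffer();
            inBuf.reset(outBuf.getData(), outBuf.getLength());

            DnsRequest in = new DnsRequest();
            in.readFields(inBuf);

            System.out.println(in);             // [name=example.com.,dclass=IN,type=MX]
            System.out.println(out.equals(in)); // true
        }
    }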
62 src/main/java/io/covert/dns/collection/DnsRequestInputFormat.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.collection;
+
+import io.covert.util.Utils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.xbill.DNS.DClass;
+import org.xbill.DNS.Type;
+
+public class DnsRequestInputFormat extends FileInputFormat<Text, DnsRequest> {
+
+ @Override
+ public RecordReader<Text, DnsRequest> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+
+ Configuration conf = context.getConfiguration();
+
+ int dclass = DClass.value(conf.get("dns.request.dclass", "IN"));
+ List<String> subdomains = Arrays.asList(conf.get("dns.requests.subdomains", "").split(","));
+
+ List<Integer> types = new LinkedList<Integer>();
+ for(String type : conf.get("dns.request.types", "A").split(","))
+ types.add(Type.value(type));
+
+ return new DnsRequestRecordReader(subdomains, types, dclass);
+ }
+
+ public static void configure(Job job, String dclass, Collection<String> recordTypes, Collection<String> subDomains)
+ {
+ Configuration conf = job.getConfiguration();
+ conf.set("dns.request.dclass", dclass);
+ conf.set("dns.request.types", Utils.join(",", recordTypes));
+ conf.set("dns.requests.subdomains", Utils.join(",", subDomains));
+ }
+}
131 src/main/java/io/covert/dns/collection/DnsRequestRecordReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.collection;
+
+import io.covert.util.Pair;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+
+public class DnsRequestRecordReader extends RecordReader<Text, DnsRequest> {
+
+ Pair<Text, DnsRequest> pair = null;
+ LineRecordReader lineReader = new LineRecordReader();
+
+ Iterable<String> subdomains;
+ Iterable<Integer> types;
+ int dclass;
+
+ List<Pair<Text, DnsRequest>> outstandingRequests = new LinkedList<Pair<Text,DnsRequest>>();
+
+ public DnsRequestRecordReader(Iterable<String> subdomains, Iterable<Integer> types, int dclass)
+ {
+ this.subdomains = subdomains;
+ this.types = types;
+ this.dclass = dclass;
+ }
+
+ @Override
+ public void close() throws IOException {
+ lineReader.close();
+ }
+
+ @Override
+ public Text getCurrentKey() throws IOException, InterruptedException {
+
+ if(pair == null)
+ return null;
+
+ return pair.getKey();
+ }
+
+ @Override
+ public DnsRequest getCurrentValue() throws IOException, InterruptedException {
+ if(pair == null)
+ return null;
+
+ return pair.getValue();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return lineReader.getProgress();
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ lineReader.initialize(split, context);
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+
+ if(outstandingRequests.size() > 0)
+ {
+ pair = outstandingRequests.remove(0);
+ return true;
+ }
+
+ if(lineReader.nextKeyValue())
+ {
+ Text line = lineReader.getCurrentValue();
+ for(Integer type : types)
+ {
+ for(String subDomain: subdomains)
+ {
+ String name = subDomain;
+ if(name.equals(""))
+ {
+ // allow for lookups on just the domain without any subdomains
+ name = line.toString().trim();
+ }
+ else if(name.endsWith("."))
+ {
+ name += line.toString().trim();
+ }
+ else
+ {
+ name += "."+line.toString().trim();
+ }
+
+ if(!name.endsWith("."))
+ {
+ name += ".";
+ }
+
+ outstandingRequests.add(new Pair<Text, DnsRequest>(line, new DnsRequest(name, type, dclass)));
+ }
+ }
+
+ if(outstandingRequests.size() > 0)
+ {
+ pair = outstandingRequests.remove(0);
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
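
The subtle part of the record reader is the fan-out in nextKeyValue(): every input line is
crossed with each configured subdomain and record type. Here is a standalone sketch of that
expansion (the values are illustrative; the real reader takes them from the dns.request.*
configuration):

    import java.util.Arrays;
    import java.util.List;

    public class ExpansionDemo {
        public static void main(String[] args) {
            String line = "example.com";                        // one line of input
            List<String> subdomains = Arrays.asList("", "www"); // "" means the bare domain
            List<String> types = Arrays.asList("A", "MX");

            for (String type : types) {
                for (String sub : subdomains) {
                    String name = sub.isEmpty() ? line : sub + "." + line;
                    if (!name.endsWith(".")) {
                        name += ".";                            // dnsjava expects absolute names
                    }
                    // one line yields |types| x |subdomains| requests (4 here)
                    System.out.println(name + "\t" + type);
                }
            }
        }
    }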
229 src/main/java/io/covert/dns/collection/ResolverThread.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.collection;
+
+import io.covert.util.Pair;
+import io.covert.util.Utils;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.collections.Bag;
+import org.apache.commons.collections.bag.HashBag;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.xbill.DNS.DClass;
+import org.xbill.DNS.Message;
+import org.xbill.DNS.Name;
+import org.xbill.DNS.Rcode;
+import org.xbill.DNS.Record;
+import org.xbill.DNS.Resolver;
+import org.xbill.DNS.SimpleResolver;
+import org.xbill.DNS.TextParseException;
+import org.xbill.DNS.Type;
+
+public class ResolverThread extends Thread {
+
+ private static final Logger LOG = Logger.getLogger(ResolverThread.class);
+ volatile boolean keepRunning = true;
+
+ Random random = new Random();
+ ConcurrentLinkedQueue<DnsRequest> inQueue;
+ AtomicLong inQueueSize;
+ ConcurrentLinkedQueue<Pair<Record, Message>> outQueue;
+ Resolver[] resolvers;
+ String[] nameservers;
+ Bag rcodes = new HashBag();
+
+ long constructMessageMS = 0;
+ long performRequestMS = 0;
+ long parseResponseMS = 0;
+ long totalRequestHandlingMS = 0;
+
+ long numRequests = 0;
+ long lookupsFailures = 0;
+ long requestTimeouts = 0;
+ long requestParseFailures = 0;
+
+ public ResolverThread(
+ ConcurrentLinkedQueue<DnsRequest> inQueue,
+ AtomicLong inQueueSize,
+ ConcurrentLinkedQueue<Pair<Record, Message>> outQueue,
+ String[] nameservers,
+ int timeoutSecs) {
+ super();
+ this.inQueue = inQueue;
+ this.inQueueSize = inQueueSize;
+ this.outQueue = outQueue;
+ this.nameservers = nameservers;
+
+ resolvers = new Resolver[nameservers.length];
+ for(int i = 0 ; i < resolvers.length; ++i)
+ {
+ try {
+ resolvers[i] = new SimpleResolver(nameservers[i]);
+ resolvers[i].setTimeout(timeoutSecs);
+ } catch (UnknownHostException e) {
+ throw new RuntimeException("Could not initialize resolver for host="+nameservers[i], e);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+
+ while(keepRunning || !inQueue.isEmpty())
+ {
+ try {
+ DnsRequest req = inQueue.remove();
+ inQueueSize.decrementAndGet();
+ long elapsed = System.currentTimeMillis();
+ Pair<Record, Message> resp = process(req);
+ elapsed = System.currentTimeMillis() - elapsed;
+ totalRequestHandlingMS += elapsed;
+ numRequests++;
+
+ if(resp != null)
+ outQueue.add(resp);
+
+ } catch (NoSuchElementException e) {
+ Utils.sleep(100);
+ }
+ }
+ }
+
+ private Pair<Record, Message> process(DnsRequest req)
+ {
+ Record requestRecord = null;
+ Pair<Record, Message> result = null;
+
+ // pick a random nameserver
+ int index = random.nextInt(resolvers.length);
+ String nameserver = nameservers[index];
+
+ long elapsed = System.currentTimeMillis();
+ Message request;
+
+ try {
+ requestRecord = Record.newRecord(Name.fromString(req.getName()), req.getRequestType(), req.getDclass());
+ request = Message.newQuery(requestRecord);
+
+ } catch (TextParseException e) {
+ LOG.error("Failed to parse name: "+req);
+ ++requestParseFailures;
+ return null;
+ }
+ elapsed = System.currentTimeMillis() - elapsed;
+ constructMessageMS += elapsed;
+
+ elapsed = System.currentTimeMillis();
+ Message response = null;
+ try {
+ response = resolvers[index].send(request);
+ rcodes.add(Rcode.string(response.getRcode()));
+ result = new Pair<Record, Message>(requestRecord, response);
+ }
+ catch(SocketTimeoutException e)
+ {
+ LOG.error("Timed out when resolving name: "+req+" at nameserver: "+nameserver+", reason: "+e.getMessage());
+ ++requestTimeouts;
+ }
+ catch (IOException e) {
+ LOG.error("Failed resolving name: "+req+" at nameserver: "+nameserver+", reason: "+e.getMessage());
+ ++lookupsFailures;
+ }
+ elapsed = System.currentTimeMillis() - elapsed;
+ performRequestMS += elapsed;
+
+ return result;
+ }
+
+ public void stopRunning()
+ {
+ this.keepRunning = false;
+ }
+
+ public long getConstructMessageMS() {
+ return constructMessageMS;
+ }
+
+ public long getPerformRequestMS() {
+ return performRequestMS;
+ }
+
+ public long getParseResponseMS() {
+ return parseResponseMS;
+ }
+
+ public long getTotalRequestHandlingMS() {
+ return totalRequestHandlingMS;
+ }
+
+ public long getNumRequests() {
+ return numRequests;
+ }
+
+ public long getLookupsFailures() {
+ return lookupsFailures;
+ }
+
+ public long getRequestParseFailures() {
+ return requestParseFailures;
+ }
+
+ public long getRequestTimeouts() {
+ return requestTimeouts;
+ }
+
+ public Bag getRcodes() {
+ return rcodes;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ ConcurrentLinkedQueue<DnsRequest> inQueue = new ConcurrentLinkedQueue<DnsRequest>();
+ AtomicLong inQueueSize = new AtomicLong(0);
+ ConcurrentLinkedQueue<Pair<Record, Message>> outQueue = new ConcurrentLinkedQueue<Pair<Record, Message>>();
+ String[] nameservers = new String[]{"8.8.8.8"};
+
+ inQueue.add(new DnsRequest("www6.google.com.", Type.AAAA, DClass.IN));
+ inQueue.add(new DnsRequest("ipv6.google.com.", Type.AAAA, DClass.IN));
+ inQueue.add(new DnsRequest("gmail.com.", Type.AAAA, DClass.IN));
+ inQueueSize.incrementAndGet();
+ inQueueSize.incrementAndGet();
+ inQueueSize.incrementAndGet();
+
+ ResolverThread res = new ResolverThread(inQueue, inQueueSize, outQueue, nameservers, 5);
+ res.start();
+ res.stopRunning();
+ res.join();
+
+ Pair<Record, Message> result = outQueue.remove();
+ System.out.println(result);
+
+ result = outQueue.remove();
+ System.out.println(result);
+
+ result = outQueue.remove();
+ System.out.println(result);
+ }
+}
41 src/main/java/io/covert/dns/extract/Extractor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.extract;
+
+import java.util.Map;
+
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.jexl2.MapContext;
+
+public class Extractor {
+
+ final JexlEngine engine;
+ final Expression expr;
+
+ public Extractor(String jexlExpression)
+ {
+ engine = new JexlEngine();
+ engine.setSilent(true);
+ expr = engine.createExpression(jexlExpression);
+ }
+
+ public Object extract(Map<String, Object> record)
+ {
+ return expr.evaluate(new MapContext(record));
+ }
+}
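
A small usage sketch for the extractor (the record contents are illustrative), in the spirit of
the TARGET_EXPR example from the README:

    import java.util.HashMap;
    import java.util.Map;

    import io.covert.dns.extract.Extractor;

    public class ExtractorDemo {
        public static void main(String[] args) {
            Map<String, Object> record = new HashMap<String, Object>();
            record.put("type", "MX");
            record.put("target", "Mail.Example.COM.");

            // JEXL expression: lowercase the target field, as TARGET_EXPR does
            Extractor extractor = new Extractor("target.toLowerCase()");
            System.out.println(extractor.extract(record));  // mail.example.com.
        }
    }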
88 src/main/java/io/covert/dns/extract/ExtractorJob.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.extract;
+
+import io.covert.util.UniqueKeyOnlyReducer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class ExtractorJob extends Configured implements Tool {
+
+ private static void usage(String msg)
+ {
+ System.err.println("Usage: hadoop jar JARFILE.jar "+ExtractorJob.class.getName()+" <expression> <inDir> <outDir>");
+ System.err.println(" expression - JEXL extraction expression");
+ System.err.println(" inDir - HDFS input dir");
+ System.err.println(" outDir - HDFS output dir");
+ System.exit(-1);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if(args.length != 3)
+ {
+ usage("");
+ }
+
+ String expression = args[0];
+ String inDir = args[1];
+ String outDir = args[2];
+
+ Configuration conf = getConf();
+ conf.set(ExtractorMapper.EXTRACTOR_JEXL_EXPRESSION, expression);
+
+ Job job = new Job(conf);
+ job.setJobName(ExtractorJob.class.getSimpleName()+": inDir="+inDir+", outDir="+outDir+", expression=["+expression+"]");
+ job.setJarByClass(getClass());
+
+ job.setMapperClass(ExtractorMapper.class);
+ job.setReducerClass(UniqueKeyOnlyReducer.class);
+ job.setNumReduceTasks(new JobClient(new JobConf(conf)).getClusterStatus().getTaskTrackers());
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(job, new Path(inDir));
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ SequenceFileOutputFormat.setOutputPath(job, new Path(outDir));
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ job.submit();
+
+ int retVal = job.waitForCompletion(true)?0:1;
+ return retVal;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new ExtractorJob(), args);
+ }
+}
62 src/main/java/io/covert/dns/extract/ExtractorMapper.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.extract;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+public class ExtractorMapper extends Mapper<Text, Text, Text, Text> {
+
+ public static final String EXTRACTOR_JEXL_EXPRESSION = "extractor.jexl.expression";
+ public static final String STRIP_OUTER_QUOTES = "extractor.strip.outer.quotes";
+
+ Extractor extractor;
+ boolean stripOuterQuotes;
+
+ ObjectMapper objectMapper = new ObjectMapper();
+ Text outKey = new Text();
+
+ protected void setup(Context context) throws java.io.IOException ,InterruptedException
+ {
+ extractor = new Extractor(context.getConfiguration().get(EXTRACTOR_JEXL_EXPRESSION));
+ stripOuterQuotes = context.getConfiguration().getBoolean(STRIP_OUTER_QUOTES, true);
+ }
+
+ protected void map(Text jsonRecord, Text empty, Context context) throws java.io.IOException ,InterruptedException
+ {
+ Map<String, Object> record = objectMapper.readValue(jsonRecord.getBytes(), new TypeReference<Map<String, Object>>(){});
+ Object result = extractor.extract(record);
+ if(result == null)
+ {
+ context.getCounter(ExtractorMapper.class.getSimpleName(), "NO RESULT").increment(1);
+ }
+ else
+ {
+ context.getCounter(ExtractorMapper.class.getSimpleName(), "RESULT").increment(1);
+ String val = objectMapper.writeValueAsString(result);
+ if(stripOuterQuotes)
+ val = val.replaceAll("^\"+", "").replaceAll("\"+$", "");
+ outKey.set(val);
+ context.write(outKey, empty);
+ }
+ context.progress();
+ }
+}
57 src/main/java/io/covert/dns/filtering/Filter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.filtering;
+
+import java.util.Map;
+
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.jexl2.MapContext;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+public class Filter {
+
+ static
+ {
+ // the WARN messages occur if the expression contains a var that is not in the record.
+ // not good for many expressions, esp ones filtering on IP GEO info
+ Logger.getLogger("org.apache.commons.jexl2").setLevel(Level.ERROR);
+ }
+
+ JexlEngine engine = new JexlEngine();
+ Expression expr;
+
+ public Filter(String jexlExpression)
+ {
+ expr = engine.createExpression(jexlExpression);
+ }
+
+ public boolean filter(Map<String, Object> record)
+ {
+ JexlContext context = new MapContext(record);
+ Object val = expr.evaluate(context);
+ if (val instanceof Boolean) {
+ Boolean bool = (Boolean) val;
+ return bool;
+ }
+ return false;
+ }
+}
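
A matching sketch for the filter (again with illustrative record contents), using the same shape
of expression the README's FilterJob step passes in:

    import java.util.HashMap;
    import java.util.Map;

    import io.covert.dns.filtering.Filter;

    public class FilterDemo {
        public static void main(String[] args) {
            Map<String, Object> record = new HashMap<String, Object>();
            record.put("type", "MX");
            record.put("target", "mail.example.com.");

            // same shape as the README's "type == '$REC'" expression
            System.out.println(new Filter("type == 'MX'").filter(record)); // true
            System.out.println(new Filter("type == 'NS'").filter(record)); // false
        }
    }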
87 src/main/java/io/covert/dns/filtering/FilterJob.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.filtering;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class FilterJob extends Configured implements Tool {
+
+ private static void usage(String msg)
+ {
+ System.err.println("Usage: hadoop jar JARFILE.jar "+FilterJob.class.getName()+" <filter> <inDir> <outDir>");
+ System.err.println(" filter - JEXL filter expression");
+ System.err.println(" inDir - HDFS input dir");
+ System.err.println(" outDir - HDFS output dir");
+ System.exit(-1);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if(args.length != 3)
+ {
+ usage("");
+ }
+
+ String filter = args[0];
+ String inDir = args[1];
+ String outDir = args[2];
+
+ Configuration conf = getConf();
+ conf.set(FilterMapper.FILTER_JEXL_EXPRESSION, filter);
+
+ Job job = new Job(conf);
+ job.setJobName(FilterJob.class.getSimpleName()+": inDir="+inDir+", outDir="+outDir+", filter=["+filter+"]");
+ job.setJarByClass(getClass());
+
+ job.setMapperClass(FilterMapper.class);
+ job.setReducerClass(Reducer.class); // Identity Reduce...
+ job.setNumReduceTasks(new JobClient(new JobConf(conf)).getClusterStatus().getTaskTrackers());
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(job, new Path(inDir));
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ SequenceFileOutputFormat.setOutputPath(job, new Path(outDir));
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ job.submit();
+
+ int retVal = job.waitForCompletion(true)?0:1;
+ return retVal;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new FilterJob(), args);
+ }
+}
54 src/main/java/io/covert/dns/filtering/FilterMapper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.filtering;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+public class FilterMapper extends Mapper<Text, Text, Text, Text> {
+
+ public static final String FILTER_JEXL_EXPRESSION = "filter.jexl.expression";
+ private static final String MATCHED = "MATCHED";
+ private static final String NON_MATCHED = "NON-MATCHED";
+ Filter filter;
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ protected void setup(Context context) throws java.io.IOException ,InterruptedException
+ {
+ filter = new Filter(context.getConfiguration().get(FILTER_JEXL_EXPRESSION));
+ }
+
+ protected void map(Text jsonRecord, Text empty, Context context) throws java.io.IOException ,InterruptedException
+ {
+ Map<String, Object> record = objectMapper.readValue(jsonRecord.getBytes(), new TypeReference<Map<String, Object>>(){});
+ if(filter.filter(record))
+ {
+ context.getCounter(FilterMapper.class.getSimpleName(), MATCHED).increment(1);
+ context.write(jsonRecord, empty);
+ }
+ else
+ {
+ context.getCounter(FilterMapper.class.getSimpleName(), NON_MATCHED).increment(1);
+ }
+ context.progress();
+ }
+
+}
86 src/main/java/io/covert/dns/geo/GeoJob.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.geo;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+public class GeoJob extends Configured implements Tool {
+
+ private static void usage(String msg)
+ {
+ System.err.println("Usage: hadoop jar JARFILE.jar "+GeoJob.class.getName()+" <maxmindDB> <masxmindASNDB> <inDir> <outDir>");
+ System.err.println(" maxmindDB - maxmind geo db");
+ System.err.println(" maxmindASNDB - maxmind ASN db");
+ System.err.println(" inDir - HDFS input dir");
+ System.err.println(" outDir - HDFS output dir");
+ System.exit(-1);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ if(args.length != 4)
+ {
+ usage("");
+ }
+
+ String dbfile = args[0];
+ String asnDbfile = args[1];
+ String inDir = args[2];
+ String outDir = args[3];
+
+ Configuration conf = getConf();
+ conf.set("maxmind.geo.database.file", dbfile);
+ conf.set("maxmind.asn.database.file", asnDbfile);
+
+ Job job = new Job(conf);
+ job.setJobName(GeoJob.class.getSimpleName()+": dbfile="+dbfile+", asnDB="+asnDbfile+" inDir="+inDir+", outDir="+outDir);
+ job.setJarByClass(getClass());
+
+ job.setMapperClass(GeoMapper.class);
+ job.setNumReduceTasks(0);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(job, new Path(inDir));
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ SequenceFileOutputFormat.setOutputPath(job, new Path(outDir));
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ job.submit();
+
+ int retVal = job.waitForCompletion(true)?0:1;
+ return retVal;
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new GeoJob(), args);
+ }
+}
65 src/main/java/io/covert/dns/geo/GeoMapper.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.geo;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.maxmind.geoip.LookupService;
+
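+/**
+ * Map-only task that parses each JSON record, looks up the "addr" field in
+ * the MaxMind databases via {@link IpGeo}, and re-emits the record with the
+ * geo/ASN fields merged in. Records without an "addr" field pass through
+ * unchanged.
+ */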
+public class GeoMapper extends Mapper<Text, Text, Text, Text> {
+
+ IpGeo geo;
+ ObjectMapper objectMapper = new ObjectMapper();
+ Text outKey = new Text();
+
+    protected void setup(Context context) throws java.io.IOException, InterruptedException
+ {
+ Configuration conf = context.getConfiguration();
+ geo = new IpGeo(conf.get("maxmind.geo.database.file"), conf.get("maxmind.asn.database.file"), LookupService.GEOIP_MEMORY_CACHE);
+ }
+
+    protected void map(Text json, Text empty, Context context) throws java.io.IOException, InterruptedException
+    {
+        // Text.getBytes() returns the backing array, which can be longer than
+        // the encoded value, so bound the read by getLength()
+        Map<String, Object> rec = objectMapper.readValue(json.getBytes(), 0, json.getLength(), new TypeReference<Map<String, Object>>(){});
+ String ip = (String)rec.get("addr");
+ if(ip != null)
+ {
+ // annotate with IP Geo info...
+ Map<String, Object> loc = geo.getLocation(ip);
+ rec.putAll(loc);
+ outKey.set(objectMapper.writeValueAsBytes(rec));
+ context.write(outKey, empty);
+ }
+ else
+ {
+ // No Geo, but pass through anyway
+ context.write(json, empty);
+ }
+ }
+
+    protected void cleanup(Context context) throws java.io.IOException, InterruptedException
+ {
+ if(geo != null)
+ geo.close();
+ }
+}
159 src/main/java/io/covert/dns/geo/IpGeo.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.geo;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+
+import com.maxmind.geoip.Location;
+import com.maxmind.geoip.LookupService;
+
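+/**
+ * Thin wrapper around the MaxMind {@link LookupService} that returns lookups
+ * as a Map, short-circuits RFC 1918 addresses, and memoizes results in a
+ * 10,000-entry LRU cache. A minimal usage sketch (database file names are
+ * illustrative):
+ *
+ * <pre>
+ * IpGeo geo = new IpGeo("GeoLiteCity.dat", "GeoIPASNum.dat",
+ *         LookupService.GEOIP_MEMORY_CACHE);
+ * Map&lt;String, Object&gt; loc = geo.getLocation("8.8.8.8");
+ * geo.close();
+ * </pre>
+ */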
+public class IpGeo {
+
+    private static final String UNKNOWN = "UNKNOWN";
+    private static final Map<String, Object> UNKNOWN_GEO = new HashMap<String, Object>();
+
+    private static final String RFC1918 = "RFC1918";
+    private static final Map<String, Object> RFC1918_GEO = new HashMap<String, Object>();
+
+ private static final Pattern asnumPattern = Pattern.compile("^AS(\\d+)\\s+(.+)$");
+
+    // Coarse first-octet prefilter for the private ranges; candidates are
+    // then checked against the exact RFC 1918 prefixes below
+    private static final String[] rfc1918FirstOctetPrefixes = {
+        "10.", "192.", "172."
+    };
+
+ private static final String[] rfc1918AddressPrefixes = {
+ "10.", "192.168.",
+ "172.16.", "172.17.", "172.18.", "172.19.",
+ "172.20.", "172.21.", "172.22.", "172.23.",
+ "172.24.", "172.25.", "172.26.", "172.27.",
+ "172.28.", "172.29.", "172.30.", "172.31.",
+ };
+
+ static
+ {
+ put(UNKNOWN_GEO, "city", UNKNOWN);
+ put(UNKNOWN_GEO, "cc", UNKNOWN);
+ put(UNKNOWN_GEO, "country", UNKNOWN);
+ put(UNKNOWN_GEO, "lat", 0.0f);
+ put(UNKNOWN_GEO, "long", 0.0f);
+ put(UNKNOWN_GEO, "asn", UNKNOWN);
+ put(UNKNOWN_GEO, "org", UNKNOWN);
+
+ put(RFC1918_GEO, "city", RFC1918);
+ put(RFC1918_GEO, "cc", RFC1918);
+ put(RFC1918_GEO, "country", RFC1918);
+ put(RFC1918_GEO, "lat", 0.0f);
+ put(RFC1918_GEO, "long", 0.0f);
+ put(RFC1918_GEO, "asn", RFC1918);
+ put(RFC1918_GEO, "org", RFC1918);
+ }
+
+ LookupService maxmind;
+ LookupService asnLookup;
+ LRUMap lru = new LRUMap(10000);
+
+    public IpGeo(String dbFile, int options) throws IOException
+    {
+        maxmind = new LookupService(dbFile, options);
+    }
+
+ public IpGeo(String locationDbFile, String asnDbFile, int options) throws IOException
+ {
+ maxmind = new LookupService(locationDbFile, options);
+ asnLookup = new LookupService(asnDbFile, options);
+ }
+
+ public Map<String, Object> getLocation(String ip) throws JsonGenerationException, JsonMappingException, IOException
+ {
+        for(String prefix : rfc1918FirstOctetPrefixes)
+ {
+ if(ip.startsWith(prefix))
+ {
+ for(String addrPrefix : rfc1918AddressPrefixes)
+ {
+ if(ip.startsWith(addrPrefix))
+ return RFC1918_GEO;
+ }
+ break;
+ }
+ }
+
+ Object val = lru.get(ip);
+ if(val != null)
+ return (Map<String, Object>)val;
+
+ Location loc = maxmind.getLocation(ip);
+ if(loc == null)
+ {
+ lru.put(ip, UNKNOWN_GEO);
+ return UNKNOWN_GEO;
+ }
+
+ Map<String, Object> rec = new HashMap<String, Object>();
+ put(rec, "city", loc.city);
+ put(rec, "cc", loc.countryCode);
+ put(rec, "country", loc.countryName);
+ put(rec, "lat", loc.latitude);
+ put(rec, "long", loc.longitude);
+
+ if(asnLookup != null)
+ {
+ String org = asnLookup.getOrg(ip);
+ if(org != null)
+ {
+ Matcher mat = asnumPattern.matcher(org);
+ if(mat.matches())
+ {
+ put(rec, "asn", mat.group(1));
+ put(rec, "org", mat.group(2));
+ }
+ else
+ {
+ put(rec, "org", org);
+ }
+ }
+
+ }
+
+ lru.put(ip, rec);
+ return rec;
+ }
+
+    private static void put(Map<String, Object> rec, String name, Object val)
+ {
+ if(val != null)
+ rec.put(name, val.toString().toUpperCase());
+ }
+
+ public void close() {
+ lru.clear();
+ maxmind.close();
+
+ if(asnLookup != null)
+ {
+ asnLookup.close();
+ }
+ }
+}
72 src/main/java/io/covert/dns/parse/ParseJob.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.parse;
+
+import io.covert.util.UniqueKeyOnlyReducer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
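+/**
+ * MapReduce driver that converts raw DNS response messages (Text/BytesWritable
+ * SequenceFiles produced by the collection phase) into one JSON record per
+ * resource record, deduplicated by {@link UniqueKeyOnlyReducer}. One reducer
+ * per task tracker is requested so the dedup work spreads across the cluster.
+ */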
+public class ParseJob extends Configured implements Tool {
+
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new ParseJob(), args));
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+        if(args.length != 2)
+        {
+            System.err.println("Usage: hadoop jar JARFILE.jar "+ParseJob.class.getName()+" <inDir> <outDir>");
+            return -1;
+        }
+
+        String inDir = args[0];
+        String outDir = args[1];
+
+ Configuration conf = getConf();
+
+ Job job = new Job(conf);
+ job.setJobName(ParseJob.class.getSimpleName()+": inDir="+inDir+", outDir="+outDir);
+ job.setJarByClass(getClass());
+
+ job.setMapperClass(ParseMapper.class);
+ job.setReducerClass(UniqueKeyOnlyReducer.class);
+ job.setNumReduceTasks(new JobClient(new JobConf(conf)).getClusterStatus().getTaskTrackers());
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(job, new Path(inDir));
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ SequenceFileOutputFormat.setOutputPath(job, new Path(outDir));
+ SequenceFileOutputFormat.setCompressOutput(job, true);
+ job.submit();
+
+        return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+}
60 src/main/java/io/covert/dns/parse/ParseMapper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.parse;
+
+import io.covert.dns.util.JsonUtils;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.xbill.DNS.Message;
+import org.xbill.DNS.Record;
+import org.xbill.DNS.Section;
+
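+/**
+ * Decodes each DNS wire-format message and emits one JSON record (as the
+ * output key) for every record in the ANSWER, ADDITIONAL and AUTHORITY
+ * sections. TTLs are omitted by default ("parse.ignore.ttl") so identical
+ * answers dedup cleanly in the reducer.
+ */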
+public class ParseMapper extends Mapper<Text, BytesWritable, Text, Text> {
+
+ Text outKey = new Text("");
+ Text outVal = new Text("");
+
+ boolean ignoreTTL = true;
+
+ protected void setup(Context context) throws java.io.IOException, InterruptedException
+ {
+ ignoreTTL = context.getConfiguration().getBoolean("parse.ignore.ttl", true);
+ }
+
+    protected void map(Text key, BytesWritable value, Context context) throws java.io.IOException, InterruptedException
+    {
+        try {
+            // BytesWritable.getBytes() may include padding beyond getLength(),
+            // so trim before parsing the DNS message
+            Message msg = new Message(Arrays.copyOfRange(value.getBytes(), 0, value.getLength()));
+
+ int[] sections = {Section.ANSWER, Section.ADDITIONAL, Section.AUTHORITY};
+ for(int section : sections)
+ {
+ for(Record record : msg.getSectionArray(section))
+ {
+ String json = JsonUtils.toJson(record, ignoreTTL);
+ outKey.set(json);
+ context.write(outKey, outVal);
+ }
+ }
+ } catch (Exception e) {
+ context.getCounter(getClass().getSimpleName(), "PARSE_FAIL").increment(1);
+ e.printStackTrace();
+ }
+ }
+}
62 src/main/java/io/covert/dns/storage/StorageJob.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
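+/**
+ * Map-only driver that pushes parsed JSON records into the configured
+ * {@link StorageModule} (e.g. Accumulo) rather than writing HDFS output,
+ * hence the NullOutputFormat and NullWritable key/value types.
+ */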
+public class StorageJob extends Configured implements Tool {
+
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new StorageJob(), args));
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+        if(args.length != 1)
+        {
+            System.err.println("Usage: hadoop jar JARFILE.jar "+StorageJob.class.getName()+" <inDir>");
+            return -1;
+        }
+
+        String inDir = args[0];
+        Configuration conf = getConf();
+ Job job = new Job(conf);
+ job.setJarByClass(getClass());
+ job.setJobName(StorageJob.class.getSimpleName()+": inDir="+inDir);
+
+ job.setMapperClass(StorageMapper.class);
+ job.setNumReduceTasks(0);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(job, new Path(inDir));
+
+ // This job doesn't write output via Hadoop, it uses the configured storage modules
+ job.setOutputFormatClass(NullOutputFormat.class);
+ job.setMapOutputKeyClass(NullWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ job.submit();
+
+        return job.waitForCompletion(true) ? 0 : 1;
+ }
+}
74 src/main/java/io/covert/dns/storage/StorageMapper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.storage;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
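+/**
+ * Instantiates the {@link StorageModuleFactory} named by the
+ * "storage.module.factory" property, then hands each JSON record to the
+ * resulting {@link StorageModule}. Per-record failures are counted rather
+ * than failing the task.
+ */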
+public class StorageMapper extends Mapper<Text, Text, NullWritable, NullWritable> {
+
+ StorageModule storageModule = null;
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ protected void setup(Context context) throws java.io.IOException, InterruptedException
+ {
+ Configuration conf = context.getConfiguration();
+ try {
+            Class<? extends StorageModuleFactory> clz =
+                    Class.forName(conf.get("storage.module.factory")).asSubclass(StorageModuleFactory.class);
+            StorageModuleFactory factory = clz.newInstance();
+ storageModule = factory.create(conf);
+
+ } catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void map(Text jsonRecord, Text empty, Context context) throws java.io.IOException, InterruptedException
+ {
+ Map<String, Object> record;
+ try {
+            record = objectMapper.readValue(jsonRecord.getBytes(), 0, jsonRecord.getLength(), new TypeReference<Map<String, Object>>(){});
+ } catch (Exception e) {
+ context.getCounter("ERROR:"+e.getClass().getSimpleName(), "BAD_RECORDS").increment(1);
+ return;
+ }
+
+ try {
+ storageModule.store(record);
+ context.getCounter(storageModule.getClass().getSimpleName(), "RECORDS_STORED").increment(1);
+ } catch (Exception e) {
+ context.getCounter("ERROR:"+e.getClass().getSimpleName(),
+ "FAILED_STORE:"+storageModule.getClass().getSimpleName()).increment(1);
+ }
+ context.getCounter("TOTAL", "RECORDS").increment(1);
+ context.progress();
+ }
+
+ protected void cleanup(Context context) throws java.io.IOException, InterruptedException
+ {
+ storageModule.flush();
+ storageModule.close();
+ }
+}
28 src/main/java/io/covert/dns/storage/StorageModule.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.storage;
+
+import java.util.Map;
+
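+/**
+ * Pluggable sink for parsed DNS records. Implementations may buffer in
+ * store() as long as flush() pushes pending writes and close() releases
+ * resources; StorageMapper calls flush() then close() in cleanup.
+ */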
+public interface StorageModule {
+
+ public void store(Map<String, Object> record) throws java.io.IOException;
+
+ public void close() throws java.io.IOException;
+
+ public void flush() throws java.io.IOException;
+}
23 src/main/java/io/covert/dns/storage/StorageModuleFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.storage;
+
+import org.apache.hadoop.conf.Configuration;
+
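+/**
+ * Creates a {@link StorageModule} from job configuration. Implementations
+ * are loaded reflectively by StorageMapper, so they need a public no-arg
+ * constructor. A minimal sketch of a custom factory (the class name is
+ * illustrative, not part of this project):
+ *
+ * <pre>
+ * public class StdoutStorageModuleFactory implements StorageModuleFactory {
+ *     public StorageModule create(Configuration conf) {
+ *         return new StorageModule() {
+ *             public void store(Map&lt;String, Object&gt; record) { System.out.println(record); }
+ *             public void flush() {}
+ *             public void close() {}
+ *         };
+ *     }
+ * }
+ * </pre>
+ */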
+public interface StorageModuleFactory {
+ public StorageModule create(Configuration conf) throws Exception;
+}
95 src/main/java/io/covert/dns/storage/accumulo/AccumuloStorageModule.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.covert.dns.storage.accumulo;
+
+import io.covert.dns.storage.StorageModule;
+import io.covert.dns.storage.accumulo.mutgen.MutationGenerator;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.data.Mutation;
+
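+/**
+ * StorageModule that writes to Accumulo through a MultiTableBatchWriter.
+ * Each configured {@link MutationGenerator} maps a record to mutations keyed
+ * by table name; all mutations are funneled through one shared writer so
+ * batching and memory limits are enforced across tables.
+ */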
+public class AccumuloStorageModule implements StorageModule {
+
+ Collection<MutationGenerator> generators = new LinkedList<MutationGenerator>();
+ MultiTableBatchWriter bw;
+ Connector conn;
+
+ public AccumuloStorageModule(String inst, String zooKeepers, String user, String password, long maxMemory, long maxLatency, int maxWriteThreads, Collection<MutationGenerator> generators)
+ throws AccumuloException, AccumuloSecurityException
+ {
+ this.generators.addAll(generators);
+ this.conn = new ZooKeeperInstance(inst, zooKeepers).getConnector(user, password);
+ this.bw = conn.createMultiTableBatchWriter(maxMemory, maxLatency, maxWriteThreads);
+ }
+
+ @Override
+ public void store(Map<String, Object> record) throws IOException {
+
+ for(MutationGenerator generator : generators)
+ {
+ Map<String, Collection<Mutation>> muts = generator.generate(record);
+ for(Entry<String, Collection<Mutation>> e : muts.entrySet())
+ {
+ try {
+ BatchWriter writer = bw.getBatchWriter(e.getKey());
+ for(Mutation mut : e.getValue())
+ {
+ writer.addMutation(mut);
+ }
+ } catch (AccumuloException e1) {
+ throw new IOException(e1);
+ } catch (AccumuloSecurityException e1) {
+ throw new IOException(e1);
+ } catch (TableNotFoundException e1) {
+ throw new IOException(e1);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ bw.close();
+ } catch (MutationsRejectedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ bw.flush();
+ } catch (MutationsRejectedException e) {
+ throw new IOException(e);
+ }
+ }
+
+}
85 src/main/java/io/covert/dns/storage/accumulo/AccumuloStorageModuleFactory.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */