This repository has been archived by the owner on Apr 30, 2019. It is now read-only.

Commit

Pig UDF and supporting Python and Pig scripts for simple queries [BIV-114]
Harsha committed Feb 14, 2013
1 parent 93924a3 commit dfda903
Showing 4 changed files with 305 additions and 0 deletions.
@@ -0,0 +1,135 @@
package com.mozilla.telemetry.pig.eval.json;

import java.io.IOException;
import java.util.Map;
import java.math.BigInteger;
import java.security.*;

import org.apache.commons.lang.StringUtils;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.log4j.Logger;

import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

public class GenerateStringHash extends EvalFunc<String> {

    static final Logger LOG = Logger.getLogger(GenerateStringHash.class);
    private ObjectMapper jsonMapper;

    public GenerateStringHash() {
        jsonMapper = new ObjectMapper();
    }

    @Override
    public String exec(Tuple input) throws IOException {
        try {
            String json = (String) input.get(0);
            String info = fetchInfo(json);
            byte[] bytesOfJson = info.getBytes("UTF-8");
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(bytesOfJson);
            String hash = new BigInteger(1, digest).toString(16);
            return hash;
        } catch (Exception e) {
            LOG.error(e);
        }
        return null;
    }

    // Concatenates a fixed set of fields from the submission's "info" section;
    // the MD5 of this string is what exec() returns.
    protected String fetchInfo(String json) {
        try {
            Map<String, Object> jsonMap = jsonMapper.readValue(json, new TypeReference<Map<String, Object>>() {});
            Map<String, Object> info = (Map<String, Object>) jsonMap.get("info");
            StringBuffer sb = new StringBuffer();
            if (info.containsKey("reason"))
                sb.append(info.get("reason"));
            if (info.containsKey("OS"))
                sb.append(info.get("OS"));
            if (info.containsKey("appID"))
                sb.append(info.get("appID"));
            if (info.containsKey("appBuildID"))
                sb.append(info.get("appBuildID"));
            if (info.containsKey("appName"))
                sb.append(info.get("appName"));
            if (info.containsKey("platformBuildID"))
                sb.append(info.get("platformBuildID"));
            if (info.containsKey("appUpdateChannel"))
                sb.append(info.get("appUpdateChannel"));
            /*if (info.containsKey("platformBuildID"))
                sb.append(info.get("platformBuildID"));*/
            if (info.containsKey("locale"))
                sb.append(info.get("locale"));
            if (info.containsKey("cpucount"))
                sb.append(info.get("cpucount"));
            /*if (info.containsKey("memsize"))
                sb.append(info.get("memsize"));*/
            if (info.containsKey("arch"))
                sb.append(info.get("arch"));
            if (info.containsKey("version"))
                sb.append(info.get("version"));
            if (info.containsKey("device"))
                sb.append(info.get("device"));
            if (info.containsKey("hardware"))
                sb.append(info.get("hardware"));
            if (info.containsKey("hasMMX"))
                sb.append(info.get("hasMMX"));
            if (info.containsKey("hasSSE"))
                sb.append(info.get("hasSSE"));
            if (info.containsKey("hasSSE2"))
                sb.append(info.get("hasSSE2"));
            if (info.containsKey("hasSSE3"))
                sb.append(info.get("hasSSE3"));
            if (info.containsKey("hasSSE4A"))
                sb.append(info.get("hasSSE4A"));
            if (info.containsKey("hasSSSE3"))
                sb.append(info.get("hasSSSE3"));
            if (info.containsKey("hasSSE4_1"))
                sb.append(info.get("hasSSE4_1"));
            if (info.containsKey("hasSSE4_2"))
                sb.append(info.get("hasSSE4_2"));
            if (info.containsKey("hasEDSP"))
                sb.append(info.get("hasEDSP"));
            if (info.containsKey("hasARMv6"))
                sb.append(info.get("hasARMv6"));
            if (info.containsKey("hasARMv7"))
                sb.append(info.get("hasARMv7"));
            if (info.containsKey("adapterDescription"))
                sb.append(info.get("adapterDescription"));
            if (info.containsKey("adapterVendorID"))
                sb.append(info.get("adapterVendorID"));
            if (info.containsKey("adapterDeviceID"))
                sb.append(info.get("adapterDeviceID"));
            if (info.containsKey("adapterRAM"))
                sb.append(info.get("adapterRAM"));
            if (info.containsKey("adapterDriver"))
                sb.append(info.get("adapterDriver"));
            if (info.containsKey("adapterDriverVersion"))
                sb.append(info.get("adapterDriverVersion"));
            if (info.containsKey("adapterDriverDate"))
                sb.append(info.get("adapterDriverDate"));
            if (info.containsKey("DWriteEnabled"))
                sb.append(info.get("DWriteEnabled"));
            if (info.containsKey("DWriteVersion"))
                sb.append(info.get("DWriteVersion"));
            /*if (info.containsKey("addons"))
                sb.append(info.get("addons"));*/
            if (info.containsKey("flashVersion"))
                sb.append(info.get("flashVersion"));

            return sb.toString();
        } catch (Exception e) {
            LOG.error(e);
        }
        return null;
    }
}
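For reference, here is a minimal sketch of how this UDF could be exercised directly from Java, outside of Pig; the test class, the TupleFactory wiring, and the sample JSON payload are assumptions for illustration and are not part of this commit.

// Hypothetical smoke test for GenerateStringHash; the sample payload is made up.
import java.io.IOException;

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.mozilla.telemetry.pig.eval.json.GenerateStringHash;

public class GenerateStringHashExample {
    public static void main(String[] args) throws IOException {
        // A tiny telemetry-like submission with only a few "info" fields.
        String json = "{\"info\":{\"reason\":\"saved-session\",\"OS\":\"Linux\",\"appName\":\"Firefox\",\"version\":\"19.0\"}}";
        Tuple input = TupleFactory.getInstance().newTuple(1);
        input.set(0, json);
        // exec() concatenates the whitelisted info fields and returns their MD5 as a hex string.
        String hash = new GenerateStringHash().exec(input);
        System.out.println(hash);
    }
}

Since only the whitelisted info fields feed the digest, two submissions that differ elsewhere in the payload hash to the same value.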
@@ -0,0 +1,105 @@
package com.mozilla.telemetry.pig.eval.json;

import java.io.IOException;
import java.util.Map;
import java.math.BigInteger;
import java.security.*;

import org.apache.commons.lang.StringUtils;
import org.apache.pig.FilterFunc;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.log4j.Logger;

import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

public class TelemetryValueCompare extends FilterFunc {
    static final Logger LOG = Logger.getLogger(TelemetryValueCompare.class);
    private ObjectMapper jsonMapper;
    private String jsonKey;
    private String subJsonKey;
    private String comparator;
    private Integer compareValue;

    public TelemetryValueCompare(String jsonKey, String comparator, String compareValue) {
        jsonMapper = new ObjectMapper();
        this.jsonKey = jsonKey;
        this.comparator = comparator;
        this.compareValue = Integer.parseInt(compareValue);
    }

    public TelemetryValueCompare(String jsonKey, String subJsonKey, String comparator, String compareValue) {
        jsonMapper = new ObjectMapper();
        this.jsonKey = jsonKey;
        this.subJsonKey = subJsonKey;
        this.comparator = comparator;
        this.compareValue = Integer.parseInt(compareValue);
    }

    @Override
    public Boolean exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        String json = (String) input.get(0);
        return getValueFromJson(json);
    }

    @SuppressWarnings("unchecked")
    protected Boolean getValueFromJson(String json) {
        try {
            Map<String, Object> jsonMap = jsonMapper.readValue(json, new TypeReference<Map<String, Object>>() {});
            Map<String, Object> histograms = (Map<String, Object>) jsonMap.get("histograms");
            Map<String, Object> simpleMeasurements = (Map<String, Object>) jsonMap.get("simpleMeasurements");

            if (histograms.containsKey(jsonKey)) {
                Map<String, Object> histogram = (Map<String, Object>) histograms.get(jsonKey);
                if (subJsonKey == null || subJsonKey.equals("values")) {
                    Map<String, Object> histValues = (Map<String, Object>) histogram.get("values");
                    return compareJsonMap(histValues);
                } else {
                    Integer jsonValue = (Integer) histogram.get(subJsonKey);
                    return compareJsonValue(jsonValue);
                }
            } else if (simpleMeasurements.containsKey(jsonKey)) {
                Integer jsonValue = (Integer) simpleMeasurements.get(jsonKey);
                return compareJsonValue(jsonValue);
            }
            return false;
        } catch (Exception e) {
            LOG.error(e);
        }
        return false;
    }

    protected Boolean compareJsonMap(Map<String, Object> values) {
        Boolean cmpFlag = false;
        Integer histValue = null;
        for (String key : values.keySet()) {
            histValue = Integer.parseInt(key);
            cmpFlag = compareJsonValue(histValue);
            if (cmpFlag)
                return cmpFlag;
        }
        return cmpFlag;
    }

    protected Boolean compareJsonValue(Integer jsonValue) {
        if (comparator.equals(">"))
            return jsonValue > compareValue;
        else if (comparator.equals("<"))
            return jsonValue < compareValue;
        else if (comparator.equals("="))
            // Use equals() rather than == so boxed Integers compare by value.
            return jsonValue.equals(compareValue);
        else if (comparator.equals("<="))
            return jsonValue <= compareValue;
        else if (comparator.equals(">="))
            return jsonValue >= compareValue;
        return false;
    }

}
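Similarly, a hedged sketch of driving this FilterFunc directly in Java, mirroring the DEFINE in simple_query.pig below; the constructor arguments, field names, and sample JSON are illustrative assumptions, not part of this commit.

// Hypothetical direct use of TelemetryValueCompare; field names and values are made up.
import java.io.IOException;

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

import com.mozilla.telemetry.pig.eval.json.TelemetryValueCompare;

public class TelemetryValueCompareExample {
    public static void main(String[] args) throws IOException {
        // Keep submissions whose simpleMeasurements.firstPaint value exceeds 3000.
        TelemetryValueCompare filter = new TelemetryValueCompare("firstPaint", "values", ">", "3000");
        String json = "{\"histograms\":{},\"simpleMeasurements\":{\"firstPaint\":4200}}";
        Tuple input = TupleFactory.getInstance().newTuple(1);
        input.set(0, json);
        System.out.println(filter.exec(input)); // prints true: 4200 > 3000
    }
}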
16 changes: 16 additions & 0 deletions src/main/pig/simple_query.pig
@@ -0,0 +1,16 @@
register 'akela-0.4-SNAPSHOT.jar'
register 'telemetry-toolbox-0.2-SNAPSHOT.jar'

SET pig.logfile simple_query.log;
SET pig.tmpfilecompression true;
SET pig.tmpfilecompression.codec lzo;
SET mapred.compress.map.output true;
SET mapred.map.output.compression.codec org.apache.hadoop.io.compress.SnappyCodec;

define CompareJsonValue com.mozilla.telemetry.pig.eval.json.TelemetryValueCompare('$json_key', '$sub_json_key',
    '$comparator', '$value');

raw = LOAD 'hbase://telemetry' USING com.mozilla.pig.load.HBaseMultiScanLoader('$start_date', '$end_date', 'yyyyMMdd', 'data:json') AS (k:chararray, json:chararray);

cycle_collector = FILTER raw by CompareJsonValue(json);
dump cycle_collector;
49 changes: 49 additions & 0 deletions src/main/python/simple_query.py
@@ -0,0 +1,49 @@
#!/usr/bin/env python

from subprocess import call
from optparse import OptionParser


def main():
    parser = OptionParser(usage="usage: %prog [options] query",
                          version="%prog 0.1")
    parser.add_option("-s", "--startdate",
                      help="start date to scan")
    parser.add_option("-e", "--enddate",
                      help="end date to scan")

    (options, args) = parser.parse_args()
    print options
    if not options.startdate:
        parser.error("startdate is not given")
    if not options.enddate:
        parser.error("enddate is not given")
    if not args or len(args[0].split()) != 3:
        parser.error("please provide query as CYCLE_COLLECTOR > 3000")
    (json_key, sub_json_key, comparator, value) = parse_query(args[0])
    if sub_json_key == "range":
        parser.error("Allowed sub keys are bucket_count, values, sum")

    cmd = "pig -f simple_query.pig -p start_date=%s -p end_date=%s -p json_key=%s -p sub_json_key=%s -p comparator='%s' -p value=%s" % (options.startdate, options.enddate, json_key, sub_json_key, comparator, value)
    print cmd
    call(cmd, shell=True)


def parse_query(query):
    # Query format: <histogram or simpleMeasurement>[.<sub key>] <comparator> <value>
    query_split = query.split()
    json_keys = query_split[0].split('.')

    if len(json_keys) == 2:
        json_key = json_keys[0]
        sub_json_key = json_keys[1]
    else:
        json_key = json_keys[0]
        sub_json_key = "values"
    comparator = query_split[1]
    value = query_split[2]
    return (json_key, sub_json_key, comparator, value)


if __name__ == '__main__':
    main()

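For context, a typical invocation of this wrapper, following the query format its own error message suggests, would be along the lines of: python simple_query.py -s 20130201 -e 20130207 'CYCLE_COLLECTOR > 3000' (the dates here are illustrative). The script then shells out to simple_query.pig, passing the parsed key, sub key, comparator, and value as Pig parameters.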