Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Checking in latest changes to Pig scripts and UDFs

  • Loading branch information...
commit e2eeafd4818a8a2376c27bd23546164270af1e3f 1 parent a3c9de5
Xavier xstevens authored
9 src/main/java/com/mozilla/fhr/consumer/FHRConsumer.java
View
@@ -74,7 +74,7 @@
private static final Logger LOG = Logger.getLogger(FHRConsumer.class);
private static final String GEO_COUNTRY_FIELD = "geoCountry";
- private static final String UNKNOWN = "Unknown";
+ private static final String UNKNOWN_COUNTRY_CODE = "--";
private ObjectMapper jsonMapper;
private LookupService geoIpLookupService;
@@ -224,15 +224,16 @@ public Void call() throws Exception {
// do a geoip lookup on the IP if we have one
if (bmsg.hasIpAddr()) {
Location location = geoIpLookupService.getLocation(InetAddress.getByAddress(bmsg.getIpAddr().toByteArray()));
- if (location != null && !"--".equals(location.countryCode)) {
+ if (location != null && !UNKNOWN_COUNTRY_CODE.equals(location.countryCode) &&
+ location.countryCode.trim().length() > 0) {
document.put(GEO_COUNTRY_FIELD, location.countryCode);
} else {
unknownGeoIpMeter.mark();
- document.put(GEO_COUNTRY_FIELD, UNKNOWN);
+ document.put(GEO_COUNTRY_FIELD, UNKNOWN_COUNTRY_CODE);
}
} else {
unknownGeoIpMeter.mark();
- document.put(GEO_COUNTRY_FIELD, UNKNOWN);
+ document.put(GEO_COUNTRY_FIELD, UNKNOWN_COUNTRY_CODE);
}
// store the document
34 src/main/java/com/mozilla/fhr/pig/eval/ProfileCreationTime.java
View
@@ -20,42 +20,34 @@
package com.mozilla.fhr.pig.eval;
import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.Calendar;
-import java.util.Date;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
-public class ProfileAgeTime extends EvalFunc<Long> {
+public class ProfileCreationTime extends EvalFunc<Long> {
public static enum ERRORS { DateParseError };
+
+ private Calendar cal;
- private SimpleDateFormat sdf;
-
- public ProfileAgeTime(String pingTimeFormat) {
- sdf = new SimpleDateFormat(pingTimeFormat);
+ public ProfileCreationTime() {
+ cal = Calendar.getInstance();
}
@Override
public Long exec(Tuple input) throws IOException {
- if (input == null || input.size() != 2) {
+ if (input == null || input.size() == 0 ||
+ input.get(0) == null) {
return null;
}
-
- Long t = null;
- try {
- Date d = sdf.parse((String)input.get(0));
- Calendar cal = Calendar.getInstance();
- cal.setTime(d);
- cal.add(Calendar.DATE, -((Number)input.get(1)).intValue());
- t = cal.getTimeInMillis();
- } catch (ParseException e) {
- pigLogger.warn(this, "Date parse error", ERRORS.DateParseError);
- }
+
+ // reset to epoch
+ cal.setTimeInMillis(0);
+ // profileCreation is in days since epoch
+ cal.add(Calendar.DATE, ((Number)input.get(0)).intValue());
- return t;
+ return cal.getTimeInMillis();
}
}
40 src/main/java/com/mozilla/fhr/pig/eval/VersionOnDate.java
View
@@ -24,6 +24,7 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
+import java.util.Map;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
@@ -33,6 +34,10 @@
public static enum ERRORS { ParseError };
+ private static final String APPINFO_VERSIONS_FIELD = "org.mozilla.appInfo.versions";
+ private static final String VERSION = "version";
+ private static final String MULTI_VERSION_DELIMITER = "|";
+
private final SimpleDateFormat sdf;
private long perspectiveTime;
@@ -46,6 +51,7 @@ public VersionOnDate(String dateFormat, String perspectiveDate) {
}
}
+ @SuppressWarnings("unchecked")
@Override
public String exec(Tuple input) throws IOException {
if (input == null || input.size() == 0) {
@@ -55,13 +61,33 @@ public String exec(Tuple input) throws IOException {
String latestVersion = null;
long latestTime = 0;
try {
- DataBag versions = (DataBag)input.get(0);
- Iterator<Tuple> iter = versions.iterator();
- while (iter.hasNext()) {
- Tuple t = iter.next();
- Date d = sdf.parse((String)t.get(0));
- if (d.getTime() <= perspectiveTime && d.getTime() > latestTime) {
- latestVersion = (String)t.get(1);
+ Map<String,Object> dataPoints = (Map<String,Object>)input.get(0);
+ for (Map.Entry<String,Object> dayEntry : dataPoints.entrySet()) {
+ Map<String,Object> dayMap = (Map<String,Object>)dayEntry.getValue();
+ if (dayMap.containsKey(APPINFO_VERSIONS_FIELD)) {
+ Date d = sdf.parse(dayEntry.getKey());
+ if (d.getTime() <= perspectiveTime && d.getTime() > latestTime) {
+ Map<String,Object> appInfoVersionMap = (Map<String,Object>)dayMap.get(APPINFO_VERSIONS_FIELD);
+ if (appInfoVersionMap.containsKey(VERSION)) {
+ DataBag versionBag = (DataBag)appInfoVersionMap.get(VERSION);
+ StringBuilder sb = new StringBuilder();
+ Iterator<Tuple> vbIter = versionBag.iterator();
+ for (int i=0; i < versionBag.size() && vbIter.hasNext(); i++) {
+ Tuple versionTuple = vbIter.next();
+ if (versionTuple.size() > 0) {
+ sb.append(versionTuple.get(0));
+ if (vbIter.hasNext()) {
+ sb.append(MULTI_VERSION_DELIMITER);
+ }
+ }
+ }
+
+ if (sb.length() > 0) {
+ latestVersion = sb.toString();
+ latestTime = d.getTime();
+ }
+ }
+ }
}
}
} catch (ParseException e) {
81 src/main/pig/fhr_uniques.pig
View
@@ -14,6 +14,7 @@ SET pig.tmpfilecompression.codec lzo;
/* %declare TIME_FORMAT 'yyyy-MM-dd'; */
define DaysAgo com.mozilla.pig.eval.date.TimeDelta('5', 'yyyy-MM-dd');
+define DaysAgoTimestamps com.mozilla.pig.eval.date.TimeDelta('5');
define BucketProfileAge com.mozilla.pig.eval.Bucket('1','7','30','180','365','366');
define ParseDate com.mozilla.pig.eval.date.ParseDate('yyyy-MM-dd');
define FormatDate com.mozilla.pig.eval.date.FormatDate('yyyy-MM-dd');
@@ -25,22 +26,31 @@ define OsVersionNormalizer com.mozilla.pig.eval.regex.FindOrReturn('^[0-9]+(\\.*
define VersionOnDate com.mozilla.fhr.pig.eval.VersionOnDate('yyyy-MM-dd', '$date');
define FirstPingTime com.mozilla.fhr.pig.eval.FirstPingTime('yyyy-MM-dd');
define LatestPingTime com.mozilla.fhr.pig.eval.LatestPingTime('yyyy-MM-dd', '$date');
+define ProfileCreationTime com.mozilla.fhr.pig.eval.ProfileCreationTime();
define IsMap com.mozilla.pig.filter.map.IsMap();
raw = LOAD 'hbase://metrics' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('data:json','-loadKey=true -caching=100') AS
(k:bytearray,json:chararray);
genmap = FOREACH raw GENERATE com.mozilla.pig.eval.json.JsonMap(json) AS json_map:map[];
-filtered_genmap = FILTER genmap BY IsMap(json_map#'dataPoints');
-data = FOREACH filtered_genmap GENERATE LatestPingTime(json_map#'dataPoints') AS latest_time:long,
- FirstPingTime(json_map#'dataPoints') AS first_time:long,
- json_map#'appName' AS product:chararray,
- VersionOnDate(json_map#'versions') AS product_version:chararray,
- json_map#'appUpdateChannel' AS product_channel:chararray,
- json_map#'OSName' AS os:chararray,
- OsVersionNormalizer((chararray)json_map#'OSVersion') AS os_version:chararray,
- json_map#'locale' AS locale:chararray,
- ((int)json_map#'appProfileAge' - (int)DaysAgo(json_map#'thisPingTime', '$date')) AS profile_age:int;
-filtered_data = FILTER data BY latest_time IS NOT NULL AND profile_age IS NOT NULL AND profile_age >= 0;
+filtered_genmap = FILTER genmap BY IsMap(json_map#'data'#'days');
+data = FOREACH filtered_genmap GENERATE LatestPingTime(json_map#'data'#'days') AS latest_time:long,
+ FirstPingTime(json_map#'data'#'days') AS first_time:long,
+ json_map#'data'#'last'#'org.mozilla.appInfo.appinfo'#'name' AS product:chararray,
+ VersionOnDate(json_map#'data'#'days') AS product_version:chararray,
+ json_map#'data'#'last'#'org.mozilla.appInfo.appinfo'#'updateChannel' AS product_channel:chararray,
+ json_map#'data'#'last'#'org.mozilla.appInfo.appinfo'#'os' AS os:chararray,
+ OsVersionNormalizer((chararray)json_map#'data'#'last'#'org.mozilla.sysinfo.sysinfo'#'version') AS os_version:chararray,
+ json_map#'geoCountry' AS geo_country_code:chararray,
+ (int)DaysAgoTimestamps(ProfileCreationTime((int)json_map#'data'#'last'#'org.mozilla.profile.age'#'profileCreation'), ParseDate('$date')) AS profile_age:int;
+filtered_data = FILTER data BY latest_time IS NOT NULL AND
+ profile_age IS NOT NULL AND profile_age >= 0 AND
+ product IS NOT NULL AND
+ (product == 'Firefox' OR product == 'MetroFirefox' OR
+ product == 'Fennec' OR product == 'Thunderbird') AND
+ product_version IS NOT NULL AND
+ product_channel IS NOT NULL AND
+ os IS NOT NULL AND
+ os_version IS NOT NULL;
converted_ping_time = FOREACH filtered_data GENERATE *, FormatDate(latest_time) AS ping_time:chararray;
adjusted_data = FOREACH converted_ping_time GENERATE ping_time,
DaysAgo(ping_time, '$date') AS days_ago:long, /* calc days ago from specified day */
@@ -48,67 +58,64 @@ adjusted_data = FOREACH converted_ping_time GENERATE ping_time,
(long)MonthInYear(ping_time) AS month_in_year:long,
(long)Year(ping_time) AS year:long,
product, product_version, product_channel,
- os, os_version, locale,
+ os, os_version, geo_country_code,
(latest_time == first_time ? 1 : 0) AS new_ping:int,
profile_age,
BucketProfileAge(profile_age) AS bucketed_profile_age:int;
/* Daily */
daily_data = FILTER adjusted_data BY days_ago == 0;
-grouped_daily = GROUP daily_data BY (product,product_version,product_channel,os,os_version,locale,new_ping);
+grouped_daily = GROUP daily_data BY (product,product_version,product_channel,os,os_version,geo_country_code,new_ping);
daily_counts = FOREACH grouped_daily GENERATE '$date' AS perspective_date:chararray,
- FLATTEN(group) AS (product,product_version,product_channel,os,os_version,locale,new_ping),
- FLATTEN(Median(daily_data.profile_age)) AS median_profile_age:float,
- /*FLATTEN(Median(daily_data.bucketed_profile_age)) AS median_bucketed_profile_age:float,*/
+ FLATTEN(group) AS (product,product_version,product_channel,os,os_version,geo_country_code,new_ping),
+ FLATTEN(Median(daily_data.bucketed_profile_age)) AS median_profile_age:float,
COUNT(daily_data) AS count:long;
STORE daily_counts INTO 'fhr-daily-counts-$date';
/* Store into Vertica (only will work on Vertica 5+ and the vertica connector jar needs to be on every machine)*/
-STORE daily_counts INTO '{fhr_daily_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(16), os varchar(16), os_version varchar(32), locale varchar(8), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
+STORE daily_counts INTO '{fhr_daily_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(64), os varchar(16), os_version varchar(32), geo_country_code varchar(4), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
+
+
/* Week in Year */
weekly_data = FILTER adjusted_data BY week_in_year == $week AND year == $year;
-grouped_wiy = GROUP weekly_data BY (product,product_version,product_channel,os,os_version,locale,new_ping);
+grouped_wiy = GROUP weekly_data BY (product,product_version,product_channel,os,os_version,geo_country_code,new_ping);
wiy_counts = FOREACH grouped_wiy GENERATE '$date' AS perspective_date:chararray,
- FLATTEN(group) AS (product,product_version,product_channel,os,os_version,locale,new_ping),
- FLATTEN(Median(weekly_data.profile_age)) AS median_profile_age,
- /*FLATTEN(Median(weekly_data.bucketed_profile_age)) AS median_bucketed_profile_age,*/
+ FLATTEN(group) AS (product,product_version,product_channel,os,os_version,geo_country_code,new_ping),
+ FLATTEN(Median(weekly_data.bucketed_profile_age)) AS median_profile_age,
COUNT(weekly_data) AS count:long;
STORE wiy_counts INTO 'fhr-wiy-counts-$date';
/* Store into Vertica (only will work on Vertica 5+ and the vertica connector jar needs to be on every machine)*/
-STORE wiy_counts INTO '{fhr_wiy_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(16), os varchar(16), os_version varchar(32), locale varchar(8), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
+STORE wiy_counts INTO '{fhr_wiy_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(64), os varchar(16), os_version varchar(32), geo_country_code varchar(4), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
/* Month in Year */
monthly_data = FILTER adjusted_data BY month_in_year == $month AND year == $year;
-grouped_miy = GROUP monthly_data BY (product,product_version,product_channel,os,os_version,locale,new_ping);
+grouped_miy = GROUP monthly_data BY (product,product_version,product_channel,os,os_version,geo_country_code,new_ping);
miy_counts = FOREACH grouped_miy GENERATE '$date' AS perspective_date:chararray,
- FLATTEN(group) AS (product,product_version,product_channel,os,os_version,locale,new_ping),
- FLATTEN(Median(monthly_data.profile_age)) AS median_profile_age,
- /*FLATTEN(Median(monthly_data.bucketed_profile_age)) AS median_bucketed_profile_age, */
+ FLATTEN(group) AS (product,product_version,product_channel,os,os_version,geo_country_code,new_ping),
+ FLATTEN(Median(monthly_data.bucketed_profile_age)) AS median_profile_age,
COUNT(monthly_data) AS count:long;
STORE miy_counts INTO 'fhr-miy-counts-$date';
/* Store into Vertica (only will work on Vertica 5+ and the vertica connector jar needs to be on every machine)*/
-STORE miy_counts INTO '{fhr_miy_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(16), os varchar(16), os_version varchar(32), locale varchar(8), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
+STORE miy_counts INTO '{fhr_miy_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(64), os varchar(16), os_version varchar(32), geo_country_code varchar(4), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
/* d-7 to d */
d7_data = FILTER adjusted_data BY days_ago >= 0 AND days_ago < 7;
-grouped_d7 = GROUP d7_data BY (product,product_version,product_channel,os,os_version,locale,new_ping);
+grouped_d7 = GROUP d7_data BY (product,product_version,product_channel,os,os_version,geo_country_code,new_ping);
d7_counts = FOREACH grouped_d7 GENERATE '$date' AS perspective_date:chararray,
- FLATTEN(group) AS (product,product_version,product_channel,os,os_version,locale,new_ping),
- FLATTEN(Median(d7_data.profile_age)) AS median_profile_age,
- /*FLATTEN(Median(d7_data.bucketed_profile_age)) AS median_bucketed_profile_age, */
+ FLATTEN(group) AS (product,product_version,product_channel,os,os_version,geo_country_code,new_ping),
+ FLATTEN(Median(d7_data.bucketed_profile_age)) AS median_profile_age,
COUNT(d7_data) AS count:long;
STORE d7_counts INTO 'fhr-d7-counts-$date';
/* Store into Vertica (only will work on Vertica 5+ and the vertica connector jar needs to be on every machine)*/
-STORE d7_counts INTO '{fhr_d7_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(16), os varchar(16), os_version varchar(32), locale varchar(8), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
+STORE d7_counts INTO '{fhr_d7_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(64), os varchar(16), os_version varchar(32), geo_country_code varchar(4), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
/* d-30 to d */
d30_data = FILTER adjusted_data BY days_ago >= 0 AND days_ago < 30;
-grouped_d30 = GROUP d30_data BY (product,product_version,product_channel,os,os_version,locale,new_ping);
+grouped_d30 = GROUP d30_data BY (product,product_version,product_channel,os,os_version,geo_country_code,new_ping);
d30_counts = FOREACH grouped_d30 GENERATE '$date' AS perspective_date:chararray,
- FLATTEN(group) AS (product,product_version,product_channel,os,os_version,locale,new_ping),
- FLATTEN(Median(d30_data.profile_age)) AS median_profile_age,
- /*FLATTEN(Median(d30_data.bucketed_profile_age)) AS median_bucketed_profile_age, */
+ FLATTEN(group) AS (product,product_version,product_channel,os,os_version,geo_country_code,new_ping),
+ FLATTEN(Median(d30_data.bucketed_profile_age)) AS median_profile_age,
COUNT(d30_data) AS count:long;
STORE d30_counts INTO 'fhr-d30-counts-$date';
/* Store into Vertica (only will work on Vertica 5+ and the vertica connector jar needs to be on every machine)*/
-STORE d30_counts INTO '{fhr_d30_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(16), os varchar(16), os_version varchar(32), locale varchar(8), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
+STORE d30_counts INTO '{fhr_d30_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(64), os varchar(16), os_version varchar(32), geo_country_code varchar(4), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
52 src/test/java/com/mozilla/fhr/pig/eval/VersionOnDateTest.java
View
@@ -22,6 +22,8 @@
import static org.junit.Assert.*;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -37,20 +39,50 @@
@Test
public void testExec1() throws IOException {
Tuple input = tupleFactory.newTuple();
- DataBag versions = bagFactory.newDefaultBag();
- Tuple v1 = tupleFactory.newTuple(2);
- v1.set(0,"2012-08-01");
- v1.set(1,"14.0");
- versions.add(v1);
- Tuple v2 = tupleFactory.newTuple(2);
- v2.set(0,"2012-09-15");
- v2.set(1,"15.0");
- versions.add(v2);
- input.append(versions);
+
+ Map<String,Object> daysMap = new HashMap<String,Object>();
+ Map<String,Object> versionInfoMap1 = new HashMap<String,Object>();
+ Map<String,Object> versionMap1 = new HashMap<String,Object>();
+ DataBag versionsBag1 = bagFactory.newDefaultBag();
+ versionsBag1.add(tupleFactory.newTuple("14.0"));
+ versionMap1.put("version", versionsBag1);
+ versionInfoMap1.put("org.mozilla.appInfo.versions", versionMap1);
+ daysMap.put("2012-08-01", versionInfoMap1);
+
+ Map<String,Object> versionInfoMap2 = new HashMap<String,Object>();
+ Map<String,Object> versionMap2 = new HashMap<String,Object>();
+ DataBag versionsBag2 = bagFactory.newDefaultBag();
+ versionsBag2.add(tupleFactory.newTuple("15.0"));
+ versionMap2.put("version", versionsBag2);
+ versionInfoMap2.put("org.mozilla.appInfo.versions", versionMap2);
+ daysMap.put("2012-09-15", versionInfoMap2);
+
+ input.append(daysMap);
VersionOnDate vod = new VersionOnDate("yyyy-MM-dd", "2012-10-22");
String output = vod.exec(input);
assertTrue("15.0".equals(output));
}
+ @Test
+ public void testExec2() throws IOException {
+ Tuple input = tupleFactory.newTuple();
+
+ Map<String,Object> daysMap = new HashMap<String,Object>();
+ Map<String,Object> versionInfoMap1 = new HashMap<String,Object>();
+ Map<String,Object> versionMap1 = new HashMap<String,Object>();
+ DataBag versionsBag1 = bagFactory.newDefaultBag();
+ versionsBag1.add(tupleFactory.newTuple("14.0"));
+ versionsBag1.add(tupleFactory.newTuple("15.0a1"));
+ versionMap1.put("version", versionsBag1);
+ versionInfoMap1.put("org.mozilla.appInfo.versions", versionMap1);
+ daysMap.put("2012-08-01", versionInfoMap1);
+
+ input.append(daysMap);
+
+ VersionOnDate vod = new VersionOnDate("yyyy-MM-dd", "2012-10-22");
+ String output = vod.exec(input);
+ assertTrue("14.0|15.0a1".equals(output));
+ }
+
}
Please sign in to comment.
Something went wrong with that request. Please try again.