Permalink
Browse files

Sync'ing latest fhr_uniques

  • Loading branch information...
xstevens committed Feb 19, 2013
1 parent 417d294 commit f40c8fe4ac31b8f2493b27647ae32a413d72387c
Showing with 15 additions and 5 deletions.
  1. +15 −5 src/main/pig/fhr_uniques.pig
@@ -62,9 +62,19 @@ adjusted_data = FOREACH converted_ping_time GENERATE ping_time,
(latest_time == first_time ? 1 : 0) AS new_ping:int,
profile_age,
BucketProfileAge(profile_age) AS bucketed_profile_age:int;
+/* Clean data for DB insertion: keep only rows whose string fields fit the
+ * varchar widths declared in the STORE schemas of this script (product 32,
+ * product_version 8, product_channel 64, os 16, os_version 32,
+ * geo_country_code 4) and whose new_ping, profile_age and
+ * bucketed_profile_age are all non-null.
+ * NOTE(review): if these fields are chararrays, SIZE presumably counts
+ * characters; should the database measure varchar width in bytes, multi-byte
+ * values could still overflow the column -- confirm.
+ * NOTE(review): a null string field presumably makes its SIZE comparison
+ * null, so such rows would be dropped as well -- confirm that is intended. */
+cleansed_data = FILTER adjusted_data BY SIZE(product) <= 32 AND
+ SIZE(product_version) <= 8 AND
+ SIZE(product_channel) <= 64 AND
+ SIZE(os) <= 16 AND
+ SIZE(os_version) <= 32 AND
+ SIZE(geo_country_code) <= 4 AND
+ new_ping IS NOT NULL AND
+ profile_age IS NOT NULL AND
+ bucketed_profile_age IS NOT NULL;
/* Daily */
-daily_data = FILTER adjusted_data BY days_ago == 0;
+daily_data = FILTER cleansed_data BY days_ago == 0;
grouped_daily = GROUP daily_data BY (product,product_version,product_channel,os,os_version,geo_country_code,new_ping);
daily_counts = FOREACH grouped_daily GENERATE '$date' AS perspective_date:chararray,
FLATTEN(group) AS (product,product_version,product_channel,os,os_version,geo_country_code,new_ping),
@@ -77,7 +87,7 @@ STORE daily_counts INTO '{fhr_daily_counts(perspective_date date, product varcha
/* Week in Year */
-weekly_data = FILTER adjusted_data BY week_in_year == $week AND year == $year;
+weekly_data = FILTER cleansed_data BY week_in_year == $week AND year == $year;
grouped_wiy = GROUP weekly_data BY (product,product_version,product_channel,os,os_version,geo_country_code,new_ping);
wiy_counts = FOREACH grouped_wiy GENERATE '$date' AS perspective_date:chararray,
FLATTEN(group) AS (product,product_version,product_channel,os,os_version,geo_country_code,new_ping),
@@ -88,7 +98,7 @@ wiy_counts = FOREACH grouped_wiy GENERATE '$date' AS perspective_date:chararray,
STORE wiy_counts INTO '{fhr_wiy_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(64), os varchar(16), os_version varchar(32), geo_country_code varchar(4), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
/* Month in Year */
-monthly_data = FILTER adjusted_data BY month_in_year == $month AND year == $year;
+monthly_data = FILTER cleansed_data BY month_in_year == $month AND year == $year;
grouped_miy = GROUP monthly_data BY (product,product_version,product_channel,os,os_version,geo_country_code,new_ping);
miy_counts = FOREACH grouped_miy GENERATE '$date' AS perspective_date:chararray,
FLATTEN(group) AS (product,product_version,product_channel,os,os_version,geo_country_code,new_ping),
@@ -99,7 +109,7 @@ miy_counts = FOREACH grouped_miy GENERATE '$date' AS perspective_date:chararray,
STORE miy_counts INTO '{fhr_miy_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(64), os varchar(16), os_version varchar(32), geo_country_code varchar(4), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
/* d-7 to d */
-d7_data = FILTER adjusted_data BY days_ago >= 0 AND days_ago < 7;
+d7_data = FILTER cleansed_data BY days_ago >= 0 AND days_ago < 7;
grouped_d7 = GROUP d7_data BY (product,product_version,product_channel,os,os_version,geo_country_code,new_ping);
d7_counts = FOREACH grouped_d7 GENERATE '$date' AS perspective_date:chararray,
FLATTEN(group) AS (product,product_version,product_channel,os,os_version,geo_country_code,new_ping),
@@ -110,7 +120,7 @@ d7_counts = FOREACH grouped_d7 GENERATE '$date' AS perspective_date:chararray,
STORE d7_counts INTO '{fhr_d7_counts(perspective_date date, product varchar(32), product_version varchar(8), product_channel varchar(64), os varchar(16), os_version varchar(32), geo_country_code varchar(4), new_ping int, median_profile_age float, count int)}' USING com.vertica.pig.VerticaStorer('$dblist', '$dbname', '$dbport', '$dbuser', '$dbpass');
/* d-30 to d */
-d30_data = FILTER adjusted_data BY days_ago >= 0 AND days_ago < 30;
+d30_data = FILTER cleansed_data BY days_ago >= 0 AND days_ago < 30;
grouped_d30 = GROUP d30_data BY (product,product_version,product_channel,os,os_version,geo_country_code,new_ping);
d30_counts = FOREACH grouped_d30 GENERATE '$date' AS perspective_date:chararray,
FLATTEN(group) AS (product,product_version,product_channel,os,os_version,geo_country_code,new_ping),

0 comments on commit f40c8fe

Please sign in to comment.