Skip to content

Commit

Permalink
Chapter title for analytic patterns
Browse files Browse the repository at this point in the history
  • Loading branch information
Philip (flip) Kromer committed Feb 7, 2014
1 parent 976cbb1 commit 0ea162a
Showing 1 changed file with 23 additions and 21 deletions.
44 changes: 23 additions & 21 deletions 05-analytic_patterns.asciidoc
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
Now that you've met the fundamental analytic operations -- in both their map/reduce and table-operation form -- it's time to put them to work in an actual data exploration.
[[analytic_patterns]]
== Core Analytic Patterns

Now that you've met the fundamental analytic operations -- in both their map/reduce and table-operation form -- it's time to put them to work in an actual data exploration.

This chapter will equip you to think tactically, to think in terms of the changes you would like to make to the data. Each section introduces a repeatedly-useful data transformation pattern, demonstrated in Pig (and, where we'd like to reinforce the record-by-record action, in Wukong as well).

Expand Down Expand Up @@ -88,8 +91,8 @@ And the output after the flatten:
Lexington,_Texas 023130130 tot_usages num_terms 4 "texas"
Lexington,_Texas 023130130 tot_usages num_terms 2 "lexington"
Lexington,_Texas 023130130 tot_usages num_terms 2 "best"
Lexington,_Texas 023130130 tot_usages num_terms 2 "bbq"
Lexington,_Texas 023130130 tot_usages num_terms 1 "barbecue"
Lexington,_Texas 023130130 tot_usages num_terms 2 "bbq"
Lexington,_Texas 023130130 tot_usages num_terms 1 "barbecue"
------

=== Pattern: Atom-only Records
Expand All @@ -102,9 +105,9 @@ Think of this atom-only form as the neutral fighting stance for your tables. Fro

----
taf_g = GROUP term_article_freqs BY quadcell, term;
cell_freqs = FOREACH taf_g GENERATE
group.quadcell AS quadcell,
group.term AS term,
cell_freqs = FOREACH taf_g GENERATE
group.quadcell AS quadcell,
group.term AS term,
SUM(term_article_freqs.article_term_usages) AS cell_term_usages;
cf_g = GROUP cell_freqs BY quadcell;
term_cell_freqs = FOREACH cf_g GENERATE
Expand All @@ -116,7 +119,7 @@ term_cell_freqs = FOREACH cf_g GENERATE
._"cell_freqs" result_
----
023130130 7 "bbq"
023130130 20 "texas"
023130130 20 "texas"
----

._"cf_g" result_
Expand All @@ -138,7 +141,7 @@ Let's now prepare those global statistics.

----
all_terms = GROUP term_article_freqs BY term;
term_info_1 = FOREACH all_terms GENERATE
term_info_1 = FOREACH all_terms GENERATE
group AS term,
COUNT_STAR(term_article_freqs) AS num_articles,
SUM(article_term_usages) AS term_usages;
Expand All @@ -155,31 +158,31 @@ STORE global_term_info INTO '/data/work/geo_flavor/global_term_info';

=== GROUP/COGROUP To Restructure Tables

This next pattern is one of the more difficult to picture but also one of the most important to master. Once you can confidently recognize and apply this pattern, you can consider yourself a black belt in the martial art of Map/Reduce.
This next pattern is one of the more difficult to picture but also one of the most important to master. Once you can confidently recognize and apply this pattern, you can consider yourself a black belt in the martial art of Map/Reduce.

(TODO: describe this pattern)

=== Pattern: Extend Records with Uniquely Matching Records from Another Table

Using a join as we just did -- to extend the records in one table with the fields from one matching record in another -- is a very common pattern. Datasets are commonly stored as tables in 'normalized' form -- that is, having tables structured to minimize redundancy and dependency. The global hourly weather dataset has one table giving the metadata for every weather station: identifiers, geocoordinates, elevation, country and so on. The giant tables listing the hourly observations from each weather station are normalized to not repeat the station metadata on each line, only the weather station id. However, later in the book (REF) we'll do geographic analysis of the weather data -- and one of the first tasks will be to denormalize the geocoordinates of each weather station with its observations, letting us group nearby observations.

Another reason to split data across tables is 'vertical partitioning': storing fields that are very large or seldom used in context within different tables. That's the case with the Wikipedia article tables -- the geolocation information is only relevant for geodata analysis; the article text is both large and not always relevant.
Another reason to split data across tables is 'vertical partitioning': storing fields that are very large or seldom used in context within different tables. That's the case with the Wikipedia article tables -- the geolocation information is only relevant for geodata analysis; the article text is both large and not always relevant.

=== Pattern: Summarizing Groups

Pretty much every data exploration you perform will involve summarizing datasets using statistical aggregations -- counts, averages and so forth. You have already seen an example of this when we helped the reindeer count UFO visit frequency by month and later in the book, we will devote a whole chapter to statistical summaries and aggregation.
Pretty much every data exploration you perform will involve summarizing datasets using statistical aggregations -- counts, averages and so forth. You have already seen an example of this when we helped the reindeer count UFO visit frequency by month and later in the book, we will devote a whole chapter to statistical summaries and aggregation.

=== Pattern: Re-injecting global totals
=== Pattern: Re-injecting global totals

We also extract two global statistics: the number of distinct terms, and the number of distinct usages. This brings up one of the more annoying things about Hadoop programming. The global_term_info result is two lousy values, needed to turn the global _counts_ for each term into the global _frequency_ for each term. But a pig script just orchestrates the top-level motion of data: there's no intrinsic way to bring the result of a step into the declaration of following steps. The proper recourse is to split the script into two parts, and run it within a workflow tool like Rake, Drake or Oozie. The workflow layer can fish those values out of the HDFS and inject them as runtime parameters into the next stage of the script.
We also extract two global statistics: the number of distinct terms, and the number of distinct usages. This brings up one of the more annoying things about Hadoop programming. The global_term_info result is two lousy values, needed to turn the global _counts_ for each term into the global _frequency_ for each term. But a pig script just orchestrates the top-level motion of data: there's no intrinsic way to bring the result of a step into the declaration of following steps. The proper recourse is to split the script into two parts, and run it within a workflow tool like Rake, Drake or Oozie. The workflow layer can fish those values out of the HDFS and inject them as runtime parameters into the next stage of the script.

We prefer to cheat. We instead ran a version of the script that found the global count of terms and usages, then copy/pasted their values as static parameters at the top of the script. This also lets us calculate the ppm frequency of each term and the other term statistics in a single pass. To ensure our time-traveling shenanigans remain valid, we add an `ASSERT` statement which compares the memoized values to the actual totals.

----
DEFINE memoized_num_terms XXX;
DEFINE memoized_global_usages XXX;
all_terms = GROUP term_cell_freqs BY term;
term_info_1 = FOREACH all_terms GENERATE
term_info_1 = FOREACH all_terms GENERATE
group AS term,
COUNT_STAR(term_cell_freqs) AS num_articles,
SUM(article_term_usages) AS term_usages,
Expand Down Expand Up @@ -207,29 +210,29 @@ The Pig LIMIT operation arbitrarily selects, at most, the specified number of re

(TODO: Is there a non-Reduce way to do this?)

In the simplest Map/Reduce equivalent, Mappers emit each record unchanged until they hit the specified limit (or reach the end of their input). Those output records are sent to a single Reducer, which itself emits each record unchanged until it has hit the specified limit and does nothing on all subsequent records.
In the simplest Map/Reduce equivalent, Mappers emit each record unchanged until they hit the specified limit (or reach the end of their input). Those output records are sent to a single Reducer, which itself emits each record unchanged until it has hit the specified limit and does nothing on all subsequent records.

(TODO: Do we want to talk about a non-single Reducer approach?)

A Combiner is helpful here in the predominant case where the specified limit is small, as it will eliminate excess records before they are sent to the Reducer and at each merge/sort pass.
A Combiner is helpful here in the predominant case where the specified limit is small, as it will eliminate excess records before they are sent to the Reducer and at each merge/sort pass.

==== Top K Records (ORDER..LIMIT)

The naive way to extract the top K elements from a table is simply to do an ORDER and then a LIMIT. For example, the following script will identify the top 100 URLs from the waxy.org weblog dataset.
The naive way to extract the top K elements from a table is simply to do an ORDER and then a LIMIT. For example, the following script will identify the top 100 URLs from the waxy.org weblog dataset.

----
logs=LOAD '/data/gold/waxy/whatever.log' AS (...) USING APACHE LOG READER;
logs=FOREACH logs GENERATE url;
url_logs = GROUP logs BY url;
URL_COUNTS=FOREACH url_logs GENERATE
URL_COUNTS=FOREACH url_logs GENERATE
COUNT_STAR(url_logs) AS views,
group AS url;
url_counts_o = ORDER url_counts BY views PARALLEL 1;
top_url_counts = LIMIT url_counts_o 100;
STORE top_url_counts INTO '/data/out/weblogs/top_url_counts';
----

There are two useful optimizations to make when K (the number of records you will keep) is much less than N (the number of records in the table). The first one, which Pig does for you, is to only retain the top K records at each Mapper; this is a great demonstration of where a Combiner is useful: After each intermediate merge/sort on the Map side and the Reduce side, the Combiner discards all but the top K records.
There are two useful optimizations to make when K (the number of records you will keep) is much less than N (the number of records in the table). The first one, which Pig does for you, is to only retain the top K records at each Mapper; this is a great demonstration of where a Combiner is useful: After each intermediate merge/sort on the Map side and the Reduce side, the Combiner discards all but the top K records.

==== A Foolish Optimization

Expand All @@ -242,11 +245,10 @@ The 'O(log N)' portion of Hadoop's log sort shows up in two ways: The N memory
There is a situation where the heap-based top K algorithm is appropriate: finding the top K elements for a group. Pig's 'top' function accepts a bag and returns a bag with its top K elements. Here is an example that uses the World Cup dataset to find the top 10 URLs for each day of the tournament:

----
visits = load ('worldcup');
visits = load ('worldcup');
visits = FOREACH visits generate day, url;
visits by day = GROUP visits by day;
top visits by day = FOREACH visits url = GROUP visits by url;
generate GROUP as day, top (visits, top visit URLs, COUNT_STAR (visit urls), 100;
store top visits by url into 'top visits by url';
----

0 comments on commit 0ea162a

Please sign in to comment.