Design of a data processing path

Paul Houle edited this page · 5 revisions

Overview

This is starting out as a bitch session and will evolve into a real plan to get rapid turnover in data analysis.

Basic Scenario

A common kind of analysis is to project the triples in some way (say extract the ?s), sort, group and count on the projection, then make a cumulative probability distribution over the projected variable.

If the variable is something numeric, like a year, it may make sense to sort the counts by the projected value. On the other hand, if the variable doesn't have a meaningful order (a URI, say), it makes sense to sort by the counts in descending order so that you can see right off who are the 20% of the beer drinkers that drink 80% of the beer.

We can do this in a not-so-scalable way in Pig by writing

groupNodes = GROUP nodes BY s;
countedNodes = FOREACH groupNodes GENERATE group AS uri:chararray,COUNT(nodes) AS cnt:long;
sortedNodes = ORDER countedNodes BY cnt DESC PARALLEL 1;
cumNodes = FOREACH sortedNodes GENERATE CumulativeCount() as id:int,uri AS uri:chararray,CumulativeSum(cnt) AS cumCount:long,cnt as cnt:long;

The key to this is PARALLEL 1, which serializes the last step, together with the CumulativeCount() and CumulativeSum() UDFs I've defined. These UDFs are very bad in the sense that they keep a state variable in the instance, but they seem to do what is wanted when PARALLEL 1 is in effect.
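To make the role of the two running totals concrete, here is a small-scale sketch of the same computation as a shell pipeline. It is not the Pig job and will not scale; the function name `cumulative_counts` is made up for illustration, and it assumes one projected value per line with no embedded whitespace.

```shell
# Hypothetical helper: reads one projected value (e.g. a subject URI) per line
# and prints id, value, cumulative count, count, separated by tabs.
cumulative_counts() {
  LC_ALL=C sort | uniq -c |            # group and count
    LC_ALL=C sort -k1,1nr -k2,2 |      # order by count, descending
    awk '{ cum += $1; printf "%d\t%s\t%d\t%d\n", NR, $2, cum, $1 }'
}

# Toy input: "a" appears three times, "b" twice, "c" once.
printf '%s\n' a b a c a b | cumulative_counts
```

The awk step plays the part of both UDFs: `NR` is the running row number and `cum` is the running sum, which only works because a single process sees every row in order.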

We're not far away from the cumulative probability distribution (the ability to say that 'a' predicates represent 15% of all facts in Freebase) except for the fact that we need to know the grand total, which is the cumCount value in the very last row of that table, so we can divide the rest of the cumulative counts by it.

In a raw Map/Reduce job we could easily pass this out as a side effect but we're working in Pig. Could we sort the file backwards and LIMIT 1? Sure. But practically you might as well do

$ hadoop fs -cat /cumNodes.gz/part-r-00000.gz | zcat - | tail -n 1

and wait about 50 sec to get the last line

40514095        <http://rdf.basekb.com/ns/m.0lkmm4c>    134802470       1

(One reasonable answer might be to ask Pig to do a sum and count grouped over ALL and hope the query optimizer will realize it doesn't need to schedule another reduce step)

Note that the data here is going to be exported outside of the parallel world, because at some point I want to send this data out to make plots with 'R'.

We could do the division in the parallel world but I have to admit that I like integers better than I like floating point numbers because floating point numbers are just a touch more sketchy.
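Outside the parallel world, the division can stay in integers by expressing the cumulative share in parts per million. The following is only a sketch under assumptions: the helper name `add_cdf_ppm` and the parts-per-million scale are my own choices for illustration, and the input is assumed to be a local, uncompressed copy of the four-column table (id, uri, cumCount, cnt).

```shell
# Hypothetical helper: append each row's cumulative share, as an integer
# number of parts per million, to a table of id, uri, cumCount, cnt.
add_cdf_ppm() {
  # The grand total is the cumulative count (third column) of the last row.
  total=$(tail -n 1 "$1" | awk '{ print $3 }')
  awk -v total="$total" '{ printf "%s\t%d\n", $0, int($3 * 1000000 / total) }' "$1"
}

# Made-up three-row table standing in for the real cumNodes output.
sample=$(mktemp)
printf '1\ta\t3\t3\n2\tb\t5\t2\n3\tc\t6\t1\n' > "$sample"
add_cdf_ppm "$sample"
rm -f "$sample"
```

On the toy table this appends 500000, 833333 and 1000000 to the three rows. It reads the file twice, once for the total and once for the division, which is the same two-step shape as the tail -n 1 trick above.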

It's inevitable at some point that we will want to look at this as a CDF in the parallel world so we still need some way to catch this number and get it into a place where parallel world apps that need it will find it.

Is there a more scalable algorithm?

In a worst case scenario we have to serialize the last step of putting things in serial order. Can we do better?

I'm considering the case where we want to assign unique sequential ids to the concepts because we want to do arithmetic or entropy encoding. (0 or 1 would be the most common concept, 2 the next common and so forth.)

In the real world, however, a large number of items will turn up only 1, 2 or 3 times, and the serial order among items with the same count is not important. I'd imagine there is a way to turn the algorithm sideways and get a small amount of speed-up (a few times).

Getting Data from Hadoop to R

Suppose we want to plot the CDF, how do we do it with R?

We can get data out of the hadoop cluster by writing something like

hadoop fs -cat /user/paul/cumNodes.gz/part-r-00000.gz > output.gz

but we get a file with 40 million lines that is 330 MB compressed and it doesn't seem practical to load that much data into R. (Note there is a gzfile() function you can use in R to decompress gz on load.) It's not hard to peel off the top 100,000 items, however,

hadoop fs -cat /user/paul/cumNodes.gz/part-r-00000.gz | zcat - | head -n100000 > top100000.dat

at this point the file looks like

99996   <http://rdf.basekb.com/ns/m.06b7bc>     70196243        22
99997   <http://rdf.basekb.com/ns/m.02bg6w>     70196265        22
99998   <http://rdf.basekb.com/ns/m.06kk24>     70196287        22
99999   <http://rdf.basekb.com/ns/m.01q1tw>     70196309        22
100000  <http://rdf.basekb.com/ns/m.03drd_>     70196331        22

and if you try to load it in R like this:

cum = read.table("top100000.dat")

you get the cryptic error message

 in scan(file, what, nmax, sep, dec, quote, skip, nlines, na.strings,  : 
  line 2 did not have 4 elements

So far as I can tell, this is caused by the 'string' field $2 not being quoted. This problem can be dealt with by reformatting the data with awk

awk '{print $1 " \"" $2 "\" " $3 " " $4}' < top100000.dat >t100000.dat

producing output that looks like

1 "<http://www.w3.org/1999/02/22-rdf-syntax-ns#type>" 7917328 7917328
2 "<http://rdf.basekb.com/ns/type.object.key>" 13389771 5472443
3 "<http://www.w3.org/2000/01/rdf-schema#label>" 16248001 2858230
4 "<http://rdf.basekb.com/ns/type.object.name>" 19104400 2856399
5 "<http://rdf.basekb.com/ns/common.topic.topic_equivalent_webpage>" 21852547 2748147
6 "<http://rdf.basekb.com/ns/common.topic>" 23833411 1980864

you can read this into R with

cum = read.table("t100000.dat",as.is=TRUE,header=FALSE);

and then make the obvious plots such as

plot(log(cum$V1), log(cum$V4))

The as.is=TRUE prevents R from doing type conversions (such as turning the string column into a factor) that will bulk up the data.

Plotting a CDF with more dynamic range

Although looking at individual instances is a good way to look at the most prevalent things (because you want to look at them individually) it scales poorly because we'd need to plot hundreds of millions or billions of points.

I did

$ time hadoop fs -cat /user/paul/cumNodes.gz/part-r-00000.gz | zcat - | awk '{print $4}' | uniq -c > prevalence.curve


real    0m51.228s
user    1m13.652s
sys     0m37.196s

and for the 5% database this has about 2600 points, with a tail that looks like

  67546 10
  87198 9
 116174 8
 164536 7
 250123 6
 428020 5
 875545 4
2284636 3
7344308 2
28527534 1

This states that 67546 nodes appeared 10 times and so forth. (I think the high prevalence of nodes that appear only once is an artifact of the statistical sampling; I can't believe these are this common in Freebase.)

Anyway, this data set is 100,000 or so times smaller than the file it is derived from, so it should not be at all difficult to run this against the full data.
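Because the curve is so small, the running totals can be taken over it directly instead of over the per-node table. A minimal sketch, assuming input in the `uniq -c` layout shown above (number of nodes, then count); the function name `prevalence_cumulative` and the output column order are made up for illustration.

```shell
# Hypothetical helper: reads "nNodes cnt" lines (uniq -c layout) and prints
# cnt, nNodes, cumulative nodes, cumulative facts, separated by tabs.
prevalence_cumulative() {
  awk '{ cumNodes += $1; cumFacts += $1 * $2
         printf "%d\t%d\t%d\t%d\n", $2, $1, cumNodes, cumFacts }'
}

# Made-up curve: 1 node seen 5 times, 2 nodes seen 3 times, 4 nodes seen once.
printf '1 5\n2 3\n4 1\n' | prevalence_cumulative
```

Each input line stands for a whole group of nodes with the same count, so one multiplication replaces that many steps of the serial running sum. The last output row carries the totals needed for the division.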

I think this line of thinking can be carried back into Pig and Hadoop and the good news is that I think this reduces the serialization bottleneck. There's definitely an Amdahl's law component, because the 1 at the bottom is going to be reduced in one huge chunk, but I bet I'll get some speed up by organizing the code better.
