Skip to content
This repository has been archived by the owner on Oct 8, 2019. It is now read-only.

Commit

Permalink
Added rain-processing namespace for Dan's work.
Browse files Browse the repository at this point in the history
  • Loading branch information
sritchie committed Jul 6, 2011
1 parent 0a59b10 commit 2b2ed38
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 175 deletions.
15 changes: 0 additions & 15 deletions .classpath

This file was deleted.

3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -9,3 +9,6 @@ docs
autodoc
.#*
*#*
.classpath
.settings/
.project
45 changes: 27 additions & 18 deletions .project
@@ -1,21 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>forma</name>
<comment>
<name>forma</name>
<comment>
</comment>
<projects/>
<buildSpec>
<buildCommand>
<name>ccw.builder</name>
<arguments/>
</buildCommand>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments/>
</buildCommand>
</buildSpec>
<natures>
<nature>ccw.nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>ccw.builder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
<nature>ccw.nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
10 changes: 5 additions & 5 deletions dev/forma.thrift
Expand Up @@ -62,11 +62,11 @@ struct LocationProperty {

struct DataChunk {
1: string dataset;
3: string temporalRes;
4: string tileString;
5: LocationProperty locationProperty;
6: DataValue chunkValue;
7: optional string date;
2: string temporalRes;
3: string tileString;
4: LocationProperty locationProperty;
5: DataValue chunkValue;
6: optional string date;
}

struct FormaNeighborValue {
Expand Down
4 changes: 3 additions & 1 deletion project.clj
Expand Up @@ -35,4 +35,6 @@
:aot [forma.hadoop.jobs.preprocess
forma.hadoop.jobs.load-tseries
forma.hadoop.jobs.run-forma
forma.source.fire])
forma.hadoop.jobs.process-rain
forma.source.fire
])
10 changes: 5 additions & 5 deletions src/clj/forma/hadoop/cluster.clj
Expand Up @@ -74,10 +74,10 @@
:slaves (slave-group nodecount)
}
:base-machine-spec {
:hardware-id "m1.large"
;; :hardware-id "m2.4xlarge"
;; :hardware-id "m1.large"
:hardware-id "m2.2xlarge"
:image-id "us-east-1/ami-08f40561"
:spot-price (float 1.50)
:spot-price (float 0.80)
}
:base-props {:hadoop-env {:JAVA_LIBRARY_PATH native-path
:LD_LIBRARY_PATH lib-path}
Expand All @@ -91,8 +91,8 @@
:mapred.task.timeout 300000
:mapred.compress.map.output true
:mapred.reduce.tasks (int (* 1.4 6 nodecount))
:mapred.tasktracker.map.tasks.maximum 6
:mapred.tasktracker.reduce.tasks.maximum 6
:mapred.tasktracker.map.tasks.maximum 20
:mapred.tasktracker.reduce.tasks.maximum 20
:mapred.child.java.opts (str "-Djava.library.path=" native-path " -Xms1024m -Xmx1024m")
:mapred.child.env (str "LD_LIBRARY_PATH=" lib-path)}})))

Expand Down
10 changes: 10 additions & 0 deletions src/clj/forma/hadoop/io.clj
Expand Up @@ -124,6 +124,16 @@ tuples into the supplied directory, using the format specified by
:templatefields templatefields
:sink-parts sink-parts))

;; TODO: FIX THIS -- near-duplicate of the other hfs text-line tap
;; helpers in this namespace; consolidate once sink-part handling
;; is settled.
(defnk myhfs-textline
  "Returns a Cascading hfs tap that sinks tuples into the supplied
  `path` as text lines (single `line` field), forced to a single
  output part file via `setNumSinkParts`. `:outfields` selects which
  tuple fields are written (defaults to all fields)."
  [path :outfields Fields/ALL]
  (w/hfs-tap (doto (w/text-line ["line"] outfields)
               (.setNumSinkParts 1))
             path))

(defnk hfs-wholefile
"Subquery to return distinct files in the supplied directory. Files
will be returned as 2-tuples, formatted as `<filename, file>` The
Expand Down
44 changes: 44 additions & 0 deletions src/clj/forma/hadoop/jobs/process_rain.clj
@@ -0,0 +1,44 @@
(ns forma.hadoop.jobs.process-rain
(:use cascalog.api
[forma.source.tilesets :only (tile-set)]
[forma.utils :only (weighted-mean)])
(:require [cascalog.ops :as c]
[forma.date-time :as date]
[forma.hadoop.io :as io]
[forma.hadoop.predicate :as p]
[forma.source.modis :as m]
[forma.source.static :as static])
(:gen-class))

;; Source taps over the preprocessed chunk sequence-files on S3:
;; GADM administrative-boundary data and PRECL rain data.
(def gadm-tap (hfs-seqfile "s3n://redddata/gadm/1000-00/*/*/"))
(def precl-tap (hfs-seqfile "s3n://redddata/precl/1000-32/*/*/"))

(defn rain-tap
  "Cascalog subquery that unpacks chunked rain data from `rain-src`
  into per-pixel tuples `[?mod-h ?mod-v ?sample ?line ?date ?val]`:
  each chunk is indexed into individual values, and the tilestring /
  chunk position are converted into MODIS tile and pixel coordinates.

  TODO: Very similar to extract-tseries. Consolidate."
  [rain-src]
  (<- [?mod-h ?mod-v ?sample ?line ?date ?val]
      (rain-src _ ?s-res _ ?tilestring ?date ?chunkid ?chunk)
      (io/count-vals ?chunk :> ?chunk-size)
      (p/struct-index 0 ?chunk :> ?pix-idx ?val)
      (m/tilestring->hv ?tilestring :> ?mod-h ?mod-v)
      (m/tile-position ?s-res ?chunk-size ?chunkid ?pix-idx :> ?sample ?line)))

;; Buffer operation: collapses every tuple in the group into a single
;; weighted mean via `forma.utils/weighted-mean`, which receives the
;; flattened alternating value/weight sequence.
(defbufferop weighted-avg [tuples]
  [(apply weighted-mean (flatten tuples))])

(defn run-rain
  "Returns a cascalog query that joins per-pixel GADM codes against
  per-pixel rain values, averages rain per (gadm, pixel, date) group,
  then collapses pixels into one weighted average per `[?gadm ?date]`
  (weighted by each pixel group's tuple count)."
  [gadm-src rain-src]
  (let [gadm-src (static/static-tap gadm-src)
        rain-src (rain-tap rain-src)
        ;; inner join on pixel coordinates; ?count weights the
        ;; per-pixel average in the outer query below.
        join (<- [?gadm ?mod-h ?mod-v ?sample ?line ?date ?count ?avg-rain]
                 (gadm-src _ ?mod-h ?mod-v ?sample ?line ?gadm)
                 (rain-src ?mod-h ?mod-v ?sample ?line ?date ?rain)
                 (c/count ?count)
                 (c/avg ?rain :> ?avg-rain))]
    (<- [?gadm ?date ?weighted-avg]
        (join ?gadm ?mod-h ?mod-v ?sample ?line ?date ?count ?avg-rain)
        (weighted-avg ?avg-rain ?count :> ?weighted-avg))))

;; Entry point (see :gen-class): executes the rain-processing query
;; over the S3 taps and sinks the result as text lines under `path`.
(defn -main [path]
  (?- (io/myhfs-textline path)
      (run-rain gadm-tap precl-tap)))
15 changes: 3 additions & 12 deletions src/clj/forma/hadoop/jobs/run_forma.clj
Expand Up @@ -8,7 +8,8 @@
[forma.hadoop.jobs.load-tseries :as tseries]
[forma.trends.analysis :as a]
[forma.source.modis :as modis]
[forma.source.fire :as fire])
[forma.source.fire :as fire]
[forma.source.static :as static])
(:gen-class))

(defn short-trend-shell
Expand Down Expand Up @@ -41,22 +42,12 @@
(fire-src _ ?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?f-start _ ?f-series)
(io/adjust-fires est-map ?f-start ?f-series :> ?start ?fire-series)))

(defn static-tap
"TODO: Very similar to extract-tseries. Consolidate."
[static-src]
(<- [?s-res ?mod-h ?mod-v ?sample ?line ?val]
(static-src _ ?s-res _ ?tilestring ?chunkid ?chunk)
(io/count-vals ?chunk :> ?chunk-size)
(p/struct-index 0 ?chunk :> ?pix-idx ?val)
(modis/tilestring->hv ?tilestring :> ?mod-h ?mod-v)
(modis/tile-position ?s-res ?chunk-size ?chunkid ?pix-idx :> ?sample ?line)))

(defn dynamic-filter
"Returns a new generator of ndvi and rain timeseries obtained by
filtering out all pixels with VCF less than the supplied
`vcf-limit`."
[vcf-limit ndvi-src rain-src vcf-src]
(let [vcf-pixels (static-tap vcf-src)]
(let [vcf-pixels (static/static-tap vcf-src)]
(<- [?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?start ?ndvi-series ?precl-series]
(ndvi-src _ ?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?n-start _ ?n-series)
(rain-src _ ?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?r-start _ ?r-series)
Expand Down
13 changes: 13 additions & 0 deletions src/clj/forma/source/static.clj
Expand Up @@ -5,6 +5,7 @@
[forma.reproject :only (wgs84-indexer
modis-indexer)])
(:require [cascalog.ops :as c]
[forma.hadoop.io :as io]
[forma.source.modis :as m]
[forma.hadoop.predicate :as p]
[clojure.contrib.duck-streams :as duck]))
Expand Down Expand Up @@ -101,3 +102,15 @@
(<- [?dataset ?spatial-res ?t-res ?tilestring ?chunkid ?chunk]
(window-src ?dataset ?spatial-res ?t-res ?tilestring _ ?chunkid ?window)
(p/window->struct [:int] ?window :> ?chunk))))

;; TODO: Untangle the dependency on run-forma, and consolidate this
;; with the new rain extraction code.
(defn static-tap
  "Cascalog subquery that unpacks chunked static (non-temporal) data
  from `static-src` into per-pixel tuples
  `[?s-res ?mod-h ?mod-v ?sample ?line ?val]`, converting tilestring
  and chunk offsets into MODIS tile and pixel coordinates.

  TODO: Very similar to extract-tseries. Consolidate."
  [static-src]
  (<- [?s-res ?mod-h ?mod-v ?sample ?line ?val]
      (static-src _ ?s-res _ ?tilestring ?chunkid ?chunk)
      (io/count-vals ?chunk :> ?chunk-size)
      (p/struct-index 0 ?chunk :> ?pix-idx ?val)
      (m/tilestring->hv ?tilestring :> ?mod-h ?mod-v)
      (m/tile-position ?s-res ?chunk-size ?chunkid ?pix-idx :> ?sample ?line)))
2 changes: 1 addition & 1 deletion src/jvm/forma/tap/DataChunkPailStructure.java
Expand Up @@ -9,7 +9,7 @@ protected DataChunk createThriftObject() {
return new DataChunk();
}

public Class getType() {
  /** Returns the concrete thrift record type stored in this pail structure. */
  public Class<DataChunk> getType() {
    return DataChunk.class;
  }
}
130 changes: 12 additions & 118 deletions src/jvm/forma/tap/SplitDataChunkPailStructure.java
@@ -1,126 +1,20 @@
package forma.tap;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import forma.schema.DataChunk;
import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.TUnion;
import org.apache.thrift.meta_data.FieldMetaData;
import org.apache.thrift.meta_data.FieldValueMetaData;
import org.apache.thrift.meta_data.StructMetaData;

import forma.schema.DataValue;
import forma.schema.LocationProperty;
import forma.schema.LocationPropertyValue;
import forma.schema.ModisChunkLocation;

/**
 * Pail structure that partitions {@link DataChunk} records by dataset
 * name: every chunk is written under a top-level directory named after
 * its dataset, so all chunks for one dataset share a subtree.
 *
 * <p>NOTE(review): roughly 100 lines of commented-out field-metadata
 * partitioning machinery (FieldStructure / PropertyStructure et al.)
 * were removed here; recover them from version control if that scheme
 * is ever revived.
 */
public class SplitDataChunkPailStructure extends DataChunkPailStructure {

    /**
     * Returns the pail target path components for the supplied chunk.
     *
     * @param t chunk being written
     * @return single-element path: the chunk's dataset name
     */
    @Override
    public List<String> getTarget(DataChunk t) {
        List<String> ret = new ArrayList<String>();
        ret.add(t.getDataset());
        return ret;
    }
}

0 comments on commit 2b2ed38

Please sign in to comment.