Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Repository is getting dirtier. Getting close to the full run.

  • Loading branch information...
commit 6c9cb816e584d0d8ef426f77bd99afc53857523f 1 parent a8b58f3
@sritchie sritchie authored
View
67 dev/forma.thrift
@@ -0,0 +1,67 @@
+namespace java forma.generated
+
+struct FireTuple { # Four i32 fire fields; embedded by FormaValue, NeighborValue, FireSeries and DataValue.
+ 1: i32 temp330; # NOTE(review): names mirror forma.schema's FireValue record (temp-330 conf-50 both-preds count) - confirm mapping.
+ 2: i32 conf50;
+ 3: i32 bothPreds;
+ 4: i32 count;
+}
+
+struct FormaValue { # One FireTuple plus four double-valued statistics.
+ 1: FireTuple fireValue;
+ 2: double shortDrop;
+ 3: double longDrop;
+ 4: double tStat;
+ 5: double paramBreak; # NOTE(review): forma.schema's FormaValue record orders fields fire, short, param-break, long, t-stat - confirm this id order is intended.
+}
+
+struct NeighborValue { # A FireTuple, a neighbor count, and avg*/min* pairs named after the four FormaValue doubles.
+ 1: FireTuple fireValue;
+ 2: i32 numNeighbors;
+ 3: double avgShortDrop; # presumably the average over the counted neighbors - TODO confirm
+ 4: double minShortDrop; # presumably the minimum over the counted neighbors - TODO confirm
+ 5: double avgLongDrop;
+ 6: double minLongDrop;
+ 7: double avgTStat;
+ 8: double minTStat;
+ 9: double avgParamBreak;
+ 10: double minParamBreak;
+}
+
+# Collection Wrappers
+
+struct DoubleArray { # Struct wrapper around a list<double>; referenced by ArrayValue and DataValue.
+ 1: list<double> doubles
+}
+
+struct IntArray { # Struct wrapper around a list<i32>; referenced by ArrayValue and DataValue.
+ 1: list<i32> ints
+}
+
+struct FireSeries { # A list of FireTuple values together with a start and an end index.
+ 1: i32 startIdx;
+ 2: i32 endIdx;
+ 3: list<FireTuple> values; # every field now ends with ';' like the other multi-field structs in this file
+}
+
+# TODO: Can we add this to DataValue directly?
+union ArrayValue { # Either an IntArray or a DoubleArray; used as the `series` field of TimeSeries.
+ 1: IntArray ints;
+ 2: DoubleArray doubles;
+}
+
+struct TimeSeries { # An int or double series (ArrayValue) together with a start and an end index.
+ 1: i32 startIdx; # ';' added so every field is terminated the same way as endIdx and series
+ 2: i32 endIdx;
+ 3: ArrayValue series;
+}
+
+union DataValue { # Union over every value shape declared in this file.
+ 1: i32 intVal; # scalar int
+ 2: IntArray ints; # wrapped list<i32>
+ 3: double doubleVal; # scalar double
+ 4: DoubleArray doubles; # wrapped list<double>
+ 5: FireTuple fireVal; # single fire tuple
+ 6: TimeSeries timeSeries; # indexed int/double series
+ 7: FireSeries fireSeries; # indexed series of fire tuples
+}
View
11 dev/job-conf.clj
@@ -0,0 +1,11 @@
+(require '[clojure.string :as s]) ; s/join assembles the serializations string below
+
+{"fs.s3n.awsAccessKeyId" (System/getenv "AWS_KEY") ; credentials come from the environment; getenv yields nil when the variable is unset
+ "fs.s3n.awsSecretAccessKey" (System/getenv "AWS_SECRET")
+ "cascading.kryo.hierarchy.registrations" ; key
+ "clojure.lang.IRecord,carbonite.PrintDupSerializer" ; value; presumably a "class,serializer" pair - TODO confirm format
+ "cascading.kryo.serializations" ; key; its value is the joined string built next
+ (s/join ":" ["forma.schema.TimeSeriesValue,carbonite.PrintDupSerializer" ; one entry per forma.schema record, joined with ":"
+ "forma.schema.FireValue,carbonite.PrintDupSerializer"
+ "forma.schema.FormaValue,carbonite.PrintDupSerializer"
+ "forma.schema.NeighborValue,carbonite.PrintDupSerializer"])}
View
5 project.clj
@@ -7,7 +7,6 @@
:repositories {"conjars" "http://conjars.org/repo/"}
:marginalia {:javascript ["mathjax/MathJax.js"]}
:javac-options {:debug "true" :fork "true"}
- :checksum-deps true
:jvm-opts ["-XX:MaxPermSize=128M" "-Xms1024M" "-Xmx2048M" "-server"]
:dependencies [[org.clojure/clojure "1.3.0"]
[org.clojure/tools.cli "0.1.0"]
@@ -17,12 +16,12 @@
[clj-time "0.3.4"]
[forma/gdal "1.8.0"]
[forma/jblas "1.2.1"]
- [cascalog "1.9.0-wip4"]
+ [cascalog "1.9.0-wip"]
[cascalog-checkpoint "0.1.1"]
[backtype/dfs-datastores "1.1.0"]
+ ;; [org.apache.thrift/libthrift "0.8.0"]
[backtype/dfs-datastores-cascading "1.1.1"]]
:dev-dependencies [[org.apache.hadoop/hadoop-core "0.20.2-dev"]
- [incanter/incanter-charts "1.3.0-SNAPSHOT"]
[midje-cascalog "0.4.0"]]
:aot [forma.hadoop.pail
forma.schema
View
13 resources/job-conf.clj
@@ -1 +1,12 @@
-{"cascading.kryo.hierarchy.registrations" "clojure.lang.IRecord,carbonite.PrintDupSerializer"}
+(require '[clojure.string :as s]) ; s/join assembles the serializations string below
+
+{"cascading.kryo.hierarchy.registrations" ; key
+ "clojure.lang.IRecord,carbonite.PrintDupSerializer" ; value; presumably a "class,serializer" pair - TODO confirm format
+ "cascading.kryo.serializations" ; key; its value is the joined string built next
+ (s/join ":" ["forma.schema.TimeSeriesValue,carbonite.PrintDupSerializer" ; one entry per forma.schema record, joined with ":"
+ "forma.schema.FireValue,carbonite.PrintDupSerializer"
+ "forma.schema.FormaValue,carbonite.PrintDupSerializer"
+ "forma.schema.NeighborValue,carbonite.PrintDupSerializer"])}
+
+
+
View
65 src/clj/forma/hadoop/jobs/forma.clj
@@ -13,16 +13,12 @@
(defn short-trend-shell
"a wrapper to collect the short-term trends into a form that can be
manipulated from within cascalog."
- [{:keys [est-start est-end t-res long-block window]}
- spectral-series reli-series]
- (let [ts-start (:start-idx spectral-series)
- freq (date/res->period-count t-res)
+ [{:keys [est-start est-end t-res long-block window]} ts-start spectral reli]
+ (let [freq (date/res->period-count t-res)
new-start (date/datetime->period t-res est-start)
[start end] (date/relative-period t-res ts-start [est-start est-end])]
- [(->> (a/telescoping-short-trend long-block window freq start end
- (:series spectral-series)
- (:series reli-series))
- (schema/ts-record new-start))]))
+ [new-start
+ (a/telescoping-short-trend long-block window freq start end spectral reli)]))
;; We're mapping across two sequences at the end, there; the
;; long-series and the t-stat-series.
@@ -32,24 +28,22 @@
time-series (and cofactors) to extract the long-term trends and
t-statistics from the time-series."
[{:keys [est-start est-end t-res long-block window]}
- ts-series reli-series rain-series]
- (let [ts-start (:start-idx ts-series)
- freq (date/res->period-count t-res)
+ ts-start ts-series reli-series rain-series]
+ (let [freq (date/res->period-count t-res)
new-start (date/datetime->period t-res est-start)
[start end] (date/relative-period t-res ts-start [est-start est-end])]
- (apply map (comp (partial schema/ts-record new-start)
- vector)
- (a/telescoping-long-trend freq start end
- (:series ts-series)
- (:series reli-series)
- (:series rain-series)))))
+ (cons new-start
+ (apply map vector
+ (a/telescoping-long-trend freq start end
+ ts-series
+ reli-series
+ rain-series)))))
(def get-loc
(<- [?chunk :> ?s-res ?mod-h ?mod-v ?sample ?line ?val]
- (map ?chunk [:location :value] :> ?location ?val)
- (schema/unpack-pixel-location ?location :> ?s-res ?mod-h ?mod-v ?sample ?line)))
+ (map ?chunk [:location :value] :> ?loc ?val)
+ (schema/unpack-pixel-location ?loc :> ?s-res ?mod-h ?mod-v ?sample ?line)))
-;; TODO: implement comparable for our records.
(defn fire-tap
"Accepts an est-map and a query source of fire timeseries. Note that
this won't work, pulling directly from the pail!"
@@ -60,11 +54,12 @@
(schema/adjust-fires est-map ?f-series :> ?fire-series)))
(defn filter-query [vcf-src vcf-limit chunk-src]
- (<- [?s-res ?mod-h ?mod-v ?sample ?line ?ts-record]
+ (<- [?s-res ?mod-h ?mod-v ?sample ?line ?start ?ts]
(chunk-src _ ?ts-chunk)
(vcf-src _ ?vcf-chunk)
(get-loc ?ts-chunk :> ?s-res ?mod-h ?mod-v ?sample ?line ?series)
- (schema/map->TimeSeriesValue ?series :> ?ts-record)
+ (:distinct false)
+ (map ?series [:start-idx :series] :> ?start ?ts)
(p/blossom-chunk ?vcf-chunk :> ?s-res ?mod-h ?mod-v ?sample ?line ?vcf)
(>= ?vcf vcf-limit)))
@@ -73,11 +68,12 @@
filtering out all pixels with VCF less than the supplied
`vcf-limit`."
[ndvi-src reli-src rain-src]
- (<- [?s-res ?mod-h ?mod-v ?sample ?line ?ndvi-ts ?precl-ts ?reli-ts]
- (ndvi-src ?s-res ?mod-h ?mod-v ?sample ?line ?ndvi)
- (reli-src ?s-res ?mod-h ?mod-v ?sample ?line ?reli)
- (rain-src ?s-res ?mod-h ?mod-v ?sample ?line ?rain)
- (schema/adjust-timeseries ?rain ?ndvi ?reli :> ?precl-ts ?ndvi-ts ?reli-ts)
+ (<- [?s-res ?mod-h ?mod-v ?sample ?line ?start-idx ?ndvi-ts ?precl-ts ?reli-ts]
+ (ndvi-src ?s-res ?mod-h ?mod-v ?sample ?line ?n-start ?ndvi)
+ (reli-src ?s-res ?mod-h ?mod-v ?sample ?line ?r-start ?reli)
+ (rain-src ?s-res ?mod-h ?mod-v ?sample ?line ?p-start ?precl)
+ (schema/adjust ?p-start ?precl ?n-start ?ndvi ?r-start ?reli
+ :> ?start-idx ?precl-ts ?ndvi-ts ?reli-ts)
(:distinct false)))
(defn dynamic-tap
@@ -88,10 +84,10 @@
occur before the analysis. Note that all variable names within this
query are TIMESERIES, not individual values."
[est-map dynamic-src]
- (<- [?s-res ?mod-h ?mod-v ?sample ?line ?short ?break ?long ?t-stat]
- (dynamic-src ?s-res ?mod-h ?mod-v ?sample ?line ?ndvi ?precl ?reli)
- (short-trend-shell est-map ?ndvi ?reli :> ?short)
- (long-trend-shell est-map ?ndvi ?reli ?precl :> ?break ?long ?t-stat)
+ (<- [?s-res ?mod-h ?mod-v ?sample ?line ?new-start ?short ?break ?long ?t-stat]
+ (dynamic-src ?s-res ?mod-h ?mod-v ?sample ?line ?start ?ndvi ?precl ?reli)
+ (short-trend-shell est-map ?start ?ndvi ?reli :> ?new-start ?short)
+ (long-trend-shell est-map ?start ?ndvi ?reli ?precl :> _ ?break ?long ?t-stat)
(:distinct false)))
(defn forma-tap
@@ -100,11 +96,10 @@
Note that all values internally discuss timeseries."
[dynamic-src fire-src]
- (<- [?s-res ?period ?mod-h ?mod-v ?sample ?line ?forma-val]
- (fire-src ?s-res ?mod-h ?mod-v ?sample ?line !!fire)
- (dynamic-src ?s-res ?mod-h ?mod-v ?sample ?line ?short ?break ?long ?t-stat)
+ (<- [?s-res ?period ?mh ?mv ?s ?l ?forma-val]
+ (fire-src ?s-res ?mh ?mv ?s ?l !!fire)
+ (dynamic-src ?s-res ?mh ?mv ?s ?l ?start ?short ?break ?long ?t-stat)
(schema/forma-seq !!fire ?short ?break ?long ?t-stat :> ?forma-seq)
- (get ?short :start-idx :> ?start)
(p/index ?forma-seq :zero-index ?start :> ?period ?forma-val)
(:distinct false)))
View
85 src/clj/forma/hadoop/jobs/scatter.clj
@@ -135,7 +135,14 @@
(:distinct false))))
(comment
+ (??<- [?a ?b]
+ ((constrained-tap
+ "/Users/sritchie/Desktop/mypail" "vcf" "500" "00" [[8 6]]) ?a ?b)
+ (:distinct false))
+
"This command runs FORMA."
+ (use 'forma.hadoop.jobs.scatter)
+ (in-ns 'forma.hadoop.jobs.scatter)
(formarunner "/user/hadoop/checkpoints"
"s3n://pailbucket/master"
"s3n://pailbucket/series"
@@ -172,10 +179,21 @@
ts-src))]
(assert est-map (str run-key " is not a valid run key!"))
(workflow [tmp-root]
- vcf-step ([:tmp-dirs vcf-path]
- (?- (hfs-seqfile vcf-path)
- (constrained-tap
- pail-path "vcf" s-res "00" country-seq)))
+ vcf-step
+ ([:tmp-dirs vcf-path]
+ (?- (hfs-seqfile vcf-path)
+ (<- [?a ?b]
+ ((constrained-tap
+ pail-path "vcf" s-res "00" country-seq) ?a ?b)
+ (:distinct false))))
+
+ ndvi-step
+ ([:tmp-dirs ndvi-path]
+ (with-job-conf {"cascading.kryo.serializations" "forma.schema.TimeSeriesValue,carbonite.PrintDupSerializer:forma.schema.FireValue,carbonite.PrintDupSerializer:forma.schema.FormaValue,carbonite.PrintDupSerializer:forma.schema.NeighborValue,carbonite.PrintDupSerializer"}
+ (?- (hfs-seqfile ndvi-path)
+ (mk-filter vcf-path
+ (constrained-tap
+ ts-pail-path "ndvi" s-res t-res country-seq)))))
fire-step ([:tmp-dirs fire-path]
(?- (hfs-seqfile fire-path)
@@ -187,35 +205,33 @@
adjustfires
([:tmp-dirs adjusted-fire-path]
- (?- (hfs-seqfile adjusted-fire-path)
- (forma/fire-tap est-map (hfs-seqfile fire-path))))
-
- ndvi-step
- ([:tmp-dirs ndvi-path]
- (?- (hfs-seqfile ndvi-path)
- (mk-filter vcf-path
- (constrained-tap
- ts-pail-path "ndvi" s-res t-res country-seq))))
-
- reli-step
- ([:tmp-dirs reli-path]
- (?- (hfs-seqfile reli-path)
- (mk-filter vcf-path
- (constrained-tap
- ts-pail-path "reli" s-res t-res country-seq))))
+ (with-job-conf
+ {"cascading.kryo.serializations" "forma.schema.TimeSeriesValue,carbonite.PrintDupSerializer:forma.schema.FireValue,carbonite.PrintDupSerializer:forma.schema.FormaValue,carbonite.PrintDupSerializer:forma.schema.NeighborValue,carbonite.PrintDupSerializer"}
+ (?- (hfs-seqfile adjusted-fire-path)
+ (forma/fire-tap est-map (hfs-seqfile fire-path)))))
rain-step
([:tmp-dirs rain-path]
- (?- (hfs-seqfile rain-path)
- (mk-filter vcf-path
- (new-adjusted-precl-tap
- ts-pail-path "1000" "32" t-res country-seq))))
+ (with-job-conf {"cascading.kryo.serializations" "forma.schema.TimeSeriesValue,carbonite.PrintDupSerializer:forma.schema.FireValue,carbonite.PrintDupSerializer:forma.schema.FormaValue,carbonite.PrintDupSerializer:forma.schema.NeighborValue,carbonite.PrintDupSerializer"}
+ (?- (hfs-seqfile rain-path)
+ (mk-filter vcf-path
+ (new-adjusted-precl-tap
+ ts-pail-path "1000" "32" t-res country-seq)))))
+ reli-step
+ ([:tmp-dirs reli-path]
+ (with-job-conf {"cascading.kryo.serializations" "forma.schema.TimeSeriesValue,carbonite.PrintDupSerializer:forma.schema.FireValue,carbonite.PrintDupSerializer:forma.schema.FormaValue,carbonite.PrintDupSerializer:forma.schema.NeighborValue,carbonite.PrintDupSerializer"}
+ (?- (hfs-seqfile reli-path)
+ (mk-filter vcf-path
+ (constrained-tap
+ ts-pail-path "reli" s-res t-res country-seq)))))
+
adjustseries
([:tmp-dirs adjusted-series-path]
"Adjusts the lengths of all timeseries
and filters out timeseries below the proper VCF value."
- (with-job-conf {"mapred.min.split.size" 805306368}
+ (with-job-conf {"mapred.min.split.size" 805306368
+ "cascading.kryo.serializations" "forma.schema.TimeSeriesValue,carbonite.PrintDupSerializer:forma.schema.FireValue,carbonite.PrintDupSerializer:forma.schema.FormaValue,carbonite.PrintDupSerializer:forma.schema.NeighborValue,carbonite.PrintDupSerializer"}
(?- (hfs-seqfile adjusted-series-path)
(forma/dynamic-filter (hfs-seqfile ndvi-path)
(hfs-seqfile reli-path)
@@ -223,23 +239,26 @@
trends ([:tmp-dirs dynamic-path]
"Runs the trends processing."
- (?- (hfs-seqfile dynamic-path)
- (forma/dynamic-tap
- est-map (hfs-seqfile adjusted-series-path))))
+ (with-job-conf {"cascading.kryo.serializations" "forma.schema.TimeSeriesValue,carbonite.PrintDupSerializer:forma.schema.FireValue,carbonite.PrintDupSerializer:forma.schema.FormaValue,carbonite.PrintDupSerializer:forma.schema.NeighborValue,carbonite.PrintDupSerializer"}
+ (?- (hfs-seqfile dynamic-path)
+ (forma/dynamic-tap
+ est-map (hfs-seqfile adjusted-series-path)))))
mid-forma ([:tmp-dirs forma-mid-path
:deps [trends adjustfires]]
- (?- (hfs-seqfile forma-mid-path)
- (forma/forma-tap (hfs-seqfile dynamic-path)
- (hfs-seqfile adjusted-fire-path))))
+ (with-job-conf {"cascading.kryo.serializations" "forma.schema.TimeSeriesValue,carbonite.PrintDupSerializer:forma.schema.FireValue,carbonite.PrintDupSerializer:forma.schema.FormaValue,carbonite.PrintDupSerializer:forma.schema.NeighborValue,carbonite.PrintDupSerializer"}
+ (?- (hfs-seqfile forma-mid-path)
+ (forma/forma-tap (hfs-seqfile dynamic-path)
+ (hfs-seqfile adjusted-fire-path)))))
final-forma
([] (let [names ["?s-res" "?period" "?mod-h" "?mod-v"
"?sample" "?line" "?forma-val"]
mid-src (-> (hfs-seqfile forma-mid-path)
(name-vars names))]
- (?- (hfs-seqfile out-path)
- (forma/forma-query est-map mid-src)))))))
+ (with-job-conf {"cascading.kryo.serializations" "forma.schema.TimeSeriesValue,carbonite.PrintDupSerializer:forma.schema.FireValue,carbonite.PrintDupSerializer:forma.schema.FormaValue,carbonite.PrintDupSerializer:forma.schema.NeighborValue,carbonite.PrintDupSerializer"}
+ (?- (hfs-seqfile out-path)
+ (forma/forma-query est-map mid-src))))))))
(defn populate-local [main-path timeseries-path]
(doto timeseries-path
View
30 src/clj/forma/schema.clj
@@ -53,7 +53,14 @@
:end-idx end-idx
:series (vec series)})))
-(defn fields-compare [m1 m2 fn-seq]
+(gen-class :name forma.kryo.RecordComparator
+ :prefix "fields-"
+ :methods
+ [^{:static true}
+ [recordCompare [java.util.Map java.util.Map java.util.List] int]])
+
+(defn fields-recordCompare
+ [m1 m2 fn-seq]
(loop [[f & more] fn-seq]
(let [val (compare (f m1) (f m2))]
(if (and more (zero? val))
@@ -63,7 +70,8 @@
(defrecord TimeSeriesValue [start-idx end-idx series]
Comparable
(compareTo [m1 m2]
- (fields-compare m1 m2 [:start-idx :end-idx (comp vec :series)])))
+ (forma.kryo.RecordComparator/recordCompare
+ m1 m2 [:start-idx :end-idx (comp vec :series)])))
(defn ts-record
([start-idx series]
@@ -107,7 +115,8 @@
(defrecord FireValue [temp-330 conf-50 both-preds count]
Comparable
(compareTo [m1 m2]
- (fields-compare m1 m2 [:temp-330 :conf-50 :both-preds :count])))
+ (forma.kryo.RecordComparator/recordCompare
+ m1 m2 [:temp-330 :conf-50 :both-preds :count])))
(defn fire-value
"Creates a `fire-value` object with counts of fires meeting certain criteria:
@@ -154,6 +163,7 @@
(let [[start end] (for [pd [est-start est-end]]
(date/datetime->period t-res pd))]
[(->> (:series f-series)
+ (map map->FireValue)
(u/trim-seq start (inc end) (:start-idx f-series))
(ts-record start))]))
@@ -170,7 +180,8 @@
[fire-value short-drop param-break long-drop t-stat]
Comparable
(compareTo [m1 m2]
- (fields-compare m1 m2 [:t-stat :fire :short :param-break :long])))
+ (forma.kryo.RecordComparator/recordCompare ; compares field by field, in the order listed, stopping at the first difference
+ m1 m2 [:t-stat :fire-value :short-drop :param-break :long-drop]))) ; keys must be the record's own field names: an unknown key reads nil on both sides and always compares equal
(defn forma-value
"Creates forma object containing various characteristics of pixel timeseries
@@ -186,7 +197,7 @@
:t-stat 1.725}"
[fire short param-break long t-stat]
(let [fire (or fire (FireValue. 0 0 0 0))]
- (FormaValue. fire short param-break long t-stat)))
+ [fire short param-break long t-stat]))
(defn unpack-forma-val
"Returns a vector containing the fire value, short drop,
@@ -212,9 +223,10 @@
avg-t-stat min-t-stat]
Comparable
(compareTo [m1 m2]
- (fields-compare [:neighbor-count :fire-value :avg-short-drop :min-short-drop
- :avg-param-break :min-param-break :avg-long-drop :min-long-drop
- :avg-t-stat :min-t-stat])))
+ (forma.kryo.RecordComparator/recordCompare
+ m1 m2 [:neighbor-count :fire-value :avg-short-drop :min-short-drop
+ :avg-param-break :min-param-break :avg-long-drop :min-long-drop
+ :avg-t-stat :min-t-stat])))
(defn neighbor-value
"Accepts either a forma value or a sequence of sub-values."
@@ -394,5 +406,5 @@
(forma-seq fire-series short-series long-series t-stat-series)"
[& in-series]
[(->> in-series
- (map #(or (:series %) (repeat %)))
+ (map #(or % (repeat %)))
(apply map forma-value))])
View
16 src/jvm/forma/kryo/RecordSerializer.java
@@ -1,16 +0,0 @@
-package forma.kryo;
-
-import com.esotericsoftware.kryo.Serializer;
-
-import java.nio.ByteBuffer;
-
-/** User: sritchie Date: 2/19/12 Time: 6:56 PM */
-public class RecordSerializer extends Serializer {
- @Override public void writeObjectData(ByteBuffer byteBuffer, Object o) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override public <T> T readObjectData(ByteBuffer byteBuffer, Class<T> tClass) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
View
2  src/jvm/forma/tap/KryoPailStructure.java
@@ -16,7 +16,7 @@
private ObjectBuffer getKryoBuffer() {
if(kryoBuf == null) {
ClojureKryoSerialization serialization = new ClojureKryoSerialization();
- kryoBuf = KryoFactory.newBuffer(serialization.populatedKryo());
+ kryoBuf = KryoFactory.newBuffer(serialization.makeKryo());
}
return kryoBuf;
}
View
1  test/forma/postprocess/select_test.clj
@@ -10,7 +10,6 @@
[forma.trends.data :only (ndvi rain reli)]
[forma.reproject :only (modis->latlon)])
(:require [incanter.core :as i]
- [incanter.charts :as chart]
[forma.testing :as t]
[cascalog.ops :as c]
[forma.schema :as schema]))
Please sign in to comment.
Something went wrong with that request. Please try again.