This repository has been archived by the owner on Oct 8, 2019. It is now read-only.
/
run_forma.clj
185 lines (165 loc) · 7.93 KB
/
run_forma.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
(ns forma.hadoop.jobs.run-forma
(:use cascalog.api
[forma.source.tilesets :only (tile-set)])
(:require [forma.matrix.walk :as w]
[forma.date-time :as date]
[forma.hadoop.io :as io]
[forma.hadoop.predicate :as p]
[forma.hadoop.jobs.load-tseries :as tseries]
[forma.trends.analysis :as a]
[forma.source.modis :as modis]
[forma.source.fire :as fire]
[forma.source.static :as static])
(:gen-class))
(defn short-trend-shell
  "Cascalog-friendly wrapper around `a/collect-short-trend`.

  The estimation window [`est-start`, `est-end`] is first converted into
  period offsets relative to `ts-start` with `date/relative-period`. The
  short-term trend is then computed over the values of `ts-series`
  using `long-block` and `window`.

  Returns a two-element vector: the period index of `est-start` at
  resolution `t-res`, and the trend results packed with `io/to-struct`."
  [{:keys [est-start est-end t-res long-block window]} ts-start ts-series]
  (let [[start end] (date/relative-period t-res ts-start [est-start est-end])
        est-period  (date/datetime->period t-res est-start)
        raw-values  (io/get-vals ts-series)
        trend       (a/collect-short-trend start end long-block window raw-values)]
    [est-period (io/to-struct trend)]))
(defn long-trend-shell
  "Cascalog-friendly wrapper around `a/collect-long-trend`.

  The estimation window [`est-start`, `est-end`] is converted into
  period offsets relative to `ts-start`. The long-term trend is then
  collected over the values of `ts-series`, with the values of each
  extra `cofactors` series passed along as cofactors.

  The per-period results are transposed (`map vector`) so that each
  component becomes its own sequence. The return value is a vector
  whose first element is the period index of `est-start` and whose
  remaining elements are one `io/to-struct` per component.
  `dynamic-tap` reads two such components (long-term series and
  t-statistic series).

  `long-block` and `window` are destructured but not used here."
  [{:keys [est-start est-end t-res long-block window]} ts-start ts-series & cofactors]
  (let [[start end] (date/relative-period t-res ts-start [est-start est-end])
        est-period  (date/datetime->period t-res est-start)
        results     (a/collect-long-trend start end
                                          (io/get-vals ts-series)
                                          (map io/get-vals cofactors))
        ;; one sequence per result component, each spanning all periods
        columns     (apply map vector results)]
    (into [est-period] (map io/to-struct columns))))
(defn fire-tap
  "Returns a generator of per-pixel fire series from `fire-src`.

  The series start and values are realigned with `io/adjust-fires`,
  which is given `est-map`. Output fields are
  [?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?start ?fire-series]."
  [est-map fire-src]
  (<- [?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?start ?fire-series]
      (fire-src _ ?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?raw-start _ ?raw-fires)
      (io/adjust-fires est-map ?raw-start ?raw-fires :> ?start ?fire-series)))
(defn dynamic-filter
  "Joins NDVI and rain timeseries pixel by pixel and keeps only pixels
  whose VCF value (from `vcf-src`, via `static/static-tap`) is at least
  `vcf-limit`.

  The two series are aligned to a common start period with `io/adjust`.
  Output fields are
  [?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?start ?ndvi-series ?precl-series]."
  [vcf-limit ndvi-src rain-src vcf-src]
  (let [vcf-tap (static/static-tap vcf-src)]
    (<- [?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?start ?ndvi-series ?precl-series]
        (ndvi-src _ ?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?ndvi-start _ ?raw-ndvi)
        (rain-src _ ?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?rain-start _ ?raw-rain)
        (io/adjust ?rain-start ?raw-rain ?ndvi-start ?raw-ndvi :> ?start ?precl-series ?ndvi-series)
        (vcf-tap ?s-res ?mod-h ?mod-v ?sample ?line ?vcf-val)
        (>= ?vcf-val vcf-limit))))
(defn dynamic-tap
  "Accepts an est-map and a generator of pre-filtered dynamic pixels
  (as produced by `dynamic-filter`), with fields
  [?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?start ?ndvi-series ?precl-series].

  For each pixel it emits:
  - the period of `est-start` (?est-start);
  - the short-term trend series of NDVI (`short-trend-shell`);
  - the long-term trend series and its t-statistic series
    (`long-trend-shell`, with the PRECL series passed as a cofactor).

  Both shells emit ?est-start. Binding an output variable twice is an
  equality constraint in Cascalog, and both values are computed the
  same way from `est-map`."
  [est-map dynamic-src]
  (<- [?s-res ?t-res ?mod-h ?mod-v ?sample ?line
       ?est-start ?short-series ?long-series ?t-stat-series]
      (dynamic-src ?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?start ?ndvi-series ?precl-series)
      (short-trend-shell est-map ?start ?ndvi-series :> ?est-start ?short-series)
      (long-trend-shell est-map ?start ?ndvi-series ?precl-series :> ?est-start ?long-series ?t-stat-series)))
(defn forma-tap
  "Builds the per-pixel, per-period FORMA values from NDVI, rain and
  fire timeseries plus static VCF pixels.

  NDVI/rain pixels are filtered by `(:vcf-limit est-map)` with
  `dynamic-filter`, then turned into trend series with `dynamic-tap`.
  The result is joined with the fire series from `fire-tap` on pixel
  location and start period. `!!fire-series` is an ungrounding
  (outer-join) variable, so pixels without a fire record are kept with
  a nil fire series.

  The combined `io/forma-schema` series is expanded with
  `p/struct-index` into [?period ?forma-val] pairs, starting from ?start."
  [est-map ndvi-src rain-src vcf-src fire-src]
  (let [vcf-limit   (:vcf-limit est-map)
        fire-ts     (fire-tap est-map fire-src)
        filtered    (dynamic-filter vcf-limit ndvi-src rain-src vcf-src)
        trends      (dynamic-tap est-map filtered)]
    (<- [?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?period ?forma-val]
        (trends ?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?start
                ?short-series ?long-series ?t-stat-series)
        (fire-ts ?s-res ?t-res ?mod-h ?mod-v ?sample ?line ?start !!fire-series)
        (io/forma-schema !!fire-series ?short-series ?long-series ?t-stat-series :> ?forma-series)
        (p/struct-index ?start ?forma-series :> ?period ?forma-val))))
;; Parameterized mapcat op over a sparse window (nil = missing pixel).
;; For every non-nil cell it emits [idx val neighbor-agg]:
;;   idx          - the cell's position in the sequence returned by
;;                  `w/neighbor-scan`, counting nil cells too, so that
;;                  `modis/tile-position` can map it back to a pixel;
;;   val          - the cell's value;
;;   neighbor-agg - `io/combine-neighbors` applied to the non-nil values
;;                  among the neighbors `w/neighbor-scan` returns for
;;                  `num-neighbors`.
;; The index is assigned BEFORE dropping nil cells. Numbering only the
;; surviving cells would shift every index after the first missing
;; pixel and place values on the wrong sample/line.
;; NOTE(review): assumes neighbor-scan yields exactly one entry per
;; window cell, in the order tile-position expects - confirm.
(defmapcatop [process-neighbors [num-neighbors]]
  [window]
  (for [[idx [val neighbors]] (map-indexed vector (w/neighbor-scan num-neighbors window))
        :when val]
    [idx val (->> neighbors
                  (apply concat)
                  (remove nil?)
                  (io/combine-neighbors))]))
(defn integerize
  "Parses each of `strings` as a base-10 integer and returns a lazy seq
  of java.lang.Integer values, in order. Throws NumberFormatException
  (when realized) for unparseable input.

  Uses `Integer/valueOf` instead of the `Integer(String)` constructor,
  which is deprecated since Java 9 and always allocates."
  [& strings]
  (map #(Integer/valueOf ^String %) strings))
(defn line->nums
  "Turns a source of comma-separated text lines into a generator of
  [?country ?admin] integer pairs.

  Each line is split on \",\" with `p/mangle` into two fields, country
  first and admin second, and both are parsed with `integerize`."
  [convert-src]
  (<- [?country ?admin]
      (convert-src ?csv-line)
      (p/mangle #"," ?csv-line :> ?country-str ?admin-str)
      (integerize ?country-str ?admin-str :> ?country ?admin)))
(defn country-tap
  "Returns a generator of [?s-res ?mod-h ?mod-v ?sample ?line ?country].

  Every pixel of the GADM chunks in `gadm-src` carries an admin value.
  That value is joined against the admin->country pairs parsed from
  `convert-src` (see `line->nums`). Pixels whose admin value does not
  appear in `convert-src` are dropped by the inner join. Chunk
  coordinates are turned into MODIS tile h/v and pixel sample/line with
  `modis/tilestring->hv` and `modis/tile-position`.

  TODO: Very similar to extract-tseries, and almost identical to
  static-tap. Consolidate."
  [gadm-src convert-src]
  (let [admin->country (line->nums convert-src)]
    (<- [?s-res ?mod-h ?mod-v ?sample ?line ?country]
        (gadm-src _ ?s-res _ ?tile-str ?chunk-id ?chunk)
        (admin->country ?country ?admin)
        (io/count-vals ?chunk :> ?chunk-len)
        (p/struct-index 0 ?chunk :> ?pixel-idx ?admin)
        (modis/tilestring->hv ?tile-str :> ?mod-h ?mod-v)
        (modis/tile-position ?s-res ?chunk-len ?chunk-id ?pixel-idx :> ?sample ?line))))
(defn forma-query
  "Final query that walks the neighbors and emits text output.

  The FORMA values from `forma-tap` are grouped into sparse windows of
  size `(:window-dims est-map)` over [?sample ?line], with nil for
  missing pixels. Each window is walked with `process-neighbors`
  (radius parameter `(:neighbors est-map)`). Each window-relative index
  is mapped back to a pixel sample/line with `modis/tile-position`, and
  the pixel is joined to its country via `country-src`.

  Emits [?s-res ?t-res ?country ?datestring ?text], where ?datestring
  is the period rendered with `date/period->datetime` and ?text comes
  from `io/textify`."
  [est-map ndvi-src rain-src vcf-src country-src fire-src]
  (let [neighbors   (:neighbors est-map)
        window-dims (:window-dims est-map)
        rows        (first window-dims)
        cols        (second window-dims)
        pixels      (forma-tap est-map ndvi-src rain-src vcf-src fire-src)
        windows     (p/sparse-windower pixels ["?sample" "?line"] window-dims "?forma-val" nil)]
    (<- [?s-res ?t-res ?country ?datestring ?text]
        (date/period->datetime ?t-res ?period :> ?datestring)
        (windows ?s-res ?t-res ?mod-h ?mod-v ?win-col ?win-row ?period ?window)
        (country-src ?s-res ?mod-h ?mod-v ?sample ?line ?country)
        (process-neighbors [neighbors] ?window :> ?win-idx ?val ?neighbor-vals)
        (modis/tile-position cols rows ?win-col ?win-row ?win-idx :> ?sample ?line)
        (io/textify ?mod-h ?mod-v ?sample ?line ?val ?neighbor-vals :> ?text))))
(defn forma-textline
  "Builds a templated text-line sink rooted at `path`.

  The output directory is filled from `pathstr` using the values of
  ?s-res, ?t-res, ?country and ?datestring. Only ?text is written to
  the files, and `:sink-parts` is 1."
  [path pathstr]
  (let [written-fields  ["?text"]
        template-fields ["?s-res" "?t-res" "?country" "?datestring"]]
    (io/template-textline path pathstr
                          :outfields written-fields
                          :templatefields template-fields
                          :sink-parts 1)))
;; Hard-coded inputs and parameters for the big run.
;; The earmuffed vars are declared dynamic so Clojure >= 1.3 does not
;; warn that their names suggest rebinding; on older versions the
;; metadata is harmless.
(def ^{:dynamic true} *ndvi-path* "s3n://redddata/ndvi/1000-32/*/*/")
(def ^{:dynamic true} *rain-path* "s3n://redddata/precl/1000-32/*/*/")
(def ^{:dynamic true} *fire-path* "s3n://redddata/fire/1000-01/*/")

;; Static VCF chunks for the IDN and MYS tiles. Each tile is named by
;; its zero-padded "hhhvvv" string. This tap is the `vcf-src` that
;; `dynamic-filter` compares against :vcf-limit, so it must read the
;; "vcf" dataset. The original read "gadm" (admin codes), which GADM
;; already gets separately via *gadm-path*.
;; NOTE(review): confirm s3n://redddata/vcf/1000-00/ holds the VCF chunks.
(def ^{:dynamic true} *vcf-tap*
  (io/chunk-tap "s3n://redddata/"
                "vcf"
                "1000-00"
                (for [[th tv] (tile-set :IDN :MYS)]
                  (format "%03d%03d" th tv))))

(def ^{:dynamic true} *gadm-path* "s3n://redddata/gadm/1000-00/*/*/")
(def ^{:dynamic true} *convert-path* "s3n://modisfiles/ascii/admin-map.csv")

;; Estimation parameters shared by the whole run.
(def forma-map
  {:est-start "2005-12-01"
   :est-end "2011-04-01"
   :t-res "32"
   :neighbors 1          ; passed to process-neighbors in forma-query
   :window-dims [600 600]
   :vcf-limit 25         ; minimum VCF kept by dynamic-filter
   :long-block 15
   :window 5})
;; TODO: Rewrite this so that we only need to give it a sequence of
;; countries (or tiles), and it generates the rest.
;; TODO: Sinking the main query straight to text, e.g. with
;; (forma-textline out-path "%s-%s/%s/%s/"), doesn't help; the text
;; output needs a separate job with one reducer. The zero-arity -main
;; below appears to be that second pass.
(defn -main
  "Entry point for the FORMA run.

  With no arguments: reads the sequence-file results at
  s3n://formares/results/ and re-sinks them as templated text under
  s3n://formares/stata/ via `forma-textline`.

  With `out-path`: runs `forma-query` over the hard-coded NDVI, rain,
  VCF, GADM and fire sources using `forma-map`, and sinks the result as
  a sequence file at `out-path`. The fire query's temporal resolution
  and end date come from `forma-map`, so they cannot drift from the
  estimation parameters."
  ([] (let [src (hfs-seqfile "s3n://formares/results/")]
        (?<- (forma-textline "s3n://formares/stata/" "%s-%s/%s/%s/")
             [?s-res ?t-res ?country ?datestring ?text]
             (src ?s-res ?t-res ?country ?datestring ?text))))
  ([out-path]
     (let [{:keys [t-res est-end]} forma-map
           ndvi-src    (tseries/tseries-query *ndvi-path*)
           rain-src    (tseries/tseries-query *rain-path*)
           vcf-src     *vcf-tap*
           country-src (country-tap (hfs-seqfile *gadm-path*)
                                    (hfs-textline *convert-path*))
           ;; "2000-11-01" is the hard-coded start date of the fire query.
           fire-src    (fire/fire-query t-res "2000-11-01" est-end *fire-path*)]
       (?- (hfs-seqfile out-path)
           (forma-query forma-map ndvi-src rain-src vcf-src country-src fire-src)))))