Skip to content

Commit

Permalink
p/process-folder
Browse files Browse the repository at this point in the history
  • Loading branch information
reborg committed Dec 2, 2018
1 parent dee6a14 commit d4027f6
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Functions and macros:
| [`p/external-sort`](#pexternal-sort) | Memory efficient, file-based, parallel merge-sort.
| [`p/fold`](#pfold-pxrf-and-pfolder) | Transducer-aware `r/fold`.
| [`p/transduce`](#ptransduce) | Parallel version of `transduce` based on `p/fold`.
| [`p/process-filder`](#pprocess-folder) | Process the files in a folder in parallel.
| [`p/min` and `p/max`](#pmin-and-pmax) | Parallel `core/min` and `core/max` functions.
| [`p/distinct`](#pdistinct) | Parallel version of `core/distinct`
| [`p/amap`](#pamap) | Parallel array transformation.
Expand Down Expand Up @@ -579,6 +580,21 @@ The equivalent operation attempted on `reducers/fold` would give inconsistent re
;; Sometimes ArrayOutOfBound, sometimes a bunch of random partitions.
```

### `p/process-folder`

`p/process-folder` applies a composition of transducers to all files in a folder in parallel. The first transducer in the pipeline should expect a line of text. You can use something like `split -l 10000 -a 4 <filename> segment-` to split a large files into many smaller ones of 10k lines each. After you move them in a folder (please be sure it contains only the files that need processing) you're good to go, for example:

```clojure
(p/process-folder
"folder-name-as-string"
(comp (map s/trim)
(remove s/blank?)
(map #(s/split % #"\,"))
(map peek)))
```

The snippet above takes the last value for each line of each CSV file in a folder. `p/process-folder` is eager: if the files are many or lines are big, there is nothing `p/process-folder` can do to avoid out of memory. Try to compose your transducers so they process and aggregate data as needed returning a result that can fit into memory.

### `p/min` and `p/max`

`p/min` and `p/max` find the minimum or maximum in a vector of numbers in parallel (the input collection is converted into a vector if it's not already):
Expand Down
16 changes: 15 additions & 1 deletion src/parallel/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
(:import
[parallel.merge_sort MergeSort]
[parallel.map_combine MapCombine]
[java.io File FileInputStream]
[java.io FileInputStream BufferedReader FileReader Reader StringReader File]
[java.util.concurrent.atomic AtomicInteger AtomicLong]
[java.util.concurrent ConcurrentHashMap ConcurrentLinkedQueue]
[java.util HashMap Collections Queue Map]))
Expand Down Expand Up @@ -455,3 +455,17 @@
`(~f ~target)))
forms))
~target)))

(defn process-folder
"Applies xforms to all lines of all files inside folder. It supports
statful transducers, for example to skip the header (= line1) for each file."
[^String folder xforms]
(transduce
(comp
(mapcat (fn [^File f]
(with-open [br (BufferedReader. (FileReader. f))]
(doall (line-seq br)))))
xforms)
(completing conj! persistent!)
(fn combinef ([] (transient [])) ([v1 v2] (into v1 v2)))
(into [] (rest (file-seq (java.io.File. folder))))))

0 comments on commit d4027f6

Please sign in to comment.