Skip to content

Commit

Permalink
Merge pull request #228 from mrrodriguez/sorted-coll-durability-fix
Browse files Browse the repository at this point in the history
Sorted coll Fressian handler fixes
  • Loading branch information
rbrush committed Oct 17, 2016
2 parents 6d14dbf + 2ec6c61 commit f940884
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 56 deletions.
100 changes: 62 additions & 38 deletions src/main/clojure/clara/rules/durability.clj
Original file line number Diff line number Diff line change
Expand Up @@ -401,60 +401,84 @@
:activation-group-fn
:alphas-fn))))

(defn create-default-get-alphas-fn [rulebase]
(@#'com/create-get-alphas-fn type ancestors rulebase))
(def ^:private create-get-alphas-fn @#'com/create-get-alphas-fn)

(defn- opts->get-alphas-fn [rulebase opts]
(let [fact-type-fn (:fact-type-fn opts type)
ancestors-fn (:ancestors-fn opts ancestors)]
(create-get-alphas-fn fact-type-fn
ancestors-fn
rulebase)))

(defn assemble-restored-session
"Builds a Clara session from the given rulebase and memory components.
"Builds a Clara session from the given rulebase and memory components. When no memory is given a new
one is created with all of the defaults of eng/local-memory.
Note! This function should not typically be used. It is left public to assist in ISessionSerializer
durability implementations. Use clara.rules/mk-session typically to make rule sessions.
Options can be provided via opts.
These include:
* :fact-type-fn
* :ancestors-fn
* :activation-group-sort-fn
* :activation-group-fn
* :get-alphas-fn
If the options are not provided, they will default to the Clara session defaults.
These are all described in detail in clara.rules/mk-session docs.
Note! Currently this only supports the clara.rules.memory.PersistentLocalMemory implementation
of memory."
[rulebase memory opts]
(let [opts (-> opts
(assoc :rulebase rulebase)
;; Right now activation fns do not serialize.
(update :activation-group-sort-fn
#(eng/options->activation-group-sort-fn {:activation-group-sort-fn %}))
(update :activation-group-fn
#(eng/options->activation-group-fn {:activation-group-fn %}))
;; TODO: Memory doesn't seem to ever need this or use it. Can we just remove it from memory?
(update :get-alphas-fn
#(or % (create-default-get-alphas-fn rulebase))))

{:keys [listeners transport get-alphas-fn]} opts

memory-opts (select-keys opts
#{:rulebase
:activation-group-sort-fn
:activation-group-fn
:get-alphas-fn})

transport (or transport (clara.rules.engine.LocalTransport.))
listeners (or listeners [])

memory (-> memory
(merge memory-opts)
;; Naming difference for some reason.
(set/rename-keys {:get-alphas-fn :alphas-fn})
mem/map->PersistentLocalMemory)]

(eng/assemble {:rulebase rulebase
:memory memory
:transport transport
:listeners listeners
:get-alphas-fn get-alphas-fn})))
([rulebase opts]
(let [opts (assoc opts
:rulebase
rulebase
:get-alphas-fn
(opts->get-alphas-fn rulebase opts))
{:keys [listeners transport get-alphas-fn]} opts]

(eng/assemble {:rulebase rulebase
:memory (eng/local-memory rulebase
(clara.rules.engine.LocalTransport.)
(eng/options->activation-group-sort-fn opts)
(eng/options->activation-group-fn opts)
;; TODO: Memory doesn't seem to ever need this or use
;; it. Can we just remove it from memory?
get-alphas-fn)
:transport (or transport (clara.rules.engine.LocalTransport.))
:listeners (or listeners [])
:get-alphas-fn get-alphas-fn})))

([rulebase memory opts]
(let [opts (-> opts
(assoc :rulebase
rulebase
;; Right now get alphas fn does not serialize.
:get-alphas-fn
(opts->get-alphas-fn rulebase opts))
;; Right now activation fns do not serialize.
(update :activation-group-sort-fn
#(eng/options->activation-group-sort-fn {:activation-group-sort-fn %}))
(update :activation-group-fn
#(eng/options->activation-group-fn {:activation-group-fn %})))

{:keys [listeners transport get-alphas-fn]} opts

memory-opts (select-keys opts
#{:rulebase
:activation-group-sort-fn
:activation-group-fn
:get-alphas-fn})]

(eng/assemble {:rulebase rulebase
:memory (-> memory
(merge memory-opts)
;; Naming difference for some reason.
(set/rename-keys {:get-alphas-fn :alphas-fn})
mem/map->PersistentLocalMemory)
:transport (or transport (clara.rules.engine.LocalTransport.))
:listeners (or listeners [])
:get-alphas-fn get-alphas-fn}))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;;; Serialization protocols.
Expand Down
31 changes: 24 additions & 7 deletions src/main/clojure/clara/rules/durability/fressian.clj
Original file line number Diff line number Diff line change
Expand Up @@ -255,32 +255,49 @@
:writer (reify WriteHandler
(write [_ w o]
(let [cname (d/sorted-comparator-name o)]
(.writeTag w "clj/treeset" 2)
(.writeTag w "clj/treeset" 3)
(if cname
(.writeObject w cname true)
(.writeNull w))
;; Preserve metadata.
(if-let [m (meta o)]
(.writeObject w m)
(.writeNull w))
(.writeList w o))))
:readers {"clj/treeset"
(reify ReadHandler
(read [_ rdr tag component-count]
(let [c (some-> rdr .readObject resolve deref)]
(d/seq->sorted-set (.readObject rdr) c))))}}

(let [c (some-> rdr .readObject resolve deref)
m (.readObject rdr)
s (-> (.readObject rdr)
(d/seq->sorted-set c))]
(if m
(with-meta s m)
s))))}}

"clj/treemap"
{:class clojure.lang.PersistentTreeMap
:writer (reify WriteHandler
(write [_ w o]
(let [cname (d/sorted-comparator-name o)]
(.writeTag w "clj/treemap" 2)
(.writeTag w "clj/treemap" 3)
(if cname
(.writeObject w cname true)
(.writeNull w))
;; Preserve metadata.
(if-let [m (meta o)]
(.writeObject w m)
(.writeNull w))
(write-map w o))))
:readers {"clj/treemap"
(reify ReadHandler
(read [_ rdr tag component-count]
(let [c (some-> rdr .readObject resolve deref)]
(d/seq->sorted-map (.readObject rdr) c))))}}
(let [c (some-> rdr .readObject resolve deref)
m (.readObject rdr)
s (d/seq->sorted-map (.readObject rdr) c)]
(if m
(with-meta s m)
s))))}}

"clj/mapentry"
{:class clojure.lang.MapEntry
Expand Down
142 changes: 131 additions & 11 deletions src/test/clojure/clara/test_durability.clj
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,8 @@
"expected fact: " expected-fact \newline
"fact: " fact \newline)))))))

(defn durability-test
"Test runner to run different implementations of d/ISessionSerializer."
[serde-type]
(let [s (mk-session 'clara.durability-rules)

;; Testing identity relationships on the IWorkingMemorySerializer facts received to serialize.
;; So this is a little weird, but we want to know the exact object identity of even these
;; "primitive" values.
mci "MCI"
(defn session-test [s]
(let [mci "MCI"
lax "LAX"
san "SAN"
chi "CHI"
Expand Down Expand Up @@ -119,7 +112,66 @@
cold-res (query fired dr/cold-temp)
hot-res (query fired dr/hot-temp)
temp-his-res (query fired dr/temp-his)
temps-under-thresh-res (query fired dr/temps-under-thresh)
temps-under-thresh-res (query fired dr/temps-under-thresh)]
{:all-objs [mci
lax
san
chi
irk
ten
twenty
fifty
forty
thirty
thresh50
temp50
temp40
temp30
temp20
ws50
ws40
ws10]
:fired-session fired
:query-results {:unpaired-res unpaired-res
:cold-res cold-res
:hot-res hot-res
:temp-his-res temp-his-res
:temps-under-thresh-res temps-under-thresh-res}}))

(defn durability-test
"Test runner to run different implementations of d/ISessionSerializer."
[serde-type]
(let [s (mk-session 'clara.durability-rules)
results (session-test s)
;; Testing identity relationships on the IWorkingMemorySerializer facts received to serialize.
;; So this is a little weird, but we want to know the exact object identity of even these
;; "primitive" values.
[mci
lax
san
chi
irk
ten
twenty
fifty
forty
thirty
thresh50
temp50
temp40
temp30
temp20
ws50
ws40
ws10] (:all-objs results)

fired (:fired-session results)

{:keys [unpaired-res
cold-res
hot-res
temp-his-res
temps-under-thresh-res]} (:query-results results)

create-serializer (fn [stream]
;; Currently only one.
Expand Down Expand Up @@ -254,4 +306,72 @@
(check-fact expected-fact fact)))))))

(deftest test-durability-fressian-serde
(durability-test :fressian))
(testing "SerDe of the rulebase along with working memory"
(durability-test :fressian))

(testing "Repeated SerDe of rulebase"
(let [rb-serde (fn [s]
(with-open [baos (java.io.ByteArrayOutputStream.)]
(d/serialize-rulebase s (df/create-session-serializer baos))
(let [rb-data (.toByteArray baos)]
(with-open [bais (java.io.ByteArrayInputStream. rb-data)]
(d/deserialize-rulebase (df/create-session-serializer bais))))))

s (mk-session 'clara.durability-rules)
rb (-> s eng/components :rulebase)
deserialized1 (rb-serde s)
;; Need a session to do the 2nd round of SerDe.
restored1 (d/assemble-restored-session deserialized1 {})
deserialized2 (rb-serde restored1)
restored2 (d/assemble-restored-session deserialized2 {})

init-qresults (:query-results (session-test s))
restored-qresults1 (:query-results (session-test restored1))
restored-qresults2 (:query-results (session-test restored2))]

(is (= init-qresults
restored-qresults1
restored-qresults2)))))

(deftest test-assemble-restored-session-opts
(let [orig (mk-session 'clara.durability-rules)

test-assemble (fn [rulebase memory]
(let [activation-group-fn-called? (volatile! false)
activation-group-sort-fn-called? (volatile! false)
fact-type-fn-called? (volatile! false)
ancestors-fn-called? (volatile! false)

opts {:activation-group-fn (fn [x]
(vreset! activation-group-fn-called? true)
(or (some-> x :props :salience)
0))
:activation-group-sort-fn (fn [x y]
(vreset! activation-group-sort-fn-called? true)
(> x y))
:fact-type-fn (fn [x]
(vreset! fact-type-fn-called? true)
(type x))
:ancestors-fn (fn [x]
(vreset! ancestors-fn-called? true)
(ancestors x))}

restored (if memory
(d/assemble-restored-session rulebase memory opts)
(d/assemble-restored-session rulebase opts))]

(is (= (:query-results (session-test orig))
(:query-results (session-test restored))))

(is (true? @activation-group-sort-fn-called?))
(is (true? @activation-group-fn-called?))
(is (true? @fact-type-fn-called?))
(is (true? @ancestors-fn-called?))))

{:keys [rulebase memory]} (eng/components orig)]

(testing "restoring without given memory"
(test-assemble rulebase nil))

(testing "restoring with memory"
(test-assemble rulebase memory))))

0 comments on commit f940884

Please sign in to comment.