|
397 | 397 | context
|
398 | 398 | free-processes
|
399 | 399 | free-threads
|
400 |
| - next-process |
401 |
| - on-threads-context |
402 | 400 | process->thread
|
403 | 401 | some-free-process
|
404 | 402 | thread->process])
|
|
598 | 596 | (not (:process op))
|
599 | 597 | (conj "no :process")
|
600 | 598 |
|
601 |
| - ; This is a tad expensive, sadly |
602 |
| - (not (.contains ^Set (free-threads ctx) |
603 |
| - (process->thread ctx (:process op)))) |
| 599 | + (not (->> op :process |
| 600 | + (context/process->thread ctx) |
| 601 | + (context/thread-free? ctx))) |
604 | 602 | (conj (str "process " (pr-str (:process op))
|
605 | 603 | " is not free"))))))]
|
606 | 604 | (when (seq problems)
|
607 | 605 | (throw+ {:type ::invalid-op
|
608 |
| - :context ctx |
| 606 | + :context (datafy ctx) |
609 | 607 | ;:res res
|
610 | 608 | :problems problems}
|
611 | 609 | nil
|
|
640 | 638 | [op (FriendlyExceptions. gen')])
|
641 | 639 | (catch Throwable t
|
642 | 640 | (throw+ {:type ::op-threw
|
643 |
| - :context ctx} |
| 641 | + :context (datafy ctx)} |
644 | 642 | t
|
645 | 643 | (with-out-str
|
646 | 644 | (print "Generator threw" (class t) "-" (.getMessage t) "when asked for an operation. Generator:\n")
|
647 | 645 | (binding [*print-length* 10]
|
648 | 646 | (pprint gen))
|
649 | 647 | (println "\nContext:\n")
|
650 |
| - (pprint ctx)))))) |
| 648 | + (pprint (datafy ctx))))))) |
651 | 649 |
|
652 | 650 | (update [this test ctx event]
|
653 | 651 | (try
|
654 | 652 | (when-let [gen' (update gen test ctx event)]
|
655 | 653 | (FriendlyExceptions. gen'))
|
656 | 654 | (catch Throwable t
|
657 | 655 | (throw+ {:type ::update-threw
|
658 |
| - :context ctx |
| 656 | + :context (datafy ctx) |
659 | 657 | :event event}
|
660 | 658 | t
|
661 | 659 | (with-out-str
|
662 | 660 | (print "Generator threw " t " when updated with an event. Generator:\n")
|
663 | 661 | (binding [*print-length* 10]
|
664 | 662 | (pprint gen))
|
665 | 663 | (println "\nContext:\n")
|
666 |
| - (pprint ctx) |
| 664 | + (pprint (datafy ctx)) |
667 | 665 | (println "Event:\n")
|
668 | 666 | (pprint event)))))))
|
669 | 667 |
|
|
798 | 796 | [f gen]
|
799 | 797 | (OnUpdate. f gen))
|
800 | 798 |
|
801 |
| -(defrecord OnThreads [f gen] |
| 799 | +(defn on-threads-context |
| 800 | + "For backwards compatibility; filters a context to just threads matching (f |
| 801 | + thread). Use context/make-thread-filter for performance." |
| 802 | + [f context] |
| 803 | + ((context/make-thread-filter f context) context)) |
| 804 | + |
| 805 | +(defrecord OnThreads [f context-filter gen] |
802 | 806 | Generator
|
803 | 807 | (op [this test ctx]
|
804 |
| - (when-let [[op gen'] (op gen test (on-threads-context f ctx))] |
805 |
| - [op (OnThreads. f gen')])) |
| 808 | + (when-let [[op gen'] (op gen test (context-filter ctx))] |
| 809 | + [op (OnThreads. f context-filter gen')])) |
806 | 810 |
|
807 | 811 | (update [this test ctx event]
|
808 | 812 | (if (f (process->thread ctx (:process event)))
|
809 |
| - (OnThreads. f (update gen test (on-threads-context f ctx) event)) |
| 813 | + (OnThreads. f context-filter (update gen test (context-filter ctx) event)) |
810 | 814 | this)))
|
811 | 815 |
|
812 | 816 | (defn on-threads
|
|
815 | 819 | generator: it will only include free threads and workers satisfying f.
|
816 | 820 | Updates are passed on only when the thread performing the update matches f."
|
817 | 821 | [f gen]
|
818 |
| - (OnThreads. f gen)) |
| 822 | + (OnThreads. f (context/make-thread-filter f) gen)) |
819 | 823 |
|
820 | 824 | (def on "For backwards compatibility" on-threads)
|
821 | 825 |
|
|
889 | 893 | 1 (first gens)
|
890 | 894 | (Any. (vec gens))))
|
891 | 895 |
|
892 |
| -(defrecord EachThread [fresh-gen gens] |
| 896 | +(defn each-thread-ensure-context-filters! |
| 897 | + "Ensures an EachThread has context filters for each thread." |
| 898 | + [context-filters ctx] |
| 899 | + (when-not (realized? context-filters) |
| 900 | + (deliver context-filters |
| 901 | + (reduce (fn compute-context-filters [cfs thread] |
| 902 | + (assoc cfs thread (context/make-thread-filter |
| 903 | + #{thread} |
| 904 | + ctx))) |
| 905 | + {} |
| 906 | + (context/all-threads ctx))))) |
| 907 | + |
| 908 | +(defrecord EachThread [fresh-gen context-filters gens] |
893 | 909 | ; fresh-gen is a generator we use to initialize a thread's state, the first
|
894 | 910 | ; time we see it.
|
| 911 | + ; context-filters is a promise of a map of threads to context filters; lazily |
| 912 | + ; initialized. |
895 | 913 | ; gens is a map of threads to generators.
|
896 | 914 | Generator
|
897 | 915 | (op [this test ctx]
|
| 916 | + (each-thread-ensure-context-filters! context-filters ctx) |
898 | 917 | (let [{:keys [op gen' thread] :as soonest}
|
899 | 918 | (->> (context/free-threads ctx)
|
900 | 919 | (keep (fn [thread]
|
901 | 920 | (let [gen (get gens thread fresh-gen)
|
902 | 921 | ; Give this generator a context *just* for one
|
903 | 922 | ; thread
|
904 |
| - ctx (on-threads-context #{thread} ctx)] |
| 923 | + ctx ((@context-filters thread) ctx)] |
905 | 924 | (when-let [[op gen'] (op gen test ctx)]
|
906 | 925 | {:op op
|
907 | 926 | :gen' gen'
|
908 | 927 | :thread thread}))))
|
909 | 928 | (reduce soonest-op-map nil))]
|
910 | 929 | (cond ; A free thread has an operation
|
911 |
| - soonest [op (EachThread. fresh-gen (assoc gens thread gen'))] |
| 930 | + soonest [op (EachThread. fresh-gen context-filters |
| 931 | + (assoc gens thread gen'))] |
912 | 932 |
|
913 | 933 | ; Some thread is busy; we can't tell what to do just yet
|
914 | 934 | (not= (context/free-thread-count ctx)
|
|
920 | 940 | nil)))
|
921 | 941 |
|
922 | 942 | (update [this test ctx event]
|
| 943 | + (each-thread-ensure-context-filters! context-filters ctx) |
923 | 944 | (let [process (:process event)
|
924 | 945 | thread (process->thread ctx process)
|
925 | 946 | gen (get gens thread fresh-gen)
|
926 |
| - ctx (on-threads-context #{thread} ctx) |
| 947 | + ctx ((@context-filters thread) ctx) |
927 | 948 | gen' (update gen test ctx event)]
|
928 |
| - (EachThread. fresh-gen (assoc gens thread gen'))))) |
| 949 | + (EachThread. fresh-gen context-filters (assoc gens thread gen'))))) |
929 | 950 |
|
930 | 951 | (defn each-thread
|
931 | 952 | "Takes a generator. Constructs a generator which maintains independent copies
|
932 | 953 | of that generator for every thread. Each generator sees exactly one thread in
|
933 | 954 | its free process list. Updates are propagated to the generator for the thread
|
934 | 955 | which emitted the operation."
|
935 | 956 | [gen]
|
936 |
| - (EachThread. gen {})) |
| 957 | + (EachThread. gen (promise) {})) |
937 | 958 |
|
938 |
| -(defrecord Reserve [ranges all-ranges gens] |
| 959 | +(defrecord Reserve [ranges all-ranges context-filters gens] |
939 | 960 | ; ranges is a collection of sets of threads engaged in each generator.
|
940 | 961 | ; all-ranges is the union of all ranges.
|
| 962 | + ; context-filters is a vector of context filtering functions, one for each |
| 963 | + ; range (and the default gen last). |
941 | 964 | ; gens is a vector of generators corresponding to ranges, followed by the
|
942 | 965 | ; default generator.
|
943 | 966 | Generator
|
|
948 | 971 | (fn [i threads]
|
949 | 972 | (let [gen (nth gens i)
|
950 | 973 | ; Restrict context to this range of threads
|
951 |
| - ctx (on-threads-context threads ctx)] |
| 974 | + ctx ((nth context-filters i) ctx)] |
952 | 975 | ; Ask this range's generator for an op
|
953 | 976 | (when-let [[op gen'] (op gen test ctx)]
|
954 | 977 | ; Remember our index
|
955 | 978 | {:op op
|
956 | 979 | :gen' gen'
|
957 | 980 | :weight (count threads)
|
958 | 981 | :i i}))))
|
959 |
| - ; And for the default generator, compute a context without any |
960 |
| - ; threads from defined ranges... |
961 |
| - (cons (let [ctx (on-threads-context (complement all-ranges) ctx)] |
| 982 | + ; And for the default generator... |
| 983 | + (cons (let [ctx ((peek context-filters) ctx)] |
962 | 984 | ; And construct a triple for the default generator
|
963 | 985 | (when-let [[op gen'] (op (peek gens) test ctx)]
|
964 | 986 | (assert ctx)
|
|
969 | 991 | (reduce soonest-op-map nil))]
|
970 | 992 | (when soonest
|
971 | 993 | ; A range has an operation to do!
|
972 |
| - [op (Reserve. ranges all-ranges (assoc gens i gen'))]))) |
| 994 | + [op (Reserve. ranges all-ranges context-filters (assoc gens i gen'))]))) |
973 | 995 |
|
974 | 996 | (update [this test ctx event]
|
975 | 997 | (let [process (:process event)
|
|
981 | 1003 | (inc i)))
|
982 | 1004 | 0
|
983 | 1005 | ranges)]
|
984 |
| - (Reserve. ranges all-ranges (c/update gens i update test ctx event))))) |
| 1006 | + (Reserve. ranges all-ranges context-filters |
| 1007 | + (c/update gens i update test ctx event))))) |
985 | 1008 |
|
986 | 1009 | (defn reserve
|
987 | 1010 | "Takes a series of count, generator pairs, and a final default generator.
|
|
1004 | 1027 | (partition 2)
|
1005 | 1028 | ; Construct [thread-set gen] tuples defining the range of
|
1006 | 1029 | ; thread indices covering a given generator, lower
|
1007 |
| - ; inclusive, upper exclusive. |
| 1030 | + ; inclusive, upper exclusive. TODO: I think there might be a |
| 1031 | + ; bug here: if we construct nested reserves or otherwise |
| 1032 | + ; restrict threads, an inner reserve might not understand |
| 1033 | + ; that its threads don't start at 0. |
1008 | 1034 | (reduce (fn [[n gens] [thread-count gen]]
|
1009 | 1035 | (let [n' (+ n thread-count)]
|
1010 | 1036 | [n' (conj gens [(set (range n n')) gen])]))
|
1011 | 1037 | [0 []])
|
1012 | 1038 | second)
|
1013 | 1039 | ranges (mapv first gens)
|
1014 | 1040 | all-ranges (reduce set/union ranges)
|
| 1041 | + ; Compute context filters for all ranges |
| 1042 | + context-filters (mapv context/make-thread-filter |
| 1043 | + (c/concat ranges |
| 1044 | + [(complement all-ranges)])) |
1015 | 1045 | gens (mapv second gens)
|
1016 | 1046 | default (last args)
|
1017 | 1047 | gens (conj gens default)]
|
1018 | 1048 | (assert default)
|
1019 |
| - (Reserve. ranges all-ranges gens))) |
| 1049 | + (Reserve. ranges all-ranges context-filters gens))) |
1020 | 1050 |
|
1021 | 1051 | (declare nemesis)
|
1022 | 1052 |
|
|
1116 | 1146 | (op [_ test ctx]
|
1117 | 1147 | (when-not (zero? remaining)
|
1118 | 1148 | (when-let [[op gen'] (op gen test ctx)]
|
1119 |
| - ; If you actually hit MIN_INT doing this... you probably have bigger |
1120 |
| - ; problems on your hands. |
1121 |
| - [op (Repeat. (dec remaining) gen)]))) |
| 1149 | + [op (Repeat. (max -1 (dec remaining)) gen)]))) |
1122 | 1150 |
|
1123 | 1151 | (update [this test ctx event]
|
1124 | 1152 | (Repeat. remaining (update gen test ctx event))))
|
|
1333 | 1361 | (defrecord Synchronize [gen]
|
1334 | 1362 | Generator
|
1335 | 1363 | (op [this test ctx]
|
1336 |
| - (if (= (context/free-threads ctx) (context/all-threads ctx)) |
| 1364 | + (if (= (context/free-thread-count ctx) |
| 1365 | + (context/all-thread-count ctx)) |
1337 | 1366 | ; We're ready, replace ourselves with the generator
|
1338 | 1367 | (op gen test ctx)
|
1339 | 1368 | ; Not yet
|
|
0 commit comments