module ThreadSafe
  # A Ruby port of Doug Lea's jsr166e.ConcurrentHashMapV8 class, version 1.59, available in the public domain.
  # Original source code available here: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/ConcurrentHashMapV8.java?revision=1.59
  #
  # The Ruby port leaves out the +TreeBin+ (red-black trees for use in bins
  # whose size exceeds a threshold).
  #
  # A hash table supporting full concurrency of retrievals and
  # high expected concurrency for updates. However, even though all
  # operations are thread-safe, retrieval operations do _not_ entail locking,
  # and there is _not_ any support for locking the entire table
  # in a way that prevents all access.
  #
  # Retrieval operations generally do not block, so may overlap with
  # update operations. Retrievals reflect the results of the most
  # recently _completed_ update operations holding upon their
  # onset. (More formally, an update operation for a given key bears a
  # _happens-before_ relation with any (non +nil+) retrieval for
  # that key reporting the updated value.) For aggregate operations
  # such as +clear()+, concurrent retrievals may reflect insertion or removal
  # of only some entries. Similarly, the +each_pair+ iterator yields elements
  # reflecting the state of the hash table at some point at or since
  # the start of the +each_pair+. Bear in mind that the results of
  # aggregate status methods including +size+ and +empty?+ are typically
  # useful only when a map is not undergoing concurrent updates in other
  # threads. Otherwise the results of these methods reflect transient
  # states that may be adequate for monitoring or estimation purposes, but not
  # for program control.
  #
  # The table is dynamically expanded when there are too many
  # collisions (i.e., keys that have distinct hash codes but fall into
  # the same slot modulo the table size), with the expected average
  # effect of maintaining roughly two bins per mapping (corresponding
  # to a 0.75 load factor threshold for resizing). There may be much
  # variance around this average as mappings are added and removed, but
  # overall, this maintains a commonly accepted time/space tradeoff for
  # hash tables. However, resizing this or any other kind of hash
  # table may be a relatively slow operation. When possible, it is a
  # good idea to provide a size estimate as an optional :initial_capacity
  # initializer argument. An additional optional :load_factor constructor
  # argument provides a further means of customizing initial table capacity
  # by specifying the table density to be used in calculating the amount of
  # space to allocate for the given number of elements. Note that using
  # many keys with exactly the same +hash+ is a sure way to slow down
  # performance of any hash table.
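  #
  # As an illustration only (a hypothetical usage sketch; +compute_if_absent+
  # and +:initial_capacity+ are the APIs documented in this file):
  #
  #   cache = ThreadSafe::AtomicReferenceCacheBackend.new(:initial_capacity => 100)
  #   cache[:a] = 1
  #   cache[:a]                         # => 1
  #   cache.compute_if_absent(:b) { 2 } # => 2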
  #
  # == Design overview
  #
  # The primary design goal of this hash table is to maintain
  # concurrent readability (typically method +[]+, but also
  # iteration and related methods) while minimizing update
  # contention. Secondary goals are to keep space consumption about
  # the same or better than plain +Hash+, and to support high
  # initial insertion rates on an empty table by many threads.
  #
  # Each key-value mapping is held in a +Node+. The validation-based
  # approach explained below leads to a lot of code sprawl because
  # retry-control precludes factoring into smaller methods.
  #
  # The table is lazily initialized to a power-of-two size upon the
  # first insertion. Each bin in the table normally contains a
  # list of +Node+s (most often, the list has only zero or one +Node+).
  # Table accesses require volatile/atomic reads, writes, and
  # CASes. The lists of nodes within bins are always accurately traversable
  # under volatile reads, so long as lookups check hash code
  # and non-nullness of value before checking key equality.
  #
  # We use the top two bits of +Node+ hash fields for control
  # purposes -- they are available anyway because of addressing
  # constraints. As explained further below, these top bits are
  # used as follows:
  # 00 - Normal
  # 01 - Locked
  # 11 - Locked and may have a thread waiting for lock
  # 10 - +Node+ is a forwarding node
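  #
  # For example, in terms of the constants defined in +Node+ below
  # (a sketch, not code from this file):
  #
  #   pure_hash   = key.hash & HASH_BITS # top two bits 00: a normal node
  #   locked_hash = pure_hash | LOCKED   # top two bits become 01: locked
  #   forwarding  = MOVED                # top two bits 10, lower bits all zero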
  #
  # The remaining lower bits of each +Node+'s hash field
  # (+Util::FIXNUM_BIT_SIZE - 2+ of them, e.g. 28 on a 32-bit build) contain
  # the key's hash code, except for forwarding nodes, for which the lower
  # bits are zero (and so always have hash field == +MOVED+).
  #
  # Insertion (via +[]=+ or its variants) of the first node in an
  # empty bin is performed by just CASing it to the bin. This is
  # by far the most common case for put operations under most
  # key/hash distributions. Other update operations (insert,
  # delete, and replace) require locks. We do not want to waste
  # the space required to associate a distinct lock object with
  # each bin, so instead use the first node of a bin list itself as
  # a lock. Blocking support for these locks relies on +Util::CheapLockable+.
  # However, we also need a +try_lock+ construction, so we overlay
  # these by using bits of the +Node+ hash field for lock control (see above),
  # and so normally use builtin monitors only for blocking and signalling using
  # +cheap_wait+/+cheap_broadcast+ constructions. See +Node#try_await_lock+.
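  #
  # A sketch of that overlay (mirroring +Node#try_lock_via_hash+ and
  # +Node#unlock_via_hash+ below):
  #
  #   if cas_hash(node_hash, locked_hash = node_hash | LOCKED)
  #     begin
  #       # ... update the bin ...
  #     ensure
  #       unlock_via_hash(locked_hash, node_hash) # CAS back, or broadcast to waiters
  #     end
  #   end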
  #
  # Using the first node of a list as a lock does not by itself
  # suffice though: When a node is locked, any update must first
  # validate that it is still the first node after locking it, and
  # retry if not. Because new nodes are always appended to lists,
  # once a node is first in a bin, it remains first until deleted
  # or the bin becomes invalidated (upon resizing). However,
  # operations that only conditionally update may inspect nodes
  # until the point of update. This is a converse of sorts to the
  # lazy locking technique described by Herlihy & Shavit.
  #
  # The main disadvantage of per-bin locks is that other update
  # operations on other nodes in a bin list protected by the same
  # lock can stall, for example when user +eql?+ or mapping
  # functions take a long time. However, statistically, under
  # random hash codes, this is not a common problem. Ideally, the
  # frequency of nodes in bins follows a Poisson distribution
  # (http://en.wikipedia.org/wiki/Poisson_distribution) with a
  # parameter of about 0.5 on average, given the resizing threshold
  # of 0.75, although with a large variance because of resizing
  # granularity. Ignoring variance, the expected occurrences of
  # list size k are (exp(-0.5) * pow(0.5, k) / factorial(k)). The
  # first values are:
  #
  # 0: 0.60653066
  # 1: 0.30326533
  # 2: 0.07581633
  # 3: 0.01263606
  # 4: 0.00157952
  # 5: 0.00015795
  # 6: 0.00001316
  # 7: 0.00000094
  # 8: 0.00000006
  # more: less than 1 in ten million
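  #
  # These values can be recomputed from the formula above (a sketch):
  #
  #   def poisson(k, mean = 0.5)
  #     Math.exp(-mean) * (mean ** k) / (1..k).reduce(1, :*)
  #   end
  #   poisson(0) # => 0.6065306597126334
  #   poisson(3) # => 0.012636055289720713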
  #
  # Lock contention probability for two threads accessing distinct
  # elements is roughly 1 / (8 * #elements) under random hashes.
  #
  # The table is resized when occupancy exceeds a percentage
  # threshold (nominally, 0.75, but see below). Only a single
  # thread performs the resize (using field +size_control+, to arrange
  # exclusion), but the table otherwise remains usable for reads
  # and updates. Resizing proceeds by transferring bins, one by
  # one, from the table to the next table. Because we are using
  # power-of-two expansion, the elements from each bin must either
  # stay at same index, or move with a power of two offset. We
  # eliminate unnecessary node creation by catching cases where old
  # nodes can be reused because their next fields won't change. On
  # average, only about one-sixth of them need cloning when a table
  # doubles. The nodes they replace will be garbage collectable as
  # soon as they are no longer referenced by any reader thread that
  # may be in the midst of concurrently traversing table. Upon
  # transfer, the old table bin contains only a special forwarding
  # node (with hash field +MOVED+) that contains the next table as
  # its key. On encountering a forwarding node, access and update
  # operations restart, using the new table.
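  #
  # As a worked example of the power-of-two index math: with an old table
  # of size 8, a node whose pure hash is +h+ lives at index +h & 7+; after
  # doubling to 16 it lands at +h & 15+, i.e. either its old index or its
  # old index plus 8, depending only on one extra bit of +h+:
  #
  #   old_index = h & 7
  #   new_index = (h & 8) == 0 ? old_index : old_index + 8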
  #
  # Each bin transfer requires its bin lock. However, unlike other
  # cases, a transfer can skip a bin if it fails to acquire its
  # lock, and revisit it later. Method +rebuild+ maintains a buffer of
  # +TRANSFER_BUFFER_SIZE+ bins that have been skipped because of failure
  # to acquire a lock, and blocks only if none are available
  # (i.e., only very rarely). The transfer operation must also ensure
  # that all accessible bins in both the old and new table are usable by
  # any traversal. When there are no lock acquisition failures, this is
  # arranged simply by proceeding from the last bin (+table.size - 1+) up
  # towards the first. Upon seeing a forwarding node, traversals arrange
  # to move to the new table without revisiting nodes. However, when any
  # node is skipped during a transfer, all earlier table bins may have
  # become visible, so are initialized with a reverse-forwarding node back
  # to the old table until the new ones are established. (This
  # sometimes requires transiently locking a forwarding node, which
  # is possible under the above encoding.) These more expensive
  # mechanics trigger only when necessary.
  #
  # The traversal scheme also applies to partial traversals of
  # ranges of bins (via an alternate Traverser constructor)
  # to support partitioned aggregate operations. Also, read-only
  # operations give up if ever forwarded to a null table, which
  # provides support for shutdown-style clearing (also not
  # currently implemented).
  #
  # Lazy table initialization minimizes footprint until first use.
  #
  # The element count is maintained using a +ThreadSafe::Util::Adder+,
  # which avoids contention on updates but can encounter cache thrashing
  # if read too frequently during concurrent access. To avoid reading so
  # often, resizing is attempted either when a bin lock is
  # contended, or upon adding to a bin already holding two or more
  # nodes (checked before adding in the +x_if_absent+ methods, after
  # adding in others). Under uniform hash distributions, the
  # probability of this occurring at threshold is around 13%,
  # meaning that only about 1 in 8 puts check threshold (and after
  # resizing, many fewer do so). But this approximation has high
  # variance for small table sizes, so we check on any collision
  # for sizes <= 64. The bulk putAll operation further reduces
  # contention by only committing count updates upon these size
  # checks.
  class AtomicReferenceCacheBackend
    class Table < Util::PowerOfTwoTuple
      def cas_new_node(i, hash, key, value)
        cas(i, nil, Node.new(hash, key, value))
      end

      def try_to_cas_in_computed(i, hash, key)
        succeeded = false
        new_value = nil
        new_node = Node.new(locked_hash = hash | LOCKED, key, NULL)
        if cas(i, nil, new_node)
          begin
            new_node.value = new_value = yield
            succeeded = true
          ensure
            volatile_set(i, nil) unless succeeded
            new_node.unlock_via_hash(locked_hash, hash)
          end
        end
        return succeeded, new_value
      end

      def try_lock_via_hash(i, node, node_hash)
        node.try_lock_via_hash(node_hash) do
          yield if volatile_get(i) == node
        end
      end
    end

    # Key-value entry. Nodes with a hash field of +MOVED+ are special,
    # and do not contain user keys or values. Otherwise, keys are never +nil+,
    # and +NULL+ +value+ fields indicate that a node is in the process
    # of being deleted or created. For purposes of read-only access, a key may be read
    # before a value, but can only be used after checking value to be +!= NULL+.
    class Node
      extend Util::Volatile
      attr_volatile :hash, :value, :next

      include Util::CheapLockable

      bit_shift = Util::FIXNUM_BIT_SIZE - 2 # need 2 bits for ourselves
      # Encodings for special uses of Node hash fields. See above for explanation.
      MOVED = ('10' << ('0' * bit_shift)).to_i(2) # hash field for forwarding nodes
      LOCKED = ('01' << ('0' * bit_shift)).to_i(2) # set/tested only as a bit
      WAITING = ('11' << ('0' * bit_shift)).to_i(2) # both bits set/tested together
      HASH_BITS = ('00' << ('1' * bit_shift)).to_i(2) # usable bits of normal node hash

      SPIN_LOCK_ATTEMPTS = Util::CPU_COUNT > 1 ? Util::CPU_COUNT * 2 : 0

      attr_reader :key

      def initialize(hash, key, value, next_node = nil)
        super()
        @key = key
        self.lazy_set_hash(hash)
        self.lazy_set_value(value)
        self.next = next_node
      end

      # Spins a while if +LOCKED+ bit set and this node is the first
      # of its bin, and then sets +WAITING+ bits on hash field and
      # blocks (once) if they are still set. It is OK for this
      # method to return even if lock is not available upon exit,
      # which enables these simple single-wait mechanics.
      #
      # The corresponding signalling operation is performed within
      # callers: Upon detecting that +WAITING+ has been set when
      # unlocking lock (via a failed CAS from non-waiting +LOCKED+
      # state), unlockers acquire the +cheap_synchronize+ lock and
      # perform a +cheap_broadcast+.
      def try_await_lock(table, i)
        if table && i >= 0 && i < table.size # bounds check, TODO: why are we bounds checking?
          spins = SPIN_LOCK_ATTEMPTS
          randomizer = base_randomizer = Util::XorShiftRandom.get
          while equal?(table.volatile_get(i)) && self.class.locked_hash?(my_hash = hash)
            if spins >= 0
              if (randomizer = (randomizer >> 1)).even? # spin at random
                if (spins -= 1) == 0
                  Thread.pass # yield before blocking
                else
                  randomizer = base_randomizer = Util::XorShiftRandom.xorshift(base_randomizer) if randomizer.zero?
                end
              end
            elsif cas_hash(my_hash, my_hash | WAITING)
              force_acquire_lock(table, i)
              break
            end
          end
        end
      end

      def key?(key)
        @key.eql?(key)
      end

      def matches?(key, hash)
        pure_hash == hash && key?(key)
      end

      def pure_hash
        hash & HASH_BITS
      end

      def try_lock_via_hash(node_hash = hash)
        if cas_hash(node_hash, locked_hash = node_hash | LOCKED)
          begin
            yield
          ensure
            unlock_via_hash(locked_hash, node_hash)
          end
        end
      end

      def locked?
        self.class.locked_hash?(hash)
      end

      def unlock_via_hash(locked_hash, node_hash)
        unless cas_hash(locked_hash, node_hash)
          self.hash = node_hash
          cheap_synchronize { cheap_broadcast }
        end
      end

      private
      def force_acquire_lock(table, i)
        cheap_synchronize do
          if equal?(table.volatile_get(i)) && (hash & WAITING) == WAITING
            cheap_wait
          else
            cheap_broadcast # possibly won race vs signaller
          end
        end
      end

      class << self
        def locked_hash?(hash)
          (hash & LOCKED) != 0
        end
      end
    end

    # The class allows for +nil+ values to be stored, so a special +NULL+ token is required to indicate the "nil-ness".
    NULL = Object.new
    NOW_RESIZING = -1

    # shorthands
    MOVED = Node::MOVED
    LOCKED = Node::LOCKED
    WAITING = Node::WAITING
    HASH_BITS = Node::HASH_BITS

    DEFAULT_CAPACITY = 16
    MAX_CAPACITY = Util::MAX_INT

    # The buffer size for skipped bins during transfers. The
    # value is arbitrary but should be large enough to avoid
    # most locking stalls during resizes.
    TRANSFER_BUFFER_SIZE = 32

    extend Util::Volatile
    attr_volatile :table, # The array of bins. Lazily initialized upon first insertion. Size is always a power of two.

                  # Table initialization and resizing control. When negative, the
                  # table is being initialized or resized. Otherwise, when table is
                  # null, holds the initial table size to use upon creation, or 0
                  # for default. After initialization, holds the next element count
                  # value upon which to resize the table.
                  :size_control

    def initialize(options = nil)
      super()
      @counter = Util::Adder.new

      initial_capacity = options && options[:initial_capacity] || DEFAULT_CAPACITY
      concurrency_level = options && options[:concurrency_level]

      initial_capacity = concurrency_level if concurrency_level && concurrency_level > initial_capacity # Use at least as many bins

      self.size_control = (capacity = table_size_for(initial_capacity)) > MAX_CAPACITY ? MAX_CAPACITY : capacity
    end

    def [](key)
      internal_get(key)
    end

    def key?(key)
      internal_get(key, NULL) != NULL
    end

    def []=(key, value)
      get_and_set(key, value)
      value
    end

    def compute_if_absent(key)
      hash = key_hash(key)
      current_table = table || initialize_table
      while true
        if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
          succeeded, new_value = current_table.try_to_cas_in_computed(i, hash, key) { yield }
          if succeeded
            increment_size
            return new_value
          end
        elsif (node_hash = node.hash) == MOVED
          current_table = node.key
        elsif NULL != (current_value = find_value_in_node_list(node, key, hash, node_hash & HASH_BITS))
          return current_value
        elsif Node.locked_hash?(node_hash)
          try_await_lock(current_table, i, node)
        else
          succeeded, value = attempt_internal_compute_if_absent(key, hash, current_table, i, node, node_hash) { yield }
          return value if succeeded
        end
      end
    end

    if defined?(RUBY_ENGINE) && RUBY_ENGINE == 'rbx' && false # <- remove "&& false" to fix the bug on Rubinius
      def compute_if_absent(key)
        hash = key_hash(key)
        current_table = table || initialize_table
        while true
          if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
            succeeded, new_value = current_table.try_to_cas_in_computed(i, hash, key) { yield }
            if succeeded
              increment_size
              return new_value
            end
          elsif (node_hash = node.hash) == MOVED
            current_table = node.key
          # TODO: fix this
          # START Rubinius hack
          # Rubinius seems to go crazy on the full test_cache_loop run without this inlined first iteration of find_value_in_node_list
          elsif (node_hash & HASH_BITS) == hash && node.key?(key) && NULL != (current_value = node.value)
            return current_value
          # END Rubinius hack
          elsif NULL != (current_value = find_value_in_node_list(node, key, hash, node_hash & HASH_BITS))
            return current_value
          elsif Node.locked_hash?(node_hash)
            try_await_lock(current_table, i, node)
          else
            succeeded, value = attempt_internal_compute_if_absent(key, hash, current_table, i, node, node_hash) { yield }
            return value if succeeded
          end
        end
      end
    end

    def replace_pair(key, old_value, new_value)
      NULL != internal_replace(key, new_value) {|current_value| current_value == old_value}
    end

    def replace_if_exists(key, new_value)
      if (result = internal_replace(key, new_value)) && NULL != result
        result
      end
    end

    def get_and_set(key, value) # internalPut in the original CHMV8
      hash = key_hash(key)
      current_table = table || initialize_table
      while current_table
        if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
          if current_table.cas_new_node(i, hash, key, value)
            increment_size
            break
          end
        elsif (node_hash = node.hash) == MOVED
          current_table = node.key
        elsif Node.locked_hash?(node_hash)
          try_await_lock(current_table, i, node)
        else
          succeeded, old_value = attempt_get_and_set(key, value, hash, current_table, i, node, node_hash)
          break old_value if succeeded
        end
      end
    end

    def delete(key)
      replace_if_exists(key, NULL)
    end

    def delete_pair(key, value)
      result = internal_replace(key, NULL) {|current_value| value == current_value}
      if result && NULL != result
        !!result
      else
        false
      end
    end

    def each_pair
      return self unless current_table = table
      current_table_size = base_size = current_table.size
      i = base_index = 0
      while base_index < base_size
        if node = current_table.volatile_get(i)
          if node.hash == MOVED
            current_table = node.key
            current_table_size = current_table.size
          else
            begin
              if NULL != (value = node.value) # skip deleted or special nodes
                yield node.key, value
              end
            end while node = node.next
          end
        end

        if (i_with_base = i + base_size) < current_table_size
          i = i_with_base # visit upper slots if present
        else
          i = base_index += 1
        end
      end
      self
    end

    def size
      (sum = @counter.sum) < 0 ? 0 : sum # ignore transient negative values
    end

    def empty?
      size == 0
    end

    # Implementation for clear. Steps through each bin, removing all nodes.
    def clear
      return self unless current_table = table
      current_table_size = current_table.size
      deleted_count = i = 0
      while i < current_table_size
        if !(node = current_table.volatile_get(i))
          i += 1
        elsif (node_hash = node.hash) == MOVED
          current_table = node.key
          current_table_size = current_table.size
        elsif Node.locked_hash?(node_hash)
          decrement_size(deleted_count) # opportunistically update count
          deleted_count = 0
          node.try_await_lock(current_table, i)
        else
          current_table.try_lock_via_hash(i, node, node_hash) do
            begin
              deleted_count += 1 if NULL != node.value # recheck under lock
              node.value = nil
            end while node = node.next
            current_table.volatile_set(i, nil)
            i += 1
          end
        end
      end
      decrement_size(deleted_count)
      self
    end

    private
    # Internal versions of the insertion methods, each a
    # little more complicated than the last. All have
    # the same basic structure:
    # 1. If table uninitialized, create
    # 2. If bin empty, try to CAS new node
    # 3. If bin stale, use new table
    # 4. Lock and validate; if valid, scan and add or update
    #
    # The others interweave other checks and/or alternative actions:
    # * Plain +get_and_set+ checks for and performs resize after insertion.
    # * compute_if_absent prescans for mapping without lock (and fails to add
    # if present), which also makes pre-emptive resize checks worthwhile.
    #
    # Someday when details settle down a bit more, it might be worth
    # some factoring to reduce sprawl.
    def internal_replace(key, value, &block)
      hash = key_hash(key)
      current_table = table
      while current_table
        if !(node = current_table.volatile_get(i = current_table.hash_to_index(hash)))
          break
        elsif (node_hash = node.hash) == MOVED
          current_table = node.key
        elsif (node_hash & HASH_BITS) != hash && !node.next # precheck
          break # rules out possible existence
        elsif Node.locked_hash?(node_hash)
          try_await_lock(current_table, i, node)
        else
          succeeded, old_value = attempt_internal_replace(key, value, hash, current_table, i, node, node_hash, &block)
          return old_value if succeeded
        end
      end
      NULL
    end

    def attempt_internal_replace(key, value, hash, current_table, i, node, node_hash)
      current_table.try_lock_via_hash(i, node, node_hash) do
        predecessor_node = nil
        old_value = NULL
        begin
          if node.matches?(key, hash) && NULL != (current_value = node.value)
            if !block_given? || yield(current_value)
              old_value = current_value
              if NULL == (node.value = value)
                if predecessor_node
                  predecessor_node.next = node.next
                else
                  current_table.volatile_set(i, node.next)
                end
                decrement_size
              end
            end
            break
          end

          predecessor_node = node
        end while node = node.next

        return true, old_value
      end
    end

    def internal_get(key, else_value = nil)
      hash = key_hash(key)
      current_table = table
      while current_table
        node = current_table.volatile_get_by_hash(hash)
        current_table =
          while node
            if (node_hash = node.hash) == MOVED
              break node.key
            elsif (node_hash & HASH_BITS) == hash && node.key?(key) && NULL != (value = node.value)
              return value
            end
            node = node.next
          end
      end
      else_value
    end

    def find_value_in_node_list(node, key, hash, pure_hash)
      do_check_for_resize = false
      while true
        if pure_hash == hash && node.key?(key) && NULL != (value = node.value)
          return value
        elsif node = node.next
          do_check_for_resize = true # at least 2 nodes -> check for resize
          pure_hash = node.pure_hash
        else
          return NULL
        end
      end
    ensure
      check_for_resize if do_check_for_resize
    end

    def attempt_internal_compute_if_absent(key, hash, current_table, i, node, node_hash)
      added = false
      current_table.try_lock_via_hash(i, node, node_hash) do
        while true
          if node.matches?(key, hash) && NULL != (value = node.value)
            return true, value
          end
          last = node
          unless node = node.next
            last.next = Node.new(hash, key, value = yield)
            added = true
            increment_size
            return true, value
          end
        end
      end
    ensure
      check_for_resize if added
    end

    def attempt_get_and_set(key, value, hash, current_table, i, node, node_hash)
      node_nesting = nil
      current_table.try_lock_via_hash(i, node, node_hash) do
        node_nesting = 1
        old_value = nil
        found_old_value = false
        while node
          if node.matches?(key, hash) && NULL != (old_value = node.value)
            found_old_value = true
            node.value = value
            break
          end
          last = node
          unless node = node.next
            last.next = Node.new(hash, key, value)
            break
          end
          node_nesting += 1
        end

        return true, old_value if found_old_value
        increment_size
        true
      end
    ensure
      check_for_resize if node_nesting && (node_nesting > 1 || current_table.size <= 64)
    end

    def initialize_copy(other)
      super
      @counter = Util::Adder.new
      self.table = nil
      self.size_control = (other_table = other.table) ? other_table.size : DEFAULT_CAPACITY
      self
    end

    def try_await_lock(current_table, i, node)
      check_for_resize # try resizing if can't get lock
      node.try_await_lock(current_table, i)
    end

    def key_hash(key)
      key.hash & HASH_BITS
    end

    # Returns a power of two table size for the given desired capacity.
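    # e.g. +table_size_for(100)+ # => 128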
    def table_size_for(entry_count)
      size = 2
      size <<= 1 while size < entry_count
      size
    end

    # Initializes table, using the size recorded in +size_control+.
    def initialize_table
      until current_table ||= table
        if (size_ctrl = size_control) == NOW_RESIZING
          Thread.pass # lost initialization race; just spin
        else
          try_in_resize_lock(current_table, size_ctrl) do
            initial_size = size_ctrl > 0 ? size_ctrl : DEFAULT_CAPACITY
            current_table = self.table = Table.new(initial_size)
            initial_size - (initial_size >> 2) # 75% load factor
          end
        end
      end
      current_table
    end

    # If table is too small and not already resizing, creates next
    # table and transfers bins. Rechecks occupancy after a transfer
    # to see if another resize is already needed because resizings
    # are lagging additions.
    def check_for_resize
      while (current_table = table) && MAX_CAPACITY > (table_size = current_table.size) && NOW_RESIZING != (size_ctrl = size_control) && size_ctrl < @counter.sum
        try_in_resize_lock(current_table, size_ctrl) do
          self.table = rebuild(current_table)
          (table_size << 1) - (table_size >> 1) # 75% load factor
        end
      end
    end

    def try_in_resize_lock(current_table, size_ctrl)
      if cas_size_control(size_ctrl, NOW_RESIZING)
        begin
          if current_table == table # recheck under lock
            size_ctrl = yield # get new size_control
          end
        ensure
          self.size_control = size_ctrl
        end
      end
    end

    # Moves and/or copies the nodes in each bin to new table. See above for explanation.
    def rebuild(table)
      old_table_size = table.size
      new_table = table.next_in_size_table
      # puts "#{old_table_size} -> #{new_table.size}"
      forwarder = Node.new(MOVED, new_table, NULL)
      rev_forwarder = nil
      locked_indexes = nil # holds bins to revisit; nil until needed
      locked_arr_idx = 0
      bin = old_table_size - 1
      i = bin
      while true
        if !(node = table.volatile_get(i))
          # no lock needed (or available) if bin >= 0, because we're not popping values from locked_indexes until we've run through the whole table
          redo unless (bin >= 0 ? table.cas(i, nil, forwarder) : lock_and_clean_up_reverse_forwarders(table, old_table_size, new_table, i, forwarder))
        elsif Node.locked_hash?(node_hash = node.hash)
          locked_indexes ||= Array.new
          if bin < 0 && locked_arr_idx > 0
            locked_arr_idx -= 1
            i, locked_indexes[locked_arr_idx] = locked_indexes[locked_arr_idx], i # swap with another bin
            redo
          end
          if bin < 0 || locked_indexes.size >= TRANSFER_BUFFER_SIZE
            node.try_await_lock(table, i) # no other options -- block
            redo
          end
          rev_forwarder ||= Node.new(MOVED, table, NULL)
          redo unless table.volatile_get(i) == node && node.locked? # recheck before adding to list
          locked_indexes << i
          new_table.volatile_set(i, rev_forwarder)
          new_table.volatile_set(i + old_table_size, rev_forwarder)
        else
          redo unless split_old_bin(table, new_table, i, node, node_hash, forwarder)
        end

        if bin > 0
          i = (bin -= 1)
        elsif locked_indexes && !locked_indexes.empty?
          bin = -1
          i = locked_indexes.pop
          locked_arr_idx = locked_indexes.size - 1
        else
          return new_table
        end
      end
    end

    def lock_and_clean_up_reverse_forwarders(old_table, old_table_size, new_table, i, forwarder)
      # transiently use a locked forwarding node
      locked_forwarder = Node.new(moved_locked_hash = MOVED | LOCKED, new_table, NULL)
      if old_table.cas(i, nil, locked_forwarder)
        new_table.volatile_set(i, nil) # kill the potential reverse forwarders
        new_table.volatile_set(i + old_table_size, nil) # kill the potential reverse forwarders
        old_table.volatile_set(i, forwarder)
        locked_forwarder.unlock_via_hash(moved_locked_hash, MOVED)
        true
      end
    end

    # Splits a normal bin with list headed by e into lo and hi parts; installs in given table.
    def split_old_bin(table, new_table, i, node, node_hash, forwarder)
      table.try_lock_via_hash(i, node, node_hash) do
        split_bin(new_table, i, node, node_hash)
        table.volatile_set(i, forwarder)
      end
    end

    def split_bin(new_table, i, node, node_hash)
      bit = new_table.size >> 1 # bit to split on
      run_bit = node_hash & bit
      last_run = nil
      low = nil
      high = nil
      current_node = node
      # this optimises for the lowest amount of volatile writes and objects created
      while current_node = current_node.next
        unless (b = current_node.hash & bit) == run_bit
          run_bit = b
          last_run = current_node
        end
      end
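      # the entire tail from +last_run+ onwards maps to a single target bin,
      # so it can be reattached as-is without cloning any of its nodes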
      if run_bit == 0
        low = last_run
      else
        high = last_run
      end
      current_node = node
      until current_node == last_run
        pure_hash = current_node.pure_hash
        if (pure_hash & bit) == 0
          low = Node.new(pure_hash, current_node.key, current_node.value, low)
        else
          high = Node.new(pure_hash, current_node.key, current_node.value, high)
        end
        current_node = current_node.next
      end
      new_table.volatile_set(i, low)
      new_table.volatile_set(i + bit, high)
    end

    def increment_size
      @counter.increment
    end

    def decrement_size(by = 1)
      @counter.add(-by)
    end
  end
end