From 88904a3659fe4a81bdfb2a6b615894d926af3fe1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 28 Mar 2014 23:02:11 -0700 Subject: [PATCH] Make TimeStampedWeakValueHashMap a wrapper of TimeStampedHashMap This allows us to get rid of WrappedJavaHashMap without much duplicate code. --- .../scala/org/apache/spark/SparkContext.scala | 1 - .../apache/spark/storage/BlockManager.scala | 7 +- .../spark/util/TimeStampedHashMap.scala | 117 +++++++--- .../util/TimeStampedWeakValueHashMap.scala | 164 +++++++------- .../spark/util/WrappedJavaHashMap.scala | 152 ------------- .../spark/util/WrappedJavaHashMapSuite.scala | 206 ------------------ 6 files changed, 168 insertions(+), 479 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/util/WrappedJavaHashMap.scala delete mode 100644 core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 79574c271cfb6..13fba1e0dfe5d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.mesos.MesosNativeLibrary -import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index dd2dbd1c8a397..991881b00c0eb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -209,7 +209,7 @@ private[spark] class BlockManager( } } - /** Return the status of the block identified by the given ID, if it exists. */ + /** Get the BlockStatus for the block identified by the given ID, if it exists.*/ def getStatus(blockId: BlockId): Option[BlockStatus] = { blockInfo.get(blockId).map { info => val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L @@ -635,9 +635,10 @@ private[spark] class BlockManager( diskStore.putValues(blockId, iterator, level, askForBytes) case ArrayBufferValues(array) => diskStore.putValues(blockId, array, level, askForBytes) - case ByteBufferValues(bytes) => + case ByteBufferValues(bytes) => { bytes.rewind() diskStore.putBytes(blockId, bytes, level) + } } size = res.size res.data match { @@ -872,7 +873,7 @@ private[spark] class BlockManager( } private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)) { - val iterator = blockInfo.internalMap.entrySet().iterator() + val iterator = blockInfo.getEntrySet.iterator while (iterator.hasNext) { val entry = iterator.next() val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp) diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala index c4d770fecdf74..1721818c212f9 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala @@ -17,64 +17,108 @@ package org.apache.spark.util +import java.util.Set +import java.util.Map.Entry import java.util.concurrent.ConcurrentHashMap +import scala.collection.{immutable, JavaConversions, mutable} + import org.apache.spark.Logging -private[util] case class TimeStampedValue[T](timestamp: Long, value: T) +private[spark] case class TimeStampedValue[V](value: V, timestamp: Long) /** - * A map that stores the timestamp of when a key was inserted along with the value. If specified, - * the timestamp of each pair can be updated every time it is accessed. - * Key-value pairs whose timestamps are older than a particular - * threshold time can then be removed using the clearOldValues method. It exposes a - * scala.collection.mutable.Map interface to allow it to be a drop-in replacement for Scala - * HashMaps. - * - * Internally, it uses a Java ConcurrentHashMap, so all operations on this HashMap are thread-safe. + * This is a custom implementation of scala.collection.mutable.Map which stores the insertion + * timestamp along with each key-value pair. If specified, the timestamp of each pair can be + * updated every time it is accessed. Key-value pairs whose timestamp are older than a particular + * threshold time can then be removed using the clearOldValues method. This is intended to + * be a drop-in replacement of scala.collection.mutable.HashMap. * - * @param updateTimeStampOnGet When enabled, the timestamp of a pair will be - * updated when it is accessed + * @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed */ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false) - extends WrappedJavaHashMap[A, B, A, TimeStampedValue[B]] with Logging { + extends mutable.Map[A, B]() with Logging { - private[util] val internalJavaMap = new ConcurrentHashMap[A, TimeStampedValue[B]]() + private val internalMap = new ConcurrentHashMap[A, TimeStampedValue[B]]() - private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { - new TimeStampedHashMap[K1, V1]() + def get(key: A): Option[B] = { + val value = internalMap.get(key) + if (value != null && updateTimeStampOnGet) { + internalMap.replace(key, value, TimeStampedValue(value.value, currentTime)) + } + Option(value).map(_.value) } - def internalMap = internalJavaMap + def iterator: Iterator[(A, B)] = { + val jIterator = getEntrySet.iterator() + JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue.value)) + } - override def get(key: A): Option[B] = { - val timeStampedValue = internalMap.get(key) - if (updateTimeStampOnGet && timeStampedValue != null) { - internalJavaMap.replace(key, timeStampedValue, - TimeStampedValue(currentTime, timeStampedValue.value)) - } - Option(timeStampedValue).map(_.value) + def getEntrySet: Set[Entry[A, TimeStampedValue[B]]] = internalMap.entrySet() + + override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = { + val newMap = new TimeStampedHashMap[A, B1] + val oldInternalMap = this.internalMap.asInstanceOf[ConcurrentHashMap[A, TimeStampedValue[B1]]] + newMap.internalMap.putAll(oldInternalMap) + kv match { case (a, b) => newMap.internalMap.put(a, TimeStampedValue(b, currentTime)) } + newMap } - @inline override protected def externalValueToInternalValue(v: B): TimeStampedValue[B] = { - new TimeStampedValue(currentTime, v) + + override def - (key: A): mutable.Map[A, B] = { + val newMap = new TimeStampedHashMap[A, B] + newMap.internalMap.putAll(this.internalMap) + newMap.internalMap.remove(key) + newMap + } + + override def += (kv: (A, B)): this.type = { + kv match { case (a, b) => internalMap.put(a, TimeStampedValue(b, currentTime)) } + this + } + + override def -= (key: A): this.type = { + internalMap.remove(key) + this + } + + override def update(key: A, value: B) { + this += ((key, value)) } - @inline override protected def internalValueToExternalValue(iv: TimeStampedValue[B]): B = { - iv.value + override def apply(key: A): B = { + val value = internalMap.get(key) + Option(value).map(_.value).getOrElse { throw new NoSuchElementException() } } - /** Atomically put if a key is absent. This exposes the existing API of ConcurrentHashMap. */ + override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = { + JavaConversions.mapAsScalaConcurrentMap(internalMap) + .map { case (k, TimeStampedValue(v, t)) => (k, v) } + .filter(p) + } + + override def empty: mutable.Map[A, B] = new TimeStampedHashMap[A, B]() + + override def size: Int = internalMap.size + + override def foreach[U](f: ((A, B)) => U) { + val iterator = getEntrySet.iterator() + while(iterator.hasNext) { + val entry = iterator.next() + val kv = (entry.getKey, entry.getValue.value) + f(kv) + } + } + + // Should we return previous value directly or as Option? def putIfAbsent(key: A, value: B): Option[B] = { - val prev = internalJavaMap.putIfAbsent(key, TimeStampedValue(currentTime, value)) + val prev = internalMap.putIfAbsent(key, TimeStampedValue(value, currentTime)) Option(prev).map(_.value) } - /** - * Removes old key-value pairs that have timestamp earlier than `threshTime`, - * calling the supplied function on each such entry before removing. - */ + def toMap: immutable.Map[A, B] = iterator.toMap + def clearOldValues(threshTime: Long, f: (A, B) => Unit) { - val iterator = internalJavaMap.entrySet().iterator() + val iterator = getEntrySet.iterator() while (iterator.hasNext) { val entry = iterator.next() if (entry.getValue.timestamp < threshTime) { @@ -86,11 +130,12 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa } /** - * Removes old key-value pairs that have timestamp earlier than `threshTime` + * Removes old key-value pairs that have timestamp earlier than `threshTime`. */ def clearOldValues(threshTime: Long) { clearOldValues(threshTime, (_, _) => ()) } - private def currentTime: Long = System.currentTimeMillis() + private def currentTime: Long = System.currentTimeMillis + } diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala index 9f3247a27ba38..f814f58261bf3 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala @@ -18,113 +18,115 @@ package org.apache.spark.util import java.lang.ref.WeakReference -import java.util -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicInteger -import scala.collection.JavaConversions - -import org.apache.spark.Logging - -private[util] case class TimeStampedWeakValue[T](timestamp: Long, weakValue: WeakReference[T]) { - def this(timestamp: Long, value: T) = this(timestamp, new WeakReference[T](value)) -} +import scala.collection.{immutable, mutable} /** - * A map that stores the timestamp of when a key was inserted along with the value, - * while ensuring that the values are weakly referenced. If the value is garbage collected and - * the weak reference is null, get() operation returns the key be non-existent. However, - * the key is actually not removed in the current implementation. Key-value pairs whose - * timestamps are older than a particular threshold time can then be removed using the - * clearOldValues method. It exposes a scala.collection.mutable.Map interface to allow it to be a - * drop-in replacement for Scala HashMaps. + * A wrapper of TimeStampedHashMap that ensures the values are weakly referenced and timestamped. + * + * If the value is garbage collected and the weak reference is null, get() operation returns + * a non-existent value. However, the corresponding key is actually not removed in the current + * implementation. Key-value pairs whose timestamps are older than a particular threshold time + * can then be removed using the clearOldValues method. It exposes a scala.collection.mutable.Map + * interface to allow it to be a drop-in replacement for Scala HashMaps. * * Internally, it uses a Java ConcurrentHashMap, so all operations on this HashMap are thread-safe. + * + * @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed. */ +private[spark] class TimeStampedWeakValueHashMap[A, B](updateTimeStampOnGet: Boolean = false) + extends mutable.Map[A, B]() { + + import TimeStampedWeakValueHashMap._ -private[spark] class TimeStampedWeakValueHashMap[A, B]() - extends WrappedJavaHashMap[A, B, A, TimeStampedWeakValue[B]] with Logging { + private val internalMap = new TimeStampedHashMap[A, WeakReference[B]](updateTimeStampOnGet) - /** Number of inserts after which keys whose weak ref values are null will be cleaned */ - private val CLEANUP_INTERVAL = 1000 + def get(key: A): Option[B] = internalMap.get(key) - /** Counter for counting the number of inserts */ - private val insertCounts = new AtomicInteger(0) + def iterator: Iterator[(A, B)] = internalMap.iterator + + override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = { + val newMap = new TimeStampedWeakValueHashMap[A, B1] + newMap.internalMap += kv + newMap + } - private[util] val internalJavaMap: util.Map[A, TimeStampedWeakValue[B]] = { - new ConcurrentHashMap[A, TimeStampedWeakValue[B]]() + override def - (key: A): mutable.Map[A, B] = { + val newMap = new TimeStampedWeakValueHashMap[A, B] + newMap.internalMap -= key + newMap } - private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { - new TimeStampedWeakValueHashMap[K1, V1]() + override def += (kv: (A, B)): this.type = { + internalMap += kv + this } - override def +=(kv: (A, B)): this.type = { - // Cleanup null value at certain intervals - if (insertCounts.incrementAndGet() % CLEANUP_INTERVAL == 0) { - cleanNullValues() - } - super.+=(kv) + override def -= (key: A): this.type = { + internalMap -= key + this } - override def get(key: A): Option[B] = { - Option(internalJavaMap.get(key)).flatMap { weakValue => - val value = weakValue.weakValue.get - if (value == null) { - internalJavaMap.remove(key) - } - Option(value) - } + override def update(key: A, value: B) = this += ((key, value)) + + override def apply(key: A): B = internalMap.apply(key) + + override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = internalMap.filter(p) + + override def empty: mutable.Map[A, B] = new TimeStampedWeakValueHashMap[A, B]() + + override def size: Int = internalMap.size + + override def foreach[U](f: ((A, B)) => U) = internalMap.foreach(f) + + def putIfAbsent(key: A, value: B): Option[B] = internalMap.putIfAbsent(key, value) + + def toMap: immutable.Map[A, B] = iterator.toMap + + /** + * Remove old key-value pairs that have timestamp earlier than `threshTime`. + */ + def clearOldValues(threshTime: Long) = internalMap.clearOldValues(threshTime) + +} + +/** + * Helper methods for converting to and from WeakReferences. + */ +private[spark] object TimeStampedWeakValueHashMap { + + /* Implicit conversion methods to WeakReferences */ + + implicit def toWeakReference[V](v: V): WeakReference[V] = new WeakReference[V](v) + + implicit def toWeakReferenceTuple[K, V](kv: (K, V)): (K, WeakReference[V]) = { + kv match { case (k, v) => (k, toWeakReference(v)) } } - @inline override protected def externalValueToInternalValue(v: B): TimeStampedWeakValue[B] = { - new TimeStampedWeakValue(currentTime, v) + implicit def toWeakReferenceFunction[K, V, R](p: ((K, V)) => R): ((K, WeakReference[V])) => R = { + (kv: (K, WeakReference[V])) => p(kv) } - @inline override protected def internalValueToExternalValue(iv: TimeStampedWeakValue[B]): B = { - iv.weakValue.get + /* Implicit conversion methods from WeakReferences */ + + implicit def fromWeakReference[V](ref: WeakReference[V]): V = ref.get + + implicit def fromWeakReferenceOption[V](v: Option[WeakReference[V]]): Option[V] = { + v.map(fromWeakReference) } - override def iterator: Iterator[(A, B)] = { - val iterator = internalJavaMap.entrySet().iterator() - JavaConversions.asScalaIterator(iterator).flatMap(kv => { - val (key, value) = (kv.getKey, kv.getValue.weakValue.get) - if (value != null) Seq((key, value)) else Seq.empty - }) + implicit def fromWeakReferenceTuple[K, V](kv: (K, WeakReference[V])): (K, V) = { + kv match { case (k, v) => (k, fromWeakReference(v)) } } - /** - * Removes old key-value pairs that have timestamp earlier than `threshTime`, - * calling the supplied function on each such entry before removing. - */ - def clearOldValues(threshTime: Long, f: (A, B) => Unit = null) { - val iterator = internalJavaMap.entrySet().iterator() - while (iterator.hasNext) { - val entry = iterator.next() - if (entry.getValue.timestamp < threshTime) { - val value = entry.getValue.weakValue.get - if (f != null && value != null) { - f(entry.getKey, value) - } - logDebug("Removing key " + entry.getKey) - iterator.remove() - } - } + implicit def fromWeakReferenceIterator[K, V]( + it: Iterator[(K, WeakReference[V])]): Iterator[(K, V)] = { + it.map(fromWeakReferenceTuple) } - /** - * Removes keys whose weak referenced values have become null. - */ - private def cleanNullValues() { - val iterator = internalJavaMap.entrySet().iterator() - while (iterator.hasNext) { - val entry = iterator.next() - if (entry.getValue.weakValue.get == null) { - logDebug("Removing key " + entry.getKey) - iterator.remove() - } - } + implicit def fromWeakReferenceMap[K, V]( + map: mutable.Map[K, WeakReference[V]]) : mutable.Map[K, V] = { + mutable.Map(map.mapValues(fromWeakReference).toSeq: _*) } - private def currentTime = System.currentTimeMillis() } diff --git a/core/src/main/scala/org/apache/spark/util/WrappedJavaHashMap.scala b/core/src/main/scala/org/apache/spark/util/WrappedJavaHashMap.scala deleted file mode 100644 index 6cc3007f5d7ac..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/WrappedJavaHashMap.scala +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import scala.collection.mutable.Map -import java.util.{Map => JMap} -import java.util.Map.{Entry => JMapEntry} -import scala.collection.{immutable, JavaConversions} -import scala.reflect.ClassTag - -/** - * Convenient wrapper class for exposing Java HashMaps as Scala Maps even if the - * exposed key-value type is different from the internal type. This allows these - * implementations of WrappedJavaHashMap to be drop-in replacements for Scala HashMaps. - * - * While Java <-> Scala conversion methods exists, its hard to understand the performance - * implications and thread safety of the Scala wrapper. This class allows you to convert - * between types and applying the necessary overridden methods to take care of performance. - * - * Note that the threading behavior of an implementation of WrappedJavaHashMap is tied to that of - * the internal Java HashMap used in the implementation. Each implementation must use - * necessary traits (e.g, scala.collection.mutable.SynchronizedMap), etc. to achieve the - * desired thread safety. - * - * @tparam K External key type - * @tparam V External value type - * @tparam IK Internal key type - * @tparam IV Internal value type - */ -private[spark] abstract class WrappedJavaHashMap[K, V, IK, IV] extends Map[K, V] { - - /* Methods that must be defined. */ - - /** - * Internal Java HashMap that is being wrapped. - * Scoped private[util] so that rest of Spark code cannot - * directly access the internal map. - */ - private[util] val internalJavaMap: JMap[IK, IV] - - /** Method to get a new instance of the internal Java HashMap. */ - private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] - - /* - Methods that convert between internal and external types. These implementations - optimistically assume that the internal types are same as external types. These must - be overridden if the internal and external types are different. Otherwise there will be - runtime exceptions. - */ - - @inline protected def externalKeyToInternalKey(k: K): IK = { - k.asInstanceOf[IK] // works only if K is same or subclass of K - } - - @inline protected def externalValueToInternalValue(v: V): IV = { - v.asInstanceOf[IV] // works only if V is same or subclass of - } - - @inline protected def internalKeyToExternalKey(ik: IK): K = { - ik.asInstanceOf[K] - } - - @inline protected def internalValueToExternalValue(iv: IV): V = { - iv.asInstanceOf[V] - } - - @inline protected def internalPairToExternalPair(ip: JMapEntry[IK, IV]): (K, V) = { - (internalKeyToExternalKey(ip.getKey), internalValueToExternalValue(ip.getValue) ) - } - - /* Implicit methods to convert the types. */ - - @inline implicit private def convExtKeyToIntKey(k: K) = externalKeyToInternalKey(k) - - @inline implicit private def convExtValueToIntValue(v: V) = externalValueToInternalValue(v) - - @inline implicit private def convIntKeyToExtKey(ia: IK) = internalKeyToExternalKey(ia) - - @inline implicit private def convIntValueToExtValue(ib: IV) = internalValueToExternalValue(ib) - - @inline implicit private def convIntPairToExtPair(ip: JMapEntry[IK, IV]) = { - internalPairToExternalPair(ip) - } - - /* Methods that must be implemented for a scala.collection.mutable.Map */ - - def get(key: K): Option[V] = { - Option(internalJavaMap.get(key)) - } - - def iterator: Iterator[(K, V)] = { - val jIterator = internalJavaMap.entrySet().iterator() - JavaConversions.asScalaIterator(jIterator).map(kv => convIntPairToExtPair(kv)) - } - - /* Other methods that are implemented to ensure performance. */ - - def +=(kv: (K, V)): this.type = { - internalJavaMap.put(kv._1, kv._2) - this - } - - def -=(key: K): this.type = { - internalJavaMap.remove(key) - this - } - - override def + [V1 >: V](kv: (K, V1)): Map[K, V1] = { - val newMap = newInstance[K, V1]() - newMap.internalJavaMap.asInstanceOf[JMap[IK, IV]].putAll(this.internalJavaMap) - newMap += kv - newMap - } - - override def - (key: K): Map[K, V] = { - val newMap = newInstance[K, V]() - newMap.internalJavaMap.asInstanceOf[JMap[IK, IV]].putAll(this.internalJavaMap) - newMap -= key - } - - override def foreach[U](f: ((K, V)) => U) { - val jIterator = internalJavaMap.entrySet().iterator() - while(jIterator.hasNext) { - f(jIterator.next()) - } - } - - override def empty: Map[K, V] = newInstance[K, V]() - - override def size: Int = internalJavaMap.size - - override def filter(p: ((K, V)) => Boolean): Map[K, V] = { - newInstance[K, V]() ++= iterator.filter(p) - } - - def toMap: immutable.Map[K, V] = iterator.toMap -} diff --git a/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala deleted file mode 100644 index f6e6a4c77c820..0000000000000 --- a/core/src/test/scala/org/apache/spark/util/WrappedJavaHashMapSuite.scala +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.lang.ref.WeakReference -import java.util - -import scala.collection.mutable.{ArrayBuffer, HashMap, Map} -import scala.util.Random - -import org.scalatest.FunSuite - -class WrappedJavaHashMapSuite extends FunSuite { - - // Test the testMap function - a Scala HashMap should obviously pass - testMap(new HashMap[String, String]()) - - // Test a simple WrappedJavaHashMap - testMap(new TestMap[String, String]()) - - // Test TimeStampedHashMap - testMap(new TimeStampedHashMap[String, String]) - - testMapThreadSafety(new TimeStampedHashMap[String, String]) - - test("TimeStampedHashMap - clearing by timestamp") { - // clearing by insertion time - val map = new TimeStampedHashMap[String, String](false) - map("k1") = "v1" - assert(map("k1") === "v1") - Thread.sleep(10) - val threshTime = System.currentTimeMillis() - assert(map.internalMap.get("k1").timestamp < threshTime) - map.clearOldValues(threshTime) - assert(map.get("k1") === None) - - // clearing by modification time - val map1 = new TimeStampedHashMap[String, String](true) - map1("k1") = "v1" - map1("k2") = "v2" - assert(map1("k1") === "v1") - Thread.sleep(10) - val threshTime1 = System.currentTimeMillis() - Thread.sleep(10) - assert(map1("k2") === "v2") // access k2 to update its access time to > threshTime - assert(map1.internalMap.get("k1").timestamp < threshTime1) - assert(map1.internalMap.get("k2").timestamp >= threshTime1) - map1.clearOldValues(threshTime1) //should only clear k1 - assert(map1.get("k1") === None) - assert(map1.get("k2").isDefined) - } - - // Test TimeStampedHashMap - testMap(new TimeStampedWeakValueHashMap[String, String]) - - testMapThreadSafety(new TimeStampedWeakValueHashMap[String, String]) - - test("TimeStampedWeakValueHashMap - clearing by timestamp") { - // clearing by insertion time - val map = new TimeStampedWeakValueHashMap[String, String]() - map("k1") = "v1" - assert(map("k1") === "v1") - Thread.sleep(10) - val threshTime = System.currentTimeMillis() - assert(map.internalJavaMap.get("k1").timestamp < threshTime) - map.clearOldValues(threshTime) - assert(map.get("k1") === None) - } - - - test("TimeStampedWeakValueHashMap - get not returning null when weak reference is cleared") { - var strongRef = new Object - val weakRef = new WeakReference(strongRef) - val map = new TimeStampedWeakValueHashMap[String, Object] - - map("k1") = strongRef - assert(map("k1") === strongRef) - - strongRef = null - val startTime = System.currentTimeMillis - System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC. - System.runFinalization() // Make a best effort to call finalizer on all cleaned objects. - while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) { - System.gc() - System.runFinalization() - Thread.sleep(100) - } - assert(map.internalJavaMap.get("k1").weakValue.get == null) - assert(map.get("k1") === None) - - // TODO (TD): Test clearing of null-value pairs - } - - def testMap(hashMapConstructor: => Map[String, String]) { - def newMap() = hashMapConstructor - - val name = newMap().getClass.getSimpleName - - test(name + " - basic test") { - val testMap1 = newMap() - - // put and get - testMap1 += (("k1", "v1")) - assert(testMap1.get("k1").get === "v1") - testMap1("k2") = "v2" - assert(testMap1.get("k2").get === "v2") - assert(testMap1("k2") === "v2") - - // remove - testMap1.remove("k1") - assert(testMap1.get("k1").isEmpty) - testMap1.remove("k2") - intercept[Exception] { - testMap1("k2") // Map.apply() causes exception - } - - // multi put - val keys = (1 to 100).map(_.toString) - val pairs = keys.map(x => (x, x * 2)) - val testMap2 = newMap() - assert((testMap2 ++ pairs).iterator.toSet === pairs.toSet) - testMap2 ++= pairs - - // iterator - assert(testMap2.iterator.toSet === pairs.toSet) - testMap2("k1") = "v1" - - // foreach - val buffer = new ArrayBuffer[(String, String)] - testMap2.foreach(x => buffer += x) - assert(testMap2.toSet === buffer.toSet) - - // multi remove - testMap2 --= keys - assert(testMap2.size === 1) - assert(testMap2.iterator.toSeq.head === ("k1", "v1")) - } - } - - def testMapThreadSafety(hashMapConstructor: => Map[String, String]) { - def newMap() = hashMapConstructor - - val name = newMap().getClass.getSimpleName - val testMap = newMap() - @volatile var error = false - - def getRandomKey(m: Map[String, String]): Option[String] = { - val keys = testMap.keysIterator.toSeq - if (keys.nonEmpty) { - Some(keys(Random.nextInt(keys.size))) - } else { - None - } - } - - val threads = (1 to 100).map(i => new Thread() { - override def run() { - try { - for (j <- 1 to 1000) { - Random.nextInt(3) match { - case 0 => - testMap(Random.nextString(10)) = Random.nextDouble.toString // put - case 1 => - getRandomKey(testMap).map(testMap.get) // get - case 2 => - getRandomKey(testMap).map(testMap.remove) // remove - } - } - } catch { - case t : Throwable => - error = true - throw t - } - } - }) - - test(name + " - threading safety test") { - threads.map(_.start) - threads.map(_.join) - assert(!error) - } - } -} - -class TestMap[A, B] extends WrappedJavaHashMap[A, B, A, B] { - private[util] val internalJavaMap: util.Map[A, B] = new util.HashMap[A, B]() - - private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = { - new TestMap[K1, V1] - } -}