Skip to content

Commit

Permalink
fix deprecation
Browse files Browse the repository at this point in the history
Caching impl is simplified

CachingUtils renamed to Caching

cleanup StateLock
  • Loading branch information
tribbloid committed Oct 20, 2023
1 parent a6b1f86 commit 67995e6
Show file tree
Hide file tree
Showing 40 changed files with 176 additions and 161 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.tribbloids.spookystuff

import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentMap
import com.tribbloids.spookystuff.utils.Caching.ConcurrentMap

object SpookyViewsConst {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package com.tribbloids.spookystuff.caching
import com.tribbloids.spookystuff.execution.ExplorePlan.ExeID
import com.tribbloids.spookystuff.execution.{ExploreRunner, NodeKey}
import com.tribbloids.spookystuff.row.{DataRow, RowReducer}
import com.tribbloids.spookystuff.utils.CachingUtils.{ConcurrentCache, ConcurrentMap, ConcurrentSet}
import com.tribbloids.spookystuff.utils.Caching
import com.tribbloids.spookystuff.utils.Caching.{ConcurrentCache, ConcurrentMap, ConcurrentSet}

/**
* Singleton, always in the JVM and shared by all executors on the same machine. This is a makeshift implementation,
Expand All @@ -13,10 +14,10 @@ object ExploreRunnerCache {

// (NodeKey, ExecutionID) -> Squashed Rows
// exeID is used to segment Squashed Rows from different jobs
val committedVisited: ConcurrentCache[(NodeKey, ExeID), Iterable[DataRow]] = ConcurrentCache()
val committedVisited: ConcurrentCache[(NodeKey, ExeID), Iterable[DataRow]] = Caching.ConcurrentCache()

val onGoings: ConcurrentMap[ExeID, ConcurrentSet[ExploreRunner]] =
ConcurrentMap() // executionID -> running ExploreStateView
Caching.ConcurrentMap() // executionID -> running ExploreStateView

def getOnGoingRunners(exeID: ExeID): ConcurrentSet[ExploreRunner] = {
// onGoings.synchronized{
Expand Down Expand Up @@ -58,7 +59,7 @@ object ExploreRunnerCache {
}

def register(v: ExploreRunner, exeID: ExeID): Unit = {
getOnGoingRunners(exeID) += v -> {}
getOnGoingRunners(exeID) += v
}

def deregister(v: ExploreRunner, exeID: ExeID): Unit = {
Expand All @@ -68,7 +69,6 @@ object ExploreRunnerCache {
def get(key: (NodeKey, ExeID)): Set[Iterable[DataRow]] = {
val onGoing = this
.getOnGoingRunners(key._2)
.keySet

val onGoingVisitedSet = onGoing
.flatMap { v =>
Expand All @@ -81,7 +81,7 @@ object ExploreRunnerCache {
def getAll(exeID: ExeID): Map[NodeKey, Iterable[DataRow]] = {
val onGoing: Map[NodeKey, Iterable[DataRow]] = this
.getOnGoingRunners(exeID)
.map(_._1.visited.toMap)
.map(_.visited.toMap)
.reduceOption { (v1, v2) =>
v1 ++ v2
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package com.tribbloids.spookystuff.caching
import com.tribbloids.spookystuff.SpookyContext
import com.tribbloids.spookystuff.actions.{Trace, TraceView}
import com.tribbloids.spookystuff.doc.DocOption
import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentCache
import com.tribbloids.spookystuff.utils.Caching
import com.tribbloids.spookystuff.utils.Caching.ConcurrentCache

/**
* Backed by a WeakHashMap, the web cache temporarily stores all trace -> Array[Page] until the next GC. Always enabled.
*/
object InMemoryDocCache extends AbstractDocCache {

val internal: ConcurrentCache[Trace, Seq[DocOption]] = ConcurrentCache()
val internal: ConcurrentCache[Trace, Seq[DocOption]] = Caching.ConcurrentCache()

def cacheable(v: Seq[DocOption]): Boolean = {
v.exists(v => v.cacheLevel.isInstanceOf[DocCacheLevel.InMemory])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package com.tribbloids.spookystuff.caching

import com.tribbloids.spookystuff.utils.CachingUtils
import com.tribbloids.spookystuff.utils.Caching

// TODO: not efficient; should be replaced
trait Memoize[T, R] extends (T => R) with Serializable {

def f(v: T): R

val cache = CachingUtils.ConcurrentCache[T, (R, Long)]()
val cache = Caching.ConcurrentCache[T, (R, Long)]()

def get(x: T, condition: ((R, Long)) => Boolean): R = {
if (cache.contains(x)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.
package com.tribbloids.spookystuff.conf

import com.tribbloids.spookystuff.session.{DriverLike, DriverStatus, Session}
import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentMap
import com.tribbloids.spookystuff.utils.Caching.ConcurrentMap
import com.tribbloids.spookystuff.utils.lifespan.Cleanable.{BatchID, Lifespan}
import com.tribbloids.spookystuff.SpookyContext
import org.apache.spark.TaskContext
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.tribbloids.spookystuff.conf

import com.tribbloids.spookystuff.utils.CachingUtils
import com.tribbloids.spookystuff.utils.Caching

trait ParametricPoly1 extends GenParametricPoly1 {

Expand All @@ -11,7 +11,7 @@ object ParametricPoly1 {

trait Cached extends ParametricPoly1 {

lazy val cache: CachingUtils.ConcurrentMap[UB, Out[_ <: UB]] = CachingUtils.ConcurrentMap()
lazy val cache: Caching.ConcurrentMap[UB, Out[_ <: UB]] = Caching.ConcurrentMap()

def get[T <: UB](k: T): Option[Out[T]] = {
cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ case class CSVBlock(
csvFormat: CSVFormat
) extends Unstructured {

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

val parsed: CSVParser = CSVParser.parse(_text, csvFormat)
val parsedList: List[CSVRecord] = parsed.asScala.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.mozilla.universalchardet.UniversalDetector

import java.sql.{Date, Time, Timestamp}
import java.util.UUID
import scala.collection.mutable

class DocOptionUDT extends ScalaUDT[DocOption]

Expand Down Expand Up @@ -122,15 +123,15 @@ case class Doc(
declaredContentType: Option[String] = None,
// cookie: Seq[SerializableCookie] = Nil,
override val timeMillis: Long = System.currentTimeMillis(),
saved: scala.collection.mutable.Set[String] = scala.collection.mutable.Set(), // TODO: move out of constructor
override val cacheLevel: DocCacheLevel.Value = DocCacheLevel.All,
httpStatus: Option[StatusLine] = None,
override val metadata: ResourceMetadata =
ResourceMetadata.empty // for customizing parsing TODO: remove, delegate to CSVElement.
ResourceMetadata.empty, // for customizing parsing TODO: remove, delegate to CSVElement.
saved: mutable.Set[String] = mutable.Set() // TODO: move out of constructor
) extends DocOption
with EqualBy {

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

lazy val _equalBy: Any = (uid, uri, declaredContentType, timeMillis, httpStatus.toString)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.tribbloids.spookystuff.caching.ExploreRunnerCache
import com.tribbloids.spookystuff.execution.ExplorePlan.Params
import com.tribbloids.spookystuff.execution.NodeKey
import com.tribbloids.spookystuff.row._
import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentMap
import com.tribbloids.spookystuff.utils.Caching.ConcurrentMap

sealed trait ExploreAlgorithm {

Expand Down Expand Up @@ -110,7 +110,6 @@ object ExploreAlgorithms {
override val ordering: RowOrdering = Ordering.by { tuple: (NodeKey, Iterable[DataRow]) =>
val inProgress = ExploreRunnerCache
.getOnGoingRunners(params.executionID)
.keySet
.flatMap(_.fetchingInProgressOpt)

val result = if (inProgress contains tuple._1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.tribbloids.spookystuff.dsl.ExploreAlgorithm
import com.tribbloids.spookystuff.execution.ExplorePlan.Open_Visited
import com.tribbloids.spookystuff.extractors.Resolved
import com.tribbloids.spookystuff.row._
import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentMap
import com.tribbloids.spookystuff.utils.Caching.ConcurrentMap
import com.tribbloids.spookystuff.utils.serialization.NOTSerializable
import com.tribbloids.spookystuff.{dsl, SpookyContext}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.tribbloids.spookystuff.SpookyContext
import com.tribbloids.spookystuff.conf.Python
import com.tribbloids.spookystuff.python.PyConverter
import com.tribbloids.spookystuff.session.{PythonDriver, Session}
import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentMap
import com.tribbloids.spookystuff.utils.Caching.ConcurrentMap
import com.tribbloids.spookystuff.utils.TreeThrowable
import com.tribbloids.spookystuff.utils.lifespan.Cleanable
import org.apache.spark.ml.dsl.utils._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ case class URLConnectionResolver(
override val retry: Retry = Retry.ExponentialBackoff(8, 16000)
) extends URIResolver {

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

case class _Execution(pathStr: String) extends Execution {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.apache.spark.rdd.spookystuff

import java.util.EventListener
import com.tribbloids.spookystuff.utils.CachingUtils
import com.tribbloids.spookystuff.utils.Caching
import org.apache.spark.TaskContext
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.metrics.source.Source
Expand All @@ -26,8 +26,8 @@ case class UncleanTaskContext(

override def resourcesJMap(): util.Map[String, ResourceInformation] = self.resourcesJMap()

lazy val listeners: CachingUtils.ConcurrentMap[Long, EventListener] =
CachingUtils.ConcurrentMap[Long, EventListener]()
lazy val listeners: Caching.ConcurrentMap[Long, EventListener] =
Caching.ConcurrentMap[Long, EventListener]()

override def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext = {
listeners += (System.currentTimeMillis() -> listener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.util.Try

object SpookyEnvFixture {

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

// def cleanDriverInstances(): Unit = {
// CleanMixin.unclean.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object SnapshotRunner extends SpookyEnvFixture.EnvBase {
import com.tribbloids.spookystuff.dsl.DSL._
import com.tribbloids.spookystuff.utils.CommonViews.StringView

val pathEncoding: GenExtractor[FR, String] = S.uri
val pathTemplate: GenExtractor[FR, String] = S.uri
.andFn { uri =>
val base = uri.split(SPLITTER).last
CommonConst.USER_TEMP_DIR \\ "test-sites" \\ base
Expand All @@ -45,7 +45,7 @@ object SnapshotRunner extends SpookyEnvFixture.EnvBase {
cooldown = coolDown
)
originalVersion
.savePages_!(pathEncoding, overwrite = true)
.savePages_!(pathTemplate, overwrite = true)

fd
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ class FetchInteractionsIT extends IntegrationFixture {

assert(unionRows.length === 2)
assert(
unionRows(0).docs.head.copy(timeMillis = 0, raw = null, saved = null)
=== unionRows(1).docs.head.copy(timeMillis = 0, raw = null, saved = null)
unionRows(0).docs.head.copy(timeMillis = 0, raw = null)
=== unionRows(1).docs.head.copy(timeMillis = 0, raw = null)
)

assert(unionRows(0).docs.head.timeMillis === unionRows(1).docs.head.timeMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ class FetchVisitIT extends IntegrationFixture {

assert(unionRows.length === 2)
assert(
unionRows(0).docs.head.copy(timeMillis = 0, raw = null, saved = null)
=== unionRows(1).docs.head.copy(timeMillis = 0, raw = null, saved = null)
unionRows(0).docs.head.copy(timeMillis = 0, raw = null)
=== unionRows(1).docs.head.copy(timeMillis = 0, raw = null)
)

assert(unionRows(0).docs.head.timeMillis === unionRows(1).docs.head.timeMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ class FetchWgetAndSaveIT extends IntegrationFixture {

assert(unionRows.length === 2)
assert(
unionRows(0).docs.head.copy(timeMillis = 0, raw = null, saved = null) ===
unionRows(1).docs.head.copy(timeMillis = 0, raw = null, saved = null)
unionRows(0).docs.head.copy(timeMillis = 0, raw = null) ===
unionRows(1).docs.head.copy(timeMillis = 0, raw = null)
)

assert(unionRows(0).docs.head.timeMillis === unionRows(1).docs.head.timeMillis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.tribbloids.spookystuff.graph
import com.tribbloids.spookystuff.graph.IDAlgebra.Rotator
import com.tribbloids.spookystuff.graph.Layout.Facet
import com.tribbloids.spookystuff.graph.Module.{Heads, Tails}
import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentMap
import com.tribbloids.spookystuff.utils.Caching.ConcurrentMap

import scala.language.implicitConversions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ case class Visualisation[D <: Domain](
def compileASCII(
endWith: Seq[Element[D]]
): Graph[ElementView[D]#WFormat] = {
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

// val buffer = mutable.LinkedHashSet.empty[ElementView[D]#WFormat]
val buffer = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object ElementToArrayDeserializer extends XMLWeakDeserializer[Any] {
extractInner(ti, jv, format).toSet

case (ti @ TypeInfo(this.arrayListClass, _), jv) if !jv.isInstanceOf[JArray] =>
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

new java.util.ArrayList[Any](extractInner(ti, jv, format).toList.asJava)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.tribbloids.spookystuff.relay.xml

import com.tribbloids.spookystuff.utils.CachingUtils
import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentCache
import com.tribbloids.spookystuff.utils.Caching
import com.tribbloids.spookystuff.utils.Caching.ConcurrentCache
import org.apache.spark.ml.dsl.utils.Verbose
import com.tribbloids.spookystuff.relay.MessageAPI
import org.json4s._
Expand All @@ -21,7 +21,7 @@ object XMLWeakDeserializer {
custom: Seq[String] = Nil
)

val cached: ConcurrentCache[Long, ParsingException] = CachingUtils.ConcurrentCache[Long, ParsingException]()
val cached: ConcurrentCache[Long, ParsingException] = Caching.ConcurrentCache[Long, ParsingException]()

trait ExceptionLike extends Throwable with Verbose {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.tribbloids.spookystuff.utils

import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentMap
import com.tribbloids.spookystuff.utils.Caching.ConcurrentMap

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
Expand All @@ -10,7 +10,7 @@ object BufferedShuffleIteratorV1 {
/**
* (instanceID -> partitionID) -> seed used by the iterator
*/
val seeds: ConcurrentMap[(String, Int), Long] = CachingUtils.ConcurrentMap[(String, Int), Long]()
val seeds: ConcurrentMap[(String, Int), Long] = Caching.ConcurrentMap[(String, Int), Long]()
}

case class BufferedShuffleIteratorV1[T](
Expand Down
Loading

0 comments on commit 67995e6

Please sign in to comment.