# Albacross Data Team Recruitment Task<br>


Michael Shestero.  16 January, 2023




In [1]:
// ip4util

import scala.io.Source
import scala.util.{Random, Success, Try}

  val partitioner: Long = 0x10000000

  // val maxIP: Long = 0xFFFFFFFF
  def partition(ip: Long): Long = ip / partitioner

  def junctions(p: Long) = Seq(p * partitioner - 1, p * partitioner)

  val ip4ToLongUnsafe: String => Long = _.trim.split("\\.").map(_.toLong) match {
    case r@Array(a, b, c, d) if r.map(_ & ~0xFF).forall(_ == 0) => a * 16777216 + b * 65536 + c * 256 + d
  }

  val ip4ToLong: String => Try[Long] = Success(_).map(ip4ToLongUnsafe)

  def longToIp4(ip0: Long): String = {
    import scala.math.Integral.Implicits._
    val (ip1, b1) = ip0 /% 256L
    val (ip2, b2) = ip1 /% 256L
    val (ip3, b3) = ip2 /% 256L
    val (ip4, b4) = ip3 /% 256L
    assert(ip4 == 0)
    s"$b4.$b3.$b2.$b1"
  }

  // random IP generation:

  private val random = Random

  def randomIp: String =
    (random.nextInt(223) + 1).toString + "." + (1 to 3).map { _ => random.nextInt(255) }.mkString(".")

  def randomIpLong: Long = ip4ToLong(randomIp).get

  def randomRange: (Long, Long) = {
    //val r@(r1, r2) = (randomIpLong, randomIpLong)
    //if (r1 > r2) r.swap else r
    val r1 = randomIpLong
    val r2 = r1 + Math.sqrt(random.nextLong(10000000000000000L).toDouble).toLong
    r1 -> r2.max(0xFFFFFFFF)
  }

  type RangeStream = Iterator[(Long, Long)]

  object RangeStream {
    def empty: RangeStream = Iterator.empty[(Long, Long)]
  }

  // Read source data from collection
  val iterableSource: Iterable[(String, String)] => RangeStream =
    _.iterator.map { case r@(sbegin, send) =>
      {
        for {
          begin <- ip4ToLong(sbegin)
          end <- ip4ToLong(send)
        } yield begin -> end
      }.getOrElse(throw new Exception(s"Wrong input range: $r"))
    }

  // Generate random source data
  def randomSource(size: Int = 50): RangeStream = (1 to size).iterator.map(_ => randomRange)

  // Print IP ranges (to debug)
  val printRange: RangeStream => Unit = LazyList.from(1).zip(_).foreach {
    case (num, (begin: Long, end: Long)) => assert(begin <= end); println(s"$num\t${longToIp4(begin)}, ${longToIp4(end)}")
  }



In [2]:
// iterator utils

import java.util.concurrent.Phaser
import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec
import scala.collection.SortedMap


  case class OrderViolation[T](a: T, b: T) extends Exception(s"Order assert violation: $a then $b")

  protected case class IteratorImplicit1[T](in: Iterator[T]) {

    // check if iterator represent ordered sequence
    def checkOrder[K](implicit ord: Ordering[T]): Iterator[T] = checkOrder[T](identity[T] _)

    def checkOrder[K](k: T => K)(implicit ord: Ordering[K]): Iterator[T] =
      Iterator.unfold(in.buffered) { bi =>
        bi.nextOption().map { current =>
          bi.headOption.foreach { next =>
            if (ord.gt(k(current), k(next))) throw OrderViolation(current, next) // detect order violation
          }
          current -> bi
        }
      }

    def groupByOrdered[K, V](k: T => K)(v: T => V /* = identity */): Iterator[(K, List[V])] =
      Iterator.unfold(in.buffered) { bi =>
        bi.headOption.map(k).map { key =>
          @tailrec def group(list: List[T] = List.empty): List[T] =
            if (bi.headOption.exists(k(_) == key)) group(bi.next() :: list) else list
          // Note: don't use  bi.takeWhile(k(_) == key)  - it spoils one-path iterator

          key -> group().map(v) -> bi
        }
      }

  }

  protected case class IteratorImplicit2[T](in: Iterator[(T, T)]) {
    def checkIntervalsOrder(implicit ord: Ordering[T]): Iterator[(T, T)] =
      Iterator.unfold(in.buffered) { bi =>
        bi.nextOption().map { current =>
          if (ord.gt(current._1, current._2)) throw OrderViolation(current._1, current._2) // detect order violation
          bi.headOption.foreach { next =>
            if (ord.gt(current._1, next._1)) throw OrderViolation(current, next) // detect order violation
          }
          current -> bi
        }
      }
  }

  implicit def fromIterator1[T](in: Iterator[T]) = IteratorImplicit1[T](in)

  implicit def fromIterator2[T](in: Iterator[(T, T)]) = IteratorImplicit2[T](in)




In [3]:
  // solution1

import cats.syntax.semigroup._


  def processPreparedPoints(points: Iterable[(Long, (Int, Int))]): RangeStream =
    processPreparedPoints(points.iterator)

  def processPreparedPoints(points: Iterator[(Long, (Int, Int))]): RangeStream = {
    val (it, zero) = points.checkOrder.foldLeft(Iterator.empty[Long] -> 0) {
      case ((output, counter), (ip, (delta1, delta2))) =>
        assert(counter >= 0)
        assert(delta1 >= 0)
        assert(delta2 <= 0)
        val next = counter + delta1 + delta2
        assert(next >= 0)
        val out = counter -> next -> delta1 match {
          case ((0, 0), 1) => Iterator(ip, ip) // standalone single point range without overlapping
          case ((0, 0), _) => Iterator.empty // several single point at the place
          case ((1, 1), _) => Iterator(ip - 1, ip + 1) // overlapping of single point
          case ((0, 1), 1) | ((1, 0), 0) => Iterator.single(ip) // begin or end of a range
          case ((0, 1), _) => Iterator(ip + 1) // overlapping with single point range(s) at the begin of a range
          case ((1, 0), _) => Iterator(ip - 1) // overlapping with single point range(s) at the end of a range
          case ((_, 1), _) => Iterator.single(ip + 1) // end of overlapping
          case ((1, _), _) => Iterator.single(ip - 1) // begin of overlapping
          case _ => Iterator.empty
        }
        output ++ out -> next
    }
    assert(zero == 0)
    // assert(it.size % 2 == 0) // this check drains it!

    it.grouped(2).flatMap {
      case Seq(begin, end) if begin - 1 == end => Seq() 
      // this situation occurs if inside one bigger range there are side by side ranges; e.g.:
      // [1,4],[2,2],[3,3] => [1,1],{3,2},[4,4] => [1,1],[4,4]
      // [1,8],[3,4],[5,6] => [1,2],{5,4},[7,8] => [1,2],[7,8]
      case Seq(begin, end) => assert(begin <= end); Seq(begin -> end)
      case sq => throw new Exception(s"Unpaired points in processPreparedPoints! $sq")
    }.checkIntervalsOrder
  }

  // solution1 - by total sorting
  def solution1(input: RangeStream): RangeStream = {
    val prepared = input.flatMap { case (begin, end) => Seq(begin -> (+1, 0), end -> (0, -1)) }
      .toSeq.groupMapReduce(_._1)(_._2)(_ |+| _)
      .toSeq.sorted

    processPreparedPoints(prepared)
  }



In [4]:
val test1 = Seq(
    "197.203.0.0" -> "197.206.9.255",
    "197.204.0.0" -> "197.204.0.24",
    "201.233.7.160" -> "201.233.7.168",
    "201.233.7.164" -> "201.233.7.168",
    "201.233.7.167" -> "201.233.7.167",
    "203.133.0.0" -> "203.133.255.255"
)

printRange(solution1(iterableSource(test1)))


1	197.203.0.0, 197.203.255.255
2	197.204.0.25, 197.206.9.255
3	201.233.7.160, 201.233.7.163
4	203.133.0.0, 203.133.255.255


In [8]:
import scala.collection.SortedMap

object PointCounter {
  type PointCounter = SortedMap[Long, Int]

  def empty = SortedMap.empty[Long, Int]

  protected case class PointCounterHelper(counter: PointCounter) {

    def countEnds(points: Iterable[Long]): PointCounter =
      counter ++ points.map(point => point -> (counter.getOrElse(point, 0) - 1))

  }

  implicit def fromPointCounter[K, V](in: PointCounter) = PointCounterHelper(in)

}

In [6]:
// solution2 - using (assuming) the order of input ranges by beginning of each range
  def solution2(input: RangeStream): RangeStream = {
    import PointCounter._
    val (prepared0, lastEnds) =
      input.checkOrder(_._1).groupByOrdered(_._1)(_._2)
        .foldLeft(Iterator.empty[(Long, Int, PointCounter)] -> PointCounter.empty) {
          case ((it, inheritedEnds), (begin, ends)) =>
            val allEnds = inheritedEnds.countEnds(ends)
            val currentEnds = allEnds.rangeTo(begin)
            val nextEnds = allEnds.rangeFrom(begin + 1)
            it ++ Iterable.single((begin, ends.size, currentEnds)) -> nextEnds
        }

    val prepared = prepared0.flatMap {
      case (now, beginCounter, ends) =>
        val endsBefore = ends.rangeUntil(now).view.mapValues(0 -> _)
        val endCounter = ends.get(now).getOrElse(0)
        endsBefore ++ Map(now -> (beginCounter, endCounter))
    } ++ lastEnds.view.mapValues(0 -> _)

    processPreparedPoints(prepared.checkOrder)
  }

  

In [10]:
printRange(solution2(iterableSource(test1)))

1	197.203.0.0, 197.203.255.255
2	197.204.0.25, 197.206.9.255
3	201.233.7.160, 201.233.7.163
4	203.133.0.0, 203.133.255.255


In [11]:
val tests = Seq(
    /*Seq(
        "197.203.0.0" -> "197.203.0.3",
        "197.203.0.1" -> "197.203.0.1",        
        "197.203.0.2" -> "197.203.0.2"        
    ),
    Seq(
        "197.203.0.0" -> "197.203.0.0",
        "197.203.0.1" -> "197.203.0.1"
    ),*/
    Seq(
        "197.203.0.0" -> "197.203.0.5",
        "197.203.0.1" -> "197.203.0.2",
        "197.203.0.2" -> "197.203.0.3"
    ),
    Seq(
        "197.203.0.0" -> "197.203.0.5",
        "197.203.0.1" -> "197.203.0.2",
        "197.203.0.3" -> "197.203.0.4"
    ),
    Seq(
        "197.203.0.0" -> "197.203.0.1",
        "197.203.0.1" -> "197.203.0.1"
    ),
    Seq(
        "197.203.0.0" -> "197.203.0.1",
        "197.203.0.0" -> "197.203.0.0"
    )        
)


In [12]:
tests.foreach(iterableSource andThen solution1 andThen printRange)     

1	197.203.0.0, 197.203.0.0
2	197.203.0.4, 197.203.0.5
1	197.203.0.0, 197.203.0.0
2	197.203.0.5, 197.203.0.5
1	197.203.0.0, 197.203.0.0
1	197.203.0.1, 197.203.0.1


In [13]:
tests.foreach(iterableSource andThen solution2 andThen printRange)

1	197.203.0.0, 197.203.0.0
2	197.203.0.4, 197.203.0.5
1	197.203.0.0, 197.203.0.0
2	197.203.0.5, 197.203.0.5
1	197.203.0.0, 197.203.0.0
1	197.203.0.1, 197.203.0.1
