# Exercise 2

## Initializing the Spark context


In [1]:
val scalaVersion = scala.util.Properties.versionNumberString

scalaVersion: String = "2.11.8"

In [2]:
classpath.add("org.apache.spark" %% "spark-core" % "1.6.0")

130 new artifact(s)
130 new artifacts in macro
130 new artifacts in runtime
130 new artifacts in compile

The `SparkContext` instance is marked `@transient` so that Spark does not attempt to serialize it.

I ran into serialization issues when closures referenced results from previous notebook cells.
It seems that all cell results are treated as fields of the same object, so Spark tried to serialize the whole object (including the `SparkContext`).

This approach is based on: https://gist.github.com/alexarchambault/efa9e184c53fb2af6229
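
To illustrate the mechanism outside of Spark, here is a minimal sketch using plain Java serialization. `Resource`, `PlainHolder`, `TransientHolder` and `serializes` are hypothetical names introduced for this example; `Resource` merely stands in for a non-serializable object such as a `SparkContext`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object TransientDemo {
  // Hypothetical stand-in for a non-serializable resource (e.g. a SparkContext).
  class Resource

  // Without @transient, serializing the holder also tries to serialize the resource.
  class PlainHolder(val resource: Resource, val factor: Int) extends Serializable

  // With @transient, Java serialization skips the annotated field.
  class TransientHolder(@transient val resource: Resource, val factor: Int) extends Serializable

  // Returns true if the object can be written with Java serialization.
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  val plainOk: Boolean = serializes(new PlainHolder(new Resource, 2))
  val transientOk: Boolean = serializes(new TransientHolder(new Resource, 2))
}
```

Under these assumptions, `TransientDemo.plainOk` is `false` and `TransientDemo.transientOk` is `true`. The notebook case is analogous only if the kernel really wraps cell results as fields of one object, which is my reading of the behaviour rather than something I verified.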

In [3]:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

@transient val sc = {
  val conf = new SparkConf()
      .setAppName("Test1")
      .setMaster("local[*]")
  SparkContext.getOrCreate(conf)
}


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/05/17 23:24:23 INFO SparkContext: Running Spark version 1.6.0
16/05/17 23:24:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/05/17 23:24:23 INFO SecurityManager: Changing view acls to: mtrampont
16/05/17 23:24:23 INFO SecurityManager: Changing modify acls to: mtrampont
16/05/17 23:24:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(mtrampont); users with modify permissions: Set(mtrampont)
16/05/17 23:24:24 INFO Utils: Successfully started service 'sparkDriver' on port 55757.
16/05/17 23:24:24 INFO Slf4jLogger: Slf4jLogger started
16/05/17 23:24:24 INFO Remoting: Starting remoting
16/05/17 23:24:24 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:55770]
16/05/17 23:24:24 INFO Utils: Successfully start

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3b1fe641

## Parsing the CSV file

### Into an RDD
Here we parse the file using only Spark to create an RDD of tuples containing only the fields we need (namely the airport code and the number of passengers).

In [9]:
val csvPath = "d:/workspace/TID_coding_tests/coding_tests/src/main/resources/bookings.csv"

csvPath: String = "d:/workspace/TID_coding_tests/coding_tests/src/main/resources/bookings.csv"

First, extract the header line and the column indexes.

In [5]:
import scala.io.Source

// Read only the first line, then close the file handle.
val headerLine = {
  val source = Source.fromFile(csvPath)
  try source.getLines().next()
  finally source.close()
}

val colNames =
  headerLine.split("""\^""", -1)
      .map(_.trim)
      .zipWithIndex
      .toMap


import scala.io.Source
headerLine: String = "act_date           ^source^pos_ctry^pos_iata^pos_oid  ^rloc          ^cre_date           ^duration^distance^dep_port^dep_city^dep_ctry^arr_port^arr_city^arr_ctry^lst_port^lst_city^lst_ctry^brd_port^brd_city^brd_ctry^off_port^off_city^off_ctry^mkt_port^mkt_city^mkt_ctry^intl^route          ^carrier^bkg_class^cab_class^brd_time           ^off_time           ^pax^year^month^oid      "
colNames: Map[String, Int] = Map(
  "off_port" -> 21,
  "duration" -> 7,
  "brd_ctry" -> 20,
  "lst_city" -> 16,
  "pos_ctry" -> 2,
  "source" -> 1,
  "mkt_port" -> 24,
  "carrier" -> 29,
  "arr_ctry" -> 14,
  "pos_oid" -> 4,
  "lst_ctry" -> 17,
  "oid" -> 37,

Now we can parse the file and create an `RDD[(String, Int)]` holding the arrival airport and the number of passengers for each booking.
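
The per-record logic can be sketched on a single string, independently of Spark. The three-column `sampleHeader`, the sample record and the `parse` helper are hypothetical and only serve to show why the `-1` limit is passed to `split`.

```scala
// Hypothetical three-column header using the same '^' separator and padding style.
val sampleHeader = "arr_port^pax ^oid  "
val sampleCols = sampleHeader.split("""\^""", -1).map(_.trim).zipWithIndex.toMap

// Extract (arrival airport, passengers) from one record.
def parse(record: String): (String, Int) = {
  // The -1 limit keeps trailing empty fields, so indexes stay aligned with the header.
  val fields = record.split("""\^""", -1).map(_.trim)
  (fields(sampleCols("arr_port")), fields(sampleCols("pax")).toInt)
}

val parsed = parse("LHR     ^2^")

// Without the limit, the trailing empty field is dropped.
val withoutLimit = "LHR     ^2^".split("""\^""").length
val withLimit = "LHR     ^2^".split("""\^""", -1).length
```

Here `parsed` is `("LHR", 2)`, while `withoutLimit` is 2 and `withLimit` is 3.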

In [10]:
val bookings =
      sc.textFile(csvPath)
        .filter(_ != headerLine) // The first partition will get the header line. We need to discard it
        .map{ record =>
          val fields = record.split("""\^""", -1).map(_.trim)
          (fields(colNames("arr_port")), fields(colNames("pax")).toInt)
        }


bookings: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at map at Main.scala:34

So far we have only defined transformations, which Spark evaluates lazily. Let's run an action to test the parsing.
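
As a loose analogy (not Spark itself), a plain Scala `Iterator` behaves in a similar way: `map` does nothing until the result is consumed, so a parsing error only shows up at that point. The names `evaluated`, `lazyParsed`, `beforeAction` and `outcome` are made up for this sketch.

```scala
import scala.util.Try

var evaluated = 0

// Defining the mapping runs nothing yet, much like an RDD transformation.
val lazyParsed = Iterator("1", "2", "x").map { s =>
  evaluated += 1
  s.toInt
}
val beforeAction = evaluated

// Consuming the iterator plays the role of the action; "x" fails here.
val outcome = Try(lazyParsed.toList)
```

`beforeAction` is 0, and `outcome` is a `Failure` wrapping a `NumberFormatException` that only appears once the data is actually traversed.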

In [11]:
bookings.count()


It seems that parsing failed for some records.
Let's try a version with more error handling.

In [13]:
import scala.util.{Try, Failure, Success}

val bookings =
  sc.textFile(csvPath)
    .filter(_ != headerLine)
    .map{ record =>
      Try{
        val fields = record.split("""\^""", -1).map(_.trim)
        (fields(colNames("arr_port")), fields(colNames("pax")).toInt)
      }.recoverWith{ case cause =>
        Failure(new IllegalArgumentException(s"Parsing failed with line: $record", cause))
      }
    }

bookings.collect{case Failure(cause) => cause}
    .collect()
    .foreach(println)


java.lang.IllegalArgumentException: Parsing failed with line: 2013-03-25 00:00:00^1V    JP      ^^a37584d1485cb35991e4ff1a2ba92262^2013-03-25 00:00:00^8371^60^NRT     ^TYO     ^JP      ^SIN     ^SIN     ^SG      ^HND     TYO     ^JP      ^NRT     ^TYO     ^JP      ^SIN     ^SIN     ^SG      ^NRTSIN  ^SINTYO  ^JPSG    ^1^NRTSIN         ^XR,Q        ^Y        ^2013-04-14 11:05:00^2013-04-14 17:10:56^2^2013^3^NULL     


import scala.util.{Try, Failure, Success}
bookings: org.apache.spark.rdd.RDD[Try[(String, Int)]] = MapPartitionsRDD[17] at map at Main.scala:40

It seems that only one record is malformed: some fields and separators are missing, and there is even a `,` that was presumably meant to be a separator.
Let's discard it and continue with the rest.
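
The same pattern of wrapping each record in a `Try` and then separating successes from failures can be sketched on a local collection. The three `rawRecords` below are invented, two-field examples rather than lines from the real file.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical records: two well-formed, one with a ',' instead of the '^' separator.
val rawRecords = Seq("LHR^2", "CDG^1", "NRT,oops")

val attempts = rawRecords.map { record =>
  Try {
    val fields = record.split("""\^""", -1).map(_.trim)
    (fields(0), fields(1).toInt)
  }.recoverWith { case cause =>
    // Keep the offending line in the message so it can be inspected later.
    Failure(new IllegalArgumentException(s"Parsing failed with line: $record", cause))
  }
}

// Partial functions select one side and unwrap it in a single pass.
val good = attempts.collect { case Success(bkg) => bkg }
val bad = attempts.collect { case Failure(cause) => cause.getMessage }
```

With this input, `good` holds the two valid bookings and `bad` holds a single message naming the malformed line.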

In [14]:
val arrivalAirports =
  bookings.collect{case Success(bkg) => bkg}
      .aggregateByKey(0)(_+_, _+_)

arrivalAirports: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[22] at aggregateByKey at Main.scala:27

This sums the passengers by airport.
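
The two functions passed to `aggregateByKey` play different roles: the first folds values into an accumulator within a partition, starting from the zero value, and the second merges accumulators across partitions. The following is a rough local simulation, where `partitions`, `seqOp` and `combOp` are illustrative names and the data is made up.

```scala
// Two hypothetical partitions of (airport, passengers) pairs.
val partitions = Seq(
  Seq(("LHR", 2), ("CDG", 1)),
  Seq(("LHR", 3))
)

def seqOp(acc: Int, pax: Int): Int = acc + pax // within a partition
def combOp(a: Int, b: Int): Int = a + b        // across partitions

// Step 1: fold each key's values inside every partition, starting from zero.
val perPartition = partitions.map { part =>
  part.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).foldLeft(0)(seqOp) }
}

// Step 2: merge the per-partition accumulators for each key.
val totals = perPartition.flatten.groupBy(_._1).map { case (k, vs) =>
  k -> vs.map(_._2).reduce(combOp)
}
```

`totals` ends up as `Map("LHR" -> 5, "CDG" -> 1)`. Since both functions are plain addition here, `reduceByKey(_ + _)` should give the same result on the real RDD.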

We can then sort the resulting RDD and take the top 10.

In [16]:
arrivalAirports.sortBy(_._2, false).take(10)

res15: Array[(String, Int)] = Array(
  ("LHR", 88809),
  ("MCO", 70930),
  ("LAX", 70530),
  ("LAS", 69630),
  ("JFK", 66270),
  ("CDG", 64490),
  ("BKK", 59460),
  ("MIA", 58150),
  ("SFO", 58000),
  ("DXB", 55590)
)

But if we need only the top 10, we can use `top`, which will save us from sorting the whole RDD.
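
To see why a top-k query does not need a full sort, here is a sketch of the general bounded-heap technique on a local iterator. `topK` is a hypothetical helper, not Spark's API; my understanding is that Spark's `top` works along similar lines, with one bounded queue per partition, but treat that as an assumption.

```scala
import scala.collection.mutable

// Keep at most k elements in a min-heap, so each item costs O(log k).
def topK[A](items: Iterator[A], k: Int)(implicit ord: Ordering[A]): List[A] =
  if (k <= 0) Nil
  else {
    // Reversing the ordering puts the smallest retained element at the head.
    val heap = mutable.PriorityQueue.empty[A](ord.reverse)
    items.foreach { item =>
      if (heap.size < k) heap.enqueue(item)
      else if (ord.gt(item, heap.head)) {
        heap.dequeue() // evict the current smallest
        heap.enqueue(item)
      }
    }
    // Dequeue yields smallest first, so reverse for descending order.
    List.fill(heap.size)(heap.dequeue()).reverse
  }

// Made-up passenger totals.
val sampleTotals = Iterator(("LHR", 5), ("CDG", 1), ("JFK", 3))
val best = topK(sampleTotals, 2)(Ordering.by[(String, Int), Int](_._2))
```

`best` is `List(("LHR", 5), ("JFK", 3))`, obtained without ever ordering the full input.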

In [17]:
arrivalAirports.top(10)(Ordering.by[(String, Int),Int](_._2))

res16: Array[(String, Int)] = Array(
  ("LHR", 88809),
  ("MCO", 70930),
  ("LAX", 70530),
  ("LAS", 69630),
  ("JFK", 66270),
  ("CDG", 64490),
  ("BKK", 59460),
  ("MIA", 58150),
  ("SFO", 58000),
  ("DXB", 55590)
)

In [13]:
sc.stop()

