# Assumptions
1. Data is clearned and not errorneous
2. Timezone consideration is not required

# Setup

In [1]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import java.time.temporal.ChronoUnit
import java.time.{Period, LocalDate, Instant}
import java.sql.Timestamp

## Parition control based on core availability

In [14]:
val NUM_CORES = 4
val NUM_PARTITIONS = 3

lazy val spark: SparkSession = SparkSession.builder()
    .master("local")
    .appName("flight")
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", NUM_CORES * NUM_PARTITIONS)
spark.conf.set("spark.default.parallelism", NUM_CORES * NUM_PARTITIONS)

import spark.implicits._

NUM_CORES = 4
NUM_PARTITIONS = 3
spark = <lazy>


<lazy>

In [15]:
val FLIGHTDATA_CSV_PATH = "../resources/flightData.csv"
val PASSENGER_CSV_PATH = "../resources/passengers.csv"

FLIGHTDATA_CSV_PATH = ../resources/flightData.csv
PASSENGER_CSV_PATH = ../resources/passengers.csv


../resources/passengers.csv

## Utilities

### Elapsed time profiler

In [3]:
val timing = new StringBuffer
def timed[T](label: String, code: => T): T = {
    val start = System.currentTimeMillis()
    val result = code
    val stop = System.currentTimeMillis()
    timing.append(s"Processing $label took ${stop - start} ms.\n")
    result
}

timing = 


timed: [T](label: String, code: => T)T


In [4]:
// To flush out error: missing argument list for method timed
println("")

<console>:46: error: missing argument list for method timed
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `timed _` or `timed(_,_)` instead of `timed`.
       timed
       ^
lastException: Throwable = null


### UDF

In [5]:
//val BASE_TIMESTAMP = java.sql.Timestamp.valueOf("2017-01-01 00:00:00.0")
val BASE_LOCALDATE = LocalDate.parse("2017-01-01").withDayOfMonth(1)

def get_months_between(to: Timestamp): Short = {
    val monthsBetween = ChronoUnit.MONTHS.between(
        BASE_LOCALDATE,
        to.toLocalDateTime().toLocalDate().withDayOfMonth(1)
    )
    monthsBetween.toShort
}
val udf_months_between = udf((t:Timestamp) => get_months_between(t))

BASE_LOCALDATE = 2017-01-01
udf_months_between = UserDefinedFunction(<function1>,ShortType,Some(List(TimestampType)))


get_months_between: (to: java.sql.Timestamp)Short


UserDefinedFunction(<function1>,ShortType,Some(List(TimestampType)))

# Total flights per month
Month is in-between the first day of the month and the first day of the next month.

In [85]:
// Transformations, no action yet
val flightsPerMonth = spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .load("../resources/flightData.csv")
    .select("flightId", "date")
    .distinct()
    .groupBy(
        window(trunc(col("date"), "month"), "4 weeks")
    )
    .count
    .orderBy(asc("window"))
    .withColumnRenamed("window", "month")

flightsPerMonth.printSchema()
flightsPerMonth.show()

root
 |-- month: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)

+--------------------+-----+
|               month|count|
+--------------------+-----+
|[2016-12-29 10:00...|   97|
|[2017-01-26 10:00...|   73|
|[2017-02-23 10:00...|   82|
|[2017-03-23 10:00...|   92|
|[2017-04-20 10:00...|   92|
|[2017-05-18 10:00...|   71|
|[2017-06-15 10:00...|   87|
|[2017-07-13 10:00...|   76|
|[2017-08-10 10:00...|   85|
|[2017-09-07 10:00...|   76|
|[2017-10-05 10:00...|   75|
|[2017-11-30 10:00...|   94|
+--------------------+-----+



flightsPerMonth = [month: struct<start: timestamp, end: timestamp>, count: bigint]


lastException: Throwable = null


[month: struct<start: timestamp, end: timestamp>, count: bigint]

In [None]:
// Transformations, no action yet
val flightsPerMonth = spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .load(FLIGHTDATA_CSV_PATH)
    .select("flightId", "date")
    .distinct()
    .groupBy(
        trunc(col("date"), "month").alias("Month")
    )
    .agg(count("flightId").alias("Number of Flights"))
    .orderBy(asc("Month"))
    .withColumn(
        "Month", udf_months_between(col("Month"))
    )

flightsPerMonth.printSchema()

In [80]:
flightsPerMonth.show()

+--------------------+-----+
|               month|count|
+--------------------+-----+
|[2016-12-31 10:00...|    5|
|[2017-01-01 10:00...|    6|
|[2017-01-02 10:00...|    5|
|[2017-01-03 10:00...|    3|
|[2017-01-04 10:00...|    1|
|[2017-01-06 10:00...|    2|
|[2017-01-07 10:00...|    4|
|[2017-01-08 10:00...|    6|
|[2017-01-09 10:00...|    9|
|[2017-01-10 10:00...|    4|
|[2017-01-11 10:00...|    1|
|[2017-01-12 10:00...|    5|
|[2017-01-13 10:00...|    1|
|[2017-01-14 10:00...|    3|
|[2017-01-15 10:00...|    2|
|[2017-01-16 10:00...|    3|
|[2017-01-17 10:00...|    1|
|[2017-01-18 10:00...|    3|
|[2017-01-19 10:00...|    6|
|[2017-01-20 10:00...|    2|
+--------------------+-----+
only showing top 20 rows



# Frequent flyers
Top 100 frequent flyers

### Top frequent flyers

In [193]:
val TOP_N = 100
val frequentFlyers = spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .load("../resources/flightData.csv")
    .select("passengerId")
    .groupBy("passengerId")
    .count
    .orderBy(desc("count"))
    .withColumnRenamed("count", "numberOfFlights")
    //--------------------------------------------------------------------------------
    // TOP_N flyers 
    //--------------------------------------------------------------------------------
    .limit(LIMIT)
    //--------------------------------------------------------------------------------
    // Re-sort for passengerId match
    //--------------------------------------------------------------------------------
    .orderBy(asc("passengerId"))
    .persist

frequentFlyers.printSchema()

root
 |-- passengerId: integer (nullable = true)
 |-- numberOfFlights: long (nullable = false)



TOP_N = 100
frequentFlyers = [passengerId: int, numberOfFlights: bigint]


[passengerId: int, numberOfFlights: bigint]

###  Passengers

In [194]:
val LIMIT = 100
val passengers = spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("inferSchema", "true")
    .load(PASSENGER_CSV_PATH)
    //--------------------------------------------------------------------------------
    // Sort for passengerId match
    //--------------------------------------------------------------------------------
    .orderBy(asc("passengerId"))
    .persist

passengers.printSchema()

root
 |-- passengerId: integer (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)



LIMIT = 100
passengers = [passengerId: int, firstName: string ... 1 more field]


[passengerId: int, firstName: string ... 1 more field]

### Frequent flyer listing

In [195]:
frequentFlyers.createOrReplaceTempView("frequentFlyers")
passengers.createOrReplaceTempView("passengers")

In [196]:
val queryTopFrequetFlyers = """
SELECT 
    f.passengerId AS passenger_id,
    f.numberOfFlights AS number_of_flights,
    p.firstName AS first_name,
    p.lastName as last_name
FROM
    frequentFlyers f
    INNER JOIN passengers p
    ON f.passengerId = p.passengerId
ORDER BY number_of_flights DESC
"""

val topFrequentFlyers = spark.sql(queryTopFrequetFlyers)

queryTopFrequetFlyers = 
topFrequentFlyers = [passenger_id: int, number_of_flights: bigint ... 2 more fields]


"
SELECT
    f.passengerId AS passenger_id,
    f.numberOfFlights AS number_of_flights,
    p.firstName AS first_name,
    p.lastName as last_name
FROM
    frequentFlyers f
    INNER JOIN passengers p
    ON f.passengerId = p.passengerId
ORDER BY number_of_flights DESC
"


[passenger_id: int, number_of_flights: bigint ... 2 more fields]

In [197]:
topFrequentFlyers
    // Coalesce to save in the driver node as one file, otherwise no need
    .coalesce(1)   
    // .persist
    .write
    .format("csv")
    .mode(SaveMode.Overwrite)
    .option("header", "true")
    .save("topFrequentFlyers") 

topFrequentFlyers.show(5)

+------------+-----------------+----------+---------+
|passenger_id|number_of_flights|first_name|last_name|
+------------+-----------------+----------+---------+
|        2068|               32|   Yolande|     Pete|
|        1677|               27| Katherina| Vasiliki|
|        4827|               27|     Jaime|    Renay|
|        3173|               26|  Sunshine|    Scott|
|        8961|               26|     Ginny|    Clara|
+------------+-----------------+----------+---------+
only showing top 5 rows



In [198]:
/*
frequentFlyers.unpersist
passengers.unpersist
*/

Name: Syntax Error.
Message: 
StackTrace: 

# Longest passages
Find the greatest number of countries a passenger has been in without being in the UK. For example, if the countries a passenger was in were: UK -> FR -> US -> CN -> UK -> DE -> UK, the correct answer would be 3 countries.

In [28]:
import java.sql.Timestamp
case class Passage (
    passengerId: Int,
    flightId: String,
    from: String,
    to: String,
    date: Timestamp
)

defined class Passage


In [61]:
// Transformations, no action yet
val flightData = spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .load(FLIGHTDATA_CSV_PATH)
    .orderBy(asc("passengerId"), asc("date"))
    .select("passengerId", "from", "to")

flightData.printSchema()

root
 |-- passengerId: integer (nullable = true)
 |-- from: string (nullable = true)
 |-- to: string (nullable = true)



flightData = [passengerId: int, from: string ... 1 more field]


[passengerId: int, from: string ... 1 more field]

In [65]:
flightData.show()

+-----------+----+---+
|passengerId|from| to|
+-----------+----+---+
|          1|  cg| ir|
|          1|  ir| at|
|          1|  at| cn|
|          1|  cn| ch|
|          1|  ch| pk|
|          2|  cg| ir|
|          3|  cg| ir|
|          3|  ir| sg|
|          3|  sg| be|
|          3|  be| ir|
|          4|  cg| ir|
|          4|  ir| no|
|          4|  no| cn|
|          4|  cn| sg|
|          4|  sg| jo|
|          4|  jo| ir|
|          4|  ir| tj|
|          4|  tj| at|
|          5|  cg| ir|
|          6|  cg| ir|
+-----------+----+---+
only showing top 20 rows



In [62]:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
val flightDataRDD: RDD[Row] = flightData.rdd

flightDataRDD = MapPartitionsRDD[214] at rdd at <console>:59


MapPartitionsRDD[214] at rdd at <console>:59

In [64]:
flightDataRDD.take(5).foreach(println)

[1,cg,ir]
[1,ir,at]
[1,at,cn]
[1,cn,ch]
[1,ch,pk]


In [77]:
passages.take(5).foreach(println)

(12516,CompactBuffer((no,be), (be,au), (au,cn), (cn,au), (au,sg), (sg,tk), (tk,se), (se,us), (us,au), (au,be)))
(12456,CompactBuffer((dk,dk)))
(14508,CompactBuffer((ca,co)))
(3456,CompactBuffer((au,ch), (ch,fr), (fr,pk)))
(7608,CompactBuffer((sg,ar), (ar,cl), (cl,sg), (sg,iq), (iq,jo), (jo,no), (no,il), (il,uk), (uk,ca), (ca,jo), (jo,ch), (ch,il), (il,jo), (jo,us), (us,au)))


In [1]:
// Transformations, no action yet
val passageDirections = spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .load("../resources/flightData.csv")
    .withColumn(
        "direction", 
        when(lower(col("from")) === "uk", 1)
        .when(lower(col("to"))   === "uk", -1)
        .otherwise(0)
    )
    .orderBy(asc("passengerId"), asc("date"))

passageDirections.printSchema()

Name: Compile Error
Message: <console>:36: error: not found: value when
               when(lower(col("from")) === "uk", 1)
               ^
<console>:36: error: not found: value lower
               when(lower(col("from")) === "uk", 1)
                    ^
<console>:36: error: not found: value col
               when(lower(col("from")) === "uk", 1)
                          ^
<console>:37: error: not found: value lower
               .when(lower(col("to"))   === "uk", -1)
                     ^
<console>:37: error: not found: value col
               .when(lower(col("to"))   === "uk", -1)
                           ^
<console>:43: error: not found: value asc
           .orderBy(asc("passengerId"), asc("date"))
                    ^
<console>:43: error: not found: value asc
           .orderBy(asc("passengerId"), asc("date"))
                                        ^

StackTrace: 

In [200]:
//passageDirections.show(300)
//passageDirections.filter(col("passengerId") isin (16, 22, 47, 52)).show(50)
passageDirections.filter(col("passengerId") isin (22,14)).show(50)

+-----------+--------+----+---+-------------------+---------+
|passengerId|flightId|from| to|               date|direction|
+-----------+--------+----+---+-------------------+---------+
|         14|       0|  cg| ir|2017-01-01 00:00:00|        0|
|         14|     200|  ir| no|2017-03-14 00:00:00|        0|
|         14|     222|  no| ir|2017-03-19 00:00:00|        0|
|         22|       0|  cg| ir|2017-01-01 00:00:00|        0|
|         22|      32|  ir| sg|2017-01-10 00:00:00|        0|
|         22|     147|  sg| tj|2017-02-22 00:00:00|        0|
|         22|     162|  tj| iq|2017-02-27 00:00:00|        0|
|         22|     279|  iq| il|2017-04-11 00:00:00|        0|
|         22|     290|  il| pk|2017-04-15 00:00:00|        0|
|         22|     330|  pk| co|2017-04-27 00:00:00|        0|
|         22|     335|  co| at|2017-04-29 00:00:00|        0|
|         22|     363|  at| iq|2017-05-09 00:00:00|        0|
|         22|     681|  iq| uk|2017-09-06 00:00:00|       -1|
|       

In [207]:
val hoge = passageDirections
    .select("passengerId", "from", "to")

hoge.toDs()
hoge.show()

Name: Unknown Error
Message: <console>:141: error: value toDs is not a member of org.apache.spark.sql.DataFrame
       hoge.toDs()
            ^

StackTrace: 

In [202]:
passageDirections.createOrReplaceTempView("passageDirections")

In [203]:
var query = """
SELECT 
    passageDirections.*,
    row_number() OVER (PARTITION BY passengerId ORDER BY date)            AS seqnum,
    row_number() OVER (PARTITION BY passengerId, direction ORDER BY date) AS seqnum_2
FROM passageDirections
WHERE passengerId in (22)
ORDER BY passengerId, date
"""

var result = spark.sql(query)

result.show(100)

+-----------+--------+----+---+-------------------+---------+------+--------+
|passengerId|flightId|from| to|               date|direction|seqnum|seqnum_2|
+-----------+--------+----+---+-------------------+---------+------+--------+
|         22|       0|  cg| ir|2017-01-01 00:00:00|        0|     1|       1|
|         22|      32|  ir| sg|2017-01-10 00:00:00|        0|     2|       2|
|         22|     147|  sg| tj|2017-02-22 00:00:00|        0|     3|       3|
|         22|     162|  tj| iq|2017-02-27 00:00:00|        0|     4|       4|
|         22|     279|  iq| il|2017-04-11 00:00:00|        0|     5|       5|
|         22|     290|  il| pk|2017-04-15 00:00:00|        0|     6|       6|
|         22|     330|  pk| co|2017-04-27 00:00:00|        0|     7|       7|
|         22|     335|  co| at|2017-04-29 00:00:00|        0|     8|       8|
|         22|     363|  at| iq|2017-05-09 00:00:00|        0|     9|       9|
|         22|     681|  iq| uk|2017-09-06 00:00:00|       -1|   

query = 
result = [passengerId: int, flightId: int ... 6 more fields]


"
SELECT
    passageDirections.*,
    row_number() OVER (PARTITION BY passengerId ORDER BY date)            AS seqnum,
    row_number() OVER (PARTITION BY passengerId, direction ORDER BY date) AS seqnum_2
FROM passageDirections
WHERE passengerId in (22)
ORDER BY passengerId, date
"


[passengerId: int, flightId: int ... 6 more fields]

In [204]:
val queryPassages = """
SELECT 
    passengerId, 
    min(date), 
    max(date), 
    count(*) AS num_passages
FROM (
    SELECT 
        passageDirections.*,
        row_number() OVER (PARTITION BY passengerId ORDER BY date)            AS seqnum,
        row_number() OVER (PARTITION BY passengerId, direction ORDER BY date) AS seqnum_2
    FROM passageDirections
    )
WHERE direction = 0
GROUP BY passengerId, (seqnum - seqnum_2), direction
"""

val passages = spark.sql(queryPassages)

queryPassages = 
passages = [passengerId: int, min(date): timestamp ... 2 more fields]


"
SELECT
    passengerId,
    min(date),
    max(date),
    count(*) AS num_passages
FROM (
    SELECT
        passageDirections.*,
        row_number() OVER (PARTITION BY passengerId ORDER BY date)            AS seqnum,
        row_number() OVER (PARTITION BY passengerId, direction ORDER BY date) AS seqnum_2
    FROM passageDirections
    )
WHERE direction = 0
GROUP BY passengerId, (seqnum - seqnum_2), direction
"


[passengerId: int, min(date): timestamp ... 2 more fields]

In [205]:
passages.show()


+-----------+-------------------+-------------------+------------+
|passengerId|          min(date)|          max(date)|num_passages|
+-----------+-------------------+-------------------+------------+
|         14|2017-01-01 00:00:00|2017-03-19 00:00:00|           3|
|         18|2017-01-01 00:00:00|2017-12-20 00:00:00|          14|
|         25|2017-01-01 00:00:00|2017-12-16 00:00:00|           2|
|         38|2017-01-01 00:00:00|2017-12-22 00:00:00|           4|
|         46|2017-01-01 00:00:00|2017-12-25 00:00:00|           6|
|         50|2017-01-01 00:00:00|2017-12-12 00:00:00|           6|
|         73|2017-01-01 00:00:00|2017-07-13 00:00:00|           5|
|         97|2017-01-01 00:00:00|2017-12-16 00:00:00|           8|
|        161|2017-01-01 00:00:00|2017-01-01 00:00:00|           1|
|        172|2017-01-01 00:00:00|2017-04-27 00:00:00|           9|
|        186|2017-01-01 00:00:00|2017-07-15 00:00:00|           9|
|        225|2017-01-01 00:00:00|2017-12-22 00:00:00|         

In [206]:
var query = """
SELECT
  lag(v) OVER (ORDER BY v) as A,
  v as B,
  lead(v) OVER (ORDER BY v) as C
FROM (
  VALUES (1), (2), (3), (4)
) t(v)
"""
var result = spark.sql(query)
result.show()

+----+---+----+
|   A|  B|   C|
+----+---+----+
|null|  1|   2|
|   1|  2|   3|
|   2|  3|   4|
|   3|  4|null|
+----+---+----+



query = 
result = [A: int, B: int ... 1 more field]


"
SELECT
  lag(v) OVER (ORDER BY v) as A,
  v as B,
  lead(v) OVER (ORDER BY v) as C
FROM (
  VALUES (1), (2), (3), (4)
) t(v)
"


[A: int, B: int ... 1 more field]