In [1]:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DateType, LongType};

// SQLContext built on the notebook-provided SparkContext `sc`.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Column layout of the header-less bookings CSV; every column is nullable.
val schemaStruct1 = StructType(Seq(
  StructField("booking_id", LongType, true),
  StructField("traveller_id", IntegerType, true),
  StructField("company_id", IntegerType, true),
  StructField("booking_date", DateType, true),
  StructField("departure_date", DateType, true),
  StructField("origin", StringType, true),
  StructField("destination", StringType, true),
  StructField("priceUSD", IntegerType, true),
  StructField("status", StringType, true)
))

// Single place to change the input location.
val bookingsPath = "/Users/admin/Documents/CWT_DataEngineerCaseStudy/bookings.csv"

/** Reads a header-less, comma-delimited bookings CSV using `schemaStruct1`.
  *
  * Uses the DROPMALFORMED parse mode, so rows that do not fit the schema are
  * dropped rather than failing the read.
  *
  * @param path location of the CSV file to load
  * @return the parsed bookings as a DataFrame
  */
def readBookings(path: String): org.apache.spark.sql.DataFrame =
  sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("mode", "DROPMALFORMED")
    .schema(schemaStruct1)
    .load(path)

val df = readBookings(bookingsPath)
df.show()

// NOTE(review): expectedDF is loaded from the same file as df, so the `except`
// check below can only confirm that two reads agree; an independent fixture
// would be needed for a meaningful comparison.
val expectedDF = readBookings(bookingsPath)

// Sanity checks on the loaded DataFrame.
val rowCount = df.count()
assert(rowCount == 16546, s"expected 16546 bookings but found $rowCount")
val resultDF = df.except(expectedDF)
assert(resultDF.rdd.isEmpty(), "df contains rows that are missing from expectedDF")

Intitializing Scala interpreter ...

Spark Web UI available at http://LATITUDE3500-K98D0QU:4043
SparkContext available as 'sc' (version = 2.4.8, master = local[*], app id = local-1625568717312)
SparkSession available as 'spark'


+-------------------+------------+----------+------------+--------------+------------+------------+--------+------+
|         booking_id|traveller_id|company_id|booking_date|departure_date|      origin| destination|priceUSD|status|
+-------------------+------------+----------+------------+--------------+------------+------------+--------+------+
|2188183561171387127|         334|        10|  2019-01-02|    2019-03-20|       Sofia|    Budapest|     725|BOOKED|
|4323535508030040434|           8|         9|  2019-01-02|    2019-04-09|   New Delhi|      Sydney|      87|BOOKED|
| 454396553776035072|          86|        12|  2019-01-02|    2019-03-25|   Amsterdam|      Sydney|     283|BOOKED|
|5401900986077032623|         433|         9|  2019-01-02|    2019-04-16|     Beijing|    Brussels|     150|BOOKED|
|2982690493231102623|         253|         4|  2019-01-02|    2019-03-18|      Sydney|       Osaka|     669|BOOKED|
|3303592101252476893|         265|        16|  2019-01-02|    2019-04-26

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DateType, LongType}
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@63d51b85
schemaStruct1: org.apache.spark.sql.types.StructType = StructType(StructField(booking_id,LongType,true), StructField(traveller_id,IntegerType,true), StructField(company_id,IntegerType,true), StructField(booking_date,DateType,true), StructField(departure_date,DateType,true), StructField(origin,StringType,true), StructField(destination,StringType,true), StructField(priceUSD,IntegerType,true), StructField(status,StringType,true))
df: org.apache.spark.sql.DataFrame = [booking_id: bigint, traveller_id: int ... 7 more fields]
expected...

In [2]:
import org.apache.spark.sql.functions._
import spark.sqlContext.implicits._

//val arg1 = 2188183561171387127L
//val arg2 = 334

val arg1 = 621196447552601573L // booking_id to look up
val arg2 = 416                 // traveller_id to look up
val arg3 = "cancel"     // requested action: "cancel" or "alter"
val arg4 = current_date //  date for exchange / cancel (a Column, evaluated by Spark)

// Most recent (by booking_date) record for this booking/traveller whose
// departure is still after arg4.
val locateDF = df.filter($"departure_date" > arg4 && $"booking_id" === arg1 && $"traveller_id" === arg2)
                 .orderBy(desc("booking_date")).limit(1)

// Non-empty only when the located record has already been exchanged.
val canceleDF = locateDF.filter($"status"==="EXCHANGED")

locateDF.show()

// The if/else chain is an expression: every branch yields the bookings
// DataFrame that results from the request, so the appended row is no longer
// thrown away. Branches that reject the request yield `df` unchanged.
val updatedDF: org.apache.spark.sql.DataFrame =
  if (locateDF.rdd.isEmpty) {
    println("Record not found")
    df
  } else if (arg3 == "cancel" && canceleDF.count() > 0) {
    println("Booking has already Exchanged Cancellation not possible")
    df
  } else if (arg3 == "cancel") {
    // `.otherwise($"status")` keeps the existing status for rows that are not
    // BOOKED; without it `when` would null the column for those rows.
    // NOTE(review): the price expression yields a negative, fractional amount
    // (20% of the price minus the price) — confirm this is the intended value.
    val cancelDF = locateDF
      .withColumn("status", when($"status" === "BOOKED", "CANCELLED").otherwise($"status"))
      .withColumn("priceUSD", (($"priceUSD"*20/100)-$"priceUSD"))
      .withColumn("booking_date", arg4)
    cancelDF.show()
    df.union(cancelDF)
  } else if (arg3 == "alter") {
    // NOTE(review): departure_date is overwritten with arg4 (today's date) —
    // confirm that is the intended new departure date.
    val alterDF = locateDF
      .withColumn("status",
        when(($"status" === "BOOKED") || ($"status" === "EXCHANGED"), "EXCHANGED").otherwise($"status"))
      .withColumn("priceUSD", ($"priceUSD"*20/100))
      .withColumn("departure_date", arg4)
    alterDF.show()
    df.union(alterDF)
  } else {
    // Previously an unrecognised action fell through silently.
    println(s"Unsupported action '$arg3': expected cancel or alter")
    df
  }


+------------------+------------+----------+------------+--------------+--------+-----------+--------+---------+
|        booking_id|traveller_id|company_id|booking_date|departure_date|  origin|destination|priceUSD|   status|
+------------------+------------+----------+------------+--------------+--------+-----------+--------+---------+
|621196447552601573|         416|        17|  2021-08-24|    2021-09-22|Brussels|     Sydney|      21|EXCHANGED|
+------------------+------------+----------+------------+--------------+--------+-----------+--------+---------+

Booking has already Exchanged Cancellation not possible


import org.apache.spark.sql.functions._
import spark.sqlContext.implicits._
arg1: Long = 621196447552601573
arg2: Int = 416
arg3: String = cancel
arg4: org.apache.spark.sql.Column = current_date()
locateDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [booking_id: bigint, traveller_id: int ... 7 more fields]
canceleDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [booking_id: bigint, traveller_id: int ... 7 more fields]
res1: Any = ()


In [3]:
// Derive 'year' and 'month' columns from booking_date, then save the result as CSV.

val addcolDF = df
  .withColumn("year", year(col("booking_date")))
  .withColumn("month", month(col("booking_date")))

// Written with a header row and a comma delimiter.
addcolDF.write
  .option("header", true)
  .option("delimiter", ",")
  .csv("/Users/admin/Documents/CWT_DataEngineerCaseStudy/file_out")

addcolDF: org.apache.spark.sql.DataFrame = [booking_id: bigint, traveller_id: int ... 9 more fields]


In [5]:
// Expose the bookings DataFrame to Spark SQL under the name JBTbookings.
df.createOrReplaceTempView("JBTbookings")

// Report 1: the five companies with the highest summed priceUSD.
val top5companies = sqlContext.sql("""SELECT company_id,sum(priceUSD)total_cost FROM JBTbookings 
                                    group by company_id order by total_cost desc limit 5""")
top5companies.show()

// Report 2: the five most popular destinations, counting each booking_id once
// per destination and considering only BOOKED or EXCHANGED records.
val top5destinations = sqlContext.sql("""
                                        select destination,count(destination) trips from (
                                        SELECT booking_id,destination,count(destination)top FROM JBTbookings
                                        where status = 'BOOKED' or status = 'EXCHANGED'
                                        group by booking_id,destination 
                                        )  as t
                                        group by destination order by trips desc limit 5
                                      """)
top5destinations.show()

// Report 3: every destination with its record count, most frequent first.
val alldestinations = sqlContext.sql("SELECT destination,count(destination)top FROM JBTbookings group by destination order by top desc ")
alldestinations.show()
              

+----------+----------+
|company_id|total_cost|
+----------+----------+
|        15|    332968|
|        20|    300703|
|         9|    289818|
|        18|    279874|
|         1|    271974|
+----------+----------+

+-----------+-----+
|destination|trips|
+-----------+-----+
|     Sydney| 1250|
|  Amsterdam|  966|
|      Paris|  808|
|  Sao Paolo|  621|
|  Hong Kong|  371|
+-----------+-----+

+-----------+----+
|destination| top|
+-----------+----+
|     Sydney|2066|
|  Amsterdam|1613|
|      Paris|1348|
|  Sao Paolo|1023|
|  Hong Kong| 639|
|       Oslo| 408|
|  Stockholm| 384|
|      Dubai| 371|
|     Prague| 370|
|     Athens| 353|
|     London| 350|
|    Toronto| 347|
|      Cairo| 346|
|     Berlin| 343|
|     Moscow| 340|
|     Milano| 338|
|     Lisbon| 336|
|     Madrid| 335|
|Los Angeles| 332|
|       Kiev| 330|
+-----------+----+
only showing top 20 rows



top5companies: org.apache.spark.sql.DataFrame = [company_id: int, total_cost: bigint]
top5destinations: org.apache.spark.sql.DataFrame = [destination: string, trips: bigint]
alldestinations: org.apache.spark.sql.DataFrame = [destination: string, top: bigint]
