# (S1) Initialization

In [1]:
println("Yesterday was memorable, today is precious, tomorrow will be better")
println(s"Spark version = ${sc.version}")


Yesterday was memorable, today is precious, tomorrow will be better
Spark version = 2.2.0


### Load dependency files


In [3]:
import java.io.File
import java.util.Calendar

import scala.util.{Try, Success, Failure}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, Encoders}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.graphx._
import org.graphframes._


print(s"Successfully imported files... ${Calendar.getInstance().getTime()} ")

Successfully imported files... Sat May 19 07:56:25 PDT 2018 

### References

https://stackoverflow.com/questions/43508054/spark-sql-how-to-read-a-tsv-or-csv-file-into-dataframe-and-apply-a-custom-sche?rq=1

https://stackoverflow.com/questions/44009455/scala-convert-list-of-dataframe-into-single-dataframe-then-merge-it-with-anothe


### Helper function

In [4]:
def getType[T: Manifest](t: T): Manifest[T] = manifest[T]

def getListOfFiles(parentDirectory: String, subDirectory: String):List[String] = {
    val d = new File(parentDirectory + subDirectory)
    if (d.exists && d.isDirectory) {
        d.listFiles.filter(_.isFile).toList.map{_.toString}
    } else {
        List.empty[String]
    }
}

def fromTsvToDF(fileNames: List[String], schema: StructType, bFileHasHeader: Boolean=false): org.apache.spark.sql.DataFrame = {
    val allDFs: List[org.apache.spark.sql.DataFrame] = fileNames.map{ 
        f => {
            println(s"Reading file: $f")
            spark.read.format("csv")        
                .option("header", bFileHasHeader.toString)
                .option("delimiter", "\t")
                .schema(schema)            
                .load(f)
        }
    }
    println(s"fromTsvToDF(): numFiles read = ${allDFs.size}")
    if (allDFs.size > 1)
        allDFs.reduce( _ union _)
    else if (allDFs.size == 1)
      allDFs.head
    else 
      spark.createDataFrame(sc.emptyRDD[Row], schema)
}

print(s"Successfully defined helper functions... ${Calendar.getInstance().getTime()} ")

Successfully defined helper functions... Sat May 19 07:56:30 PDT 2018 
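
The three-way branch at the end of `fromTsvToDF` (many files, one file, no files) can also be written with `reduceOption`, which returns `None` for an empty list. Below is a minimal sketch of that pattern on plain Scala collections. `combineAll` is a hypothetical name, and `Seq[Int]` with `++` is only a stand-in for a DataFrame with `union`; neither is part of the notebook.

```scala
// Hypothetical stand-in: Seq[Int] plays the role of a DataFrame, ++ the role of union.
def combineAll(parts: List[Seq[Int]], empty: Seq[Int] = Seq.empty): Seq[Int] =
  // reduceOption yields None for an empty list, the element itself for a
  // single-element list, and the pairwise reduction otherwise.
  parts.reduceOption(_ ++ _).getOrElse(empty)

val none = combineAll(Nil)                                 // no input files
val one  = combineAll(List(Seq(1, 2)))                     // exactly one input file
val many = combineAll(List(Seq(1, 2), Seq(3), Seq(4, 5)))  // several input files
```

In the notebook the same shape would presumably read `allDFs.reduceOption(_ union _).getOrElse(spark.createDataFrame(sc.emptyRDD[Row], schema))`, which removes the explicit size checks.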

# (S2) To DataFrame

### Configuration settings

In [5]:
val dataParentDir="/Users/thchang/Documents/dev/personal/ml/connect/git/projects/machine-learning/projects/capstone/data/"
//Users/thchang/Documents/dev/git/tensorlfow/tensorflow/tensorflow/examples/tutorials/word2vec/data
val dataParentResultDir="result"

val dataQueryAspectInputDir="ebay-query-aspects/dev/"
val dataQueryAspectResultDir="ebay-query-aspects/result/"

val dataQueryCatInputDir="ebay-query-cat/dev/"
val dataQueryCatResultDir="ebay-query-cat/result/"

println(s"Successfully defined data directories. ${Calendar.getInstance().getTime()}")

Successfully defined data directories. Sat May 19 07:56:35 PDT 2018


### Define models and schemas

In [6]:
case class DataQueryAspect(query:String, aspect:String, probAGivenQ:Double, probQGiveA:Double)
val DataQueryAspectSchema: StructType = Encoders.product[DataQueryAspect].schema

case class DataQueryCat(query:String, catId:String, weight:Float, typeOf:String, priceLow:Float, priceHigh:Float)
val DataQueryCatSchema: StructType = Encoders.product[DataQueryCat].schema

println(s"Successfully defined models and encoders. ${Calendar.getInstance().getTime()}")

Successfully defined models and encoders. Sat May 19 07:56:38 PDT 2018
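
Each case class above describes one TSV row. As a rough illustration of what that schema implies for a single line, the sketch below parses one tab-separated record into `DataQueryAspect` by hand. `parseQueryAspect` is a hypothetical helper, not something the notebook or Spark provides, and the case class is redeclared only so the snippet stands alone. The sample values are taken from the `queryAspectRaw.show(5)` output in this notebook.

```scala
import scala.util.Try

// Redeclared here so the snippet is self-contained.
case class DataQueryAspect(query: String, aspect: String, probAGivenQ: Double, probQGiveA: Double)

// Hypothetical helper: split on tabs and convert the two numeric columns.
// Returns None for lines with the wrong column count or unparsable numbers.
def parseQueryAspect(line: String): Option[DataQueryAspect] =
  line.split("\t", -1) match {
    case Array(q, a, pAQ, pQA) =>
      Try(DataQueryAspect(q, a, pAQ.toDouble, pQA.toDouble)).toOption
    case _ => None
  }

val ok  = parseQueryAspect("climbers wall art\tColor##Black\t-2.416\t-18.746")
val bad = parseQueryAspect("climbers wall art\tColor##Black") // too few columns
```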


### Read from data directory

In [8]:
val queryAspectFiles = getListOfFiles(dataParentDir, dataQueryAspectInputDir)
val queryAspectRaw: org.apache.spark.sql.DataFrame = fromTsvToDF(queryAspectFiles, DataQueryAspectSchema)  //Dataframe is Dataset[Row]

val queryCatFiles = getListOfFiles(dataParentDir, dataQueryCatInputDir)
val queryCatRaw: org.apache.spark.sql.DataFrame = fromTsvToDF(queryCatFiles, DataQueryCatSchema, bFileHasHeader=true)

queryAspectRaw.show(5)



Reading file: /Users/thchang/Documents/dev/personal/ml/connect/git/projects/machine-learning/projects/capstone/data/ebay-query-aspects/dev/a.tsv
fromTsvToDF(): numFiles read = 1
Reading file: /Users/thchang/Documents/dev/personal/ml/connect/git/projects/machine-learning/projects/capstone/data/ebay-query-cat/dev/query-cat-dev.tsv
fromTsvToDF(): numFiles read = 1
+--------------------+--------------------+-----------+----------+
|               query|              aspect|probAGivenQ|probQGiveA|
+--------------------+--------------------+-----------+----------+
|   climbers wall art|Style##Arts & Cra...|     -2.416|   -12.631|
|   climbers wall art|Features##Persona...|     -2.416|   -12.797|
|   climbers wall art|Type##Sticker/3D ...|     -2.416|   -11.134|
|   climbers wall art|        Color##Black|     -2.416|   -18.746|
|clincher wheelset...|Brand##Oval Concepts|     -2.493|    -8.015|
+--------------------+--------------------+-----------+----------+
only showing top 5 rows



In [17]:
val queryDF = queryAspectRaw.select("query").dropDuplicates()
queryDF.show(5)


+--------------------+
|               query|
+--------------------+
|   climbers wall art|
|clincher wheelset...|
|affinity series c...|
|     maelstorm nexus|
|               shoes|
+--------------------+
only showing top 5 rows



In [14]:
val outputDir = "/Users/thchang/Documents/dev/personal/ml/connect/git/projects/machine-learning/projects/capstone/data/ebay-query-aspects"
queryAspectRaw.select("query").write.csv(s"${outputDir}/query.csv")
println(s"Successfully saved queryAspectRaw to file. ${Calendar.getInstance().getTime()}")

Successfully saved queryAspectRaw to file. Sat May 19 08:24:04 PDT 2018


### Create input to word2vec [DONE]

In [8]:
val queryAspectDF = queryAspectRaw.select(queryAspectRaw("query"), queryAspectRaw("aspect"))
println(s"queryAspectDF:\n")
queryAspectDF.show

val queryAspectRDD: org.apache.spark.rdd.RDD[String] = queryAspectDF.rdd.map { r => {
            val queryTokens: Seq[String] = r.get(0).asInstanceOf[String].split(" ")
            val aspect: String = r.get(1).toString.replaceAll("\\s", "_")
            
            aspect + " " + queryTokens.mkString(s" $aspect " ) + " " + aspect    
        }
}
println(s"\nqueryAspectRDD:\n${queryAspectRDD.collect.mkString("\n")}")

val outputDir = s"file:///$dataParentDir$dataQueryAspectResultDir/${Calendar.getInstance().getTime()}".replaceAll(" ", "_")
println(outputDir)
queryAspectRDD.saveAsTextFile(outputDir)

println(s"\nSuccessfully converted to DF. ${Calendar.getInstance().getTime()}")

queryAspectDF:

+--------------------+--------------------+
|               query|              aspect|
+--------------------+--------------------+
|   climbers wall art|Style##Arts & Cra...|
|   climbers wall art|Features##Persona...|
|   climbers wall art|Type##Sticker/3D ...|
|   climbers wall art|        Color##Black|
|clincher wheelset...|Brand##Oval Concepts|
|clincher wheelset...|For Bike Type##Un...|
|clincher wheelset...|For Bike Type##Ro...|
|clincher wheelset...|Brake Compatibili...|
|clincher wheelset...|For Bike Type##Cy...|
|affinity series c...|Design/Finish##Pl...|
|affinity series c...|Type##Fitted Case...|
|affinity series c...|        Color##Clear|
|affinity series c...|  Bundle Listing##No|
|     maelstorm nexus| Color##Multicolored|
|     maelstorm nexus|   Set##Alara Reborn|
|     maelstorm nexus|      Set##Darksteel|
|     maelstorm nexus|   Type##Enchantment|
|     maelstorm nexus|For##Collectors &...|
|     maelstorm nexus|      Type##Creature|
|            iph
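
To make the interleaving in the cell above concrete, here is a small sketch of the same string construction on plain Scala values, without Spark. `toTrainingLine` is a hypothetical name for the body of the `map`. The first input pair comes from the `show` output, while the `"For Bike Type##X"` aspect is a made-up value used only to show the whitespace replacement.

```scala
// Hypothetical helper mirroring the map body in the cell above.
def toTrainingLine(query: String, aspect: String): String = {
  val queryTokens = query.split(" ")
  // Replace whitespace so the aspect survives tokenization as a single word.
  val aspectToken = aspect.replaceAll("\\s", "_")
  // Surround every query token with the aspect token.
  aspectToken + " " + queryTokens.mkString(s" $aspectToken ") + " " + aspectToken
}

val line   = toTrainingLine("climbers wall art", "Color##Black")
val spaced = toTrainingLine("shoes", "For Bike Type##X") // made-up aspect value
```

Each query token ends up with the aspect token on both sides, so the aspect falls inside the context window of every query word.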

### Similar queries via approximation [WIP]
Not sure how well this will work, because each group of queries is keyed by an arbitrary pick: a cached query if the group contains one, otherwise the first query in sorted order.

In [124]:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.{array, collect_list}
val cachedQueries = Seq("iphone 8", "htc")
val cachedQueriesBroadcast = sc.broadcast(cachedQueries)

val aspectToQueriesDF = queryAspectRaw.groupBy("aspect").agg(collect_set("query") as "qAspectList")
//println(s"\naspectToQueriesDF:")
//println(s"${aspectToQueriesDF.show()}")
val similarQueriesByAspectsRDD = aspectToQueriesDF.rdd.flatMap{
    row:Row => {
        val queries = row.get(1).asInstanceOf[Seq[String]].filter(word => word.nonEmpty && word.charAt(0).isLetter).sorted
        if (queries.size >1) {
            val queriesOfInterest = cachedQueriesBroadcast.value intersect queries
            if (queriesOfInterest.nonEmpty) {
                val key = queriesOfInterest.head
                val rest = queries.filter(f => !f.equals(key))
                Some((key, rest))
            } else {                
                //Some((queries.head, queries.last))    
                Some((queries.head, queries.tail.toSeq))
            }        
        } else None
    }
}.distinct.cache
println(s"\nsimilarQueriesByAspectsRDD[${similarQueriesByAspectsRDD.count}]: ${similarQueriesByAspectsRDD.collect.mkString(", ")}")



val catToQueriesDF = queryCatRaw.groupBy("catId").agg(collect_list("query") as "qCatList")
//println(s"\ncatToQueriesDF:")
//println(s"${catToQueriesDF.show()}")
val similarQueriesByCatRDD = catToQueriesDF.rdd.flatMap{
    row:Row => {
        val queries = row.get(1).asInstanceOf[Seq[String]].filter(word => word.nonEmpty && word.charAt(0).isLetter).sorted
        if (queries.size >1) {
            val queriesOfInterest = cachedQueriesBroadcast.value intersect queries
            if (queriesOfInterest.nonEmpty) {
                val key = queriesOfInterest.head
                val rest = queries.filter(f => !f.equals(key))
                Some((key, rest))
            } else {                
                //Some((queries.head, queries.last))    
                Some((queries.head, queries.tail.toSeq))
            }        
        } else None
    }
}.distinct.cache
println(s"\nsimilarQueriesByCatRDD[${similarQueriesByCatRDD.count}]: ${similarQueriesByCatRDD.collect.mkString(", ")}")

val unionRDD = similarQueriesByAspectsRDD join similarQueriesByCatRDD
println(s"\nunionRDD=${unionRDD.collect.mkString(", ")}")

val unionFinal = unionRDD.flatMap {
    case (q, similarQueriesTuples) => {
        val queries: List[Any] = similarQueriesTuples.productIterator.toList //convert from N dimension tuple to list
        val duplicates = queries.groupBy(identity).collect { case (x, List(_,_,_*)) => x } // a duplicate means the entry appears in both the category and the aspect list
        if (duplicates.nonEmpty) {
            Some( duplicates.toSeq :+ q)
        } else None
        
    }
}
println(s"\nunionFinal=${unionFinal.collect.mkString(", ")}")





similarQueriesByAspectsRDD[2]: (iphone 8,WrappedArray(galaxy nexus, htc)), (iphone 8,WrappedArray(galaxy nexus, shoes))

similarQueriesByCatRDD[1]: (iphone 8,WrappedArray(galaxy nexus, htc))
                                                                                
unionRDD=(iphone 8,(WrappedArray(galaxy nexus, htc),WrappedArray(galaxy nexus, htc))), (iphone 8,(WrappedArray(galaxy nexus, shoes),WrappedArray(galaxy nexus, htc)))

unionFinal=List(WrappedArray(galaxy nexus, htc), iphone 8)
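
The key-picking rule used in both `flatMap` bodies above can be tried out on plain collections. The sketch below is an approximation of that logic, not the notebook's code: `pickKey` is a hypothetical name, and `cached` plays the role of `cachedQueriesBroadcast.value`.

```scala
// Hypothetical helper mirroring the flatMap body above.
def pickKey(queries: Seq[String], cached: Seq[String]): Option[(String, Seq[String])] = {
  // Drop empty queries and queries that do not start with a letter, then sort.
  val cleaned = queries.filter(q => q.nonEmpty && q.charAt(0).isLetter).sorted
  if (cleaned.size > 1) {
    // intersect keeps the order of the left operand, so earlier cached queries win.
    cached.intersect(cleaned).headOption match {
      case Some(key) => Some((key, cleaned.filterNot(_ == key)))
      case None      => Some((cleaned.head, cleaned.tail))
    }
  } else None
}

val cached   = Seq("iphone 8", "htc")
val withKey  = pickKey(Seq("htc", "galaxy nexus", "iphone 8"), cached)
val fallback = pickKey(Seq("shoes", "boots"), cached)
val tooSmall = pickKey(Seq(".095 echo crossfire trimmer line", "shoes"), cached)
```

The first call reproduces the `(iphone 8, [galaxy nexus, htc])` pair seen in the output above. The last call returns `None` because only one query is left after the letter filter.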


### Similar Queries - [Works, but EXPENSIVE]

In [None]:
def removeElementFromList(values: Seq[String], removeValue: String): Seq[String] = {
    // Going through a Set also removes duplicates; the order of the result is unspecified.
    (values.toSet - removeValue).toSeq
}

//queryAspectRaw.show(4)
// Step1: Group aspects to Queries
val aspectToQueriesDF = queryAspectRaw.groupBy("aspect").agg(collect_list("query") as "qList")
//println("aspectToQueriesDF:\n")
//aspectToQueriesDF.show()

// Queries are similar if they share at least one common aspect.
// Note: converting a DataFrame to an RDD yields an RDD[Row].
val similarQueriesByAspectsRDD = aspectToQueriesDF.rdd.flatMap{
    row:Row => {
        val queries = row.get(1).asInstanceOf[Seq[String]]
        // Keep only aspects shared by more than one query; sort so equal groups compare equal.
        if (queries.size >1) Some(queries.sorted) else None
    }
}.cache
println(s"\nsimilarQueriesByAspectsRDD[${similarQueriesByAspectsRDD.count}]: ${similarQueriesByAspectsRDD.collect.mkString(", ")}")


val catToQueriesDF = queryCatRaw.groupBy("catId").agg(collect_list("query") as "qList")
//catToQueriesDF.show()
val similarQueriesByCategoryRDD = catToQueriesDF.rdd.flatMap{
    row:Row => {
        val queries = row.get(1).asInstanceOf[Seq[String]]
        if (queries.size>1) Some(queries.sorted) else None
    }    
}.cache
println(s"\nsimilarQueriesByCategoryRDD[${similarQueriesByCategoryRDD.count}]: ${similarQueriesByCategoryRDD.collect.mkString(", ")}")

val finalCandidatesRDD = similarQueriesByAspectsRDD.cartesian(similarQueriesByCategoryRDD).filter{
    case(a:Seq[String], b:Seq[String]) => {
        //println(s"\t a=${a.mkString(",")}  b=${b.mkString(",")}")
        a.nonEmpty && b.nonEmpty && ((a intersect b).size > 1)
    }
}.map { 
    case(a:Seq[String], b:Seq[String]) => { a intersect b } 
}.distinct

println(s"\nfinalCandidatesRDD= ${finalCandidatesRDD.collect().mkString(", ")}")

println(s"\nFind Similar Queries. ${Calendar.getInstance().getTime()}")


similarQueriesByAspectsRDD[2]: WrappedArray(galaxy nexus, iphone 8), WrappedArray(galaxy nexus, iphone 8, shoes)

similarQueriesByCategoryRDD[1]: WrappedArray(.095 echo crossfire trimmer line, galaxy nexus, iphone 8)
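
The cost of this approach comes from `cartesian`, which compares every aspect group with every category group. A minimal sketch of the same filter on plain Scala collections, using the groups printed above as input, is shown below; the names `byAspect`, `byCategory`, `candidates`, and `pairsCompared` are illustrative only.

```scala
// Groups copied from the printed output above.
val byAspect: Seq[Seq[String]] = Seq(
  Seq("galaxy nexus", "iphone 8"),
  Seq("galaxy nexus", "iphone 8", "shoes"))
val byCategory: Seq[Seq[String]] = Seq(
  Seq(".095 echo crossfire trimmer line", "galaxy nexus", "iphone 8"))

// Every aspect group is paired with every category group; a pair is kept
// when the two groups have more than one query in common.
val candidates: Seq[Seq[String]] =
  (for {
    a <- byAspect
    b <- byCategory
    common = a intersect b
    if common.size > 1
  } yield common).distinct

// Number of comparisons grows as |byAspect| * |byCategory|.
val pairsCompared = byAspect.size * byCategory.size
```

With many aspects and categories the number of pairs grows multiplicatively, which is the reason this version is marked as expensive.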


### Scratchpad

In [9]:
val inputRDD = sc.parallelize(Seq(("maths", 50), ("maths", 60), ("english", 65)))
println(s"\ninputRDD: ${inputRDD.collect.mkString("\n")}")

val mappedRDD = inputRDD.mapValues(mark => (mark, 1));
println(s"\nmappedRDD: ${mappedRDD.collect.mkString("\n")}")

val reducedRDD = mappedRDD.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
println(s"\nreducedRDD: ${reducedRDD.collect.mkString("\n")}")

val averageRDD = reducedRDD.map { x =>
    val temp = x._2
    val total = temp._1
    val count = temp._2
    (x._1, total / count)
}
println(s"\naverageRDD: ${averageRDD.collect.mkString("\n")}")

print(s"Successfully defined WIP functions. ${Calendar.getInstance().getTime()}")


inputRDD: (maths,50)
(maths,60)
(english,65)

mappedRDD: (maths,(50,1))
(maths,(60,1))
(english,(65,1))

reducedRDD: (english,(65,1))
(maths,(110,2))

averageRDD: (english,65)
(maths,55)
Successfully defined WIP functions. ${Calendar.getInstance().getTime()}
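
The sum-and-count pattern from the scratchpad cell can be checked without Spark. The sketch below folds the same three records into a `(sum, count)` pair per key, which is what `mapValues` followed by `reduceByKey` builds, and then divides as `Double`. The names `marks`, `sumCount`, and `averages` are illustrative only.

```scala
val marks = Seq(("maths", 50), ("maths", 60), ("english", 65))

// Accumulate (sum, count) per key, the same pair that mapValues + reduceByKey build.
val sumCount: Map[String, (Int, Int)] =
  marks.foldLeft(Map.empty[String, (Int, Int)]) { case (acc, (subject, mark)) =>
    val (sum, count) = acc.getOrElse(subject, (0, 0))
    acc.updated(subject, (sum + mark, count + 1))
  }

// Divide as Double so that averages are not truncated by integer division.
val averages: Map[String, Double] =
  sumCount.map { case (subject, (sum, count)) => (subject, sum.toDouble / count) }
```

The cell above divides two `Int` values, which happens to be exact for this data but would truncate an average such as 55.5.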

In [None]:
/*
val qA1 = queryAspectDF.select(queryAspectDF("query"), queryAspectDF("aspect"))
qA1.show

val qA2 = qA1.groupBy("query").agg(collect_list("aspect") as "list")


val qA3RDD = qA2.rdd.flatMap{ 
    r => {
        val queryTokens: Seq[String] = r.get(0).asInstanceOf[String].split(" ")
        val aspects: Seq[String] = r.get(1).asInstanceOf[Seq[String]].map{_.replaceAll("\\s", "_")}

        aspects.map { a =>
            a + " " + queryTokens.mkString(s" $a ") + " " + a
        }
        //(queryTokens, aspects)
    }
}
println(s"qA3RDD: ${qA3RDD.count} \n${qA3RDD.collect.mkString("\n")}\n")
*/

### Convert RELATIONSHIPS to DF

# (S3) TensorFlow program