// BFSpark.scala — 145 lines (121 loc) · 5.43 KB
package com.esri.spark
import java.io.PrintWriter
import java.io.File
import com.twitter.algebird._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.broadcast.Broadcast
/*
 * Uses Twitter's Algebird BloomFilterMonoid to find the top-k-spending users for marketing.
 */
object BFSpark {

  /** Returns true for the CSV header row, identified by the "userid" column name. */
  def isHeader(line: String): Boolean = line.contains("userid")

  /** Crude email check: any token containing '@' is treated as an email address. */
  def isEmail(item: String): Boolean = item.contains("@")

  /** Crude phone check: a record containing both '-' and '(' is assumed to carry a phone number. */
  def hasPhoneNum(item: String): Boolean = item.contains("-") && item.contains("(")

  /**
   * Membership test of a transaction's user id against the broadcast users Bloom filter.
   *
   * Bloom filters may yield false positives but never false negatives, so this is only a
   * cheap pre-filter; the exact `join` with the users RDD afterwards removes spurious matches.
   *
   * Fixed: removed the per-record `println` — it ran once per reduced transaction and
   * duplicated the membership lookup, which is prohibitively slow on large datasets.
   */
  def transactionsBloomFilter(usersBF: Broadcast[BF], userTransactionId: String): Boolean =
    usersBF.value.contains(userTransactionId).isTrue

  /**
   * Returns true when none of the user's comma-separated phone numbers appear on the
   * broadcast do-not-call list. A null phone list is rejected (returns false), matching
   * the original behavior.
   */
  def donotcallFilter(donotcallBC: Broadcast[Array[String]], phoneListStr: String): Boolean =
    phoneListStr != null && donotcallBC.value.intersect(phoneListStr.split(",")).isEmpty

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      // Fixed: the old usage string advertised a single argument although three are required.
      System.err.println("Usage: BFSpark <users file> <donotcall file> <transactions file>")
      // bin\spark-submit --driver-memory 6g --executor-memory 6g
      //   --class com.esri.spark.BFSpark target\topk-users-bf-spark-0.1.jar users.txt donotcall.txt transactions.txt
      return
    }
    val conf = new SparkConf()
      .setAppName("SelectTopKUsers")
      .set("spark.executor.memory", "6g")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val usersFile = args(0)
      val donotcallFile = args(1)
      val transactionsFile = args(2)

      val usersRDD = sc.textFile(usersFile).filter(!isHeader(_))

      // Broadcast the (small) distinct do-not-call list so executors filter locally.
      val donotcallBC = sc.broadcast[Array[String]](sc.textFile(donotcallFile).distinct().collect())

      // User data format (inferred from the splits below): userid;name;email;phone1,phone2
      // Keep only records that carry a phone number, drop the email token, and discard
      // users whose phones appear on the do-not-call list.
      val users = usersRDD.filter(hasPhoneNum(_))
        .map(_.split(";"))
        .map(_.filter(!isEmail(_)))
        .map { tokens => (tokens(0), tokens(1), tokens(2)) } // emit (id, name, phones)
        .filter { case (_, _, phones) => donotcallFilter(donotcallBC, phones) }
        .map { case (id, name, phones) => (id, (name, phones)) }
        .cache()
      println("Number of user records " + users.count)
      println("Typical user record:" + users.first)

      // Build a Bloom filter over the users dataset (the smaller side) and broadcast it,
      // so the larger transactions dataset can be pre-filtered cheaply before the join.
      val NUM_HASHES = 3
      val WIDTH = 8192
      val SEED = 1
      val bf = BloomFilterMonoid(NUM_HASHES, WIDTH, SEED)
      val usersBF = users
        .map { case (userid, _) => bf.create(userid) }
        .reduce { (a: BF, b: BF) => a ++ b }
      val usersBFBC = sc.broadcast[BF](usersBF)

      val YEAR = "2015"
      val transactionsRDD = sc.textFile(transactionsFile)
        .map(_.replace("$", ""))
        .map(_.replace("-", ";")) // 2015-09-05 -> 2015;09;05 so the date splits apart
        .map(_.split(";"))        // e.g. Array(815581247, 144.82, 2015, 09, 05)
        .filter(x => x(2) == YEAR)                // keep only entries from YEAR
        .map { arr => (arr(0), arr(1).toDouble) } // keep (customerID, amount)
        .reduceByKey(_ + _)                       // total spend per customer
        .filter { case (id, _) => transactionsBloomFilter(usersBFBC, id) }
        .join(users)
        .map { case (userid, (dollar, (name, phones))) => UserTransaction(userid, name, phones, dollar) }
        .cache()
      println("Number of transaction records " + transactionsRDD.count)
      println("Typical merged transaction record:" + transactionsRDD.first)

      val SEP = ";"
      val NUM_OF_USER = 1000
      val topUsers = transactionsRDD
        .takeOrdered(NUM_OF_USER)(Ordering[Double].reverse.on(_.amount))
        .map(x => x.id + SEP + x.name + SEP + x.phone + SEP + "$" + "%.2f".format(x.amount))

      // Fixed: close the writer even if a write throws (was leaked on exception).
      val pw = new PrintWriter(new File("./data/top_users.txt"))
      try {
        for (elem <- topUsers) {
          pw.write(elem)
          pw.write("\n")
        }
      } finally {
        pw.close()
      }
    } finally {
      sc.stop() // Fixed: the SparkContext was never stopped.
    }
  }
}
case class UserTransaction(id:String, name: String, phone: String, amount:Double)