Skip to content

Commit

Permalink
Merge pull request apache#70 from wangtuanjie/wtj6
Browse files Browse the repository at this point in the history
Wtj6
  • Loading branch information
wangtuanjie committed Dec 4, 2016
2 parents 237de94 + 08d54fe commit 73d4d81
Showing 1 changed file with 27 additions and 4 deletions.
Expand Up @@ -134,29 +134,52 @@ object Main {
classOf[MongoOutputFormat[Object, BSONObject]],
min5Cfg)

val dayMgo = r.map(x => {
val dayDurationMgo = r.map(x => {
(x.uid, x.duration)
}).reduceByKey(_ + _).map(x => {
val uid: Int = x._1

val duration: Long = x._2

val query = new BasicBSONObject("time", day)
.append("uid", uid)

val set = new BasicBSONObject("duration", duration)
new MongoUpdateWritable(query, new BasicBSONObject("$set", set), true, false)
})

// 每天的最大并发数以分钟为粒度
// 每个房间每个用户 算1路
val dayConcurrentMgo = r.map(x => {
((x.uid, x.time / 60 * 60), (x.roomID, x.uid2))
}).groupByKey().map(x => {
(x._1._1, x._2.toStream.distinct.size)
}).reduceByKey(math.max).map(x => {
val uid: Int = x._1
val concurrent: Long = x._2

val query = new BasicBSONObject("time", day)
.append("uid", uid)
val set = new BasicBSONObject("concurrent", concurrent)
new MongoUpdateWritable(query, new BasicBSONObject("$set", set), true, false)
})


val dayCfg = new Configuration()
dayCfg.set("mongo.output.uri", dayMgoCfg.URI)
dayMgo.map((null, _)).saveAsNewAPIHadoopFile(

dayDurationMgo.map((null, _)).saveAsNewAPIHadoopFile(
"file://this-is-not-used",
classOf[Object],
classOf[BSONObject],
classOf[MongoOutputFormat[Object, BSONObject]],
dayCfg)

dayConcurrentMgo.map((null, _)).saveAsNewAPIHadoopFile(
"file://this-is-not-used",
classOf[Object],
classOf[BSONObject],
classOf[MongoOutputFormat[Object, BSONObject]],
dayCfg)

}

}

0 comments on commit 73d4d81

Please sign in to comment.