updated version of kudu
tmalaska committed May 19, 2016
1 parent 3263dda commit 0abb29e
Showing 13 changed files with 453 additions and 116 deletions.
52 changes: 52 additions & 0 deletions GamerSetup.md
@@ -0,0 +1,52 @@

ssh root@mriggs-strata-1.vpc.cloudera.com

scp -i "tedm2.pem" KuduSpark.jar ec2-user@ec2-52-36-220-83.us-west-2.compute.amazonaws.com:./

## Setting up Kafka
kafka-topics --zookeeper mriggs-strata-1.vpc.cloudera.com:2181 --create --topic gamer --partitions 1 --replication-factor 1
kafka-topics --zookeeper mriggs-strata-1.vpc.cloudera.com:2181 --list
kafka-console-producer --broker-list mriggs-strata-1.vpc.cloudera.com:9092 --topic gamer
kafka-console-consumer --zookeeper mriggs-strata-1.vpc.cloudera.com:2181 --topic gamer --from-beginning

vi .bash_profile
export PATH=/usr/java/jdk1.7.0_67-cloudera/bin/:$PATH
export JAVA_HOME=/usr/java/jdk1.7.0_67-cloudera/

## Populating Kafka
java -cp KuduSpark.jar org.kududb.spark.demo.gamer.aggregates.KafkaProducerInjector mriggs-strata-1.vpc.cloudera.com:9092 gamer 10000 300 1000

## Create table
java -cp KuduSpark.jar org.kududb.spark.demo.gamer.CreateKuduTable ec2-52-36-220-83.us-west-2.compute.amazonaws.com gamer 3

## Run Spark Streaming
spark-submit \
--master yarn --deploy-mode client \
--executor-memory 2G \
--num-executors 2 \
--jars kudu-mapreduce-0.1.0-20150903.033037-21-jar-with-dependencies.jar \
--class org.kududb.spark.demo.gamer.aggregates.GamerAggergatesSparkStreaming KuduSpark.jar \
mriggs-strata-1.vpc.cloudera.com:9092 gamer mriggs-strata-1.vpc.cloudera.com gamer C

## Run SparkSQL
spark-submit \
--master yarn --deploy-mode client \
--executor-memory 2G \
--num-executors 2 \
--class org.kududb.spark.demo.gamer.aggregates.GamerSparkSQLExample \
KuduSpark.jar ec2-52-36-220-83.us-west-2.compute.amazonaws.com l

## Run direct insert
java -cp KuduSpark.jar org.kududb.spark.demo.gamer.aggregates.DirectDataInjector ec2-52-36-220-83.us-west-2.compute.amazonaws.com gamer 3

## Impala
impala-shell
connect ec2-52-11-171-85.us-west-2.compute.amazonaws.com:21007;

java -cp KuduSpark.jar org.kududb.spark.demo.gamer.cdc.CreateGamerCDCKuduTable ec2-52-36-220-83.us-west-2.compute.amazonaws.com gamer_cdc 3

java -cp KuduSpark.jar org.kududb.spark.demo.gamer.cdc.DirectDataMultiThreadedInjector ec2-52-36-220-83.us-west-2.compute.amazonaws.com gamer_cdc 10 5 1000

java -cp KuduSpark.jar org.kududb.spark.demo.gamer.cdc.DirectDataMultiThreadedInjector ec2-52-36-220-83.us-west-2.compute.amazonaws.com gamer_cdc 100 5 5

java -cp KuduSpark.jar org.kududb.spark.demo.gamer.DropTable ec2-52-36-220-83.us-west-2.compute.amazonaws.com gamer_cdc
33 changes: 0 additions & 33 deletions GamerSetup.txt

This file was deleted.

@@ -0,0 +1,52 @@
package org.kududb.spark.demo.gamer.aggregates

import java.util
import java.util.ArrayList

import org.kududb.ColumnSchema.ColumnSchemaBuilder
import org.kududb.client.{CreateTableOptions, KuduClient}
import org.kududb.{ColumnSchema, Schema, Type}

object CreateGamerAggregatesKuduTable {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      println("{kuduMaster} {tableName} {numberOfBuckets}")
      return
    }

    val kuduMaster = args(0)
    val tableName = args(1)
    val numberOfBuckets = args(2).toInt

    val kuduClient = new KuduClient.KuduClientBuilder(kuduMaster).build()
    val columnList = new util.ArrayList[ColumnSchema]()

    // gamer_id is the sole primary-key column; everything else is a metric
    columnList.add(new ColumnSchemaBuilder("gamer_id", Type.STRING).key(true).build())
    columnList.add(new ColumnSchemaBuilder("last_time_played", Type.INT64).key(false).build())
    columnList.add(new ColumnSchemaBuilder("games_played", Type.INT32).key(false).build())
    columnList.add(new ColumnSchemaBuilder("games_won", Type.INT32).key(false).build())
    columnList.add(new ColumnSchemaBuilder("oks", Type.INT32).key(false).build())
    columnList.add(new ColumnSchemaBuilder("deaths", Type.INT32).key(false).build())
    columnList.add(new ColumnSchemaBuilder("damage_given", Type.INT32).key(false).build())
    columnList.add(new ColumnSchemaBuilder("damage_taken", Type.INT32).key(false).build())
    columnList.add(new ColumnSchemaBuilder("max_oks_in_one_game", Type.INT32).key(false).build())
    columnList.add(new ColumnSchemaBuilder("max_deaths_in_one_game", Type.INT32).key(false).build())
    val schema = new Schema(columnList)

    if (kuduClient.tableExists(tableName)) {
      println("Deleting Table")
      kuduClient.deleteTable(tableName)
    }

    // hash-partition on gamer_id into the requested number of buckets
    val builder = new CreateTableOptions()
    val hashColumnList = new ArrayList[String]
    hashColumnList.add("gamer_id")
    builder.addHashPartitions(hashColumnList, numberOfBuckets)

    println("Creating Table")
    kuduClient.createTable(tableName, schema, builder)
    println("Created Table")
    kuduClient.shutdown()
  }
}
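To sanity-check the table this tool creates, you can scan a handful of rows back through the same client. The following is a minimal sketch, not part of this commit: it assumes the org.kududb.client scanner API (newScannerBuilder, nextRows) and name-based RowResult getters, and the object name is hypothetical.

package org.kududb.spark.demo.gamer.aggregates

import org.kududb.client.KuduClient

// Hypothetical verification helper, not in this commit; args: {kuduMaster} {tableName}
object VerifyGamerAggregatesTable {
  def main(args: Array[String]): Unit = {
    val kuduClient = new KuduClient.KuduClientBuilder(args(0)).build()
    val table = kuduClient.openTable(args(1))
    val scanner = kuduClient.newScannerBuilder(table).build()

    var printed = 0
    while (scanner.hasMoreRows && printed < 10) {
      val rows = scanner.nextRows() // one RowResultIterator per round-trip
      while (rows.hasNext && printed < 10) {
        val row = rows.next()
        println(row.getString("gamer_id") + " games_played=" + row.getInt("games_played"))
        printed += 1
      }
    }
    scanner.close()
    kuduClient.shutdown()
  }
}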
@@ -0,0 +1,51 @@
package org.kududb.spark.demo.gamer.aggregates

import java.util.Random

import org.kududb.client.KuduClient

object DirectDataInjector {

  val random = new Random

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      println("<kuduMaster> <tableName> <numberOfRecords>")
      return
    }

    val kuduMaster = args(0)
    val tableName = args(1)
    val numberOfRecords = args(2).toInt

    val kuduClient = new KuduClient.KuduClientBuilder(kuduMaster).build()
    val table = kuduClient.openTable(tableName)
    val session = kuduClient.newSession()

    for (i <- 0 until numberOfRecords) {
      val record = GamerDataGenerator.makeNewGamerRecord(100000)
      val op = table.newInsert()

      val row = op.getRow
      row.addString("gamer_id", record.gamerId)
      row.addLong("last_time_played", record.lastTimePlayed)
      row.addInt("games_played", record.gamesPlayed)
      row.addInt("games_won", record.gamesWon)
      row.addInt("oks", record.oks)
      row.addInt("deaths", record.deaths)
      row.addInt("damage_given", record.damageGiven)
      row.addInt("damage_taken", record.damageTaken)
      row.addInt("max_oks_in_one_game", record.maxOksInOneGame)
      row.addInt("max_deaths_in_one_game", record.maxDeathsInOneGame)

      session.apply(op)
    }
    session.flush()

    kuduClient.close()
  }
}
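DirectDataInjector uses the session's default flush mode, which applies each operation synchronously. For larger loads, switching the session to MANUAL_FLUSH batches mutations client-side; the sketch below is an assumed variant, not part of this commit, built on the same SessionConfiguration.FlushMode enum the streaming job in this repo imports. The buffer size and flush interval are illustrative.

package org.kududb.spark.demo.gamer.aggregates

import org.kududb.client.KuduClient
import org.kududb.client.SessionConfiguration.FlushMode

// Hypothetical batched variant; args: {kuduMaster} {tableName} {numberOfRecords}
object BatchedDirectDataInjector {
  def main(args: Array[String]): Unit = {
    val kuduClient = new KuduClient.KuduClientBuilder(args(0)).build()
    val table = kuduClient.openTable(args(1))
    val numberOfRecords = args(2).toInt

    val session = kuduClient.newSession()
    session.setFlushMode(FlushMode.MANUAL_FLUSH) // buffer mutations client-side
    session.setMutationBufferSpace(5000)         // illustrative buffer size

    for (i <- 0 until numberOfRecords) {
      val record = GamerDataGenerator.makeNewGamerRecord(100000)
      val op = table.newInsert()
      val row = op.getRow
      row.addString("gamer_id", record.gamerId)
      row.addLong("last_time_played", record.lastTimePlayed)
      row.addInt("games_played", record.gamesPlayed)
      row.addInt("games_won", record.gamesWon)
      row.addInt("oks", record.oks)
      row.addInt("deaths", record.deaths)
      row.addInt("damage_given", record.damageGiven)
      row.addInt("damage_taken", record.damageTaken)
      row.addInt("max_oks_in_one_game", record.maxOksInOneGame)
      row.addInt("max_deaths_in_one_game", record.maxDeathsInOneGame)
      session.apply(op)

      if (i % 1000 == 999) session.flush() // flush periodically to bound buffered memory
    }
    session.flush()
    kuduClient.shutdown()
  }
}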
@@ -1,20 +1,19 @@
package org.kududb.spark.demo.gamer
package org.kududb.spark.demo.gamer.aggregates

import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.{HashPartitioner, SparkContext, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.kududb.client.Operation
import org.kududb.client.SessionConfiguration.FlushMode
import org.kududb.spark.KuduContext
import org.kududb.spark.KuduDStreamFunctions.GenericKuduDStreamFunctions
import org.kududb.spark.demo.gamer.{GamerEvent, GamerEventBuilder}

object GamerAggergatesSparkStreaming {

//Logger.getRootLogger.setLevel(Level.ERROR)

def main(args:Array[String]): Unit = {
if (args.length == 0) {
println("{brokerList} {topics} {kuduMaster} {tableName} {local}")
@@ -0,0 +1,55 @@
package org.kududb.spark.demo.gamer.aggregates

import java.util.{Date, Random}

import org.kududb.spark.demo.gamer.GamerEvent

object GamerDataGenerator {

  val random = new Random()
  // cumulative percentage thresholds for the three player tiers
  val averagePlayerPercentage = 40
  val advancedPlayerPercentage = 80
  val superStarPlayerPercentage = 100
  var date = System.currentTimeMillis()

  def makeNewGamerRecord(numOfGamers: Int): GamerEvent = {
    println("date: " + new Date(date))
    date += 60000 * 60 * 6 // advance the synthetic clock six hours per record
    val playerSelection = random.nextInt(100)
    if (playerSelection < averagePlayerPercentage) {
      val gamerId = random.nextInt(numOfGamers / 100) * 100 + playerSelection

      new GamerEvent(gamerId.toString,
        date,
        1,
        if (random.nextInt(10) > 7) 1 else 0,
        random.nextInt(10),
        random.nextInt(20),
        random.nextInt(1000),
        random.nextInt(2000))
    } else if (playerSelection < advancedPlayerPercentage) {
      val gamerId = random.nextInt(numOfGamers / 100) * 100 + playerSelection

      new GamerEvent(gamerId.toString,
        date,
        1,
        if (random.nextInt(10) > 5) 1 else 0,
        random.nextInt(20),
        random.nextInt(18),
        random.nextInt(2000),
        random.nextInt(2000))
    } else {
      val gamerId = random.nextInt(numOfGamers / 100) * 100 + playerSelection

      new GamerEvent(gamerId.toString,
        date,
        1,
        if (random.nextInt(10) > 3) 1 else 0,
        random.nextInt(20),
        random.nextInt(10),
        random.nextInt(4000),
        random.nextInt(1500))
    }
  }
}
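Because the generator mutates a shared clock and Random, its output is easiest to inspect with a quick single-threaded run. A minimal smoke test, assuming GamerEvent.toString is readable (the Kafka injector in this commit serializes events with toString); the object name is hypothetical:

package org.kududb.spark.demo.gamer.aggregates

// Hypothetical smoke test, not part of this commit
object GamerDataGeneratorSmokeTest {
  def main(args: Array[String]): Unit = {
    // print five synthetic events drawn from a pool of 10,000 gamer ids
    for (_ <- 1 to 5) {
      println(GamerDataGenerator.makeNewGamerRecord(10000))
    }
  }
}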
@@ -1,10 +1,10 @@
package org.kududb.spark.demo.gamer
package org.kududb.spark.demo.gamer.aggregates

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkConf, SparkContext}

object GamerSparkSQLExample {
def main(args:Array[String]): Unit = {
@@ -54,7 +54,7 @@ object GamerSparkSQLExample {

println("Query 3: SELECT * FROM gamer order_by last_time_played desc limit 100")
val startTimeQ3 = System.currentTimeMillis()
sqlContext.sql("SELECT * FROM gamer order_by last_time_played desc limit 100").take(100).foreach(r => {
sqlContext.sql("SELECT * FROM gamer order by last_time_played desc limit 100").take(100).foreach(r => {
println(" - (" + r + ")")
})
println("Finish Query 3: " + (System.currentTimeMillis() - startTimeQ3))
@@ -74,9 +74,14 @@ object GamerSparkSQLExample {
val array = Array(r.getInt(1).toDouble, r.getInt(2).toDouble, r.getInt(3).toDouble)
Vectors.dense(array)
})
val clusters = KMeans.train(parsedData, 3, 5)
clusters.clusterCenters.foreach(v => println(" Vector Center:" + v))

val dataCount = parsedData.count()

if (dataCount > 0) {
val clusters = KMeans.train(parsedData, 3, 5)
clusters.clusterCenters.foreach(v => println(" Vector Center:" + v))

}
//TODO add Mllib here
println("Finish Query 5 + MLLIB: " + (System.currentTimeMillis() - startTimeQ5))

@@ -1,16 +1,12 @@
package org.kududb.spark.demo.gamer
package org.kududb.spark.demo.gamer.aggregates

import java.util.{Random, Properties}
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}


object KafkaProducerGenerator {
object KafkaProducerInjector {

val random = new Random()
val averagePlayerPercentage = 40
val advancedPlayerPercentage = 80
val superStarPlayerPercentage = 100

def main(args:Array[String]): Unit = {
if (args.length == 0) {
@@ -28,7 +24,7 @@ object KafkaProducerGenerator {

for (i <- 0 until numOfRecords) {

val gamerRecord = makeNewGamerRecord(numOfGamers)
val gamerRecord = GamerDataGenerator.makeNewGamerRecord(numOfGamers)

val message = new ProducerRecord[String, String](topic, gamerRecord.gamerId.toString, gamerRecord.toString())

@@ -65,42 +61,5 @@ object KafkaProducerGenerator {
new KafkaProducer[String, String](kafkaProps)
}

def makeNewGamerRecord(numOfGamers:Int): GamerEvent = {
val playerSelection = random.nextInt(100)
if (playerSelection < averagePlayerPercentage) {

val gamerId = random.nextInt(numOfGamers/100) * 100 + playerSelection

new GamerEvent(gamerId.toString,
System.currentTimeMillis(),
1,
if (random.nextInt(10) > 7) 1 else 0,
random.nextInt(10),
random.nextInt(20),
random.nextInt(1000),
random.nextInt(2000))
} else if (playerSelection < advancedPlayerPercentage) {
val gamerId = random.nextInt(numOfGamers/100) * 100 + playerSelection

new GamerEvent(gamerId.toString,
System.currentTimeMillis(),
1,
if (random.nextInt(10) > 5) 1 else 0,
random.nextInt(20),
random.nextInt(18),
random.nextInt(2000),
random.nextInt(2000))
} else {
val gamerId = random.nextInt(numOfGamers/100) * 100 + playerSelection

new GamerEvent(gamerId.toString,
System.currentTimeMillis(),
1,
if (random.nextInt(10) > 3) 1 else 0,
random.nextInt(20),
random.nextInt(10),
random.nextInt(4000),
random.nextInt(1500))
}
}

}
@@ -1,7 +1,7 @@
package org.kududb.spark.demo.gamer
package org.kududb.spark.demo.gamer.aggregates

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkConf, SparkContext}

object SparkSQLCmd {
def main(args:Array[String]): Unit = {
