Skip to content

Commit

Permalink
visit depth topology fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Pranab Ghosh committed Sep 29, 2014
1 parent 21219d0 commit 670133e
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 21 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Expand Up @@ -19,7 +19,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma
<properties>
<log4j.version>1.2.16</log4j.version>
<hadoop.version>0.20.2-cdh3u2</hadoop.version>
<jackson.version>1.6.3</jackson.version>
<jackson.version>1.9.13</jackson.version>
<jdk.level>1.6</jdk.level>
</properties>
</profile>
Expand Down Expand Up @@ -86,13 +86,13 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/ma

<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-lgpl </artifactId>
<artifactId>jackson-core-asl</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-lgpl </artifactId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${jackson.version}</version>
</dependency>

Expand Down
34 changes: 34 additions & 0 deletions resource/bounce.properties
@@ -0,0 +1,34 @@
# Settings for the "bounce" Storm topology. bounce.sh passes this file to
# org.visitante.realtime.VisitTopology when deploying the topology.
#
# debug.on is read (default false) by VisitDepthSpout, VisitSessionBolt and
# VisitDepthBolt during initialization.
debug.on=true

#VisitTopology
# Thread counts, worker count and tick frequencies (seconds, per the key names).
# NOTE(review): presumably consumed by VisitTopology when wiring the spout and
# bolts - that class is not visible here, confirm before relying on this.
spout.threads=1
visit.session.bolt.threads=2
visit.depth.bolt.threads=1
num.workers=1
visit.session.tick.freq.sec=5
visit.depth.tick.freq.sec=10
max.spout.pending=1000
max.task.parallelism=100

#VisitSpout
# Redis connection and log parsing settings (the spout class is VisitDepthSpout).
# bounce.py pushes its generated log lines onto the Redis list "logQueue".
redis.server.host=localhost
redis.server.port=6379
redis.log.queue=logQueue
# Field positions within a log line. With the layout documented in bounce.py
# (date time c-ip cs-method cs-uri-stem ...), and presumably zero-based
# space-separated fields, 4 is cs-uri-stem - TODO confirm against the spout.
date.ordinal=0
time.ordinal=1
url.ordinal=4
# Compiled with Pattern.compile by VisitDepthSpout; group 1 captures the
# 36 non-whitespace characters following "sessionid=". The backslash is
# doubled because a properties file treats a single backslash as an escape.
session.regex=.*?sessionid=(\\S{36}).*

#VisitSessionBolt
# page.id.pattern is compiled with Pattern.compile by VisitSessionBolt; group 1
# captures "home" plus one non-whitespace character (bounce.py generates
# /home1, /home2 and /home3 as landing pages).
page.id.pattern=/(home\\S).*
# NOTE(review): how logOut.pattern is matched and the unit of session.timeOut
# are not visible here - confirm against VisitSessionBolt.
logOut.pattern=logout
session.timeOut=30

#VisitDepthBolt
# window.size and result.type are read by VisitDepthBolt during initialization.
window.size=20
result.type=bounceRate
# bounce.py (readStatQueue) pops results from the Redis list "visitStatQueue".
visit.depth.stat.queue=visitStatQueue




15 changes: 15 additions & 0 deletions resource/build_storm.xml
@@ -0,0 +1,15 @@
<project name="visitante_for_storm" default="uber-jar" basedir=".">
    <!--
        Packages visitante together with its runtime dependencies into a single
        uber JAR (uber-visitante-1.0.jar) that can be submitted with "storm jar".

        The source locations default to the original developer layout. Ant
        properties are immutable once set, so both can be overridden from the
        command line without editing this file, e.g.
            ant -f build_storm.xml -Dprojects.dir=/path/to/projects
    -->
    <property name="projects.dir" value="/Users/pranab/Projects"/>
    <property name="lib.dir" value="${projects.dir}/lib"/>

    <target name="uber-jar">
        <echo>Packaging visitante into a single uber JAR</echo>
        <jar destfile="uber-visitante-1.0.jar">
            <!-- third party libraries merged into the uber JAR -->
            <zipgroupfileset dir="${lib.dir}"
                includes="jedis-2.2.1.jar,commons-pool-1.5.5.jar,guava-14.0.1.jar,jackson-core-asl-1.9.13.jar,jackson-mapper-asl-1.9.13.jar" />
            <!-- visitante itself plus the sibling projects whose jars are merged in -->
            <zipgroupfileset dir="${projects.dir}/visitante/target"
                includes="visitante-1.0.jar" />
            <zipgroupfileset dir="${projects.dir}/chombo/target"
                includes="chombo-1.0.jar" />
            <zipgroupfileset dir="${projects.dir}/hoidla/target"
                includes="hoidla-1.0.jar" />
        </jar>
    </target>
</project>
45 changes: 45 additions & 0 deletions script/bounce.sh
@@ -0,0 +1,45 @@
#!/bin/bash
#
# Driver for the visitante bounce demo: generates logs, builds the uber JAR,
# starts Storm and deploys or kills the topology.
#
# Usage: bounce.sh <operation> [args]
#   genLogs <num_items> <num_sessions>  run ./bounce.py genLogs with the two arguments
#   buildJar                            run ant on build_storm.xml
#   startStorm                          start nimbus, supervisor and ui in the background
#   deployTopology                      submit the topology under the name "bounce"
#   killTopology                        kill the topology named "bounce"
#
# Exits non-zero when no operation or an unknown operation is given.
# bounce.py, build_storm.xml, bounce.properties and the uber JAR are all
# referenced relative to the current working directory.

if [ $# -lt 1 ]
then
	echo "Usage : $0 operation" >&2
	exit 1
fi

case "$1" in
"genLogs")
	# ./bounce.py genLogs <num_items> <num_sessions>
	echo "generating events and pushing to redis queue"
	# Forward at most the two generator arguments, each as a single word;
	# nothing is passed for arguments that were not supplied.
	./bounce.py genLogs "${@:2:2}"
	;;

"buildJar")
	echo "building uber jar"
	ant -f build_storm.xml
	;;

"startStorm")
	echo "starting storm"
	# each daemon is backgrounded; the pauses give the previous one time to come up
	storm nimbus &
	sleep 5
	storm supervisor &
	sleep 5
	storm ui &
	;;

"deployTopology")
	echo "deploying visitante storm topology"
	storm jar uber-visitante-1.0.jar org.visitante.realtime.VisitTopology bounce bounce.properties
	;;

"killTopology")
	echo "killing visitante storm topology"
	storm kill bounce
	;;

*)
	# report failure to the caller instead of exiting with status 0
	echo "unknown operation $1" >&2
	exit 1
	;;

esac

28 changes: 25 additions & 3 deletions script/python/app/bounce.py
Expand Up @@ -14,21 +14,22 @@
items = []
pagesGET = ["/home","/shoppingCart","/product","/onSale","/brands", "/about"]
pagesPOST = ["/addToCart","/checkOut","/billing","/confirmShipping","/placeOrder"]
homePages = ["home1", "home2", "home3"]
rc = redis.StrictRedis(host='localhost', port=6379, db=0)
epochInterval = 10

#Fields: date time c-ip cs-method cs-uri-stem sc-status cs(User-Agent) cs(Referrer)
#2002-05-24 20:18:01 172.224.24.114 GET /Default.htm - 200 Mozilla/4.0+(compatible;+MSIE+5.01;+Windows+2000+Server) http://64.224.24.114/

def genLogs(threadName, items, maxPages, pagesGET, pagesPOST):
def genLogs(threadName, items, maxPages, pagesGET, pagesPOST, homePages):
numItemsForThisUser = randint(10, 20)
itemsForThisUser = selectRandomSubListFromList(items, numItemsForThisUser)
session = str(uuid.uuid1())
ipAddress = genIpAddress()
status = 200

method = "GET"
page = "/home" + "?sessionid=" + session
page = "/" + selectRandomFromList(homePages) + "?sessionid=" + session
createLog(page, ipAddress, method, status)
if (maxPages == 1):
return
Expand Down Expand Up @@ -98,6 +99,22 @@ def createLog(page, ipAddress, method, status):
print log
rc.lpush("logQueue", log)

def readLogQueue():
    """Pop entries from the Redis list "logQueue" and print each one.

    Stops as soon as rpop returns None.
    """
    # two-argument iter(): call rpop repeatedly until it yields the None sentinel
    for entry in iter(lambda: rc.rpop("logQueue"), None):
        print(entry)

def readStatQueue():
    """Pop entries from the Redis list "visitStatQueue" and print each one.

    Stops as soon as rpop returns None.
    """
    # two-argument iter(): call rpop repeatedly until it yields the None sentinel
    for entry in iter(lambda: rc.rpop("visitStatQueue"), None):
        print(entry)

#command processing
op = sys.argv[1]
if (op == "genLogs"):
Expand All @@ -123,11 +140,16 @@ def createLog(page, ipAddress, method, status):
maxPage = 1
else:
maxPage = randint(2, pageCountMax)
t = threading.Thread(target=genLogs, args=(threadName, items,maxPage,pagesGET,pagesPOST, ))
t = threading.Thread(target=genLogs, args=(threadName, items,maxPage,pagesGET,pagesPOST,homePages ))
t.start()
time.sleep(randint(6,12))

except:
print "Error: unable to start thread"

elif (op == "readLogQueue"):
readLogQueue()

elif (op == "readStatQueue"):
readStatQueue()

7 changes: 3 additions & 4 deletions src/main/java/org/visitante/realtime/VisitDepthBolt.java
Expand Up @@ -23,8 +23,8 @@
import java.util.Map;
import java.util.TreeMap;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.chombo.storm.GenericBolt;
import org.chombo.storm.MessageHolder;
import org.chombo.util.ConfigUtility;
Expand Down Expand Up @@ -52,7 +52,7 @@ public class VisitDepthBolt extends GenericBolt {
private Jedis jedis;
private String vistDepthStatQueue;

private static final Logger LOG = Logger.getLogger(VisitDepthBolt.class);
private static final Logger LOG = LoggerFactory.getLogger(VisitDepthBolt.class);

public VisitDepthBolt(int tickFrequencyInSeconds) {
super();
Expand All @@ -70,7 +70,6 @@ public Map<String, Object> getComponentConfiguration() {
public void intialize(Map stormConf, TopologyContext context) {
debugOn = ConfigUtility.getBoolean(stormConf,"debug.on", false);
if (debugOn) {
LOG.setLevel(Level.INFO);
}
windowSize = ConfigUtility.getInt(stormConf, "window.size");
resultType = ConfigUtility.getString(stormConf, "result.type");
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/visitante/realtime/VisitDepthSpout.java
Expand Up @@ -25,8 +25,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.chombo.storm.GenericSpout;
import org.chombo.storm.MessageHolder;
import org.chombo.util.ConfigUtility;
Expand All @@ -44,8 +44,8 @@ public class VisitDepthSpout extends GenericSpout {
private int dateOrd;
private int timeOrd;
private int urlOrd;
private Pattern pattern = Pattern.compile(".+?sessionid=(\\S{36}).+");
private static final Logger LOG = Logger.getLogger(VisitDepthSpout.class);
private Pattern pattern;
private static final Logger LOG = LoggerFactory.getLogger(VisitDepthSpout.class);

@Override
public void close() {
Expand Down Expand Up @@ -79,9 +79,9 @@ public void intialize(Map stormConf, TopologyContext context) {
timeOrd = ConfigUtility.getInt(stormConf, "time.ordinal");
urlOrd = ConfigUtility.getInt(stormConf, "url.ordinal");
seesionRegex = ConfigUtility.getString(stormConf, "session.regex");
pattern = Pattern.compile(seesionRegex);
debugOn = ConfigUtility.getBoolean(stormConf,"debug.on", false);
if (debugOn) {
LOG.setLevel(Level.INFO);;
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/visitante/realtime/VisitSessionBolt.java
Expand Up @@ -24,8 +24,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.chombo.storm.GenericBolt;
import org.chombo.storm.MessageHolder;
import org.chombo.util.ConfigUtility;
Expand All @@ -51,7 +51,7 @@ public class VisitSessionBolt extends GenericBolt {
private long sessionTimeout;
private MessageHolder msg;
private List<String> expiredSessions = new ArrayList<String>();
private static final Logger LOG = Logger.getLogger(VisitSessionBolt.class);
private static final Logger LOG = LoggerFactory.getLogger(VisitSessionBolt.class);

public VisitSessionBolt(int tickFrequencyInSeconds) {
super();
Expand All @@ -69,7 +69,6 @@ public Map<String, Object> getComponentConfiguration() {
public void intialize(Map stormConf, TopologyContext context) {
debugOn = ConfigUtility.getBoolean(stormConf,"debug.on", false);
if (debugOn) {
LOG.setLevel(Level.INFO);
}
pageIdPatternStr = ConfigUtility.getString(stormConf, "page.id.pattern");
pageIdPattern = Pattern.compile(pageIdPatternStr);
Expand All @@ -95,7 +94,7 @@ public boolean process(Tuple input) {
String pageId = sessDetail.getRight();
if (pageId != null) {
msg = new MessageHolder();
msg.setMessage(new Values(timeStamps.size()));
msg.setMessage(new Values(pageId, timeStamps.size()));
outputMessages.add(msg);
}
expiredSessions.add(sessionID);
Expand All @@ -115,7 +114,8 @@ public boolean process(Tuple input) {
if (null == sessDetail) {
//new session
timeStamps = new ArrayList<Long>();
sessions.put(sessionID, new SessionDetail(timeStamps, pageId));
sessDetail = new SessionDetail(timeStamps, pageId);
sessions.put(sessionID, sessDetail);
} else {
timeStamps = sessDetail.getLeft();
if (null != pageId) {
Expand Down

0 comments on commit 670133e

Please sign in to comment.