Skip to content

Commit

Permalink
added analysis
Browse files Browse the repository at this point in the history
  • Loading branch information
kykamath committed Feb 13, 2012
1 parent a520338 commit 6e0ffbc
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 13 deletions.
40 changes: 40 additions & 0 deletions .gitignore
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,40 @@
# Compiled source #
###################
*.com
*.class
*.dll
*.exe
*.o
*.so
*.pyc
*.stream
*.gr
# Packages #
############
# it's better to unpack these files and commit the raw source
# git has its own built in compression methods
*.7z
*.dmg
*.iso
*.jar
*.rar
*.tar
*.zip

# Logs and databases #
######################
*.log
*.sql
*.sqlite
*.bbl
*.aux
*.blg
*.pdf
*.dvi

# OS generated files #
######################
.DS_Store?
ehthumbs.db
Icon?
Thumbs.db
31 changes: 31 additions & 0 deletions hashtags_for_locations/analysis.py
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,31 @@
'''
Created on Nov 19, 2011
@author: kykamath
'''
from settings import hdfsInputFolder, hashtagsWithoutEndingWindowFile, hashtagsWithEndingWindowFile
from datetime import datetime
from dateutil.relativedelta import relativedelta
from library.mrjobwrapper import runMRJob
from mr_analysis import MRAnalysis
from hashtags_for_locations.mr_analysis import PARAMS_DICT
from library.file_io import FileIO

def getInputFiles(startTime, endTime, folderType='world'):
current=startTime
while current<=endTime:
yield hdfsInputFolder%folderType+'%s_%s'%(current.year, current.month)
current+=relativedelta(months=1)

def mr_analysis(startTime, endTime, outputFolder):
outputFile = hashtagsWithEndingWindowFile%outputFolder
runMRJob(MRAnalysis, outputFile, getInputFiles(startTime, endTime), jobconf={'mapred.reduce.tasks':300})
FileIO.writeToFileAsJson(PARAMS_DICT, outputFile)

if __name__ == '__main__':
startTime, endTime, outputFolder = datetime(2011, 4, 1), datetime(2012, 1, 31), 'complete' # Complete duration
# startTime, endTime, outputFolder = datetime(2011, 5, 1), datetime(2011, 12, 31), 'complete_prop' # Complete propagation duration
# startTime, endTime, outputFolder = datetime(2011, 5, 1), datetime(2011, 10, 31), 'training' # Training duration
# startTime, endTime, outputFolder = datetime(2011, 11, 1), datetime(2011, 12, 31), 'testing' # Testing duration

mr_analysis(startTime, endTime, outputFolder)
30 changes: 17 additions & 13 deletions hashtags_for_locations/mr_analysis.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from library.mrjobwrapper import ModifiedMRJob from library.mrjobwrapper import ModifiedMRJob
from library.geo import getLatticeLid, getLattice, isWithinBoundingBox,\ from library.geo import getLatticeLid, getLattice, isWithinBoundingBox,\
getLocationFromLid, getHaversineDistance, getCenterOfMass getLocationFromLid, getHaversineDistance, getCenterOfMass
import cjson, time, datetime import cjson, time
from datetime import datetime
from collections import defaultdict from collections import defaultdict
from itertools import groupby from itertools import groupby
import numpy as np import numpy as np
Expand All @@ -23,14 +24,18 @@
MIN_HASHTAG_OCCURENCES = 250 MIN_HASHTAG_OCCURENCES = 250


# Time windows. # Time windows.
HASHTAG_STARTING_WINDOW = time.mktime(datetime.datetime(2011, 4, 1).timetuple()) startTime, endTime = datetime(2011, 4, 1), datetime(2012, 1, 31) # Complete duration
HASHTAG_ENDING_WINDOW = time.mktime(datetime.datetime(2012, 1, 31).timetuple()) #startTime, endTime = datetime(2011, 5, 1), datetime(2011, 12, 31) # Complete propagation duration
#HASHTAG_STARTING_WINDOW = time.mktime(datetime.datetime(2011, 5, 1).timetuple()) #startTime, endTime = datetime(2011, 5, 1), datetime(2011, 10, 31) # Training duration
#HASHTAG_ENDING_WINDOW = time.mktime(datetime.datetime(2011, 12, 31).timetuple()) #startTime, endTime = datetime(2011, 11, 1), datetime(2011, 12, 31) # Testing duration
#HASHTAG_STARTING_WINDOW = time.mktime(datetime.datetime(2011, 5, 1).timetuple()) HASHTAG_STARTING_WINDOW, HASHTAG_ENDING_WINDOW = time.mktime(startTime.timetuple()), time.mktime(endTime.timetuple())
#HASHTAG_ENDING_WINDOW = time.mktime(datetime.datetime(2011, 10, 31).timetuple())
#HASHTAG_STARTING_WINDOW = time.mktime(datetime.datetime(2011, 11, 1).timetuple()) # Parameters for the MR Job that will be logged.
#HASHTAG_ENDING_WINDOW = time.mktime(datetime.datetime(2011, 12, 31).timetuple()) PARAMS_DICT = dict(PARAMS_DICT = True,
LATTICE_ACCURACY=LATTICE_ACCURACY,
MIN_HASHTAG_OCCURENCES=MIN_HASHTAG_OCCURENCES,
HASHTAG_STARTING_WINDOW = HASHTAG_STARTING_WINDOW, HASHTAG_ENDING_WINDOW = HASHTAG_ENDING_WINDOW,
)


def iterateHashtagObjectInstances(line): def iterateHashtagObjectInstances(line):
data = cjson.decode(line) data = cjson.decode(line)
Expand All @@ -39,7 +44,6 @@ def iterateHashtagObjectInstances(line):
else: l = data['bb'] else: l = data['bb']
t = time.mktime(getDateTimeObjectFromTweetTimestamp(data['t']).timetuple()) t = time.mktime(getDateTimeObjectFromTweetTimestamp(data['t']).timetuple())
point = getLattice(l, LATTICE_ACCURACY) point = getLattice(l, LATTICE_ACCURACY)
# if isWithinBoundingBox(point, BOUNDING_BOX):
for h in data['h']: yield h.lower(), [point, t] for h in data['h']: yield h.lower(), [point, t]


def getHashtagWithoutEndingWindow(key, values): def getHashtagWithoutEndingWindow(key, values):
Expand All @@ -62,10 +66,10 @@ def getHashtagWithEndingWindow(key, values):
if numberOfInstances>=MIN_HASHTAG_OCCURENCES and \ if numberOfInstances>=MIN_HASHTAG_OCCURENCES and \
e[1]>=HASHTAG_STARTING_WINDOW and l[1]<=HASHTAG_ENDING_WINDOW: return {'h': key, 't': numberOfInstances, 'e':e, 'l':l, 'oc': sorted(occurences, key=lambda t: t[1])} e[1]>=HASHTAG_STARTING_WINDOW and l[1]<=HASHTAG_ENDING_WINDOW: return {'h': key, 't': numberOfInstances, 'e':e, 'l':l, 'oc': sorted(occurences, key=lambda t: t[1])}


class MRAreaAnalysis(ModifiedMRJob): class MRAnalysis(ModifiedMRJob):
DEFAULT_INPUT_PROTOCOL='raw_value' DEFAULT_INPUT_PROTOCOL='raw_value'
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(MRAreaAnalysis, self).__init__(*args, **kwargs) super(MRAnalysis, self).__init__(*args, **kwargs)
self.hashtags = defaultdict(list) self.hashtags = defaultdict(list)
''' Start: Methods to get hashtag objects ''' Start: Methods to get hashtag objects
''' '''
Expand Down Expand Up @@ -93,4 +97,4 @@ def steps(self):
# return self.jobsToGetHastagObjectsWithoutEndingWindow() # return self.jobsToGetHastagObjectsWithoutEndingWindow()


if __name__ == '__main__': if __name__ == '__main__':
MRAreaAnalysis.run() MRAnalysis.run()
10 changes: 10 additions & 0 deletions hashtags_for_locations/settings.py
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,10 @@
'''
Created on Feb 12, 2012
@author: kykamath
'''
hdfsInputFolder = 'hdfs:///user/kykamath/geo/hashtags/%s/'

analysisFolder = '/mnt/chevron/kykamath/data/geo/hashtags/hashtags_for_locations/%s/'
hashtagsWithoutEndingWindowFile = analysisFolder+'hashtagsWithoutEndingWindow'
hashtagsWithEndingWindowFile = analysisFolder+'hashtagsWithEndingWindow'

0 comments on commit 6e0ffbc

Please sign in to comment.