Periodic update

commit e70be46bf5eee44921e72ea63ceea441eb820e10 1 parent 5c7aea1
@cedricsam authored
73 blogs.parse.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import sys
+import pg
+import mypass
+import datetime
+import time
+import rfc822
+import urllib2
+import httplib
+from xml.dom import minidom
+
+try:
+ blogid = int(sys.argv[1])
+except (IndexError, ValueError):
+ print "Missing or invalid blog ID"
+ sys.exit(1)
+
+try:
+ url = sys.argv[2]
+except IndexError:
+ print "Missing URL"
+ sys.exit(1)
+
+p = urllib2.urlopen(url, timeout=30)
+txt = p.read()
+
+try:
+ dom = minidom.parseString(txt)
+except Exception as e:
+ print e
+ sys.exit("Invalid URL: " + url)
+
+pgconn = mypass.getConn()
+
+for item in dom.getElementsByTagName('item'):
+ r = dict()
+ r["blogid"] = blogid
+ for a in ["title", "link", "guid", "description", "author", "comments", "category"]:
+ att = None
+ try:
+ att = item.getElementsByTagName(a)[0].firstChild.data
+ r[a] = att.encode("utf8")
+ except:
+ #print "does not exist: " + a
+ r[a] = att
+ try:
+ pubDate = item.getElementsByTagName("pubDate")[0].firstChild.data
+ #pubDate_dt = datetime.datetime.strptime(pubDate, '%a, %d %b %Y %H:%M:%S %z')
+ #print pubDate
+ try:
+ pubDate_dt = rfc822.parsedate_tz(pubDate)
+ pubDate_str = time.strftime("%Y-%m-%d %H:%M:%S", pubDate_dt[0:9])
+ tz = pubDate.split()
+ tz_str = tz[len(tz)-1]
+ r["pubdate"] = pubDate_str + " " + tz_str
+ except:
+ try:
+ r["pubdate"] = pubDate.replace("/","-") + " +0800"
+ except:
+ r["pubdate"] = pubDate + " +0800"
+ #print r
+ except Exception as e:
+ print e
+ continue
+ try:
+ pgconn.insert("blogs_entries", r)
+ except Exception as e:
+ print e
+ continue
+
+pgconn.close()
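
The pubDate handling above converts an RFC 822 date from the feed into a Postgres-friendly timestamp plus the feed's original timezone suffix. A minimal sketch of that conversion, with an illustrative date string:

    import rfc822, time

    pubDate = "Tue, 06 Sep 2011 08:15:00 +0800"
    parts = rfc822.parsedate_tz(pubDate)  # 10-tuple; the first 9 items form a time tuple
    pubDate_str = time.strftime("%Y-%m-%d %H:%M:%S", parts[0:9])
    tz_str = pubDate.split()[-1]          # keep the original offset string, e.g. "+0800"
    print pubDate_str + " " + tz_str      # expected: 2011-09-06 08:15:00 +0800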
183 hkforums.search.py
@@ -0,0 +1,183 @@
+#!/usr/bin/env python
+
+import sys, os
+import time, datetime
+import csv
+import pg
+import re
+import lucene
+import mypass, sinaweibooauth
+
+class SearchForums(object):
+ """Usage: hkforums.search.py [-ds|-de DATE] terms <forum name>"""
+
+ pgconn = None
+ STORE_BASE_DIR = "/var/data/lucene/"
+ STORE_DIR = ""
+ supported_forums = ["uwants", "discuss", "hkreporter"]
+ analyzers = dict()
+ searcher = None
+ MAX_ITEMS = 1000
+ forum = ""
+
+ def __init__(self, forumname):
+ if not forumname in self.supported_forums:
+ sys.exit()
+ else:
+ self.forum = forumname
+ self.STORE_DIR = self.STORE_BASE_DIR + forumname
+ smartcn = lucene.SmartChineseAnalyzer(lucene.Version.LUCENE_33)
+ self.analyzers = { "smartcn": smartcn }
+ directory = lucene.SimpleFSDirectory(lucene.File(self.STORE_DIR))
+ self.searcher = lucene.IndexSearcher(directory, True)
+ self.pgconn = mypass.getConn()
+
+ def prepareDates(self, datestring):
+ if datestring is None:
+ return None
+ try:
+ mydate = time.strptime(datestring, "%Y-%m-%d")
+ except ValueError:
+ try:
+ mydate = time.strptime(datestring, "%Y-%m-%d %H:%M")
+ except (ValueError, TypeError):
+ return None
+ return int(time.mktime(mydate))
+
+ def searchForums(self, q, time_start_secs, time_end_secs, uids=list(), offset=None, floor=None):
+ if offset is not None:
+ try:
+ offset = int(offset)
+ if offset > self.MAX_ITEMS:
+ self.MAX_ITEMS = offset + 100
+ except:
+ pass
+ page_start = page_end = None
+ if floor is not None and len(floor) > 0:
+ m = re.match(r"(\d+)-?(\d*)", floor)
+ if m is not None:
+ page_start = int(m.group(1))
+ try:
+ page_end = int(m.group(2))
+ except:
+ page_end = page_start
+ startexec = datetime.datetime.now()
+ first = True
+ query = lucene.BooleanQuery()
+ query.setMaxClauseCount(2097152)
+ sorter = lucene.Sort(lucene.SortField("time", lucene.SortField.INT, True))
+ pageFilter = None
+ if len(q) > 0:
+ query.add(lucene.QueryParser(lucene.Version.LUCENE_33, "content", self.analyzers["smartcn"]).parse(q), lucene.BooleanClause.Occur.MUST)
+ dateFilter = lucene.NumericRangeFilter.newIntRange("time", time_start_secs, time_end_secs, True, True)
+ else:
+ query.add(lucene.NumericRangeQuery.newIntRange("time", time_start_secs, time_end_secs, True, True), lucene.BooleanClause.Occur.MUST)
+ if page_start is not None and page_end is not None:
+ pageFilter = lucene.NumericRangeFilter.newIntRange("floor", page_start, page_end, True, True)
+ topScoreCollector = lucene.TopScoreDocCollector
+ if len(uids) > 0:
+ uids_str = list()
+ numfilters = list()
+ count = 0
+ for x in uids:
+ count += 1
+ uids_str.append(str(x))
+ numfilter = lucene.NumericRangeFilter.newIntRange("uid", x, x, True, True)
+ numfilters.append(numfilter)
+ #if count > 1000:
+ # break
+ chainedNumFilters = lucene.ChainedFilter(numfilters, lucene.ChainedFilter.OR)
+ cachingChainedNumFilters = lucene.CachingWrapperFilter(chainedNumFilters)
+ if len(q) > 0:
+ filters = [f for f in [cachingChainedNumFilters, dateFilter, pageFilter] if f is not None]
+ chain = lucene.ChainedFilter(filters, lucene.ChainedFilter.AND)
+ else:
+ chain = cachingChainedNumFilters
+ topDocs = self.searcher.search(query, chain, self.MAX_ITEMS, sorter)
+ else:
+ if len(q) > 0 and time_start_secs is not None and time_end_secs is not None:
+ if pageFilter is not None:
+ filters = [dateFilter, pageFilter]
+ chainedFilters = lucene.ChainedFilter(filters, lucene.ChainedFilter.AND)
+ topDocs = self.searcher.search(query, chainedFilters, self.MAX_ITEMS, sorter)
+ else:
+ topDocs = self.searcher.search(query, dateFilter, self.MAX_ITEMS, sorter)
+ else:
+ if pageFilter is not None:
+ topDocs = self.searcher.search(query, pageFilter, self.MAX_ITEMS, sorter)
+ else:
+ topDocs = self.searcher.search(query, self.MAX_ITEMS, sorter)
+ #return "%(nb)d results found in %(secs)f seconds" %
+ ids = list()
+ ids_str = list()
+ hits = list()
+ count = 0
+ for scoreDoc in topDocs.scoreDocs:
+ count += 1
+ doc = self.searcher.doc(scoreDoc.doc)
+ id = doc.get("pid")
+ uid = doc.get("uid")
+ tid = doc.get("tid")
+ #ids.append(id)
+ hit = { "pid": id, "uid": uid, "tid": tid }
+ hits.append(hit)
+ #ids_str.append(str(id))
+ #if count > self.MAX_ITEMS:
+ #break
+ out = { "totalhits": topDocs.totalHits, "nb_users": len(uids), "ids": ids, "q": q, "hits": hits }
+ out["lucene_query_finished"] = long(time.mktime(datetime.datetime.now().timetuple())) * 1000
+ if len(uids) > 0:
+ out["user_ids"] = uids_str
+ # Logging
+ f = open("/var/data/hkforums/searchlog/%(forum)s.log" % {"forum": self.forum},"a")
+ f.write(datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S") + "\t" + q + "\n")
+ f.close()
+ endexec = datetime.datetime.now()
+ td = endexec - startexec
+ microtime = td.microseconds + (td.seconds + td.days * 86400) * 1000000
+ secondstime = microtime / 1000000.0
+ out["secs"] = secondstime
+ print out
+ return out
+
+if __name__ == '__main__':
+ if len(sys.argv) <= 1:
+ print SearchForums.__doc__
+ sys.exit(1)
+ inargs = False
+ datestart_str = None
+ dateend_str = None
+ for i in range(1, len(sys.argv)):
+ if sys.argv[i].find("-") != 0 and not inargs:
+ i -= 1
+ break
+ else:
+ inargs = False
+ if sys.argv[i] == "-ds":
+ if len(sys.argv) > i + 1:
+ inargs = True
+ datestart_str = sys.argv[i+1]
+ elif sys.argv[i] == "-de":
+ if len(sys.argv) > i + 1:
+ inargs = True
+ dateend_str = sys.argv[i+1]
+ terms = sys.argv[i+1:len(sys.argv)+1]
+ if inargs or len(terms) == 0:# or datestart_str is None:
+ print SearchForums.__doc__
+ sys.exit(1)
+ if dateend_str is None:
+ dateend_str = datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M")
+ print terms
+ print "date start: " + str(datestart_str)
+ print "date end: " + str(dateend_str)
+ # Start Lucene
+ lucene.initVM(lucene.CLASSPATH)
+ print 'lucene', lucene.VERSION
+ forumname = terms.pop() # per the usage string, the forum name is the last argument
+ q = " ".join(terms)
+ search = SearchForums(forumname)
+ search.searchForums(q, search.prepareDates(datestart_str), search.prepareDates(dateend_str))
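
The floor argument accepted by searchForums above is parsed with the pattern (\d+)-?(\d*), so both a single floor and a dash-separated range are recognised. A small illustration of that parsing:

    import re

    for floor in ["7", "5-10"]:
        m = re.match(r"(\d+)-?(\d*)", floor)
        page_start = int(m.group(1))
        page_end = int(m.group(2)) if m.group(2) else page_start
        print floor, "->", page_start, page_end   # 7 -> 7 7, then 5-10 -> 5 10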
36 sinatrace.py
@@ -22,6 +22,7 @@
pgconn = mypass.getConn()
def sinatrace(tid, minimal=False, extra_fields=False, get_users=False, outformat="json"):
+ # For RP: Should try to find the created_at if it's not known or given as argument...
sw = sinaweibooauth.SinaWeiboOauth()
sw.setToken(sw.sinaweiboOauth["oauth_token"], sw.sinaweiboOauth["oauth_token_secret"])
try:
@@ -35,9 +36,32 @@ def sinatrace(tid, minimal=False, extra_fields=False, get_users=False, outformat
u.followers_count user_followers_count, u.friends_count user_friends_count, u.retrieved user_retrieved "
else:
extra_fields = ""
+ '''
+ rps = sw.getRangePartitionByIds([tid])
+ for rp in rps:
+ x = rp.split(",")
+ year = int(x[0])
+ week = int(x[1])
+ break
+ isocal = datetime.datetime.now().isocalendar()
+ year_now = isocal[0]
+ week_now = isocal[1]
+ sw_tables_arr = list()
+ for x in range(year,year_now+1):
+ if year == year_now:
+ myrange = range(week,week_now+1)
+ elif x == year:
+ myrange = range(week,54)
+ elif x == year_now:
+ myrange = range(1,week)
+ for y in myrange:
+ sw_tables_arr.append("SELECT * FROM rp_sinaweibo_y%(year)dw%(week)d" % { "year": x, "week": y })
+ sw_tables = " UNION ".join(sw_tables_arr)
+ '''
sql = "SELECT s.id, s.created_at, s.user_id, s.screen_name, s.text, u.id AS user_id_ref %(extra_fields)s \
-FROM sinaweibo s LEFT JOIN sinaweibo_users u ON s.user_id = u.id \
-WHERE retweeted_status = %(tid)d ORDER BY s.id " % {"tid": tid, "extra_fields": extra_fields}
+FROM rp_sinaweibo s LEFT JOIN sinaweibo_users u ON s.user_id = u.id \
+WHERE retweeted_status = %(tid)d ORDER BY s.id " % {"tid": tid, "extra_fields": extra_fields }#, "sw_tables": sw_tables}
+ #print sql
rows = pgconn.query(sql).dictresult()
out = dict()
rts = list()
@@ -161,9 +185,13 @@ def gviz_trends(tid, req_id=0, interval="", period="", province=0, listid=0, out
basetime = None
if basetime is None:
sql_period = ""
+ sw_tables = "sinaweibo"
else:
basetime = datetime.datetime.combine(basetime, datetime.time())
sql_period = " AND s.created_at >= '%s' " % basetime.strftime("%Y-%m-%d")
+ import sinaweibooauth
+ sw = sinaweibooauth.SinaWeiboOauth()
+ sw_tables = "(%s)" % sw.getRangePartitionSQL(basetime)
sql_location = ""
sql_listidjoin = ""
sql_listid = ""
@@ -173,8 +201,8 @@ def gviz_trends(tid, req_id=0, interval="", period="", province=0, listid=0, out
if int(province) > 0:
sql_location = " AND u.province = %d " % int(province)
sql = "SELECT %(interval)s AS time, COUNT(*) AS count, COUNT(DISTINCT s.user_id) AS users \
-FROM sinaweibo s LEFT JOIN sinaweibo_users u ON s.user_id = u.id %(sql_listidjoin)s WHERE retweeted_status = %(tid)d %(sql_period)s %(sql_location)s %(sql_listid)s GROUP BY time ORDER BY time " \
-% {"tid": tid, "interval": sql_interval, "sql_period": sql_period, "sql_location": sql_location, "sql_listidjoin": sql_listidjoin, "sql_listid": sql_listid}
+FROM %(sw_tables)s s LEFT JOIN sinaweibo_users u ON s.user_id = u.id %(sql_listidjoin)s WHERE retweeted_status = %(tid)d %(sql_period)s %(sql_location)s %(sql_listid)s GROUP BY time ORDER BY time " \
+% {"tid": tid, "interval": sql_interval, "sql_period": sql_period, "sql_location": sql_location, "sql_listidjoin": sql_listidjoin, "sql_listid": sql_listid, "sw_tables": sw_tables }
rows = pgconn.query(sql).dictresult()
description = {"time": ("string", "Time"),
"count": ("number", "statuses"),
247 sinaweibo.lucene.py
@@ -0,0 +1,247 @@
+#!/usr/bin/env python
+
+import sys, os, lucene, threading, time
+import pg, mypass, sinaweibooauth
+import time, datetime
+import csv
+
+"""
+This class is loosely based on the Lucene (java implementation) demo class
+org.apache.lucene.demo.IndexFiles. It will take a directory as an argument
+and will index all of the files in that directory and downward recursively.
+It will index on the file path, the file name and the file contents. The
+resulting Lucene index will be placed in the current directory and called
+'index'.
+"""
+
+class Ticker(object):
+
+ def __init__(self):
+ self.tick = True
+
+ def run(self):
+ while self.tick:
+ sys.stdout.write('.')
+ sys.stdout.flush()
+ time.sleep(1.0)
+
+class IndexSinaWeibo(object):
+ """Usage: python IndexSinaWeibo [-d YYYY-MM-DD]"""
+
+ pgconn = None
+ sw = None
+ writer = None
+ storeDir = "/var/data/lucene/sinaweibo"
+ analyzers = list()
+ ticker = None
+
+ def __init__(self):
+ smartcn = lucene.SmartChineseAnalyzer(lucene.Version.LUCENE_33)
+ #analyzer = lucene.StandardAnalyzer(lucene.Version.LUCENE_33)
+ analyzers = { "smartcn": smartcn }
+ self.pgconn = mypass.getConn()
+ self.sw = sinaweibooauth.SinaWeiboOauth()
+ if not os.path.exists(self.storeDir):
+ os.mkdir(self.storeDir)
+ store = lucene.SimpleFSDirectory(lucene.File(self.storeDir))
+ writerconfig = lucene.IndexWriterConfig(lucene.Version.LUCENE_33, analyzers["smartcn"])
+ writerconfig.setWriteLockTimeout(600000L)
+ writerconfig.setMaxThreadStates(50)
+ writerconfig.setRAMBufferSizeMB(128.0)
+ self.writer = lucene.IndexWriter(store, writerconfig)
+ #self.writer.setMaxFieldLength(1048576)
+ #self.ticker = Ticker()
+ #threading.Thread(target=self.ticker.run).start()
+ #self.writer.optimize()
+ #self.writer.close()
+ #self.ticker.tick = False
+
+ def indexWeibosByIsoDate(self, yeariso, weekiso):
+ yw = { "yeariso": yeariso, "weekiso": weekiso }
+ print "Fetching weibos inserted on year %(yeariso)d and week %(weekiso)d " % yw
+ sql = "SELECT id, created_at, user_id, text FROM rp_sinaweibo_y%(yeariso)dw%(weekiso)d WHERE date(dbinserted) IS NULL ORDER BY id " % yw
+ try:
+ rows = self.pgconn.query(sql).dictresult()
+ except (pg.ProgrammingError, pg.InternalError):
+ print self.pgconn.error
+ return { "msg": self.pgconn.error }
+ print "Inserting %d rows into Lucene... " % len(rows)
+ count = 0
+ for r in rows:
+ count += 1
+ if count % 500000 == 0:
+ print count
+ print datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S")
+ tid = r["id"]
+ text = r["text"]
+ user_id = r["user_id"]
+ created_at = r["created_at"]
+ try:
+ t = time.strptime(created_at,"%Y-%m-%d %H:%M:%S")
+ created_at_secs = int(time.mktime(t))
+ except:
+ print "Failed converting date %s " % created_at
+ continue
+ self.indexWeibo(tid, text, user_id, created_at_secs)
+
+ def indexWeibosByInsertDate(self, insertdate_str):
+ try:
+ insertdate = datetime.datetime.strptime(insertdate_str, "%Y-%m-%d")
+ except ValueError:
+ errmsg = "Invalid date: %s " % insertdate_str
+ return { "msg": errmsg }
+ print "Fetching weibos inserted on %s" % insertdate_str
+ sql = "SELECT id, created_at, user_id, text FROM rp_sinaweibo WHERE date(dbinserted) = '%s' ORDER BY id " % insertdate_str
+ try:
+ rows = self.pgconn.query(sql).dictresult()
+ except (pg.ProgrammingError, pg.InternalError):
+ print self.pgconn.error
+ return { "msg": self.pgconn.error }
+ print "Inserting %d rows into Lucene... " % len(rows)
+ count = 0
+ for r in rows:
+ count += 1
+ if count % 500000 == 0:
+ print count
+ print datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S")
+ tid = r["id"]
+ text = r["text"]
+ user_id = r["user_id"]
+ created_at = r["created_at"]
+ try:
+ t = time.strptime(created_at,"%Y-%m-%d %H:%M:%S")
+ created_at_secs = int(time.mktime(t))
+ except:
+ print "Failed converting date %s " % created_at
+ continue
+ self.indexWeibo(tid, text, user_id, created_at_secs)
+
+ def indexWeibosByIdFromDB(self, tid):
+ print "adding weibo #%d" % tid
+ try:
+ rps = self.sw.getRangePartitionByIds([tid])
+ sw_tables_arr = list()
+ for x in rps:
+ yw = x.split(",")
+ year = int(yw[0])
+ week = int(yw[1])
+ sw_tables_arr.append("SELECT id, created_at, user_id, text FROM rp_sinaweibo_y%(year)dw%(week)d y%(year)dw%(week)d" % { "year": year, "week": week })
+ sw_tables = " UNION ".join(sw_tables_arr)
+ sw_tables = "(%s)" % sw_tables
+ res = self.pgconn.query("SELECT id, created_at, user_id, text FROM %(sw_tables)s s WHERE id = %(tid)d " % { "sw_tables": sw_tables, "tid": tid })
+ except (pg.ProgrammingError, pg.InternalError):
+ print self.pgconn.error
+ return self.pgconn.error
+ r = res.dictresult()
+ if len(r) <= 0:
+ return "Weibo doesn't exist"
+ text = r[0]["text"]
+ user_id = r[0]["user_id"]
+ created_at = r[0]["created_at"]
+ try:
+ t = time.strptime(created_at,"%Y-%m-%d %H:%M:%S")
+ created_at_secs = int(time.mktime(t))
+ except:
+ return "Failed converting date "
+ self.indexWeibo(tid, text, user_id, created_at_secs)
+
+ def indexWeibosFromCSV(self, fpath):
+ f = open(fpath, "rU")
+ #dialect = csv.Sniffer().sniff(f.read(4096))
+ #f.seek(0)
+ #cr = csv.reader(f, dialect)
+ #cr = csv.DictReader(f, dialect=dialect, delimiter=",", quoting=csv.QUOTE_MINIMAL, quotechar='"', escapechar="\\", lineterminator="\n")
+ cr = csv.DictReader(f, delimiter=",", quoting=csv.QUOTE_MINIMAL, quotechar='"', escapechar="\\", lineterminator="\n")
+ count = 0
+ for x in cr:
+ count += 1
+ if count % 500000 == 0:
+ print count
+ print datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S")
+ try:
+ t = time.strptime(x["created_at"],"%Y-%m-%d %H:%M:%S")
+ created_at_secs = int(time.mktime(t))
+ except:
+ print "Failed converting date: %s " % x["created_at"]
+ continue
+ try:
+ self.indexWeibo(x["id"], x["text"], x["user_id"], created_at_secs)
+ except Exception as e:
+ print e
+ print x["id"]
+ print x["text"]
+ continue
+ self.writer.optimize()
+ #self.writer.close()
+ #self.ticker.tick = False
+
+ def indexWeibosFromRows(self, rows):
+ '''Inserts rows in the index by timeline rows'''
+ for x in rows:
+ created_at_secs = None
+ try:
+ t = time.strptime(x["created_at"],"%Y-%m-%d %H:%M:%S")
+ created_at_secs = int(time.mktime(t))
+ except:
+ continue
+ self.indexWeibo(x["id"], x["text"], x["user_id"], created_at_secs)
+ self.writer.optimize()
+ self.writer.close()
+ #self.ticker.tick = False
+
+ def indexWeibo(self, tid, text, user_id, created_at):
+ try:
+ doc = lucene.Document()
+ doc.add(lucene.NumericField("id", 8, lucene.Field.Store.YES, True).setLongValue(long(tid)))
+ doc.add(lucene.Field("text", text,
+ lucene.Field.Store.NO,
+ lucene.Field.Index.ANALYZED))
+ doc.add(lucene.NumericField("user_id", lucene.Field.Store.YES, True).setIntValue(int(user_id)))
+ doc.add(lucene.NumericField("created_at", lucene.Field.Store.YES, True).setIntValue(created_at))
+ self.writer.addDocument(doc)
+ except Exception, e:
+ print "Failed in indexWeibos:", e
+
+if __name__ == '__main__':
+ if len(sys.argv) <= 1:
+ print IndexSinaWeibo.__doc__
+ sys.exit(1)
+ opt = 0
+ for i in range(1,len(sys.argv)):
+ arg = sys.argv[i]
+ if arg == "-d" or arg == "--date":
+ if len(sys.argv) > i + 1:
+ opt = 1
+ insertdate_str = sys.argv[i+1]
+ break
+ if arg == "-yw" or arg == "--year-week":
+ if len(sys.argv) > i + 2:
+ opt = 2
+ yeariso = int(sys.argv[i+1])
+ weekiso = int(sys.argv[i+2])
+ break
+ lucene.initVM(lucene.CLASSPATH)
+ print 'lucene', lucene.VERSION
+ start = datetime.datetime.now()
+ indexer = IndexSinaWeibo()
+ if opt == 1:
+ indexer.indexWeibosByInsertDate(insertdate_str)
+ if opt == 2:
+ indexer.indexWeibosByIsoDate(yeariso, weekiso)
+ #indexer.indexWeibosByIdFromDB(3352644673101184)
+ #indexer.indexWeibosByIdFromDB(3352116729032709)
+ #indexer.indexWeibosByIdFromDB(3352742815828213)
+ #indexer.indexWeibosFromCSV("/var/data/lucene/y2011w33.csv")
+ #indexer.indexWeibosFromCSV("/var/data/lucene/y2011w34.csv")
+ #indexer.indexWeibosFromCSV("/var/data/lucene/y2011w35.csv")
+ #indexer.indexWeibosFromCSV("/var/data/lucene/y2011w36.csv")
+ #indexer.ticker.tick = False
+ try:
+ print 'optimizing index'
+ indexer.writer.optimize()
+ print 'done'
+ indexer.writer.close()
+ except:
+ pass
+ end = datetime.datetime.now()
+ print end - start
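
The documents written by indexWeibo above store id as an 8-byte NumericField and user_id/created_at as int NumericFields, which is what the search scripts later in this commit filter on. A minimal sketch of querying that index (store path as above, timestamps illustrative):

    import lucene
    lucene.initVM(lucene.CLASSPATH)

    directory = lucene.SimpleFSDirectory(lucene.File("/var/data/lucene/sinaweibo"))
    searcher = lucene.IndexSearcher(directory, True)
    # all weibos whose created_at (epoch seconds) falls in an illustrative window
    dateFilter = lucene.NumericRangeFilter.newIntRange("created_at", 1314849600, 1315454400, True, True)
    topDocs = searcher.search(lucene.MatchAllDocsQuery(), dateFilter, 100)
    print topDocs.totalHits
    for scoreDoc in topDocs.scoreDocs:
        doc = searcher.doc(scoreDoc.doc)
        print doc.get("id"), doc.get("user_id")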
277 sinaweibo.oauth.py
@@ -7,6 +7,7 @@
import httplib
import simplejson
import time
+import string
import oauth2 as oauth
import pprint
@@ -21,6 +22,9 @@
#from weibopy.auth import OAuthHandler, BasicAuthHandler
#from weibopy.api import API
+import lucene
+import sinaweibolucene
+
class SinaWeiboOauth():
sinaweiboOauth = mypass.getSinaWeiboOauth()
pgconn = None
@@ -28,11 +32,13 @@ class SinaWeiboOauth():
toleranceNotToBeginningLong = 150 # for reposts
max_gotall_count = 3
api_wait_secs = 5
+ max_api_misses_half = 3
max_api_misses = 6
max_reposts_pages = max_comments_pages = 1000
max_reposts_blanks = max_comments_blanks = 3
max_reposts_tries = max_comments_tries = 3
usage = "sinaweibo.oauth.py [id or file with ids] [primary opts] [sec opts]"
+ rp_dir = "/var/data/sinaweibo/rp"
comments_dir = "/var/data/sinaweibo/comments"
reposts_dir = "/var/data/sinaweibo/reposts"
verbose = False
@@ -40,6 +46,9 @@ class SinaWeiboOauth():
force_screenname = False
checkonly = False
doupdate = False
+ saveRP = False
+ index = False
+ indexer = None
def __init__(self):
socket.setdefaulttimeout(300)
@@ -162,15 +171,31 @@ def get_status(self, id, getUser=False, toDB=True):
api_misses += 1
if api_misses >= self.max_api_misses:
return { "msg": e.reason }
- if e.reason.find("Error: target weibo does not exist!") > 0:
+ if e.reason.find("Error: target weibo does not exist!") >= 0:
+ try:
+ rps = self.getRangePartitionByIds([id])
+ for x in rps:
+ yw = x.split(",")
+ year = int(yw[0])
+ week = int(yw[1])
+ sql_deleted = "UPDATE rp_sinaweibo_y%(year)dw%(week)d SET deleted = NOW() WHERE id = %(id)d AND deleted IS NULL " % { "year": year, "week": week, "id": id }
+ if self.pgconn is None:
+ self.pgconn = mypass.getConn()
+ res = self.pgconn.query(sql_deleted)
+ except (pg.ProgrammingError, pg.InternalError):
+ print self.pgconn.error
return { "msg": e.reason }
time.sleep(self.api_wait_secs)
time_api = time.time() - start_time_api
+ if self.saveRP:
+ self.saveRangePartitionByIdDate(id, self.getAtt(status, "created_at"))
row = self.status_to_row(status)
r = dict()
if toDB and not self.checkonly:
start_time_db = time.time()
- resp = self.toDB("sinaweibo", row, self.doupdate)
+ tablename = self.getRangePartitionByDate(self.getAtt(status, "created_at"))
+ #tablename = "rp_sinaweibo"
+ resp = self.toDB(tablename, row, self.doupdate)
time_db += time.time() - start_time_db
r = resp
resp["time_db"] = time_db
@@ -205,6 +230,8 @@ def user_timeline(self, user_id, count=200, page=1):
if api_misses >= self.max_api_misses:
return { "msg": e.reason }
time.sleep(self.api_wait_secs)
+ if string.find(e.reason, "requests out of rate limit") >= 0:
+ self.waitRateLimit()
except socket.error as e:
print e
api_misses += 1
@@ -225,7 +252,7 @@ def user_timeline(self, user_id, count=200, page=1):
if "count" in r and r["count"] == 0:
if self.pgconn is None:
self.pgconn = mypass.getConn()
- self.pgconn.query("UPDATE sinaweibo_users SET posts_updated = NOW() WHERE id = %d" % user_id)
+ #self.pgconn.query("UPDATE sinaweibo_users SET posts_updated = NOW() WHERE id = %d" % user_id)
r["time_api"] = time_api
r["page"] = page
return r
@@ -244,8 +271,25 @@ def reposts(self, status_id, count=200, page=1):
if api_misses == self.max_api_misses:
return { "msg": e.reason }
if e.reason.find("Error: target weibo does not exist!") > 0:
+ try:
+ rps = self.getRangePartitionByIds([status_id])
+ for x in rps:
+ yw = x.split(",")
+ year = int(yw[0])
+ week = int(yw[1])
+ sql_deleted = "UPDATE rp_sinaweibo_y%(year)dw%(week)d SET deleted = NOW() WHERE id = %(id)d AND deleted IS NULL " % { "year": year, "week": week, "id": status_id }
+ if self.pgconn is None:
+ self.pgconn = mypass.getConn()
+ res = self.pgconn.query(sql_deleted)
+ except (pg.ProgrammingError, pg.InternalError):
+ print self.pgconn.error
return { "msg": e.reason }
time.sleep(self.api_wait_secs)
+ if e.reason.find("requests out of rate limit") >= 0:
+ if e.reason.find("IP") >= 0 and api_misses <= self.max_api_misses_half:
+ time.sleep(60) # to consider rolling IPs
+ else:
+ self.waitRateLimit()
time_api = time.time() - start_time_api
r = self.status_timeline(timeline, False)
r["time_api"] = time_api
@@ -257,6 +301,8 @@ def status_timeline(self, statuses, isSingleUser=True, toDB=True, toBeginning=Tr
already_exists_count = 0
time_db = 0
time_db_u = 0
+ if self.index:
+ time_index = 0
deleted_count = 0
timeline_users_ids = list()
toleranceNotToBeginningCount = 0
@@ -271,12 +317,19 @@ def status_timeline(self, statuses, isSingleUser=True, toDB=True, toBeginning=Tr
if self.pgconn is None:
self.pgconn = mypass.getConn()
try:
- sql_deleted = "UPDATE sinaweibo SET deleted = NOW() WHERE id = %d AND deleted IS NULL " % x["id"]
- res = self.pgconn.query(sql_deleted)
+ rps = self.getRangePartitionByIds([x["id"]])
+ for x in rps:
+ yw = x.split(",")
+ year = int(yw[0])
+ week = int(yw[1])
+ sql_deleted = "UPDATE rp_sinaweibo_y%(year)dw%(week)d SET deleted = NOW() WHERE id = %(id)d AND deleted IS NULL " % { "year": year, "week": week, "id": x["id"] }
+ res = self.pgconn.query(sql_deleted)
except pg.ProgrammingError, pg.InternalError:
print self.pgconn.error
continue
- resp = self.toDB("sinaweibo", x)
+ tablename = self.getRangePartitionByDate(self.getAtt(l, "created_at"))
+ #tablename = "rp_sinaweibo"
+ resp = self.toDB(tablename, x)
time_db += time.time() - start_time_db
if not resp["already_exists"] and resp["success"] and not isSingleUser:
timeline_user = self.getAtt(l, "user")
@@ -306,6 +359,15 @@ def status_timeline(self, statuses, isSingleUser=True, toDB=True, toBeginning=Tr
newlyadded += 1
if not toBeginning:
toleranceNotToBeginningCount = 0
+ if self.index: # index if the row doesn't already exist
+ time_index_start = time.time()
+ try:
+ t = time.strptime(x["created_at"],"%Y-%m-%d %H:%M:%S")
+ created_at_secs = int(time.mktime(t))
+ self.indexer.indexWeibo(x["id"], x["text"], x["user_id"], created_at_secs)
+ except Exception as e:
+ print e
+ time_index += time.time() - time_index_start
else:
print x
r = { "count": len(statuses), "deleted_count": deleted_count }
@@ -315,7 +377,7 @@ def status_timeline(self, statuses, isSingleUser=True, toDB=True, toBeginning=Tr
u["posts_updated"] = "NOW()"
start_time_db_u = time.time()
resp_u = self.toDB("sinaweibo_users", u, doupdate=True)
- #print resp_u
+ print resp_u
time_db_u += time.time() - start_time_db_u
r["user_id"] = u["id"]
if toDB:
@@ -324,6 +386,8 @@ def status_timeline(self, statuses, isSingleUser=True, toDB=True, toBeginning=Tr
r["newly_added"] = newlyadded
r["time_db"] = time_db
r["time_db_u"] = time_db_u
+ if self.index:
+ r["time_index"] = time_index
return r
def comments(self, status_id, count=200, page=1, toDB=True, toBeginning=True):
@@ -339,8 +403,26 @@ def comments(self, status_id, count=200, page=1, toDB=True, toBeginning=True):
api_misses += 1
if api_misses == self.max_api_misses:
return { "msg": e.reason }
+ if e.reason.find("Error: target weibo does not exist!") > 0:
+ try:
+ rps = self.getRangePartitionByIds([status_id])
+ for x in rps:
+ yw = x.split(",")
+ year = int(yw[0])
+ week = int(yw[1])
+ sql_deleted = "UPDATE rp_sinaweibo_y%(year)dw%(week)d SET deleted = NOW() WHERE id = %(id)d AND deleted IS NULL " % { "year": year, "week": week, "id": status_id }
+ if self.pgconn is None:
+ self.pgconn = mypass.getConn()
+ res = self.pgconn.query(sql_deleted)
+ except (pg.ProgrammingError, pg.InternalError):
+ print self.pgconn.error
+ return { "msg": e.reason }
time.sleep(self.api_wait_secs)
- continue
+ if e.reason.find("requests out of rate limit") >= 0:
+ if e.reason.find("IP") >= 0 and api_misses <= self.max_api_misses_half:
+ time.sleep(60) # to consider rolling IPs
+ else:
+ self.waitRateLimit()
time_api = time.time() - start_time_api
time_db = 0
time_db_u = 0
@@ -386,7 +468,7 @@ def user(self, user_id, screen_name=None, toDB=True):
else:
user = self.api.get_user(user_id=user_id)
except weibopy.error.WeibopError as e:
- if e.reason.find("User does not exists") > 0:
+ if e.reason.find("User does not exists") >= 0:
if self.pgconn is None:
self.pgconn = mypass.getConn()
try:
@@ -424,25 +506,126 @@ def socialgraph(self, user_id, rel="followers", cursor=-1, count=5000, toDB=True
already_exists_count = 0
time_db = 0
start_time_api = time.time()
- if rel == "friends":
- relations = self.api.friends_ids(user_id=user_id, cursor=cursor, count=count)
- else:
- relations = self.api.followers_ids(user_id=user_id, cursor=cursor, count=count)
+ api_misses = 0
+ while api_misses < self.max_api_misses:
+ try:
+ if rel == "friends":
+ relations = self.api.friends_ids(id=user_id, cursor=cursor, count=count)
+ else:
+ relations = self.api.followers_ids(id=user_id, cursor=cursor, count=count)
+ except httplib.IncompleteRead as h:
+ print h
+ api_misses += 1
+ if api_misses >= self.max_api_misses:
+ return { "msg": h }
+ time.sleep(self.api_wait_secs)
+ except weibopy.error.WeibopError as e:
+ api_misses += 1
+ if api_misses == self.max_api_misses:
+ return { "msg": e.reason }
+ if e.reason.find("requests out of rate limit") >= 0:
+ self.waitRateLimit()
+ else:
+ time.sleep(self.api_wait_secs)
+ continue
+ except socket.error as e:
+ print e
+ api_misses += 1
+ if api_misses >= self.max_api_misses:
+ return { "msg": e.message }
+ time.sleep(self.api_wait_secs)
fids = self.getAtt(relations, "ids")
time_api = time.time() - start_time_api
+ r = dict()
for fid in fids:
x = { "source_id": user_id, "target_id": fid, "retrieved": "NOW()" }
start_time_db = time.time()
- resp = self.toDB("sinaweibo_" + rel, x, True)
+ resp = self.toDB("sinaweibo_" + rel, x, doupdate=False)
time_db += time.time() - start_time_db
if resp["already_exists"]:
already_exists_count += 1
- r = { r["ids"]: fids, r["time_api"]: time_api, r["count"]: len(fids) }
+ r = resp
+ r["ids"] = fids
+ r["time_api"] = time_api
+ r["count"] = len(fids)
if toDB:
r["time_db"] = time_db
r["already_exists_count"] = already_exists_count
return r
+ def waitRateLimit(self):
+ rls = self.api.rate_limit_status()
+ ratelimstatus = { "remaining_hits": self.getAtt(rls, "remaining_hits"), "hourly_limit": self.getAtt(rls, "hourly_limit"), "reset_time_in_seconds": self.getAtt(rls, "reset_time_in_seconds"), "reset_time": self.getAtt(rls, "reset_time") }
+ reset_time = int(self.getAtt(rls, "reset_time_in_seconds")) + self.api_wait_secs
+ if self.verbose:
+ print "Reset time in %(secs)d seconds (now: %(now)s) " % { "secs": reset_time, "now": datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S")}
+ if reset_time > 2700 and reset_time < 3600:
+ time.sleep(120)
+ elif reset_time > 120:
+ time.sleep(reset_time + 30)
+ else:
+ time.sleep(self.api_wait_secs * 5)
+
+ def saveRangePartitionByIdDate(self, id, date):
+ isocal = date.isocalendar()
+ f = open(self.rp_dir + "/ids/" + str(id), "w")
+ f.write(str(isocal[0]) + "," + str(isocal[1]))
+ f.close()
+
+ def getRangePartitionByIds(self, ids=list()):
+ # Prepare the minmax file for Python
+ f_minmax = open(self.rp_dir + "/rp-minmaxids-sw.csv", "r")
+ cr = csv.DictReader(f_minmax)
+ minmaxids = dict()
+ for csvrow in cr:
+ if not csvrow["year"] in minmaxids: # put year
+ minmaxids[csvrow["year"]] = dict()
+ if not csvrow["week"] in minmaxids[csvrow["year"]]: # put week
+ minmaxids[csvrow["year"]][csvrow["week"]] = dict()
+ if len(csvrow["min"]) > 0 and len(csvrow["max"]) > 0:
+ minmaxids[csvrow["year"]][csvrow["week"]]["min"] = long(csvrow["min"])
+ minmaxids[csvrow["year"]][csvrow["week"]]["max"] = long(csvrow["max"])
+ years_sorted = sorted(minmaxids.keys())
+ yearsweeks_sorted = dict()
+ for yeariso in years_sorted: # get a dict per year of sorted weeks
+ yearsweeks_sorted[yeariso] = sorted(minmaxids[yeariso].keys())
+ if self.verbose:
+ print yearsweeks_sorted
+ # Mark range partitions to check and keep IDs of weibos
+ rp_tocheck = list()
+ for thisid in ids:
+ thisid = long(thisid)
+ thismin = None
+ thismax = None
+ donethisid = False
+ last_within_minmax = False
+ # we go in the min-max
+ for year in years_sorted:
+ if donethisid:
+ break
+ for week in yearsweeks_sorted[year]:
+ minmax = minmaxids[year][week] # the minmax we are checking currently
+ lastmin = thismin # not used
+ lastmax = thismax # not used
+ if "min" not in minmax or "max" not in minmax: # don't check empty partitions
+ continue
+ thismin = minmax["min"]
+ thismax = minmax["max"]
+ if thismin is None or thismax is None: # don't check empty partitions
+ continue
+ if thisid >= thismin and thisid <= thismax: # within this partition
+ yw_str = str(year) + "," + str(week)
+ if yw_str not in rp_tocheck:
+ rp_tocheck.append(yw_str)
+ if last_within_minmax:
+ donethisid = True
+ break
+ else:
+ last_within_minmax = True
+ continue
+ #return { "rp": rp_tocheck, "ids": ids_tocheck, "ids_str": ids_str_tocheck }
+ return rp_tocheck
+
def toDB(self, tablename, data, doupdate=False, updatefirst=False):
if self.pgconn is None:
self.pgconn = mypass.getConn()
@@ -462,7 +645,7 @@ def toDB(self, tablename, data, doupdate=False, updatefirst=False):
resp["already_exists"] = True
else:
try:
- print data
+ #print data
r = self.pgconn.insert(tablename, data)
resp["success"] = True
except:
@@ -470,8 +653,9 @@ def toDB(self, tablename, data, doupdate=False, updatefirst=False):
resp["already_exists"] = True
else:
try:
+ #print data
self.pgconn.insert(tablename, data)
- resp["success"] = True
+ resp["success"] = True
except pg.ProgrammingError, pg.InternalError:
if self.pgconn.error.find('duplicate key value violates unique constraint') > 0:
resp["already_exists"] = True
@@ -480,10 +664,36 @@ def toDB(self, tablename, data, doupdate=False, updatefirst=False):
self.pgconn.update(tablename, data)
resp["success"] = True
except:
+ resp["reason"] = self.pgconn.error
pass
+
#pgconn.close()
return resp
+ def getRangePartitionByDate(self, dt_created_at):
+ isocal = dt_created_at.isocalendar()
+ return "rp_sinaweibo_y" + str(isocal[0]) + "w" + str(isocal[1])
+
+ def getRangePartitionSQL(self, date_start, date_end=None, select_list="*"):
+ if date_end is None: # default to "now" at call time, not at import time
+ date_end = datetime.datetime.now()
+ isostart = date_start.isocalendar()
+ yearisostart = isostart[0]
+ weekisostart = isostart[1]
+ isoend = date_end.isocalendar()
+ yearisoend = isoend[0]
+ weekisoend = isoend[1]
+ sw_tables_arr = list()
+ for x in range(yearisostart,yearisoend+1):
+ if yearisostart == yearisoend:
+ weekisorange = range(weekisostart, weekisoend+1)
+ elif yearisostart == x:
+ weekisorange = range(weekisostart, 54)
+ elif yearisoend == x:
+ weekisorange = range(1, weekisoend+1)
+ else: # full intermediate year: ISO weeks run 1..53
+ weekisorange = range(1, 54)
+ for y in weekisorange:
+ sw_tables_arr.append("SELECT %(select_list)s FROM rp_sinaweibo_y%(year)dw%(week)d y%(year)dw%(week)d" % { "select_list": select_list, "year": x, "week": y })
+ sw_tables = " UNION ".join(sw_tables_arr)
+ return sw_tables
+
# Sends from command-line to the appropriate function
def dispatch(self, opt, id, output_counts=False):
if opt == 1: # user timeline
@@ -497,11 +707,11 @@ def dispatch(self, opt, id, output_counts=False):
out = self.user(id)
elif opt == 3: # friends
out = self.socialgraph(id, "friends")
- if out["count"] == 5000:
+ if "count" in out and out["count"] == 5000:
out = self.socialgraph(id, "friends", 4999)
elif opt == 4: # followers
out = self.socialgraph(id, "followers")
- if out["count"] == 5000:
+ if "count" in out and out["count"] == 5000:
out = self.socialgraph(id, "followers", 4999)
elif opt == 7: # reposts
blanks_count = 0
@@ -548,10 +758,17 @@ def dispatch(self, opt, id, output_counts=False):
if output_counts:
if self.pgconn is None:
self.pgconn = mypass.getConn()
- sql_count = "SELECT COUNT(*) FROM sinaweibo WHERE retweeted_status = %d " % id
- res_count = self.pgconn.query(sql_count).getresult()
+ rps = self.getRangePartitionByIds([id])
+ rps_count = 0
+ for x in rps:
+ yw = x.split(",")
+ year = int(yw[0])
+ week = int(yw[1])
+ sql_count = "SELECT COUNT(*) FROM rp_sinaweibo_y%(year)dw%(week)d WHERE retweeted_status = %(id)d " % { "year": year, "week": week, "id": id }
+ res_count = self.pgconn.query(sql_count).getresult()
+ rps_count += int(res_count[0][0])
fo = open(self.reposts_dir + "/counts/" + str(id), "w")
- fo.write(str(res_count[0][0]))
+ fo.write(str(rps_count))
fo.close()
elif opt == 8: # comments
blanks_count = 0
@@ -657,10 +874,19 @@ def dispatch(self, opt, id, output_counts=False):
sw.checkonly = True
elif sys.argv[i] == "-u" or sys.argv[i] == "--update":
sw.doupdate = True
+ elif sys.argv[i] == "-srp" or sys.argv[i] == "--save-range-partition":
+ sw.saveRP = True
+ elif sys.argv[i] == "-i" or sys.argv[i] == "--index":
+ sw.index = True
# bind token and set the api
sw.setToken(sw.sinaweiboOauth["oauth_token"], sw.sinaweiboOauth["oauth_token_secret"])
+ # initialize the indexer, if needed
+ if sw.index:
+ lucene.initVM(lucene.CLASSPATH)
+ sw.indexer = sinaweibolucene.IndexSinaWeibo()
+
# dispatch
if sw.force_screenname:
out = sw.dispatch(opt, fname, output_counts)
@@ -688,9 +914,14 @@ def dispatch(self, opt, id, output_counts=False):
thisout["id"] = id
out.append(thisout)
output = { "data": out, "opt": opt, "count": len(out), "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") }
+
if sw.verbose:
print output
-
+
+ if sw.index:
+ sw.indexer.writer.optimize()
+ sw.indexer.writer.close()
+ if sw.indexer.ticker is not None:
+ sw.indexer.ticker.tick = False
#sw.auth()
#sw.setToken("24910039392f0acf7b83b8cb91ee9191", "1363bae989c232bf5db6aefb3e6460c7")
#print sw.user_timeline(1801443400)
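
The range-partition helpers added above map a status's created_at to a weekly table named rp_sinaweibo_y<ISO year>w<ISO week>. A minimal sketch of that naming rule (date illustrative):

    import datetime

    def rp_table_for(created_at):
        # ISO calendar gives (year, week, weekday); the partition name uses the first two
        year, week, _ = created_at.isocalendar()
        return "rp_sinaweibo_y%dw%d" % (year, week)

    print rp_table_for(datetime.datetime(2011, 9, 6))   # rp_sinaweibo_y2011w36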
160 sinaweibo.search.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python
+
+import sys, os
+import time, datetime
+import csv
+import pg
+import lucene
+import mypass, sinaweibooauth
+
+class SearchSinaWeibo(object):
+ """Usage: sinaweibo.search.py [-ds|-de DATE] terms"""
+
+ pgconn = None
+ sw = None
+ STORE_DIR = "/var/data/lucene/sinaweibo"
+ analyzers = dict()
+ searcher = None
+ MAX_ITEMS = 500000
+
+ def __init__(self):
+ smartcn = lucene.SmartChineseAnalyzer(lucene.Version.LUCENE_33)
+ self.analyzers = { "smartcn": smartcn }
+ directory = lucene.SimpleFSDirectory(lucene.File(self.STORE_DIR))
+ self.searcher = lucene.IndexSearcher(directory, True)
+ self.pgconn = mypass.getConn()
+ self.sw = sinaweibooauth.SinaWeiboOauth()
+
+ def prepareDates(self, datestring):
+ if datestring is None:
+ return None
+ try:
+ mydate = time.strptime(datestring, "%Y-%m-%d")
+ except ValueError:
+ try:
+ mydate = time.strptime(datestring, "%Y-%m-%d %H:%M")
+ except ValueError:
+ return None
+ return int(time.mktime(mydate))
+
+ def searchWeibos(self, q, created_at_start_secs, created_at_end_secs, user_ids=list()):
+ startexec = datetime.datetime.now()
+ #q = " ".join(text)
+ first = True
+ query = lucene.BooleanQuery()
+ query.setMaxClauseCount(2097152)
+ #q = 'created_at:[%(start)d TO %(end)d] AND (%(q)s)' % { "q": q, "start": created_at_start_secs, "end": created_at_end_secs }
+ #q = 'created_at:[%(start)d TO %(end)d]' % { "q": q, "start": created_at_start_secs, "end": created_at_end_secs }
+ #query = lucene.QueryParser(lucene.Version.LUCENE_33, "created_at", self.analyzers["smartcn"]).parse(q)
+ sorter = lucene.Sort(lucene.SortField("created_at", lucene.SortField.INT, True))
+ if len(q) > 0:
+ query.add(lucene.QueryParser(lucene.Version.LUCENE_33, "text", self.analyzers["smartcn"]).parse(q), lucene.BooleanClause.Occur.MUST)
+ if created_at_start_secs is not None and created_at_end_secs is not None:
+ dateFilter = lucene.NumericRangeFilter.newIntRange("created_at", created_at_start_secs, created_at_end_secs, True, True)
+ else:
+ if created_at_start_secs is not None and created_at_end_secs is not None:
+ query.add(lucene.NumericRangeQuery.newIntRange("created_at", created_at_start_secs, created_at_end_secs, True, True), lucene.BooleanClause.Occur.MUST)
+ #dateFilter = lucene.NumericRangeFilter.newIntRange("created_at", created_at_start_secs, created_at_end_secs, True, True)
+ topScoreCollector = lucene.TopScoreDocCollector
+ if len(user_ids) > 0:
+ user_ids_str = list()
+ numfilters = list()
+ count = 0
+ for x in user_ids:
+ count += 1
+ user_ids_str.append(str(x))
+ #user_ids_str.append("user_id:\"" + str(x) + '"')
+ #query.add(lucene.NumericRangeQuery.newIntRange("user_id", x, x, True, True), lucene.BooleanClause.Occur.SHOULD)
+ numfilter = lucene.NumericRangeFilter.newIntRange("user_id", x, x, True, True)
+ numfilters.append(numfilter)
+ #if count > 1000:
+ # break
+ chainedNumFilters = lucene.ChainedFilter(numfilters, lucene.ChainedFilter.OR)
+ cachingChainedNumFilters = lucene.CachingWrapperFilter(chainedNumFilters)
+ if len(q) > 0 and dateFilter is not None:
+ chain = lucene.ChainedFilter([cachingChainedNumFilters,dateFilter], lucene.ChainedFilter.AND)
+ else:
+ chain = cachingChainedNumFilters
+ #query.add(lucene.QueryParser(lucene.Version.LUCENE_33, "user_id", self.analyzers["smartcn"]).parse("(%s)" % " ".join(user_ids_str)), lucene.BooleanClause.Occur.MUST)
+ #query.add(lucene.QueryParser(lucene.Version.LUCENE_33, "user_id", self.analyzers["smartcn"]).parse("user_id:(%s)" % " OR ".join(user_ids_str)), lucene.BooleanClause.Occur.MUST)
+ #topDocs = self.searcher.search(query, chain, self.MAX_ITEMS, sorter)
+ topDocs = self.searcher.search(query, chain, self.MAX_ITEMS, sorter)
+ else:
+ if len(q) > 0 and created_at_start_secs is not None and created_at_end_secs is not None:
+ topDocs = self.searcher.search(query, dateFilter, self.MAX_ITEMS, sorter)
+ else:
+ topDocs = self.searcher.search(query, self.MAX_ITEMS, sorter)
+ #return "%(nb)d results found in %(secs)f seconds" %
+ ids = list()
+ ids_str = list()
+ hits = list()
+ count = 0
+ for scoreDoc in topDocs.scoreDocs:
+ count += 1
+ doc = self.searcher.doc(scoreDoc.doc)
+ id = doc.get("id")
+ user_id = doc.get("user_id")
+ #ids.append(id)
+ hit = { "id": id, "user_id": user_id }
+ hits.append(hit)
+ #ids_str.append(str(id))
+ #if count > self.MAX_ITEMS:
+ #break
+ out = { "totalhits": topDocs.totalHits, "nb_users": len(user_ids), "ids": ids, "q": q, "hits": hits }
+ out["lucene_query_finished"] = long(time.mktime(datetime.datetime.now().timetuple())) * 1000
+ if len(user_ids) > 0:
+ out["user_ids"] = user_ids_str
+ # Logging
+ f = open("/var/data/sinaweibo/searchlog/searchweibos.log","a")
+ f.write(datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S") + "\t" + q + "\n")
+ f.close()
+ endexec = datetime.datetime.now()
+ td = endexec - startexec
+ microtime = td.microseconds + (td.seconds + td.days * 86400) * 1000000
+ secondstime = microtime / 1000000.0
+ out["secs"] = secondstime
+ print out
+ return out
+
+if __name__ == '__main__':
+ if len(sys.argv) <= 1:
+ print SearchSinaWeibo.__doc__
+ sys.exit(1)
+ inargs = False
+ datestart_str = None
+ dateend_str = None
+ for i in range(1, len(sys.argv)):
+ if sys.argv[i].find("-") != 0 and not inargs:
+ i -= 1
+ break
+ else:
+ inargs = False
+ if sys.argv[i] == "-ds":
+ if len(sys.argv) > i + 1:
+ inargs = True
+ datestart_str = sys.argv[i+1]
+ elif sys.argv[i] == "-de":
+ if len(sys.argv) > i + 1:
+ inargs = True
+ dateend_str = sys.argv[i+1]
+ terms = sys.argv[i+1:len(sys.argv)+1]
+ if inargs or len(terms) == 0:# or datestart_str is None:
+ print SearchSinaWeibo.__doc__
+ sys.exit(1)
+ if dateend_str is None:
+ dateend_str = datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M")
+ print terms
+ print "date start: " + str(datestart_str)
+ print "date end: " + str(dateend_str)
+ # Start Lucene
+ lucene.initVM(lucene.CLASSPATH)
+ print 'lucene', lucene.VERSION
+ search = SearchSinaWeibo()
+ q = " ".join(terms)
+ search.searchWeibos(q, search.prepareDates(datestart_str), search.prepareDates(dateend_str))
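
When user_ids are supplied, searchWeibos builds one NumericRangeFilter per user_id, ORs them together with ChainedFilter, and ANDs the result with the date filter. A condensed sketch of that composition (ids and timestamps illustrative):

    import lucene
    lucene.initVM(lucene.CLASSPATH)

    user_ids = [1801443400, 1801443401]  # illustrative user ids
    numfilters = [lucene.NumericRangeFilter.newIntRange("user_id", x, x, True, True) for x in user_ids]
    byUser = lucene.CachingWrapperFilter(lucene.ChainedFilter(numfilters, lucene.ChainedFilter.OR))
    byDate = lucene.NumericRangeFilter.newIntRange("created_at", 1314849600, 1315454400, True, True)
    chain = lucene.ChainedFilter([byUser, byDate], lucene.ChainedFilter.AND)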
119 social.lucene.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python
+
+import sys, os, lucene, threading, time
+import pg, mypass
+import time, datetime
+import csv
+
+class SocialLucene(object):
+ """Usage: python SocialLucene <network name> [-d YYYY-MM-DD]"""
+
+ networks = ["uwants", "hkreporter", "discuss", "facebook", "twitter"]
+ network = ""
+ pgconn = None
+ sw = None
+ writer = None
+ storeDirBase = "/var/data/lucene/"
+ storeDir = None
+ analyzers = list()
+
+ def __init__(self, network):
+ self.network = network
+ smartcn = lucene.SmartChineseAnalyzer(lucene.Version.LUCENE_33)
+ #analyzer = lucene.StandardAnalyzer(lucene.Version.LUCENE_33)
+ analyzers = { "smartcn": smartcn }
+ self.pgconn = mypass.getConn()
+ writerconfig = lucene.IndexWriterConfig(lucene.Version.LUCENE_33, analyzers["smartcn"])
+ writerconfig.setWriteLockTimeout(600000L)
+ writerconfig.setMaxThreadStates(50)
+ writerconfig.setRAMBufferSizeMB(128.0)
+ self.storeDir = self.storeDirBase + self.network
+ store = lucene.SimpleFSDirectory(lucene.File(self.storeDir))
+ self.writer = lucene.IndexWriter(store, writerconfig)
+
+ def indexByDate(self, date_str):
+ if self.network == "uwants" or self.network == "hkreporter" or self.network == "discuss":
+ sql = "SELECT * FROM %(network)s_post WHERE date(dbinserted) = '%(date)s' ORDER BY pid " % { "date": date_str, "network": self.network }
+ try:
+ rows = self.pgconn.query(sql).dictresult()
+ except (pg.ProgrammingError, pg.InternalError):
+ print self.pgconn.error
+ return 0
+ print "Inserting %d rows into Lucene... " % len(rows)
+ count = 0
+ for r in rows:
+ count += 1
+ if count % 500000 == 0:
+ print count
+ print datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S")
+ pid = r["pid"]
+ uid = r["uid"]
+ tid = r["tid"]
+ title = r["title"]
+ content = r["content"]
+ floor = r["floor"]
+ time_str = r["time"]
+ try:
+ t = time.strptime(time_str,"%Y-%m-%d %H:%M:%S")
+ time_str_secs = int(time.mktime(t))
+ except:
+ print "Failed converting date %s " % time_str
+ continue
+ self.indexHKForumPost(pid, uid, tid, title, content, floor, time_str_secs)
+ else:
+ return 0
+
+ def indexHKForumPost(self, pid, uid, tid, title, content, floor, time):
+ try:
+ doc = lucene.Document()
+ doc.add(lucene.NumericField("pid", 8, lucene.Field.Store.YES, True).setLongValue(long(pid)))
+ doc.add(lucene.NumericField("uid", 8, lucene.Field.Store.YES, True).setLongValue(long(uid)))
+ doc.add(lucene.NumericField("tid", 8, lucene.Field.Store.YES, True).setLongValue(long(tid)))
+ doc.add(lucene.Field("title", title,
+ lucene.Field.Store.NO,
+ lucene.Field.Index.ANALYZED))
+ doc.add(lucene.Field("content", content,
+ lucene.Field.Store.NO,
+ lucene.Field.Index.ANALYZED))
+ doc.add(lucene.NumericField("floor", lucene.Field.Store.YES, True).setIntValue(floor))
+ doc.add(lucene.NumericField("time", lucene.Field.Store.YES, True).setIntValue(time))
+ self.writer.addDocument(doc)
+ except Exception, e:
+ print "Failed in indexWeibos:", e
+
+if __name__ == '__main__':
+ networks = ["uwants", "hkreporter", "discuss", "facebook", "twitter"]
+ if len(sys.argv) <= 1:
+ print SocialLucene.__doc__
+ sys.exit(1)
+ opt = 0
+ network = sys.argv[1]
+ if network not in networks:
+ print SocialLucene.__doc__
+ sys.exit()
+ for i in range(2,len(sys.argv)):
+ arg = sys.argv[i]
+ if arg == "-d" or arg == "--date":
+ if len(sys.argv) > i + 1:
+ opt = 1
+ insertdate_str = sys.argv[i+1]
+ '''
+ if len(sys.argv) > i + 2:
+ if ":" in sys.argv[i+2]:
+ insertdate_str += " " + sys.argv[i+2]
+ '''
+ break
+ lucene.initVM(lucene.CLASSPATH)
+ print 'lucene', lucene.VERSION
+ start = datetime.datetime.now()
+ indexer = SocialLucene(network=network)
+ if opt == 1:
+ indexer.indexByDate(insertdate_str)
+ try:
+ print 'optimizing index'
+ indexer.writer.optimize()
+ print 'done'
+ indexer.writer.close()
+ except:
+ pass
+ end = datetime.datetime.now()
+ print end - start