First commit

Root commit 350ff1ee1b45dcc1b53c9e26fcb1a9ce6f350294 by @simonholgate, committed Jul 1, 2012
Showing with 486 additions and 0 deletions.
  1. +14 −0 .gitignore
  2. +20 −0 README.md
  3. +20 −0 project.clj
  4. +235 −0 src/hbase_likely/core.clj
  5. +58 −0 src/hbase_likely/hbase.clj
  6. +132 −0 src/hbase_likely/query.clj
  7. +7 −0 test/hbase_likely/core_test.clj
14 .gitignore
@@ -0,0 +1,14 @@
+/target
+/lib
+/classes
+/checkouts
+pom.xml
+*.jar
+*.class
+.lein-deps-sum
+.lein-failures
+.lein-plugins
+*~
+*.lock
+*.swp
+*.out
20 README.md
@@ -0,0 +1,20 @@
+# hbase-likely
+
+A collection of functions to explore Twitter data stored in HBase for a
+likely.co project.
+
+## Usage
+
+In addition to the existing HBase tables 'tweets' and 'short_urls', the
+'urls' and 'language' tables must be created in the HBase shell:
+
+    create 'urls', 'url'
+    create 'language', 'lang'
+
+Then run '-main' from the REPL, or use 'lein run'.
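+
+If you would rather create the tables from a Clojure REPL than from the HBase
+shell, a sketch along these lines should work (untested here; it assumes the
+same HBase 0.92 client API used elsewhere in this project):
+
+    (import '(org.apache.hadoop.hbase HBaseConfiguration
+                                      HTableDescriptor HColumnDescriptor)
+            '(org.apache.hadoop.hbase.client HBaseAdmin))
+
+    (let [admin (HBaseAdmin. (HBaseConfiguration.))]
+      (doseq [[table family] [["urls" "url"] ["language" "lang"]]]
+        (.createTable admin
+                      (doto (HTableDescriptor. table)
+                        (.addFamily (HColumnDescriptor. family))))))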
+
+Cascalog queries are defined in the 'hbase-likely.query' namespace.
+
+## License
+
+Copyright © 2012 Simon Holgate
+
+Distributed under the Eclipse Public License, the same as Clojure.
20 project.clj
@@ -0,0 +1,20 @@
+(defproject hbase-likely "0.1.0"
+  :description "Code to connect to HBase for Likely.co project"
+  :url "http://example.com/FIXME"
+  :license {:name "Eclipse Public License"
+            :url "http://www.eclipse.org/legal/epl-v10.html"}
+  :dependencies [[org.clojure/clojure "1.3.0"]
+                 [org.apache.hbase/hbase "0.92.1"]
+                 [org.apache.hadoop/hadoop-core "1.0.3"]
+                 [com.cybozu.labs.langdetect/langdetect "09.13.2011"]
+                 [net.arnx.jsonic/jsonic "1.1.3"]
+                 [cascalog "1.9.0"]
+                 [cascading/cascading-core "2.0.0"]
+                 [com.twitter/maple "0.2.0"]
+                 [cascading.hbase "0.0.2"]
+                 [org.codehaus.jackson/jackson-core-lgpl "1.9.7"]
+                 [org.codehaus.jackson/jackson-mapper-lgpl "1.9.7"]
+                 [clojure-csv/clojure-csv "2.0.0-alpha1"]]
+  :profiles {:dev {:dependencies [[org.apache.hadoop/hadoop-core "1.0.3"]]}}
+  :repositories {"mvnrepository.com" "http://mvnrepository.com/artifact/"
+                 "conjars" "http://conjars.org/repo"})
235 src/hbase_likely/core.clj
@@ -0,0 +1,235 @@
+;;
+(ns hbase-likely.core
+  (:require (cascalog [workflow :as w]
+                      [ops :as c]
+                      [vars :as v]))
+  (:import (java.util Set ArrayList)
+           (org.apache.hadoop.hbase HBaseConfiguration)
+           (org.apache.hadoop.conf Configuration)
+           (org.apache.hadoop.hbase.client Put Get HTable Scan)
+           (org.apache.hadoop.hbase.util Bytes)
+           (com.twitter.maple.hbase HBaseTap HBaseScheme)
+           (cascading.tuple Fields)
+           (cascading.hbase ByteHolder)
+           (com.cybozu.labs.langdetect Detector DetectorFactory Language)))
+
+;; Forward declarations: these functions are defined later in this file
+;; but are called from -main.
+(declare process-languages process-language-probabilities split-urls)
+
+(defn -main
+  "Populate data tables with languages of tweets and full urls.
+   Assumes that:
+     create 'urls', 'url'
+   and
+     create 'language', 'lang'
+   have been run in the HBase shell.
+   This may take some time to run..."
+  []
+  (process-languages "tweets" "base" "content")
+  (process-language-probabilities "tweets" "base" "content")
+  (split-urls "tweets" "base"))
+
+(defn hbase-table [table-name]
+  ;; Note that (HBaseConfiguration.) is deprecated in HBase 0.95-SNAPSHOT
+  ;; and will be replaced by (Configuration/create)
+  (HTable. (HBaseConfiguration.) table-name))
+
+(defn hbase-scan
+  "Returns a scan object with added column"
+  [column-family qualifier]
+  (.addColumn (Scan.) (Bytes/toBytes column-family) (Bytes/toBytes qualifier)))
+
+(defn hbase-scans
+  "Returns a scan object with added columns"
+  [column-family qualifiers]
+  (let [s (Scan.)]
+    (doseq [q qualifiers]
+      (.addColumn s (Bytes/toBytes column-family) (Bytes/toBytes q)))
+    s))
+
+(defn hbase-results
+  "Returns a ResultScanner for a table and scanner object"
+  [table scanner]
+  (.getScanner table scanner))
+
+;; Detect the language
+(defn load-detector-factory-profile []
+  (DetectorFactory/loadProfile "/home/hduser/src/java/langdetect/profiles/"))
+
+(defn detect-language
+  "Detect the most probable language of a string"
+  [string]
+  (let [detector (DetectorFactory/create)]
+    (.append detector string)
+    (String. (.detect detector))))
+
+(defn detect-language-probabilities
+  "Return a seq of com.cybozu.labs.langdetect.Language containing the possible
+   languages of a string along with their probabilities"
+  [string]
+  (let [detector (DetectorFactory/create)]
+    (.append detector string)
+    (seq (.getProbabilities detector))))
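+
+;; A small REPL sketch of the two helpers above. The return values are
+;; illustrative only: real output depends on the langdetect profiles loaded
+;; from disk by load-detector-factory-profile.
+(comment
+  (load-detector-factory-profile)
+  (detect-language "This is clearly an English sentence.")
+  ;; => "en"
+  (detect-language-probabilities "This is clearly an English sentence.")
+  ;; => a seq of com.cybozu.labs.langdetect.Language,
+  ;;    e.g. something like (#<Language en:0.999994>)
+  )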
+
+(defn lang-detect-rs
+  "Iterates over a result set and detects language"
+  [results table column-family qualifier]
+  (let [iterator (.iterator results)]
+    (while (.hasNext iterator)
+      (let [row (.next iterator)
+            row-id (String. (.getRow row))
+            put-data (Put. (Bytes/toBytes row-id))]
+        (try
+          (let [lang (detect-language
+                      (String. (.getValue row
+                                          (Bytes/toBytes column-family)
+                                          (Bytes/toBytes qualifier))))]
+            (.add put-data (Bytes/toBytes column-family)
+                  (Bytes/toBytes "language")
+                  (Bytes/toBytes lang))
+            (.put table put-data))
+          (catch Exception e
+            ;; If no language is detected, record "nil" for this row
+            ;; (mirrors the behaviour of lang-prob-rs below)
+            (.add put-data (Bytes/toBytes column-family)
+                  (Bytes/toBytes "language")
+                  (Bytes/toBytes "nil"))
+            (.put table put-data)
+            0))))))
+
+(defn split-lang-prob
+  "Splits a Language string into language and probability"
+  [language]
+  (seq (.split #":" (.toString language))))
+
+(defn lang-prob-rs
+  "Iterates over a result set and detects language probabilities"
+  [results table lang-table column-family qualifier]
+  (let [iterator (.iterator results)]
+    (while (.hasNext iterator)
+      (let [row (.next iterator)
+            row-id (String. (.getRow row))
+            put-data (Put. (Bytes/toBytes row-id))]
+        (try
+          (let [lang (detect-language-probabilities
+                      (String. (.getValue row
+                                          (Bytes/toBytes column-family)
+                                          (Bytes/toBytes qualifier))))
+                ;; Each language and probability in the array needs to be added
+                ;; to the 'language' table. The qualifier for the table
+                ;; is the language and the value is the probability
+                lang-prob (map #(split-lang-prob %) lang)]
+            (doseq [lp lang-prob]
+              (let [lang (first lp)
+                    prob (second lp)]
+                (.add put-data (Bytes/toBytes "lang")
+                      (Bytes/toBytes lang)
+                      (Bytes/toBytes prob))))
+            (.put lang-table put-data))
+          (catch Exception e
+            ;; If no language is detected add 'nil' to 'other' qualifier
+            ;; in language table
+            (.add put-data (Bytes/toBytes "lang")
+                  (Bytes/toBytes "other")
+                  (Bytes/toBytes "nil"))
+            (.put lang-table put-data)
+            0))))))
+
+(defn process-languages
+  "Detect languages for every tweet"
+  [table-name column-family qualifier]
+  (let [table (hbase-table table-name)
+        scan (hbase-scan column-family qualifier)
+        rs (hbase-results table scan)]
+    (load-detector-factory-profile)
+    (lang-detect-rs rs table column-family qualifier)))
+
+;; (process-languages "tweets_copy" "base" "content")
+
+(defn process-language-probabilities
+  "Detect language probabilities for every tweet"
+  [table-name column-family qualifier]
+  (let [table (hbase-table table-name)
+        lang-table (hbase-table "language")
+        scan (hbase-scan column-family qualifier)
+        rs (hbase-results table scan)]
+    (load-detector-factory-profile)
+    (lang-prob-rs rs table lang-table column-family qualifier)))
+
+;; (process-language-probabilities "tweets_copy" "base" "content")
+
+(defn strip-url
+  "Strips a split url of square braces and quotes"
+  [url]
+  (let [url1 (.replaceAll url "\\x5B" "")
+        url2 (.replaceAll url1 "\\x5D" "")]
+    ;; The double quote character is hex 22
+    (.replaceAll url2 "\\x22" "")))
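+
+;; Illustrative example of strip-url on the bracketed, quoted form that the
+;; 'urls' column of the tweets table appears to use (the input value here is
+;; made up):
+(comment
+  (strip-url "[\"http://t.co/abc123\"]")
+  ;; => "http://t.co/abc123"
+  )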
+
+(defn add-url
+  "Takes a string url and adds it to the database"
+  [short-url full-url screen-name tweet-id]
+  (let [url-table (hbase-table "urls")
+        s-url (strip-url short-url)
+        split-url (second (.split s-url "\\x2F\\x2F"))
+        row-id (str tweet-id "-" split-url)
+        p (Put. (Bytes/toBytes row-id))
+        cf (Bytes/toBytes "url")]
+    (doto p
+      (.add cf (Bytes/toBytes "screen_name") (Bytes/toBytes screen-name))
+      (.add cf (Bytes/toBytes "tweet_id") (Bytes/toBytes tweet-id))
+      (.add cf (Bytes/toBytes "short_url") (Bytes/toBytes s-url))
+      (.add cf (Bytes/toBytes "full_url") (Bytes/toBytes full-url)))
+    (.put url-table p)))
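+
+;; Illustrative only: with tweet id "123", screen name "someuser" and a short
+;; url whose full form is "http://example.com/page", the call below would
+;; write a row keyed "123-t.co/abc" to the 'urls' table, with the
+;; url:screen_name, url:tweet_id, url:short_url and url:full_url columns set.
+(comment
+  (add-url "[\"http://t.co/abc\"]" "http://example.com/page"
+           "someuser" "123"))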
+
+(defn lookup-url
+  "Takes url string and looks it up in short_url table"
+  [url-str]
+  (let [stripped (strip-url url-str)
+        g (Get. (Bytes/toBytes stripped))
+        table-name "short_urls"
+        cf (Bytes/toBytes "rel")
+        q (Bytes/toBytes "url")
+        url-table (hbase-table table-name)]
+    (.addColumn g cf q)
+    (let [result (.get url-table g)]
+      (if (.containsColumn result cf q)
+        (String. (.getValue result cf q))
+        ;; If the full-url can't be found just return the stripped
+        ;; original url string
+        stripped))))
+
+(defn tweet-iterator-seq
+  "Returns a lazy sequence of rows from the resultset iterator"
+  [iterator]
+  (lazy-seq
+   (when (.hasNext iterator)
+     (cons (.next iterator) (tweet-iterator-seq iterator)))))
+
+(defn url-rs-seq
+  "Iterates over the result set and writes each url to the 'urls' table.
+   Uses doseq (rather than a lazy for/map) so that the HBase puts are
+   actually executed even though the return value is discarded."
+  [rs column-family qualifiers]
+  (let [iterator (.iterator rs)
+        row-seq (tweet-iterator-seq iterator)]
+    (doseq [row row-seq]
+      (let [row-id (String. (.getRow row))
+            screen-name (String. (.getValue row
+                                            (Bytes/toBytes column-family)
+                                            (Bytes/toBytes (first qualifiers))))]
+        ;; Check if there are actually urls present in the tweet
+        (if (.containsColumn row
+                             (Bytes/toBytes column-family)
+                             (Bytes/toBytes (second qualifiers)))
+          (let [urls (String. (.getValue row
+                                         (Bytes/toBytes column-family)
+                                         (Bytes/toBytes (second qualifiers))))
+                url-seq (seq (.split urls ","))]
+            (doseq [url url-seq]
+              (add-url url (lookup-url url) screen-name row-id)))
+          ;; If no urls are present, no need to add them
+          (println (str "Warning: no urls present in row: " row-id)))))))
+
+(defn split-urls
+  "Split multiple urls and place into a new table"
+  [table-name column-family]
+  (let [qualifiers ["screen_name" "urls"]
+        table (hbase-table table-name)
+        scan (hbase-scans column-family qualifiers)
+        rs (hbase-results table scan)]
+    (url-rs-seq rs column-family qualifiers)))
+
+;; Stunningly slow!
+;; (split-urls "tweets_copy" "base")
58 src/hbase_likely/hbase.clj
@@ -0,0 +1,58 @@
+;;
+(ns hbase-likely.hbase
+  (:require (cascalog [workflow :as w]
+                      [ops :as c]
+                      [vars :as v]))
+  (:import (org.apache.hadoop.hbase.util Bytes)
+           (com.twitter.maple.hbase HBaseTap HBaseScheme)
+           (cascading.hbase ByteHolder)))
+
+(use '[cascalog api playground])
+(bootstrap-emacs)
+
+;; (defn hbase-tap [table-name key-field column-family & value-fields]
+;;   (let [scheme (HBaseScheme. (w/fields key-field) column-family (w/fields value-fields))]
+;;     (HBaseTap. table-name scheme)))
+
+;; (defn as-string [^ByteHolder bytes]
+;;   (.toString bytes))
+
+;; (let [age-table (hbase-tap "age-table" "?person" "cf" "?age")]
+;;   (?<- (stdout) [?p ?a] (age-table ?p ?age) (as-string ?age :> ?a)))
+
+(defn hbase-tap [table-name key-field column-family & value-fields]
+  (let [scheme (HBaseScheme. (w/fields key-field) column-family (w/fields value-fields))]
+    (HBaseTap. table-name scheme)))
+
+(defn as-string [^ByteHolder bytes]
+  (.toString bytes))
+
+(defn to-string [bytes]
+  (String. bytes))
+
+(defn to-split-string [bytes]
+  (seq (.split #"\," (String. bytes))))
+
+(defn to-int [bytes]
+  (Integer/parseInt (String. bytes)))
+
+(defmapcatop split
+  "Splits a 'sentence' on whitespace"
+  [sentence]
+  (seq (.split sentence "\\s+")))
+
+(defmapcatop split-urls
+  "Splits a set of comma separated urls"
+  [urls]
+  (seq (.split #"\," urls)))
+
+(defmapcatop strip-urls
+  "Strips a split url of square braces and quotes"
+  [url]
+  (let [url1 (.replaceAll url "\\x5B" "")
+        url2 (.replaceAll url1 "\\x5D" "")]
+    ;; The double quote character is hex 22
+    (.replaceAll url2 "\\x22" "")))
+
+
+(defn lowercase [w] (.toLowerCase w))
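+
+;; Quick REPL checks of the byte-array helpers above. The values are made up;
+;; in practice the byte arrays come from HBase column values.
+(comment
+  (to-string (Bytes/toBytes "en"))          ;; => "en"
+  (to-int (Bytes/toBytes "42"))             ;; => 42
+  (to-split-string (Bytes/toBytes "a,b,c")) ;; => ("a" "b" "c")
+  (lowercase "HBase"))                      ;; => "hbase"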
132 src/hbase_likely/query.clj
@@ -0,0 +1,132 @@
+;; Queries on HBase made in Cascalog
+(ns hbase-likely.query)
+
+(use 'hbase-likely.hbase)
+(use '[cascalog api playground])
+(require '(cascalog [workflow :as w]
+                    [ops :as c]
+                    [vars :as v]))
+(bootstrap-emacs)
+
+(defbufferop first-tuple [tuples] (take 1 tuples))
+
+(defn distinct-langs
+  "How many distinct languages are found?"
+  []
+  (let [h-table (hbase-tap "tweets_copy" "row-id" "base" "language")]
+    (?<- (stdout) [?count] (h-table _ ?language)
+         (to-string ?language :> ?l)
+         (c/distinct-count ?l :> ?count))))
+;; 44!
+;; Do we really have a tweet in Tamil? Yep!
+(defn tweet-lang
+  "Find tweets with a given language, e.g. 'en'"
+  [lang-str]
+  (let [h-table (hbase-tap "tweets_copy" "row-id" "base"
+                           "screen_name" "language")]
+    (?<- (stdout) [?l ?r ?s] (h-table ?row-id ?screen-name ?language)
+         (to-string ?language :> ?l)
+         (= ?l lang-str)
+         (to-string ?screen-name :> ?s)
+         (to-string ?row-id :> ?r))))
+
+(defn timezone-lang
+  "Find the timezone of a tweet with a given language, e.g. 'en'"
+  [lang-str]
+  (let [h-table (hbase-tap "tweets_copy" "row-id" "base"
+                           "screen_name" "language")
+        t-table (hbase-tap "twitter_accounts_copy" "row-id" "base"
+                           "screen_name" "time_zone")]
+    (?<- (stdout) [?l ?r ?s ?t]
+         (h-table ?row-id ?screen-name ?language)
+         (to-string ?language :> ?l)
+         (= ?l lang-str)
+         (t-table ?row ?screen-name ?tz)
+         (to-string ?screen-name :> ?s)
+         (to-string ?row :> ?r)
+         (to-string ?tz :> ?t))))
+
+(defn lang-tweet-count
+  "How many tweets of each language?"
+  []
+  (let [h-table (hbase-tap "tweets_copy" "row-id" "base" "language")]
+    (?<- (stdout) [?l ?count] (h-table _ ?language)
+         (to-string ?language :> ?l)
+         (c/count ?count))))
+
+(defn distinct-urls
+  "How many different urls are there?"
+  []
+  (let [h-table (hbase-tap "tweets_copy" "row-id" "base" "urls")
+        u-table (hbase-tap "short_urls_copy" "row-id" "rel" "url")]
+    ;; Note that there may be multiple urls in a tweet
+    ;; Also note that these urls are shortened and will all be different
+    ;; so we need to convert them back into full urls
+    ;; URLs are comma separated and quoted
+    (?<- (stdout) [?count]
+         (h-table _ ?urls)
+         (to-string ?urls :> ?u)
+         (split-urls ?u :> ?usplit)
+         (strip-urls ?usplit :> ?ustripped)
+         (u-table ?ustripped ?full-url)
+         (c/distinct-count ?ustripped :> ?count))))
+;; 128759 urls without inner join. But of course the same URL may be shortened
+;; in a different way so need to link to short_urls table.
+
+(defn distinct-full-urls
+  "How many different full urls are there?"
+  []
+  (let [u-table (hbase-tap "short_urls_copy" "row-id" "rel" "url")]
+    (?<- (stdout) [?count]
+         (u-table ?ustripped ?full-url)
+         (c/distinct-count ?full-url :> ?count))))
+;; 128759 urls in tweets. 147548 distinct full urls. What gives? Multiple
+;; urls in the same tweet is one problem.
+
+(defn distinct-full-urls-2
+  "How many different full urls are there in the urls table?"
+  []
+  (let [u-table (hbase-tap "urls" "row-id" "url" "full_url")]
+    (?<- (stdout) [?count]
+         (u-table ?row-id ?full-url)
+         (to-string ?full-url :> ?u)
+         (c/distinct-count ?u :> ?count))))
+;; 43017 - now they have been split down properly.
+
+(defn url-tweet-count
+  "How many tweets of each url are there?"
+  []
+  (let [u-table (hbase-tap "urls" "row-id" "url" "full_url")]
+    (?<- (stdout) [?u ?count]
+         (u-table ?row ?url)
+         (to-string ?url :> ?u)
+         (c/count ?count))))
+
+(defn url-tweet-count-2
+  "How many times does each person tweet a url?"
+  []
+  (let [h-table (hbase-tap "tweets_copy" "row-id" "base" "screen_name")
+        u-table (hbase-tap "urls" "row-id" "url" "screen_name")]
+    (?<- (stdout) [?sn ?count]
+         (h-table ?hrow ?screen-name)
+         (u-table ?urow ?screen-name)
+         (to-string ?screen-name :> ?sn)
+         (c/count ?count))))
+
+(defn distinct-timezones
+  "How many timezones are there?"
+  []
+  (let [h-table (hbase-tap "twitter_accounts_copy" "row-id" "base" "time_zone")]
+    (?<- (stdout) [?count] (h-table _ ?timezone)
+         (to-string ?timezone :> ?tz)
+         (c/distinct-count ?tz :> ?count))))
+;; 139
+
+(defn people-timezone-count
+  "How many people from each timezone are there?"
+  []
+  (let [a-table (hbase-tap "twitter_accounts_copy" "row-id" "base" "time_zone")]
+    (?<- (stdout) [?tz ?count] (a-table _ ?timezone)
+         (to-string ?timezone :> ?tz)
+         (:sort ?tz)
+         (c/count ?count))))
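+
+;; Hypothetical REPL session for the queries above. Each query prints its
+;; result tuples to stdout; the table names must exist in your HBase cluster.
+(comment
+  (use 'hbase-likely.query)
+  (distinct-langs)
+  (tweet-lang "en")
+  (lang-tweet-count)
+  (people-timezone-count))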
7 test/hbase_likely/core_test.clj
@@ -0,0 +1,7 @@
+(ns hbase-likely.core-test
+  (:use clojure.test
+        hbase-likely.core))
+
+(deftest a-test
+  (testing "FIXME, I fail."
+    (is (= 0 1))))
