More code for Hive.

Commit 97376f3f694a4d38e8234c3cf2e2635e6c89cfd1 (1 parent: 24eaed7), committed by @tomwhite on May 26, 2010
5 input/hive/joins/sales.txt
@@ -0,0 +1,5 @@
+Joe 2
+Hank 4
+Ali 0
+Eve 3
+Hank 2
4 input/hive/joins/things.txt
@@ -0,0 +1,4 @@
+2 Tie
+4 Coat
+3 Hat
+1 Scarf
4 input/hive/tables/users.txt
@@ -0,0 +1,4 @@
+0^ANat
+2^AJoe
+3^AKay
+4^AAnn
3 input/hive/tmp.txt
@@ -0,0 +1,3 @@
+1 a
+2 b
+3 c
1 input/hive/types/complex.txt
@@ -0,0 +1 @@
+1^B2^Aa^C1^Bb^C2^Aa^B1^B1.0
3 input/hive/udfs/max1.txt
@@ -0,0 +1,3 @@
+1
+2
+3
2 input/hive/udfs/max2.txt
@@ -0,0 +1,2 @@
+4
+3
47 src/main/hive/hive/buckets.txt
@@ -0,0 +1,47 @@
+echo -e '0\x01Nat' > /Users/tom/workspace/hadoop-book/input/hive/tables/users.txt
+echo -e '2\x01Joe' >> /Users/tom/workspace/hadoop-book/input/hive/tables/users.txt
+echo -e '3\x01Kay' >> /Users/tom/workspace/hadoop-book/input/hive/tables/users.txt
+echo -e '4\x01Ann' >> /Users/tom/workspace/hadoop-book/input/hive/tables/users.txt
+
+drop table users;
+drop table bucketed_users;
+
+CREATE TABLE users (id INT, name STRING);
+
+LOAD DATA LOCAL INPATH '/Users/tom/workspace/hadoop-book/input/hive/tables/users.txt'
+OVERWRITE INTO TABLE users;
+
+dfs -cat /user/hive/warehouse/users/users.txt;
+
+CREATE TABLE bucketed_users (id INT, name STRING)
+CLUSTERED BY (id) INTO 4 BUCKETS;
+
+CREATE TABLE bucketed_users (id INT, name STRING)
+CLUSTERED BY (id) SORTED BY (id) INTO 4 BUCKETS; -- sorted variant; drop the table created above before running this
+
+SELECT * FROM users;
+
+SET hive.enforce.bucketing=true; -- from Hive 0.6.0; previously the number of reducers had to be set manually to match the number of buckets
+
+INSERT OVERWRITE TABLE bucketed_users
+SELECT * FROM users;
+
+dfs -ls /user/hive/warehouse/bucketed_users;
+
+-- Sampling
+-- useful for trying out queries on a small subset of a table
+-- rand() also works on unbucketed tables, but is inefficient since it
+-- scans the whole dataset
+
+SELECT * FROM bucketed_users
+TABLESAMPLE(BUCKET 1 OUT OF 4 ON id);
+
+SELECT * FROM bucketed_users
+TABLESAMPLE(BUCKET 1 OUT OF 2 ON id);
+
+SELECT * FROM users
+TABLESAMPLE(BUCKET 1 OUT OF 4 ON rand());
+
+-- buckets make certain map-side joins possible: if two tables are bucketed
+-- on the join column, only the matching buckets need be fetched
+-- if the buckets are also sorted, more efficient still: the mapper only merges (no sort)
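
A sketch of the bucketed map join this enables, not part of the commit: it assumes a second table, bucketed_things (hypothetical), bucketed into the same number of buckets on its id column, plus Hive's bucketed map join flag.

SET hive.optimize.bucketmapjoin=true;

-- the table named in the hint is read bucket-by-bucket on the map side,
-- so only the bucket matching each mapper's input need be fetched
SELECT /*+ MAPJOIN(bucketed_things) */ bucketed_users.*, bucketed_things.*
FROM bucketed_users JOIN bucketed_things ON (bucketed_users.id = bucketed_things.id);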
49 src/main/hive/hive/joins.txt
@@ -0,0 +1,49 @@
+CREATE TABLE sales (name STRING, id INT)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
+
+CREATE TABLE things (id INT, name STRING)
+ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
+
+LOAD DATA LOCAL INPATH '/Users/tom/workspace/hadoop-book/input/hive/joins/sales.txt'
+OVERWRITE INTO TABLE sales;
+
+LOAD DATA LOCAL INPATH '/Users/tom/workspace/hadoop-book/input/hive/joins/things.txt'
+OVERWRITE INTO TABLE things;
+
+SELECT * FROM sales;
+
+SELECT * FROM things;
+
+-- inner
+
+SELECT sales.*, things.*
+FROM sales JOIN things ON (sales.id = things.id);
+
+
+-- outer
+
+SELECT sales.*, things.*
+FROM sales LEFT OUTER JOIN things ON (sales.id = things.id);
+
+SELECT sales.*, things.*
+FROM sales RIGHT OUTER JOIN things ON (sales.id = things.id);
+
+SELECT sales.*, things.*
+FROM sales FULL OUTER JOIN things ON (sales.id = things.id);
+
+-- semi join (Hive doesn't support IN subqueries, so the first query below is shown for comparison only)
+
+SELECT *
+FROM things
+WHERE things.id IN (SELECT id FROM sales);
+
+SELECT *
+FROM things LEFT SEMI JOIN sales ON (sales.id = things.id);
+
+-- map side
+
+SELECT /*+ MAPJOIN(things) */ sales.*, things.*
+FROM sales JOIN things ON (sales.id = things.id);
+
+SELECT /*+ MAPJOIN(things) */ sales.*, things.*
+FROM sales LEFT OUTER JOIN things ON (sales.id = things.id);
26 src/main/hive/hive/mapreduce.txt
@@ -0,0 +1,26 @@
+ADD FILE /Users/tom/workspace/hadoop-book/src/main/hive/python/is_good_quality.py;
+ADD FILE /Users/tom/workspace/hadoop-book/src/main/hive/python/max_temperature_reduce.py;
+
+FROM records2
+SELECT TRANSFORM(year, temperature, quality)
+USING 'is_good_quality.py'
+AS year, temperature;
+
+FROM (
+ FROM records2
+ MAP year, temperature, quality
+ USING 'is_good_quality.py'
+ AS year, temperature) map_output
+REDUCE year, temperature
+USING 'max_temperature_reduce.py'
+AS year, temperature;
+
+FROM (
+ FROM records2
+ SELECT TRANSFORM(year, temperature, quality)
+ USING 'is_good_quality.py'
+ AS year, temperature) map_output
+SELECT TRANSFORM(year, temperature)
+USING 'max_temperature_reduce.py'
+AS year, temperature;
+
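
The queries above leave one assumption implicit: max_temperature_reduce.py expects all rows for a year to arrive at the same reducer invocation, grouped by key. A hedged variant, not part of this commit, that spells the shuffle out with CLUSTER BY:

FROM (
  FROM records2
  SELECT TRANSFORM(year, temperature, quality)
  USING 'is_good_quality.py'
  AS year, temperature
  CLUSTER BY year) map_output
SELECT TRANSFORM(year, temperature)
USING 'max_temperature_reduce.py'
AS year, temperature;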
76 src/main/hive/hive/max_temp.hive
@@ -0,0 +1,76 @@
+CREATE TABLE records (year STRING, temperature INT, quality INT)
+ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '\t';
+
+LOAD DATA LOCAL INPATH 'input/ncdc/micro-tab/sample.txt'
+OVERWRITE INTO TABLE records;
+
+SELECT year, MAX(temperature)
+FROM records
+WHERE temperature != 9999
+ AND (quality = 0 OR quality = 1 OR quality = 4 OR quality = 5 OR quality = 9)
+GROUP BY year;
+
+-- weather dataset stats
+
+CREATE TABLE records2 (station STRING, year STRING, temperature INT, quality INT)
+ROW FORMAT DELIMITED
+ FIELDS TERMINATED BY '\t';
+
+LOAD DATA LOCAL INPATH '/Users/tom/workspace/hadoop-book/input/ncdc/micro-tab/sample2.txt'
+OVERWRITE INTO TABLE records2;
+
+-- total stations and records (by year)
+SELECT year, COUNT(DISTINCT station), COUNT(1)
+FROM records2
+GROUP BY year;
+
+-- total good records (by year)
+SELECT year, COUNT(1)
+FROM records2
+WHERE temperature != 9999
+ AND (quality = 0 OR quality = 1 OR quality = 4 OR quality = 5 OR quality = 9)
+GROUP BY year;
+
+-- the two queries above scan the data twice; a multi-table INSERT does it in a single pass
+
+CREATE TABLE stations_by_year (year STRING, num INT);
+CREATE TABLE records_by_year (year STRING, num INT);
+CREATE TABLE good_records_by_year (year STRING, num INT);
+
+FROM records2
+INSERT OVERWRITE TABLE stations_by_year
+ SELECT year, COUNT(DISTINCT station)
+ GROUP BY year
+INSERT OVERWRITE TABLE records_by_year
+ SELECT year, COUNT(1)
+ GROUP BY year
+INSERT OVERWRITE TABLE good_records_by_year
+ SELECT year, COUNT(1)
+ WHERE temperature != 9999
+ AND (quality = 0 OR quality = 1 OR quality = 4 OR quality = 5 OR quality = 9)
+ GROUP BY year;
+
+SELECT /*+ MAPJOIN(records_by_year,good_records_by_year) */
+ stations_by_year.year, stations_by_year.num, records_by_year.num, good_records_by_year.num
+FROM stations_by_year
+ JOIN records_by_year ON (stations_by_year.year = records_by_year.year)
+ JOIN good_records_by_year ON (stations_by_year.year = good_records_by_year.year);
+
+
+--
+
+-- DISTRIBUTE BY
+
+FROM records2
+SELECT year, temperature
+DISTRIBUTE BY year
+SORT BY year ASC, temperature DESC;
+
+FROM (
+  FROM records2
+  SELECT year, temperature
+  DISTRIBUTE BY year
+  SORT BY year ASC, temperature DESC) t
+SELECT year, MAX(temperature)
+GROUP BY year;
10 src/main/hive/hive/partitions.hive
@@ -0,0 +1,10 @@
+CREATE TABLE logs (ts BIGINT, line STRING)
+PARTITIONED BY (dt STRING, country STRING);
+
+LOAD DATA LOCAL INPATH 'input/hive/tables/file1'
+INTO TABLE logs
+PARTITION (dt='2001-01-01', country='GB');
+
+SELECT ts, dt, line
+FROM logs
+WHERE country='GB';
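
A follow-up worth running after the load, not part of the commit: SHOW PARTITIONS lists the partition directories that LOAD DATA created.

SHOW PARTITIONS logs;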
3 src/main/hive/hive/rcfile.hive
@@ -0,0 +1,3 @@
+CREATE TABLE columnar (a STRING, b STRING, c STRING, d STRING, e STRING)
+ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
+STORED AS RCFILE;
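
A note on populating the table, not in the commit: LOAD DATA only moves files into the warehouse without converting them, so an RCFile table is typically filled by an INSERT from an existing row-oriented table. A sketch, where source_table is hypothetical and has five compatible columns:

INSERT OVERWRITE TABLE columnar
SELECT a, b, c, d, e FROM source_table;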
12 src/main/hive/hive/types.hive
@@ -0,0 +1,12 @@
+-- echo -e '1\x022\x01a\x031\x02b\x032\x01a\x021\x021.0' > /Users/tom/workspace/hadoop-book/input/hive/types/complex.txt  -- \x01, \x02, \x03 are Hive's default field, collection-item, and map-key delimiters
+
+CREATE TABLE complex (
+ col1 ARRAY<INT>,
+ col2 MAP<STRING, INT>,
+ col3 STRUCT<a:STRING, b:INT, c:DOUBLE>
+);
+
+LOAD DATA LOCAL INPATH '/Users/tom/workspace/hadoop-book/input/hive/types/complex.txt'
+OVERWRITE INTO TABLE complex;
+
+SELECT col1[0], col2['b'], col3.c FROM complex;
46 src/main/hive/java/com/hadoopbook/hive/Maximum.java
@@ -0,0 +1,46 @@
+package com.hadoopbook.hive;
+
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
+import org.apache.hadoop.io.IntWritable;
+
+public class Maximum extends UDAF {
+
+ public static class MaximumIntUDAFEvaluator implements UDAFEvaluator {
+
+ private IntWritable result;
+
+ public void init() {
+ System.err.printf("%s %s\n", hashCode(), "init");
+ result = null;
+ }
+
+ public boolean iterate(IntWritable value) {
+ System.err.printf("%s %s %s\n", hashCode(), "iterate", value);
+ if (value == null) {
+ return true;
+ }
+ if (result == null) {
+ result = new IntWritable(value.get());
+ } else {
+ result.set(Math.max(result.get(), value.get()));
+ }
+ return true;
+ }
+
+ public IntWritable terminatePartial() {
+ System.err.printf("%s %s\n", hashCode(), "terminatePartial");
+ return result;
+ }
+
+ public boolean merge(IntWritable other) {
+ System.err.printf("%s %s %s\n", hashCode(), "merge", other);
+ return iterate(other);
+ }
+
+ public IntWritable terminate() {
+ System.err.printf("%s %s\n", hashCode(), "terminate");
+ return result;
+ }
+ }
+}
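
A sketch of calling the UDAF from Hive, not part of this commit; the jar path is an assumption, and records is the table defined in max_temp.hive:

ADD JAR /path/to/hive-examples.jar;  -- path is an assumption

CREATE TEMPORARY FUNCTION maximum AS 'com.hadoopbook.hive.Maximum';

SELECT maximum(temperature) FROM records;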
56 src/main/hive/java/com/hadoopbook/hive/Mean.java
@@ -0,0 +1,56 @@
+package com.hadoopbook.hive;
+
+import org.apache.hadoop.hive.ql.exec.UDAF;
+import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+
+public class Mean extends UDAF {
+
+ public static class MeanDoubleUDAFEvaluator implements UDAFEvaluator {
+ public static class PartialResult {
+ double sum;
+ long count;
+ }
+
+ private PartialResult partial;
+
+ public void init() {
+ partial = null;
+ }
+
+ public boolean iterate(DoubleWritable value) {
+ if (value == null) {
+ return true;
+ }
+ if (partial == null) {
+ partial = new PartialResult();
+ }
+ partial.sum += value.get();
+ partial.count++;
+ return true;
+ }
+
+ public PartialResult terminatePartial() {
+ return partial;
+ }
+
+ public boolean merge(PartialResult other) {
+ if (other == null) {
+ return true;
+ }
+ if (partial == null) {
+ partial = new PartialResult();
+ }
+ partial.sum += other.sum;
+ partial.count += other.count;
+ return true;
+ }
+
+ public DoubleWritable terminate() {
+ if (partial == null) {
+ return null;
+ }
+ return new DoubleWritable(partial.sum / partial.count);
+ }
+ }
+}
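
Why Mean carries a PartialResult where Maximum does not: a mean of means is wrong when partial counts differ, so the running sum and count must travel together between evaluators until terminate() divides them. Registration follows the same sketch as above (jar path again an assumption):

ADD JAR /path/to/hive-examples.jar;

CREATE TEMPORARY FUNCTION mean AS 'com.hadoopbook.hive.Mean';

SELECT mean(temperature) FROM records;  -- relies on Hive's implicit INT-to-DOUBLE conversion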
9 src/main/hive/python/is_good_quality.py
@@ -0,0 +1,9 @@
+#!/usr/bin/env python
+
+import re
+import sys
+
+for line in sys.stdin:
+ (year, temp, q) = line.strip().split()
+ if (temp != "9999" and re.match("[01459]", q)):
+ print "%s\t%s" % (year, temp)
15 src/main/hive/python/max_temperature_reduce.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+
+import sys
+
+(last_key, max_val) = (None, 0)
+for line in sys.stdin:
+ (key, val) = line.strip().split("\t")
+ if last_key and last_key != key:
+ print "%s\t%s" % (last_key, max_val)
+ (last_key, max_val) = (key, int(val))
+ else:
+ (last_key, max_val) = (key, max(max_val, int(val)))
+
+if last_key:
+ print "%s\t%s" % (last_key, max_val)
