Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Bigger example

abaranau edited this page · 1 revision

TODO: provide simpler/cleaner example

Simple examples can be found in the unit-test directory. Here is one of them.

Suppose we need to track the last 5 sales for each company's stock in order to display them to the user in a web interface.

The direct solution would be, for each deal, to Get the sales info of the particular company, update it, and Put the updated record back into the table. This requires performing a Get operation for every single record written, which may affect write performance dramatically.

With HBaseHUT we can write new data "as is" and defer the updates until we really need them. Only minor code changes are needed to make use of it:

  • write code is different only by using "new HutPut()" statement;
  • reading code is different only by wrapping ResultScanner with "new HutResultScanner(originalScanner)";
  • update-processing code is very close to the equivalent MR job code or Get&Put code: the records to be processed arrive through a simple iterator, as when working with a ResultScanner or a Map function.

Example code (copied from unit-test, TODO: add better example):

...
  // Column family bytes ("sale") used for every price cell read or written in this example.
  public static final byte[] SALE_CF = Bytes.toBytes("sale");

  // Processor that folds pending sale records into the "lastPrice0".."lastPrice4" columns.
  StockSaleUpdateProcessor processor = new StockSaleUpdateProcessor();

  /**
   * Records a series of sales for two companies and, after each batch, checks that a scan
   * returns each company's prices most-recent-first, keeping at most the five latest.
   */
  public void testSimpleScan() throws IOException, InterruptedException {
    ...
    byte[] chrysler = Bytes.toBytes("chrysler");
    byte[] ford = Bytes.toBytes("ford");

    // First batch: fewer than five sales per company, so every price is retained.
    recordSale(chrysler, 90);
    recordSale(chrysler, 100);
    recordSale(ford, 18);
    recordSale(chrysler, 120);
    verifyLastSales(chrysler, new int[] {120, 100, 90});
    verifyLastSales(ford, new int[] {18});

    // Second batch: chrysler reaches exactly five sales.
    recordSale(chrysler, 115);
    recordSale(ford, 22);
    recordSale(chrysler, 110);
    verifyLastSales(chrysler, new int[] {110, 115, 120, 100, 90});
    verifyLastSales(ford, new int[] {22, 18});

    // Third batch: the sixth chrysler sale pushes the oldest price (90) out of the window.
    recordSale(chrysler, 105);
    recordSale(ford, 24);
    recordSale(ford, 28);
    verifyLastSales(chrysler, new int[] {105, 110, 115, 120, 100});
    verifyLastSales(ford, new int[] {28, 24, 22, 18});
  }

  /**
   * Writes a single sale for {@code company} via a {@code HutPut}; the price is always stored
   * under the "lastPrice0" qualifier of the sale column family.
   *
   * @param company row key identifying the company
   * @param price sale price to record
   */
  private static void recordSale(byte[] company, int price) throws InterruptedException, IOException {
    Put salePut = new HutPut(company);
    byte[] qualifier = Bytes.toBytes("lastPrice0");
    byte[] encodedPrice = Bytes.toBytes(price);
    salePut.add(SALE_CF, qualifier, encodedPrice);
    hTable.put(salePut);
  }

  /**
   * Scans starting at {@code company}'s row through a {@code HutResultScanner} and checks that
   * the first result holds exactly the expected prices.
   *
   * @param company row key identifying the company
   * @param prices expected prices, most recent first
   * @throws IOException if scanning the table fails
   */
  private static void verifyLastSales(byte[] company, int[] prices) throws IOException {
    // NOTE(review): `updateProcessor` is not declared in the visible snippet (the field shown
    // is named `processor`) - confirm which name the enclosing class actually defines.
    ResultScanner resultScanner =
            new HutResultScanner(hTable.getScanner(new Scan(company)), updateProcessor);
    try {
      Result result = resultScanner.next();
      verifyLastSales(result, prices);
    } finally {
      // Release the scanner even when an assertion above fails.
      resultScanner.close();
    }
  }

  /**
   * Checks that {@code result} stores exactly the expected prices in columns
   * "lastPrice0".."lastPrice4": column {@code i} must hold {@code prices[i]}, and every column
   * beyond {@code prices.length} must be absent.
   *
   * @param result row to inspect; {@code null} means no record exists yet
   * @param prices expected prices, most recent first
   */
  private static void verifyLastSales(Result result, int[] prices) {
    // if there's no records yet, then prices are empty
    if (result == null) {
      Assert.assertEquals(0, prices.length);
      return;
    }

    for (int i = 0; i < 5; i++) {
      String qualifier = "lastPrice" + i;
      byte[] lastStoredPrice = result.getValue(SALE_CF, Bytes.toBytes(qualifier));
      if (i < prices.length) {
        // Fail with a readable assertion instead of an exception from decoding a null value.
        Assert.assertNotNull("missing column " + qualifier, lastStoredPrice);
        Assert.assertEquals(prices[i], Bytes.toInt(lastStoredPrice));
      } else {
        Assert.assertNull(lastStoredPrice);
      }
    }
  }

  /**
   * Merges a sequence of sale records into one record that keeps at most the five most
   * recently seen prices, with "lastPrice0" holding the newest.
   */
  static class StockSaleUpdateProcessor implements UpdateProcessor {
    /**
     * Feeds every price found in {@code records} through a five-slot circular buffer and
     * emits the surviving prices as columns "lastPrice0", "lastPrice1", ... (newest first).
     * Emits nothing when no price was found.
     */
    @Override
    public void process(Iterable<Result> records, UpdateProcessingResult processingResult) {
      byte[][] ring = new byte[5][];
      int newest = -1;

      // Within a record, walk from "lastPrice4" down to "lastPrice0" so that the most
      // recent price is the last one pushed into the buffer.
      for (Result record : records) {
        for (int slot = 4; slot >= 0; slot--) {
          byte[] found = getPrice(record, "lastPrice" + slot);
          newest = addPrice(ring, found, newest);
        }
      }

      if (newest != -1) {
        // Step backwards from the newest slot; rank 0 is the most recent price.
        for (int rank = 0; rank < 5; rank++) {
          byte[] price = ring[(newest + 5 - rank) % 5];
          if (price == null) {
            continue;
          }
          processingResult.add(SALE_CF, Bytes.toBytes("lastPrice" + rank), price);
        }
      }
    }

    /**
     * Stores {@code price} in the slot after {@code lastIndex}, wrapping to slot 0 past the
     * end of the buffer. A {@code null} price is ignored.
     *
     * @return the index written to, or {@code lastIndex} unchanged when {@code price} is null
     */
    public int addPrice(byte[][] lastPricesBuff, byte[] price, int lastIndex) {
      if (price != null) {
        int next = lastIndex + 1;
        lastIndex = (next > lastPricesBuff.length - 1) ? 0 : next;
        lastPricesBuff[lastIndex] = price;
      }
      return lastIndex;
    }

    /** Reads the value stored under {@code priceQualifier} in the sale column family. */
    private byte[] getPrice(Result result, String priceQualifier) {
      byte[] qualifier = Bytes.toBytes(priceQualifier);
      return result.getValue(SALE_CF, qualifier);
    }
  }
...
Something went wrong with that request. Please try again.