This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

hbase scalding Store based on maple/storehaus #404

Open
wants to merge 12 commits into
base: develop
from

Conversation

joshby247

No description provided.

.write(new HBaseVersionedSource[K2, V2](table, scheme))
}

/* overridden methods for ReadableStore[K, V2] */
Collaborator

I think it would be better to add something like .toReadableStore: ReadableStore[K, V2] here rather than make this one class extend two things.

Author

It's unclear to me how to override the get/multiGet methods if I don't subclass/extend ReadableStore. Do you have an example of what you had in mind?
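
One way to read the suggestion above, sketched with stand-in types rather than the real storehaus and Summingbird classes: keep the batch-side store as its own class and expose the read side through a method that returns a separate ReadableStore. `ReadableStoreSketch`, `BatchStoreSketch`, and the in-memory map are hypothetical names used only for illustration; the real `ReadableStore` returns Twitter `Future`s, and a real implementation would read from HBase.

```scala
import scala.concurrent.Future

object ToReadableStoreSketch {
  // Stand-in for storehaus' ReadableStore (assumption: the real trait uses com.twitter.util.Future).
  trait ReadableStoreSketch[K, V] {
    def get(k: K): Future[Option[V]]
    // Default multiGet delegates to get, one lookup per key.
    def multiGet(ks: Set[K]): Map[K, Future[Option[V]]] =
      ks.map(k => k -> get(k)).toMap
  }

  // Hypothetical batch-side store; it does not extend the read-side trait.
  class BatchStoreSketch[K, V](backing: Map[K, V]) {
    // get/multiGet are overridden on the returned object, not on the batch store itself.
    def toReadableStore: ReadableStoreSketch[K, V] =
      new ReadableStoreSketch[K, V] {
        override def get(k: K): Future[Option[V]] =
          Future.successful(backing.get(k))
      }
  }

  val store = new BatchStoreSketch(Map("a" -> 1L, "b" -> 2L))
  val readable = store.toReadableStore
}
```

The batch store then has a single responsibility, and callers that only need lookups depend on the narrower read interface.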

@johnynek
Collaborator

Thanks for working on this!


val scheme = new HBaseScheme(new Fields(KeyColumnName), ColumnFamily, new Fields(ValColumnName))

implicit val b2immutable: Injection[Array[Byte], ImmutableBytesWritable] =
Contributor

You may be able to use this bijection instead of writing a new one.

@joshby247
Author

I've incorporated the feedback. I removed the error check for the empty pipe in readLast. It looks like ScaldingStore's merge should handle this correctly.

}


class HBaseStore [K, V2] (quorum: Seq[String],
Contributor

I am not sure why this class is needed. Can't you just do this?

val store: Store[K, V2] = HBaseByteArrayStore(quorum, table, columnFamily, valColumnName, createTable)
  .convert[K, V2](K => Array[Byte])(V2 => Array[Byte])

See this for an example of how to convert a store.
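
The conversion suggested above can be sketched roughly as follows. This uses stand-in types, not the actual storehaus API: `ByteStoreSketch`, `InjectionSketch`, and `ConvertedStore` are hypothetical and only mirror the general shape, which is a key function plus an invertible value encoding wrapped around a byte-array store.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable
import scala.util.Try

object ConvertStoreSketch {
  // Stand-in for an invertible encoding: apply always succeeds, invert may fail.
  trait InjectionSketch[A, B] {
    def apply(a: A): B
    def invert(b: B): Try[A]
  }

  // Hypothetical stand-in for a byte-array store such as HBaseByteArrayStore.
  class ByteStoreSketch {
    // Arrays compare by reference, so keys are converted to Seq for lookup.
    private val data = mutable.Map.empty[Seq[Byte], Array[Byte]]
    def put(k: Array[Byte], v: Array[Byte]): Unit = data.update(k.toSeq, v)
    def get(k: Array[Byte]): Option[Array[Byte]] = data.get(k.toSeq)
  }

  // Typed view over the byte store; no dedicated HBase subclass is needed.
  class ConvertedStore[K, V](under: ByteStoreSketch)(kfn: K => Array[Byte])(
      implicit vinj: InjectionSketch[V, Array[Byte]]) {
    def put(k: K, v: V): Unit = under.put(kfn(k), vinj(v))
    // Values that fail to decode are reported as absent in this sketch.
    def get(k: K): Option[V] =
      under.get(kfn(k)).flatMap(bytes => vinj.invert(bytes).toOption)
  }

  implicit val longInj: InjectionSketch[Long, Array[Byte]] =
    new InjectionSketch[Long, Array[Byte]] {
      def apply(a: Long): Array[Byte] = a.toString.getBytes(UTF_8)
      def invert(b: Array[Byte]): Try[Long] = Try(new String(b, UTF_8).toLong)
    }

  val typed = new ConvertedStore[String, Long](new ByteStoreSketch)(_.getBytes(UTF_8))
  typed.put("clicks", 42L)
}
```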

Author

This looks awesome! I’ll do that.

@joshby247
Author

In addition to using a converted HBaseByteArrayStore, I also renamed main() in ScaldingRunning so the "getting started" wiki instructions aren't affected by another main() in summingbird-example.



def toReadableStore: ReadableStore[K,V2] = {
hbaseStore.asInstanceOf[ReadableStore[K,V2]]
Contributor

I think HBaseStore already extends ReadableStore, so you don't have to do hbaseStore.asInstanceOf[ReadableStore[K,V2]].

@MansurAshraf
Contributor

Any update on this PR? I would love to get this merged, as I have to use HBase with SB too.

table: String)(
implicit
batcher: Batcher,
injection: Injection[(K, (BatchID,V)), (Array[Byte], Array[Byte])],
Collaborator

Do you need this? Doesn't it come for free with the keyInj + valueInj?
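
To illustrate the point that a pair encoding can be derived rather than passed in: given one injection for the key side and one for the value side, a combined injection follows mechanically. The sketch below uses a stand-in `InjectionSketch` trait and a hypothetical `pair` helper instead of the real bijection library, and it simplifies the types to `(K, V)` rather than `(K, (BatchID, V))`.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

object DerivedInjectionSketch {
  // Stand-in for an Injection: apply always succeeds, invert may fail.
  trait InjectionSketch[A, B] {
    def apply(a: A): B
    def invert(b: B): Try[A]
  }

  // Hypothetical helper: derives an injection on pairs from two component injections.
  implicit def pair[K, V, K2, V2](
      implicit keyInj: InjectionSketch[K, K2],
      valueInj: InjectionSketch[V, V2]): InjectionSketch[(K, V), (K2, V2)] =
    new InjectionSketch[(K, V), (K2, V2)] {
      def apply(kv: (K, V)): (K2, V2) = (keyInj(kv._1), valueInj(kv._2))
      // Inversion succeeds only if both sides invert.
      def invert(b: (K2, V2)): Try[(K, V)] =
        for (k <- keyInj.invert(b._1); v <- valueInj.invert(b._2)) yield (k, v)
    }

  implicit val stringInj: InjectionSketch[String, Array[Byte]] =
    new InjectionSketch[String, Array[Byte]] {
      def apply(a: String): Array[Byte] = a.getBytes(UTF_8)
      def invert(b: Array[Byte]): Try[String] = Try(new String(b, UTF_8))
    }

  implicit val longInj: InjectionSketch[Long, Array[Byte]] =
    new InjectionSketch[Long, Array[Byte]] {
      def apply(a: Long): Array[Byte] = a.toString.getBytes(UTF_8)
      def invert(b: Array[Byte]): Try[Long] = Try(new String(b, UTF_8).toLong)
    }

  // Resolved from the two component injections; no extra implicit is declared for pairs.
  val kvInj = implicitly[InjectionSketch[(String, Long), (Array[Byte], Array[Byte])]]
  val roundTrip = kvInj.invert(kvInj(("user", 7L)))
}
```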

@johnynek
Collaborator

Sorry for the slow response. @MansurAshraf I guess you have reviewed the HBase stuff. We don't use it much at Twitter, so I'm only giving a summingbird review.

I don't see how this code is tracking the state of which batches this store has completed. That information needs to be available at planning time, and this code is currently just always claiming to have data. That will not be correct.
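
The concern above can be sketched with stand-in types: the store answers "which batch did I last complete?" from recorded state, and reports a failure when no completed batch exists below the requested upper bound, instead of always claiming to have data. `BatchIDSketch`, `CompletedBatches`, and this `readLast` signature are hypothetical simplifications, not the Summingbird API.

```scala
object BatchTrackingSketch {
  // Stand-in for Summingbird's BatchID.
  final case class BatchIDSketch(id: Long)

  // Hypothetical record of which batches have been fully written.
  class CompletedBatches {
    private var completed = Set.empty[Long]

    def markCompleted(b: BatchIDSketch): Unit = completed += b.id

    // Latest completed batch strictly below the exclusive upper bound, if any.
    // A Left lets the planner see that this store cannot serve the request.
    def readLast(exclusiveUB: BatchIDSketch): Either[String, BatchIDSketch] = {
      val candidates = completed.filter(_ < exclusiveUB.id)
      if (candidates.isEmpty) Left(s"no completed batch before ${exclusiveUB.id}")
      else Right(BatchIDSketch(candidates.max))
    }
  }

  val state = new CompletedBatches
  val beforeAnyRun = state.readLast(BatchIDSketch(10L))
  state.markCompleted(BatchIDSketch(3L))
  state.markCompleted(BatchIDSketch(4L))
  val afterTwoRuns = state.readLast(BatchIDSketch(10L))
  val boundedRead = state.readLast(BatchIDSketch(4L))
}
```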

That said, we do need to build some kind of framework to make it easier to test Stores.

@joshby247
Author

Added stuff to write/read the last processed BatchID from ZK. I have to confess that I don't have much experience interacting with ZK directly (mostly use systems that use ZK) so I'm quite open to feedback on how to do that better. It does seem like the ideal way to do this would be to register a Watcher with the yet-to-be-created zookeeper WaitingState so the BatchID could be written once the Scalding job has completed successfully.
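
A minimal sketch of the read/write cycle described above, assuming the BatchID is stored as an 8-byte long under a single path. To stay runnable without a ZooKeeper ensemble, it goes through a hypothetical `PathStore` trait with an in-memory implementation; a ZooKeeper-backed version could implement the same two methods with the client's `exists`, `create`, `setData`, and `getData` calls. All names below are illustrative and not taken from the PR.

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

object LastBatchStateSketch {
  // Hypothetical abstraction over a path-addressed store such as ZooKeeper.
  trait PathStore {
    def read(path: String): Option[Array[Byte]]
    def write(path: String, data: Array[Byte]): Unit
  }

  // In-memory stand-in so the sketch runs without a ZooKeeper server.
  class InMemoryPathStore extends PathStore {
    private val nodes = mutable.Map.empty[String, Array[Byte]]
    def read(path: String): Option[Array[Byte]] = nodes.get(path)
    def write(path: String, data: Array[Byte]): Unit = nodes.update(path, data)
  }

  class LastBatchState(store: PathStore, table: String) {
    // Path layout follows the one used in the PR's diff.
    private val path = "/summingbird/" + table + "/lastBatchID"

    // Intended to be called only after the batch job has completed successfully.
    def commit(batchId: Long): Unit =
      store.write(path, ByteBuffer.allocate(8).putLong(batchId).array())

    // None means no batch has been committed for this table yet.
    def lastCommitted: Option[Long] =
      store.read(path).map(bytes => ByteBuffer.wrap(bytes).getLong())
  }

  val state = new LastBatchState(new InMemoryPathStore, "clicks")
  val initial = state.lastCommitted
  state.commit(41L)
  state.commit(42L)
  val latest = state.lastCommitted
}
```

Keeping the storage behind a small trait also makes the commit logic testable without a live ensemble.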

table: String)
extends Watcher
{
val LastBatchIDZKPath = "/summingbird/" + table + "/lastBatchID"
Contributor

Maybe we want users to pass the path?

Author

I didn't incorporate this feedback. I thought that the details of how/where the store put the state in ZK were internal details to the store and I preferred not to leak them out. If you feel strongly I can add that though.

@CLAassistant

CLAassistant commented Jul 18, 2019

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

4 participants