Skip to content

Commit

Permalink
Merge bb6283d into adbd4ac
Browse files Browse the repository at this point in the history
  • Loading branch information
chrischamberlin committed Jul 29, 2015
2 parents adbd4ac + bb6283d commit e0b8e81
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 10 deletions.
22 changes: 12 additions & 10 deletions kafka/partitioner/hashed.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import six

from .base import Partitioner


Expand Down Expand Up @@ -43,14 +45,16 @@ def murmur2(key):
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
Args:
key: if not a bytearray, converted via bytearray(str(key))
key: if not a bytes type, encoded using default encoding
Returns: MurmurHash2 of key bytearray
"""

# Convert key to a bytearray
if not isinstance(key, bytearray):
data = bytearray(str(key))
# Convert key to bytes or bytearray
if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)):
data = key
else:
data = bytearray(str(key).encode())

length = len(data)
seed = 0x9747b28c
Expand All @@ -61,7 +65,7 @@ def murmur2(key):

# Initialize the hash to a random value
h = seed ^ length
length4 = length / 4
length4 = length // 4

for i in range(length4):
i4 = i * 4
Expand All @@ -84,15 +88,13 @@ def murmur2(key):

# Handle the last few bytes of the input array
extra_bytes = length % 4
if extra_bytes == 3:
if extra_bytes >= 3:
h ^= (data[(length & ~3) + 2] & 0xff) << 16
h &= 0xffffffff

if extra_bytes == 2:
if extra_bytes >= 2:
h ^= (data[(length & ~3) + 1] & 0xff) << 8
h &= 0xffffffff

if extra_bytes == 1:
if extra_bytes >= 1:
h ^= (data[length & ~3] & 0xff)
h &= 0xffffffff
h *= m
Expand Down
23 changes: 23 additions & 0 deletions test/test_partitioner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import six
from . import unittest

from kafka.partitioner import (Murmur2Partitioner)

class TestMurmurPartitioner(unittest.TestCase):
def test_hash_bytes(self):
p = Murmur2Partitioner(range(1000))
self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test'))

def test_hash_encoding(self):
p = Murmur2Partitioner(range(1000))
self.assertEqual(p.partition('test'), p.partition(u'test'))

def test_murmur2_java_compatibility(self):
p = Murmur2Partitioner(range(1000))
# compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
self.assertEqual(681, p.partition(b''))
self.assertEqual(524, p.partition(b'a'))
self.assertEqual(434, p.partition(b'ab'))
self.assertEqual(107, p.partition(b'abc'))
self.assertEqual(566, p.partition(b'123456789'))
self.assertEqual(742, p.partition(b'\x00 '))

0 comments on commit e0b8e81

Please sign in to comment.