
Commit 81bea8b

Merge pull request #159 from guydav/master

Added tail length to the client.tail method

Wouter de Bie committed Jul 4, 2015
2 parents e1bdbc3 + a3eb463
Showing 2 changed files with 84 additions and 33 deletions.
67 changes: 48 additions & 19 deletions snakebite/client.py
@@ -103,6 +103,7 @@ def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEF
         self.service = RpcService(self.service_stub_class, self.port, self.host, hadoop_version, effective_user)
         self.use_trash = use_trash
         self.trash = self._join_user_path(".Trash")
+        self._server_defaults = None

         log.debug("Created client for %s:%s with trash=%s" % (host, port, use_trash))

@@ -855,32 +856,43 @@ def _handle_stat(self, path, node):
"block_replication": node.block_replication,
"blocksize": node.blocksize}

def tail(self, path, append=False):
def tail(self, path, tail_length=1024, append=False):
# Note: append is currently not implemented.
''' Show the last 1KB of the file.
''' Show the end of the file - default 1KB, supports up to the Hadoop block size.
:param path: Path to read
:type path: string
:param f: Shows appended data as the file grows.
:type f: boolean
:param tail_length: The length to read from the end of the file - default 1KB, up to block size.
:type tail_length: int
:param append: Currently not implemented
:type append: bool
:returns: a generator that yields strings
'''
#TODO: Make tail support multiple files at a time, like most other methods do

if not path:
raise InvalidInputException("tail: no path given")

processor = lambda path, node, tail_only=True, append=append: self._handle_tail(path, node, tail_only, append)
block_size = self.serverdefaults()['blockSize']
if tail_length > block_size:
raise InvalidInputException("tail: currently supports length up to the block size (%d)" % (block_size,))

if tail_length <= 0:
raise InvalidInputException("tail: tail_length cannot be less than or equal to zero")

processor = lambda path, node: self._handle_tail(path, node, tail_length, append)
for item in self._find_items([path], processor, include_toplevel=True,
include_children=False, recurse=False):
if item:
yield item

def _handle_tail(self, path, node, tail_only, append):
def _handle_tail(self, path, node, tail_length, append):
data = ''
for load in self._read_file(path, node, tail_only=True, check_crc=False):
for load in self._read_file(path, node, tail_only=True, check_crc=False, tail_length=tail_length):
data += load
# We read only the necessary packets but still
# need to cut off at the packet level.
return data[max(0, len(data)-1024):len(data)]
return data[max(0, len(data)-tail_length):len(data)]

def test(self, path, exists=False, directory=False, zero_length=False):
'''Test if a path exist, is a directory or has zero length
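
For context, a minimal usage sketch of the new parameter, in the style of the snakebite client API; the namenode address and paths here are hypothetical:

from snakebite.client import Client

client = Client('localhost', 8020)

# Default behaviour is unchanged: yields the last 1KB of the file.
last_kb = ''.join(client.tail('/logs/app.log'))

# New in this PR: request a larger tail, up to the server's block size.
last_64k = ''.join(client.tail('/logs/app.log', tail_length=65536))

# A tail_length of zero or less, or one larger than the block size,
# raises InvalidInputException.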
@@ -988,23 +1000,37 @@ def mkdir(self, paths, create_parent=False, mode=0o755):
         else:
             yield {"path": path, "result": False, "error": "mkdir: `%s': File exists" % path}

-    def serverdefaults(self):
-        '''Get server defaults
+    def serverdefaults(self, force_reload=False):
+        '''Get server defaults, caching the results. If there are no results saved, or the force_reload flag is True,
+        it will query the HDFS server for its default parameter values. Otherwise, it will simply return the results
+        it has already queried.
+
+        Note: This function returns a copy of the results loaded from the server, so you can manipulate or change
+        them as you'd like. If for any reason you need to change the results the client saves, you must access
+        the property client._server_defaults directly.

-        :returns: dictionary
+        :param force_reload: Should the server defaults be reloaded even if they already exist?
+        :type force_reload: bool
+        :returns: dictionary with the following keys: blockSize, bytesPerChecksum, writePacketSize, replication,
+        fileBufferSize, encryptDataTransfer, trashInterval, checksumType

         **Example:**

         >>> client.serverdefaults()
         [{'writePacketSize': 65536, 'fileBufferSize': 4096, 'replication': 1, 'bytesPerChecksum': 512, 'trashInterval': 0L, 'blockSize': 134217728L, 'encryptDataTransfer': False, 'checksumType': 2}]
         '''
-        request = client_proto.GetServerDefaultsRequestProto()
-        response = self.service.getServerDefaults(request).serverDefaults
-        return {'blockSize': response.blockSize, 'bytesPerChecksum': response.bytesPerChecksum,
+
+        if not self._server_defaults or force_reload:
+            request = client_proto.GetServerDefaultsRequestProto()
+            response = self.service.getServerDefaults(request).serverDefaults
+            self._server_defaults = {'blockSize': response.blockSize, 'bytesPerChecksum': response.bytesPerChecksum,
                 'writePacketSize': response.writePacketSize, 'replication': response.replication,
                 'fileBufferSize': response.fileBufferSize, 'encryptDataTransfer': response.encryptDataTransfer,
                 'trashInterval': response.trashInterval, 'checksumType': response.checksumType}
+
+        # return a copy, so if the user changes any values, they won't be saved in the client
+        return self._server_defaults.copy()

     def _is_directory(self, should_check, node):
         if not should_check:
             return True
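
A short sketch of the caching behaviour introduced here, reusing the hypothetical client object from the earlier sketch; the values are illustrative:

defaults = client.serverdefaults()        # first call queries the namenode
block_size = defaults['blockSize']        # e.g. 134217728 on a stock install

cached = client.serverdefaults()          # later calls are served from the cache
cached['blockSize'] = 0                   # mutates only the returned copy

fresh = client.serverdefaults(force_reload=True)  # forces a new RPC
assert fresh['blockSize'] == block_size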
@@ -1047,24 +1073,26 @@ def _create_file(self, path, replication, blocksize, overwrite):

         return self.service.complete(request)

-    def _read_file(self, path, node, tail_only, check_crc):
+    def _read_file(self, path, node, tail_only, check_crc, tail_length=1024):
         length = node.length

         request = client_proto.GetBlockLocationsRequestProto()
         request.src = path
         request.length = length

-        if tail_only: # Only read last KB
-            request.offset = max(0, length - 1024)
+        if tail_only: # Only read last part, default is 1KB
+            request.offset = max(0, length - tail_length)
         else:
             request.offset = long(0)

         response = self.service.getBlockLocations(request)

         if response.locations.fileLength == 0: # Can't read empty file
             yield ""
         lastblock = response.locations.lastBlock

         if tail_only:
+            # we assume that tail_length <= default block size due to check in Client.tail
             if lastblock.b.blockId == response.locations.blocks[0].b.blockId:
                 num_blocks_tail = 1 # Tail is on last block
             else:
@@ -1077,11 +1105,12 @@ def _read_file(self, path, node, tail_only, check_crc):
             pool_id = block.b.poolId
             offset_in_block = 0
             block_token = block.blockToken

             if tail_only:
                 if num_blocks_tail == 2 and block.b.blockId != lastblock.b.blockId:
-                    offset_in_block = block.b.numBytes - (1024 - lastblock.b.numBytes)
+                    offset_in_block = block.b.numBytes - (tail_length - lastblock.b.numBytes)
                 elif num_blocks_tail == 1:
-                    offset_in_block = max(0, lastblock.b.numBytes - 1024)
+                    offset_in_block = max(0, lastblock.b.numBytes - tail_length)

             # Prioritize locations to read from
             locations_queue = Queue.PriorityQueue() # Primitive queuing based on a node's past failure
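
The offset arithmetic above is easier to follow with concrete numbers; a small sketch of both branches, with illustrative block and tail sizes:

tail_length = 1024

# Tail spans two blocks: the last block holds 40 bytes, so the remaining
# 984 bytes come from the end of the previous (full, 128MB) block.
prev_block_bytes = 134217728
last_block_bytes = 40
offset_in_prev = prev_block_bytes - (tail_length - last_block_bytes)
assert offset_in_prev == 134216744

# Tail fits inside the last block: skip everything but the final 1KB.
last_block_bytes = 5000
offset_in_last = max(0, last_block_bytes - tail_length)
assert offset_in_last == 3976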
50 changes: 36 additions & 14 deletions test/tail_test.py
@@ -15,36 +15,58 @@

 from minicluster_testbase import MiniClusterTestBase
 import os
+import random


 class TailTest(MiniClusterTestBase):

+    # Test cases
+
     def test_tail_on_one_block(self):
-        expected_output = self.cluster.tail('/test1')
-        client_output = list(self.client.tail('/test1'))[0]
-        self.assertEqual(expected_output, client_output)
+        self._compare_files('/test1')

     def test_tail_on_file_smaller_than_1KB(self):
-        p = self.cluster.put_subprocess('-', '/temp_test')
+        path = '/temp_test'
+        p = self.cluster.put_subprocess('-', path)
         print >> p.stdin, "just a couple of bytes"
         p.communicate()

-        expected_output = self.cluster.tail('/temp_test')
-        client_output = list(self.client.tail('/temp_test'))[0]
-        self.assertEqual(expected_output, client_output)
+        self._compare_files(path)

     def test_tail_over_two_blocks(self): # Last KB of file spans 2 blocks.
-        testfiles_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testfiles")
-        f = open('%s/test3' % testfiles_path)
+        path = '/temp_test2'
+        self._generate_file_over_two_blocks(path)
+        self._compare_files(path)
+
+    def test_with_tail_length(self):
+        self._compare_files('/test1', True)
+
+    def test_with_tail_length_over_two_blocks(self): # Last KB of file spans 2 blocks.
+        path = '/temp_test3'
+        self._generate_file_over_two_blocks(path)
+        self._compare_files(path, True, 40)
+
+    # Helper Methods:

-        p = self.cluster.put_subprocess('-', '/temp_test2')
+    def _compare_files(self, path, random_tail = False, minimal_tail_length = 1):
+        output = self.cluster.tail(path)
+        tail_length = 1024 # The default tail length
+
+        if random_tail:
+            tail_length = random.randint(minimal_tail_length, len(output))
+
+        expected_output = output[-1 * tail_length:]
+        client_output = list(self.client.tail(path, tail_length))[0]
+        self.assertEqual(expected_output, client_output)
+
+    def _generate_file_over_two_blocks(self, path):
+        f = open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'testfiles', 'test3'))
+
+        p = self.cluster.put_subprocess('-', path)
         for _ in xrange(131072): # 1024 * 131072 = 134,217,728 (default block size)
             f.seek(0)
             for line in f.readlines():
                 print >> p.stdin, line
-        print >> p.stdin, "some extra bytes to exceed one blocksize" # +40
+        print >> p.stdin, 'some extra bytes to exceed one blocksize' # +40
         p.communicate()

-        expected_output = self.cluster.tail('/temp_test2')
-        client_output = list(self.client.tail('/temp_test2'))[0]
-        self.assertEqual(expected_output, client_output)
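
The sizing in _generate_file_over_two_blocks works out as follows, assuming the test3 fixture contributes 1024 bytes per pass, as the inline comment implies:

block_size = 134217728   # default HDFS block size (128MB)
passes = 131072          # loop iterations in the helper
bytes_per_pass = 1024    # contribution of the test3 fixture per pass
extra = 40               # the trailing 'some extra bytes...' line

total = passes * bytes_per_pass + extra
assert total == block_size + 40
# The file is exactly one full block plus 40 bytes, so the default
# 1KB tail is guaranteed to straddle the block boundary.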
