-
Notifications
You must be signed in to change notification settings - Fork 5
/
hdfs_fun.py
138 lines (120 loc) · 5.29 KB
/
hdfs_fun.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import logging
import Queue

import snakebite.client as client
import snakebite.protobuf.ClientNamenodeProtocol_pb2 as client_proto
from snakebite.channel import DataXceiverChannel
from snakebite.client import Client
from snakebite.config import HDFSConfig
from snakebite.namenode import Namenode
class HDFSFun(object):
def __init__(self):
self.client = self.create_client()
@staticmethod
def fields(proto):
"""Helper function to list protocol buffer fields"""
return [x[0].name for x in proto.ListFields()]
@staticmethod
def create_client():
configs = HDFSConfig.get_external_config()
namenodes = []
config = configs[0]
namenode, port = config['namenode'], config['port']
return client.Client(namenode, port=port)
def print_blocks(self, blocks):
print "Pool:", set(block.b.poolId for block in blocks)
fmt = "%10s | %10s | %11s | %15s"
print fmt % ("Bytes", "Block ID", "# Locations", "Hostnames")
for block in blocks:
num_bytes, block_id = str(block.b.numBytes), str(block.b.blockId)
hostnames = [loc.id.hostName.split('.')[0] for loc in block.locs]
print fmt % (num_bytes, block_id, len(block.locs), ','.join(hostnames))
def find_blocks(self, path):
client = self.client
fileinfo = client._get_file_info(path)
node = fileinfo.fs
length = node.length
request = client_proto.GetBlockLocationsRequestProto()
request.src = path
request.length = length
request.offset = 0L
response = client.service.getBlockLocations(request)
return list(response.locations.blocks)
@staticmethod
def get_locations(blocks):
return set(loc.id.ipAddr for block in blocks for loc in block.locs)
@staticmethod
def read_block(block):
location = block.locs[0]
host = location.id.ipAddr
port = int(location.id.xferPort)
data_xciever = DataXceiverChannel(host, port)
if not data_xciever.connect():
return
offset_in_block = 0
check_crc = False
length = block.b.numBytes
block_gen = data_xciever.readBlock(length, block.b.poolId, block.b.blockId, block.b.generationStamp, offset_in_block, check_crc)
return block_gen
def find_out_things(self, path, tail_only=False, check_crc=False):
client = self.client
fileinfo = client._get_file_info(path)
node = fileinfo.fs
length = node.length
print "Length: ", length
request = client_proto.GetBlockLocationsRequestProto()
request.src = path
request.length = length
if tail_only: # Only read last KB
request.offset = max(0, length - 1024)
else:
request.offset = 0L
response = client.service.getBlockLocations(request)
lastblock = response.locations.lastBlock
#if tail_only:
# if lastblock.b.blockId == response.locations.blocks[0].b.blockId:
# num_blocks_tail = 1 # Tail is on last block
# else:
# num_blocks_tail = 2 # Tail is on two blocks
failed_nodes = []
total_bytes_read = 0
for block in response.locations.blocks:
length = block.b.numBytes
pool_id = block.b.poolId
print "Length: %s, pool_id: %s" % (length, pool_id)
offset_in_block = 0
if tail_only:
if num_blocks_tail == 2 and block.b.blockId != lastblock.b.blockId:
offset_in_block = block.b.numBytes - (1024 - lastblock.b.numBytes)
elif num_blocks_tail == 1:
offset_in_block = max(0, lastblock.b.numBytes - 1024)
# Prioritize locations to read from
locations_queue = Queue.PriorityQueue() # Primitive queuing based on a node's past failure
for location in block.locs:
if location.id.storageID in failed_nodes:
locations_queue.put((1, location)) # Priority num, data
else:
locations_queue.put((0, location))
# Read data
successful_read = False
while not locations_queue.empty():
location = locations_queue.get()[1]
host = location.id.ipAddr
port = int(location.id.xferPort)
data_xciever = DataXceiverChannel(host, port)
if data_xciever.connect():
try:
for load in data_xciever.readBlock(length, pool_id, block.b.blockId, block.b.generationStamp, offset_in_block, check_crc):
offset_in_block += len(load)
total_bytes_read += len(load)
successful_read = True
yield load
except Exception, e:
log.error(e)
if not location.id.storageID in failed_nodes:
failed_nodes.append(location.id.storageID)
successful_read = False
else:
raise Exception
if successful_read:
break
if successful_read is False:
raise Exception("Failure to read block %s" % block.b.blockId)