-
-
Notifications
You must be signed in to change notification settings - Fork 115
/
reader.py
82 lines (66 loc) · 2.42 KB
/
reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import json
import mmap
from .tile import (
deserialize_header,
deserialize_directory,
zxy_to_tileid,
tileid_to_zxy,
find_tile,
Compression,
)
import gzip
def MmapSource(f):
    """Build a byte-range reader backed by a read-only memory map of ``f``.

    Args:
        f: Open file object exposing ``fileno()``. The whole file is mapped
            (a length of 0 asks ``mmap`` to map the entire file).

    Returns:
        A callable ``get_bytes(offset, length)`` that returns the slice of
        the mapping starting at ``offset`` and ending before
        ``offset + length``.
    """
    mapped = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

    def get_bytes(offset, length):
        end = offset + length
        return mapped[offset:end]

    return get_bytes
def MemorySource(buf):
    """Build a byte-range reader over an in-memory, sliceable buffer.

    Args:
        buf: Any object supporting slice indexing (for example ``bytes``).

    Returns:
        A callable ``get_bytes(offset, length)`` that returns
        ``buf[offset : offset + length]``.
    """

    def get_bytes(offset, length):
        end = offset + length
        return buf[offset:end]

    return get_bytes
class Reader:
    """Random-access reader for a tile archive exposed as a byte-range callable.

    All data access goes through ``get_bytes(offset, length)``, so the same
    reader works over any source that can return a range of bytes.
    """

    def __init__(self, get_bytes):
        """Store the byte-range reader.

        Args:
            get_bytes: Callable ``(offset, length)`` returning the bytes in
                that range of the archive.
        """
        self.get_bytes = get_bytes

    def header(self):
        """Return the header deserialized from the first 127 bytes.

        The header is re-read and re-parsed on every call; nothing is cached.
        """
        return deserialize_header(self.get_bytes(0, 127))

    def metadata(self):
        """Return the archive metadata decoded from JSON.

        The metadata range is located via the header's ``metadata_offset``
        and ``metadata_length``. It is gunzipped first when the header's
        ``internal_compression`` is ``Compression.GZIP``; any other value is
        passed to ``json.loads`` as-is.
        """
        header = self.header()
        metadata = self.get_bytes(header["metadata_offset"], header["metadata_length"])
        if header["internal_compression"] == Compression.GZIP:
            metadata = gzip.decompress(metadata)
        return json.loads(metadata)

    def get(self, z, x, y):
        """Return the stored bytes for tile ``(z, x, y)``, or ``None``.

        Starts at the root directory and follows leaf-directory entries
        (entries whose ``run_length`` is 0) for at most four directory
        levels.

        Args:
            z: Zoom level, forwarded to ``zxy_to_tileid``.
            x: Tile column, forwarded to ``zxy_to_tileid``.
            y: Tile row, forwarded to ``zxy_to_tileid``.

        Returns:
            The bytes read from the tile-data section for the matching
            entry, exactly as stored (no decompression is applied here), or
            ``None`` when no entry matches or the depth limit is exhausted.
        """
        tile_id = zxy_to_tileid(z, x, y)
        header = self.header()
        dir_offset = header["root_offset"]
        dir_length = header["root_length"]
        for _ in range(4):  # max depth
            directory = deserialize_directory(self.get_bytes(dir_offset, dir_length))
            result = find_tile(directory, tile_id)
            if not result:
                # A miss is final: retrying would only re-read and re-parse
                # the very same directory, so stop immediately.
                return None
            if result.run_length != 0:
                return self.get_bytes(
                    header["tile_data_offset"] + result.offset, result.length
                )
            # run_length == 0: the entry points at a leaf directory, whose
            # offset is relative to the leaf-directory section.
            dir_offset = header["leaf_directory_offset"] + result.offset
            dir_length = result.length
        return None
def traverse(get_bytes, header, dir_offset, dir_length):
    """Yield ``(zxy, tile_bytes)`` pairs for every tile under a directory.

    The directory at ``dir_offset``/``dir_length`` is deserialized and its
    entries visited in order. An entry with ``run_length > 0`` yields one
    pair per tile id in ``[tile_id, tile_id + run_length)``; any other entry
    is treated as a pointer to a leaf directory and traversed recursively.

    Args:
        get_bytes: Callable ``(offset, length)`` returning that byte range.
        header: Mapping providing ``tile_data_offset`` and
            ``leaf_directory_offset``.
        dir_offset: Absolute offset of the directory to read.
        dir_length: Length in bytes of that directory.

    Yields:
        Tuples of (the value returned by ``tileid_to_zxy``, tile bytes).
        All tiles of a single run share the same bytes object.
    """
    entries = deserialize_directory(get_bytes(dir_offset, dir_length))
    for entry in entries:
        if entry.run_length > 0:
            # Every tile id in a run maps to the same byte range, so read the
            # payload once instead of once per tile id.
            data = get_bytes(header["tile_data_offset"] + entry.offset, entry.length)
            for i in range(entry.run_length):
                yield tileid_to_zxy(entry.tile_id + i), data
        else:
            # Leaf-directory offsets are relative to the leaf section.
            yield from traverse(
                get_bytes,
                header,
                header["leaf_directory_offset"] + entry.offset,
                entry.length,
            )
def all_tiles(get_bytes):
    """Return a generator over every tile in the archive.

    The header is deserialized from the first 127 bytes obtained through
    ``get_bytes``; traversal then starts at the root directory it names.

    Args:
        get_bytes: Callable ``(offset, length)`` returning that byte range.

    Returns:
        The generator produced by ``traverse`` for the root directory.
    """
    parsed_header = deserialize_header(get_bytes(0, 127))
    root_offset = parsed_header["root_offset"]
    root_length = parsed_header["root_length"]
    return traverse(get_bytes, parsed_header, root_offset, root_length)