-
Notifications
You must be signed in to change notification settings - Fork 21
/
encode.py
executable file
·178 lines (144 loc) · 5.91 KB
/
encode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# -*- coding: utf-8 -*-
import collections
import json
import six
from . import geobuf_pb2
class Encoder:
geometry_types = {
'Point': 0,
'MultiPoint': 1,
'LineString': 2,
'MultiLineString': 3,
'Polygon': 4,
'MultiPolygon': 5,
'GeometryCollection': 6,
}
def __init__(self):
self.json = None
self.data = None
self.precision = None
self.dim = None
self.e = None
self.keys = collections.OrderedDict()
def encode(self, data_json, precision=6, dim=2):
obj = self.json = data_json
data = self.data = geobuf_pb2.Data()
data.dimensions = dim
self.precision = precision
self.dim = dim
self.e = pow(10, precision) # multiplier for converting coordinates into integers
data_type = obj['type']
if data_type == 'FeatureCollection':
self.encode_feature_collection(data.feature_collection, obj)
elif data_type == 'Feature':
self.encode_feature(data.feature, obj)
else:
self.encode_geometry(data.geometry, obj)
return data.SerializeToString()
def encode_feature_collection(self, feature_collection, feature_collection_json):
self.encode_custom_properties(feature_collection, feature_collection_json, ('type', 'features'))
for feature_json in feature_collection_json.get('features'):
self.encode_feature(feature_collection.features.add(), feature_json)
def encode_feature(self, feature, feature_json):
self.encode_id(feature, feature_json.get('id'))
self.encode_properties(feature, feature_json.get('properties'))
self.encode_custom_properties(feature, feature_json, ('type', 'id', 'properties', 'geometry'))
self.encode_geometry(feature.geometry, feature_json.get('geometry'))
def encode_geometry(self, geometry, geometry_json):
gt = geometry_json['type']
coords = geometry_json.get('coordinates')
geometry.type = self.geometry_types[gt]
self.encode_custom_properties(geometry, geometry_json,
('type', 'id', 'coordinates', 'arcs', 'geometries', 'properties'))
if gt == 'GeometryCollection':
for geom in geometry_json.get('geometries'):
self.encode_geometry(geometry.geometries.add(), geom)
elif gt == 'Point':
self.add_point(geometry.coords, coords)
elif gt == 'MultiPoint':
self.add_line(geometry.coords, coords)
elif gt == 'LineString':
self.add_line(geometry.coords, coords)
elif gt == 'MultiLineString':
self.add_multi_line(geometry, coords)
elif gt == 'Polygon':
self.add_multi_line(geometry, coords, is_closed=True)
elif gt == 'MultiPolygon':
self.add_multi_polygon(geometry, coords)
def encode_properties(self, obj, props_json):
if props_json:
for key, val in props_json.items():
self.encode_property(key, val, obj.properties, obj.values)
def encode_custom_properties(self, obj, obj_json, exclude):
for key, val in obj_json.items():
if not (key in exclude):
self.encode_property(key, val, obj.custom_properties, obj.values)
def encode_property(self, key, val, properties, values):
keys = self.keys
if not (key in keys):
keys[key] = True
self.data.keys.append(key)
key_index = len(self.data.keys) - 1
else:
key_index = list(keys.keys()).index(key)
value = values.add()
if isinstance(val, dict) or isinstance(val, list):
value.json_value = json.dumps(val, separators=(',', ':'))
elif isinstance(val, six.text_type):
value.string_value = val
elif isinstance(val, float):
if val.is_integer():
self.encode_int(int(val), value)
else:
value.double_value = val
elif isinstance(val, bool):
value.bool_value = val
elif isinstance(val, six.integer_types):
self.encode_int(val, value)
properties.append(key_index)
properties.append(len(values) - 1)
@staticmethod
def encode_int(val, value):
try:
if val >= 0:
value.pos_int_value = val
else:
value.neg_int_value = -val
except ValueError:
value.double_value = val
@staticmethod
def encode_id(obj, id):
if id is not None:
if isinstance(id, int):
obj.int_id = id
else:
obj.id = str(id)
def add_coord(self, coords, coord):
coords.append(int(round(coord * self.e)))
def add_point(self, coords, point):
for x in point:
self.add_coord(coords, x)
def add_line(self, coords, points, is_closed=False):
sum = [0] * self.dim
r = range(0, len(points) - int(is_closed))
for i in r:
for j in range(0, self.dim):
n = int(round(points[i][j] * self.e) - sum[j])
coords.append(n)
sum[j] += n
def add_multi_line(self, geometry, lines, is_closed=False):
if len(lines) != 1:
for points in lines:
geometry.lengths.append(len(points) - int(is_closed))
for points in lines:
self.add_line(geometry.coords, points, is_closed)
def add_multi_polygon(self, geometry, polygons):
if len(polygons) != 1 or len(polygons[0]) != 1:
geometry.lengths.append(len(polygons))
for rings in polygons:
geometry.lengths.append(len(rings))
for points in rings:
geometry.lengths.append(len(points) - 1)
for rings in polygons:
for points in rings:
self.add_line(geometry.coords, points, is_closed=True)