Skip to content

Commit

Permalink
AVRO-2577: Fix Bare Excepts (apache#665)
Browse files Browse the repository at this point in the history
* AVRO-2577: Fix Bare Excepts

* AVRO-2577: Don't Count Failure as Success
  • Loading branch information
kojiromike authored and RyanSkraba committed Jan 27, 2020
1 parent 43267aa commit 70cdb4e
Show file tree
Hide file tree
Showing 11 changed files with 123 additions and 93 deletions.
2 changes: 1 addition & 1 deletion lang/py/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ known_third_party=zope

[pycodestyle]
exclude = .eggs,build
ignore = E101,E111,E114,E121,E122,E124,E125,E126,E127,E128,E129,E201,E202,E203,E222,E226,E225,E231,E241,E251,E261,E262,E265,E266,E301,E302,E303,E305,E306,E402,E501,E701,E703,E704,E711,E722,W191,W291,W292,W293,W391,W503,W504,W601
ignore = E101,E111,E114,E121,E122,E124,E125,E126,E127,E128,E129,E201,E202,E203,E222,E226,E225,E231,E241,E251,E261,E262,E265,E266,E301,E302,E303,E305,E306,E402,E501,E701,E703,E704,E711,W191,W291,W292,W293,W391,W503,W504,W601
max-line-length = 150
statistics = True
53 changes: 40 additions & 13 deletions lang/py/src/avro/datafile.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#!/usr/bin/env python

##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -13,9 +16,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Read/Write Avro File Object Containers.
"""

"""Read/Write Avro File Object Containers."""

from __future__ import absolute_import, division, print_function

import os
import random
import zlib

from avro import io, schema
Expand All @@ -29,6 +36,11 @@
has_snappy = True
except ImportError:
has_snappy = False
try:
import zstandard as zstd
has_zstandard = True
except ImportError:
has_zstandard = False
#
# Constants
#
Expand All @@ -48,6 +60,8 @@
VALID_CODECS = ['null', 'deflate']
if has_snappy:
VALID_CODECS.append('snappy')
if has_zstandard:
VALID_CODECS.append('zstandard')
VALID_ENCODINGS = ['binary'] # not used yet

CODEC_KEY = "avro.codec"
Expand Down Expand Up @@ -99,7 +113,7 @@ def __init__(self, writer, datum_writer, writers_schema=None, codec='null'):
else:
# open writer for reading to collect metadata
dfr = DataFileReader(writer, io.DatumReader())

# TODO(hammer): collect arbitrary metadata
# collect metadata
self._sync_marker = dfr.sync_marker
Expand Down Expand Up @@ -171,6 +185,9 @@ def _write_block(self):
elif self.get_meta(CODEC_KEY) == 'snappy':
compressed_data = snappy.compress(uncompressed_data)
compressed_data_length = len(compressed_data) + 4 # crc32
elif self.get_meta(CODEC_KEY) == 'zstandard':
compressed_data = zstd.ZstdCompressor().compress(uncompressed_data)
compressed_data_length = len(compressed_data)
else:
fail_msg = '"%s" codec is not supported.' % self.get_meta(CODEC_KEY)
raise DataFileException(fail_msg)
Expand All @@ -180,7 +197,7 @@ def _write_block(self):

# Write block
self.writer.write(compressed_data)

# Write CRC32 checksum for Snappy
if self.get_meta(CODEC_KEY) == 'snappy':
self.encoder.write_crc32(uncompressed_data)
Expand All @@ -189,7 +206,7 @@ def _write_block(self):
self.writer.write(self.sync_marker)

# reset buffer
self.buffer_writer.truncate(0)
self.buffer_writer.truncate(0)
self.block_count = 0

def append(self, datum):
Expand Down Expand Up @@ -229,7 +246,7 @@ def __init__(self, reader, datum_reader):
self._raw_decoder = io.BinaryDecoder(reader)
self._datum_decoder = None # Maybe reset at every block.
self._datum_reader = datum_reader

# read the header: magic, meta, sync
self._read_header()

Expand Down Expand Up @@ -293,7 +310,7 @@ def is_EOF(self):

def _read_header(self):
# seek to the beginning of the file to get magic block
self.reader.seek(0, 0)
self.reader.seek(0, 0)

# read header into a dict
header = self.datum_reader.read_data(
Expand Down Expand Up @@ -332,6 +349,18 @@ def _read_block_header(self):
uncompressed = snappy.decompress(data)
self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
self.raw_decoder.check_crc32(uncompressed);
elif self.codec == 'zstandard':
length = self.raw_decoder.read_long()
data = self.raw_decoder.read(length)
uncompressed = bytearray()
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(StringIO(data)) as reader:
while True:
chunk = reader.read(16384)
if not chunk:
break
uncompressed.extend(chunk)
self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
else:
raise DataFileException("Unknown codec: %r" % self.codec)

Expand Down Expand Up @@ -360,7 +389,7 @@ def next(self):
else:
self._read_block_header()

datum = self.datum_reader.read(self.datum_decoder)
datum = self.datum_reader.read(self.datum_decoder)
self.block_count -= 1
return datum

Expand All @@ -370,8 +399,6 @@ def close(self):

def generate_sixteen_random_bytes():
try:
import os
return os.urandom(16)
except:
import random
return [ chr(random.randrange(256)) for i in range(16) ]
except NotImplementedError:
return [chr(random.randrange(256)) for i in range(16)]
39 changes: 19 additions & 20 deletions lang/py/src/avro/protocol.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#!/usr/bin/env python

##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -14,18 +17,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Protocol implementation.
"""
"""Protocol implementation."""

try:
from hashlib import md5
except ImportError:
from md5 import md5
from __future__ import absolute_import, division, print_function

import hashlib
import json

from avro import schema
import avro.schema

#
# Constants
Expand All @@ -38,7 +37,7 @@
# Exceptions
#

class ProtocolParseException(schema.AvroException):
class ProtocolParseException(avro.schema.AvroException):
pass

#
Expand All @@ -50,7 +49,7 @@ class Protocol(object):
def _parse_types(self, types, type_names):
type_objects = []
for type in types:
type_object = schema.make_avsc_object(type, type_names)
type_object = avro.schema.make_avsc_object(type, type_names)
if type_object.type not in VALID_TYPE_SCHEMA_TYPES:
fail_msg = 'Type %s not an enum, fixed, record, or error.' % type
raise ProtocolParseException(fail_msg)
Expand All @@ -60,7 +59,7 @@ def _parse_types(self, types, type_names):
def _parse_messages(self, messages, names):
message_objects = {}
for name, body in messages.iteritems():
if message_objects.has_key(name):
if name in message_objects:
fail_msg = 'Message name "%s" repeated.' % name
raise ProtocolParseException(fail_msg)
try:
Expand Down Expand Up @@ -93,21 +92,21 @@ def __init__(self, name, namespace=None, types=None, messages=None):

self._props = {}
self.set_prop('name', name)
type_names = schema.Names()
type_names = avro.schema.Names()
if namespace is not None:
self.set_prop('namespace', namespace)
type_names.default_namespace = namespace
if types is not None:
self.set_prop('types', self._parse_types(types, type_names))
if messages is not None:
self.set_prop('messages', self._parse_messages(messages, type_names))
self._md5 = md5(str(self)).digest()
self._md5 = hashlib.md5(str(self)).digest()

# read-only properties
name = property(lambda self: self.get_prop('name'))
namespace = property(lambda self: self.get_prop('namespace'))
fullname = property(lambda self:
schema.Name(self.name, self.namespace).fullname)
avro.schema.Name(self.name, self.namespace).fullname)
types = property(lambda self: self.get_prop('types'))
types_dict = property(lambda self: dict([(type.name, type)
for type in self.types]))
Expand All @@ -124,7 +123,7 @@ def set_prop(self, key, value):
def to_json(self):
to_dump = {}
to_dump['protocol'] = self.name
names = schema.Names(default_namespace=self.namespace)
names = avro.schema.Names(default_namespace=self.namespace)
if self.namespace:
to_dump['namespace'] = self.namespace
if self.types:
Expand All @@ -149,20 +148,20 @@ def _parse_request(self, request, names):
if not isinstance(request, list):
fail_msg = 'Request property not a list: %s' % request
raise ProtocolParseException(fail_msg)
return schema.RecordSchema(None, None, request, names, 'request')
return avro.schema.RecordSchema(None, None, request, names, 'request')

def _parse_response(self, response, names):
if isinstance(response, basestring) and names.has_name(response, None):
return names.get_name(response, None)
else:
return schema.make_avsc_object(response, names)
return avro.schema.make_avsc_object(response, names)

def _parse_errors(self, errors, names):
if not isinstance(errors, list):
fail_msg = 'Errors property not a list: %s' % errors
raise ProtocolParseException(fail_msg)
errors_for_parsing = {'type': 'error_union', 'declared_errors': errors}
return schema.make_avsc_object(errors_for_parsing, names)
return avro.schema.make_avsc_object(errors_for_parsing, names)

def __init__(self, name, request, response, errors=None, names=None):
self._name = name
Expand Down Expand Up @@ -191,7 +190,7 @@ def __str__(self):

def to_json(self, names=None):
if names is None:
names = schema.Names()
names = avro.schema.Names()
to_dump = {}
to_dump['request'] = self.request.to_json(names)
to_dump['response'] = self.response.to_json(names)
Expand All @@ -217,7 +216,7 @@ def parse(json_string):
"""Constructs the Protocol from the JSON text."""
try:
json_data = json.loads(json_string)
except:
except ValueError:
raise ProtocolParseException('Error parsing JSON: %s' % json_string)

# construct the Avro Protocol object
Expand Down
7 changes: 4 additions & 3 deletions lang/py/src/avro/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,9 +610,10 @@ def __init__(self, values, names=None, other_props=None):
else:
try:
values_schema = make_avsc_object(values, names)
except:
fail_msg = 'Values schema not a valid Avro schema.'
raise SchemaParseException(fail_msg)
except SchemaParseException:
raise
except Exception:
raise SchemaParseException('Values schema is not a valid Avro schema.')

self.set_prop('values', values_schema)

Expand Down
2 changes: 1 addition & 1 deletion lang/py/test/sample_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def make_requestor(server_host, server_port, protocol):

try:
num_messages = int(sys.argv[4])
except:
except IndexError:
num_messages = 1

# build the parameters for the request
Expand Down

0 comments on commit 70cdb4e

Please sign in to comment.