diff --git a/minio/select/errors.py b/minio/select/errors.py index fee9f195..b25867c5 100644 --- a/minio/select/errors.py +++ b/minio/select/errors.py @@ -25,11 +25,13 @@ """ + class SelectMessageError(Exception): ''' Raised in case of message type 'error' ''' + class SelectCRCValidationError(Exception): ''' Raised in case of CRC mismatch diff --git a/minio/select/helpers.py b/minio/select/helpers.py index c3b6b759..34d9f751 100644 --- a/minio/select/helpers.py +++ b/minio/select/helpers.py @@ -32,27 +32,26 @@ EVENT_RECORDS = 'Records' # Event Type is Records EVENT_PROGRESS = 'Progress' # Event Type Progress EVENT_STATS = 'Stats' # Event Type Stats -EVENT_CONT = 'Cont' # Event Type continue +EVENT_CONT = 'Cont' # Event Type continue EVENT_END = 'End' # Event Type is End -EVENT_CONTENT_TYPE = "text/xml" # Event content xml type +EVENT_CONTENT_TYPE = "text/xml" # Event content xml type EVENT = 'event' # Message Type is event ERROR = 'error' # Message Type is error + def calculate_crc(value): ''' Returns the CRC using crc32 ''' return crc32(value) & 0xffffffff + def validate_crc(current_value, expected_value): ''' Validate through CRC check ''' - crc_current = calculate_crc(current_value) - crc_expected = byte_int(expected_value) - if crc_current == crc_expected: - return True - return False + return calculate_crc(current_value) == byte_int(expected_value) + def byte_int(data_bytes): ''' diff --git a/minio/select/options.py b/minio/select/options.py index c85cda73..a12d6e3f 100644 --- a/minio/select/options.py +++ b/minio/select/options.py @@ -27,10 +27,12 @@ from .helpers import (SQL) + class CSVInput: """ CSVInput: Input Format as CSV. """ + def __init__(self, FileHeaderInfo=None, RecordDelimiter="\n", FieldDelimiter=",", QuoteCharacter='"', QuoteEscapeCharacter='"', Comments="#", @@ -43,10 +45,12 @@ def __init__(self, FileHeaderInfo=None, RecordDelimiter="\n", self.Comments = Comments self.AllowQuotedRecordDelimiter = AllowQuotedRecordDelimiter + class JSONInput: """ JSONInput: Input format as JSON. """ + def __init__(self, Type=None): self.Type = Type @@ -61,6 +65,7 @@ class InputSerialization: """ InputSerialization: nput Format. """ + def __init__(self, compression_type="NONE", csv=None, json=None, par=None): self.compression_type = compression_type self.csv_input = csv @@ -73,6 +78,7 @@ class CSVOutput: CSVOutput: Output as CSV. """ + def __init__(self, QuoteFields="ASNEEDED", RecordDelimiter="\n", FieldDelimiter=",", QuoteCharacter='"', QuoteEscapeCharacter='"'): @@ -87,6 +93,7 @@ class JsonOutput: """ JsonOutput- Output as JSON. """ + def __init__(self, RecordDelimiter="\n"): self.RecordDelimiter = RecordDelimiter @@ -95,6 +102,7 @@ class OutputSerialization: """ OutputSerialization: Output Format. """ + def __init__(self, csv=None, json=None): self.csv_output = csv self.json_output = json @@ -104,6 +112,7 @@ class RequestProgress: """ RequestProgress: Sends progress message. """ + def __init__(self, enabled=False): self.enabled = enabled diff --git a/minio/select/reader.py b/minio/select/reader.py index a1d82921..37ece1f9 100644 --- a/minio/select/reader.py +++ b/minio/select/reader.py @@ -30,18 +30,15 @@ import io import sys -from binascii import crc32 from xml.etree import ElementTree -from xml.etree.ElementTree import ParseError -from .helpers import (EVENT_RECORDS, EVENT_PROGRESS, - EVENT_STATS, EVENT_CONT, - EVENT, EVENT_CONTENT_TYPE, - EVENT_END, ERROR) +from .helpers import (EVENT_RECORDS, EVENT_STATS, + EVENT, EVENT_CONTENT_TYPE, ERROR) from .helpers import (validate_crc, calculate_crc, byte_int) from .errors import (SelectMessageError, SelectCRCValidationError) + def _extract_header(header_bytes): """ populates the header map after reading the header in bytes @@ -51,26 +48,28 @@ def _extract_header(header_bytes): # While loop ends when all the headers present are read # header contains multipe headers while header_byte_parsed < len(header_bytes): - header_name_byte_length = byte_int(header_bytes[header_byte_parsed:header_byte_parsed+1]) + header_name_byte_length = byte_int( + header_bytes[header_byte_parsed:header_byte_parsed+1]) header_byte_parsed += 1 - header_name = \ - header_bytes[header_byte_parsed: - header_byte_parsed+header_name_byte_length] + header_name = header_bytes[ + header_byte_parsed:header_byte_parsed+header_name_byte_length + ] header_byte_parsed += header_name_byte_length # Header Value Type is of 1 bytes and is skipped header_byte_parsed += 1 - value_string_byte_length = \ - byte_int(header_bytes[header_byte_parsed: - header_byte_parsed+2]) + value_string_byte_length = byte_int( + header_bytes[header_byte_parsed:header_byte_parsed+2] + ) header_byte_parsed += 2 - header_value = \ - header_bytes[header_byte_parsed: - header_byte_parsed+value_string_byte_length] + header_value = header_bytes[ + header_byte_parsed:header_byte_parsed+value_string_byte_length + ] header_byte_parsed += value_string_byte_length - header_map[header_name.decode("utf-8").lstrip(":")] = \ - header_value.decode("utf-8").lstrip(":") + header_map[header_name.decode( + "utf-8").lstrip(":")] = header_value.decode("utf-8").lstrip(":") return header_map + def _parse_stats(stats): """ Parses stats XML and populates the stat dict. @@ -86,12 +85,14 @@ def _parse_stats(stats): return stat + class SelectObjectReader(object): """ SelectObjectReader returns a Reader that upon read returns queried data, but stops when the response ends. LimitedRandomReader is compatible with BufferedIOBase. """ + def __init__(self, response): self.response = response self.remaining_bytes = bytes() @@ -121,12 +122,12 @@ def __extract_message(self): crc_bytes = io.BytesIO() total_bytes_len = self.response.read(4) - if len(total_bytes_len) == 0: + if not total_bytes_len: return {} total_length = byte_int(total_bytes_len) header_bytes_len = self.response.read(4) - if len(header_bytes_len) == 0: + if not header_bytes_len: return {} header_len = byte_int(header_bytes_len) @@ -145,9 +146,9 @@ def __extract_message(self): crc_bytes.write(prelude_bytes_crc) header_bytes = self.response.read(header_len) - if len(header_bytes) == 0: + if not header_bytes: raise SelectMessageError( - "Premature truncation of select message header"+ + "Premature truncation of select message header" + ", server is sending corrupt message?") crc_bytes.write(header_bytes) @@ -156,39 +157,36 @@ def __extract_message(self): payload_length = total_length - header_len - int(16) payload_bytes = b'' event_type = header_map["event-type"] + if header_map["message-type"] == ERROR: raise SelectMessageError( - header_map["error-code"] + ":\"" + \ + header_map["error-code"] + ":\"" + header_map["error-message"] + "\"") - elif header_map["message-type"] == EVENT: - if event_type == EVENT_END: - pass - elif event_type == EVENT_CONT: - pass - elif event_type == EVENT_STATS: - content_type = header_map["content-type"] - if content_type != EVENT_CONTENT_TYPE: - raise SelectMessageError( - "Unrecognized content-type {0}".format(content_type)) - else: - payload_bytes = self.response.read(payload_length) - self.stat = _parse_stats(payload_bytes) - - elif event_type == EVENT_RECORDS: - payload_bytes = self.response.read(payload_length) - else: + + if header_map["message-type"] != EVENT: raise SelectMessageError( - "Unrecognized message-type {0}".format(header_map["message-type"]) + "Unrecognized message-type {0}".format( + header_map["message-type"]) ) + if event_type == EVENT_STATS: + content_type = header_map["content-type"] + if content_type != EVENT_CONTENT_TYPE: + raise SelectMessageError( + "Unrecognized content-type {0}".format(content_type)) + + payload_bytes = self.response.read(payload_length) + self.stat = _parse_stats(payload_bytes) + elif event_type == EVENT_RECORDS: + payload_bytes = self.response.read(payload_length) + crc_bytes.write(payload_bytes) message_crc = self.response.read(4) - if len(message_crc) == 0: + if not message_crc: return {} - if not validate_crc(crc_bytes.getvalue(), - message_crc): + if not validate_crc(crc_bytes.getvalue(), message_crc): raise SelectCRCValidationError( {"Checksum Mismatch, MessageCRC of " + str(calculate_crc(crc_bytes.getvalue())) + @@ -207,14 +205,13 @@ def stream(self, num_bytes=32*1024): caller should call self.close() to close the stream. """ while not self.response.isclosed(): - if len(self.remaining_bytes) == 0: + if not self.remaining_bytes: message = self.__extract_message() - if EVENT_RECORDS in message: - self.remaining_bytes = message.get(EVENT_RECORDS, b'') - else: - # For all other events continue + if EVENT_RECORDS not in message: continue + self.remaining_bytes = message.get(EVENT_RECORDS, b'') + result = self.remaining_bytes if num_bytes < len(self.remaining_bytes): result = self.remaining_bytes[:num_bytes]