Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix formatting as per pep8 in minio.select #877

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions minio/select/errors.py
Expand Up @@ -25,11 +25,13 @@

"""


class SelectMessageError(Exception):
'''
Raised in case of message type 'error'
'''


class SelectCRCValidationError(Exception):
'''
Raised in case of CRC mismatch
Expand Down
13 changes: 6 additions & 7 deletions minio/select/helpers.py
Expand Up @@ -32,27 +32,26 @@
EVENT_RECORDS = 'Records' # Event Type is Records
EVENT_PROGRESS = 'Progress' # Event Type Progress
EVENT_STATS = 'Stats' # Event Type Stats
EVENT_CONT = 'Cont' # Event Type continue
EVENT_CONT = 'Cont' # Event Type continue
EVENT_END = 'End' # Event Type is End
EVENT_CONTENT_TYPE = "text/xml" # Event content xml type
EVENT_CONTENT_TYPE = "text/xml" # Event content xml type
EVENT = 'event' # Message Type is event
ERROR = 'error' # Message Type is error


def calculate_crc(value):
'''
Returns the CRC using crc32
'''
return crc32(value) & 0xffffffff


def validate_crc(current_value, expected_value):
'''
Validate through CRC check
'''
crc_current = calculate_crc(current_value)
crc_expected = byte_int(expected_value)
if crc_current == crc_expected:
return True
return False
return calculate_crc(current_value) == byte_int(expected_value)


def byte_int(data_bytes):
'''
Expand Down
9 changes: 9 additions & 0 deletions minio/select/options.py
Expand Up @@ -27,10 +27,12 @@

from .helpers import (SQL)


class CSVInput:
"""
CSVInput: Input Format as CSV.
"""

def __init__(self, FileHeaderInfo=None, RecordDelimiter="\n",
FieldDelimiter=",", QuoteCharacter='"',
QuoteEscapeCharacter='"', Comments="#",
Expand All @@ -43,10 +45,12 @@ def __init__(self, FileHeaderInfo=None, RecordDelimiter="\n",
self.Comments = Comments
self.AllowQuotedRecordDelimiter = AllowQuotedRecordDelimiter


class JSONInput:
"""
JSONInput: Input format as JSON.
"""

def __init__(self, Type=None):
self.Type = Type

Expand All @@ -61,6 +65,7 @@ class InputSerialization:
"""
InputSerialization: nput Format.
"""

def __init__(self, compression_type="NONE", csv=None, json=None, par=None):
self.compression_type = compression_type
self.csv_input = csv
Expand All @@ -73,6 +78,7 @@ class CSVOutput:
CSVOutput: Output as CSV.

"""

def __init__(self, QuoteFields="ASNEEDED", RecordDelimiter="\n",
FieldDelimiter=",", QuoteCharacter='"',
QuoteEscapeCharacter='"'):
Expand All @@ -87,6 +93,7 @@ class JsonOutput:
"""
JsonOutput- Output as JSON.
"""

def __init__(self, RecordDelimiter="\n"):
self.RecordDelimiter = RecordDelimiter

Expand All @@ -95,6 +102,7 @@ class OutputSerialization:
"""
OutputSerialization: Output Format.
"""

def __init__(self, csv=None, json=None):
self.csv_output = csv
self.json_output = json
Expand All @@ -104,6 +112,7 @@ class RequestProgress:
"""
RequestProgress: Sends progress message.
"""

def __init__(self, enabled=False):
self.enabled = enabled

Expand Down
95 changes: 46 additions & 49 deletions minio/select/reader.py
Expand Up @@ -30,18 +30,15 @@
import io
import sys

from binascii import crc32
from xml.etree import ElementTree
from xml.etree.ElementTree import ParseError

from .helpers import (EVENT_RECORDS, EVENT_PROGRESS,
EVENT_STATS, EVENT_CONT,
EVENT, EVENT_CONTENT_TYPE,
EVENT_END, ERROR)
from .helpers import (EVENT_RECORDS, EVENT_STATS,
EVENT, EVENT_CONTENT_TYPE, ERROR)

from .helpers import (validate_crc, calculate_crc, byte_int)
from .errors import (SelectMessageError, SelectCRCValidationError)


def _extract_header(header_bytes):
"""
populates the header map after reading the header in bytes
Expand All @@ -51,26 +48,28 @@ def _extract_header(header_bytes):
# While loop ends when all the headers present are read
# header contains multipe headers
while header_byte_parsed < len(header_bytes):
header_name_byte_length = byte_int(header_bytes[header_byte_parsed:header_byte_parsed+1])
header_name_byte_length = byte_int(
header_bytes[header_byte_parsed:header_byte_parsed+1])
header_byte_parsed += 1
header_name = \
header_bytes[header_byte_parsed:
header_byte_parsed+header_name_byte_length]
header_name = header_bytes[
header_byte_parsed:header_byte_parsed+header_name_byte_length
]
header_byte_parsed += header_name_byte_length
# Header Value Type is of 1 bytes and is skipped
header_byte_parsed += 1
value_string_byte_length = \
byte_int(header_bytes[header_byte_parsed:
header_byte_parsed+2])
value_string_byte_length = byte_int(
header_bytes[header_byte_parsed:header_byte_parsed+2]
)
header_byte_parsed += 2
header_value = \
header_bytes[header_byte_parsed:
header_byte_parsed+value_string_byte_length]
header_value = header_bytes[
header_byte_parsed:header_byte_parsed+value_string_byte_length
]
header_byte_parsed += value_string_byte_length
header_map[header_name.decode("utf-8").lstrip(":")] = \
header_value.decode("utf-8").lstrip(":")
header_map[header_name.decode(
"utf-8").lstrip(":")] = header_value.decode("utf-8").lstrip(":")
return header_map


def _parse_stats(stats):
"""
Parses stats XML and populates the stat dict.
Expand All @@ -86,12 +85,14 @@ def _parse_stats(stats):

return stat


class SelectObjectReader(object):
"""
SelectObjectReader returns a Reader that upon read
returns queried data, but stops when the response ends.
LimitedRandomReader is compatible with BufferedIOBase.
"""

def __init__(self, response):
self.response = response
self.remaining_bytes = bytes()
Expand Down Expand Up @@ -121,12 +122,12 @@ def __extract_message(self):

crc_bytes = io.BytesIO()
total_bytes_len = self.response.read(4)
if len(total_bytes_len) == 0:
if not total_bytes_len:
return {}

total_length = byte_int(total_bytes_len)
header_bytes_len = self.response.read(4)
if len(header_bytes_len) == 0:
if not header_bytes_len:
return {}

header_len = byte_int(header_bytes_len)
Expand All @@ -145,9 +146,9 @@ def __extract_message(self):
crc_bytes.write(prelude_bytes_crc)

header_bytes = self.response.read(header_len)
if len(header_bytes) == 0:
if not header_bytes:
raise SelectMessageError(
"Premature truncation of select message header"+
"Premature truncation of select message header" +
", server is sending corrupt message?")

crc_bytes.write(header_bytes)
Expand All @@ -156,39 +157,36 @@ def __extract_message(self):
payload_length = total_length - header_len - int(16)
payload_bytes = b''
event_type = header_map["event-type"]

if header_map["message-type"] == ERROR:
raise SelectMessageError(
header_map["error-code"] + ":\"" + \
header_map["error-code"] + ":\"" +
header_map["error-message"] + "\"")
elif header_map["message-type"] == EVENT:
if event_type == EVENT_END:
pass
elif event_type == EVENT_CONT:
pass
elif event_type == EVENT_STATS:
content_type = header_map["content-type"]
if content_type != EVENT_CONTENT_TYPE:
raise SelectMessageError(
"Unrecognized content-type {0}".format(content_type))
else:
payload_bytes = self.response.read(payload_length)
self.stat = _parse_stats(payload_bytes)

elif event_type == EVENT_RECORDS:
payload_bytes = self.response.read(payload_length)
else:

if header_map["message-type"] != EVENT:
raise SelectMessageError(
"Unrecognized message-type {0}".format(header_map["message-type"])
"Unrecognized message-type {0}".format(
header_map["message-type"])
)

if event_type == EVENT_STATS:
content_type = header_map["content-type"]
if content_type != EVENT_CONTENT_TYPE:
raise SelectMessageError(
"Unrecognized content-type {0}".format(content_type))

payload_bytes = self.response.read(payload_length)
self.stat = _parse_stats(payload_bytes)
elif event_type == EVENT_RECORDS:
payload_bytes = self.response.read(payload_length)

crc_bytes.write(payload_bytes)

message_crc = self.response.read(4)
if len(message_crc) == 0:
if not message_crc:
return {}

if not validate_crc(crc_bytes.getvalue(),
message_crc):
if not validate_crc(crc_bytes.getvalue(), message_crc):
raise SelectCRCValidationError(
{"Checksum Mismatch, MessageCRC of " +
str(calculate_crc(crc_bytes.getvalue())) +
Expand All @@ -207,14 +205,13 @@ def stream(self, num_bytes=32*1024):
caller should call self.close() to close the stream.
"""
while not self.response.isclosed():
if len(self.remaining_bytes) == 0:
if not self.remaining_bytes:
message = self.__extract_message()
if EVENT_RECORDS in message:
self.remaining_bytes = message.get(EVENT_RECORDS, b'')
else:
# For all other events continue
if EVENT_RECORDS not in message:
continue

self.remaining_bytes = message.get(EVENT_RECORDS, b'')

result = self.remaining_bytes
if num_bytes < len(self.remaining_bytes):
result = self.remaining_bytes[:num_bytes]
Expand Down