Skip to content

Commit

Permalink
Use f-strings (svenkreiss#144)
Browse files Browse the repository at this point in the history
* Use f-strings where it would become more readable.

* pylint wasn't understanding this in Py3.9 (see issue pylint-dev/pylint#3882).

* Lazy argument evaluation for logging statements.
  • Loading branch information
svaningelgem committed Feb 22, 2021
1 parent 00fa0ae commit f0e8e8d
Show file tree
Hide file tree
Showing 53 changed files with 322 additions and 395 deletions.
2 changes: 1 addition & 1 deletion pysparkling/accumulators.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def __str__(self):
return str(self._value)

def __repr__(self):
return "Accumulator<value={0}>".format(self._value)
return f"Accumulator<value={self._value}>"


class AccumulatorParam:
Expand Down
6 changes: 3 additions & 3 deletions pysparkling/cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ def add(self, ident, obj, storageLevel=None):
'disk_location': None,
'checksum': None,
}
log.debug('Added {0} to cache.'.format(ident))
log.debug('Added %s to cache.', ident)

def get(self, ident):
if ident not in self.cache_obj:
log.debug('{0} not found in cache.'.format(ident))
log.debug('%s not found in cache.', ident)
return None

log.debug('Returning {0} from cache.'.format(ident))
log.debug('Returning %s from cache.', ident)
return self.cache_obj[ident]['mem_obj']

def has(self, ident):
Expand Down
33 changes: 15 additions & 18 deletions pysparkling/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,22 @@ def _run_task(task_context, rdd, func, partition):
"""
task_context.attempt_number += 1

log.debug('Running stage {} for partition {} of {} (id: {}).'
''.format(task_context.stage_id,
task_context.partition_id,
rdd.name(), rdd.id()))
log.debug(
'Running stage %s for partition %s of %s (id: %s).',
task_context.stage_id, task_context.partition_id, rdd.name(), rdd.id()
)

try:
return func(task_context, rdd.compute(partition, task_context))
except Exception as e: # pylint: disable=broad-except
log.warning('Attempt {} failed for partition {} of {} (id: {}): {}'
''.format(task_context.attempt_number, partition.index,
rdd.name(), rdd.id(), traceback.format_exc()))
log.warning(
'Attempt %s failed for partition %s of %s (id: %s): %s',
task_context.attempt_number, partition.index, rdd.name(), rdd.id(), traceback.format_exc()
)

if task_context.attempt_number == task_context.max_retries:
log.error('Partition {} of {} failed.'
''.format(partition.index, rdd.name()))
log.error('Partition %s of %s failed.', partition.index, rdd.name())

if not task_context.catch_exceptions:
raise e

Expand Down Expand Up @@ -170,7 +171,7 @@ def accumulator(self, value, accum_param=None):
elif isinstance(value, complex):
accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM
else:
raise TypeError("No default accumulator param for type {0}".format(type(value)))
raise TypeError(f"No default accumulator param for type {type(value)}")
return accumulators.Accumulator(value, accum_param)

def newRddId(self):
Expand Down Expand Up @@ -251,8 +252,7 @@ def pickleFile(self, name, minPartitions=None):
"""
resolved_names = File.resolve_filenames(name)
log.debug('pickleFile() resolved "{0}" to {1} files.'
''.format(name, len(resolved_names)))
log.debug('pickleFile() resolved "%s" to %s files.', name, len(resolved_names))

n_partitions = len(resolved_names)
if minPartitions and minPartitions > n_partitions:
Expand Down Expand Up @@ -403,8 +403,7 @@ def binaryFiles(self, path, minPartitions=None):
['bellobello']
"""
resolved_names = File.resolve_filenames(path)
log.debug('binaryFile() resolved "{0}" to {1} files.'
''.format(path, len(resolved_names)))
log.debug('binaryFile() resolved "%s" to %s files.', path, len(resolved_names))

n_partitions = len(resolved_names)
if minPartitions and minPartitions > n_partitions:
Expand Down Expand Up @@ -502,8 +501,7 @@ def textFile(self, filename, minPartitions=None, use_unicode=True):
:rtype: RDD
"""
resolved_names = TextFile.resolve_filenames(filename)
log.debug('textFile() resolved "{0}" to {1} files.'
''.format(filename, len(resolved_names)))
log.debug('textFile() resolved "%s" to %s files.', filename, len(resolved_names))

n_partitions = len(resolved_names)
if minPartitions and minPartitions > n_partitions:
Expand Down Expand Up @@ -550,8 +548,7 @@ def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
:rtype: RDD
"""
resolved_names = TextFile.resolve_filenames(path)
log.debug('wholeTextFiles() resolved "{0}" to {1} files.'
''.format(path, len(resolved_names)))
log.debug('wholeTextFiles() resolved "%s" to %s files.', path, len(resolved_names))

n_partitions = len(resolved_names)
if minPartitions and minPartitions > n_partitions:
Expand Down
2 changes: 1 addition & 1 deletion pysparkling/fileio/codec/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def get_codec(path):

for endings, codec_class in FILE_ENDINGS:
if any(path.endswith(e) for e in endings):
log.debug('Using {0} codec: {1}'.format(endings, path))
log.debug('Using %s codec: %s', endings, path)
return codec_class

return NoCodec
4 changes: 2 additions & 2 deletions pysparkling/fileio/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def resolve_filenames(all_expr):
for expr in all_expr.split(','):
expr = expr.strip()
files += fs.get_fs(expr).resolve_filenames(expr)
log.debug('Filenames: {0}'.format(files))
log.debug('Filenames: %s', files)
return files

@classmethod
Expand All @@ -53,7 +53,7 @@ def get_content(cls, all_expr):
for expr in all_expr:
expr = expr.strip()
files += fs.get_fs(expr).resolve_content(expr)
log.debug('Filenames: {0}'.format(files))
log.debug('Filenames: %s', files)
return files

def exists(self):
Expand Down
15 changes: 7 additions & 8 deletions pysparkling/fileio/fs/file_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,29 @@ def resolve_filenames(expr):
:rtype: list
"""
log.error('Cannot resolve: {0}'.format(expr))
log.error('Cannot resolve: %s', expr)

@staticmethod
def resolve_content(expr):
"""Return all the files matching expr or in a folder matching expr
:rtype: list
"""
log.error('Cannot resolve: {0}'.format(expr))
log.error('Cannot resolve: %s', expr)

def exists(self):
"""Check whether the given file_name exists.
:rtype: bool
"""
log.warning('Could not determine whether {0} exists due to '
'unhandled scheme.'.format(self.file_name))
log.warning('Could not determine whether %s exists due to unhandled scheme.', self.file_name)

def load(self):
"""Load a file to a stream.
:rtype: io.BytesIO
"""
log.error('Cannot load: {0}'.format(self.file_name))
log.error('Cannot load: %s', self.file_name)

def load_text(self, encoding='utf8', encoding_errors='ignore'):
"""Load a file to a stream.
Expand All @@ -50,19 +49,19 @@ def load_text(self, encoding='utf8', encoding_errors='ignore'):
:rtype: io.StringIO
"""
log.error('Cannot load: {0}'.format(self.file_name))
log.error('Cannot load: %s', self.file_name)

def dump(self, stream):
"""Dump a stream to a file.
:param io.BytesIO stream: Input stream.
"""
log.error('Cannot dump: {0}'.format(self.file_name))
log.error('Cannot dump: %s', self.file_name)

def make_public(self, recursive=False):
"""Make the file public (only on some file systems).
:param bool recursive: Recurse.
:rtype: FileSystem
"""
log.warning('Cannot make {0} public.'.format(self.file_name))
log.warning('Cannot make %s public.', self.file_name)
19 changes: 8 additions & 11 deletions pysparkling/fileio/fs/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ def resolve_filenames(expr):
expr = expr[expr_s:]
for k in bucket.list_blobs(prefix=prefix):
if fnmatch(k.name, expr) or fnmatch(k.name, expr + '/part*'):
files.append('{0}://{1}:{2}/{3}'.format(
scheme, project_name, bucket_name, k.name))
files.append(f'{scheme}://{project_name}:{bucket_name}/{k.name}')
return files

@staticmethod
Expand All @@ -97,9 +96,9 @@ def resolve_content(expr):

folder_path = folder_path[1:] # Remove leading slash

expr = "{0}{1}".format(folder_path, pattern)
expr = f"{folder_path}{pattern}"
# Match all files inside folders that match expr
pattern_expr = "{0}{1}*".format(expr, "" if expr.endswith("/") else "/")
pattern_expr = f"{expr}{'' if expr.endswith('/') else '/'}*"

bucket = GS._get_client(project_name).get_bucket(bucket_name)

Expand All @@ -109,7 +108,7 @@ def resolve_content(expr):
fnmatch(k.name, expr) or fnmatch(k.name, pattern_expr)
):
files.append(
'{0}://{1}/{2}'.format(scheme, raw_bucket_name, k.name)
f'{scheme}://{raw_bucket_name}/{k.name}'
)
return files

Expand All @@ -124,24 +123,22 @@ def exists(self):
blob_name = t.get_next()
bucket = GS._get_client(project_name).get_bucket(bucket_name)
return (bucket.get_blob(blob_name)
or list(bucket.list_blobs(prefix='{}/'.format(blob_name))))
or list(bucket.list_blobs(prefix=f'{blob_name}/')))

def load(self):
log.debug('Loading {0} with size {1}.'
''.format(self.blob.name, self.blob.size))
log.debug('Loading %s with size %s.', self.blob.name, self.blob.size)
return BytesIO(self.blob.download_as_string())

def load_text(self, encoding='utf8', encoding_errors='ignore'):
log.debug('Loading {0} with size {1}.'
''.format(self.blob.name, self.blob.size))
log.debug('Loading %s with size %s.', self.blob.name, self.blob.size)
return StringIO(
self.blob.download_as_string().decode(
encoding, encoding_errors
)
)

def dump(self, stream):
log.debug('Dumping to {0}.'.format(self.blob.name))
log.debug('Dumping to %s.', self.blob.name)
self.blob.upload_from_string(stream.read(),
content_type=self.mime_type)
return self
Expand Down
12 changes: 6 additions & 6 deletions pysparkling/fileio/fs/hdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def client_and_path(path):
'hdfs not supported. Install the python package "hdfs".'
)
Hdfs._conn[cache_id] = hdfs.InsecureClient( # pylint: disable=no-member
'http://{0}:{1}'.format(domain, port)
f'http://{domain}:{port}'
)
return Hdfs._conn[cache_id], folder_path + file_pattern

Expand All @@ -64,7 +64,7 @@ def resolve_filenames(expr):

files = []
for fn, file_status in c.list(folder_path, status=True):
file_local_path = '{0}{1}'.format(folder_path, fn)
file_local_path = f'{folder_path}{fn}'
file_path = format_file_uri(scheme, domain, file_local_path)
part_file_expr = expr + ("" if expr.endswith("/") else "/") + 'part*'

Expand Down Expand Up @@ -107,7 +107,7 @@ def _get_folder_files_by_expr(cls, c, scheme, domain, folder_path, expr=None):
"""
file_paths = []
for fn, file_status in c.list(folder_path, status=True):
file_local_path = '{0}{1}'.format(folder_path, fn)
file_local_path = f'{folder_path}{fn}'
if expr is None or fnmatch(file_local_path, expr):
if file_status["type"] == "DIRECTORY":
file_paths += cls._get_folder_files_by_expr(
Expand Down Expand Up @@ -137,7 +137,7 @@ def resolve_content(cls, expr):
return cls._get_folder_files_by_expr(c, scheme, domain, folder_path, expr)

def load(self):
log.debug('Hdfs read for {0}.'.format(self.file_name))
log.debug('Hdfs read for %s.', self.file_name)
c, path = Hdfs.client_and_path(self.file_name)

with c.read(path) as reader:
Expand All @@ -146,7 +146,7 @@ def load(self):
return r

def load_text(self, encoding='utf8', encoding_errors='ignore'):
log.debug('Hdfs text read for {0}.'.format(self.file_name))
log.debug('Hdfs text read for %s.', self.file_name)
c, path = Hdfs.client_and_path(self.file_name)

with c.read(path) as reader:
Expand All @@ -155,7 +155,7 @@ def load_text(self, encoding='utf8', encoding_errors='ignore'):
return r

def dump(self, stream):
log.debug('Dump to {0} with hdfs write.'.format(self.file_name))
log.debug('Dump to %s with hdfs write.', self.file_name)
c, path = Hdfs.client_and_path(self.file_name)
c.write(path, stream)
return self
6 changes: 3 additions & 3 deletions pysparkling/fileio/fs/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@ def exists(self):
return r.status_code == 200

def load(self):
log.debug('Http GET request for {0}.'.format(self.file_name))
log.debug('Http GET request for %s.', self.file_name)
r = requests.get(self.file_name, headers=self.headers)
if r.status_code != 200:
raise ConnectionException()
return BytesIO(r.content)

def load_text(self, encoding='utf8', encoding_errors='ignore'):
# warning: encoding and encoding_errors are ignored
log.debug('Http GET request for {0}.'.format(self.file_name))
log.debug('Http GET request for %s.', self.file_name)
r = requests.get(self.file_name, headers=self.headers)
if r.status_code != 200:
raise ConnectionException()
return StringIO(r.text)

def dump(self, stream):
log.debug('Dump to {0} with http PUT.'.format(self.file_name))
log.debug('Dump to %s with http PUT.', self.file_name)
requests.put(self.file_name, data=b''.join(stream))
return self
4 changes: 2 additions & 2 deletions pysparkling/fileio/fs/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ def dump(self, stream):
# making sure directory exists
dirname = os.path.dirname(file_path)
if dirname and not os.path.exists(dirname):
log.debug('creating local directory {0}'.format(dirname))
log.debug('creating local directory %s', dirname)
os.makedirs(dirname)

log.debug('writing file {0}'.format(file_path))
log.debug('writing file %s', file_path)
with io.open(file_path, 'wb') as f:
for c in stream:
f.write(c)
Expand Down

0 comments on commit f0e8e8d

Please sign in to comment.