-
Notifications
You must be signed in to change notification settings - Fork 1
/
fmeutil.py
904 lines (714 loc) · 32 KB
/
fmeutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
"""Utilities sharable between internal plugin python modules."""
#--- Import FME and sys requirements
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import re
import sys
import locale
import traceback
from collections import OrderedDict
from datetime import datetime, tzinfo, timedelta
import fme
from fmeobjects import FMEFeature, FMESession
import six
from six import string_types, iteritems, text_type, binary_type, PY2, PY3
import fmeobjects
from fmeobjects import FME_INFORM
from fmegeneral import fmeconstants
class FMELocale(object):
"""Singleton for handling encoding on Mac.
This class serves no purpose for end users. Use
:func:`getSystemLocale` instead.
"""
# See PR#53541 and PR#52908.
def __init__(self):
pass
#: Name of the detected system locale.
#:
#: :type: str
detectedSystemLocale = None
def fmeBoolToBool(boolean):
"""
Convert an FME boolean to an actual boolean,
where FME boolean true = 1.
:type boolean: int or str
:param boolean: value to convert
:rtype: bool
"""
return int(boolean) == 1
def choiceToBool(boolean):
"""Convert from string yes/no to boolean.
:param str boolean: yes|no. Case-insensitive.
:rtype: bool
"""
return str(boolean).lower() == 'yes'
def boolToChoice(boolean):
"""Convert from boolean to string Yes/No.
:param bool boolean: value to convert
:return: Yes|No
:rtype: str
"""
return 'Yes' if boolean else 'No'
def stringArrayToDict(stringArray):
"""Given a list, convert odd indicies to keys, and even indicies to values.
Useful for converting IFMEStringArray-equivalents from the FMEObjects
Python API into something easier to manipulate.
:param list stringArray: Must have an even length
:rtype: dict
"""
result = {}
for index in range(0, len(stringArray), 2):
result[stringArray[index]] = stringArray[index + 1]
return result
def stringToBool(string):
"""Converts a string to boolean, and returns None if conversion fails.
:param str string: Value to convert.
:rtype: bool
"""
# This method is modeled after STF_stringToBoolean from stfutil2.cpp.
if len(string) == 0:
return None
first_char = string[0].lower()
# Check if first character contains "t" or "y".
if first_char in ["t", "y"]:
return True
# Check if first character contains "f" or "n".
if first_char in ["f", "n"]:
return False
# Attempt to convert string to float.
else:
try:
# Returns True if string casts to float as a non-zero number. False if not equal to zero.
return bool(float(string))
except (TypeError, ValueError):
# Conversion failed.
return None
def exceptionToDebugStr():
"""Get the last raised exception trace, type, and message as one line.
:return: The last raised exception as a one line combined exception trace, type and message.
:rtype: str
"""
return repr(
traceback.format_exception(sys.exc_info()[0],
sys.exc_info()[1],
sys.exc_info()[2]))
def exceptionToStr():
"""Get the last raised exception message.
:return: The last raised exception message.
:rtype: str
"""
return str(sys.exc_info()[1])
def getSystemLocale():
"""Get the system locale of the process. The return value is cached after the first time
this function is called.
:return: The system locale name.
:rtype: str
"""
if FMELocale.detectedSystemLocale is None:
# FME may change the locale of the process, so query FME to get the system locale truth.
FMELocale.detectedSystemLocale = fme.systemEncoding
return FMELocale.detectedSystemLocale
def unicodeToSystem(original):
"""Try to convert the given Unicode string to the system encoding.
Characters that could not be converted are replaced with '?'. If input is
already a non-Unicode string, return unchanged. In Python 3, unicode
strings are returned.
:type original: `unicode <https://docs.python.org/2.7/library/functions.html#unicode>`_
:param original: Unicode string to convert.
:return: The converted string.
:rtype: six.binary_type or type(original)
"""
# See PRs #52906-52909.
if (PY2 and isinstance(original, binary_type)) or \
(PY3 and isinstance(original, text_type)):
# If input is already a non-Unicode string, return it as-is.
# In Py3, Unicode strings are returned.
return original
return original.encode(getSystemLocale(), 'replace')
def castToUnicode(string):
"""Method that will catch any edge cases, like floats in attribute
contents, when encoding in unicode.
:param str string: Content that is being encoded.
:return: Unicode content.
:rtype: six.text_type
"""
if isinstance(string, string_types):
return systemToUnicode(string)
else:
return text_type(string)
def systemToUnicode(original):
"""Try to convert a system-encoded string to a Unicode string. Characters
that could not be converted are replaced with '?'.
:type original: str
:param original: System encoded string to convert.
:return: The converted string.
:rtype: six.text_type
"""
# See PRs #52906-52909.
if isinstance(original, text_type):
# If input is already a Unicode string, return it as-is.
return original
return original.decode(getSystemLocale(), 'replace')
#-----------------------------------------------------------------------------
# Attempts to convert the given system value to utf-8 encoding
# All characters that failed to encode will be replaced by ?
# Initially added for PRs #53185
#-----------------------------------------------------------------------------
def systemToUtf8(original):
"""Try to convert the given system-encoded string to a UTF-8 encoded
string. Characters that could not be converted are replaced with '?'.
:type original: six.binary_type
:param original: System encoded string to convert.
:return: UTF-8 encoded string - not a `unicode` string.
:rtype: six.binary_type
"""
unicodeVal = original.decode(getSystemLocale(), 'replace')
return unicodeVal.encode('utf8', 'replace')
def utf8ToSystem(original):
"""Try to convert the given UTF-8 string to system encoding. Characters
that could not be converted are replaced with '?'.
:param six.binary_type original: UTF-8 encoded string.
:return: System encoded string.
:rtype: six.binary_type
"""
# See PR#53185.
unicodeVal = original.decode('utf8', 'replace')
return unicodeVal.encode(getSystemLocale(), 'replace')
def decodeWWJDString(encoded):
"""Decode the input WWJD encoded string to a six.text_type. If encoded
is not a six.string_types it is returned unchanged.
:param six.string_types encoded: WWJD encoded string.
:return: Decoded WWJD string or input value unchanged.
:rtype: six.text_type or type(encoded)
"""
return FMESession().decodeFromFMEParsableText(encoded) \
if isinstance(encoded, six.string_types) else encoded
class Logger(object):
"""Helper class for logging functionality.
A wrapper around :class:`fmeobjects.FMELogFile`.
"""
def __init__(self, debug=False):
"""
:param bool debug: Whether this instance should emit debug messages.
"""
self.debug_ = debug
self.fmeLogfile_ = fmeobjects.FMELogFile()
def setDebugMode(self, debug=True):
"""Tells the logger whether to emit debug messages or not.
:param bool debug: Whether this instance should emit debug messages.
"""
self.debug_ = debug
def logMessageString(self, message, level=FME_INFORM, debug=False):
"""Write message string to the FME logfile.
:param str message: Message to write to the log.
:param int level: Message severity level.
:param bool debug: If True, then this message will only be logged if this
:class:`Logger` instance is in debug mode.
"""
# Output non-debug messages
if not debug:
self.fmeLogfile_.logMessageString(message, level)
# Output debug messages
elif self.debug_:
self.fmeLogfile_.logMessageString('DEBUG: %s' % message, level)
def logMessage(self, messageID, params=None, level=FME_INFORM,
debug=False):
"""Write message based on its message ID or string to the FME logfile.
:type messageID: str or int
:param messageID: Message ID or message string to write to the log.
:param list params: List of string substitution arguments for the given message.
:param int level: Message severity level.
:param bool debug: If True, then this message will only be logged if this
:class:`Logger` instance is in debug mode.
"""
if params is None:
params = []
for index, value in enumerate(params):
# All message parameters must be strings, or else the logger will not perform the substitution.
if not isinstance(value, string_types):
params[index] = str(value)
# Output non-debug messages
if not debug:
if params is None:
self.fmeLogfile_.logMessageString(messageID, level)
else:
self.fmeLogfile_.logMessage(messageID, params, level)
# Output debug messages
elif self.debug_:
if params is None:
self.fmeLogfile_.logMessageString(messageID, level)
else:
self.fmeLogfile_.logMessage(messageID, params, level)
def logFeature(self, feature, level=FME_INFORM, debug=False):
"""Write a feature to the log.
:param fmeobjects.FMEFeature feature: Feature to log.
:param int level: Message severity level.
:param bool debug: If True, then this feature will only be logged if this
:class:`Logger` instance is in debug mode.
"""
# Output non-debug messages
if not debug:
self.fmeLogfile_.logFeature(feature, level)
# Output debug messages
elif self.debug_:
self.fmeLogfile_.logFeature(feature, level)
def logProxy(self, log_prefix, proxy_url):
"""Log the given proxy server URL using the appropriate message, but
with any credentials present in the URL stripped out.
:param str log_prefix: String to prefix the log message.
Usually the format name and direction of the invoking format.
:param str proxy_url: Proxy URL, which may contain the username and password.
:rtype: None
"""
credentials_separator_index = proxy_url.rfind('@')
if credentials_separator_index > -1:
# Strip out credentials if they're present.
proxy_url = proxy_url[:proxy_url.find(
'://') + 3] + proxy_url[credentials_separator_index + 1:]
self.logMessage(fmeconstants.kFME_MSGNUM_USING_PROXY,
[log_prefix, proxy_url])
def allowDuplicateMessages(self, allowDuplicateMessages):
"""If True, tells the logger not to hide repeated messages. If False
the logger will hide repeated lines.
:param bool allowDuplicateMessages: If True display repeated lines in log
"""
self.fmeLogfile_.allowDuplicateMessages(allowDuplicateMessages)
def getAllowDuplicateMessages(self):
"""Returns the status of the handling of duplicated messages in the
logger. If True the logger is set to display repeated lines. If False
the logger will hide repeated lines.
:rtype: bool
"""
return self.fmeLogfile_.getAllowDuplicateMessages()
def getMessage(self, messageNumber, messageParameters):
"""Returns a formatted message using the given params.
:type messageParameters: list[six.text_type]
:param int messageNumber: The message number to look up.
:param messageParameters: The list of arguments.
:rtype: six.text_type
"""
return self.fmeLogfile_.getMessage(messageNumber, messageParameters)
class FMETZInfo(tzinfo):
"""Rudimentary class that represents an arbitrary time zone offset.
Used in :func:`fmeDateToPython`, but probably not useful for end
users.
"""
def __init__(self, offset):
"""
:param int offset: Time zone offset, in minutes
"""
super(FMETZInfo, self).__init__()
self.offset = offset
def utcoffset(self, dt):
return timedelta(minutes=self.offset)
def dst(self, dt):
return timedelta(0)
def tzname(self, dt):
return None
def fmeDateToPython(dateStr):
"""Convert an FME datetime/date/time to a Python datetime/date/time object.
If time or datetime successfully parsed, the result will always have microseconds,
as Python doesn't have a concept of null microseconds.
:type dateStr: six.text_type
:param dateStr: An FME datetime string
:returns: Python datetime object, has date, has time
:rtype: datetime.datetime
"""
def microsecond_format(value):
"""
:param value: Microseconds or nanoseconds. Can be `None`.
Though FME datetime format supports nanoseconds, Python datetime does not, so it's truncated.
:rtype: str
:returns: 6-character value suitable for parsing as `%f` in :meth:`datetime.strptime`.
"""
if value is None:
value = ''
# Python 2 rejects unicode fill character, and we have unicode_literals on.
return value.ljust(6, b'0' if six.PY2 else '0')[:6]
# Ensure it's a string.
if not isinstance(dateStr, string_types):
dateStr = str(dateStr)
regex = r'(?P<dt>\d+)(?:\.(?P<us>\d+))?(?:(?P<tzs>[\-+])(?P<tzh>[01][0-9])(?::(?P<tzm>[0-5][0-9]))?)?'
match = re.match(regex, dateStr)
if match is None:
# If regex doesn't match, then it's unparseable.
return None, False, False
dt, us, tzs, tzh, tzm = match.group('dt', 'us', 'tzs', 'tzh', 'tzm')
tz = None
if tzs is not None:
# If timezone sign is present, convert time zone offset to minutes,
# and make a tzinfo object to represent it.
tz = int(tzh) * 60
if tzm:
tz += int(tzm)
if tzs == '-':
tz *= -1
tz = FMETZInfo(tz)
fmeDateFormat, fmeTimeFormat = '%Y%m%d', '%H%M%S%f'
dtSize = len(dt)
try:
if dtSize == 8:
# Parse date.
return datetime.strptime(dt, fmeDateFormat), True, False
if dtSize == 6:
# Parse time plus microseconds, and time zone if present.
dt += microsecond_format(us)
theTime = datetime.strptime(dt, fmeTimeFormat)
theTime = theTime.replace(tzinfo=tz)
return theTime, False, True
if dtSize == 14:
# Parse datetime plus microseconds, and time zone if present.
dt += microsecond_format(us)
fmt = fmeDateFormat + fmeTimeFormat
theDateTime = datetime.strptime(dt, fmt)
theDateTime = theDateTime.replace(tzinfo=tz)
return theDateTime, True, True
except ValueError:
# Any kind of parsing error, including impossible dates/times,
# shall return None.
return None, False, False
# Failed to parse.
return None, False, False
def pythonDateTimeToFMEFormat(theDateTime):
"""Convert a Python datetime to a string in FME date format. Works around
issue of strftime() refusing to output anything for datetimes before 1900.
:param datetime.datetime theDateTime: A valid Python datetime
:return: The converted value.
:rtype: six.text_type
"""
# Get the time tuple and concatenate the first 6 values (date and time).
# Time tuple doesn't contain time zone or microseconds, so those parts are added separately.
timetuple = theDateTime.timetuple()
result = str(timetuple.tm_year).zfill(4)
result += ''.join(str(timetuple[i]).zfill(2) for i in range(1, 6))
# Add microseconds if they're not zero.
microseconds = theDateTime.microsecond
if microseconds > 0:
result += '.' + str(microseconds).zfill(6)
# Add time zone offset if the datetime has a time zone.
tz = theDateTime.tzinfo
if tz:
# Get time zone offset in minutes. The // operator is discard floor division.
totalOffsetMinutes = int(tz.utcoffset(False).total_seconds()) // 60
offsetH, offsetM = abs(totalOffsetMinutes) // 60, abs(
totalOffsetMinutes) % 60
result += '+' if totalOffsetMinutes >= 0 else '-' # Sign.
result += str(offsetH).zfill(2) # Hours.
if offsetM > 0: # Include minutes if they're not zero.
result += str(offsetM).zfill(2)
return result
UTC_TZ = FMETZInfo(0)
def unixtimeToPython(timestamp_ms):
"""Parse millisecond unix timestamps to Python datetime in UTC. Supports
negative timestamps. Use this function instead of
:meth:`datetime.date.fromtimestamp`.
:param int timestamp_ms: Unix timestamp in milliseconds. Negative values are okay.
:returns: Python datetime, with UTC timezone.
:rtype: datetime.datetime
"""
timestamp_s, timestamp_ms_part = timestamp_ms // 1000, timestamp_ms % 1000
if timestamp_ms < 0:
return datetime(
1970, 1, 1, tzinfo=UTC_TZ) + timedelta(
seconds=timestamp_s, milliseconds=timestamp_ms_part)
return datetime.fromtimestamp(
timestamp_s, tz=UTC_TZ).replace(microsecond=timestamp_ms_part * 1000)
def isoTimestampToFMEFormat(isoTimestamp):
"""Convert an ISO timestamp to FME format. Doesn't try to actually parse
and validate the timestamp. Since the standard library doesn't include a
way to parse ISO 8601 timestamps, this function is a simple but imperfect
way/ to avoid using another third-party library.
Examples:
============================= =======================
ISO 8601 FME format
============================= =======================
2014-04-15T16:54:20Z 20140415165420+00
2014-04-15T16:54:20.123+05:30 20140415165420.123+0530
============================= =======================
:type isoTimestamp: six.text_type
:param isoTimestamp: ISO 8601-formatted timestamp.
:return: Timestamp in FME format, or None if timestamp could not be parsed
:rtype: six.text_type or None
"""
if not isinstance(isoTimestamp, text_type):
try:
isoTimestamp = systemToUnicode(isoTimestamp)
except UnicodeEncodeError:
return None
# Remove date/time separator, time part, and timezone part separator.
formatted = isoTimestamp.replace('T', '')
formatted = formatted.replace(' ', '')
formatted = formatted.replace(':', '')
formatted = formatted.replace(
'-', '', 2) # Remove first 2 dashes: the date part separator.
# Minimum timestamp length is 14 characters/digits to represent both date and time.
if len(formatted) < 14:
return None
# Replace Z timezone with explicit offset.
if formatted[-1] == 'Z':
formatted = formatted.replace('Z', '+00')
return formatted
def parse_gui_date(value, raise_on_error=True):
"""Parse a string representation of a date into an object. The string
representation can be:
* ``YYYYMMDD`` - as set by ``GUI DATE`` and ``GUI DYNAMIC_MULTI_SELECT`` with ``FME_RESULT_TYPE,DATE``
* ``YYYY-MM-DD`` - as shown in the UI, and what users are expected to type manually
when the GUI types above become plaintext fields in the Navigator.
:param str value: Date string to parse, in either ``YYYYMMDD`` or ``YYYY-MM-DD`` form.
:param bool raise_on_error: If False, return None instead of raising errors.
:rtype: datetime.datetime
:raises ValueError: If value couldn't be parsed, and `raise_on_error` is True.
"""
# For details about this discrepancy, see PR77997.
value = value.replace('-', '')
try:
return datetime.strptime(value, '%Y%m%d')
except:
if not raise_on_error:
return
raise
def retryOnException(exception,
maxTries,
logWrapper=lambda attempt, maximum: None,
action=lambda *x: None,
*actionArgs,
**actionKwargs):
"""Function generating a decorator to retry a function several times,
taking an action on a specific exception. When maximum retries have been
made, the exception is raised again.
:param class exception: The exception class
:param int maxTries: The maximum number of times to try. On a failed attempt 'maxTries', the exception is re-raised
:param function logWrapper: A logging function which will be called with logWrapper(attempt, maxRetries) on each retry
:param function action: A function to call in case of exception, before retrying
:param actionArgs: Arguments to pass to the action function
:param actionKwargs: Keyword arguments to pass to the action function
:return: The decorator
"""
def actualDecorator(function):
"""The actual decorator which uses the arguments from
retryOnException."""
def decorated_func(*args, **kwargs):
"""The decorated function which will be returned by
actualDecorator."""
for i in range(0, maxTries):
if i > 0:
logWrapper(i, maxTries - 1)
try:
return function(*args, **kwargs)
except exception as e:
if i < maxTries - 1:
action(*actionArgs, **actionKwargs)
else:
raise e
return decorated_func
return actualDecorator
def mangleDuplicateName(candidateName, usedNames):
"""Generate unique names for otherwise duplicated feature type or attribute
names, in the same way FME Workbench would.
:param str candidateName: The input name,
just as if the user typed it into the corresponding field on the User Attributes tab.
The input value needs to be encoding-mangled, if applicable.
:param set[str] usedNames: Set of already-assigned names.
The caller is responsible for adding the name returned by this function to this set.
:return: Mangled name, guaranteed unique among `usedNames`.
:rtype: str
"""
if candidateName not in usedNames:
# Name not duplicated.
return candidateName
mangleIndex = 0
while True:
mangledName = "%s%02d" % (candidateName, mangleIndex)
if mangledName not in usedNames:
return mangledName
mangleIndex += 1
def parseMultiParam(multiparam, delim=';', decode=False):
"""Creates a dictionary from an FME MULTIPARAM
Example:
``SMTP_GROUP;;HOST;email3;PORT;25;ENCRYPTION;None;TIMEOUT;5;``
``AUTHENTICATION;NO;USERNAME;<Unused>;PASSWORD;<Unused>``
:param str multiparam: The multiparam string to be parsed
:param str delim: The delimiter (optional), defaults to ``;``
:return: A dictionary with key-value PARAM=value
Example: ``{ HOST: "email3", PORT: "25", ... }``
:rtype: dict
"""
def generatePairs(iterable):
iterator = iter(iterable)
try:
while True:
key, value = next(iterator), next(iterator)
# PR70661: Generically apply @Concatenate() to decrypt fme_decrypt() values.
if value.startswith('fme_decrypt('):
value = FMEFeature().performFunction(
'@Concatenate({})'.format(value))
if value != '':
yield key, value
else:
continue
except StopIteration:
pass
# multiparam = multiparam.split(delim * 2)[-1]
multiparam = multiparam.split(delim)
return dict(generatePairs(multiparam))
def replaceMultiparam(multi, param, value, delim=';'):
"""Sets the multiparameter with a new value for attribute.
:param str multi: The multiparam string
:param str param: The name of the parameter
:param str value: The new value of the parameter
:param str delim: The delimiter, default ``;``
:returns: New value for multiparam.
:rtype: str
"""
# Adapted from slocke's fmegeocoder unit tests.
attr = param + delim
findIndex = multi.find(attr)
if findIndex < 0:
findIndex = len(multi)
attr = delim + attr
prefix = multi[0:findIndex]
remove = multi[findIndex:len(multi)]
findIndex = remove.find(delim, len(attr))
if findIndex < 0:
findIndex = len(remove)
suffix = remove[findIndex:len(remove)]
return prefix + attr + str(value) + suffix
def zipProperties(category, properties):
"""Return a list where each odd index is category, and each even index is a
property.
Example:
``[category, property[0], category, property[1], ..., category, property[n]]``
:param str category: Property category.
:param list[str]|None properties: Property values for the category. Can be empty or None.
:returns: None if properties list is empty. Otherwise, the returned list has an even number of string elements.
:rtype: list[str]
"""
if not properties:
return None
categories = [category] * len(properties)
completed = categories + properties
completed[::2] = categories # Odd indexes.
completed[1::2] = properties # Even indexes.
return completed
def build_feature(feature_type,
attrs=None,
attr_types=None,
geometry=None,
coord_sys=None):
"""Build an :class:`fmeobjects.FMEFeature` instance with the most frequently used
parameters.
This helper function reduces verbosity and boilerplate code associated with FMEFeature construction.
It also helps avoid common pitfalls such as:
* Undefined geometry on feature
* Calling :meth:`fmeobjects.FMEFeature.setAttribute` with a `None` value
To build schema features, use :func:`build_schema_feature` instead.
:param str feature_type: The feature type.
:param dict attrs: Attribute names and values.
:param dict attr_types: Attribute names and their fmeobjects type.
This is used for setting null attribute values.
If not specified, or if there's no mapping for a given attribute name,
then the null value will be set with :data:`fmeobjects.FME_ATTR_STRING`.
:param fmeobjects.FMEGeometry geometry: Geometry to put on the feature.
:param str coord_sys: Coordinate system name to set.
:rtype: fmeobjects.FMEFeature
"""
feature = FMEFeature()
feature.setFeatureType(feature_type)
feature.setGeometry(geometry)
if coord_sys:
feature.setCoordSys(coord_sys)
if attrs:
for attr_name, value in iteritems(attrs):
if value is None:
if attr_types is None:
attr_types = {}
# FME_ATTR_UNDEFINED is silently interpreted as FME_ATTR_STRING.
feature.setAttributeNullWithType(
attr_name,
attr_types.get(attr_name, fmeobjects.FME_ATTR_STRING))
else:
feature.setAttribute(attr_name, value)
return feature
def build_schema_feature(feature_type, schema_attrs=None, fme_geometries=None):
"""Build an :class:`fmeobjects.FMEFeature` suitable for returning from
:meth:`pluginbuilder.FMEReader.readSchema`. Helps avoid common pitfalls such as:
* Setting any geometry on the feature
* Setting non-user attributes as sequenced attributes
* Setting user attributes as regular attributes
:param str feature_type: The feature type.
:param collections.OrderedDict schema_attrs: Ordered schema attributes for the feature type.
Keys are attribute names, and values are format-specific attribute types.
:param list fme_geometries: Format-specific geometry types for this feature type.
:rtype: fmeobjects.FMEFeature
"""
assert isinstance(schema_attrs, OrderedDict) or not schema_attrs
if schema_attrs is None:
schema_attrs = {}
feature = FMEFeature()
feature.setFeatureType(feature_type)
if fme_geometries:
feature.setAttribute(fmeobjects.kFMERead_Geometry, fme_geometries)
for attr_name, value in iteritems(schema_attrs):
assert value
feature.setSequencedAttribute(attr_name, value)
return feature
def set_list_attribute_with_properties(feature,
index,
property_attrs,
attr_types=None):
"""Set a list attribute entry onto a feature, where the entry is comprised
of one or more properties, e.g.: ``name{i}.property``.
To set a property-less list attribute comprised of strings,
use :meth:`fmeobjects.FMEFeature.setAttribute` instead.
:param fmeobjects.FMEFeature feature: Feature to receive the list attribute.
:param int index: Index into the list attribute to set.
:param dict property_attrs: List attribute names and values.
All attribute names must follow the format ``name{}.property``.
The empty braces will get filled with the index.
:param dict attr_types: Attribute names and their fmeobjects type.
This is used for setting null attribute values.
If not specified, or if there's no mapping for a given attribute name,
then the null value will be set with :data:`fmeobjects.FME_ATTR_STRING`.
"""
for attr_name, value in iteritems(property_attrs):
assert '{}' in attr_name
final_attr_name = attr_name.replace('{}', '{%s}' % index, 1)
if value is None:
if attr_types is None:
attr_types = {}
# FME_ATTR_UNDEFINED is silently interpreted as FME_ATTR_STRING.
feature.setAttributeNullWithType(
final_attr_name,
attr_types.get(attr_name, fmeobjects.FME_ATTR_STRING))
else:
feature.setAttribute(final_attr_name, value)
def aggressive_normpath(path):
"""
Path normalization that accepts Windows or Unix paths and normalizes them to the current
platform.
This means you can use backslashes on Linux, which is something FME might give us on a cross-
platform workspace.
:type path: six.text_type
:param path: The input path
:return: The normalized path.
:rtype: six.text_type
"""
standard_seps_path = os.path.sep.join(re.split(r"[\\/]", path))
return os.path.normpath(standard_seps_path)
def remove_invalid_path_chars(path, allow_separators=True):
"""
Replaces all potentially questionable characters from a path with underscores (_).
:param str path: the raw path
:param bool allow_separators: whether to allow slashes in the path (i.e. is this just a filename
or is it a full path?)
:return: the sanitized path
:rtype: str
"""
pattern = r'[\r\n\t\0'
# Windows is stricter than posix
if os.name == 'nt':
pattern += r'<>:"|?*'
if not allow_separators:
pattern += r'\\\/'
pattern += r']'
return re.sub(pattern, '_', path)