diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index caa2baa32a364..4fda2a9b950b8 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -55,7 +55,7 @@
 import dis
 import traceback
 
-# relevant opcodes
+#relevant opcodes
 STORE_GLOBAL = chr(dis.opname.index('STORE_GLOBAL'))
 DELETE_GLOBAL = chr(dis.opname.index('DELETE_GLOBAL'))
 LOAD_GLOBAL = chr(dis.opname.index('LOAD_GLOBAL'))
@@ -70,11 +70,7 @@
 try:
     import ctypes
 except (MemoryError, ImportError):
-    logging.warning(
-        ('Exception raised on importing ctypes. Likely python bug.. '
-         'some functionality will be disabled'),
-        exc_info=True
-    )
+    logging.warning('Exception raised on importing ctypes. Likely python bug.. some functionality will be disabled', exc_info = True)
     ctypes = None
     PyObject_HEAD = None
 else:
@@ -91,11 +87,9 @@
 except ImportError:
     from StringIO import StringIO
 
-
 # These helper functions were copied from PiCloud's util module.
 def islambda(func):
-    return getattr(func, 'func_name') == '<lambda>'
-
+    return getattr(func,'func_name') == '<lambda>'
 
 def xrange_params(xrangeobj):
     """Returns a 3 element tuple describing the xrange start, step, and len
@@ -108,32 +102,31 @@ def xrange_params(xrangeobj):
     """
     xrange_len = len(xrangeobj)
-    if not xrange_len:  # empty
-        return (0, 1, 0)
+    if not xrange_len: #empty
+        return (0,1,0)
     start = xrangeobj[0]
-    if xrange_len == 1:  # one element
+    if xrange_len == 1: #one element
         return start, 1, 1
     return (start, xrangeobj[1] - xrangeobj[0], xrange_len)
 
-# debug variables intended for developer use:
+#debug variables intended for developer use:
 printSerialization = False
 printMemoization = False
-useForcedImports = True  # Should I use forced imports for tracking?
+useForcedImports = True #Should I use forced imports for tracking?
+
 
 class CloudPickler(pickle.Pickler):
 
     dispatch = pickle.Pickler.dispatch.copy()
     savedForceImports = False
-    savedDjangoEnv = False  # hack tro transport django environment
+    savedDjangoEnv = False #hack tro transport django environment
 
-    def __init__(self, file, protocol=None, min_size_to_save=0):
-        pickle.Pickler.__init__(self, file, protocol)
-        self.modules = set()  # set of modules needed to depickle
-        # map ids to dictionary. used to ensure that
-        # functions can share global env
-        self.globals_ref = {}
+    def __init__(self, file, protocol=None, min_size_to_save= 0):
+        pickle.Pickler.__init__(self,file,protocol)
+        self.modules = set() #set of modules needed to depickle
+        self.globals_ref = {}  # map ids to dictionary. used to ensure that functions can share global env
 
     def dump(self, obj):
         # note: not thread safe
@@ -157,57 +150,59 @@ def dump(self, obj):
 
     def save_buffer(self, obj):
         """Fallback to save_string"""
-        pickle.Pickler.save_string(self, str(obj))
+        pickle.Pickler.save_string(self,str(obj))
     dispatch[buffer] = save_buffer
 
-    # block broken objects
+    #block broken objects
     def save_unsupported(self, obj, pack=None):
         raise pickle.PicklingError("Cannot pickle objects of type %s" % type(obj))
     dispatch[types.GeneratorType] = save_unsupported
 
-    # python2.6+ supports slice pickling. some py2.5 extensions might as well. We just test it
+    #python2.6+ supports slice pickling. some py2.5 extensions might as well.  We just test it
    try:
-        slice(0, 1).__reduce__()
-    except TypeError:  # can't pickle -
+        slice(0,1).__reduce__()
+    except TypeError: #can't pickle -
         dispatch[slice] = save_unsupported
 
-    # itertools objects do not pickle!
+    #itertools objects do not pickle!
     for v in itertools.__dict__.values():
         if type(v) is type:
             dispatch[v] = save_unsupported
 
+
     def save_dict(self, obj):
         """hack fix
         If the dict is a global, deal with it in a special way
         """
-        # print 'saving', obj
+        #print 'saving', obj
         if obj is __builtins__:
             self.save_reduce(_get_module_builtins, (), obj=obj)
         else:
             pickle.Pickler.save_dict(self, obj)
     dispatch[pickle.DictionaryType] = save_dict
 
+
     def save_module(self, obj, pack=struct.pack):
         """
         Save a module as an import
         """
-        # print 'try save import', obj.__name__
+        #print 'try save import', obj.__name__
         self.modules.add(obj)
-        self.save_reduce(subimport, (obj.__name__,), obj=obj)
-    dispatch[types.ModuleType] = save_module  # new type
+        self.save_reduce(subimport,(obj.__name__,), obj=obj)
+    dispatch[types.ModuleType] = save_module #new type
 
     def save_codeobject(self, obj, pack=struct.pack):
         """
         Save a code object
         """
-        # print 'try to save codeobj: ', obj
+        #print 'try to save codeobj: ', obj
         args = (
             obj.co_argcount, obj.co_nlocals, obj.co_stacksize, obj.co_flags, obj.co_code,
             obj.co_consts, obj.co_names, obj.co_varnames, obj.co_filename, obj.co_name,
             obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, obj.co_cellvars
         )
         self.save_reduce(types.CodeType, args, obj=obj)
-    dispatch[types.CodeType] = save_codeobject  # new type
+    dispatch[types.CodeType] = save_codeobject #new type
 
     def save_function(self, obj, name=None, pack=struct.pack):
         """ Registered with the dispatch to handle all function types.
@@ -219,12 +214,10 @@ def save_function(self, obj, name=None, pack=struct.pack):
         name = obj.__name__
         modname = pickle.whichmodule(obj, name)
-        # print 'which gives %s %s %s' % (modname, obj, name)
+        #print 'which gives %s %s %s' % (modname, obj, name)
         try:
             themodule = sys.modules[modname]
-        except KeyError:
-            # eval'd items such as namedtuple give invalid items
-            # for their function __module__
+        except KeyError: # eval'd items such as namedtuple give invalid items for their function __module__
             modname = '__main__'
 
         if modname == '__main__':
@@ -234,29 +227,28 @@
             self.modules.add(themodule)
 
         if not self.savedDjangoEnv:
-            # hack for django - if we detect the settings module, we transport it
+            #hack for django - if we detect the settings module, we transport it
             django_settings = os.environ.get('DJANGO_SETTINGS_MODULE', '')
             if django_settings:
                 django_mod = sys.modules.get(django_settings)
                 if django_mod:
-                    cloudLog.debug(
-                        'Transporting django settings %s during save of %s',
-                        django_mod, name)
+                    cloudLog.debug('Transporting django settings %s during save of %s', django_mod, name)
                     self.savedDjangoEnv = True
                     self.modules.add(django_mod)
                     write(pickle.MARK)
                     self.save_reduce(django_settings_load, (django_mod.__name__,), obj=django_mod)
                     write(pickle.POP_MARK)
+
         # if func is lambda, def'ed at prompt, is in main, or is nested, then
         # we'll pickle the actual function object rather than simply saving a
         # reference (as is done in default pickler), via save_function_tuple.
         if islambda(obj) or obj.func_code.co_filename == '<stdin>' or themodule is None:
-            # Force server to import modules that have been imported in main
+            #Force server to import modules that have been imported in main
             modList = None
             if themodule is None and not self.savedForceImports:
                 mainmod = sys.modules['__main__']
-                if useForcedImports and hasattr(mainmod, '___pyc_forcedImports__'):
+                if useForcedImports and hasattr(mainmod,'___pyc_forcedImports__'):
                     modList = list(mainmod.___pyc_forcedImports__)
                 self.savedForceImports = True
             self.save_function_tuple(obj, modList)
@@ -298,12 +290,12 @@ def save_function_tuple(self, func, forced_imports):
         if forced_imports:
             write(pickle.MARK)
             save(_modules_to_main)
-            # print 'forced imports are', forced_imports
+            #print 'forced imports are', forced_imports
 
             forced_names = map(lambda m: m.__name__, forced_imports)
             save((forced_names,))
-            # save((forced_imports,))
+            #save((forced_imports,))
             write(pickle.REDUCE)
             write(pickle.POP_MARK)
@@ -350,7 +342,7 @@ def extract_code_globals(co):
                 extended_arg = oparg*65536L
             if op in GLOBAL_OPS:
                 out_names.add(names[oparg])
-        # print 'extracted', out_names, ' from ', names
+        #print 'extracted', out_names, ' from ', names
         return out_names
 
     def extract_func_data(self, func):
@@ -364,14 +356,13 @@ def extract_func_data(self, func):
         func_global_refs = CloudPickler.extract_code_globals(code)
         if code.co_consts:   # see if nested function have any global refs
             for const in code.co_consts:
-                if isinstance(const, types.CodeType) and const.co_names:
-                    func_global_refs = func_global_refs.union(
-                        CloudPickler.extract_code_globals(const))
+                if type(const) is types.CodeType and const.co_names:
+                    func_global_refs = func_global_refs.union( CloudPickler.extract_code_globals(const))
         # process all variables referenced by global environment
         f_globals = {}
         for var in func_global_refs:
-            # Some names, such as class functions are not global - we don't need them
-            if var in func.func_globals:
+            #Some names, such as class functions are not global - we don't need them
+            if func.func_globals.has_key(var):
                 f_globals[var] = func.func_globals[var]
 
         # defaults requires no processing
@@ -380,10 +371,9 @@
         def get_contents(cell):
             try:
                 return cell.cell_contents
-            except ValueError, e:  # cell is empty error on not yet assigned
-                raise pickle.PicklingError(
-                    ('Function to be pickled has free variables that are referenced before '
-                     'assignment in enclosing scope'))
+            except ValueError, e: #cell is empty error on not yet assigned
+                raise pickle.PicklingError('Function to be pickled has free variables that are referenced before assignment in enclosing scope')
+
 
         # process closure
         if func.func_closure:
@@ -395,7 +385,7 @@ def get_contents(cell):
         dct = func.func_dict
 
         if printSerialization:
-            outvars = ['code: ' + str(code)]
+            outvars = ['code: ' + str(code) ]
             outvars.append('globals: ' + str(f_globals))
             outvars.append('defaults: ' + str(defaults))
             outvars.append('closure: ' + str(closure))
@@ -420,7 +410,7 @@ def save_global(self, obj, name=None, pack=struct.pack):
             try:
                 __import__(modname)
                 themodule = sys.modules[modname]
-            except (ImportError, KeyError, AttributeError):  # should never occur
+            except (ImportError, KeyError, AttributeError): #should never occur
                 raise pickle.PicklingError(
                     "Can't pickle %r: Module %s cannot be found" % (obj, modname))
@@ -433,48 +423,46 @@
             sendRef = True
             typ = type(obj)
-            # print 'saving', obj, typ
+            #print 'saving', obj, typ
             try:
-                try:  # Deal with case when getattribute fails with exceptions
+                try: #Deal with case when getattribute fails with exceptions
                     klass = getattr(themodule, name)
                 except (AttributeError):
-                    if modname == '__builtin__':  # new.* are misrepeported
+                    if modname == '__builtin__': #new.* are misrepeported
                         modname = 'new'
                         __import__(modname)
                         themodule = sys.modules[modname]
                         try:
                             klass = getattr(themodule, name)
                         except AttributeError, a:
-                            # print themodule, name, obj, type(obj)
+                            #print themodule, name, obj, type(obj)
                             raise pickle.PicklingError("Can't pickle builtin %s" % obj)
                     else:
                         raise
             except (ImportError, KeyError, AttributeError):
-                if isinstance(obj, types.TypeType) or isinstance(obj, types.ClassType):
+                if typ == types.TypeType or typ == types.ClassType:
                     sendRef = False
-                else:  # we can't deal with this
+                else: #we can't deal with this
                     raise
             else:
-                if klass is not obj and \
-                        (isinstance(obj, types.TypeType) or isinstance(obj, types.ClassType)):
+                if klass is not obj and (typ == types.TypeType or typ == types.ClassType):
                     sendRef = False
 
             if not sendRef:
-                # note: Third party types might crash this - add better checks!
-                d = dict(obj.__dict__)  # copy dict proxy to a dict
-
-                # don't extract dict that are properties
-                if not isinstance(d.get('__dict__', None), property):
-                    d.pop('__dict__', None)
-                d.pop('__weakref__', None)
+                #note: Third party types might crash this - add better checks!
+                d = dict(obj.__dict__) #copy dict proxy to a dict
+                if not isinstance(d.get('__dict__', None), property): # don't extract dict that are properties
+                    d.pop('__dict__',None)
+                d.pop('__weakref__',None)
 
                 # hack as __new__ is stored differently in the __dict__
                 new_override = d.get('__new__', None)
                 if new_override:
                     d['__new__'] = obj.__new__
 
-                self.save_reduce(type(obj), (obj.__name__, obj.__bases__, d), obj=obj)
-                # print 'internal reduce dask %s %s' % (obj, d)
+                self.save_reduce(type(obj),(obj.__name__,obj.__bases__,
+                                            d),obj=obj)
+                #print 'internal reduce dask %s %s' % (obj, d)
                 return
 
         if self.proto >= 2:
@@ -484,7 +472,7 @@
                 if code <= 0xff:
                     write(pickle.EXT1 + chr(code))
                 elif code <= 0xffff:
-                    write("%c%c%c" % (pickle.EXT2, code & 0xff, code >> 8))
+                    write("%c%c%c" % (pickle.EXT2, code&0xff, code>>8))
                 else:
                     write(pickle.EXT4 + pack("<i", code))
             return
@@ -595,7 +585,7 @@
 
         # Protocol 2 special case: if func's name is __newobj__, use NEWOBJ
         if self.proto >= 2 and getattr(func, "__name__", "") == "__newobj__":
-            # Added fix to allow transient
+            #Added fix to allow transient
             cls = args[0]
             if not hasattr(cls, "__new__"):
                 raise pickle.PicklingError(
@@ -606,7 +596,7 @@
             args = args[1:]
             save(cls)
 
-            # Don't pickle transient entries
+            #Don't pickle transient entries
             if hasattr(obj, '__transient__'):
                 transient = obj.__transient__
                 state = state.copy()
@@ -637,44 +627,46 @@
                 self._batch_setitems(dictitems)
 
         if state is not None:
-            # print 'obj %s has state %s' % (obj, state)
+            #print 'obj %s has state %s' % (obj, state)
             save(state)
             write(pickle.BUILD)
 
+
     def save_xrange(self, obj):
         """Save an xrange object in python 2.5
        Python 2.6 supports this natively
         """
         range_params = xrange_params(obj)
-        self.save_reduce(_build_xrange, range_params)
+        self.save_reduce(_build_xrange,range_params)
 
-    # python2.6+ supports xrange pickling. some py2.5 extensions might as well. We just test it
+    #python2.6+ supports xrange pickling. some py2.5 extensions might as well.  We just test it
     try:
         xrange(0).__reduce__()
-    except TypeError:  # can't pickle -- use PiCloud pickler
+    except TypeError: #can't pickle -- use PiCloud pickler
         dispatch[xrange] = save_xrange
 
     def save_partial(self, obj):
         """Partial objects do not serialize correctly in python2.x -- this fixes the bugs"""
         self.save_reduce(_genpartial, (obj.func, obj.args, obj.keywords))
 
-    if sys.version_info < (2, 7):  # 2.7 supports partial pickling
+    if sys.version_info < (2,7): #2.7 supports partial pickling
         dispatch[partial] = save_partial
 
+
     def save_file(self, obj):
         """Save a file"""
-        import StringIO as pystringIO  # we can't use cStringIO as it lacks the name attribute
+        import StringIO as pystringIO #we can't use cStringIO as it lacks the name attribute
         from ..transport.adapter import SerializingAdapter
 
-        if not hasattr(obj, 'name') or not hasattr(obj, 'mode'):
+        if not hasattr(obj, 'name') or  not hasattr(obj, 'mode'):
             raise pickle.PicklingError("Cannot pickle files that do not map to an actual file")
         if obj.name == '<stdout>':
-            return self.save_reduce(getattr, (sys, 'stdout'), obj=obj)
+            return self.save_reduce(getattr, (sys,'stdout'), obj=obj)
         if obj.name == '<stderr>':
-            return self.save_reduce(getattr, (sys, 'stderr'), obj=obj)
+            return self.save_reduce(getattr, (sys,'stderr'), obj=obj)
         if obj.name == '<stdin>':
             raise pickle.PicklingError("Cannot pickle standard input")
-        if hasattr(obj, 'isatty') and obj.isatty():
+        if  hasattr(obj, 'isatty') and obj.isatty():
             raise pickle.PicklingError("Cannot pickle files that map to tty objects")
         if 'r' not in obj.mode:
             raise pickle.PicklingError("Cannot pickle files that are not opened for reading")
@@ -685,10 +677,10 @@ def save_file(self, obj):
             raise pickle.PicklingError("Cannot pickle file %s as it cannot be stat" % name)
 
         if obj.closed:
-            # create an empty closed string io
+            #create an empty closed string io
             retval = pystringIO.StringIO("")
             retval.close()
-        elif not fsize:  # empty file
+        elif not fsize: #empty file
             retval = pystringIO.StringIO("")
             try:
                 tmpfile = file(name)
@@ -697,13 +689,10 @@ def save_file(self, obj):
                 raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name)
             tmpfile.close()
             if tst != '':
-                raise pickle.PicklingError(
-                    ("Cannot pickle file %s as it does not appear to map to "
-                     "a physical, real file") % name)
+                raise pickle.PicklingError("Cannot pickle file %s as it does not appear to map to a physical, real file" % name)
         elif fsize > SerializingAdapter.max_transmit_data:
-            raise pickle.PicklingError(
-                "Cannot pickle file %s as it exceeds cloudconf.py's max_transmit_data of %d" %
-                (name, SerializingAdapter.max_transmit_data))
+            raise pickle.PicklingError("Cannot pickle file %s as it exceeds cloudconf.py's max_transmit_data of %d" %
+                                       (name,SerializingAdapter.max_transmit_data))
         else:
             try:
                 tmpfile = file(name)
@@ -716,7 +705,7 @@ def save_file(self, obj):
             retval.seek(curloc)
 
         retval.name = name
-        self.save(retval)  # save stringIO
+        self.save(retval) #save stringIO
         self.memoize(obj)
 
     dispatch[file] = save_file
@@ -729,7 +718,6 @@ def inject_numpy(self):
         self.dispatch[numpy.ufunc] = self.__class__.save_ufunc
 
     numpy_tst_mods = ['numpy', 'scipy.special']
-
     def save_ufunc(self, obj):
         """Hack function for saving numpy ufunc objects"""
         name = obj.__name__
@@ -739,8 +727,7 @@ def save_ufunc(self, obj):
             if name in tst_mod.__dict__:
                 self.save_reduce(_getobject, (tst_mod_name, name))
                 return
-        raise pickle.PicklingError(
-            'cannot save %s. Cannot resolve what module it is defined in' % str(obj))
+        raise pickle.PicklingError('cannot save %s. Cannot resolve what module it is defined in' % str(obj))
 
     def inject_timeseries(self):
         """Handle bugs with pickling scikits timeseries"""
@@ -754,22 +741,20 @@ def save_timeseries(self, obj):
 
         func, reduce_args, state = obj.__reduce__()
         if func != ts._tsreconstruct:
-            raise pickle.PicklingError(
-                'timeseries using unexpected reconstruction function %s' % str(func))
-        state = (
-            1,
-            obj.shape,
-            obj.dtype,
-            obj.flags.fnc,
-            obj._data.tostring(),
-            ts.getmaskarray(obj).tostring(),
-            obj._fill_value,
-            obj._dates.shape,
-            obj._dates.__array__().tostring(),
-            obj._dates.dtype,  # added -- preserve type
-            obj.freq,
-            obj._optinfo,
-        )
+            raise pickle.PicklingError('timeseries using unexpected reconstruction function %s' % str(func))
+        state = (1,
+                 obj.shape,
+                 obj.dtype,
+                 obj.flags.fnc,
+                 obj._data.tostring(),
+                 ts.getmaskarray(obj).tostring(),
+                 obj._fill_value,
+                 obj._dates.shape,
+                 obj._dates.__array__().tostring(),
+                 obj._dates.dtype, #added -- preserve type
+                 obj.freq,
+                 obj._optinfo,
+                 )
         return self.save_reduce(_genTimeSeries, (reduce_args, state))
 
     def inject_email(self):
@@ -787,12 +772,12 @@ def inject_addons(self):
 
     """Python Imaging Library"""
     def save_image(self, obj):
-        if not obj.im and obj.fp and 'r' in obj.fp.mode and obj.fp.name and \
-                not obj.fp.closed and (not hasattr(obj, 'isatty') or not obj.isatty()):
-            # if image not loaded yet -- lazy load
-            self.save_reduce(_lazyloadImage, (obj.fp,), obj=obj)
+        if not obj.im and obj.fp and 'r' in obj.fp.mode and obj.fp.name \
+                and not obj.fp.closed and (not hasattr(obj, 'isatty') or not obj.isatty()):
+            #if image not loaded yet -- lazy load
+            self.save_reduce(_lazyloadImage,(obj.fp,), obj=obj)
         else:
-            # image is loaded - just transmit it over
+            #image is loaded - just transmit it over
             self.save_reduce(_generateImage, (obj.size, obj.mode, obj.tostring()), obj=obj)
 
     """
@@ -803,35 +788,34 @@ def memoize(self, obj):
     """
 
+
 # Shorthands for legacy support
 def dump(obj, file, protocol=2):
     CloudPickler(file, protocol).dump(obj)
 
-
 def dumps(obj, protocol=2):
     file = StringIO()
 
-    cp = CloudPickler(file, protocol)
+    cp = CloudPickler(file,protocol)
     cp.dump(obj)
 
-    # print 'cloud dumped', str(obj), str(cp.modules)
+    #print 'cloud dumped', str(obj), str(cp.modules)
 
     return file.getvalue()
 
 
-# hack for __import__ not working as desired
+#hack for __import__ not working as desired
 def subimport(name):
     __import__(name)
     return sys.modules[name]
 
-
-# hack to load django settings:
+#hack to load django settings:
 def django_settings_load(name):
     modified_env = False
 
     if 'DJANGO_SETTINGS_MODULE' not in os.environ:
-        os.environ['DJANGO_SETTINGS_MODULE'] = name  # must set name first due to circular deps
+        os.environ['DJANGO_SETTINGS_MODULE'] = name # must set name first due to circular deps
         modified_env = True
 
     try:
         module = subimport(name)
@@ -841,28 +825,24 @@ def django_settings_load(name):
         if modified_env:
             del os.environ['DJANGO_SETTINGS_MODULE']
     else:
-        # add project directory to sys,path:
-        if hasattr(module, '__file__'):
+        #add project directory to sys,path:
+        if hasattr(module,'__file__'):
            dirname = os.path.split(module.__file__)[0] + '/'
            sys.path.append(dirname)
 
-
 # restores function attributes
 def _restore_attr(obj, attr):
     for key, val in attr.items():
         setattr(obj, key, val)
     return obj
 
-
 def _get_module_builtins():
     return pickle.__builtins__
 
-
 def print_exec(stream):
     ei = sys.exc_info()
     traceback.print_exception(ei[0], ei[1], ei[2], None, stream)
 
-
 def _modules_to_main(modList):
     """Force every module in modList to be placed into main"""
     if not modList:
@@ -873,24 +853,22 @@ def _modules_to_main(modList):
         if type(modname) is str:
             try:
                 mod = __import__(modname)
-            except Exception, i:  # catch all...
+            except Exception, i: #catch all...
                 sys.stderr.write('warning: could not import %s\n.  Your function may unexpectedly error due to this import failing; \
 A version mismatch is likely.  Specific error was:\n' % modname)
                 print_exec(sys.stderr)
             else:
-                setattr(main, mod.__name__, mod)
+                setattr(main,mod.__name__, mod)
         else:
-            # REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD)
-            # In old version actual module was sent
-            setattr(main, modname.__name__, modname)
-
+            #REVERSE COMPATIBILITY FOR CLOUD CLIENT 1.5 (WITH EPD)
+            #In old version actual module was sent
+            setattr(main,modname.__name__, modname)
 
-# object generators:
+#object generators:
 def _build_xrange(start, step, len):
     """Built xrange explicitly"""
     return xrange(start, start + step*len, step)
 
-
 def _genpartial(func, args, kwds):
     if not args:
         args = ()
@@ -914,13 +892,12 @@ def _fill_function(func, globals, defaults, closure, dict):
 
     return func
 
-
-def _make_skel_func(code, num_closures, base_globals=None):
+def _make_skel_func(code, num_closures, base_globals = None):
     """ Creates a skeleton function object that contains just the provided
         code and the correct number of cells in func_closure.  All other
        func attributes (e.g. func_globals) are empty.
     """
-    # build closure (cells):
+    #build closure (cells):
     if not ctypes:
         raise Exception('ctypes failed to import; cannot build function')
@@ -948,21 +925,17 @@
     (), (), ('newval',), '<nowhere>', 'cell_changer', 1, '', ('c',), ()
 )
 
-
 def _change_cell_value(cell, newval):
     """ Changes the contents of 'cell' object to newval """
     return new.function(cell_changer_code, {}, None, (), (cell,))(newval)
 
-
 """Constructors for 3rd party libraries
 Note: These can never be renamed due to client compatibility issues"""
 
-
 def _getobject(modname, attribute):
     mod = __import__(modname, fromlist=[attribute])
     return mod.__dict__[attribute]
 
-
 def _generateImage(size, mode, str_rep):
     """Generate image from string representation"""
     import Image
@@ -970,34 +943,32 @@ def _generateImage(size, mode, str_rep):
     i.fromstring(str_rep)
     return i
 
-
 def _lazyloadImage(fp):
     import Image
-    fp.seek(0)  # works in almost any case
+    fp.seek(0) #works in almost any case
     return Image.open(fp)
 
-
 """Timeseries"""
 
-
 def _genTimeSeries(reduce_args, state):
     import scikits.timeseries.tseries as ts
     from numpy import ndarray
     from numpy.ma import MaskedArray
+
     time_series = ts._tsreconstruct(*reduce_args)
 
-    # from setstate modified
+    #from setstate modified
     (ver, shp, typ, isf, raw, msk, flv, dsh, dtm, dtyp, frq, infodict) = state
-    # print 'regenerating %s' % dtyp
+    #print 'regenerating %s' % dtyp
 
     MaskedArray.__setstate__(time_series, (ver, shp, typ, isf, raw, msk, flv))
     _dates = time_series._dates
-    # _dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ
-    ndarray.__setstate__(_dates, (dsh, dtyp, isf, dtm))
+    #_dates.__setstate__((ver, dsh, typ, isf, dtm, frq)) #use remote typ
+    ndarray.__setstate__(_dates,(dsh,dtyp, isf, dtm))
     _dates.freq = frq
     _dates._cachedinfo.update(dict(full=None, hasdups=None, steps=None,
                                    toobj=None, toord=None, tostr=None))
     # Update the _optinfo dictionary
     time_series._optinfo.update(infodict)
     return time_series
+
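
Note for reviewers: the hunks above are style-level changes (whitespace, comments, line wrapping, and equivalent idioms), so the pickling behavior should be unchanged. As a quick sanity check, a lambda round-trip still works either way. Below is a minimal sketch, not part of the patch; it assumes a PySpark source checkout on `PYTHONPATH`, and `square` is just an illustrative name:

```python
import pickle
from pyspark import cloudpickle

# Plain pickle rejects lambdas; CloudPickler serializes the underlying code
# object plus its globals, defaults, and closure via save_function_tuple.
square = lambda x: x * x
payload = cloudpickle.dumps(square)

# The output is a standard pickle stream, so the stock unpickler can load it
# as long as pyspark.cloudpickle is importable (the stream references its
# _fill_function/_make_skel_func helpers by module path).
restored = pickle.loads(payload)
assert restored(7) == 49
```

This is also how PySpark uses the module in practice: the driver serializes closures with `cloudpickle.dumps` and workers reconstruct them with the ordinary unpickler.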