In [1]:
import pandas as pd

In [131]:
import inspect
import hashlib
import os
import pandas as pd
import numpy as np

def merge_two_dicts(x, y):
    '''Return a new dict with the items of ``x`` overridden by those of ``y``.

    The merge is shallow: ``x`` is copied with its own ``copy()`` and then
    updated from ``y``, so neither input is modified but nested values are
    shared. Adapted from http://stackoverflow.com/a/26853961/2156909
    '''
    merged = x.copy()
    merged.update(y)
    return merged

class AzurePipeline(object):
    '''Chain of DataFrame-transforming steps with on-disk result caching.

    Each step is a function ``func(df, meta) -> (df, meta)``. Before a step
    runs, the pipeline hashes the incoming ``self.df`` and the step's source
    code. If both match the hashes recorded for this position in the
    pipeline (step history + step name + step arguments) and a cached pickle
    exists, the cached DataFrame is loaded instead of calling the step.

    Parameters
    ----------
    meta : dict
        Step arguments as ``{step_name: {arg_name: value}}``; stored as a
        DataFrame with one column per step.
    cache_dir : str
        Directory under which ``func_hash``, ``df_hash`` and ``df_cache``
        sub-directories are created on demand.
    save_intermediate_results : bool
        If False, hash records are never written, so every step is
        recomputed (results are still pickled).
    '''

    def __init__(self, meta, cache_dir='.', save_intermediate_results=True):
        self.meta = pd.DataFrame(meta)
        self.df = None

        root = os.path.realpath(cache_dir)
        self.func_hash_dir = os.path.join(root, 'func_hash')
        self.df_hash_dir = os.path.join(root, 'df_hash')
        self.df_cache_dir = os.path.join(root, 'df_cache')

        self.save_intermediate_results = save_intermediate_results
        # Names of the steps applied so far, in order; part of every cache key.
        self.func_history = []

    def __add__(self, other):
        '''Merge ``other``'s meta, data and history into this pipeline.

        Meta columns from ``other`` override same-named columns here, data
        frames are concatenated, and histories are concatenated.
        NOTE: this mutates and returns ``self`` (as before) rather than
        building a new pipeline.
        '''
        d = merge_two_dicts(self.meta.to_dict(), other.meta.to_dict())
        self.meta = pd.DataFrame(d)

        if other.df is not None:
            if self.df is None:
                self.df = other.df
            else:
                self.df = pd.concat((self.df, other.df))

        # Keep the history a flat list of names: cache keys are built with
        # '-'.join(...), which raises TypeError on nested lists.
        self.func_history = list(self.func_history) + list(other.func_history)

        return self

    # ------------------------------------------------------------------
    # hashing helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_bytes(text):
        '''Return ``text`` as bytes for hashlib (a Py2 ``str`` already is).'''
        if isinstance(text, bytes):
            return text
        return text.encode('utf-8')

    @classmethod
    def _sha256(cls, text):
        '''Hex SHA-256 digest of a text or bytes value.'''
        return hashlib.sha256(cls._to_bytes(text)).hexdigest()

    @staticmethod
    def _ensure_dir(path):
        '''Create ``path`` (and parents) if it does not exist yet.'''
        if not os.path.exists(path):
            os.makedirs(path)

    def _cache_key(self, func):
        '''Hash identifying ``func`` at the current pipeline position.

        Combines the names of the steps already applied, ``func``'s name and
        its (non-NaN) arguments from ``self.meta``.
        '''
        try:
            args = self.meta[func.__name__].dropna().to_dict()
        except KeyError:
            # No arguments configured for this step.
            args = ''
        history = '-'.join(self.func_history + [func.__name__])
        return self._sha256(history + '-' + str(args))

    def _data_hash(self):
        '''SHA-256 of the current ``self.df`` values plus their shape and dtype.

        NOTE(review): for object-dtype arrays (e.g. mixed-type frames)
        ``tobytes`` yields object pointers rather than contents, so such
        frames will not hash stably across sessions - TODO if needed.
        '''
        if self.df is None:
            # np.asarray(None) is an object array whose raw bytes are the
            # address of None, which differs between interpreter sessions.
            return self._sha256('')
        values = np.asarray(self.df)
        digest = hashlib.sha256()
        # Include shape/dtype so e.g. a reshaped frame with identical raw
        # bytes does not collide.
        digest.update(self._to_bytes(str(values.shape) + str(values.dtype)))
        digest.update(values.tobytes())
        return digest.hexdigest()

    def _func_hash(self, func):
        '''SHA-256 of ``func``'s source code.'''
        return self._sha256(inspect.getsource(func))

    @staticmethod
    def _read_hash(filename):
        '''Return the hash stored in ``filename``, or None if absent.'''
        if not os.path.exists(filename):
            return None
        with open(filename, 'r') as f:
            return f.readline().strip()

    def _write_hash(self, filename, value):
        '''Record ``value`` in ``filename`` if saving is enabled.'''
        if self.save_intermediate_results:
            with open(filename, 'w') as f:
                f.write(value)

    def _check_and_record(self, local_hash, filename):
        '''Compare ``local_hash`` with the stored one; record it on mismatch.'''
        if local_hash == self._read_hash(filename):
            return True
        self._write_hash(filename, local_hash)
        return False

    # ------------------------------------------------------------------
    # file locations
    # ------------------------------------------------------------------
    def get_df_hash_file(self, func):
        '''Path of the file recording the input-data hash for ``func``.'''
        self._ensure_dir(self.df_hash_dir)
        return os.path.join(self.df_hash_dir, self._cache_key(func) + '.sha256')

    def get_func_hash_file(self, func):
        '''Path of the file recording the source hash of ``func``.

        Keyed by pipeline position and arguments, not just the function
        name. With a per-name file, the same function used at two positions
        (or in two pipelines sharing ``cache_dir``) would see the record
        updated by the other use and load a cache built from stale source.
        '''
        self._ensure_dir(self.func_hash_dir)
        return os.path.join(self.func_hash_dir,
                            func.__name__ + '-' + self._cache_key(func) + '.sha256')

    def get_df_cache_file(self, func):
        '''Path of the pickle holding ``func``'s output at this position.'''
        self._ensure_dir(self.df_cache_dir)
        return os.path.join(self.df_cache_dir, self._cache_key(func) + '.pkl')

    # ------------------------------------------------------------------
    # public checks (compare and record on mismatch)
    # ------------------------------------------------------------------
    def is_data_unchanged(self, func):
        '''Return True if ``self.df`` matches the hash recorded for ``func``.

        On a mismatch the new hash is recorded (when
        ``save_intermediate_results``) and False is returned.
        '''
        return self._check_and_record(self._data_hash(),
                                      self.get_df_hash_file(func))

    def is_func_unchanged(self, func):
        '''Return True if ``func``'s source matches its recorded hash.

        On a mismatch the new hash is recorded (when
        ``save_intermediate_results``) and False is returned.
        '''
        return self._check_and_record(self._func_hash(func),
                                      self.get_func_hash_file(func))

    def cache(self, func):
        '''Pickle ``self.df`` as the cached result of ``func``.'''
        self.df.to_pickle(self.get_df_cache_file(func))

    def load_from_cache(self, func):
        '''Replace ``self.df`` with the cached result of ``func``.

        Uses ``pd.read_pickle``: only load caches written by this pipeline,
        as unpickling untrusted files can execute arbitrary code.
        '''
        self.df = pd.read_pickle(self.get_df_cache_file(func))

    def apply(self, func, args=None):
        '''Run ``func`` on the pipeline data, or load its cached result.

        ``args`` (``{step_name: {arg: value}}``) is merged into ``self.meta``
        first. ``func`` is called as ``func(self.df, self.meta)`` and must
        return ``(df, meta)``. The cached result is used only if the input
        data hash, the source hash and the cache pickle all check out. Hash
        records are written only after the new result has been pickled, so
        a step that raises cannot leave records pointing at a missing or
        stale cache.
        '''
        if args:
            d = merge_two_dicts(self.meta.to_dict(), args)
            self.meta = pd.DataFrame(d)

        # Hash the *input* now: steps may modify df in place.
        data_hash = self._data_hash()
        func_hash = self._func_hash(func)
        # Resolve all paths before func runs, since func may replace meta
        # (which feeds the cache key).
        data_file = self.get_df_hash_file(func)
        func_file = self.get_func_hash_file(func)
        cache_file = self.get_df_cache_file(func)

        up_to_date = (data_hash == self._read_hash(data_file)
                      and func_hash == self._read_hash(func_file)
                      and os.path.exists(cache_file))

        if up_to_date:
            self.df = pd.read_pickle(cache_file)
        else:
            self.df, self.meta = func(self.df, self.meta)
            self.df.to_pickle(cache_file)
            self._write_hash(data_file, data_hash)
            self._write_hash(func_file, func_hash)

        self.func_history += [func.__name__]
        

In [146]:
def read_wav(df, meta):
    '''Pipeline step: load the WAV file named by meta['read_wav']['in_filename'].

    The incoming ``df`` is ignored. Returns ``(samples, meta)`` where
    ``samples`` is a DataFrame built from the sample array returned by
    ``scipy.io.wavfile.read``; the sample rate it also returns is discarded.
    ``meta`` is returned unchanged.
    '''
    args = meta['read_wav']

    # Function-call form works on both Python 2 and 3 (the bare print
    # statement is a SyntaxError on Python 3).
    print("read_wav")
    import scipy.io.wavfile
    _rate, samples = scipy.io.wavfile.read(args['in_filename'])
    return pd.DataFrame(samples), meta
    
def test2(df, meta):
#     args = meta['test2']
    
    print "test2"
    df /= 3
    return df, meta
    
def save(df, meta):
    '''Pipeline step: write ``df`` as CSV to meta['save']['out_filename'].

    Returns ``(df, meta)`` unchanged so the step can sit anywhere in a
    pipeline.
    '''
    args = meta['save']

    # Function-call form works on both Python 2 and 3.
    print("save")

    df.to_csv(args['out_filename'])

    return df, meta
    
    

In [147]:
# Arguments for each step, keyed by step function name.
meta = {
    'read_wav': {'in_filename': '../../../data/night.wav'},
    'save': {'out_filename': 'night.csv'},
}

ap = AzurePipeline(meta)

# Run the steps in order; each one is loaded from cache when up to date.
for step in (read_wav, test2, save):
    ap.apply(step)

test2
save


In [130]:
# NOTE(review): leftover post-mortem debugging (IPython automagic for %debug).
# The session below apparently inspected a failure in is_data_unchanged while
# func_hash_dir did not exist yet. Remove this cell before sharing.
debug

> <ipython-input-127-0861ecf8b204>(88)is_data_unchanged()
     87             if self.save_intermediate_results:
---> 88                 with open(filename, 'w') as f:
     89                     f.write(local_hash)

ipdb> self.func_hash_dir
'/Users/peter/Documents/phd/projects/azure/engaged_hackathon/core/notebooks/func_hash'
ipdb> os.path.exists(self.func_hash_dir)
False
ipdb> os.makedirs(self.func_hash_dir)
ipdb> q


In [97]:
a = pd.DataFrame(
    {
        'read_wav': {'in_filename': '../../../data/day.wav'},
        'save': {'out_filename': 'test_night.csv'},
    }
)

In [116]:
a.loc[:, 'read_wav'].dropna().to_dict()

{'in_filename': '../../../data/day.wav'}

In [103]:
pd.DataFrame.from_dict(meta)

Unnamed: 0,read_wav,save
in_filename,../../../data/day.wav,
out_filename,,test_night.csv


In [63]:
None == ''

False