In [1]:
#import ray
import ray
import datetime
import pandas as pd
import numpy as np

In [2]:
ray.__version__

'2.0.0.dev0'

In [3]:
#ray init or provide cluster url
ray.init()

2021-03-12 08:11:31,653	INFO services.py:1249 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'node_ip_address': '192.168.86.120',
 'raylet_ip_address': '192.168.86.120',
 'redis_address': '192.168.86.120:44077',
 'object_store_address': '/tmp/ray/session_2021-03-12_08-11-30_201480_4812/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2021-03-12_08-11-30_201480_4812/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2021-03-12_08-11-30_201480_4812',
 'metrics_export_port': 60791,
 'node_id': '56d76046b9d1379d85f950a1e5ca354ad001d015a6c6368e89409215'}

In [4]:
#feature group Actor
@ray.remote
class FeatureGroup:
    
    def __init__(self):
        self.label = 'FeatureGroup'
        self.last_updated = datetime.datetime.now()
        self.features = None
    def register_features(self,df):
        self.features=df
        self.last_updated = datetime.datetime.now()
        return 0
    def query_features(self,names):
        try:
            return self.features[names]
        except:
            return None
        

In [5]:
#function group
@ray.remote
def mean(df,feature_name,shift,window):

    shifted=df[feature_name].shift(shift)
    window_v = shifted.rolling(window=window)
    value = window_v.mean()
    value.name= "{}_mean_shift_{}_window_{}".format(feature_name,shift,window)
    return value

@ray.remote
def median(df,feature_name,shift,window):

    shifted=df[feature_name].shift(shift)
    window_v = shifted.rolling(window=window)
    value = window_v.median()
    value.name= "{}_median_shift_{}_window_{}".format(feature_name,shift,window)
    return value

@ray.remote
def apply(df,feature_name,function_name,shift,window):

    if function_name == "mean":
        return ray.get(mean.remote(df,feature_name,shift,window))
    elif function_name == "median":
        return ray.get(median.remote(df,feature_name,shift,window))
    else:
        raise Exception("unknown function name {}".format(function_name)) 


In [6]:
#feature Master
@ray.remote
class FeatureMaster:
    
    def __init__(self):
        self.label = 'FunctionGroup'
        self.last_updated = datetime.datetime.now()
    
    def de_register(self,group_name):
        ray.kill(group_name)
        
    def resolve(self,metadata):
        
        functions = metadata['functions']
        features = metadata['featutes']
        data_frame_refs = []
        
        for feature in features:
            group_name = feature['group']
            feature_names = feature['features']
            
            feature_group = ray.get_actor(group_name)
            feature_df = feature_group.query_features.remote(feature_names)
            data_frame_refs.append(feature_df)
        
        if len(data_frame_refs) > 1:
            
            df =  pd.concat(data_frame_refs)
        else:
            df = data_frame_refs[0]
        
        feature_refs = [df]
        
        for function in functions:
            
            functs = function['functions']
            shift = function['shift']
            window = function['window']
            feature_name= function['feature']
            
            for func in functs:
                feature_val= apply.remote(df,feature_name,func,shift,window)
                feature_refs.append(feature_val)
        
        return ray.get(feature_refs)

In [7]:
#Data .Kaggle Covid19 vaccinations data

In [8]:
data = pd.read_csv('country_vaccinations.csv')

In [9]:
data["date"] = pd.to_datetime(data["date"], format = '%Y-%m-%d')
data = data.replace([np.inf, -np.inf], np.nan)
data = data.fillna(0)
data

Unnamed: 0,country,iso_code,date,total_vaccinations,people_vaccinated,people_fully_vaccinated,daily_vaccinations_raw,daily_vaccinations,total_vaccinations_per_hundred,people_vaccinated_per_hundred,people_fully_vaccinated_per_hundred,daily_vaccinations_per_million,vaccines,source_name,source_website
0,Albania,ALB,2021-01-10,0.0,0.0,0.0,0.0,0.0,0.00,0.00,0.0,0.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e...
1,Albania,ALB,2021-01-11,0.0,0.0,0.0,0.0,64.0,0.00,0.00,0.0,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e...
2,Albania,ALB,2021-01-12,128.0,128.0,0.0,0.0,64.0,0.00,0.00,0.0,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e...
3,Albania,ALB,2021-01-13,188.0,188.0,0.0,60.0,63.0,0.01,0.01,0.0,22.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e...
4,Albania,ALB,2021-01-14,266.0,266.0,0.0,78.0,66.0,0.01,0.01,0.0,23.0,Pfizer/BioNTech,Ministry of Health,https://shendetesia.gov.al/covid19-ministria-e...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
5684,Zimbabwe,ZWE,2021-03-05,31325.0,31325.0,0.0,667.0,2678.0,0.21,0.21,0.0,180.0,Sinopharm/Beijing,Ministry of Health,https://twitter.com/MoHCCZim/status/1369360555...
5685,Zimbabwe,ZWE,2021-03-06,32014.0,32014.0,0.0,689.0,2330.0,0.22,0.22,0.0,157.0,Sinopharm/Beijing,Ministry of Health,https://twitter.com/MoHCCZim/status/1369360555...
5686,Zimbabwe,ZWE,2021-03-07,32240.0,32240.0,0.0,226.0,1914.0,0.22,0.22,0.0,129.0,Sinopharm/Beijing,Ministry of Health,https://twitter.com/MoHCCZim/status/1369360555...
5687,Zimbabwe,ZWE,2021-03-08,35518.0,35518.0,0.0,3278.0,2009.0,0.24,0.24,0.0,135.0,Sinopharm/Beijing,Ministry of Health,https://twitter.com/MoHCCZim/status/1369360555...


In [10]:
#Register feature group

In [11]:
feature_group = FeatureGroup.options(name='FeatureGroup1').remote()

In [12]:
feature_group.register_features.remote(data)

ObjectRef(63964fa4841d4a2ea06b84ebf0748b9f9e5434760100000001000000)

In [13]:
#Register feature Master

In [14]:
master = FeatureMaster.options(name='featuremaster').remote() 

In [15]:
#feature metadata

In [16]:
metadata = {
    "featutes":[
        {
            "group":"FeatureGroup1",
            "features":['country','iso_code','total_vaccinations','people_vaccinated']}
    ],
    "functions":[
        {
            "functions":['mean','median'],
            'feature':'total_vaccinations',
            'shift':7,
            'window':7
        },
        {
            "functions":['mean'],
            'feature':'people_vaccinated',
            'shift':3,
            'window':3
        }
    ]
}

In [17]:
# get features

In [18]:
df = pd.concat(ray.get(master.resolve.remote(metadata)),axis=1)

In [19]:
df

Unnamed: 0,country,iso_code,total_vaccinations,people_vaccinated,total_vaccinations_mean_shift_7_window_7,total_vaccinations_median_shift_7_window_7,people_vaccinated_mean_shift_3_window_3
0,Albania,ALB,0.0,0.0,,,
1,Albania,ALB,0.0,0.0,,,
2,Albania,ALB,128.0,128.0,,,
3,Albania,ALB,188.0,188.0,,,
4,Albania,ALB,266.0,266.0,,,
...,...,...,...,...,...,...,...
5684,Zimbabwe,ZWE,31325.0,31325.0,5259.000000,4041.0,21792.000000
5685,Zimbabwe,ZWE,32014.0,32014.0,7502.571429,7872.0,24834.333333
5686,Zimbabwe,ZWE,32240.0,32240.0,10194.428571,11007.0,27901.666667
5687,Zimbabwe,ZWE,35518.0,35518.0,13071.857143,12579.0,29984.333333
