In [1]:
%config IPCompleter.use_jedi = False

In [2]:
import pandas as pd
# Read both CSV files shipped with the project; only `train` is used by the
# cells that follow in this notebook.
train = pd.read_csv('../resources/data/train_ctrUa4K.csv')
test = pd.read_csv('../resources/data/test_lAUu6dG.csv')

# Loan_ID (presumably a per-row identifier) is removed before modelling.
train = train.drop(columns='Loan_ID')

# Show the dtypes of the remaining columns as the cell output.
train.dtypes

Gender                object
Married               object
Dependents            object
Education             object
Self_Employed         object
ApplicantIncome        int64
CoapplicantIncome    float64
LoanAmount           float64
Loan_Amount_Term     float64
Credit_History       float64
Property_Area         object
Loan_Status           object
dtype: object

In [3]:
train.shape

(614, 12)

In [4]:
from sklearn.model_selection import train_test_split

# Separate the predictors from the target column.
X = train.drop('Loan_Status', axis=1)
y = train['Loan_Status']

# Hold out 20% of the rows. The seed is fixed so that Restart & Run All
# produces the same split on every run, which keeps the timings measured
# later in the notebook comparable between runs.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42)

In [5]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# Numeric columns: fill missing values with the column median, then
# standardise.
imputer = SimpleImputer(strategy='median')
scaler = StandardScaler()
numeric_transformer = Pipeline([
    ('imputer', imputer),
    ('scaler', scaler),
])

# Categorical columns: replace missing values with the literal 'missing',
# then one-hot encode. Categories not seen during fit are ignored when
# transforming instead of raising an error.
cat_imputer = SimpleImputer(strategy='constant', fill_value='missing')
cat_onehot = OneHotEncoder(handle_unknown='ignore')
categorical_transformer = Pipeline([
    ('imputer', cat_imputer),
    ('onehot', cat_onehot),
])

In [6]:
from sklearn.compose import ColumnTransformer

# Group the column names by dtype. The target column is removed from the
# categorical group so that only predictor columns are routed.
numeric_features = train.select_dtypes(include=['int64', 'float64']).columns
categorical_features = (
    train.select_dtypes(include=['object'])
    .drop(columns=['Loan_Status'])
    .columns
)

# Send each group of columns through its own preprocessing sub-pipeline.
preprocessor = ColumnTransformer([
    ('num', numeric_transformer, numeric_features),
    ('cat', categorical_transformer, categorical_features),
])

In [7]:
import time

# Time a single fit of the preprocessing ColumnTransformer on the training
# split. perf_counter() is a monotonic, high-resolution clock, which matters
# for an interval this short; time.time() follows the wall clock and can jump
# if the system time is adjusted.
start = time.perf_counter()
# fit() returns the fitted transformer itself, not transformed data, so the
# result is deliberately not bound to `Xt`; the transformed matrix is produced
# by the separate fit_transform cell further down.
preprocessor.fit(X_train)
end = time.perf_counter()
print('Time taken: ' + str(end - start))

Time taken: 0.015769004821777344


In [8]:
from sklearn.ensemble import RandomForestClassifier
# End-to-end pipeline: the shared preprocessor feeding a random forest that
# is left at its library defaults.
rf = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier()),
])

In [9]:
from sklearn.base import ClassifierMixin
from sklearn.base import BaseEstimator

class ScaleTestEstimator(ClassifierMixin, BaseEstimator):
    """Classifier wrapper whose ``fit`` refits the wrapped model repeatedly.

    Every call to ``fit`` trains the wrapped classifier ``num_iters`` times on
    the same data, so the cost of one ``fit`` is multiplied accordingly. The
    name suggests this is meant to inflate training cost for scale/timing
    experiments. Prediction and scoring are delegated unchanged.

    Parameters
    ----------
    num_iters : int, default=100
        How many times ``fit`` refits the wrapped classifier. With a value
        below 1 the loop body never runs and the wrapped classifier is left
        exactly as it was passed in.
    classifier : estimator, default=None
        The wrapped classifier. It is fitted in place (not cloned) and must
        be supplied before ``fit``/``predict``/``score`` are called.
    """

    # The defaults live in the constructor signature rather than in
    # class-level attributes, so they are real defaults and are visible to
    # BaseEstimator.get_params() / sklearn.base.clone().
    def __init__(self, num_iters=100, classifier: ClassifierMixin = None):
        # Parameters are stored verbatim, without validation or copying, as
        # scikit-learn expects of estimator constructors.
        self.num_iters = num_iters
        self.classifier = classifier

    def fit(self, X, y):
        """Fit the wrapped classifier ``num_iters`` times on ``(X, y)``.

        Returns
        -------
        self
            This wrapper, so that calls can be chained.
        """
        for _ in range(self.num_iters):
            self.classifier.fit(X, y)
        return self

    def predict(self, X):
        """Return the wrapped classifier's predictions for ``X``."""
        return self.classifier.predict(X)

    def score(self, X, y, sample_weight=None):
        """Return the wrapped classifier's ``score`` on ``(X, y)``."""
        return self.classifier.score(X, y, sample_weight)

In [10]:
# Fit the preprocessor on the training split and keep the transformed
# feature matrix in `Xt`.
Xt = preprocessor.fit_transform(X_train)

In [11]:
from sklearn.metrics import accuracy_score, log_loss
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC, LinearSVC, NuSVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
# Candidate models; each is left at its library defaults except for the
# arguments spelled out here.
classifiers = [
    KNeighborsClassifier(n_neighbors=3),
    SVC(kernel='rbf', C=0.025, probability=True),
    NuSVC(probability=True),
    DecisionTreeClassifier(),
    RandomForestClassifier(),
    AdaBoostClassifier(),
    GradientBoostingClassifier(),
]

In [12]:
classifiers[0]

KNeighborsClassifier(n_neighbors=3)

In [13]:
# Wrap three tree-based classifiers so that a single fit() call trains each
# of them 50 times.
c_a, c_b, c_c = (
    ScaleTestEstimator(50, make_classifier())
    for make_classifier in (
        DecisionTreeClassifier,
        RandomForestClassifier,
        GradientBoostingClassifier,
    )
)
classifiers = [c_a, c_b, c_c]

In [14]:
import sklearn.base as base

In [15]:
# Check that scikit-learn recognises the wrapper as a classifier.
base.is_classifier(c_a)

True

In [16]:
# Check that the wrapper can be cloned; the repr of the clone shows the
# constructor parameters it was rebuilt with.
base.clone(c_a)

ScaleTestEstimator(classifier=DecisionTreeClassifier(), num_iters=50)

In [17]:
import time
# Sequential baseline: build, fit and predict with the three wrapped
# classifiers one after another in this process and time the whole run.
# perf_counter() is used for the elapsed time because it is monotonic,
# whereas time.time() follows the wall clock and can jump if it is adjusted.
start = time.perf_counter()

c_a = ScaleTestEstimator(50, DecisionTreeClassifier())
c_b = ScaleTestEstimator(50, RandomForestClassifier())
c_c = ScaleTestEstimator(50, GradientBoostingClassifier())
classifiers = [c_a, c_b, c_c]

# NOTE(review): this list is created but never filled in this cell; confirm
# whether per-classifier results were meant to be collected here.
classifier_results = []
for classifier in classifiers:
    pipe = Pipeline(steps=[('preprocessor', preprocessor),
                           ('classifier', classifier)])
    pipe.fit(X_train, y_train)
    # The predictions are discarded; the call only adds predict() to the
    # measured interval.
    pipe.predict(X_train)

end = time.perf_counter()
tt = end - start
print('time taken: ' + str(tt))

time taken: 12.419975996017456


In [18]:
import ray
# Shut down any Ray runtime this process is still attached to, presumably so
# that the ray.init() in the next cell starts from a clean state on re-runs.
ray.shutdown()

In [19]:
ray.init()

2021-05-19 15:19:38,097	INFO services.py:1092 -- View the Ray dashboard at http://127.0.0.1:8266


{'node_ip_address': '9.163.5.112',
 'raylet_ip_address': '9.163.5.112',
 'redis_address': '9.163.5.112:18141',
 'object_store_address': '/tmp/ray/session_2021-05-19_15-19-37_462865_85457/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2021-05-19_15-19-37_462865_85457/sockets/raylet',
 'webui_url': '127.0.0.1:8266',
 'session_dir': '/tmp/ray/session_2021-05-19_15-19-37_462865_85457',
 'metrics_export_port': 61091,
 'node_id': '5cd42b8bf9af4e9f1f9f02eb2fc56e66880b2fba'}

In [20]:
from codeflare.pipelines.Datamodel import Xy
from codeflare.pipelines.Datamodel import XYRef
import codeflare.pipelines.Datamodel as dm

In [21]:
# Place the training features and labels in the Ray object store.
X_ref, y_ref = ray.put(X_train), ray.put(y_train)

# Pair the two references in an XYRef, store that object in Ray as well, and
# wrap the resulting reference in a one-element list for use as pipeline input.
Xy_ref = XYRef(X_ref, y_ref)
Xy_ref_ptr = ray.put(Xy_ref)
Xy_ref_ptrs = [Xy_ref_ptr]

In [22]:
# Create the CodeFlare pipeline object; nodes and edges are attached to it in
# the following cells.
pipeline = dm.Pipeline()

In [23]:
# Wrap the preprocessor and each of the three wrapped classifiers in a named
# CodeFlare OrNode.
# NOTE(review): OrNode's execution semantics are defined by codeflare, not by
# this notebook -- confirm against its documentation.
node_a = dm.OrNode('preprocess', preprocessor)
node_b = dm.OrNode('c_a', c_a)
node_c = dm.OrNode('c_b', c_b)
node_d = dm.OrNode('c_c', c_c)

In [24]:
# Fan out: connect the preprocessing node to each of the three classifier
# nodes.
pipeline.add_edge(node_a, node_b)
pipeline.add_edge(node_a, node_c)
pipeline.add_edge(node_a, node_d)

In [25]:
import codeflare.pipelines.Runtime as rt

In [26]:
from codeflare.pipelines.Runtime import ExecutionType

In [27]:
# Run the FIT pass of the pipeline through the CodeFlare runtime and time it,
# including retrieval of each classifier node's output.
# perf_counter() is used for the elapsed time because it is monotonic,
# whereas time.time() follows the wall clock and can jump if it is adjusted.
start = time.perf_counter()

# The stored training data enters the graph at the preprocessing node.
in_args = {node_a: Xy_ref_ptrs}

out_args = rt.execute_pipeline(pipeline, ExecutionType.FIT, in_args)

# ray.get() blocks until the requested objects are available, so the timer
# stops only once all three classifier outputs have been retrieved.
node_b_out_args = ray.get(out_args[node_b])
node_c_out_args = ray.get(out_args[node_c])
node_d_out_args = ray.get(out_args[node_d])
end = time.perf_counter()
print('Time taken: ' + str(end - start))

Time taken: 10.062227964401245


In [29]:
# Take the first output entry produced for classifier node 'c_a'.
b_out_xyref = node_b_out_args[0]

In [30]:
# Resolve the node reference carried by that output into a local object.
# NOTE(review): presumably this is the node holding the fitted estimator --
# confirm against the codeflare Datamodel documentation.
b_out_node = ray.get(b_out_xyref.get_noderef())

In [31]:
b_out_node.get_estimator()

ScaleTestEstimator(classifier=DecisionTreeClassifier(), num_iters=50)

