In [1]:
from azureml.core import Workspace, Experiment
from azureml.train.sklearn import SKLearn
from azureml.core.authentication import InteractiveLoginAuthentication

import pandas as pd
import numpy as np
import datetime 
from sklearn.preprocessing import MultiLabelBinarizer, OneHotEncoder
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer
from sklearn.decomposition import PCA

In [2]:
# Interactive sign-in pinned to one tenant; the resulting credential is then
# used to bind to the ML workspace that every later cell talks to.
interactive_auth = InteractiveLoginAuthentication(
    tenant_id="39288a38-ff19-432c-8011-1cd9d0dff445",
)
ws = Workspace(
    subscription_id="793146d9-d4dc-4a73-9728-76c4ffd0cc0d",
    resource_group="rg_dynamics_test",
    workspace_name="resdynml1test",
    auth=interactive_auth,
)

In [3]:
%%writefile ./src/pipe.py

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.decomposition import PCA
import numpy as np

class DataFrameSelector(BaseEstimator, TransformerMixin):
    """Pipeline step that pulls columns out of a DataFrame as a NumPy array.

    Parameters
    ----------
    attribute_names : column label(s) used to index the incoming frame.
    dtype : type the selected columns are cast to before conversion.
    """

    def __init__(self, attribute_names, dtype):
        self.attribute_names = attribute_names
        self.dtype = dtype

    def fit(self, X, y=None):
        """Nothing is learned; the selector itself is returned unchanged."""
        return self

    def transform(self, X):
        """Return ``X[attribute_names]`` cast to ``dtype`` as its ``.values`` array."""
        selected = X[self.attribute_names]
        converted = selected.astype(self.dtype)
        return converted.values

class MultiHotEncoder(BaseEstimator, TransformerMixin):
    """Multi-hot (or one-hot) encode every column of a 2-D array.

    Parameters
    ----------
    delimiter : str or None
        When truthy, each cell is converted with ``str()``, split on this
        delimiter and every stripped, non-empty token is treated as one
        category (multi-hot). When falsy, the cell value itself is the single
        category (one-hot).

    Attributes
    ----------
    col_cats : dict
        Set by ``fit``. Maps column position -> list of categories; the list
        order is the column order of that block in the ``transform`` output.
    """

    def __init__(self, delimiter=None):
        self.delimiter = delimiter

    def _tokens(self, value):
        """Split one cell into stripped, non-empty tokens (delimiter mode only).

        Used by both ``fit`` and ``transform`` so that the categories learned
        and the categories looked up are tokenised identically.
        """
        stripped = (piece.strip() for piece in str(value).split(self.delimiter))
        return [piece for piece in stripped if piece != '']

    @staticmethod
    def _ordered(cats):
        """Return the categories in a reproducible order.

        Iterating a set of strings can differ between interpreter runs, which
        would shuffle the output columns from one run to the next.
        """
        try:
            return sorted(cats)
        except TypeError:
            # Mixed, mutually unorderable types: order by string form instead.
            return sorted(cats, key=str)

    def fit(self, X, y=None):
        """Collect the distinct categories of every column of ``X``.

        ``X`` is indexed as ``X[row, col]`` and must expose ``.shape``.
        Returns ``self``.
        """
        self.col_cats = {}
        for col in range(X.shape[1]):
            cats = set()
            for row in range(X.shape[0]):
                if self.delimiter:
                    cats.update(self._tokens(X[row, col]))
                else:
                    cats.add(X[row, col])
            self.col_cats[col] = self._ordered(cats)
        return self

    def transform(self, X):
        """Encode ``X`` with the categories learned in ``fit``.

        Returns a float array of shape ``(n_rows, total number of categories)``
        made of one 0/1 block per input column, concatenated left to right.
        Values that were not seen during ``fit`` leave their row all zero.
        """
        n_rows = X.shape[0]
        blocks = []
        for col in range(X.shape[1]):
            categories = self.col_cats[col]
            # category -> output column, so each lookup is a dict hit rather
            # than a scan over all categories
            position = {cat: idx for idx, cat in enumerate(categories)}
            X_enc = np.zeros([n_rows, len(categories)])
            for row in range(n_rows):
                if self.delimiter:
                    values = self._tokens(X[row, col])
                else:
                    values = [X[row, col]]
                for value in values:
                    idx = position.get(value)
                    if idx is not None:
                        X_enc[row, idx] = 1
            blocks.append(X_enc)
        if not blocks:
            # np.concatenate rejects an empty list; no columns means no features
            return np.zeros([n_rows, 0])
        return np.concatenate(blocks, axis=1)
    
def create_pipeline(cfg):
    """Assemble the combined feature/target transformer.

    ``cfg`` must provide ``'multi_cols'`` and ``'target_cols'``. Both column
    groups are selected as strings and multi-hot encoded on single spaces.
    The encoded features go through PCA; the encoded targets are appended
    after them, so the returned ``FeatureUnion`` emits
    ``[PCA components | encoded targets]``.
    """
    # Space-separated multi-label feature columns -> multi-hot matrix.
    multi_pipe = Pipeline(steps=[
        ('multi_feat_select', DataFrameSelector(cfg['multi_cols'], str)),
        ('multi_encode', MultiHotEncoder(delimiter=' ')),
    ])

    # Same encoding for the target columns; they bypass the PCA below.
    target_pipe = Pipeline(steps=[
        ('target_select', DataFrameSelector(cfg['target_cols'], str)),
        ('target_encode', MultiHotEncoder(delimiter=' ')),
    ])

    # Only the multi-label branch is wired in. Branches for cfg['cat_cols']
    # (impute + one-hot) and cfg['num_cols'] (impute + scaling) are not part
    # of the union yet.
    feat_union = FeatureUnion(transformer_list=[
        ('multi_features', multi_pipe),
    ])

    # Step names are part of the pipeline's parameter keys, so the existing
    # spelling of the PCA step is kept as-is.
    all_feat_pipe = Pipeline(steps=[
        ('all_features_pipe', feat_union),
        ('all_feautres_pca', PCA(n_components=0.8, svd_solver='full')),
    ])

    return FeatureUnion(transformer_list=[
        ("all_feat_pipe", all_feat_pipe),
        ("target_pipe", target_pipe),
    ])

Overwriting ./src/pipe.py


In [7]:
%%writefile ./src/prep.py

from azureml.core import Run
import pandas as pd
import datetime
from pipe import create_pipeline
import os
import numpy as np
import joblib

run = Run.get_context()

# --- Load the two named input datasets as pandas frames --------------------
inputs = run.input_datasets
df_symptoms = inputs['symptomcodes'].to_pandas_dataframe()
df = inputs['df'].to_pandas_dataframe()

run.log('# rows before', len(df))

# --- Keep only job cards whose work started within the last `t` years ------
t = 5
window = datetime.timedelta(days=t * 365)  # a "year" counts as 365 days here
cutoff = datetime.datetime.today() - window
started_recently = df['Job Card.Date Start Work'] > cutoff
df = df[started_recently]

run.log('# rows for last ' + str(t) + ' years', len(df))

# --- Treat placeholder values as missing, then drop incomplete rows --------
placeholders = ['', '0', '-', '000', 'N/A']
df = df.replace(placeholders, np.nan)
# dropna() removes a row as soon as any of its columns is missing
df = df.dropna().reset_index(drop=True)

run.log('# rows after cleaning', len(df))

#############################################################################

# Key every job-card row by its (ComponentCode, FailureCode) pair.
df = df.assign(CompFail=list(zip(df['Job Card.ComponentCode'], df['Job Card.FailureCode'])))

# Build the same pair key on the symptom lookup table, keeping only the
# columns needed for the join and for the symptom list.
symptom_cols = ['Symptom1', 'Symptom2', 'Symptom3', 'Symptom4']
df_symptoms = df_symptoms[['ComponentCode', 'FailureCode'] + symptom_cols]
df_symptoms = df_symptoms.assign(
    CompFail=list(zip(df_symptoms['ComponentCode'], df_symptoms['FailureCode']))
)

# Left merge: job-card rows without a matching code pair are kept and get
# missing values in the four symptom columns.
df = pd.merge(df, df_symptoms, on='CompFail', how='left')

# Collect the symptoms that are present on each row into a tuple.
# Missing entries can be None or NaN (NaN is what the left merge fills in for
# unmatched rows). str(NaN) is 'nan', not 'None', so a string comparison
# against 'None' alone would keep NaN and later turn it into a bogus 'nan'
# symptom token; pd.notna() rejects both kinds of missing value.
df = df.assign(Symptoms=[
    tuple(value for value in row if pd.notna(value) and str(value) != 'None')
    for row in df[symptom_cols].itertuples(index=False, name=None)
])

def _collapse_job_card(x):
    """Reduce all rows belonging to one job card to a single record."""

    def joined(col):
        # distinct values of the column, separated by single spaces
        return ' '.join(x[col].unique())

    def first(col):
        # first distinct value of the column
        return x[col].unique()[0]

    # Key order matters: the columns are renamed by position further down.
    return pd.Series({
        'ProductGroup': joined('Installed Base.Product Group'),
        'ProductId': joined('Installed Base.InstalledBase ProductID'),
        'Country': first('Location.Country'),
        'City': first('Location.City'),
        'LocationType': first('Location.Location Type'),
        'PostalCode': first('Location.Postal Code'),
        'ProductName': joined('Product.Product Name'),
        'ProductNr': joined('Product.Product Number'),
        'Start': first('Job Card.Date Start Work'),
        'End': first('Job Card.Date End Work'),
        # summing the per-row tuples concatenates them; the set removes repeats
        'Symptoms': ' '.join(str(symptom) for symptom in set(x['Symptoms'].sum())),
    })


# Number of most recent rows (in job-card-number group order) held out per
# product; it is also the minimum size a product must exceed to be kept.
# NOTE(review): a product with exactly holdout + 1 cases ends up with a
# single training row.
holdout = 100

# One frame per installed-base product, one row per job card, keeping only
# the products with more than `holdout` cases.
dfs = {}
for product_id, product_rows in df.groupby('Installed Base.InstalledBase ProductID'):
    cases = product_rows.groupby('Job Card.JobCard Number').apply(_collapse_job_card).reset_index()
    if len(cases) > holdout:
        dfs[product_id] = cases

columns = ['CaseId','ProductGroup','ProductId','Country','City','LocationType','PostalCode','ProductName','ProductNr','Start','End','Symptoms']


def _relabel(frame, names):
    """Rebuild `frame` from its raw values with a fresh index and `names` as columns."""
    return pd.DataFrame(frame.reset_index(drop=True).values.tolist(), columns=names)


# Everything but the last `holdout` rows trains; the last `holdout` rows test.
dfs_train = {k: _relabel(cases.iloc[:-holdout], columns) for k, cases in dfs.items()}
dfs_test = {k: _relabel(cases.iloc[-holdout:], columns) for k, cases in dfs.items()}

#############################################################################

# Column roles handed to create_pipeline.
cfg = {
    'multi_cols': ['ProductGroup', 'Symptoms'],  # alternative: ['ProductGroup', 'ProductId', 'Symptoms']
    'cat_cols': ['Country', 'City', 'LocationType', 'PostalCode'],
    'date_cols': ['Start', 'End'],
    'num_cols': [],
    'target_cols': ['ProductNr'],
}

dfs_train_tr = {}
dfs_test_tr = {}
pipes = {}

# Fit one pipeline per product on its training rows and apply it to both splits.
for k in dfs_train:
    pipes[k] = create_pipeline(cfg)
    train_matrix = pipes[k].fit_transform(dfs_train[k])
    test_matrix = pipes[k].transform(dfs_test[k])

    # The encoded targets are the trailing columns of the output. Their width
    # is the number of categories summed over every target column, so this
    # stays correct when cfg['target_cols'] lists more than one column. The
    # target pipe is looked up by name rather than by position in the union.
    target_encoder = dict(pipes[k].transformer_list)['target_pipe'].named_steps['target_encode']
    n_target = sum(len(cats) for cats in target_encoder.col_cats.values())
    n_feat = train_matrix.shape[1] - n_target

    columns = [
        ('feat_' if i < n_feat else 'target_') + str(i)
        for i in range(train_matrix.shape[1])
    ]
    dfs_train_tr[k] = pd.DataFrame(train_matrix, columns=columns)
    dfs_test_tr[k] = pd.DataFrame(test_matrix, columns=columns)

data = (dfs_train_tr, dfs_test_tr, pipes)

############################################################################

# Persist the (train frames, test frames, fitted pipelines) triple as a single
# joblib file in the run's outputs folder.
# NOTE(review): the pipelines hold instances of the classes from pipe.py, so
# that module presumably has to be importable wherever this file is loaded.
os.makedirs('outputs', exist_ok=True)
joblib.dump(data, './outputs/data')

############################################################################

run.complete()

Overwriting ./src/prep.py


In [8]:
# Estimator that runs src/prep.py on the local compute target, with both
# registered datasets passed in under the names the script reads them by.
# NOTE(review): the pyarrow pin carries a trailing space; kept verbatim.
est = SKLearn(
    source_directory='src',
    entry_script='prep.py',
    compute_target='local',
    inputs=[
        ws.datasets['symptomcodes.csv'].as_named_input('symptomcodes'),
        ws.datasets['ItemResourceData.csv'].as_named_input('df'),
    ],
    pip_packages=['pyarrow==0.12.0 '],
)

In [9]:
# Submit the preparation job under its own experiment and block until it has
# finished, streaming the run's log into the cell output.
exp = Experiment(workspace=ws, name='ProductPredictionOnePerID')
run = exp.submit(config=est)
run.wait_for_completion(show_output=True)

RunId: ProductPredictionOnePerID_1592243223_7bd77c20
Web View: https://ml.azure.com/experiments/ProductPredictionOnePerID/runs/ProductPredictionOnePerID_1592243223_7bd77c20?wsid=/subscriptions/793146d9-d4dc-4a73-9728-76c4ffd0cc0d/resourcegroups/rg_dynamics_test/workspaces/resdynml1test

Streaming azureml-logs/70_driver_log.txt

Entering context manager injector. Current time:2020-06-15T17:47:05.861226
Starting the daemon thread to refresh tokens in background for process with pid = 9
Entering Run History Context Manager.
Preparing to call script [ prep.py ] with arguments: []
After variable expansion, calling script [ prep.py ] with arguments: []

  explained_variance_ratio_ = explained_variance_ / total_var
  explained_variance_ratio_ = explained_variance_ / total_var
Starting the daemon thread to refresh tokens in background for process with pid = 9


The experiment completed successfully. Finalizing run...
Logging experiment finalizing status in history service.
Cleaning up all outs

{'runId': 'ProductPredictionOnePerID_1592243223_7bd77c20',
 'target': 'local',
 'status': 'Completed',
 'startTimeUtc': '2020-06-15T17:47:04.998691Z',
 'endTimeUtc': '2020-06-15T17:57:55.239555Z',
 'properties': {'_azureml.ComputeTargetType': 'local',
  'ContentSnapshotId': '134d9377-c3e1-49ec-a49e-410d6cd7b528'},
 'inputDatasets': [{'dataset': {'id': '02e6cb83-4d0c-42b2-bbef-e103c74b3a3c'}, 'consumptionDetails': {'type': 'RunInput', 'inputName': 'df', 'mechanism': 'Direct'}}, {'dataset': {'id': '88af5740-1a1b-4e09-8129-d3c538680909'}, 'consumptionDetails': {'type': 'RunInput', 'inputName': 'symptomcodes', 'mechanism': 'Direct'}}],
 'runDefinition': {'script': 'prep.py',
  'useAbsolutePath': False,
  'arguments': [],
  'sourceDirectoryDataStore': None,
  'framework': 'Python',
  'communicator': 'None',
  'target': 'local',
  'dataReferences': {},
  'data': {'df': {'dataLocation': {'dataset': {'id': '02e6cb83-4d0c-42b2-bbef-e103c74b3a3c',
      'name': None,
      'version': None},
    

In [12]:
# Requires azureml-core 1.0.72 or higher
from azureml.core import Workspace, Dataset

subscription_id = '793146d9-d4dc-4a73-9728-76c4ffd0cc0d'
resource_group = 'rg_dynamics_test'
workspace_name = 'resdynml1test'

# Authenticate with the tenant-scoped interactive login that was created for
# `ws`, so that both workspace handles in this notebook sign in the same way
# instead of this one relying on whatever default credentials are available.
workspace = Workspace(subscription_id, resource_group, workspace_name, auth=interactive_auth)

# NOTE(review): reading the files behind the dataset may additionally need a
# data-access role on the storage account for the signed-in identity; that is
# configured in Azure, not in this cell.
dataset = Dataset.get_by_name(workspace, name='oneperid_data')
dataset.download(target_path='.', overwrite=False)

Credentials are not provided to access data from source. Please sign in using identity with required permission granted.
To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code CPMP5RKVQ to authenticate.


ExecutionError: 
Error Code: ScriptExecution.StreamAccess.Authentication
Failed Step: 20e862f3-15f9-4f57-a294-aca69428a761
Error Message: ScriptExecutionException was caused by StreamAccessException.
  StreamAccessException was caused by AuthenticationException.
    Identity authentication failed for 'AzureBlob GetReference' operation at 'https://resdynml1test6456542521.blob.core.windows.net/azureml/ExperimentRun/dcid.ProductPredictionOnePerID_1592243223_7bd77c20/outputs/data' with '403: AuthenticationFailed'. Please make sure the compute or login identity has 'Storage Blob Data Reader' or 'Storage Blob Data Owner' role in the storage IAM.
      Server failed to authenticate the request. Make sure the value of Authorization header is formed correctly including the signature.
| session_id=ad4d3679-b067-42a3-a32f-0d092f46bb23