# DATA

The "Divorce Predictors data set" is publicly available from the UCI Machine Learning Repository [here](https://archive.ics.uci.edu/ml/datasets/Divorce+Predictors+data+set).

In [None]:
# Download the dataset archive into the training-data directory.
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/00497/divorce.rar -P ml/input/data/training/
# Extract the archive contents into that same directory.
!cd ml/input/data/training/ && unrar e divorce.rar
# Remove the archive and the spreadsheet copy; presumably only divorce.csv is needed afterwards.
!cd ml/input/data/training/ && rm divorce.rar && rm divorce.xlsx

# EDA (Exploratory Data Analysis)

In [None]:
# Location of the dataset CSV (despite its name, this variable holds a file path, not a directory).
input_dir = 'ml/input/data/training/divorce.csv'

In [None]:
import pandas as pd

# Load the dataset into a DataFrame, parsing ';' as the column separator.
df = pd.read_csv(input_dir, sep=';')

In [None]:
# Preview the first rows of the dataset.
df.head()

In [None]:
# Show descriptive statistics for the dataset's columns.
df.describe()

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Pairwise correlation between the columns of the dataset.
corr = df.corr()

# Render the correlation matrix as a heatmap on a 10x10 figure,
# labelling both axes with the column names and omitting cell annotations.
plt.figure(figsize=(10, 10))
sns.heatmap(
    corr,
    xticklabels=corr.columns,
    yticklabels=corr.columns,
    annot=False,
)


# PoC / Benchmark

## Install dependencies

In [None]:
!pip install dask[dataframe]

## Define some helper functions

In [None]:
import json
import pickle
import tarfile
from abc import ABC, abstractmethod
from io import StringIO
from pathlib import Path
from typing import Dict

import dask.dataframe as dd
import yaml
from dask.dataframe import from_delayed
from dask.delayed import delayed
from pandas import read_csv


class BaseFile(ABC):
    """Interface for file handlers that expose static ``read`` and ``write``.

    Subclasses must implement both methods before they can be instantiated.
    """

    @staticmethod
    @abstractmethod
    def read(origin, **kwargs):
        """Load and return the content stored at ``origin``."""

    @staticmethod
    @abstractmethod
    def write(destination, content):
        """Persist ``content`` at ``destination``."""

class PickleFile(BaseFile):
    """``BaseFile`` implementation that stores Python objects with ``pickle``."""

    @staticmethod
    def read(origin: str, **kwargs) -> dict:
        """Unpickle and return the object stored in the file at ``origin``.

        Extra keyword arguments are accepted but not used.

        NOTE(review): ``pickle.load`` can execute arbitrary code while
        loading; only read files that come from a trusted source.
        """
        print(f"read data from {origin}")
        with open(origin, 'rb') as handle:
            return pickle.load(handle)

    @staticmethod
    def write(destination: str, content) -> None:
        """Pickle ``content`` into the file at ``destination`` (overwriting it)."""
        print(f"save data to {destination}")
        with open(destination, 'wb') as handle:
            pickle.dump(content, handle)


class TarFile:
    """Static helpers for unpacking and creating tar archives."""

    @staticmethod
    def uncompress(origin: str, path: str):
        """Extract every member of the archive at ``origin`` into ``path``."""
        # NOTE(review): extractall trusts the member names stored in the
        # archive; only use this on archives from a trusted source.
        with tarfile.open(origin) as archive:
            archive.extractall(path=path)

    @staticmethod
    def compress(destination: str, content: Dict[str, str]) -> None:
        """Create a gzip-compressed tar archive at ``destination``.

        ``content`` maps each local path to the name it receives inside the
        archive.
        """
        with tarfile.open(destination, "w:gz") as archive:
            for source in content:
                archive.add(source, arcname=content[source])



## Experiment logic

In [None]:
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict

@dataclass
class ExperimentArtifacts:
    """Locations and persistence of the outputs produced by one experiment run.

    Attributes:
        run_tag: identifier of the run (stored; not used by this class's methods).
        model_name: name of the sub-directory of ``base_path`` that receives
            the run's outputs.
        base_path: root output directory; a plain string is accepted and
            converted to ``Path`` on construction.
    """
    run_tag: str
    model_name: str
    base_path: Path

    def __post_init__(self):
        # Every path below is derived with the `/` operator or `.glob`,
        # which a plain string does not support, so normalise once here.
        self.base_path = Path(self.base_path)

    def _create_if_not_exist(self):
        """Create the output directory (and its parents) when missing."""
        self.output_prefix.mkdir(parents=True, exist_ok=True)

    @property
    def output_prefix(self):
        """Directory that receives this model's outputs: ``base_path / model_name``."""
        return self.base_path / self.model_name

    def generate_artifacts(self, metrics: Dict[str, Any]):
        """Remember ``metrics`` for a later ``save_results`` call and print them."""
        self.metrics = metrics
        print(metrics)

    def save_results(self):
        """Pickle the stored metrics to ``output_prefix / 'metrics.pkl'``.

        ``generate_artifacts`` must have been called first; otherwise
        ``self.metrics`` does not exist and an ``AttributeError`` is raised.
        """
        self._create_if_not_exist()

        metrics_path = str(self.output_prefix / 'metrics.pkl')
        PickleFile.write(metrics_path, self.metrics)

    def save(self):
        """Persist everything this handler owns (currently only the metrics)."""
        self.save_results()

    # Create single output for Sagemaker training-job
    @property
    def model_package_path(self):
        """Path of the packaged models archive: ``base_path / 'model.tar.gz'``."""
        return self.base_path / 'model.tar.gz'

    def create_package_with_models(self):
        """Bundle every ``*joblib`` file found under ``base_path`` into one archive.

        Each file is stored in the archive as ``<parent directory name>/<file name>``.
        """
        print(f"Loading models from {self.base_path}")
        model_paths = {
            str(p): f"{p.parent.name}/{p.name}"
            for p in sorted(self.base_path.glob("**/*joblib"))
        }
        TarFile.compress(self.model_package_path, model_paths)

In [None]:
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import numpy as np
from sklearn.model_selection import train_test_split


@dataclass
class Experiment:
    """Train ``model`` on one dataset file and persist the results.

    Attributes:
        model: object providing ``train(X_train, y_train, X_val, y_val)``
            and ``save(prefix)``.
        input_dir: path of the ';'-delimited data file to load.
        artifacts_handler: stores the metrics and packages the saved models.
        training_portion: value passed as ``train_size`` when splitting.
        random_state: seed passed to the train/validation split.
    """
    model: Any
    input_dir: Path
    artifacts_handler: ExperimentArtifacts
    training_portion: float = 0.8
    random_state: int = 42

    def load_data(self):
        """Read the data file, skipping its header row.

        All columns but the last become ``self.X``; the last one is ``self.y``.
        """
        table = np.genfromtxt(self.input_dir, delimiter=';', skip_header=1)
        self.X = table[:, :-1]
        self.y = table[:, -1]

    def split_data(self):
        """Split ``X``/``y`` into train and validation parts, stratified on ``y``."""
        parts = train_test_split(
            self.X,
            self.y,
            train_size=self.training_portion,
            random_state=self.random_state,
            stratify=self.y,
        )
        self.X_train, self.X_validation, self.y_train, self.y_validation = parts

    def train(self):
        """Train the model and keep whatever ``model.train`` returns in ``self.history``."""
        training_set = (self.X_train, self.y_train)
        validation_set = (self.X_validation, self.y_validation)
        self.history = self.model.train(*training_set, *validation_set)

    def save(self):
        """Persist the artifacts first, then the model under the handler's output directory."""
        handler = self.artifacts_handler
        handler.save()
        self.model.save(handler.output_prefix)

    def run(self):
        """Run the whole experiment: load, split, train, record metrics, save, package."""
        for step in (self.load_data, self.split_data, self.train):
            step()

        self.artifacts_handler.generate_artifacts(self.history)
        self.save()
        self.artifacts_handler.create_package_with_models()

## Build my model

In [None]:
from abc import ABC, abstractmethod
from dataclasses import dataclass

import joblib
from sklearn import svm
from sklearn.base import BaseEstimator
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.pipeline import Pipeline


class MLModel(ABC):
    """Abstract interface every model in this notebook has to implement."""

    @property
    @abstractmethod
    def model_id(self) -> str:
        """Identifier of the model."""

    @abstractmethod
    def save(self):
        """Persist the model."""

    @abstractmethod
    def load(self):
        """Restore a previously persisted model."""

    @abstractmethod
    def predict(self, data):
        """Compute predictions for ``data``."""


@dataclass
class ProjectModel(MLModel):
    """Feature-selection + linear SVM pipeline for the divorce dataset.

    Attributes:
        model: the scikit-learn estimator; ``None`` until ``train`` or a
            successful ``load`` sets it.
    """
    model: BaseEstimator = None

    @property
    def model_id(self) -> str:
        """Fixed identifier, also used as the directory name by ``load``."""
        return "divorce"

    def save(self, model_prefix):
        """Dump the estimator to ``model_prefix / 'model.joblib'`` and return that path."""
        self.model_path = model_prefix / 'model.joblib'
        joblib.dump(self.model, self.model_path)
        return self.model_path

    def load(self, model_prefix):
        """Load the estimator from ``model_prefix / model_id / 'model.joblib'``.

        Note that, unlike ``save``, the ``model_id`` directory is appended to
        ``model_prefix`` here.

        A missing file is reported on stdout and the current ``self.model``
        (possibly ``None``) is returned; any other error propagates.
        """
        self.model_path = model_prefix / self.model_id / 'model.joblib'
        try:
            self.model = joblib.load(self.model_path)
        except FileNotFoundError as e:
            # A missing file raises FileNotFoundError; the previous
            # FileExistsError handler could never match that case.
            print(e)
        return self.model

    def __build_model(self):
        """Create the untrained pipeline: keep the 10 best features, then a linear SVC (C=0.1)."""
        # NOTE(review): f_regression is scikit-learn's regression score
        # function; f_classif is its classification counterpart — confirm
        # which one is intended for this classifier.
        self.model = Pipeline(
            [
                ('anova', SelectKBest(f_regression, k=10)),
                ('svc', svm.SVC(kernel='linear', C=.1)),
            ]
        )

    def train(
        self,
        X_train,
        y_train,
        X_validation,
        y_validation,
    ):
        """Build a fresh pipeline, fit it and score it on the validation data.

        Returns:
            ``{'score': <value of model.score on the validation data>}``.
        """
        self.__build_model()

        self.model.fit(
            X_train,
            y_train,
        )
        score = self.model.score(X_validation, y_validation)
        metrics = {
            'score': score,
        }
        return metrics

    def predict(self, ids, X):
        """Predict for ``X`` and return a dict pairing each id in ``ids`` with its prediction."""
        prediction = self.model.predict(X)
        return dict(zip(ids, prediction))

## Run experiment

In [None]:
import datetime
import time
from pathlib import Path

from collections import namedtuple


def get_arguments():
    """Build the hard-coded run configuration and return it as an ``args`` namedtuple.

    Fields: ``input_dir``, ``output_dir``, ``project_name`` and ``run_tag``
    (the current local time formatted as ``%Y-%m-%d-%H%M%S``).
    """
    started_at = datetime.datetime.fromtimestamp(time.time())
    arguments = {
        "input_dir": Path("ml/input/data/training/divorce.csv"),
        "output_dir": Path("ml/output"),
        "project_name": "divorce-predictor",
        "run_tag": started_at.strftime('%Y-%m-%d-%H%M%S'),
    }
    field_names = arguments.keys()
    print(type(field_names), field_names)
    args = namedtuple('args', list(field_names))
    return args(**arguments)

print("Begin train")

args = get_arguments()

# Dataset file to train on, and the run identifier "<project name>-<timestamp>".
dataset_path = args.input_dir
run_tag = f"{args.project_name}-{args.run_tag}"

# Outputs are grouped in a directory named after the dataset file (without its extension).
model_name = dataset_path.stem
artifacts_handler = ExperimentArtifacts(
    run_tag=run_tag,
    model_name=model_name,
    base_path=args.output_dir,
)

model = ProjectModel()
experiment = Experiment(
    model=model,
    input_dir=dataset_path,
    artifacts_handler=artifacts_handler,
)
experiment.run()

print("End train")