<a href="https://colab.research.google.com/github/jeffheaton/present/blob/master/youtube/automl/simple-automl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Simple AutoML

Copyright 2023 by [Jeff Heaton](https://youtube.com/@HeatonResearch), LGPL


In [None]:
# Dataset selection: keep exactly one configuration active.
#   DATA_SOURCE   - location of the CSV file to read
#   TARGET        - name of the column to predict
#   IS_REGRESSION - True for regression, False for classification
#DATA_SOURCE = "https://data.heatonresearch.com/data/t81-558/auto-mpg.csv"; TARGET = "mpg";IS_REGRESSION=True
#DATA_SOURCE = "https://data.heatonresearch.com/data/t81-558/iris.csv"; TARGET = "species";IS_REGRESSION=False
DATA_SOURCE = "https://data.heatonresearch.com/data/t81-558/crx.csv"
TARGET = "a16"
IS_REGRESSION = False

# Analyze Data

The following code implements the ```analyze``` function that determines the types and encodings of all columns in the dataset.

In [None]:
from scipy.stats import shapiro
import pandas as pd

# Keys of the CONFIG dictionary.
CONFIG_MAX_DUMMY = "max_dummy"
CONFIG_MAX_DUMMY_PCT = "pct_dummy"

# Limits consulted when deciding whether a string column is dummy encoded.
CONFIG = {}
# The column's unique-value count must be below this number.
CONFIG[CONFIG_MAX_DUMMY] = 1000
# The ratio of unique values to rows must be below this fraction.
CONFIG[CONFIG_MAX_DUMMY_PCT] = 0.75

def isnumeric(datatype):
  """Return True when *datatype* is the float or the int field type."""
  numeric_types = (FIELD_TYPE_FLOAT, FIELD_TYPE_INT)
  return datatype in numeric_types

# --- Top-level metadata keys -------------------------------------------------
FIELDS = "fields"
META_TARGET = "target"
META_TYPE = "type"
META_SOURCE = "source"
META_POSITIVE_TOKEN = "positive-token"
META_EARLY_STOP = "early-stop"

# Values stored under META_TYPE.
META_TYPE_REGRESSION = "regression"
META_TYPE_CLASSIFICATION = "classification"
META_TYPE_BINARY_CLASSIFICATION = "binary-classification"

# --- Per-field keys ----------------------------------------------------------
FIELD_NAME = "name"
FIELD_TYPE = "type"
FIELD_ACTION = "action"
FIELD_NUM = "n"
FIELD_MISSING = "missing"
FIELD_UNIQUE = "unique"
FIELD_SUM = "sum"
FIELD_MEAN = "mean"
FIELD_MEDIAN = "median"
FIELD_MODE = "mode"
FIELD_MIN = "min"
FIELD_MAX = "max"
FIELD_VAR = "var"
FIELD_SD = "sd"
FIELD_SHAPIRO_STAT = "shapiro-stat"
FIELD_SHAPIRO_P = "shapiro-p"

# Values stored under FIELD_TYPE.
FIELD_TYPE_FLOAT = "float"
FIELD_TYPE_INT = "int"
FIELD_TYPE_STR = "str"

# Values stored under FIELD_ACTION (the preprocessing applied to a column).
FIELD_ACTION_COPY = "copy"
FIELD_ACTION_IGNORE = "ignore"
FIELD_ACTION_ZSCORE = "zscore"
FIELD_ACTION_NORMALIZE = "normalize"
FIELD_ACTION_DUMMY = "dummy"
FIELD_ACTION_TARGET = "target"

def find_positive(s):
  """Choose the token that stands for the positive class of a two-valued column.

  Matching against the well-known positive/negative pairs is case-insensitive,
  but the token is returned exactly as it is spelled in the data so that it can
  be compared against the column's values afterwards.

  Args:
    s: pandas Series of string tokens; missing values are ignored.

  Returns:
    The positive token, or None when the column does not contain exactly two
    distinct (case-insensitive) tokens. When no well-known pair is recognized,
    the token that sorts first (compared in upper case) is returned.

  Raises:
    AttributeError: if the column holds non-string values.
  """
  # Upper-cased token -> first spelling of that token seen in the data.
  originals = {}
  for token in s.dropna().unique():
    originals.setdefault(token.upper(), token)

  if len(originals) != 2:
    return None

  # (positive, negative) pairs, checked in priority order.
  known_pairs = (
    ("+", "-"),
    ("1", "0"),
    ("T", "F"),
    ("Y", "N"),
    ("TRUE", "FALSE"),
    ("YES", "NO"),
    ("P", "N"),
    ("POSITIVE", "NEGATIVE"),
  )
  for positive, negative in known_pairs:
    if positive in originals and negative in originals:
      return originals[positive]

  # No recognizable pair: fall back to a deterministic choice.
  return originals[min(originals)]

def analyze(data_source, target, is_regression=True):
  """Read a CSV dataset and describe how every column should be preprocessed.

  Args:
    data_source: path or URL handed to ``pd.read_csv``; the tokens 'NA' and
      '?' are read as missing values.
    target: name of the column to predict.
    is_regression: True to build a regression model, False for classification.

  Returns:
    A metadata dictionary holding the source, the target name, the early-stop
    flag (initially True), the model type, a per-column entry under FIELDS
    with the column's type, statistics and preprocessing action, and, for
    binary classification, the positive token of the target.

  Raises:
    KeyError: if ``target`` is not a column of the dataset.
  """
  df = pd.read_csv(data_source,na_values=['NA', '?'])
  if target not in df.columns:
    raise KeyError(f"target column {target!r} not found in {data_source!r}")

  metadata = {
      FIELDS: {},
      META_TARGET: target,
      META_SOURCE: data_source,
      META_EARLY_STOP: True
  }

  fields = metadata[FIELDS]
  nan = float("nan")

  for field_name,csv_type in zip(df.columns,df.dtypes):
    col = df[field_name]

    # Map the pandas dtype name onto one of the three field types.
    if "float" in csv_type.name:
      dtype = FIELD_TYPE_FLOAT
    elif "int" in csv_type.name:
      dtype = FIELD_TYPE_INT
    else:
      dtype = FIELD_TYPE_STR

    missing_count = int(col.isnull().sum())
    # Missing values are not a category of their own, so they are not counted.
    unique_count = int(col.nunique())

    if isnumeric(dtype):
      # The normality test must only see observed values: a single NaN would
      # turn the p-value into NaN. It also needs at least three observations.
      observed = col.dropna()
      if len(observed) >= 3:
        stat, p = shapiro(observed)
      else:
        stat, p = nan, nan

      # p <= 0.05 (or an undefined p) means "not normal": range-normalize
      # instead of z-scoring.
      action = FIELD_ACTION_ZSCORE if p>0.05 else FIELD_ACTION_NORMALIZE

      fields[field_name] = {
          FIELD_TYPE:dtype,
          FIELD_MEDIAN:col.median(),
          FIELD_MEAN:col.mean(),
          FIELD_SD:col.std(),
          FIELD_MAX:col.max(),
          FIELD_MIN:col.min(),
          FIELD_SHAPIRO_STAT:stat,
          FIELD_SHAPIRO_P:p,
          FIELD_ACTION:action,
          FIELD_MISSING:missing_count,
          FIELD_UNIQUE:unique_count}

    else:
      # mode() is empty when every value is missing.
      modes = col.mode()
      mode_value = modes.iloc[0] if len(modes) > 0 else None

      # Dummy-encode string columns whose cardinality is small both in
      # absolute terms and relative to the number of rows; ignore the rest.
      few_values = unique_count < CONFIG[CONFIG_MAX_DUMMY]
      if few_values and (unique_count/len(df) < CONFIG[CONFIG_MAX_DUMMY_PCT]):
        action = FIELD_ACTION_DUMMY
      else:
        action = FIELD_ACTION_IGNORE

      fields[field_name] = {
          FIELD_TYPE:dtype,
          FIELD_MODE:mode_value,
          FIELD_ACTION:action,
          FIELD_MISSING:missing_count,
          FIELD_UNIQUE:unique_count}

    # The target is never an input, whatever its type.
    if field_name == target:
      fields[field_name][FIELD_ACTION] = FIELD_ACTION_TARGET

  # Determine model type
  if is_regression:
    metadata[META_TYPE] = META_TYPE_REGRESSION
  elif fields[target][FIELD_UNIQUE]==2:
    metadata[META_TYPE] = META_TYPE_BINARY_CLASSIFICATION
    metadata[META_POSITIVE_TOKEN] = find_positive(df[target].dropna())
  else:
    metadata[META_TYPE] = META_TYPE_CLASSIFICATION

  return metadata

# Default set of per-field entries shown by field_summary.
COLS = [FIELD_MEAN, FIELD_SD, FIELD_MEDIAN, FIELD_MODE, FIELD_MAX, FIELD_ACTION, FIELD_UNIQUE, FIELD_SHAPIRO_P,FIELD_MISSING]

def field_summary(metadata, cols=COLS):
  """Tabulate selected per-field metadata entries as a DataFrame.

  Args:
    metadata: dictionary whose FIELDS entry maps field names to per-field
      dictionaries.
    cols: per-field keys to include, in column order. Entries a field does
      not have are reported as None.

  Returns:
    A DataFrame with one row per field: a 'name' column followed by ``cols``.
  """
  cols = list(cols)
  data = {'name': []}
  data.update((col, []) for col in cols)

  for field_name, field in metadata[FIELDS].items():
    data['name'].append(field_name)
    for col in cols:
      data[col].append(field.get(col))

  # Select by the requested columns, not the module default, so that a custom
  # ``cols`` argument is honored.
  return pd.DataFrame(data)[['name'] + cols]

# Generate Code

The following code generates Keras Python code for the data that was analyzed in the previous step.

In [None]:
from dataclasses import MISSING
from pandas.core.dtypes.inference import is_re
def tolist(obj):
  """Return lists and tuples unchanged; wrap any other object in a one-element list."""
  return obj if isinstance(obj, (list, tuple)) else [obj]

class PythonFile:
  """Collects import statements and source lines and renders them as Python source.

  Imports are always rendered first, in the order they were registered,
  followed by the added lines. The remaining methods are small helpers that
  build source fragments (calls, assignments, string literals, ...) as text.
  """

  def __init__(self):
    self.imports = []
    self.lines = []

  def add_import(self, name, alias=None):
    """Register ``import name`` or, when an alias is given, ``import name as alias``."""
    entry = {"name": name}
    if alias:
      entry["alias"] = alias
    self.imports.append(entry)

  def add_from(self, _from, _import):
    """Register ``from _from import _import``; ``_import`` is one name or a list/tuple of names."""
    self.imports.append({"from": _from, "import": _import})

  def generate(self):
    """Return the complete source: every import line, then every added line, each newline-terminated."""
    rendered = []
    for obj in self.imports:
      if "name" in obj and "alias" in obj:
        rendered.append(f"import {obj['name']} as {obj['alias']}")
      elif "name" in obj:
        rendered.append(f"import {obj['name']}")
      elif "from" in obj and "import" in obj:
        imports = ", ".join(tolist(obj['import']))
        rendered.append(f"from {obj['from']} import {imports}")
      else:
        # Unrecognized entries still occupy a (blank) line.
        rendered.append("")

    rendered.extend(self.lines)
    return "".join(line + "\n" for line in rendered)

  def add_line(self, str):
    """Append one line of source text."""
    self.lines.append(str)

  def comment(self, str):
    """Return the text as a ``#`` comment."""
    return f"# {str}"

  def call(self, name, *args):
    """Return the source of a call to ``name``.

    Non-dict arguments become positional arguments; each dict contributes its
    items as ``key=value`` keyword arguments. Values are inserted as-is, so
    string literals must already be quoted (see ``str``).

    Raises:
      ValueError: if a positional argument follows a keyword dict.
    """
    formatted_args = []
    started_named = False
    for arg in args:
      if isinstance(arg,dict):
        formatted_args += [f"{key}={value}" for key, value in arg.items()]
        started_named = True
      else:
        if started_named: raise ValueError("positional argument follows keyword argument")
        formatted_args.append(str(arg))

    return name + "(" + ", ".join(formatted_args) + ")"

  def assign(self, left, right):
    """Return the source of the assignment ``left = right``."""
    return f"{left} = {right}"

  def str(self, str):
    """Return the value as a double-quoted Python string literal.

    Backslashes, double quotes and line breaks are escaped so that the literal
    evaluates back to the original text (e.g. for Windows paths or column
    names containing quotes).
    """
    text = f"{str}"
    # The backslash must be escaped first so later escapes are not doubled.
    for raw, escaped in (("\\", "\\\\"), ("\"", "\\\""), ("\n", "\\n"), ("\r", "\\r")):
      text = text.replace(raw, escaped)
    return f"\"{text}\""

  def index(self, name, indexes, dot=None):
    """Return ``name`` followed by one ``[idx]`` per index and, optionally, ``.dot``."""
    src = name + "".join(f'[{idx}]' for idx in indexes)
    if dot:
      src += '.' + dot
    return src
    
def generate_keras(metadata):
  """Generate the source of a Keras training script for an analyzed dataset.

  The script reads the CSV, fills missing values, applies each field's
  preprocessing action, builds the target, trains a small dense network and
  prints evaluation metrics suited to the model type.

  Args:
    metadata: dictionary providing META_SOURCE, META_TARGET, META_TYPE,
      META_EARLY_STOP, a FIELDS mapping whose entries carry FIELD_TYPE,
      FIELD_ACTION and FIELD_MISSING, and, for binary classification,
      META_POSITIVE_TOKEN.

  Returns:
    The generated Python source as one string.
  """
  na_values = ['NA', '?']
  target = metadata[META_TARGET]
  model_type = metadata[META_TYPE]
  fields = metadata[FIELDS]

  if model_type == META_TYPE_REGRESSION:
    loss = "mean_squared_error"
  elif model_type == META_TYPE_BINARY_CLASSIFICATION:
    loss = "binary_crossentropy"
  else:
    loss = "categorical_crossentropy"

  py = PythonFile()
  # Imports
  py.add_import("pandas", "pd")
  py.add_import("io")
  py.add_import("requests")
  py.add_import("numpy", "np")
  py.add_from("tensorflow.keras.models", "Sequential")
  py.add_from("tensorflow.keras.layers", ["Dense", "Activation"])
  py.add_from("tensorflow.keras.callbacks", "EarlyStopping")
  py.add_from("scipy.stats", "zscore")
  py.add_from("sklearn.preprocessing", "MinMaxScaler")

  py.add_line(py.assign("df", py.call("pd.read_csv",py.str(metadata[META_SOURCE]),{'na_values':na_values})))
  x_fields = [x for x in fields if x != target and fields[x][FIELD_ACTION] in [FIELD_ACTION_COPY]]
  py.add_line(py.assign("x_fields",x_fields))

  # Preprocess input columns
  for field_name, field in fields.items():
    key = py.str(field_name)
    column = py.index("df",[key])

    if field[FIELD_MISSING]>0:
      # Numeric columns are filled with the median, all others with the mode.
      if isnumeric(field[FIELD_TYPE]):
        fn = "median"
        suffix = ""
      else:
        fn = "mode"
        suffix = "[0]"
      fill_value = py.index("df",[key],py.call(fn)+suffix)
      py.add_line(py.assign(column, py.index("df",[key],py.call("fillna",fill_value))))

    action = field[FIELD_ACTION]
    if action == FIELD_ACTION_ZSCORE:
      py.add_line(py.assign(column, py.call("zscore",column)))
      py.add_line(py.call("x_fields.append",key))
    elif action == FIELD_ACTION_NORMALIZE:
      # The scaler needs a 2-D input, hence the df[[name]] selection.
      frame = py.index("df",[[field_name]])
      py.add_line(py.assign(column,py.call("MinMaxScaler().fit_transform",frame)))
      py.add_line(py.call("x_fields.append",key))
    elif action == FIELD_ACTION_DUMMY:
      # dtype=float keeps the feature matrix numeric; newer pandas versions
      # would otherwise produce boolean dummy columns.
      py.add_line(py.assign("dummies",
            py.call("pd.get_dummies", column,
            {'prefix':key,'drop_first':'True','dtype':'float'})))
      py.add_line("df = pd.concat([df,dummies],axis=1)")
      py.add_line("x_fields += dummies.columns.tolist()")

  py.add_line(py.assign("x",py.index("df",["x_fields"],"values")))

  # Build the target. Bracket indexing is used throughout because the target
  # name is not necessarily a valid Python identifier.
  target_key = py.str(target)
  target_column = py.index("df",[target_key])
  target_values = py.index("df",[target_key],"values")
  if model_type == META_TYPE_CLASSIFICATION:
    py.add_line(py.assign("dummies", py.call("pd.get_dummies", target_column, {'dtype':'float'})))
    py.add_line(py.assign("species", "dummies.columns"))
    py.add_line(py.assign("y", "dummies.values"))
  elif model_type == META_TYPE_BINARY_CLASSIFICATION:
    pos = metadata[META_POSITIVE_TOKEN]
    py.add_line(py.assign(target_column,f"({target_column}=={py.str(pos)}).astype(int)"))
    py.add_line(py.assign("y", target_values))
  else:
    py.add_line(py.assign("y", target_values))

  py.add_line(py.comment("Construct model"))
  # Early stop
  if metadata[META_EARLY_STOP]:
    x_train, y_train, x_test, y_test = "x_train", "y_train", "x_test", "y_test"
    py.add_from("sklearn.model_selection", "train_test_split")
    py.add_line(py.comment("Split into validation and training sets"))
    py.add_line(py.assign(f"{x_train}, {x_test}, {y_train}, {y_test}",
        py.call("train_test_split","x","y",{"test_size":0.25,"random_state":42})))
  else:
    # Without early stopping the model trains and is evaluated on all data.
    x_train, y_train, x_test, y_test = "x", "y", "x", "y"

  py.add_line(py.assign("model",py.call("Sequential")))
  py.add_line(py.call("model.add", py.call("Dense",50,{"input_dim":"x.shape[1]", "activation":py.str('relu')})))
  py.add_line(py.call("model.add", py.call("Dense",25,{"activation":py.str('relu')})))
  if model_type == META_TYPE_REGRESSION:
    py.add_line(py.call("model.add", py.call("Dense","1")))
  elif model_type == META_TYPE_BINARY_CLASSIFICATION:
    py.add_line(py.call("model.add", py.call("Dense","1",{"activation":py.str('sigmoid')})))
  else:
    py.add_line(py.call("model.add", py.call("Dense","y.shape[1]",{"activation":py.str('softmax')})))
  py.add_line(py.call("model.compile", {"loss":py.str(loss), "optimizer":py.str('adam')}))

  py.add_line(py.comment("Train model"))
  if metadata[META_EARLY_STOP]:
    py.add_line(py.assign("monitor",py.call("EarlyStopping",{"monitor":py.str('val_loss'), "min_delta":"1e-3", "patience":5,
        "verbose":1, "mode":py.str('auto'), "restore_best_weights":True})))
    py.add_line(py.call("model.fit", x_train, y_train,
        {"validation_data": f"({x_test},{y_test})", 'callbacks':'[monitor]', 'verbose':'2','epochs':1000}))
  else:
    py.add_line(py.call("model.fit", x_train, y_train, {'verbose':'2','epochs':100}))

  py.add_line(py.comment("Evaluate model"))
  py.add_from("sklearn", "metrics")
  py.add_line(py.assign("pred", py.call("model.predict", x_test)))
  if model_type == META_TYPE_REGRESSION:
    py.add_line(py.comment("Measure RMSE error.  RMSE is common for regression."))
    py.add_line(py.assign("score", py.call("np.sqrt", py.call("metrics.mean_squared_error", "pred", y_test))))
    py.add_line("print(f\"Root mean square (RMSE): {score}\")")
  if model_type == META_TYPE_CLASSIFICATION:
    py.add_line(py.assign("predict_classes", py.call("np.argmax", "pred", {"axis":1})))
    py.add_line(py.assign("expected_classes", py.call("np.argmax", y_test, {"axis":1})))
    py.add_line(py.assign("correct", py.call("accuracy_score", "expected_classes", "predict_classes")))
    py.add_line("print(f\"Accuracy: {correct}\")")
  elif model_type == META_TYPE_BINARY_CLASSIFICATION:
    # The sigmoid output is a single probability column, so the class is
    # obtained by thresholding; argmax over one column would always yield 0.
    py.add_line(py.assign("predict_classes", "(pred > 0.5).astype(int).flatten()"))
    py.add_line(py.assign("correct", py.call("accuracy_score", y_test, "predict_classes")))
    py.add_line("print(f\"Accuracy: {correct}\")")
    py.add_line(py.assign("fpr, tpr, thresholds", py.call("metrics.roc_curve", y_test, "pred", {"pos_label":1})))
    py.add_line(py.assign("score",py.call("metrics.auc", "fpr", "tpr")))
    py.add_line("print(f\"Area Under Curve: {score}\")")
  if model_type == META_TYPE_CLASSIFICATION or model_type == META_TYPE_BINARY_CLASSIFICATION:
    py.add_from("sklearn.metrics", "accuracy_score")
    # No eps argument: it has been removed from scikit-learn's log_loss.
    py.add_line(py.assign("score", py.call("metrics.log_loss", y_test, "pred")))
    py.add_line("print(f\"Log loss: {score}\")")

  return py.generate()

# Running the AutoML Generator

We begin by analyzing the dataset specified at the top of this notebook. We display the summary statistics on the dataset. You can change the "action" for any of these objects if you do not like the preprocessing action detected.

In [None]:
# Analyze the dataset chosen at the top of the notebook; the resulting
# metadata dictionary drives the code generation below.
metadata = analyze(DATA_SOURCE, TARGET, IS_REGRESSION)
print(metadata)
# Tabulate the per-field entries for review.
summary = field_summary(metadata)
# NOTE(review): display is not imported in this file; presumably the
# IPython/Jupyter built-in -- confirm when running outside a notebook.
display(summary)

Next we generate the code.

In [None]:
# Disable early stopping: the generated script then skips the train/test
# split, trains for a fixed 100 epochs and evaluates on the training data.
metadata[META_EARLY_STOP] = False
# Generate the Keras script as text and show it.
python_code = generate_keras(metadata)
print(python_code)