# Logistic Regression
---
This notebook uses Cirrus to train a logistic regression model on the Criteo dataset.

## Setup
---

In [1]:
# To ease development, reload all modules each time a cell is run.
%load_ext autoreload
%autoreload 2

In [12]:
import logging
import sys
import atexit
import time

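# Import the `test` helper explicitly (it is called in the Test section) instead of using a wildcard import.
from test import test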
from cirrus import instance, automate, lr

In [3]:
# Cirrus produces logs, but they will not show unless we add a handler that prints.
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("[%(funcName)16s | %(threadName)15s] %(message)s")
handler.setFormatter(formatter)
logger = logging.getLogger("cirrus")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
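To make the log lines in the rest of this notebook easier to read, here is a small sketch of what the format string above produces. It uses only the standard `logging` module. The logger name `format_demo` and the function `demo_log_line` are hypothetical names chosen for this illustration and are not part of Cirrus.

```python
import io
import logging

# Same format string as the handler above: the function name is right-aligned
# in a 16-character field, the thread name in a 15-character field.
FORMAT = "[%(funcName)16s | %(threadName)15s] %(message)s"


def demo_log_line():
    """Emit one record through a throwaway logger and return the formatted text."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(FORMAT))

    # "format_demo" is a hypothetical logger, kept separate from "cirrus".
    demo_logger = logging.getLogger("format_demo")
    demo_logger.setLevel(logging.DEBUG)
    demo_logger.propagate = False
    demo_logger.addHandler(handler)
    try:
        demo_logger.debug("start: Done.")
    finally:
        demo_logger.removeHandler(handler)
    return stream.getvalue().rstrip("\n")


print(demo_log_line())
```

The first bracketed field is the name of the function that made the logging call, and the second is the thread it ran on, which is why each Cirrus log line below begins with a padded function name followed by a thread name.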

## Instance, server, and task
---

First, we start an EC2 instance.

In [4]:
inst = instance.Instance(
    name="lr_example_instance",
    disk_size=32,
    typ="m5a.2xlarge",
    username="ubuntu",
    ami_owner_name=("self", "cirrus_server_image")
)
inst.start()

[        __init__ |      MainThread] __init__: Resolving AMI owner/name to AMI ID.
[    ec2_resource |      MainThread] ClientManager: Initializing EC2 resource.
[          config |      MainThread] config: Configuration loaded.
[        __init__ |      MainThread] __init__: Done.
[         _exists |      MainThread] _exists: Listing instances.
[         _exists |      MainThread] _exists: No existing instance with the same name was found.
[ _start_and_wait |      MainThread] _start_and_wait: Starting a new instance.
[ _start_and_wait |      MainThread] _start_and_wait: Waiting for instance to enter running state.
[ _start_and_wait |      MainThread] _start_and_wait: Fetching instance metadata.
[ _start_and_wait |      MainThread] _start_and_wait: Done.
[           start |      MainThread] start: Done.


Second, we create a parameter server to run on our instance.

In [5]:
server = automate.ParameterServer(
    instance=inst,
    ps_port=1337,
    error_port=1338,
    num_workers=64
)

Third, we define our machine learning task.

In [6]:
task = lr.LogisticRegression(
    n_workers=16,
    n_ps=1,
    dataset="criteo-kaggle-19b",
    learning_rate=0.0001,
    epsilon=0.0001,
    progress_callback=None,
    train_set=(0, 799),
    test_set=(800, 915),
    minibatch_size=200,
    model_bits=19,
    ps=server,
    opt_method="adagrad",
    timeout=60,
    lambda_size=192
)
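The task above selects `opt_method="adagrad"` together with a `learning_rate` and an `epsilon`. As a rough illustration of what an Adagrad update for logistic regression can look like, here is a minimal single-process sketch in plain Python. It is not Cirrus's implementation: the function names, the toy data, the toy learning rate, and the placement of `epsilon` in the denominator are all assumptions made for this example.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    ez = math.exp(z)
    return ez / (1.0 + ez)


def mean_log_loss(weights, data):
    """Average logistic loss over (features, label) pairs, label in {0, 1}."""
    total = 0.0
    for features, label in data:
        p = sigmoid(sum(w * x for w, x in zip(weights, features)))
        p = min(max(p, 1e-12), 1.0 - 1e-12)  # avoid log(0)
        total -= label * math.log(p) + (1 - label) * math.log(1.0 - p)
    return total / len(data)


def adagrad_step(weights, grad_sq_sum, batch, learning_rate, epsilon):
    """Apply one Adagrad update in place, using the mean gradient of the batch."""
    grad = [0.0] * len(weights)
    for features, label in batch:
        pred = sigmoid(sum(w * x for w, x in zip(weights, features)))
        for j, x in enumerate(features):
            grad[j] += (pred - label) * x / len(batch)
    for j, g in enumerate(grad):
        # Accumulate squared gradients; each weight gets its own step size.
        grad_sq_sum[j] += g * g
        weights[j] -= learning_rate * g / (math.sqrt(grad_sq_sum[j]) + epsilon)


# Toy data (assumption): the first feature is a constant bias term.
data = [([1.0, 0.0], 0), ([1.0, 1.0], 0), ([1.0, 2.0], 1), ([1.0, 3.0], 1)]
weights = [0.0, 0.0]
grad_sq_sum = [0.0, 0.0]

initial_loss = mean_log_loss(weights, data)
for _ in range(200):
    adagrad_step(weights, grad_sq_sum, data, learning_rate=0.1, epsilon=1e-4)
final_loss = mean_log_loss(weights, data)

print("loss before:", initial_loss, "loss after:", final_loss)
```

The sketch uses a much larger learning rate than the notebook's `0.0001` so that the change in loss is visible on four samples. In Cirrus the gradient computation is distributed across workers and a parameter server, which this example does not attempt to model.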

## Run
---

Next, we run our machine learning task.

In [7]:
task.run()

[           start |      MainThread] start: Uploading configuration.
[     run_command |      MainThread] run_command: Calling _connect_ssh.
[    _connect_ssh |      MainThread] _connect_ssh: Configuring.
[    _connect_ssh |      MainThread] _connect_ssh: Making connection attempt #1 out of 20.
[    _connect_ssh |      MainThread] _connect_ssh: Connection attempt timed out after 5s.
[    _connect_ssh |      MainThread] _connect_ssh: Making connection attempt #2 out of 20.
[    _connect_ssh |      MainThread] _connect_ssh: Connection attempt failed. Sleeping for 5s.
[    _connect_ssh |      MainThread] _connect_ssh: Making connection attempt #3 out of 20.
[     run_command |      MainThread] run_command: Running `echo 'load_input_path: /mnt/efs/criteo_kaggle/train.csv 
load_input_type: csv
dataset_format: binary
num_classes: 2 
num_features: 13 
limit_cols: 14 
normalize: 0 
limit_samples: 10000 
s3_size: 50000 
use_bias: 1 
model_type: LogisticRegression 
minibatch_size: 200 
learning_

Run this cell to see the model's current loss and accuracy. It prints the last 10 lines of the parameter server's error-task output.

In [9]:
for line in server.error_output().split("\n")[-10:]:
    print(line)

[     run_command |      MainThread] run_command: Running `cat error_out_1337`.
[     run_command |      MainThread] run_command: Waiting for completion.
[     run_command |      MainThread] run_command: Fetching stdout and stderr.
[     run_command |      MainThread] run_command: stdout had length 24017.
[     run_command |      MainThread] run_command: stderr had length 0.
[     run_command |      MainThread] run_command: Exit code was 0.
[     run_command |      MainThread] run_command: Done.
[ERROR_TASK] computing loss.
[ERROR_TASK] Loss (Total/Avg): 3.82638e+06/0.665457 Accuracy: 0.745537 time(us): 1544048106102798 time from start (sec): 18.1391
[ERROR_TASK] getting the full model
[ERROR_TASK] received the model
[ERROR_TASK] computing loss.
[ERROR_TASK] Loss (Total/Avg): 3.81651e+06/0.663741 Accuracy: 0.745579 time(us): 1544048110753799 time from start (sec): 22.7901
[ERROR_TASK] getting the full model
[ERROR_TASK] received the model
[ERROR_TASK] computing loss.
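The `Loss (Total/Avg)` lines above follow a regular layout, so they can be turned into numbers for plotting or comparison. The following is a minimal sketch that assumes the output keeps exactly the layout shown above. The names `LOSS_LINE` and `parse_metrics` are hypothetical helpers, not part of Cirrus.

```python
import re

# Matches lines such as:
# [ERROR_TASK] Loss (Total/Avg): 3.82638e+06/0.665457 Accuracy: 0.745537 ...
LOSS_LINE = re.compile(
    r"Loss \(Total/Avg\): (?P<total>[^/\s]+)/(?P<avg>\S+) "
    r"Accuracy: (?P<accuracy>\S+) "
    r"time\(us\): (?P<time_us>\d+) "
    r"time from start \(sec\): (?P<elapsed>\S+)"
)


def parse_metrics(output):
    """Return one dict per loss line found in the given output text."""
    metrics = []
    for line in output.splitlines():
        match = LOSS_LINE.search(line)
        if match is None:
            continue  # skip "getting the full model" and similar lines
        metrics.append({
            "total_loss": float(match.group("total")),
            "avg_loss": float(match.group("avg")),
            "accuracy": float(match.group("accuracy")),
            "time_us": int(match.group("time_us")),
            "elapsed_sec": float(match.group("elapsed")),
        })
    return metrics


# Sample text copied from the output above.
sample = "\n".join([
    "[ERROR_TASK] computing loss.",
    "[ERROR_TASK] Loss (Total/Avg): 3.82638e+06/0.665457 Accuracy: 0.745537 "
    "time(us): 1544048106102798 time from start (sec): 18.1391",
    "[ERROR_TASK] getting the full model",
    "[ERROR_TASK] received the model",
    "[ERROR_TASK] computing loss.",
    "[ERROR_TASK] Loss (Total/Avg): 3.81651e+06/0.663741 Accuracy: 0.745579 "
    "time(us): 1544048110753799 time from start (sec): 22.7901",
])

parsed = parse_metrics(sample)
for m in parsed:
    print(m["elapsed_sec"], m["avg_loss"], m["accuracy"])
```

In the notebook, the string returned by `server.error_output()` could be passed to `parse_metrics` in place of `sample`.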



## Test
---

In [14]:
test(server)

[     run_command |      MainThread] run_command: Running `cat error_out_1337`.
[     run_command |      MainThread] run_command: Waiting for completion.
[     run_command |      MainThread] run_command: Fetching stdout and stderr.
[     run_command |      MainThread] run_command: stdout had length 72298.
[     run_command |      MainThread] run_command: stderr had length 0.
[     run_command |      MainThread] run_command: Exit code was 0.
[     run_command |      MainThread] run_command: Done.
Test passed


## Cleanup
---

When we're satisfied with the results, we kill our task.

In [15]:
task.kill()

[     run_command |      MainThread] run_command: Running `kill -n 9 $(cat error_1337.pid)`.
[   delete_lambda | Exp #00 Cleanup] delete_lambda: Deleting Lambda function cirrus_worker_0_2018-12-5_14-14-19-582000.
[     run_command |      MainThread] run_command: Waiting for completion.
[     run_command |      MainThread] run_command: Fetching stdout and stderr.
[     run_command |      MainThread] run_command: stdout had length 0.
[     run_command |      MainThread] run_command: stderr had length 93.
[     run_command |      MainThread] run_command: Exit code was 2.


RuntimeError: `kill -n 9 $(cat error_1337.pid)` returned nonzero exit code 2.
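The `RuntimeError` above shows that `task.kill()` can raise. When these steps run from a script rather than cell by cell, such an exception could stop execution before the instance is terminated. One way to guard against that is sketched below, assuming only the `task.kill()` and `inst.cleanup()` calls used in this notebook. The `shutdown` function and the `FakeTask` and `FakeInstance` stand-ins are hypothetical and exist only so the example runs without AWS.

```python
import logging


def shutdown(task, inst):
    """Kill the task, then clean up the instance even if the kill fails.

    Returns the RuntimeError raised by task.kill(), or None if it succeeded.
    """
    log = logging.getLogger("cleanup_sketch")  # hypothetical logger name
    kill_error = None
    try:
        task.kill()
    except RuntimeError as error:
        # Record the failure but continue, so the instance is still terminated.
        log.warning("task.kill() failed: %s", error)
        kill_error = error
    finally:
        inst.cleanup()
    return kill_error


class FakeTask:
    """Stand-in for the task; fails in the same way as the cell above."""

    def kill(self):
        raise RuntimeError("kill command returned nonzero exit code 2.")


class FakeInstance:
    """Stand-in for the instance; only records that cleanup was requested."""

    def __init__(self):
        self.cleaned_up = False

    def cleanup(self):
        self.cleaned_up = True


fake_inst = FakeInstance()
error = shutdown(FakeTask(), fake_inst)
print("instance cleaned up:", fake_inst.cleaned_up)
```

Because `inst.cleanup()` sits in the `finally` clause, it also runs when `task.kill()` raises an exception other than `RuntimeError`; in that case the exception still propagates after cleanup.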

We also need to terminate our instance to avoid ongoing EC2 charges.

In [16]:
inst.cleanup()

[         cleanup |      MainThread] cleanup: Closing SSH client.
[         cleanup |      MainThread] cleanup: Terminating instance.
[         cleanup |      MainThread] cleanup: Waiting for instance to terminate.
[   launch_worker | Exp #00 Wkr #05] launch_worker: Task 50003 completed with status code 200.
[   launch_worker | Exp #00 Wkr #10] launch_worker: Task 100003 completed with status code 200.
[   launch_worker | Exp #00 Wkr #06] launch_worker: Task 60003 completed with status code 200.
[   launch_worker | Exp #00 Wkr #03] launch_worker: Task 30003 completed with status code 200.
[   launch_worker | Exp #00 Wkr #11] launch_worker: Task 110003 completed with status code 200.
[   launch_worker | Exp #00 Wkr #13] launch_worker: Task 130003 completed with status code 200.
[   launch_worker | Exp #00 Wkr #04] launch_worker: Task 40003 completed with status code 200.
[   launch_worker | Exp #00 Wkr #14] launch_worker: Task 140003 completed with status code 200.
[   launch_worker | E