In [811]:
import getpass
import json
import os
import time

import pandas as pd

from ml_pipeline import *
from model_common import *

# Code to process MNIST data follows this cell

In [None]:
class ExtractMNIST(MLDerive):
    """Derive step that converts the MNIST CSV files into saved arrays.

    ``do_run`` hands ``final_loc`` to ``walk_files``; ``do_walk`` then handles
    one file at a time (presumably invoked by ``walk_files`` -- confirm in the
    base class).  ``mnist_train.csv`` and ``mnist_test.csv`` are each parsed
    into a feature matrix and a label vector, stored through ``do_save`` under
    names prefixed with ``data_name``, and those names are written back into
    ``input_data``.

    ``data_name`` starts as ``None`` and has to be set to a string without a
    "." before ``do_run`` is called for a non-"POST" traversal.
    """

    # Recognised CSV file name -> split suffix used both in the saved dataset
    # names ("<data_name>_X<split>") and in the input_data keys ("X<split>").
    _SPLIT_BY_FILENAME = {
        "mnist_train.csv": "train",
        "mnist_test.csv": "test",
    }

    def __init__(self):
        super(ExtractMNIST, self).__init__()
        self.class_name = "ExtractMNIST"
        # Only files reported with this MIME type are accepted by do_walk.
        self.mime_type = 'text/plain'
        self.final_loc = None
        self.data_name = None

        # JSON-encoded templates of the keys this step reads and writes;
        # presumably consumed by the pipeline when wiring steps together.
        input_data = {"final_loc" : ""}
        output_data = {"Xtrain" : "", "ytrain" : "", "Xtest" : "", "ytest" : ""}
        self.istr_jsons = json.dumps(input_data)
        self.ostr_jsons = json.dumps(output_data)

    def extract_mnist(self, filepath):
        """Read a header-less MNIST CSV and split it into features and labels.

        Column 0 is taken as the label; every remaining column is a feature
        and is divided by 255.0, which maps 0..255 pixel values into [0, 1].

        Args:
            filepath: Path of the CSV file to read.

        Returns:
            Tuple ``(X, y)`` of NumPy arrays: the scaled feature matrix and
            the label vector.
        """
        df = pd.read_csv(filepath, header=None)

        # Copy the label column so the returned array does not keep the whole
        # parsed frame alive through a view.
        y = df.iloc[:, 0].copy().values
        # The division already allocates a new array, so no extra copy of the
        # (large) feature frame is needed first.
        X = df.iloc[:, 1:].values / 255.0

        dprint(DPRINT_INFO, filepath + ": X.shape=" + str(X.shape) + ", y.shape=" + str(y.shape))
        return X, y

    def do_walk(self, filepath, input_data):
        """Process one file: extract, save, and record the saved names.

        Args:
            filepath: Path of the file being visited.
            input_data: Shared dict; on success its ``X<split>`` / ``y<split>``
                entries are set to the names the arrays were saved under.

        Returns:
            ``{"stop": False}``.

        Raises:
            MLException: If the file's MIME type differs from
                ``self.mime_type`` or its base name is not one of the two
                recognised MNIST CSV names.
        """
        if get_file_type(filepath, mime=True) != self.mime_type:
            raise MLException()

        split = self._SPLIT_BY_FILENAME.get(os.path.basename(filepath))
        if split is None:
            raise MLException()

        X, y = self.extract_mnist(filepath)
        x_name = self.data_name + "_X" + split
        y_name = self.data_name + "_y" + split

        # Keep the arrays in locals and publish only the names once both
        # saves succeeded, so a failing do_save never leaves raw arrays in
        # the shared input_data dict.
        self.do_save(x_name, X)
        self.do_save(y_name, y)
        input_data["X" + split] = x_name
        input_data["y" + split] = y_name

        return {"stop" : False}

    def do_run(self, input_data, traversal):
        """Run the step for one traversal phase.

        For ``traversal == "POST"`` the four output keys are filled with ""
        where missing and ``input_data`` is returned without touching files.
        Otherwise ``data_name`` is validated and the files under
        ``input_data['final_loc']`` are walked.

        Args:
            input_data: Shared dict; must contain ``'final_loc'`` for
                non-"POST" traversals.
            traversal: Traversal phase; only "POST" is special-cased.

        Returns:
            The same ``input_data`` dict, updated in place.

        Raises:
            MLException: If ``data_name`` is not a string or contains ".".
        """
        if traversal == "POST":
            for key in ("Xtrain", "ytrain", "Xtest", "ytest"):
                input_data.setdefault(key, "")
            return input_data

        # `basestring` exists only on Python 2; fall back to `str` so the
        # check does not raise NameError on Python 3.
        try:
            string_types = basestring
        except NameError:
            string_types = str

        # None is rejected here as well, since it is not a string.
        if not isinstance(self.data_name, string_types):
            raise MLException()

        # Postgres does not like "." in table names
        if "." in self.data_name:
            raise MLException()

        self.final_loc = input_data['final_loc']
        self.walk_files(self.final_loc, input_data)

        return input_data
    


# Bernoulli Naive Bayes on the MNIST dataset

In [91]:
# Training-set sizes passed to the model node.  This list is also used by the
# logistic-regression cell further down.
# For a quick smoke test use: nsamples_list = [5, 10]
nsamples_list = [5, 10, 20, 50, 100, 250, 500, 1000, 5000, 10000, 20000, 30000, 40000]

# Credentials must not live in the notebook: take the password from the
# standard PGPASSWORD variable, or prompt for it when that is unset.
# Host and port keep their previous values unless PGHOST / PGPORT override them.
MLRoot.init_storage(
    password=os.environ.get("PGPASSWORD") or getpass.getpass("Postgres password: "),
    host=os.environ.get("PGHOST", "localhost"),
    port=os.environ.get("PGPORT", "5432"),
)

myML = MLRoot()
myML.mount(mount_spec = "./mnist_bernNB_mounts.json")
try:
    myML.print_tree()

    # Compile against an empty placeholder, then supply the real location.
    input_data = {"remote_loc" : ""}
    myML.compile(json.dumps(input_data))
    input_data["remote_loc"] = "/data/mnist_original.zip"

    myML.setprop("/root/fetch", {"max_size" : 1000})
    myML.setprop("/root/derive/mnist_extract", {"data_name" : "bernNB"})
    myML.setprop("/root/model/mnist_bernNB", {"nsamples_list" : nsamples_list})
    myML.run(input_data)
finally:
    # Always unmount, even when compile/run fails, so the tree is not left
    # mounted for the next cell.
    myML.umount()

(INFO)creating node: resource: MLFetch, mount_subtree: /root/fetch
(INFO)creating node: resource: MLDerive, mount_subtree: /root/derive
(INFO)creating node: resource: MLModel, mount_subtree: /root/model
(INFO)creating node: resource: MLIngest, mount_subtree: /fetch/ingest
(INFO)creating node: resource: MLDecompress, mount_subtree: /fetch/decompress
(INFO)creating node: resource: MLUnarchive, mount_subtree: /fetch/unarchive
(INFO)creating node: resource: MLRaw, mount_subtree: /fetch/raw
(INFO)creating node: resource: MLHttpDownload, mount_subtree: /ingest/http_download
(INFO)creating node: resource: MLFSDownload, mount_subtree: /ingest/fs_download
(INFO)creating node: resource: MLZipDecompress, mount_subtree: /decompress/zip_decompress
(INFO)creating node: resource: MLBzip2Decompress, mount_subtree: /decompress/bzip2_decompress
(INFO)creating node: resource: MLGzipDecompress, mount_subtree: /decompress/gzip_decompress
(INFO)creating node: resource: MLTarUnarchive, mount_subtree: /unarch

# Logistic Regression on the MNIST dataset

In [None]:
# Relies on `nsamples_list` and on the storage initialised by the
# Bernoulli Naive Bayes cell above; run that cell first.
myML = MLRoot()
myML.mount(mount_spec = "./mnist_logistic_mounts.json")
try:
    myML.print_tree()

    # Compile against an empty placeholder, then supply the real location.
    input_data = {"remote_loc" : ""}
    myML.compile(json.dumps(input_data))
    input_data["remote_loc"] = "mnist_original.zip"

    myML.setprop("/root/fetch", {"max_size" : 1000})
    myML.setprop("/root/derive/mnist_extract", {"data_name" : "Logistic"})
    myML.setprop("/root/model/mnist_logistic", {"nsamples_list" : nsamples_list})
    myML.run(input_data)
finally:
    # Always unmount, even when compile/run fails.
    myML.umount()

# Reached only after a successful run, as before: a failure above leaves the
# storage in place.
MLRoot.destroy_storage()