# Compulsory assignment 1 - Using Dask-ml on large data
## Group #16
### Thomas Moen and Jørgen Kongsro

## Import libraries

In [None]:
%matplotlib notebook

import numpy as np
import pandas as pd
import dask
import os
import dask.dataframe as dd
import scipy
import matplotlib.pyplot as plt
import skimage.io
import dask.array as da
import matplotlib.pyplot as plt

from dask.diagnostics import ProgressBar
from dask_ml.linear_model import LogisticRegression
from dask_ml.model_selection import train_test_split
from dask_ml.datasets import make_classification
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LogisticRegression
from dask_ml.metrics import accuracy_score
from dask_ml.model_selection import IncrementalSearchCV

from sklearn.linear_model import SGDClassifier
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
from sklearn import metrics

print (dask.__version__)

## Install Kaggle API and download Kaggle data

In [None]:
# Install Kaggle API
# How to setup: https://github.com/Kaggle/kaggle-api
# or visit: https://adityashrm21.github.io/Setting-Up-Kaggle/

#!pip install kaggle

In [None]:
# Download Kaggle data using Kaggle API
#!kaggle competitions download -c dat300-ca1-autumn-2019


In [None]:
# Unzip Kaggle data
#!unzip "dat300-ca1-autumn-2019.zip" -d "/tmp/whatever"

## List files in directory (adjust for different operating systems)

In [None]:
# Adjust for different os

if os.name == 'nt':
    workdir = 'C://Users//thomoe//Documents//myDAT300//dat300-ca1-autumn-2019//'
elif os.name == 'posix':
    workdir = '/Users/jorgenkongsro/Downloads/dat300-ca1-autumn-2019/'
    
os.listdir(workdir)


## Import data

In [None]:
def import_data(csv_file):
    """ Import data from csv file

    :param data: a .csv separated dataset
    :return: a pandas data array, df
    
    """
    
    
    df = dd.read_csv(csv_file)
    return df

x_train_df = import_data(workdir + 'X_train.csv')
x_test_df = import_data(workdir + 'X_test.csv')
y_train_df = import_data(workdir + 'y_train.csv')


## Check for missing data

In [None]:
def percent_missing(dataframe):
    """ Check for percent missing values in dataframe
    :param data: dataframe
    :return: dataframe
    
    """
    missing_values = dataframe.isnull().sum()
    
    with ProgressBar():
        percent_missing = ((missing_values / dataframe.index.size) * 100).compute()
        
    return percent_missing


print(percent_missing(y_train_df))

"""
note: the results indicate that the features come in "tripets", e.g. f1 to f3 have quite similar missing%. 
We could impute some values very precisely by insert the mean of the other values within the triplet, 
if only one or two values are missing
"""

## Impute missing data

In [2]:
#choose here which imputation methods to use:
imputation_methods = ['correlated_columns', 'col_means']


In [3]:
#impute by inserting the mean of the column in question, for all columns: 
 
if 'col_means' in imputation_methods:
    
    #calculate mean (note that axis needs to be 0 to get columns, which is weird)
    miin = x_train_df.mean(axis = 0).compute() # Fill with mean value
    miin2 = y_train_df.min(axis = 0).compute() # Fill with zeros

    x_train_df_imean = x_train_df.fillna(dict(miin))
    y_train_df_imean = y_train_df.fillna(dict(miin2))

    # Transfer back to dask array

['y_test_sampleSubmission.csv', 'X_train.csv', 'y_train.csv', 'X_test.csv']

In [None]:
#impute by inserting value from most closely correlated column: 

"""
I can't understand why this one doesn't work. The idea was to fill in NaN's from correlated columns.
The problem might be in the last line

"""

#will only correct if the correlation between columns is above this threshold: 
lowest_allowed_corr = 0.995

if 'correlated_columns' in imputation_methods:
    
    #if correlation matrix does not exist, first try to read it from file, if that does not work calculate it: 
    if dir().count('corrs') == 0: 
        try: 
            corrs = pd.read_csv(workdir + 'features_correlation_matrix.txt')
        except:
            corrs = x_train_df.corr('pearson')
    
    #impute for each feature feat:
    for feat in x_train_df_colnames:
        
        #order the feature names according to (absolute value of) correlations to feat:
        abscorr = [abs(a) for a in list(corrs[feat])]
        order = np.argsort(abscorr)[::-1]
        topfeatures = [x_train_df_colnames[a] for a in order]
        
        #remove features which are not sufficiently correlated to feat:
        mapper = dict(zip(x_train_df_colnames, abscorr))
        topfeatures = [a for a in topfeatures if mapper[a] >= lowest_allowed_corr]
        
        #correct using each feature in topfeatures, starting with the most strongly correlated otherfeature:
        for otherfeat in [a for a in topfeatures if a != feat]:
            print(feat, otherfeat)
            x_train_df_trim[feat] = x_train_df_trim[feat].fillna(x_train_df_trim[otherfeat]).compute()




In [None]:
def drop_rows(dataframe):
    """ Drop rows if more than 0 and less than 5% missing 
    :param data: dataframe
    :return: dataframe
    
    """
    rows_to_drop = list(percent_missing[(percent_missing > 0) & (percent_missing < 5)].index)
    data_clean = dataframe.dropna(subset=rows_to_drop)
    


## Build model and train

In [7]:
# Create dask array with chunks. Delete obsolete variables
X = x_train_df_imean
X = X.to_dask_array(lengths=True)
y = y_train_df_imean
y = y.to_dask_array(lengths=True)
y
del x_train_df_imean, x_train_df, y_train_df_imean, y_train_df

In [8]:
# Convert blocks in dask array x for new chunks
X = X.rechunk((100000, 162))
y = y.rechunk((100000, 1))

X

Unnamed: 0,Array,Chunk
Bytes,4.09 GB,129.60 MB
Shape,"(3155759, 162)","(100000, 162)"
Count,509 Tasks,32 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 4.09 GB 129.60 MB Shape (3155759, 162) (100000, 162) Count 509 Tasks 32 Chunks Type float64 numpy.ndarray",162  3155759,

Unnamed: 0,Array,Chunk
Bytes,4.09 GB,129.60 MB
Shape,"(3155759, 162)","(100000, 162)"
Count,509 Tasks,32 Chunks
Type,float64,numpy.ndarray


In [9]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)
del X, y
y_train

Unnamed: 0,Array,Chunk
Bytes,22.72 MB,720.00 kB
Shape,"(2840183, 1)","(90000, 1)"
Count,258 Tasks,32 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 22.72 MB 720.00 kB Shape (2840183, 1) (90000, 1) Count 258 Tasks 32 Chunks Type float64 numpy.ndarray",1  2840183,

Unnamed: 0,Array,Chunk
Bytes,22.72 MB,720.00 kB
Shape,"(2840183, 1)","(90000, 1)"
Count,258 Tasks,32 Chunks
Type,float64,numpy.ndarray


### Parallelize Scikit-Learn

In [None]:
from dask.distributed import Client
import joblib

client = Client()  # Connect to a Dask Cluster

from sklearn.ensemble import RandomForestClassifier 
model = RandomForestClassifier(n_estimators=100, max_depth=2,random_state=0)
with joblib.parallel_backend('dask'):
    model.fit(X_train, y_train)
    y_true = y_test
    y_pred = model.predict(X_test)

    
"""

We ran into memory error for the model.predict part of the code

"""

### Reimplement Scalable Algorithms with Dask Array

In [None]:
from dask_ml.linear_model import LogisticRegression

lr = LogisticRegression()
lr.fit(data, labels)

"""

Takes a lot of time to compute

"""

In [None]:
def diagnostics(y_test, y_pred):
    """
    Compute the AUC and F1 metric for the model test data
    Args:
        y_test (list): Measured target test data
        y_pred (list): Predicted target data based on model test X

    Returns:
        auc (int): area under curve metric
        F1 (int): F1 score
    """
accuracy_score(y_true, y_pred)
fpr, tpr, thresholds = metrics.roc_curve(y_true, y_pred)
AUC = metrics.auc(fpr, tpr)
F1 = f1_score(y_true, y_pred)

return AUC, F1


In [None]:
# Confusion matrix of test data

confmat_test = confusion_matrix(y_true=y_test, y_pred=y_pred)
print(confmat_test)

fig, ax = plt.subplots(figsize=(2.5, 2.5))
ax.matshow(confmat_test, cmap=plt.cm.Blues, alpha=0.3)
for i in range(confmat_test.shape[0]):
    for j in range(confmat_test.shape[1]):
        ax.text(x=j, y=i, s=confmat_test[i, j], va='center', ha='center')

plt.xlabel('Predicted label')
plt.ylabel('True label')

plt.tight_layout()
plt.show()

In [None]:
from dask_ml.linear_model import LogisticRegression

lr = LogisticRegression()
lr.fit(X_train, y_train)

  return np.log1p(A)
  return np.log1p(A)


In [None]:
from sklearn.linear_model import LogisticRegressionCV
from dask_ml.wrappers import ParallelPostFit

clf = ParallelPostFit(LogisticRegressionCV(cv=3), scoring="r2")

clf.fit(X_train, y_train)

y_true = y_train
y_pred = clf.predict(X_train)
accuracy_score(y_true, y_pred)

fpr, tpr, thresholds = metrics.roc_curve(y_test, y_pred)
print('AUC_lr: %.3f' % metrics.auc(fpr, tpr))
print('F1_lr: %.3f' % f1_score(y_true, y_pred))


In [None]:
y_true = y_train
y_pred = clf.predict(X_train)
accuracy_score(y_true, y_pred)

fpr, tpr, thresholds = metrics.roc_curve(y_true, y_pred)
print('AUC_lr: %.3f' % metrics.auc(fpr, tpr))
print('F1_lr: %.3f' % f1_score(y_true, y_pred))

In [None]:
# Compute the AUC, F1 and confusion matrix for the logistic regression model on the training data
from dask_ml.linear_model import LogisticRegression
lr = LogisticRegression()
lr.fit(X_train,y_train)
y_true = y_train
y_pred = lr.predict(X_train)
accuracy_score(y_true, y_pred)

fpr, tpr, thresholds = metrics.roc_curve(y_test, y_pred)
print('AUC_lr: %.3f' % metrics.auc(fpr, tpr))
print('F1_lr: %.3f' % f1_score(y_true, y_pred))

In [None]:
y_true = y_test
y_pred = lr.predict(X_test)
#accuracy_score(y_true, y_pred)

fpr, tpr, thresholds = metrics.roc_curve(y_test, y_pred)
print('AUC_lr: %.3f' % metrics.auc(fpr, tpr))
print('F1_lr: %.3f' % f1_score(y_true, y_pred))

In [None]:
# Confusion matrix of test data

confmat_test = confusion_matrix(y_true=y_test, y_pred=y_pred)
print(confmat_test)

fig, ax = plt.subplots(figsize=(2.5, 2.5))
ax.matshow(confmat_test, cmap=plt.cm.Blues, alpha=0.3)
for i in range(confmat_test.shape[0]):
    for j in range(confmat_test.shape[1]):
        ax.text(x=j, y=i, s=confmat_test[i, j], va='center', ha='center')

plt.xlabel('Predicted label')
plt.ylabel('True label')

plt.tight_layout()
plt.show()

In [None]:
len(y_pred)