In [1]:
import warnings
warnings.filterwarnings("ignore")
import os
import pandas as pd
import time

from tqdm import tqdm
import numpy as np
from sklearn.metrics import f1_score
from tqdm import tqdm
import numpy as np

In [2]:
# Locations of the label table and of the directory holding one sub-directory
# of training CSV files per fold.
TRAIN_LABEL_PATH = "data/train_labels.csv"
TRAIN_PATH = "data/train/"

# Label table; later cells read its `file_name` and `ret` columns.
label = pd.read_csv(filepath_or_buffer=TRAIN_LABEL_PATH)



# Statistic name -> NumPy reduction. The insertion order matters: it fixes the
# order in which feature values and feature column names are generated.
feture_to_extract = {
    _stat: getattr(np, _stat) for _stat in ("mean", "max", "min", "var")
}

def get_feature(args):
    """Compute per-column summary statistics for one CSV file.

    Parameters
    ----------
    args : str
        Path of the CSV file to read.

    Returns
    -------
    list
        For every reduction in ``feture_to_extract`` (in dictionary order),
        one value per column of the file (in column order), concatenated
        into a single flat list.
    """
    _df = pd.read_csv(args)
    _feature_values = []

    # Apply every configured reduction (not just the mean). axis=0 is passed
    # explicitly so each reduction is always taken down the rows, yielding one
    # value per column; relying on the implicit axis is version-dependent in
    # pandas, and this notebook silences the warnings that would flag it.
    for _reduce in feture_to_extract.values():
        _feature_values.extend(list(_reduce(_df, axis=0).values))

    return _feature_values


# Original

In [4]:
# Sequential baseline: walk every fold directory and featurise each file in turn.
dict_result = {}
for folds in tqdm(os.listdir(TRAIN_PATH)):
    fold_dir = TRAIN_PATH + folds
    for _entry in os.listdir(fold_dir):
        # The loop variable stays named `item`: a later cell reads the last
        # path left in it to recover the column names.
        item = fold_dir + '/' + _entry
        # Key each feature list by the bare file name (last path component).
        dict_result[item.split("/")[-1]] = get_feature(item)
        



100%|██████████| 25/25 [20:00<00:00, 39.22s/it]


In [5]:
# Only the header row is needed to name the features, so nrows=0 avoids parsing
# the whole file a second time. `item` is the last path visited by the
# extraction loop.
_df = pd.read_csv(item, nrows=0)
columns = _df.columns

# Same ordering as the values produced by get_feature: statistic-major,
# column-minor, e.g. "mean_<col0>", "mean_<col1>", ..., "max_<col0>", ...
feature_columns = [
    key + "_" + column
    for key in feture_to_extract.keys()
    for column in columns
]

# The label column is merged in last, so its name goes at the end.
feature_columns.append("ret")

In [6]:
# dict_result maps file name -> feature list, so the frame is first built with
# one column per file and then transposed to one row per file.
train = pd.DataFrame(dict_result).T
train = train.assign(file_name=train.index).reset_index(drop=True)

# Left join: every feature row is kept; a file absent from `label` gets NaN in
# `ret`. The join key is dropped afterwards so only features + label remain.
train = (
    train
    .merge(label[['file_name', 'ret']], on='file_name', how='left')
    .reset_index(drop=True)
    .drop(columns="file_name")
)

train.columns = feature_columns

# Joblib

In [3]:
from joblib import Parallel, delayed
from datetime import datetime
from tqdm import tqdm
import time

In [4]:
# method 1: parallel along folds
def extract_features_from_files_in_folds(folds):
    """Featurise every file inside one fold directory.

    Parameters
    ----------
    folds : str
        Name of a single sub-directory of ``TRAIN_PATH``.

    Returns
    -------
    dict
        Maps each file's bare name to the list returned by ``get_feature``.
    """
    fold_dir = TRAIN_PATH + folds
    paths = [fold_dir + '/' + entry for entry in os.listdir(fold_dir)]
    return {path.split("/")[-1]: get_feature(path) for path in paths}

In [5]:
# method 2: parallel along files
# Flatten the fold directories into a single list of file paths so that each
# file can be handed out as its own task.
files = []
for folds in os.listdir(TRAIN_PATH):
    fold_dir = TRAIN_PATH + folds
    files += [fold_dir + '/' + _i for _i in os.listdir(fold_dir)]
def extract_features_from_file(file):
    """Featurise a single file.

    Parameters
    ----------
    file : str
        Path of the CSV file, using "/" as separator.

    Returns
    -------
    dict
        A one-entry dictionary mapping the file's bare name (last path
        component) to the list returned by ``get_feature``.
    """
    return {file.split("/")[-1]: get_feature(file)}

## Method 1: parallel along folds

In [27]:
# method 1: parallel along folds
# One task per fold directory, spread over 8 joblib workers. `results` ends up
# as a list holding one dictionary per fold.
time_start = time.time()
fold_tasks = (
    delayed(extract_features_from_files_in_folds)(i)
    for i in tqdm(os.listdir(TRAIN_PATH))
)
results = Parallel(n_jobs=8)(fold_tasks)
time_finish = time.time()
elapsed = time_finish - time_start
print('The total time taken for Parallel Method 1 is %d' % elapsed)


  0%|          | 0/25 [00:00<?, ?it/s][A
 32%|███▏      | 8/25 [00:00<00:00, 30.44it/s][A
 64%|██████▍   | 16/25 [00:02<00:00,  9.48it/s][A
 96%|█████████▌| 24/25 [01:17<00:02,  2.88s/it][A
100%|██████████| 25/25 [01:17<00:00,  3.09s/it][A

234.10961723327637

## Method 2: parallel along files

In [4]:
# One task per file, spread over 8 joblib workers. `results` ends up as a list
# holding one single-entry dictionary per file.
time_start = time.time()
file_tasks = (delayed(extract_features_from_file)(i) for i in tqdm(files))
results = Parallel(n_jobs=8)(file_tasks)
time_finish = time.time()
elapsed = time_finish - time_start
print('The total time taken for Parallel Method 2 is: %d' % elapsed)

100%|██████████| 49647/49647 [02:38<00:00, 314.08it/s]


The total time taken for Parallel Method 2 is: 158


# Dask

In [6]:
from dask.distributed import Client, progress
import dask
import random

In [7]:
# Start a Dask client with 25 workers; the bare name on the last line makes the
# notebook display the client summary.
client = Client(n_workers=25)
client

0,1
Client  Scheduler: tcp://127.0.0.1:39534  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 25  Cores: 50  Memory: 269.89 GB


## Method 1: parallel along folds

In [8]:
# Start the timer, then submit one task per fold directory to the Dask client.
time_start = time.time()
fold_names = os.listdir(TRAIN_PATH)
tmp = client.map(extract_features_from_files_in_folds, fold_names)
def combine(results):
    """Merge a sequence of dictionaries into a single dictionary.

    Parameters
    ----------
    results : iterable of dict
        Partial results, e.g. one dictionary per task.

    Returns
    -------
    dict
        The union of all input dictionaries. When a key occurs more than
        once, the value from the later dictionary wins.
    """
    dic = {}
    for i in results:
        dic.update(i)
    # The merged dictionary has to be returned explicitly; without this the
    # function (and any task wrapping it) evaluates to None.
    return dic
# Submit the merge of the per-fold results to the client.
output = client.submit(combine, tmp)
# The blocking output.result() call is made in a later cell, so the interval
# closed by time_finish below does not include waiting for the result.
#result = output.result()
time_finish = time.time()

In [10]:
# Time only the wait for the previously submitted merge task to deliver its result.
time_start = time.time()
result = output.result()
time_finish = time.time()
elapsed = time_finish - time_start
print('The total time taken for Parallel Method 1 is: %d' % elapsed)

The total time taken for Parallel Method 1 is: 165


## Method 2: parallel along files

In [11]:
# Start the timer, then submit one task per file path in `files` to the client.
time_start = time.time()
tmp = client.map(extract_features_from_file, files)
def combine(results):
    """Merge a sequence of dictionaries into a single dictionary.

    Parameters
    ----------
    results : iterable of dict
        Partial results, e.g. one dictionary per task.

    Returns
    -------
    dict
        The union of all input dictionaries. When a key occurs more than
        once, the value from the later dictionary wins.
    """
    dic = {}
    for i in results:
        dic.update(i)
    # The merged dictionary has to be returned explicitly; without this the
    # function (and any task wrapping it) evaluates to None.
    return dic
# Submit the merge of the per-file results, wait for it to finish, and only then
# stop the timer, so the measured interval covers extraction and merging.
output = client.submit(combine, tmp)
result = output.result()
time_finish = time.time()

In [12]:
# %d truncates the elapsed time to whole seconds.
elapsed = time_finish - time_start
print('The total time taken for Parallel Method 2 is: %d' % elapsed)

The total time taken for Parallel Method 2 is: 135


# Intake + Dask

In [5]:
# Discard every row that has at least one missing value (feature or label).
train = train.dropna(axis=0, how="any")

In [6]:
# get_values() was deprecated in pandas 0.25 and removed in 1.0; to_numpy()
# yields the same NumPy arrays and works on current pandas releases.
# x: every column except the last (the features); y: the last column (the label).
x = train.iloc[:, :-1].to_numpy()
y = train.iloc[:, -1].to_numpy()

In [7]:
from sklearn.model_selection import train_test_split
# Hold out 33% of the rows for evaluation; the fixed seed keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    x,
    y,
    test_size=0.33,
    random_state=42,
)

In [8]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import f1_score
# A random forest is stochastic; seeding it makes the reported scores
# repeatable from one run of the notebook to the next. The seed matches the
# one used for the train/test split.
classifier = RandomForestClassifier(random_state=42)

In [9]:
# Fit on the training split and score the model on those same rows; this
# measures how well the forest fits the data it was trained on.
y_train_prediction = classifier.fit(X_train, y_train).predict(X_train)

f1 = f1_score(y_true=y_train, y_pred=y_train_prediction)
print("The f1 score of training is ", f1)

The f1 score of training is  0.9998422264996372


In [10]:


# Score the fitted model on the held-out split.
y_test_prediction = classifier.predict(X_test)

f1 = f1_score(y_true=y_test, y_pred=y_test_prediction)
print("The f1 score of test is ", f1)

The f1 score of test is  0.970970206264324
