In [1]:
# Automatically reload edited modules (e.g. the local sherlock package) before each cell runs.
%load_ext autoreload
%autoreload 2

In [2]:
import sys

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import f1_score
import tensorflow as tf
from ast import literal_eval

from sherlock import helpers
from sherlock.features.preprocessing import extract_features, convert_string_lists_to_lists, prepare_feature_extraction
from sherlock.deploy.train_sherlock import train_sherlock
from sherlock.deploy.predict_sherlock import predict_sherlock
from sherlock.features.word_embeddings import initialise_word_embeddings
from sherlock.features.paragraph_vectors import initialise_pretrained_model

from pympler import muppy, summary
from datetime import datetime

from pyarrow.parquet import ParquetFile

from sherlock.functional import as_py_str, to_literal, randomise_sample, as_str_series, dropna, extract_features

In [3]:
# Fetch/verify the embedding files and load the models used by feature extraction:
# GloVe word vectors and a pretrained paragraph-vector (Doc2Vec) model.
# NOTE(review): the dimension presumably has to match the pretrained
# par_vec_trained_<dim> files fetched by prepare_feature_extraction() — confirm.
PARAGRAPH_VECTOR_DIM = 400

prepare_feature_extraction()
initialise_word_embeddings()
initialise_pretrained_model(PARAGRAPH_VECTOR_DIM)

Preparing feature extraction by downloading 2 files:
        
 ../sherlock/features/glove.6B.50d.txt and 
 ../sherlock/features/par_vec_trained_400.pkl.docvecs.vectors_docs.npy.
        
All files for extracting word and paragraph embeddings are present.
Initialising word embeddings
Initialise Word Embeddings process took 0:00:06.065168 seconds.
Initialise Doc2Vec Model, 400 dim, process took 0:00:01.532057 seconds.


In [4]:
# Parquet file holding the raw test values; opened lazily, row groups are read on demand.
path = "../data/raw/test_values.parquet"
pf = ParquetFile(path)

In [5]:
row_df = pf.read_row_group(i=0)  # first row group only; NOTE(review): a pyarrow Table despite the `_df` name — confirm

In [7]:
from multiprocessing import Pool
from itertools import zip_longest
import csv
from datetime import datetime

# Export the features of every entry in row_df['values'] to test.csv, running the
# to_literal and extract_features stages in a pool of 6 worker processes.
# Set verify_keys = True to compare every row's feature names against the header row.
verify_keys = False
first_keys = None
i = 0

start = datetime.now()

with Pool(processes=6) as pool:
    # The csv module requires newline='' on the file object; otherwise the '\r\n'
    # row terminators are translated again on Windows, producing blank lines.
    with open('test.csv', "w", newline='') as outfile:
        csvwriter = csv.writer(outfile)

        # Stages run inside-out: as_py_str -> to_literal (pool) -> randomise_sample ->
        # as_str_series -> dropna -> extract_features (pool). The builtin map stages
        # run in the parent process.
        for features in pool.imap(extract_features,
                                  map(dropna,
                                      map(as_str_series,
                                          map(randomise_sample,
                                              pool.imap(to_literal,
                                                        map(as_py_str, row_df['values'])))))):
            i = i + 1

            keys = features.keys()

            if first_keys is None:
                # The first row's feature names become the CSV header.
                first_keys = keys
                first_keys_str = ','.join(keys)

                print(f'Exporting {len(first_keys)} column features')

                csvwriter.writerow(first_keys)
            elif verify_keys:
                keys_str = ','.join(keys)
                if first_keys_str != keys_str:
                    print(f'keys are NOT equal. k1 len={len(first_keys)}, k2 len={len(keys)}')

                    # zip_longest pads the shorter side with None, so every mismatch is
                    # reported even when the key counts differ (indexing would raise
                    # IndexError or silently skip the extra keys).
                    for k1, k2 in zip_longest(first_keys, keys):
                        if k1 != k2:
                            print(f'{k1} != {k2}')

            csvwriter.writerow(list(features.values()))

print(f'Finished. Processed {i} rows in {datetime.now() - start}')

Exporting 1588 column features
Finished. Processed 137353 rows in 0:26:02.218227


In [8]:
import csv
from itertools import zip_longest
from datetime import datetime
from functional import pseq

# Same export as the Pool-based cell, expressed as a PyFunctional parallel pipeline.
# Set verify_keys = True to compare every row's feature names against the header row.
verify_keys = False
first_keys = None
i = 0

start = datetime.now()

print(f'Starting at {start}')

# newline='' as required by the csv module (avoids blank lines on Windows).
with open('test.csv', "w", newline='') as outfile:
    csvwriter = csv.writer(outfile)

    # Comparable performance with using pool.imap directly, but the code is *much* cleaner
    for features in (
        pseq(row_df['values'], processes=6, partition_size=10)
        .map(as_py_str)
        .map(to_literal)
        # The imported helper is `randomise_sample`; the previous `randomised_sample`
        # is not defined anywhere and raises NameError on a fresh kernel.
        .map(randomise_sample)
        .map(as_str_series)
        .map(dropna)
        .map(extract_features)
    ):
        i = i + 1

        keys = features.keys()

        if first_keys is None:
            # The first row's feature names become the CSV header.
            first_keys = keys
            first_keys_str = ','.join(keys)

            print(f'Exporting {len(first_keys)} column features')

            csvwriter.writerow(first_keys)
        elif verify_keys:
            keys_str = ','.join(keys)
            if first_keys_str != keys_str:
                print(f'keys are NOT equal. k1 len={len(first_keys)}, k2 len={len(keys)}')

                # zip_longest pads the shorter side with None, so mismatched key counts
                # are reported instead of raising IndexError.
                for k1, k2 in zip_longest(first_keys, keys):
                    if k1 != k2:
                        print(f'{k1} != {k2}')

        csvwriter.writerow(list(features.values()))

print(f'Finished. Processed {i} rows in {datetime.now() - start}')

Starting at 2020-12-22 09:02:16.363139
Exporting 1588 column features
Finished. Processed 137353 rows in 0:23:55.255376


In [7]:
import csv
from datetime import datetime
from functional import pseq
from sherlock.functional import values_only, black_hole
from queue import Queue
from threading import Thread


class FeatureWorker(Thread):
    """Background thread that drains a queue of feature rows into a CSV writer.

    Every item taken from ``queue`` becomes one CSV row written via ``csvwriter``.
    If an item is a mapping (feature name -> value), its keys are written once as a
    header row before the first data row, and its values are written as the row.
    Any other item (e.g. an already value-only sequence) is passed to ``writerow``.

    ``run`` loops forever, so start the worker as a daemon and use ``queue.join()``
    to wait until every enqueued item has been processed.

    Attributes:
        queue: the Queue items are read from.
        csvwriter: the ``csv.writer`` rows are written to.
        verify_keys: not consulted by this class; kept for parity with other cells.
        first_keys: keys of the first mapping item seen, or None until then.
        counter: number of items taken off the queue so far.
        errors: number of items whose processing raised an exception.
    """

    def __init__(self, queue, csvwriter):
        super().__init__()
        self.queue = queue
        self.csvwriter = csvwriter
        self.verify_keys = False
        # None (not []) so that the header check in run() can actually fire.
        self.first_keys = None
        self.counter = 0
        self.errors = 0

    def run(self):
        """Consume queue items forever, writing each one as a CSV row."""
        from collections.abc import Mapping
        import traceback

        while True:
            features = self.queue.get()

            try:
                self.counter = self.counter + 1

                if isinstance(features, Mapping):
                    if self.first_keys is None:
                        self.first_keys = list(features.keys())

                        print(f'Exporting {len(self.first_keys)} column features')

                        self.csvwriter.writerow(self.first_keys)
                    # writerow(dict) would iterate the keys; write the values instead.
                    row = list(features.values())
                else:
                    row = features

                # Use the writer this worker was given, not a global of the same name.
                self.csvwriter.writerow(row)
            except Exception:
                # Report and keep draining: if the exception escaped the loop the thread
                # would die and the producer's queue.join() would block forever.
                self.errors = self.errors + 1
                traceback.print_exc()
            finally:
                self.queue.task_done()


start = datetime.now()

print(f'Starting at {start}')

# Create a queue to communicate with the background thread
queue = Queue()

# newline='' as required by the csv module (avoids blank lines on Windows).
with open('test.csv', "w", newline='') as outfile:
    csvwriter = csv.writer(outfile)

    # A single daemon writer thread persists rows, so the main thread only enqueues
    # and does not wait on CSV writes.
    worker = FeatureWorker(queue, csvwriter)

    worker.daemon = True
    worker.start()

    # Comparable performance with using pool.imap directly, but the code is *much* cleaner
    # NOTE(review): values_only presumably strips the feature names, so unlike the
    # previous cells no header row ends up in test.csv (no "Exporting ..." line was
    # printed by this cell) — confirm.
    for features in (
        pseq(row_df['values'], processes=5, partition_size=10)
        .map(as_py_str)
        .map(to_literal)
        .map(randomise_sample)
        .map(as_str_series)
        .map(dropna)
        .map(extract_features)
        .map(values_only)
    ):
        # enqueue returned features for background persistence
        queue.put(features)

    print('Waiting for background thread to complete')

    # Block until task_done() has been called for every queued item; this must stay
    # inside the `with` so the file is still open while the worker writes.
    queue.join()

    print(f'Processed {worker.counter} items')

print(f'Finished. Processed in {datetime.now() - start}')

Starting at 2020-12-23 09:32:15.332880
Waiting for background thread to complete
Processed 137353 items
Finished. Processed in 0:24:12.906544
