In [149]:

import dask.dataframe as dd
import numpy as np
from distributed import Client
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from nltk.corpus import stopwords
import scipy
from scipy.sparse import csr_matrix, hstack
from sklearn.model_selection import train_test_split

In [139]:
np.set_printoptions(suppress=True)

In [116]:
client = Client('127.0.0.1:8786')

In [117]:
df = dd.read_csv('/Users/marvin/testcases/datarevenue/data_root/raw/wine_dataset.csv', blocksize=1e6)
df = df.set_index('Unnamed: 0')

35.4476027029926

In [154]:
def make_groups(df):
    def points2group(score):
        splits = [84, 89, 95]
        for (index, split) in enumerate(splits):
            if score < split:
                return index
        return len(splits)

    df['group'] = df['points'].apply(points2group, meta=('points', 'int'))
    return df

def clean_text(line):
    line = line.lower()
    return line

df = make_groups(df)
df['description'] = df['description'].apply(clean_text, meta=('description', 'str'))
df['description_len'] = df['description'].apply(len, meta=('description', 'str'))
df['country'] = df['country'].fillna('unk')
df['price'] = df['price'].fillna(df['price'].mean().compute())

In [155]:
df['country'] = df['country'].astype('category')
df = df.categorize(columns=['country'])

In [156]:
vectorizer = TfidfVectorizer(stop_words='english', max_features=500)
vectorizer.fit(ddf['description'])
description_encodings = vectorizer.transform(ddf['description'])

In [157]:
description_encodings

<10000x500 sparse matrix of type '<class 'numpy.float64'>'
	with 165033 stored elements in Compressed Sparse Row format>

In [158]:
country_encodings = csr_matrix(dd.get_dummies(df['country'], prefix = 'country'))



In [159]:
other_columns = csr_matrix(df[['price', 'description_len']])

In [160]:
all_data = hstack((description_encodings, country_encodings, other_columns))

In [161]:
all_data

<10000x540 sparse matrix of type '<class 'numpy.float64'>'
	with 195033 stored elements in COOrdinate format>

In [146]:
scipy.sparse.save_npz('/Users/marvin/Downloads/all_data.npz', all_data)

In [147]:
all_data2 = scipy.sparse.load_npz('/Users/marvin/Downloads/all_data.npz')

In [148]:
all_data2

<10000x540 sparse matrix of type '<class 'numpy.float64'>'
	with 195033 stored elements in COOrdinate format>

In [165]:
df['group'].values.compute()

array([2, 1, 1, ..., 1, 1, 2])

In [176]:
x_train, x_test, y_train, y_test = train_test_split(all_data, df['group'].values.compute(), test_size=0.2, random_state=0)

In [181]:
df[['group']].to_parquet('/Users/marvin/Downloads/group.parquet')

KilledWorker: ('write_partition-619ad844-99f5-4589-ace4-8599adc2124e', <Worker 'tcp://127.0.0.1:53819', memory: 0, processing: 5>)

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1268, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 998, in _reconnect
    await self._close()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1268, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError


In [171]:
b

<2000x540 sparse matrix of type '<class 'numpy.float64'>'
	with 39168 stored elements in Compressed Sparse Row format>

In [172]:
c

array([1, 1, 3, ..., 1, 1, 2])

In [173]:
d

array([2, 2, 1, ..., 1, 2, 1])

In [44]:
import numpy as np
import dask.bag as db

from toolz.curried import frequencies
from toolz.curried import valmap
from toolz.curried import merge_with
from toolz.curried import unique
from toolz import merge_with as _merge_with

In [55]:
def normalize(_dict):
    return {k:v/sum(_dict.values()) for k,v in _dict.items()}

corpus = 2*[
    "a a b b b z",
    "a a a a c z",
    "c b a d e f g z",
    "a b b b c c c d d d a b z",
    "c b a d e f g z",
    "a b b b c c c d d d a b z z z z z z z z z z z z z"
]


base = (
    db
    .from_sequence(corpus, npartitions=4)
    .map(str.split)
)

tf = (
    base
    .map(frequencies)
    .map(normalize)
)

idf = (
    base
    .map(unique)
    .map(frequencies)
    .reduction(merge_with(sum), merge_with(sum), split_every=2)
    .apply(normalize)
    .apply(valmap(np.log10))
    .apply(valmap(np.negative))
)

In [107]:
tf.compute()

[{'a': 0.3333333333333333, 'b': 0.5, 'z': 0.16666666666666666},
 {'a': 0.6666666666666666, 'c': 0.16666666666666666, 'z': 0.16666666666666666},
 {'c': 0.125,
  'b': 0.125,
  'a': 0.125,
  'd': 0.125,
  'e': 0.125,
  'f': 0.125,
  'g': 0.125,
  'z': 0.125},
 {'a': 0.15384615384615385,
  'b': 0.3076923076923077,
  'c': 0.23076923076923078,
  'd': 0.23076923076923078,
  'z': 0.07692307692307693},
 {'c': 0.125,
  'b': 0.125,
  'a': 0.125,
  'd': 0.125,
  'e': 0.125,
  'f': 0.125,
  'g': 0.125,
  'z': 0.125},
 {'a': 0.08, 'b': 0.16, 'c': 0.12, 'd': 0.12, 'z': 0.52},
 {'a': 0.3333333333333333, 'b': 0.5, 'z': 0.16666666666666666},
 {'a': 0.6666666666666666, 'c': 0.16666666666666666, 'z': 0.16666666666666666},
 {'c': 0.125,
  'b': 0.125,
  'a': 0.125,
  'd': 0.125,
  'e': 0.125,
  'f': 0.125,
  'g': 0.125,
  'z': 0.125},
 {'a': 0.15384615384615385,
  'b': 0.3076923076923077,
  'c': 0.23076923076923078,
  'd': 0.23076923076923078,
  'z': 0.07692307692307693},
 {'c': 0.125,
  'b': 0.125,
  'a': 

In [56]:
idf.compute()

{'a': 0.7269987279362623,
 'b': 0.8061799739838872,
 'z': 0.7269987279362623,
 'c': 0.8061799739838872,
 'd': 0.9030899869919435,
 'e': 1.2041199826559248,
 'f': 1.2041199826559248,
 'g': 1.2041199826559248}