In [53]:
# Data manipulation libraries
import pandas as pd
import numpy as np
# Dask for lazy loading and computation of data
import dask.dataframe as dd
import time
from dask import delayed
import dask.array as da

#Word embedding
from gensim.models import Word2Vec
# Keras DeepLearning Framework
from keras.preprocessing.sequence import pad_sequences
from keras.models import Sequential
from keras.layers import Dense , Dropout, Embedding, LSTM

In [54]:
df = dd.read_csv("log_file.csv", dtype={'event:OfferID': 'object'})

### Group events
##### - group events part of same transacations
##### - since we dont know the final event in the sequence, an event type is added:
##### This is 'End' after the last event has occured

In [55]:
event_grouped = df.groupby('Id')["event:concept:name"].apply(list)
event_grouped = event_grouped.map_partitions(lambda x: x + ["End"])

  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  event_grouped = df.groupby('Id')["event:concept:name"].apply(list)


### Lazy loading of events for training Word2Vec Embedding

In [56]:
def gen_events(event_list):
    for x in range(event_list.npartitions):
            events = event_list.get_partition(x).compute()
            events = events.tolist()
            for x in events:
                yield x

In [57]:
class generate_Sequence():
    '''
    Streaming class to generate grouped events in a lazy way to avoid issues of RAM 
    running out of Memory.
    '''
    def __init__(self, generator_function,event_list):
        self.event_list = event_list
        self.generator_function = generator_function
        self.generator = self.generator_function(self.event_list)

    def __iter__(self):
        # reset the generator
        self.generator = self.generator_function(self.event_list)
        return self

    def __next__(self):
        result = next(self.generator)
        if result is None:
            raise StopIteration
        else:
            return result

In [58]:
iterate = generate_Sequence(gen_events,event_grouped)

In [60]:
w2vmodel = Word2Vec(iterate)

TypeError: Cannot broadcast np.ndarray with operand of type <class 'list'>

In [46]:
df['event:OfferID']

Dask Series Structure:
npartitions=4
    float64
        ...
        ...
        ...
        ...
Name: event:OfferID, dtype: float64
Dask Name: getitem, 8 tasks

In [61]:
print("hey")

hey
