In [1]:
import gzip
import pickle

import numpy as np
import pandas as pd
import psycopg2 as pg

# Highest pickle protocol of the running interpreter; used below as a file-name suffix.
pklhp = pickle.HIGHEST_PROTOCOL

import os
# Current working directory; the cache paths used further down are built from it.
cwd = os.getcwd()
print (cwd)

/Users/tianlechen/Documents/GitHub/Togepi/tianle-rxplenishment/gru_arrivaltimes/run_crusoely/blagdon_sodapop_jointwrfm


In [2]:
# When True, later cells print diagnostics and work on small subsets of customers, skus and dates.
testing = True

# Source transactions of interest

In [3]:
import funforsql as ffs
# Fetch the transactions table through the project-local helper (its implementation is not shown here).
alltrans = ffs.gettransactions()
if testing: print (alltrans.head(5))

skusfname: sodapopskus.txt
timeunit: week
      custid  ordqty  product_sku                date
0  9623203.0     2.0      10201.0 2013-02-09 20:20:47
1  9475196.0     1.0      10201.0 2013-01-08 14:16:45
2   524466.0     1.0      10201.0 2013-01-08 13:39:34
3  7646254.0     2.0      10201.0 2013-03-05 17:13:34
4  9074093.0     1.0      10201.0 2014-06-04 13:39:00


In [4]:
# Load the date boundaries from a gzip-compressed pickle. The context manager
# closes the file handle deterministically instead of leaving it to the
# garbage collector.
# NOTE(review): pickle can execute arbitrary code on load; only open trusted files.
with gzip.open('dates.pkl' + str(pklhp), 'rb') as datesfile:
    d = pickle.load(datesfile)
if testing:
    for k in d:
        print ('%s:\t%s' % (k, d[k]))

tstart:	2014-01-01 00:00:00
ttrain:	2015-06-30 00:00:00
tend:	2016-12-31 00:00:00


In [5]:
# Keep only the transactions dated within [tstart, tend], both ends inclusive.
intraining = (alltrans['date'] >= d['tstart']) & (alltrans['date'] <= d['tend'])
alltrans = alltrans.loc[intraining, :]

In [6]:
# Sanity check: number of transactions left after the date filter.
if testing: print ('len alltrans', len(alltrans))

len alltrans 2042361


In [7]:
# Show the five earliest remaining transactions.
if testing: print (alltrans.sort_values(by = 'date').head(5))

            custid  ordqty  product_sku                date
40776    9314863.0     1.0      64330.0 2014-01-01 09:21:16
58099   12698719.0     2.0     181042.0 2014-01-01 09:29:13
792836   8367321.0     2.0      83092.0 2014-01-01 09:52:32
61724   11020411.0     4.0     140363.0 2014-01-01 10:10:41
62444   10199683.0     1.0     140363.0 2014-01-01 10:14:52


# Set up some global parameters

In [8]:
def datestrrep(dt):
    '''
    Format a date-like object as a 'YYYY-MM-DD' string.

    dt must provide a strftime method (e.g. datetime.datetime or
    pandas.Timestamp).
    '''
    isoday = dt.strftime('%Y-%m-%d')
    return str(isoday)

In [9]:
# Distinct customer ids present in the filtered transactions (np.unique returns them sorted).
allcids = np.unique(np.array(alltrans['custid']))
if testing: print ('len unique allcids', len(allcids))

len unique allcids 596811


In [10]:
# Sorted array of distinct skus; positions in this array are used later to pick the per-sku RFM table.
allskus = np.sort(np.unique(np.array(alltrans.loc[:, 'product_sku'])))
if testing: print (allskus); print('len(allskus):', len(allskus))

[  10201.   37852.   64330.   83092.  120768.  137538.  140363.  181042.]
len(allskus): 8


In [11]:
# One timestamp per calendar day from tstart to tend inclusive (pd.date_range defaults to daily frequency).
alltimes = pd.date_range(d['tstart'], d['tend'])
if testing: print ('len alltimes', len(alltimes))

len alltimes 1096


# Source an rfm file

In [12]:
def getrfm(valdate, verbose = False):
    '''
    Load the cached RFM object for a single valuation date.

    valdate: date-like accepted by datestrrep; selects the file
             <cwd>/rfmcache/rfm_valdate_<YYYY-MM-DD>.pkl2
    verbose: if True, print how long loading took
    returns: the object that was pickled in that file (callers index it
             positionally)

    NOTE(review): pickle can execute arbitrary code on load; only point this
    at trusted cache files.
    '''
    fname = cwd + '/rfmcache/rfm_valdate_%s.pkl2' % str(datestrrep(valdate))
    if verbose: import time; t0 = time.time()
    # Under any pickle protocol other than 2 the '.pkl2' cache is decoded as
    # latin1 (presumably because it was written by Python 2 -- TODO confirm).
    loadkwargs = {} if pklhp == 2 else {'encoding': 'latin1'}
    # Context manager so the gzip handle is closed even if unpickling fails.
    with gzip.open(fname, 'rb') as rfmfile:
        out = pickle.load(rfmfile, **loadkwargs)
    if verbose: print ('loading took', time.time()-t0)
    return out

In [13]:
if testing:
    # Smoke test: load the cache for the first date and report how many entries it holds.
    x = getrfm(alltimes[0])
    print ('len x:', len(x))

len x: 10


## Set up target cids, times, skus

In [14]:
# Customers to build covariates for; in testing mode only the first 10 ids are used.
tgtcids = allcids
if testing: 
    tgtcids = allcids[:10]
    print (len(tgtcids))

10


In [15]:
# Dates to build covariates for; in testing mode only the first 100 days are used.
tgttimes = alltimes
if testing: 
    tgttimes = alltimes[:100]
    print (len(tgttimes))

100


In [16]:
# Skus to build covariates for; in testing mode only the first 2 skus are used.
tgtskus = allskus
if testing:
    tgtskus = allskus[:2]
    print (tgtskus)

[ 10201.  37852.]


## Try to get all covariates for tgtcids, tgtskus, tgttimes

The output data array has shape:
```
len(tgtcids) BY len(tgtskus) BY len(tgttimes) BY 13
```

since there are 13 covariates: 4 time-series covariates plus (r, f, m) for each of the 3 RFM tables, i.e. 4 + 3*3 = 13:
```
(tse, tte, unc, pcs), 3*(rfmall, rfmcat, rfmthis)
```

In [17]:
import TimeSeriesTransforms as tstf

def formattedrow_tstf(cidtemp, skutemp, tgttimes, verbose = False):
    '''
    Build the per-day time-series covariates of one (customer, sku) pair.

    cidtemp:  customer id, matched against alltrans['custid']
    skutemp:  sku, matched against alltrans['product_sku']
    tgttimes: sequence of target dates; events are binned by whole days
              elapsed since min(tgttimes), so position i is treated as day i
    verbose:  if True, print the elapsed time

    returns np.array of shape (len(tgttimes), 4)
    where 4 = #(tse,tte,unc,pcs)
    '''
    if verbose: import time; t0 = time.time()

    ntimes = len(tgttimes)
    origin = min(tgttimes)

    # get event indicator for cidtemp, skutemp
    evtind = (alltrans['custid'] == cidtemp) & (alltrans['product_sku'] == skutemp)
    # event times as number of days from (origin = min(tgttimes))
    evt = np.sort(np.array((alltrans.loc[evtind, 'date'] - origin).dt.days))
    # Keep only events inside the window. Events before the origin have
    # negative day offsets, which numpy would otherwise interpret as indices
    # counted from the end of the arrays below.
    evt = evt[(evt >= 0) & (evt < ntimes)]

    # default is uncensored=False, purchstatus=False, eventindicator=False
    evind = np.zeros(ntimes)
    unc   = np.zeros(ntimes)
    pcs   = np.zeros(ntimes)
    # if there has been an event...
    if len(evt) > 0:
        evind[evt] = 1
        # evt is sorted: evt[0] is the first event day, evt[-1] the last one
        unc[:evt[-1]] = 1    # days strictly before the last observed event
        pcs[evt[0]:] = 1     # days from the first observed event onwards
    # transform to get tse, tte
    tse = tstf.tse(evind); tte = tstf.tte(evind)
    # reshape every covariate into a column of shape (len(tgttimes), 1)
    columns = [col.reshape((ntimes, 1)) for col in (tse, tte, unc, pcs)]
    if verbose: print('(tse, tte, unc, pcs) done in:', time.time()-t0)

    return np.concatenate(columns, axis = 1)

In [18]:
def formattedrow_allskus(cidtemp, tgtskus, tgttimes, verbose = False):
    '''
    Stack the formattedrow_tstf output of every sku in tgtskus for one customer.

    returns np.array of shape (1, len(tgtskus), len(tgttimes), 4)
    '''
    blockshape = (1, 1, len(tgttimes), 4)
    persku = []
    for sku in tgtskus:
        row = formattedrow_tstf(cidtemp, sku, tgttimes, verbose = verbose)
        # lift to 4-d so that the sku axis (axis 1) exists for concatenation
        persku.append(row.reshape(blockshape))
    return np.concatenate(persku, axis = 1)

In [19]:
def getrfm_matrix(tgtcids, rfmtemp, verbose = False):
    '''
    Look up the (r, f, m) values of each target customer in one RFM table.

    tgtcids: sequence of customer ids
    rfmtemp: table with columns 'custid', 'r', 'f', 'm'
    verbose: if True, print the elapsed time

    returns array of shape (len(tgtcids), 3); rows of customers that are
    absent from rfmtemp stay at zero
    '''
    if verbose: import time; t0 = time.time()
    rfmcols = ['r', 'f', 'm']
    out = np.zeros((len(tgtcids), 3))
    for rowno in range(len(tgtcids)):
        matches = rfmtemp['custid'] == tgtcids[rowno]
        if not np.any(matches):
            # unknown customer: leave the zero row in place
            continue
        out[rowno, :] = np.array(rfmtemp.loc[matches, rfmcols]).reshape((1, 3))
    if verbose: print ('took:', time.time()-t0)
    return out

In [20]:
def getrfm_singletime(valdate, tgtcids, tgtskus, verbose = False):
    '''
    Assemble the RFM covariates of all target customers and skus at one date.

    valdate: valuation date passed to getrfm
    tgtcids: sequence of customer ids
    tgtskus: sequence of skus; each one must be present in allskus
    verbose: if True, print progress/timing information

    return (len(tgtcids), len(tgtskus), 1, 9)
    where #(r, f, m) * #(rfmall, rfmcat, rfmthis) = 9

    raises ValueError if a target sku is not in allskus
    '''
    x = getrfm(valdate, verbose = verbose)
    if verbose: import time; print ('x shape')
    # dissect the loaded file: entry 0 is used as rfmall, entry 1 as rfmcat,
    # and entry 2+j as the table of sku allskus[j]
    rfmall = x[0]
    rfmcat = x[1]
    # Position of every target sku inside allskus, kept in tgtskus order so
    # that column k of the output always belongs to tgtskus[k]. (A membership
    # mask over allskus would yield positions in allskus order instead.)
    skupositions = []
    for sku in tgtskus:
        hits = np.where(allskus == sku)[0]
        if len(hits) == 0:
            raise ValueError('sku %s is not in allskus' % sku)
        skupositions.append(int(hits[0]))

    out = np.zeros((len(tgtcids), len(tgtskus), 3, 3))
    # rfmall / rfmcat do not depend on the sku: compute them once and
    # broadcast along the sku axis
    out[:, :, 0, :] = getrfm_matrix(tgtcids, rfmall, verbose = verbose)[:, np.newaxis, :]
    out[:, :, 1, :] = getrfm_matrix(tgtcids, rfmcat, verbose = verbose)[:, np.newaxis, :]

    if verbose: t0 = time.time()
    for k, pos in enumerate(skupositions):
        out[:, k, 2, :] = getrfm_matrix(tgtcids, x[2 + pos], verbose = verbose)
    if verbose: print ('took:', time.time()-t0)

    # flatten (table, rfm) into 9 covariates and add a singleton time axis
    return out.reshape((len(tgtcids), len(tgtskus), 1, 9))

## Aggregator

In [21]:
def getdata(tgtcids, tgtskus, tgttimes, verbose = False):
    '''
    Build the full covariate array for the given customers, skus and dates.

    tgtcids:  sequence of customer ids
    tgtskus:  sequence of skus
    tgttimes: sequence of dates
    verbose:  if True, print the output shape, a size estimate and the time
              taken by each stage (not forwarded to the helper functions)

    return array of shape (len(tgtcids), len(tgtskus), len(tgttimes), 13)
    where #(tse,tte,unc,pcs) + #(r, f, m) * #(rfmall, rfmcat, rfmthis) = 13;
    the last axis is ordered tse, tte, unc, pcs, then (r, f, m) of rfmall,
    rfmcat and rfmthis
    '''

    outshape = (len(tgtcids), len(tgtskus), len(tgttimes), 13)
    if verbose:
        import time
        t0 = time.time()
        print ('outshape', outshape)
        # the arrays are built from np.zeros, i.e. float64: 8 bytes per element
        bytesperelement = np.dtype(np.float64).itemsize
        print ('max size in MB:', bytesperelement*np.prod(outshape)/np.power(10,6))

    if 0 in outshape:
        # np.concatenate rejects an empty list of arrays, so short-circuit
        # when there are no customers, skus or dates
        return np.zeros(outshape)

    # time-series covariates: one block per customer, stacked along axis 0
    data_tstf = np.concatenate(
        [formattedrow_allskus(cid, tgtskus, tgttimes) for cid in tgtcids],
        axis = 0)
    if verbose: t1 = time.time(); print('tstf took', t1-t0); t0 = t1

    # RFM covariates: one block per date, stacked along the time axis
    data_rfm = np.concatenate(
        [getrfm_singletime(valdate, tgtcids, tgtskus) for valdate in tgttimes],
        axis = 2)
    if verbose: t1 = time.time(); print('rfm took', t1-t0); t0 = t1

    return np.concatenate((data_tstf, data_rfm), axis = 3)

## Testing...

In [22]:
# data = getdata(tgtcids, tgtskus, tgttimes, verbose = True)
# print (data.shape)

In [23]:
# Intended number of customers per cached batch (tiny in testing mode).
batchsize = 1000
if testing: batchsize = 5

In [24]:
# Split the target customers into batches of at most `batchsize` ids.
# Rounding up (instead of truncating) keeps every batch within batchsize, and
# the floor of 1 avoids asking np.array_split for zero sections when there are
# fewer customers than batchsize.
nbatches = max(1, int(np.ceil(len(tgtcids) / float(batchsize))))
tgtcids_split = np.array_split(tgtcids, nbatches)
if testing: print ('len split:', len(tgtcids_split))

len split: 2


In [35]:
def worker(batchno):
    '''
    Compute the covariate array of one customer batch and cache it on disk.

    batchno: index into tgtcids_split
    returns: path of the gzip-compressed pickle that was written,
             <cwd>/cache/<batchno>.pkl<pklhp>
    '''
    import os
    import time
    fname = '%s/cache/%s.pkl%s' % (cwd, batchno, pklhp)
    t0 = time.time()
    print ('batchno:', batchno)
    out = getdata(tgtcids_split[batchno], tgtskus, tgttimes)
    # Make sure the cache directory exists; exist_ok tolerates several worker
    # processes creating it at the same time.
    os.makedirs(os.path.dirname(fname), exist_ok = True)
    # Close the gzip stream explicitly: an unclosed GzipFile may not have
    # written its trailer yet, which leaves a truncated archive on disk.
    with gzip.open(fname, 'wb') as cachefile:
        pickle.dump(out, cachefile, -1)
    print ('batchno:%s took:%s' % (batchno, time.time()-t0))
    return fname

In [36]:
from multiprocessing import Pool

# Number of worker processes handed to multiprocessing.Pool (processes, not threads, despite the name).
nthreads = 24
if testing: nthreads = 2

In [37]:
# Run every batch through the pool. Using the pool as a context manager shuts
# the worker processes down once map() has returned, instead of leaving them
# alive for the rest of the kernel session.
with Pool(nthreads) as p:
    if testing: import time; t0 = time.time()
    resl = p.map(worker, np.arange(len(tgtcids_split)))
    if testing: t1=time.time(); print('done in:', t1-t0); t0 = t1

batchno: 0
batchno: 1
batchno:1 took:48.653870820999146
batchno:0 took:48.83422780036926
done in: 48.83882117271423
