In [1]:
import importlib
from pathlib import Path
from typing import List, Dict, Tuple, DefaultDict, Optional, Sequence

import numpy as np

import snrf
import pushranker

In [2]:
push_type = 'scheduled'  # one of: local, targeted, scheduled
train_day = '0516'
test_day = '0517'

LOCAL_DATA_ROOT = Path('../data/')


def _partition_root(split: str, day: str) -> str:
    """Return the local data directory for one split/day of the en_US edition.

    To narrow to a single hourly partition, append e.g. / 'dt=2021-04-30-00'.
    """
    return str(LOCAL_DATA_ROOT / split / 'edition=en_US' / f'push_type={push_type}' / day)


TRAIN_DATA_ROOT = _partition_root('train', train_day)
TEST_DATA_ROOT = _partition_root('test', test_day)

# Presumably resolves the dotted name inside the pushranker package — confirm in snrf.
training_format = snrf.package.get_obj_from_name(pushranker, 'survival_feature_spec.binarized_format_devicetoken')

# The input module is chosen by push type, e.g. pushranker.scheduled_push.
input_module = importlib.import_module(f'pushranker.{push_type}_push')

print(TRAIN_DATA_ROOT)
print(TEST_DATA_ROOT)

../data/train/edition=en_US/push_type=scheduled/0516
../data/test/edition=en_US/push_type=scheduled/0517


In [3]:
def prepare(root, shuffle=None):
    """Read the dataset files under ``root`` and prepare them for iteration.

    Delegates to ``snrf.tfrecord.prepare_dataset_for_use`` with the module-level
    ``input_module.input_spec`` and ``training_format``. ``shuffle`` is forwarded
    as-is (call sites pass either nothing or a buffer-like size such as 10000).
    """
    raw = snrf.tfrecord.read_dataset_from_files(root)
    return snrf.tfrecord.prepare_dataset_for_use(
        raw,
        input_module.input_spec,
        shuffle=shuffle,
        training_format=training_format,
    )

# The feature-extraction and label-extraction cells below iterate train_ds in
# separate passes, and their outputs are saved as row-aligned train_X / train_y.
# If prepare() shuffles via tf.data.Dataset.shuffle (reshuffle_each_iteration
# defaults to True), each pass yields a different row order and the saved
# features and labels no longer line up. GBDT training does not depend on row
# order, so read the training data unshuffled (previously: shuffle=10000).
# NOTE(review): this also assumes snrf reads files in a deterministic order — confirm.
train_ds = prepare(TRAIN_DATA_ROOT)
test_ds = prepare(TEST_DATA_ROOT)
type(train_ds)

tensorflow.python.data.ops.dataset_ops.PrefetchDataset

In [4]:
# Transform push-ranker data for GBDT.
# Exported column layout: numeric features first, then each dense feature
# expanded to its length in dense_features_len (1 + 89 = 90 columns).
numeric_features = ['predicted_ctr']
dense_features = ['cgScores', 'a_stats', 'af_dense', 'uf_dense', 'u_hhs']
dense_features_len = [10, 36, 9, 10, 24]
# Sparse features read by build_sparse_feature_dict. The sparse pipeline is
# currently disabled, but the list stays defined so that helper can be called
# without a NameError.
sparse_features = [
    'push_id', 'u_cate', 'u_catev2', 'u_hist_ids', 'u_pub', 'u_pub_ctr', 'u_kw',
    'a_catev2', 'a_features', 'a_site_id', 'a_kw', 'a_tw'
]
assert len(dense_features) == len(dense_features_len), "need one length per dense feature"

def build_numeric_feature(batch, features: Optional[Sequence[str]] = None) -> List[List[float]]:
    """Turn one batch into per-row lists of numeric feature values.

    Args:
        batch: a dataset element whose first item ``batch[0]`` maps feature
            names to per-row sequences.
        features: feature names to extract, in column order. Defaults to the
            module-level ``numeric_features``.

    Returns:
        One list per row containing ``batch[0][f][i]`` for each ``f`` in
        ``features``. If the columns differ in length, rows stop at the
        shortest one. An empty ``features`` returns ``[]``; the previous
        IndexError-driven loop never terminated in that case.
    """
    if features is None:
        features = numeric_features
    columns = [batch[0][f] for f in features]
    # zip walks all columns in lockstep and stops at the shortest, which
    # replaces catching IndexError to detect the end of the batch.
    return [list(row) for row in zip(*columns)]
    

def build_dense_feature(batch, features: Optional[Sequence[str]] = None) -> List[List[float]]:
    """Turn one batch into per-row dense vectors by concatenating feature vectors.

    Args:
        batch: a dataset element whose first item ``batch[0]`` maps feature
            names to per-row iterables (vectors).
        features: feature names to concatenate, in order. Defaults to the
            module-level ``dense_features``.

    Returns:
        One flat list per row: the elements of ``batch[0][f][i]`` for each
        ``f`` in ``features``, concatenated. If the columns differ in length,
        rows stop at the shortest one. An empty ``features`` returns ``[]``
        instead of looping forever.
    """
    if features is None:
        features = dense_features
    columns = [batch[0][f] for f in features]
    return [[value for vec in row for value in vec] for row in zip(*columns)]
    

def build_sparse_feature_dict(batch, features: Optional[Sequence[str]] = None) -> List[Dict]:
    """Turn one batch into per-row dicts of stringified sparse feature values.

    Args:
        batch: a dataset element whose first item ``batch[0]`` maps feature
            names to per-row values.
        features: feature names to extract. Defaults to the module-level
            ``sparse_features``, which must be defined before calling without
            this argument.

    Returns:
        One dict per row. ``'push_id'`` maps to ``str(value)``. Every other
        feature maps to the ``str`` of each truthy element of the row's value,
        flattened in C order. If the columns differ in length, rows stop at
        the shortest one. An empty ``features`` returns ``[]`` instead of
        looping forever.
    """
    if features is None:
        features = sparse_features
    columns = [batch[0][f] for f in features]
    samples = []
    for row in zip(*columns):
        feature_dict = {}
        for name, value in zip(features, row):
            if name == 'push_id':
                feature_dict[name] = str(value)
            else:
                # .flat visits the same elements, in the same order, as np.ndenumerate.
                feature_dict[name] = [str(v) for v in np.asarray(value).flat if v]
        samples.append(feature_dict)
    return samples

In [6]:
# prepare training data: flatten every batch of train_ds into per-row numeric
# and dense feature vectors (the sparse pipeline is disabled below).
numeric_feat = []
dense_feat = []
sparse_samples = []

for batch in train_ds.as_numpy_iterator():
    numeric_feat += build_numeric_feature(batch)
    dense_feat += build_dense_feature(batch)
    # sparse_samples += build_sparse_feature_dict(batch)

# Each list is emptied right after its array is built, so the numeric list is
# released before the larger dense conversion starts.
numeric_feature = np.array(numeric_feat)
numeric_feat.clear()
dense_feature = np.array(dense_feat)
dense_feat.clear()
# dense_feature_column = [f'{df}_{i}' for df, l in zip(dense_features, dense_features_len) for i in range(l)]

# Sparse features (disabled): one-hot encode with a DictVectorizer fitted on train.
# vec = DictVectorizer()
# sparse_feature = vec.fit_transform(sparse_samples).toarray()
# sparse_samples.clear()
# sparse_feature_column = vec.feature_names_
# sparse_feature.shape

In [7]:
# merge training data: numeric columns first, then the dense columns.
# feature_columns = numeric_features + dense_feature_column  # + sparse_feature_column
train_X = np.hstack([numeric_feature, dense_feature])  # append sparse_feature here when re-enabled
train_X.shape

(35226290, 90)

In [8]:
# Persist the training feature matrix. np.save does not create missing
# directories, so make sure the output folder exists first.
(LOCAL_DATA_ROOT / 'gbdt').mkdir(parents=True, exist_ok=True)
np.save(str(LOCAL_DATA_ROOT / 'gbdt' / f'{push_type}-trainX-{train_day}.npy'), train_X)

In [9]:
# prepare testing data: same per-row flattening as for training, over test_ds.
numeric_feat = []
dense_feat = []
sparse_samples = []
for batch in test_ds.as_numpy_iterator():
    numeric_feat += build_numeric_feature(batch)
    dense_feat += build_dense_feature(batch)
    # sparse_samples += build_sparse_feature_dict(batch)

# Build each array, then empty its source list before building the next one.
numeric_feature = np.array(numeric_feat)
numeric_feat.clear()
dense_feature = np.array(dense_feat)
dense_feat.clear()

# Sparse features (disabled) would reuse the DictVectorizer fitted on train:
# sparse_feature = vec.transform(sparse_samples).toarray()
# sparse_samples.clear()
test_X = np.hstack([numeric_feature, dense_feature])  # append sparse_feature when re-enabled
test_X.shape

(36052789, 90)

In [10]:
# Persist the testing feature matrix. np.save does not create missing
# directories, so make sure the output folder exists first.
(LOCAL_DATA_ROOT / 'gbdt').mkdir(parents=True, exist_ok=True)
np.save(str(LOCAL_DATA_ROOT / 'gbdt' / f'{push_type}-testX-{test_day}.npy'), test_X)

In [11]:
# prepare labels: the label is the second item of each dataset element.
# NOTE(review): this is a separate pass over each dataset. The labels line up
# with the saved train_X / test_X rows only if every pass yields elements in the
# same order (no per-iteration reshuffle) — confirm for train_ds.
gbdt_dir = LOCAL_DATA_ROOT / 'gbdt'
gbdt_dir.mkdir(parents=True, exist_ok=True)  # np.save does not create directories

train_y = list()
test_y = list()

for train in train_ds:
    train_y.extend(np.array(train[1]))
train_y = np.array(train_y)
print(len(train_y))
np.save(str(gbdt_dir / f'{push_type}-trainy-{train_day}.npy'), train_y)

for test in test_ds:
    test_y.extend(np.array(test[1]))
test_y = np.array(test_y)
print(len(test_y))
np.save(str(gbdt_dir / f'{push_type}-testy-{test_day}.npy'), test_y)


35226290
36052789


In [12]:
# Number of positive training labels. The vectorized sum avoids iterating over
# every numpy scalar in Python.
train_y.sum()

2149168

In [13]:
# Number of positive testing labels. The vectorized sum avoids iterating over
# every numpy scalar in Python.
test_y.sum()

2501688

## Data Analysis

In [5]:
# Reload the cached training matrix and labels (X first, then y).
train_X, train_y = (
    np.load(str(LOCAL_DATA_ROOT / 'gbdt' / f'{push_type}-{part}-{train_day}.npy'))
    for part in ('trainX', 'trainy')
)

In [6]:
# Reload the cached testing matrix and labels (X first, then y).
test_X, test_y = (
    np.load(str(LOCAL_DATA_ROOT / 'gbdt' / f'{push_type}-{part}-{test_day}.npy'))
    for part in ('testX', 'testy')
)

In [7]:
# Sanity check: each feature matrix has one row per label.
for mat, vec in ((train_X, train_y), (test_X, test_y)):
    print(mat.shape, vec.shape)

(35226290, 90) (35226290,)
(36052789, 90) (36052789,)


In [8]:
def getCTRLabelDevtok(tfdata) -> Tuple[List, List, List]:
    """Collect per-row predicted CTR, label and device token from a dataset.

    Args:
        tfdata: an object whose ``as_numpy_iterator()`` yields
            ``(features, labels, devtoks)`` batches, where ``features`` maps
            names to per-row values and contains ``'predicted_ctr'``.

    Returns:
        ``(fy_gbdt_ctr, label, devtok)``: three equal-length lists in iteration
        order. Each device token is the first entry of that row's ``devtoks``
        element.
    """
    fy_gbdt_ctr, label, devtok = [], [], []
    for feature_batch, label_batch, devtok_batch in tfdata.as_numpy_iterator():
        for ctr, lbl, dtk in zip(feature_batch['predicted_ctr'], label_batch, devtok_batch):
            fy_gbdt_ctr.append(ctr)
            label.append(lbl)
            devtok.append(dtk[0])
    assert len(fy_gbdt_ctr) == len(label) == len(devtok)
    return fy_gbdt_ctr, label, devtok

In [23]:
%%time
import concurrent.futures

# Extract (ctr, label, devtok) for train and test concurrently. Leaving the
# `with` block shuts the executor down and waits for both jobs to finish.
tfdata = [train_ds, test_ds]
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(getCTRLabelDevtok, ds) for ds in tfdata]


CPU times: user 46min 8s, sys: 52.9 s, total: 47min 1s
Wall time: 13min 1s


In [24]:
%%time
# Benchmark only: the serial version of the previous cell, for wall-time
# comparison. a, b, c end up holding just the last dataset's results and are
# not used afterwards, so this cell can be skipped on a full re-run.
for d in tfdata:
    a, b, c = getCTRLabelDevtok(d)

CPU times: user 45min 9s, sys: 46.1 s, total: 45min 55s
Wall time: 22min 13s


In [25]:
# Unpack the (ctr, label, devtok) triples from the train and test futures, in that order.
(train_fy_gbdt_ctr, train_label, train_devtok), (test_fy_gbdt_ctr, test_label, test_devtok) = (
    f.result() for f in futures[:2]
)

In [26]:
# Number of training examples, then number of positive examples (sum of labels).
for stat in (len, sum):
    print(stat(train_label))

35226290
2149168


In [27]:
# Number of testing examples, then number of positive examples (sum of labels).
for stat in (len, sum):
    print(stat(test_label))

36052789
2501688


In [28]:
%%time
# User counts: distinct device tokens per split, then tokens present in both.
distUsers_train = set(train_devtok)
print(len(distUsers_train))
distUsers_test = set(test_devtok)
print(len(distUsers_test))

interUsers = distUsers_train & distUsers_test
print(len(interUsers))

15236203
15313733
13707402
CPU times: user 20.9 s, sys: 1.31 s, total: 22.2 s
Wall time: 22.1 s


In [30]:
%%time

import collections


def _score_label_buckets():
    """Return an empty auto-vivifying ``{devtok: {'score': [...], 'label': [...]}}`` map."""
    return collections.defaultdict(lambda: collections.defaultdict(list))


len_samples_train, len_samples_test = len(train_devtok), len(test_devtok)
user_fy_gbdt_train = _score_label_buckets()
user_fy_gbdt_test = _score_label_buckets()

def getScoreLabel(i: int, user_fy_gbdt: DefaultDict, *, devtok, fy_gbdt_ctr, label):
    """Append sample ``i``'s score and label to its user's bucket.

    Args:
        i: sample index into ``devtok``, ``fy_gbdt_ctr`` and ``label``.
        user_fy_gbdt: nested mapping ``{devtok: {'score': [...], 'label': [...]}}``
            whose inner values auto-create empty lists (as built for
            ``user_fy_gbdt_train`` / ``user_fy_gbdt_test``).
        devtok, fy_gbdt_ctr, label: per-sample sequences of equal length.
    """
    # The sequences are passed explicitly. The old body read them as globals,
    # but they only exist as locals inside getCTRLabelDevtok. It also called
    # .append on the per-user dict rather than on its 'score'/'label' lists.
    bucket = user_fy_gbdt[devtok[i]]
    bucket['score'].append(fy_gbdt_ctr[i])
    bucket['label'].append(label[i])

# Group per-sample scores and labels by device token for each split, keeping
# sample order within each user. zip replaces the index loops, and each user's
# bucket is looked up once per row instead of twice.
for buckets, devtoks, scores, labels in (
    (user_fy_gbdt_train, train_devtok, train_fy_gbdt_ctr, train_label),
    (user_fy_gbdt_test, test_devtok, test_fy_gbdt_ctr, test_label),
):
    for dtk, score, lbl in zip(devtoks, scores, labels):
        bucket = buckets[dtk]
        bucket['score'].append(score)
        bucket['label'].append(lbl)


CPU times: user 7min 6s, sys: 7.27 s, total: 7min 14s
Wall time: 7min 13s


In [31]:
# %%time

# from multiprocessing import Pool
# # number of users with more than one news item and at least one positive example

# def moreThanOneNewsAndAtLeastOnePositive(newsList: List) -> int:
#     return 1 if len(newsList) > 1 and sum(l for _, l in newsList) > 0 else 0

# num_processors = 32
# p = Pool(processes=num_processors)
# r = p.map(moreThanOneNewsAndAtLeastOnePositive, user_fy_gbdt.values())
# sum(r)

In [33]:
%%time
# Test users with more than one news item and at least one positive label.
s = sum(
    1
    for labels in (news['label'] for news in user_fy_gbdt_test.values())
    if len(labels) > 1 and sum(labels) > 0
)
s

CPU times: user 13.3 s, sys: 0 ns, total: 13.3 s
Wall time: 13.3 s


991764

In [None]:
## candidates distribution
