otto_preprocessing_set_9

In [None]:
import numpy as np 
import pandas as pd 
import os
import zipfile
import json
import tensorflow as tf

from tqdm import tqdm
from typing import Dict, Text

In [None]:
# Report the TensorFlow release this notebook is running against.
version_banner = 'TensorFlow version: {}'.format(tf.__version__)
print(version_banner)

In [None]:
# Root directory that is walked recursively for input files; every file found
# under it is handed to `preprocess`.
INPUT_DIR = '/kaggle/input/otto-train-tfrecord-sets/otto_train_tfrecord_files/kaggle/working/set_A9'

In [None]:
# Parsing schema for the input records: every field is a scalar int64.
feature_description = {
    name: tf.io.FixedLenFeature([], tf.int64)
    for name in ('session', 'aid', 'ts', 'typ')
}

# Upper bound on how many events of one session are batched into one window.
window_size = 500
# NOTE(review): not referenced anywhere in the visible part of this notebook.
NUM_SHARDS = 100

In [None]:
def get_rating(idx, typ):
    """Return the largest ``typ`` value observed for each segment id.

    The function runs eagerly: the result is materialised with ``.numpy()``.

    Args:
        idx: 1-D integer segment ids, one per event (any order).
        typ: 1-D event-type values aligned element-wise with ``idx``.

    Returns:
        A float32 NumPy array in which entry ``k`` is the maximum of ``typ``
        over all positions whose segment id equals ``k``.
    """
    segment_ids = np.asarray(idx)
    values = np.asarray(typ).astype(np.float32)

    # `tf.math.segment_max` requires its segment ids to be sorted.  A single
    # vectorised argsort replaces the previous Python-level sort of
    # (idx, typ) tuples of scalar tensors; the order *within* a segment is
    # irrelevant for a maximum, so the result is unchanged.
    order = np.argsort(segment_ids, kind="stable")
    rating = tf.math.segment_max(values[order], segment_ids[order]).numpy()
    return rating

@tf.function
def item2item(ds):
    """Turn one batched window of events into item-to-item rows.

    The first ``aid`` of the window is the anchor item; one output row is
    produced for every distinct ``aid`` in the window (the anchor included).

    Args:
        ds: Dict with the keys ``'aid'``, ``'ts'`` and ``'typ'``, each a 1-D
            tensor holding the events of one window in the same order.

    Returns:
        Dict of 1-D tensors, all with one entry per distinct ``aid``:
            ``item_A``: the anchor ``aid``, repeated.
            ``item_B``: the distinct ``aid`` values in first-seen order.
            ``rating``: float32 maximum of ``typ`` over each item's events.
            ``ts``: the first ``ts`` of the window, repeated.
    """
    item_b, segment_ids, _ = tf.unique_with_counts(ds['aid'])
    num_unique = tf.size(item_b)

    # Per-item maximum computed directly in the graph.  The unsorted variant
    # needs no pre-sorting of the segment ids, so no `tf.py_function` (and
    # therefore no Python round-trip per window) is required.  Every id in
    # [0, num_unique) occurs at least once, so no segment is empty.
    rating = tf.math.unsorted_segment_max(
        tf.cast(ds['typ'], dtype=tf.float32),
        segment_ids,
        num_segments=num_unique,
    )

    # Broadcast the window-level scalars to one value per output row.
    item_a = tf.repeat(ds['aid'][0], repeats=[num_unique], axis=0)
    ts = tf.repeat(ds['ts'][0], repeats=[num_unique], axis=0)
    return {"item_A": item_a, "item_B": item_b, "rating": rating, "ts": ts}

def _bytes_feature(value):
    """Wrap a string / byte value in a ``tf.train.Feature`` holding a bytes_list."""
    eager_tensor_type = type(tf.constant(0))
    # BytesList cannot unpack a string from an EagerTensor, so convert it first.
    raw = value.numpy() if isinstance(value, eager_tensor_type) else value
    payload = tf.train.BytesList(value=[raw])
    return tf.train.Feature(bytes_list=payload)

def _float_feature(value):
    """Wrap a single float / double in a ``tf.train.Feature`` holding a float_list."""
    payload = tf.train.FloatList(value=[value])
    return tf.train.Feature(float_list=payload)

def _int64_feature(value):
    """Wrap a single bool / enum / int / uint in a ``tf.train.Feature`` holding an int64_list."""
    payload = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=payload)

def serialize_example(item_A, item_B, rating, ts):
    """Encode one item-to-item row as a serialized ``tf.train.Example``.

    Args:
        item_A: Value stored as the int64 feature ``'item_A'``.
        item_B: Value stored as the int64 feature ``'item_B'``.
        rating: Value stored as the float feature ``'rating'``.
        ts: Value stored as the int64 feature ``'ts'``.

    Returns:
        The serialized protocol-buffer bytes of the example.
    """
    features = tf.train.Features(feature={
        'item_A': _int64_feature(item_A),
        'item_B': _int64_feature(item_B),
        'rating': _float_feature(rating),
        'ts': _int64_feature(ts),
    })
    return tf.train.Example(features=features).SerializeToString()

def tf_serialize_example(x):
    """Serialize one row dict to a scalar ``tf.string`` tensor.

    ``serialize_example`` is plain Python, so it is invoked through
    ``tf.py_function``.

    Args:
        x: Dict with the keys ``'item_A'``, ``'item_B'``, ``'rating'``, ``'ts'``.

    Returns:
        A scalar string tensor containing the serialized example.
    """
    inputs = (x['item_A'], x['item_B'], x['rating'], x['ts'])
    encoded = tf.py_function(func=serialize_example, inp=inputs, Tout=tf.string)
    # py_function loses the static shape; pin the result to a scalar.
    return tf.reshape(encoded, shape=())

In [None]:
def preprocess(filename, filepath):
    """Convert one ZLIB-compressed TFRecord file into item-to-item records.

    Events are parsed, grouped into windows of at most ``window_size`` events
    sharing a session id, expanded into item-to-item rows by ``item2item``,
    serialized, and written as a ZLIB-compressed TFRecord file named
    ``/kaggle/working/pp02<filename>``.

    Args:
        filename: Base name of the input file; reused in the output name.
        filepath: Path of the input TFRecord file to read.

    Returns:
        None.
    """
    def _parse(record):
        return tf.io.parse_single_example(record, feature_description)

    def _session_key(example):
        return example['session']

    def _batch_window(_key, window):
        return window.batch(window_size)

    records = tf.data.TFRecordDataset(filepath, compression_type='ZLIB')
    events = records.map(_parse, num_parallel_calls=tf.data.AUTOTUNE)
    windows = events.group_by_window(
        key_func=_session_key,
        reduce_func=_batch_window,
        window_size=window_size,
    )

    # One dict of column tensors per window, then flattened to one row each.
    pairs = windows.map(item2item, num_parallel_calls=tf.data.AUTOTUNE)
    rows = pairs.flat_map(tf.data.Dataset.from_tensor_slices)
    rows = rows.prefetch(buffer_size=tf.data.AUTOTUNE)
    serialized_dataset = rows.map(tf_serialize_example)

    out_path = f'/kaggle/working/pp02{filename}'
    tf.data.experimental.TFRecordWriter(
        out_path, compression_type='ZLIB'
    ).write(serialized_dataset)

In [None]:
%%time
for dirname, _, filenames in os.walk(INPUT_DIR):
    for filename in filenames:
        filepath = os.path.join(dirname, filename)
        print('Processing :', filename)
        preprocess(filename, filepath)
        


In [None]:
# Shell escape: recursively archive everything under /kaggle/working/ into set_A9.zip.
!zip -r set_A9.zip '/kaggle/working/'