# Preprocessing Movielens Data for Embeddings Learning

The following are the steps of this tutorial:


1. Download Movielens data.
2. Preprocess the data and store it as TFRecord files.
3. Read the prepared data from the TFRecord files using the tf.data APIs.

<a href="https://colab.research.google.com/github/ksalama/data2cooc2emb2ann/blob/master/movie2ann/01-Preparing_Movielens_Data_for_embeddings_learning.ipynb" target="_parent">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/> </a>

### Setup

In [11]:
import os
import math
import apache_beam as beam
import tensorflow as tf
from datetime import datetime
import pandas as pd

In [15]:
# Root folder for everything this notebook produces, followed by the
# sub-folders for the downloaded dataset and for the preprocessed output.
WORKSPACE = './workspace'
DATA_DIR = WORKSPACE + '/data'
COOC_DIR = WORKSPACE + '/cooc'

In [7]:
# Start from a clean slate: if the workspace already exists, delete it
# before recreating it, so files from earlier runs cannot linger.
if tf.io.gfile.exists(WORKSPACE):
    print("Removing {} contents...".format(WORKSPACE))
    # NOTE: this recursively deletes the workspace directory tree.
    tf.io.gfile.rmtree(WORKSPACE)

print("Creating workspace: {}".format(WORKSPACE))
tf.io.gfile.makedirs(WORKSPACE)

Creating workspace: ./workspace


## 1. Download Dataset

In [27]:
# Name of the Movielens archive to fetch; the ratings path built below
# assumes the archive extracts into a folder with the same name.
DATASET = 'ml-1m'
# Download the archive into DATA_DIR and extract it in place
# (IPython shell magics; {NAME} is substituted with the Python variable).
! wget http://files.grouplens.org/datasets/movielens/{DATASET}.zip -P {DATA_DIR}/
! unzip {DATA_DIR}/{DATASET}.zip -d {DATA_DIR}/
# Path to the extracted ratings file used by the following cells.
data_file = os.path.join(DATA_DIR, '{}/ratings.dat'.format(DATASET))

In [37]:
# Column names are supplied explicitly because the file is read without a
# header row (its first line is already a rating record).
header = ['user_id', 'movie_id', 'rating', 'timestamp']
# A multi-character separator such as "::" is only supported by pandas'
# Python parsing engine. Requesting it explicitly avoids the ParserWarning
# raised when pandas silently falls back from the C engine.
ratings_data = pd.read_csv(data_file, sep="::", names=header, engine="python")
print("Size: {}".format(len(ratings_data)))
ratings_data.head()

  


Size: 1000209


Unnamed: 0,user_id,movie_id,rating,timestamp
0,1,1193,5,978300760
1,1,661,3,978302109
2,1,914,3,978301968
3,1,3408,4,978300275
4,1,2355,5,978824291


## 2. Preprocess the data

### Preprocessing steps

In [83]:
def read_data(pipeline, source_data_location):
    """Attach a text-reading step to the pipeline.

    Args:
        pipeline: Beam pipeline the read step is applied to.
        source_data_location: File pattern handed to ``ReadFromText``.

    Returns:
        The output of the 'Read from files' step.
    """
    text_source = beam.io.ReadFromText(file_pattern=source_data_location)
    return pipeline | 'Read from files' >> text_source
    

def parse_data(raw_data, delimiter):
    """Split every raw text line into an ``(item1, item2, score)`` tuple.

    Args:
        raw_data: Collection of delimited text lines.
        delimiter: Separator placed between the fields of a line.

    Returns:
        The output of the 'Parse to tuple' step: one tuple holding the first
        three fields of each line, as strings. Any further fields are
        dropped.

    Raises:
        ValueError: Raised by the parsing step when a line cannot be split
            into at least three fields.
    """

    def _parse_csv(line, delimiter):
        # Only the failures that splitting/unpacking can produce are turned
        # into the format error. A bare ``except`` would also intercept
        # KeyboardInterrupt and SystemExit.
        try:
            item1, item2, score = line.split(delimiter)[:3]
        except (AttributeError, TypeError, ValueError) as error:
            # Chain the original exception so the root cause stays visible.
            raise ValueError(
                "Invalid file format. A delimited data with three values is expected."
            ) from error
        return (item1, item2, score)

    parsed_data = (
        raw_data
        | 'Parse to tuple' >> beam.Map(_parse_csv, delimiter)
    )
    return parsed_data

def vocabulary(parsed_data, item_index):
    """Collect the distinct values found at one position of the records.

    Args:
        parsed_data: Collection of indexable records (tuples).
        item_index: Position inside each record whose values are collected.

    Returns:
        The output of the distinct step: each value seen at ``item_index``
        exactly once.
    """

    def _pick(record, position):
        return record[position]

    # The index is part of each step label so the function can be applied
    # more than once within the same pipeline.
    extract_label = 'Extract item {}'.format(item_index)
    distinct_label = 'Extract vocabulary of item {}'.format(item_index)

    items = parsed_data | extract_label >> beam.Map(_pick, item_index)
    return items | distinct_label >> beam.Distinct()


def process_data(parsed_data):
    """Append a constant weight and record type to every parsed record.

    Args:
        parsed_data: Collection of ``(item1, item2, score)`` tuples.

    Returns:
        The output of the 'Extend record' step: tuples of the form
        ``(item1, item2, score, 1, 'P')``.
    """

    def _append_weight_and_type(triple):
        first_item, second_item, rating = triple
        return first_item, second_item, rating, 1, 'P'

    return parsed_data | 'Extend record' >> beam.Map(_append_weight_and_type)

def get_info(stats):
    """Summarise the processed records as human-readable text lines.

    Args:
        stats: Collection of ``(item1, item2, score, weight, type)`` tuples.

    Returns:
        A flattened collection of strings: one ``'<type>: <count>'`` line
        per record type, plus a ``'min: <score>'`` and a ``'max: <score>'``
        line. The min/max lines are omitted when the input is empty
        (``without_defaults``).
    """

    def _make_type_as_key(record):
        _, _, _, _, record_type = record
        return (record_type, 1)

    def _get_scores(record):
        _, _, score, _, _ = record
        return score

    # Scores reach this point as they were parsed, i.e. as text when they
    # come from `parse_data`. Plain min/max would compare them
    # lexicographically ('10' < '9'), so compare by numeric value while
    # still returning the original, unmodified score.
    def _min_score(scores):
        return min(scores, key=float)

    def _max_score(scores):
        return max(scores, key=float)

    counts = (
        stats
        | "Group by record type" >> beam.Map(_make_type_as_key)
        | "Count records" >> beam.CombinePerKey(sum)
        | "Fromat counts" >> beam.Map(lambda entry: '{}: {}'.format(entry[0], entry[1]))
    )

    scores = (
        stats
        | "Get scores" >> beam.Map(_get_scores)
    )

    mins = (
        scores
        | "Get min score" >> beam.CombineGlobally(_min_score).without_defaults()
        | "Format min score" >> beam.Map(lambda value: 'min: {}'.format(value))
    )

    maxs = (
        scores
        | "Get max score" >> beam.CombineGlobally(_max_score).without_defaults()
        | "Format max score" >> beam.Map(lambda value: 'max: {}'.format(value))
    )

    info = (
        (counts, mins, maxs)
        | "Combine info" >> beam.Flatten()
    )

    return info
    

def write_debug(data, sink_data_location):
    """Dump a collection as text files for inspection.

    Args:
        data: Collection to write.
        sink_data_location: Output directory; files are written with the
            prefix ``<sink_data_location>/debug``.
    """
    debug_prefix = sink_data_location + "/debug"
    debug_sink = beam.io.WriteToText(file_path_prefix=debug_prefix)
    data | 'Write debug' >> debug_sink
    

def write_log(info, sink_data_location):
    """Write the summary lines to a single ``info.log`` file.

    Args:
        info: Collection of text lines to write.
        sink_data_location: Output directory for the log file.
    """
    # One shard and an empty shard template give a file name with no
    # shard numbering: ``<sink_data_location>/info.log``.
    log_sink = beam.io.WriteToText(
        file_path_prefix=sink_data_location + "/info",
        file_name_suffix=".log",
        shard_name_template='',
        num_shards=1,
    )
    info | 'Write logs' >> log_sink

def write_vocab(vocab, sink_data_location, item_index):
    """Write a vocabulary to a single ``vocab-<item_index>.txt`` file.

    Args:
        vocab: Collection of vocabulary entries to write.
        sink_data_location: Output directory for the vocabulary file.
        item_index: Index used in both the step label and the file name.
    """
    step_label = 'Write vocabulary file {}'.format(item_index)
    # One shard and an empty shard template give a file name with no
    # shard numbering.
    vocab_sink = beam.io.WriteToText(
        file_path_prefix=sink_data_location + "/vocab",
        file_name_suffix="-{}.txt".format(item_index),
        shard_name_template='',
        num_shards=1,
    )
    vocab | step_label >> vocab_sink
    

def write_to_tfrecords(stats, sink_data_location):
    """Encode the records as ``tf.train.Example`` and write TFRecord files.

    Args:
        stats: Collection of ``(item1, item2, score, weight, type)`` tuples.
        sink_data_location: Output directory; files are written with the
            prefix ``<sink_data_location>/cooc`` and suffix ``.tfrecords``.
    """

    def _bytes_feature(text):
        return tf.train.Feature(
            bytes_list=tf.train.BytesList(value=[tf.compat.as_bytes(text)]))

    def _float_feature(number):
        return tf.train.Feature(
            float_list=tf.train.FloatList(value=[float(number)]))

    def _to_tf_example(record):
        item1, item2, score, weight, record_type = record
        features = tf.train.Features(feature={
            'item1': _bytes_feature(item1),
            'item2': _bytes_feature(item2),
            'score': _float_feature(score),
            'weight': _float_feature(weight),
            'type': _bytes_feature(record_type),
        })
        return tf.train.Example(features=features)

    def _serialize(example):
        return example.SerializeToString(deterministic=True)

    tfrecord_sink = beam.io.WriteToTFRecord(
        file_path_prefix=sink_data_location + "/cooc",
        file_name_suffix='.tfrecords')

    (
        stats
        | 'Encode to tf.example' >> beam.Map(_to_tf_example)
        | 'Serialize to string' >> beam.Map(_serialize)
        | 'Write to TFRecords files' >> tfrecord_sink
    )

### Preprocessing pipeline

In [84]:
def run_preproc_pipeline(args):
    """Build and run the preprocessing pipeline.

    The pipeline reads the delimited source file, parses and extends the
    records, then writes two vocabulary files, the TFRecord files and a
    summary log under the sink location.

    Args:
        args: Dictionary of pipeline settings. Required keys are
            'source_data_location', 'sink_data_location' and 'delimiter'.
            'runner' is optional and defaults to 'DirectRunner'. The whole
            dictionary is also forwarded as keyword options to
            ``GoogleCloudOptions``.
    """

    source_data_location = args['source_data_location']
    sink_data_location = args['sink_data_location']
    delimiter = args['delimiter']
    # Take the runner from the arguments rather than from a module-level
    # `runner` variable, so the function does not depend on a global that
    # may not be defined (or may disagree with `args`) when it is called.
    pipeline_runner = args.get('runner', 'DirectRunner')

    pipeline_options = beam.options.pipeline_options.GoogleCloudOptions(**args)

    # Leaving the `with` block runs the pipeline and waits for completion.
    with beam.Pipeline(pipeline_runner, options=pipeline_options) as pipeline:

        # Read data from source files
        raw_data = read_data(pipeline, source_data_location)

        # Parse data to (item_1, item_2, score)
        parsed_data = parse_data(raw_data, delimiter)

        # Process data to (item_1, item_2, score, weight, type)
        processed_data = process_data(parsed_data)
        # Uncomment to dump the processed records as text for inspection.
        #write_debug(processed_data, sink_data_location)

        # Extract distinct list of items 1 (vocabulary)
        vocab1 = vocabulary(parsed_data, 0)
        write_vocab(vocab1, sink_data_location, 0)

        # Extract distinct list of items 2 (vocabulary)
        vocab2 = vocabulary(parsed_data, 1)
        write_vocab(vocab2, sink_data_location, 1)

        # Write processed data to tfrecords
        write_to_tfrecords(processed_data, sink_data_location)

        # Log information about the created dataset
        info = get_info(processed_data)
        write_log(info, sink_data_location)


###  Run pipeline

In [85]:
# Run locally with the DirectRunner; the job name carries a UTC timestamp
# so that repeated runs get distinct names.
runner = 'DirectRunner'
job_timestamp = datetime.utcnow().strftime('%y%m%d-%H%M%S')
job_name = 'test-cooc-{}'.format(job_timestamp)

# Settings consumed by `run_preproc_pipeline`.
args = dict(
    job_name=job_name,
    runner=runner,
    source_data_location=data_file,
    sink_data_location=COOC_DIR,
    delimiter='::',
    num_shards=100,
)
print("Pipeline args are set.")

Pipeline args are set.


In [86]:
# Run the preprocessing pipeline and report how long it took (wall clock).
time_start = datetime.utcnow() 
print("Running preproc pipeline...")
# Blocks until the pipeline has finished.
run_preproc_pipeline(args)
print("Pipeline is done.")
time_end = datetime.utcnow() 
# Difference of the two timestamps, reported in seconds below.
time_elapsed = time_end - time_start
print("Execution elapsed time: {} seconds".format(time_elapsed.total_seconds()))

Running preproc pipeline...
Pipeline is done.
Execution elapsed time: 327.089793 seconds


In [92]:
# List the files the pipeline wrote to the output directory.
!ls {COOC_DIR}

[34mbeam-temp-cooc-0d852e3ae0b411e99210784f439392c6[m[m
[34mbeam-temp-cooc-40a3b13ae0b411e9bdc1784f439392c6[m[m
[34mbeam-temp-cooc-6f5f21bae0b411e9bcc5784f439392c6[m[m
[34mbeam-temp-cooc-84e1c474e0b411e98006784f439392c6[m[m
[34mbeam-temp-info-0d7daa9ae0b411e9b859784f439392c6[m[m
[34mbeam-temp-info-40a11fe2e0b411e989b9784f439392c6[m[m
cooc-00000-of-00001.tfrecords
info.log
vocab-0.txt
vocab-1.txt


In [91]:
# Show the summary log written by the pipeline (record counts per type and
# the min/max score).
!head {COOC_DIR}/info.log

max: 5
P: 1000209
min: 1


## 3. Read TFRecords using tf.data APIs

In [87]:
def make_input_fn(file_pattern, batch_size):
    """Create an input function that reads the TFRecord files in batches.

    Args:
        file_pattern: File pattern of the TFRecord files to read.
        batch_size: Number of records combined into each batch.

    Returns:
        A zero-argument function. When called, it returns a dataset of
        feature dictionaries keyed by 'item1', 'item2', 'score', 'weight'
        and 'type', read for a single epoch with shuffling enabled.
    """

    # `tf.io.FixedLenFeature` is available in both TF 1.x and TF 2.x,
    # whereas the top-level `tf.FixedLenFeature` alias exists only in 1.x.
    features = {
        'item1': tf.io.FixedLenFeature(dtype=tf.string, shape=()),
        'item2': tf.io.FixedLenFeature(dtype=tf.string, shape=()),
        'score': tf.io.FixedLenFeature(dtype=tf.float32, shape=()),
        'weight': tf.io.FixedLenFeature(dtype=tf.float32, shape=()),
        'type': tf.io.FixedLenFeature(dtype=tf.string, shape=())
    }

    def _input_fn():
        dataset = tf.data.experimental.make_batched_features_dataset(
            file_pattern,
            batch_size,
            features,
            reader=tf.data.TFRecordDataset,
            label_key=None,  # no label: the dataset yields feature dicts only
            num_epochs=1,
            shuffle=True
        )
        return dataset

    return _input_fn

In [88]:
# Eager execution lets the dataset be iterated directly in Python. The
# `tf.compat.v1` path exists in both TF 1.x and TF 2.x, whereas the
# top-level `tf.enable_eager_execution` exists only in TF 1.x.
tf.compat.v1.enable_eager_execution()

# Pattern matching the TFRecord files written by the pipeline.
DATA_FILES = "{}/cooc-*".format(COOC_DIR)

# Print the first five batches (five records each) as a sanity check.
dataset = make_input_fn(DATA_FILES, batch_size=5)()
for i, features in enumerate(dataset.take(5)):
    print()
    print("Record {}:".format(i+1))
    for key in features:
        print("-{}:{}".format(key, features[key]))

Instructions for updating:
Use `tf.data.Dataset.interleave(map_func, cycle_length, block_length, num_parallel_calls=tf.data.experimental.AUTOTUNE)` instead. If sloppy execution is desired, use `tf.data.Options.experimental_determinstic`.


Instructions for updating:
Use `tf.data.Dataset.interleave(map_func, cycle_length, block_length, num_parallel_calls=tf.data.experimental.AUTOTUNE)` instead. If sloppy execution is desired, use `tf.data.Options.experimental_determinstic`.



Record 1:
-item1:[b'62' b'59' b'36' b'62' b'62']
-item2:[b'3481' b'1028' b'1376' b'2407' b'1267']
-score:[4. 3. 3. 4. 4.]
-type:[b'P' b'P' b'P' b'P' b'P']
-weight:[1. 1. 1. 1. 1.]

Record 2:
-item1:[b'8' b'19' b'39' b'7' b'6']
-item2:[b'1673' b'1265' b'2770' b'1573' b'1688']
-score:[5. 4. 4. 4. 5.]
-type:[b'P' b'P' b'P' b'P' b'P']
-weight:[1. 1. 1. 1. 1.]

Record 3:
-item1:[b'68' b'18' b'22' b'66' b'53']
-item2:[b'2908' b'1215' b'2302' b'661' b'1848']
-score:[5. 5. 3. 2. 3.]
-type:[b'P' b'P' b'P' b'P' b'P']
-weight:[1. 1. 1. 1. 1.]

Record 4:
-item1:[b'48' b'10' b'46' b'26' b'53']
-item2:[b'2396' b'1196' b'1717' b'315' b'764']
-score:[4. 5. 5. 3. 5.]
-type:[b'P' b'P' b'P' b'P' b'P']
-weight:[1. 1. 1. 1. 1.]

Record 5:
-item1:[b'56' b'18' b'29' b'15' b'56']
-item2:[b'2997' b'1013' b'3527' b'3773' b'2786']
-score:[4. 3. 5. 2. 1.]
-type:[b'P' b'P' b'P' b'P' b'P']
-weight:[1. 1. 1. 1. 1.]
