# Beam conversion from BigQuery to TFRecords

In this notebook we use Apache Beam to convert BigQuery tables to TFRecords.
The Beam applications can be found in `beam_candidates` (candidate generation) and `beam_training` (training and validation data).

## Load env config

In [3]:
# Naming convention shared by all cloud resources.
# TODO: adjust VERSION and PREFIX for your own deployment.
VERSION = "v1"
PREFIX = f'ndr-{VERSION}'

print(f"PREFIX = {PREFIX}")

PREFIX = ndr-v1


In [4]:
# staging GCS
# `!cmd` captures the command's output as a list of lines; the first line is used as the project id
GCP_PROJECTS             = !gcloud config get-value project
PROJECT_ID               = GCP_PROJECTS[0]

# GCS bucket and paths
BUCKET_NAME              = f'{PREFIX}-{PROJECT_ID}-bucket'
BUCKET_URI               = f'gs://{BUCKET_NAME}'

# download the shared environment file from the bucket; `.n` is the captured output as one newline-joined string
config = !gsutil cat {BUCKET_URI}/config/notebook_env.py
print(config.n)
# NOTE(review): exec() runs whatever the bucket object contains inside this kernel --
# only use this with a bucket you control. Per the printed config it also re-defines
# names set above (e.g. PROJECT_ID, PREFIX, BUCKET_NAME, BUCKET_URI).
exec(config.n)


PROJECT_ID               = "myproject32549"
PROJECT_NUM              = "683169793466"
LOCATION                 = "us-central1"

REGION                   = "us-central1"
BQ_LOCATION              = "US"
VPC_NETWORK_NAME         = "ucaip-haystack-vpc-network"

VERTEX_SA                = "683169793466-compute@developer.gserviceaccount.com"

PREFIX                   = "ndr-v1"
VERSION                  = "v1"

APP                      = "sp"
MODEL_TYPE               = "2tower"
FRAMEWORK                = "tfrs"
DATA_VERSION             = "v1"
TRACK_HISTORY            = "5"

BUCKET_NAME              = "ndr-v1-myproject32549-bucket"
BUCKET_URI               = "gs://ndr-v1-myproject32549-bucket"
SOURCE_BUCKET            = "spotify-million-playlist-dataset"

DATA_GCS_PREFIX          = "data"
DATA_PATH                = "gs://ndr-v1-myproject32549-bucket/data"
VOCAB_SUBDIR             = "vocabs"
VOCAB_FILENAME           = "vocab_dict.pkl"

CANDIDATE_PREFIX         = "candidates"
TRAIN_DIR_PREFIX  

In [6]:
# show the BigQuery dataset name (presumably defined by the env config exec'd above; that output is truncated here)
BQ_DATASET

'spotify_e2e_test'

In [7]:
# list what currently exists at the root of the project bucket
! gsutil ls -al gs://$BUCKET_NAME

                                 gs://ndr-v1-myproject32549-bucket/config/
                                 gs://ndr-v1-myproject32549-bucket/data/
                                 gs://ndr-v1-myproject32549-bucket/local-train-v2/
                                 gs://ndr-v1-myproject32549-bucket/matching_engine/
                                 gs://ndr-v1-myproject32549-bucket/scale-training-v1/


## pip & package

In [5]:
# Uncomment and run once if Apache Beam (with the GCP extras) is not installed in this environment.
# !pip install --upgrade 'apache-beam[gcp]' --user

In [10]:
import os
import time
from pprint import pprint

import logging
# suppress log records of severity WARNING and below for the rest of the session
logging.disable(logging.WARNING)

# set before `import tensorflow` below; presumably quiets TensorFlow's native logging -- TODO confirm the level's meaning
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' 

import tensorflow as tf

from google.cloud import storage

import warnings
# hide Python warnings in cell outputs
warnings.filterwarnings('ignore')

In [11]:
# GCS client, used further down to list the generated TFRecord files
storage_client = storage.Client(project=PROJECT_ID)

# Run Dataflow to convert BQ to TFrecords

* Candidate generation can be found in `beam_candidates`
* Training and validation generation can be found in `beam_training`

Usage:
* Candidate generation

> from inside `beam_candidates/`: `python3 main.py $PROJECT_ID $VPC_NETWORK_NAME $REGION $DATA_VERSION $BUCKET_NAME $CANDIDATE_PREFIX $BQ_DATASET $BQ_TABLE_CANDIDATES`
   
* Training generation
  
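> from inside `beam_training/`: `python3 main-train.py $PROJECT_ID $VPC_NETWORK_NAME $REGION $DATA_VERSION $BUCKET_NAME <gcs data subfolder> <BQ dataset size MB> <desired partition size MB> $BQ_DATASET <BQ_table>` (`$DATA_VERSION` is the version tag)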

In [12]:
# show the file layout of the training/validation Beam application
!tree beam_training

[01;34mbeam_training[00m
├── README.MD
├── __init__.py
├── [01;34mcreate_tfrecords_training.egg-info[00m
│   ├── PKG-INFO
│   ├── SOURCES.txt
│   ├── dependency_links.txt
│   ├── requires.txt
│   └── top_level.txt
├── main-train.py
├── setup.py
└── [01;34mtrain_pipeline[00m
    ├── __init__.py
    ├── [01;34m__pycache__[00m
    │   ├── __init__.cpython-310.pyc
    │   ├── __init__.cpython-37.pyc
    │   ├── train_pipe.cpython-37.pyc
    │   └── train_pipe_shape.cpython-310.pyc
    └── train_pipe_shape.py

3 directories, 15 files


In [13]:
# change the kernel's working directory so later shell commands run inside the Beam application folder
%cd beam_training


/home/jupyter/spotify_mpd_two_tower/beam_training


In [14]:
# preview the positional arguments that will be passed to the candidates job
!echo $PROJECT_ID $VPC_NETWORK_NAME $REGION $DATA_VERSION $BUCKET_NAME $CANDIDATE_PREFIX $BQ_DATASET $BQ_TABLE_CANDIDATES

myproject32549 ucaip-haystack-vpc-network us-central1 v1 ndr-v1-myproject32549-bucket candidates spotify_e2e_test candidates


### Candidates

In [18]:
# VPC_NETWORK_NAME="default"
BQ_DATASET = 'spotify_e2e_test2'


start_time = time.time()
print("VPC_NETWORK_NAME:", VPC_NETWORK_NAME)

! python3 main.py $PROJECT_ID $VPC_NETWORK_NAME $REGION $DATA_VERSION $BUCKET_NAME $CANDIDATE_PREFIX $BQ_DATASET $BQ_TABLE_CANDIDATES

end_time = time.time()
runtime_mins = int((end_time - start_time) / 60)
print(f"total runtime_mins: {runtime_mins}")

### Validation set

In [15]:
# make sure the working directory is the training/validation Beam application before launching main-train.py
%cd ../beam_training

/home/jupyter/spotify_mpd_two_tower/beam_training


In [16]:
# Region used by the jobs launched below; replaces the REGION value from the env config.
REGION = 'asia-east1'

# Validation split sizing, in MB.
TOTAL_MB_VALID = 500
TARGET_SHARD_SIZE_MB_VALID = 250

# Number of whole shards of the target size that fit in the total.
NUM_TF_RECORDS = int(TOTAL_MB_VALID) // int(TARGET_SHARD_SIZE_MB_VALID)
NUM_TF_RECORDS

2

In [20]:
# preview the positional arguments that will be passed to the validation job
! echo $PROJECT_ID $VPC_NETWORK_NAME $REGION $DATA_VERSION $BUCKET_NAME $VALID_DIR_PREFIX $TOTAL_MB_VALID $TARGET_SHARD_SIZE_MB_VALID $BQ_DATASET $BQ_TABLE_VALID


myproject32549 ucaip-haystack-vpc-network asia-east1 v1 ndr-v1-myproject32549-bucket valid 500 250 spotify_e2e_test train_flatten_valid_last_5


In [20]:
# time the validation TFRecord job end to end
start_time = time.time()
# NOTE(review): BQ_DATASET is re-assigned here, after the echo cell above; the saved echo
# output shows 'spotify_e2e_test', so the job below may read a different dataset than displayed.
BQ_DATASET = 'spotify_e2e_test2'

# launch the job; arguments are passed positionally in the same order as the echo cell above
! python3 main-train.py $PROJECT_ID $VPC_NETWORK_NAME $REGION $DATA_VERSION $BUCKET_NAME $VALID_DIR_PREFIX $TOTAL_MB_VALID $TARGET_SHARD_SIZE_MB_VALID $BQ_DATASET $BQ_TABLE_VALID

end_time = time.time()
# wall-clock duration of the command, truncated to whole minutes
runtime_mins = int((end_time - start_time) / 60)
print(f"total runtime_mins: {runtime_mins}")

Number of Expected TFRecords: 2
GoogleCloudOptions(create_from_snapshot=None, dataflow_endpoint=https://dataflow.googleapis.com, dataflow_kms_key=None, dataflow_service_options=None, enable_artifact_caching=False, enable_hot_key_logging=False, enable_streaming_engine=False, flexrs_goal=None, gcp_oauth_scopes=['https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/devstorage.full_control', 'https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/datastore', 'https://www.googleapis.com/auth/spanner.admin', 'https://www.googleapis.com/auth/spanner.data'], impersonate_service_account=None, job_name=spotify-bq-tfrecords-v1-240422-175517, labels=None, no_auth=False, project=myproject32549, region=asia-east1, service_account_email=None, staging_location=gs://ndr-v1-myproject32549-bucket/data/v1/job/staging/, temp_location=gs://ndr-v1-myproject32549-bucket/data/v1/job/temp/, template_location=None

### Train set

In [22]:
# Training split sizing, in MB.
TOTAL_MB_TRAIN = 44_000
TARGET_SHARD_SIZE_MB_TRAIN = 2000

# Number of whole shards of the target size that fit in the total.
NUM_TF_RECORDS = int(TOTAL_MB_TRAIN) // int(TARGET_SHARD_SIZE_MB_TRAIN)
NUM_TF_RECORDS

22

In [23]:
# preview the positional arguments that will be passed to the training job
!echo $PROJECT_ID $VPC_NETWORK_NAME $REGION $DATA_VERSION $BUCKET_NAME $TRAIN_DIR_PREFIX $TOTAL_MB_TRAIN $TARGET_SHARD_SIZE_MB_TRAIN $BQ_DATASET $BQ_TABLE_TRAIN

myproject32549 ucaip-haystack-vpc-network asia-east1 v1 ndr-v1-myproject32549-bucket train 44000 2000 spotify_e2e_test train_flatten_last_5


In [22]:
# time the training TFRecord job end to end
start_time = time.time()

# launch the job; arguments are passed positionally in the same order as the echo cell above.
# BQ_DATASET is whatever value the most recently executed cell assigned to it.
! python3 main-train.py $PROJECT_ID $VPC_NETWORK_NAME $REGION $DATA_VERSION $BUCKET_NAME $TRAIN_DIR_PREFIX $TOTAL_MB_TRAIN $TARGET_SHARD_SIZE_MB_TRAIN $BQ_DATASET $BQ_TABLE_TRAIN

end_time = time.time()
# wall-clock duration of the command, truncated to whole minutes
runtime_mins = int((end_time - start_time) / 60)
print(f"total runtime_mins: {runtime_mins}")

Number of Expected TFRecords: 22
GoogleCloudOptions(create_from_snapshot=None, dataflow_endpoint=https://dataflow.googleapis.com, dataflow_kms_key=None, dataflow_service_options=None, enable_artifact_caching=False, enable_hot_key_logging=False, enable_streaming_engine=False, flexrs_goal=None, gcp_oauth_scopes=['https://www.googleapis.com/auth/bigquery', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/devstorage.full_control', 'https://www.googleapis.com/auth/userinfo.email', 'https://www.googleapis.com/auth/datastore', 'https://www.googleapis.com/auth/spanner.admin', 'https://www.googleapis.com/auth/spanner.data'], impersonate_service_account=None, job_name=spotify-bq-tfrecords-v1-240422-180304, labels=None, no_auth=False, project=myproject32549, region=asia-east1, service_account_email=None, staging_location=gs://ndr-v1-myproject32549-bucket/data/v1/job/staging/, temp_location=gs://ndr-v1-myproject32549-bucket/data/v1/job/temp/, template_location=Non

In [26]:
# move back up one directory, out of the Beam application folder
%cd ..

/home/jupyter/spotify_mpd_two_tower


# Test TFRecords

In [27]:
from util import feature_set_utils as feature_utils

## Candidates

### Candidate tower features

In [28]:
# feature description for the candidate tower (feature name -> FixedLenFeature, per the output below)
candidate_features = feature_utils.get_candidate_features()
candidate_features

{'track_uri_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'track_name_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'artist_uri_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'artist_name_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'album_uri_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'album_name_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'duration_ms_can': FixedLenFeature(shape=(), dtype=tf.float32, default_value=None),
 'track_pop_can': FixedLenFeature(shape=(), dtype=tf.float32, default_value=None),
 'artist_pop_can': FixedLenFeature(shape=(), dtype=tf.float32, default_value=None),
 'artist_genres_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'artist_followers_can': FixedLenFeature(shape=(), dtype=tf.float32, default_value=None),
 'track_danceability_can': FixedLenFeature(shape=(), dtype=tf.float32, defa

### Candidate files

In [29]:
# Collect the gs:// URIs of the candidate TFRecord shards.
# The trailing "/" on the prefix keeps it from matching sibling folders whose names merely
# start with CANDIDATE_PREFIX, and the '.tfrecords' check skips any other objects in the
# folder; both mirror the valid/train listings further down.
candidate_files = [
    # build the URI from the raw object name: blob.public_url percent-encodes the name,
    # which would give a wrong gs:// path for names containing special characters
    f"gs://{BUCKET_NAME}/{blob.name}"
    for blob in storage_client.list_blobs(f"{BUCKET_NAME}", prefix=f'data/{DATA_VERSION}/{CANDIDATE_PREFIX}/')
    if '.tfrecords' in blob.name
]

print("candidate_files: ", candidate_files)
candidate_dataset = tf.data.TFRecordDataset(candidate_files)

# decode each serialized record with the candidate parse function
parsed_candidate_dataset = candidate_dataset.map(feature_utils.parse_candidate_tfrecord_fn)

candidate_files:  ['gs://ndr-v1-myproject32549-bucket/data/v1/candidates/-00000-of-00001.tfrecords']


In [31]:
# sanity check: pretty-print one parsed candidate example (as a batch of size 1)
for x in parsed_candidate_dataset.batch(1).take(1):
    pprint(x)

{'album_name_can': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Pihana'], dtype=object)>,
 'album_uri_can': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'spotify:album:4f3ZzEECOPykndvzaLlcg0'], dtype=object)>,
 'artist_followers_can': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.], dtype=float32)>,
 'artist_genres_can': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'NONE'], dtype=object)>,
 'artist_name_can': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Napua'], dtype=object)>,
 'artist_pop_can': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.], dtype=float32)>,
 'artist_uri_can': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'spotify:artist:3411tg52DNMfAuJiwZIfjW'], dtype=object)>,
 'duration_ms_can': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([219346.], dtype=float32)>,
 'track_acousticness_can': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.917], dtype=float32)>,
 'track_danceability_can': <tf.Tensor: shape=(1,), dty

## Train & Valid datasets

### Get all features

In [33]:
# NOTE(review): the env config defines TRACK_HISTORY as the string "5", not an int --
# confirm get_all_features accepts it in that form.
print("TRACK_HISTORY: ", TRACK_HISTORY)
# full feature description, including the ranker features (ranker=True)
feats = feature_utils.get_all_features(TRACK_HISTORY, ranker=True)
feats

TRACK_HISTORY:  5


{'track_uri_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'track_name_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'artist_uri_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'artist_name_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'album_uri_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'album_name_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'duration_ms_can': FixedLenFeature(shape=(), dtype=tf.float32, default_value=None),
 'track_pop_can': FixedLenFeature(shape=(), dtype=tf.float32, default_value=None),
 'artist_pop_can': FixedLenFeature(shape=(), dtype=tf.float32, default_value=None),
 'artist_genres_can': FixedLenFeature(shape=(), dtype=tf.string, default_value=None),
 'artist_followers_can': FixedLenFeature(shape=(), dtype=tf.float32, default_value=None),
 'track_danceability_can': FixedLenFeature(shape=(), dtype=tf.float32, defa

### Choose feature set mappings

> TODO: define mappings

* `towers`
* `rank`
* `audio-rank`
* `lw-audio-rank`

```python
def get_feature_mapping(key):
    """
    Look up the TFRecord parse function for a feature-set name.

    key must be one of "towers", "rank", "audio-rank" or "lw-audio-rank";
    any other value raises KeyError.

    example:
        desired_mapping = get_feature_mapping(MY_CHOICE)
    """
    parsers_by_name = {
        "towers": feature_utils.parse_towers_tfrecord,
        "rank": feature_utils.parse_rank_tfrecord,
        "audio-rank": feature_utils.parse_audio_rank_tfrecord,
        "lw-audio-rank": feature_utils.parse_lw_audio_rank_tfrecord,
    }

    return parsers_by_name[key]
```

### Valid files

In [34]:
# parse function applied to both the valid and train TFRecord datasets below
FEATURE_MAP = feature_utils.parse_rank_tfrecord


<function util.feature_set_utils.parse_rank_tfrecord(example)>

In [43]:
# Collect the gs:// URIs of the validation TFRecord shards.
valid_files = [
    # build the URI from the raw object name: blob.public_url percent-encodes the name,
    # which would give a wrong gs:// path for names containing special characters
    f"gs://{BUCKET_NAME}/{blob.name}"
    for blob in storage_client.list_blobs(f"{BUCKET_NAME}", prefix=f'data/{DATA_VERSION}/{VALID_DIR_PREFIX}/')
    if '.tfrecords' in blob.name
]

print("valid_files: ", valid_files)

valid = tf.data.TFRecordDataset(valid_files)

# count by streaming through the records rather than holding them all in a Python list
print(f"valid size: {sum(1 for _ in valid)}")

# decode each serialized record with the selected feature-set parser
valid_parsed = valid.map(FEATURE_MAP)
valid_parsed

valid_files:  ['gs://ndr-v1-myproject32549-bucket/data/v1/valid/-00000-of-00002.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/valid/-00001-of-00002.tfrecords']
valid size: 5


<MapDataset element_spec={'album_name_can': TensorSpec(shape=(), dtype=tf.string, name=None), 'album_name_pl': TensorSpec(shape=(5,), dtype=tf.string, name=None), 'album_uri_can': TensorSpec(shape=(), dtype=tf.string, name=None), 'album_uri_pl': TensorSpec(shape=(5,), dtype=tf.string, name=None), 'artist_followers_can': TensorSpec(shape=(), dtype=tf.float32, name=None), 'artist_genres_can': TensorSpec(shape=(), dtype=tf.string, name=None), 'artist_genres_pl': TensorSpec(shape=(5,), dtype=tf.string, name=None), 'artist_name_can': TensorSpec(shape=(), dtype=tf.string, name=None), 'artist_name_pl': TensorSpec(shape=(5,), dtype=tf.string, name=None), 'artist_pop_can': TensorSpec(shape=(), dtype=tf.float32, name=None), 'artist_pop_pl': TensorSpec(shape=(5,), dtype=tf.float32, name=None), 'artist_uri_can': TensorSpec(shape=(), dtype=tf.string, name=None), 'artist_uri_pl': TensorSpec(shape=(5,), dtype=tf.string, name=None), 'artists_followers_pl': TensorSpec(shape=(5,), dtype=tf.float32, name

In [36]:
# sanity check: pretty-print one parsed validation example (as a batch of size 1)
for x in valid_parsed.batch(1).take(1):
    pprint(x)

{'album_name_can': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'Tiller Gang'], dtype=object)>,
 'album_name_pl': <tf.Tensor: shape=(1, 5), dtype=string, numpy=
array([[b'Tiller Gang', b'Tiller Gang', b'Tiller Gang', b'Tiller Gang',
        b'Tiller Gang']], dtype=object)>,
 'album_uri_can': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'spotify:album:0NJcvtv173uUae6NFosCIl'], dtype=object)>,
 'album_uri_pl': <tf.Tensor: shape=(1, 5), dtype=string, numpy=
array([[b'spotify:album:0NJcvtv173uUae6NFosCIl',
        b'spotify:album:0NJcvtv173uUae6NFosCIl',
        b'spotify:album:0NJcvtv173uUae6NFosCIl',
        b'spotify:album:0NJcvtv173uUae6NFosCIl',
        b'spotify:album:0NJcvtv173uUae6NFosCIl']], dtype=object)>,
 'artist_followers_can': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([190565.], dtype=float32)>,
 'artist_genres_can': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b"'country rap', 'redneck'"], dtype=object)>,
 'artist_genres_pl': <tf.Tensor: shape=(1

### Train files

In [39]:
# Collect the gs:// URIs of the training TFRecord shards.
# delimiter="/" limits the listing to objects directly under the train folder.
train_files = [
    # build the URI from the raw object name: blob.public_url percent-encodes the name,
    # which would give a wrong gs:// path for names containing special characters
    f"gs://{BUCKET_NAME}/{blob.name}"
    for blob in storage_client.list_blobs(f"{BUCKET_NAME}", prefix=f'data/{DATA_VERSION}/{TRAIN_DIR_PREFIX}/', delimiter="/")
    if '.tfrecords' in blob.name
]

print("train_files: ", train_files)
train = tf.data.TFRecordDataset(train_files)

# count by streaming through the records: list(train) would keep every serialized
# training record in memory at once just to take its length
print(f"train size: {sum(1 for _ in train)}")

# decode each serialized record with the selected feature-set parser
train_parsed = train.map(FEATURE_MAP)

train_files:  ['gs://ndr-v1-myproject32549-bucket/data/v1/train/-00000-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00001-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00002-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00003-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00004-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00005-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00006-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00007-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00008-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00009-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00010-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00011-of-00022.tfrecords', 'gs://ndr-v1-myproject32549-bucket/data/v1/train/-00012-of-00

In [41]:

# display the parsed training dataset (its repr shows the element spec)
train_parsed

<MapDataset element_spec={'album_name_can': TensorSpec(shape=(), dtype=tf.string, name=None), 'album_name_pl': TensorSpec(shape=(5,), dtype=tf.string, name=None), 'album_uri_can': TensorSpec(shape=(), dtype=tf.string, name=None), 'album_uri_pl': TensorSpec(shape=(5,), dtype=tf.string, name=None), 'artist_followers_can': TensorSpec(shape=(), dtype=tf.float32, name=None), 'artist_genres_can': TensorSpec(shape=(), dtype=tf.string, name=None), 'artist_genres_pl': TensorSpec(shape=(5,), dtype=tf.string, name=None), 'artist_name_can': TensorSpec(shape=(), dtype=tf.string, name=None), 'artist_name_pl': TensorSpec(shape=(5,), dtype=tf.string, name=None), 'artist_pop_can': TensorSpec(shape=(), dtype=tf.float32, name=None), 'artist_pop_pl': TensorSpec(shape=(5,), dtype=tf.float32, name=None), 'artist_uri_can': TensorSpec(shape=(), dtype=tf.string, name=None), 'artist_uri_pl': TensorSpec(shape=(5,), dtype=tf.string, name=None), 'artists_followers_pl': TensorSpec(shape=(5,), dtype=tf.float32, name

In [42]:
# sanity check: pretty-print one parsed training example (as a batch of size 1);
# pprint matches the candidate/valid sample cells and puts each feature on its own line
for x in train_parsed.batch(1).take(1):
    pprint(x)

{'album_name_can': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'The Best of Mandy Moore'], dtype=object)>, 'album_name_pl': <tf.Tensor: shape=(1, 5), dtype=string, numpy=
array([[b'The Best of Mandy Moore', b'The Best of Mandy Moore',
        b'The Best of Mandy Moore', b'The Best of Mandy Moore',
        b'The Best of Mandy Moore']], dtype=object)>, 'album_uri_can': <tf.Tensor: shape=(1,), dtype=string, numpy=array([b'spotify:album:551186Jr75bHrh58ZdEMhW'], dtype=object)>, 'album_uri_pl': <tf.Tensor: shape=(1, 5), dtype=string, numpy=
array([[b'spotify:album:551186Jr75bHrh58ZdEMhW',
        b'spotify:album:551186Jr75bHrh58ZdEMhW',
        b'spotify:album:551186Jr75bHrh58ZdEMhW',
        b'spotify:album:551186Jr75bHrh58ZdEMhW',
        b'spotify:album:551186Jr75bHrh58ZdEMhW']], dtype=object)>, 'artist_followers_can': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([724908.], dtype=float32)>, 'artist_genres_can': <tf.Tensor: shape=(1,), dtype=string, numpy=
array([b"'dance po

**Finished**