# Movielens download

1. download MovieLens 1M public dataset
2. Write datasets to TF-Records
3. Generate dataset vocabulary and look-up dictionaries

## imports

In [1]:
import sys
sys.path.append("..")
import env_config

In [2]:
print(f"PREFIX: {env_config.PREFIX}")
print(f"PROJECT_ID: {env_config.PROJECT_ID}")
print(f"LOCATION: {env_config.LOCATION}")

PREFIX: jt-towers-v1
PROJECT_ID: hybrid-vertex
LOCATION: us-central1


In [3]:
!pwd

/home/jupyter/crispy_towers/notebooks


In [5]:
import os
import json
from pprint import pprint
import pickle as pkl
import numpy as np
import pandas as pd

import logging
logging.disable(logging.WARNING)
import warnings
warnings.filterwarnings('ignore')

# tensorflow
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
import tensorflow_datasets as tfds

# google cloud
from google.cloud import aiplatform, storage

# cloud storage client
storage_client = storage.Client(project=env_config.PROJECT_ID)
# bucket = storage_client.bucket(BUCKET_NAME)

# Vertex client
aiplatform.init(project=env_config.PROJECT_ID, location=env_config.LOCATION)

sys.path.append("..")
from src.data import data_utils as data_utils

## config: data download and prep

In [6]:
LOCAL_OUTPUT_DIR     = "local_data/examples"
LOCAL_EXTRACT_DIR    = "local_data/raw"

MIN_TIMELINE_LENGTH  = 3
MIN_RATING           = 1

NUM_TRAIN_RECORDS    = 8
NUM_VAL_RECORDS      = 3

BUILD_VOCAB          = True

print(f"EXAMPLE_GEN_GCS_PATH : {env_config.EXAMPLE_GEN_GCS_PATH}")
print(f"TF_RECORD_PREFIX     : {env_config.TF_RECORD_PREFIX}")
print(f"MAX_CONTEXT_LENGTH   : {env_config.MAX_CONTEXT_LENGTH}")
print(f"MAX_GENRE_LENGTH     : {env_config.MAX_GENRE_LENGTH}")

EXAMPLE_GEN_GCS_PATH : data/movielens/m1m
TF_RECORD_PREFIX     : ml1m
MAX_CONTEXT_LENGTH   : 10
MAX_GENRE_LENGTH     : 10


#### create GCS bucket

In [7]:
REGION=env_config.LOCATION
BUCKET_URI=f"gs://{env_config.BUCKET_NAME}"

In [13]:
!gsutil mb -l $REGION $BUCKET_URI

Creating gs://jt-towers-v1-hybrid-vertex-bucket/...


### copy these commands to command line terminal

In [8]:
print("copy these commands into terminal:\n")
print(f"export PROJECT_ID={env_config.PROJECT_ID}")
print(f"export BUCKET_NAME={env_config.BUCKET_NAME}")
print(f"export EXAMPLE_GEN_GCS_PATH={env_config.EXAMPLE_GEN_GCS_PATH}")
print(f"export TF_RECORD_PREFIX={env_config.TF_RECORD_PREFIX}")
print(f"export MAX_GENRE_LENGTH={env_config.MAX_GENRE_LENGTH}")
print(f"export MAX_CONTEXT_LENGTH={env_config.MAX_CONTEXT_LENGTH}")
print(f"export LOCAL_EXTRACT_DIR={LOCAL_EXTRACT_DIR}")
print(f"export LOCAL_OUTPUT_DIR={LOCAL_OUTPUT_DIR}")
print(f"export MIN_TIMELINE_LENGTH={MIN_TIMELINE_LENGTH}")
print(f"export MIN_RATING={MIN_RATING}")
print(f"export BUILD_VOCAB={BUILD_VOCAB}")
print(f"export NUM_TRAIN_RECORDS={NUM_TRAIN_RECORDS}")
print(f"export NUM_VAL_RECORDS={NUM_VAL_RECORDS}")
print(f"chmod +x example_gen_movielens.py")

copy these commands into terminal:

export PROJECT_ID=hybrid-vertex
export BUCKET_NAME=jt-towers-v1-hybrid-vertex-bucket
export EXAMPLE_GEN_GCS_PATH=data/movielens/m1m
export TF_RECORD_PREFIX=ml1m
export MAX_GENRE_LENGTH=10
export MAX_CONTEXT_LENGTH=10
export LOCAL_EXTRACT_DIR=local_data/raw
export LOCAL_OUTPUT_DIR=local_data/examples
export MIN_TIMELINE_LENGTH=3
export MIN_RATING=1
export BUILD_VOCAB=True
export NUM_TRAIN_RECORDS=8
export NUM_VAL_RECORDS=3
chmod +x example_gen_movielens.py


In [15]:
data_gen_command = """python example_gen_movielens.py --project_id=$PROJECT_ID --gcs_bucket_name=$BUCKET_NAME \
--gcs_data_path_prefix=$EXAMPLE_GEN_GCS_PATH \
--tfrecord_prefix=$TF_RECORD_PREFIX \
--local_data_dir=$LOCAL_EXTRACT_DIR \
--local_output_dir=$LOCAL_OUTPUT_DIR \
--min_timeline_length=$MIN_TIMELINE_LENGTH \
--max_context_length=$MAX_CONTEXT_LENGTH \
--max_context_movie_genre_length=$MAX_GENRE_LENGTH \
--min_rating=$MIN_RATING \
--train_data_fraction=0.9 \
--build_vocabs=$BUILD_VOCAB \
--num_train_tfrecords=$NUM_TRAIN_RECORDS \
--num_test_tfrecords=$NUM_VAL_RECORDS \
"""
print(data_gen_command)

python example_gen_movielens.py --project_id=$PROJECT_ID --gcs_bucket_name=$BUCKET_NAME --gcs_data_path_prefix=$EXAMPLE_GEN_GCS_PATH --tfrecord_prefix=$TF_RECORD_PREFIX --local_data_dir=$LOCAL_EXTRACT_DIR --local_output_dir=$LOCAL_OUTPUT_DIR --min_timeline_length=$MIN_TIMELINE_LENGTH --max_context_length=$MAX_CONTEXT_LENGTH --max_context_movie_genre_length=$MAX_GENRE_LENGTH --min_rating=$MIN_RATING --train_data_fraction=0.9 --build_vocabs=$BUILD_VOCAB --num_train_tfrecords=$NUM_TRAIN_RECORDS --num_test_tfrecords=$NUM_VAL_RECORDS 


# Confim tfrecords

In [9]:
GCS_DATA_PATH = f"{env_config.BUCKET_URI}/{env_config.EXAMPLE_GEN_GCS_PATH}"
print(f"GCS_DATA_PATH : {GCS_DATA_PATH}")

! gsutil ls $GCS_DATA_PATH

GCS_DATA_PATH : gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m
gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/
gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/val/
gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/vocabs/


In [10]:
all_files = []
for blob in storage_client.list_blobs(
    f"{env_config.BUCKET_NAME}", 
    prefix=f'{env_config.EXAMPLE_GEN_GCS_PATH}/train/', 
    # delimiter='/'
):
    if '.tfrecord' in blob.name:
        all_files.append(blob.public_url.replace("https://storage.googleapis.com/", "gs://"))
        
all_files

['gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-001-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-002-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-003-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-004-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-005-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-006-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-007-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-008-of-008.tfrecord']

In [11]:
for blob in storage_client.list_blobs(
    f"{env_config.BUCKET_NAME}", 
    prefix=f'{env_config.EXAMPLE_GEN_GCS_PATH}/val/', 
    # delimiter='/'
):
    if '.tfrecord' in blob.name:
        all_files.append(blob.public_url.replace("https://storage.googleapis.com/", "gs://"))
        
all_files

['gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-001-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-002-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-003-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-004-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-005-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-006-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-007-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/train/ml1m-008-of-008.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/val/ml1m-001-of-003.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/val/ml1m-002-of-003.tfrecord',
 'gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/val/ml1m-003-o

In [13]:
mv_dataset = tf.data.TFRecordDataset(all_files)
# train_dataset = train_dataset.map(movielens_ds_utils.parse_tfrecord)
mv_dataset = mv_dataset.map(data_utils._parse_function)

for x in mv_dataset.batch(1).take(1):
    pprint(x)

{'context_movie_genre': <tf.Tensor: shape=(1, 10), dtype=string, numpy=
array([[b'Animation', b'Comedy', b'Thriller', b'Animation',
        b"Children's", b'Comedy', b'Musical', b'Animation',
        b"Children's", b'Comedy']], dtype=object)>,
 'context_movie_id': <tf.Tensor: shape=(1, 10), dtype=string, numpy=
array([[b'735', b'2009', b'2012', b'2965', b'3682', b'907', b'1268',
        b'2231', b'887', b'902']], dtype=object)>,
 'context_movie_rating': <tf.Tensor: shape=(1, 10), dtype=float32, numpy=array([[5., 4., 3., 2., 4., 3., 5., 4., 3., 4.]], dtype=float32)>,
 'context_movie_title': <tf.Tensor: shape=(1, 10), dtype=string, numpy=
array([[b'Close Shave, A (1995)', b'Jungle Book, The (1967)',
        b'Little Mermaid, The (1989)', b'Robin Hood (1973)',
        b'Chicken Run (2000)', b'Wizard of Oz, The (1939)',
        b'This Is Spinal Tap (1984)', b'Producers, The (1968)',
        b"Singin' in the Rain (1952)", b'My Fair Lady (1964)']],
      dtype=object)>,
 'context_movie_year'

# Validate vocab file

In [14]:
VOCAB_FILENAME="vocab_dict.pkl"

In [15]:
EXISTING_VOCAB_FILE = f'gs://{env_config.BUCKET_NAME}/{env_config.EXAMPLE_GEN_GCS_PATH}/vocabs/{VOCAB_FILENAME}'
print(f"Downloading vocab...")

os.system(f'gsutil -q cp {EXISTING_VOCAB_FILE} .')
print(f"Downloaded vocab from: {EXISTING_VOCAB_FILE}\n")

filehandler = open(VOCAB_FILENAME, 'rb')
vocab_dict = pkl.load(filehandler)
filehandler.close()

for key in vocab_dict.keys():
    pprint(key)

Downloading vocab...
Downloaded vocab from: gs://jt-towers-v1-hybrid-vertex-bucket/data/movielens/m1m/vocabs/vocab_dict.pkl

'movie_id'
'movie_year'
'movie_genre'
'movie_title'
'user_id'
'user_gender_vocab'
'user_age_vocab'
'user_occ_vocab'
'user_zip_vocab'
'min_timestamp'
'max_timestamp'
'timestamp_buckets'


In [25]:
list_of_years = vocab_dict['movie_year'] #.tolist()

min_yr = min(list_of_years)
max_yr = max(list_of_years)

print(f"min: {min_yr}; max: {max_yr}")

min: 0; max: 2000


In [16]:
MOVIELENS_NUM_MOVIES = len(vocab_dict['movie_id'])
MOVIELENS_NUM_USERS = len(vocab_dict['user_id'])

print(f"MOVIELENS_NUM_MOVIES : {MOVIELENS_NUM_MOVIES}")
print(f"MOVIELENS_NUM_USERS  : {MOVIELENS_NUM_USERS}")

MOVIELENS_NUM_MOVIES : 3884
MOVIELENS_NUM_USERS  : 6041


# Create look-up dictionaries

In [23]:
USER_AGE_LOOKUP = data_utils.get_dictionary_lookup_by_tf_data_key(
    key = 'user_age'
    , dataset = mv_dataset
)

USER_AGE_DIM = len(USER_AGE_LOOKUP)
print(f"USER_AGE_DIM: {USER_AGE_DIM}")


USER_OCC_LOOKUP = data_utils.get_dictionary_lookup_by_tf_data_key(
    key = 'user_occupation_text'
    , dataset= mv_dataset
)
USER_OCC_LOOKUP[b''] = 21
USER_OCC_DIM = len(USER_OCC_LOOKUP)
print(f"USER_OCC_DIM: {USER_OCC_DIM}")

USER_AGE_DIM: 8
USER_OCC_DIM: 22


In [24]:
config = f'''
USER_AGE_LOOKUP       = {USER_AGE_LOOKUP}
USER_AGE_DIM          = {USER_AGE_DIM}
USER_OCC_LOOKUP       = {USER_OCC_LOOKUP}
USER_OCC_DIM          = {USER_OCC_DIM}
MOVIELENS_NUM_MOVIES  = {MOVIELENS_NUM_MOVIES}
MOVIELENS_NUM_USERS   = {MOVIELENS_NUM_USERS}
'''
    
with open(f'../src/data/mv_lookup_dicts.py', 'w') as f:
    f.write(config)

In [33]:
df_all = tfds.as_dataframe(mv_dataset.take(20))
df_all.head(5)

Unnamed: 0,context_movie_genre,context_movie_id,context_movie_rating,context_movie_title,context_movie_year,context_rating_timestamp,target_movie_genres,target_movie_id,target_movie_rating,target_movie_title,target_movie_year,target_rating_timestamp,user_age,user_gender,user_id,user_occupation_text,user_zip_code
0,"[b'Animation', b'Comedy', b'Thriller', b'Anima...","[b'735', b'2009', b'2012', b'2965', b'3682', b...","[5.0, 4.0, 3.0, 2.0, 4.0, 3.0, 5.0, 4.0, 3.0, ...","[b'Close Shave, A (1995)', b'Jungle Book, The ...","[1995, 1967, 1989, 1973, 2000, 1939, 1984, 196...","[962569921, 962569966, 962569966, 962569966, 9...","[b""Children's"", b'Comedy', b'Musical', b'UNK',...",b'1015',3.0,b'Mary Poppins (1964)',1964,962570100,25,b'M',b'5027',b'programmer',b'92404'
1,"[b'Drama', b'War', b'Drama', b'Comedy', b'Dram...","[b'40', b'1170', b'1223', b'3599', b'2993', b'...","[4.0, 3.0, 3.0, 4.0, 4.0, 2.0, 2.0, 3.0, 5.0, ...","[b'Richard III (1995)', b'Passion Fish (1992)'...","[1995, 1992, 1990, 1968, 1962, 1992, 1980, 198...","[965267494, 965267527, 965267527, 965267527, 9...","[b'Drama', b'UNK', b'UNK', b'UNK', b'UNK', b'U...",b'148',5.0,b'Apollo 13 (1995)',1995,965267578,25,b'M',b'4302',b'artist',b'91910'
2,"[b'Horror', b'Drama', b'UNK', b'UNK', b'UNK', ...","[b'2390', b'1104', b'0', b'0', b'0', b'0', b'0...","[4.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[b'Texas Chainsaw Massacre, The (1974)', b'Peo...","[1974, 1996, 0, 0, 0, 0, 0, 0, 0, 0]","[975796636, 975796636, 0, 0, 0, 0, 0, 0, 0, 0]","[b'Comedy', b'UNK', b'UNK', b'UNK', b'UNK', b'...",b'859',1.0,"b""Gone Fishin' (1997)""",1997,975796636,18,b'M',b'620',b'college/grad student',b'93560'
3,"[b'Drama', b'Romance', b'Drama', b'Comedy', b'...","[b'890', b'1282', b'1061', b'2484', b'1202', b...","[4.0, 3.0, 3.0, 3.0, 4.0, 4.0, 5.0, 0.0, 0.0, ...","[b""Breakfast at Tiffany's (1961)"", b'Field of ...","[1961, 1989, 1973, 1960, 1980, 1982, 1977, 0, ...","[965524610, 965524610, 965524610, 965524646, 9...","[b'Horror', b'UNK', b'UNK', b'UNK', b'UNK', b'...",b'2949',3.0,b'Re-Animator (1985)',1985,965612488,35,b'M',b'4018',b'technician/engineer',b'54601'
4,"[b'Comedy', b'Drama', b'Comedy', b'Thriller', ...","[b'367', b'1434', b'2734', b'2226', b'365', b'...","[3.0, 3.0, 4.0, 4.0, 3.0, 3.0, 1.0, 3.0, 1.0, ...","[b'Paper, The (1994)', b'Vegas Vacation (1997)...","[1994, 1997, 1993, 1998, 1994, 1995, 1999, 199...","[974614949, 974615030, 974615030, 974615067, 9...","[b'Thriller', b'UNK', b'UNK', b'UNK', b'UNK', ...",b'3150',4.0,b'Pacific Heights (1990)',1990,974615327,50,b'M',b'2172',b'retired',b'87502'


# Create parsed candidate dataset

> for help, see [Load data from DataFrame](https://cloud.google.com/bigquery/docs/samples/bigquery-load-table-dataframe#code-sample) from BigQuery code samples

## Create BQ dataset

In [38]:
import json 
import tensorflow_datasets as tfds
from google.cloud import bigquery

# bigquery client
bqclient = bigquery.Client(project=env_config.PROJECT_ID)

In [39]:
BQ_DATASET_NAME = f"{env_config.BUCKET_NAME.lower().replace(env_config.PROJECT_ID,'')}"
BQ_DATASET_NAME = BQ_DATASET_NAME.replace('bucket','').replace('-','_').replace('__','_').rstrip("_")

BQ_TABLE_NAME = "test_tfrecord_dump"

BQ_TABLE_REF = f"{env_config.PROJECT_ID}.{BQ_DATASET_NAME}.{BQ_TABLE_NAME}"

print(f"BUCKET_NAME     = {env_config.BUCKET_NAME}")
print(f"BUCKET_URI      = {env_config.BUCKET_URI}")
print(f"BQ_DATASET_NAME = {BQ_DATASET_NAME}")
print(f"BQ_TABLE_REF    = {BQ_TABLE_REF}")

BUCKET_NAME     = jt-towers-v1-hybrid-vertex-bucket
BUCKET_URI      = gs://jt-towers-v1-hybrid-vertex-bucket
BQ_DATASET_NAME = jt_towers_v1
BQ_TABLE_REF    = hybrid-vertex.jt_towers_v1.test_tfrecord_dump


In [40]:
ds = bigquery.Dataset(f"{env_config.PROJECT_ID}.{BQ_DATASET_NAME}")
ds.location = "US"
ds = bqclient.create_dataset(dataset = ds, exists_ok = False)
print(ds.full_dataset_id)

hybrid-vertex:jt_towers_v1


### write tf examples to json

In [69]:
# BIGQUERY_TMP_FILE = "tmp_bq.json"
# BATCH_SIZE= 5 # 1_000

# with open(BIGQUERY_TMP_FILE, "w") as f:
#     for example in mv_dataset.batch(BATCH_SIZE, drop_remainder=False).as_numpy_iterator():
#         f.write(json.dumps(example) + "\n")
#         break

In [59]:
# BIGQUERY_TMP_FILE = "tmp_bq.json"
# BATCH_SIZE= 5 # 1_000

# with open(BIGQUERY_TMP_FILE, "w") as fp:
#     for example in mv_dataset.batch(BATCH_SIZE, drop_remainder=False).as_numpy_iterator():
#         json.dump(example, fp)
#         break

> see [this example](https://github.com/tottenjordan/tf_vertex_agents/blob/main/02-supervised-to-bandit-training/02g-example-optimized-dataset.ipynb)

```
def build_dict_from_trajectory(
    trajectory : trajectories.Trajectory
) -> Dict[str, Any]:
    trajectory_dict = {
        "step_type": trajectory.step_type.numpy().tolist(),
        "observation": [
            {
                "observation_batch": batch
            } for batch in trajectory.observation['global'].numpy().tolist()
        ],
        "chosen_arm_features": [
            {
                "chosen_arm_features_batch": batch
            } for batch in trajectory.policy_info.chosen_arm_features.numpy().tolist()
        ],
        "action": trajectory.action.numpy().tolist(),
        "next_step_type": trajectory.next_step_type.numpy().tolist(),
        "reward": trajectory.reward.numpy().tolist(),
        "discount": trajectory.discount.numpy().tolist(),
    }
    return trajectory_dict
```

In [82]:
from typing import Dict, Any

def build_dict_from_trajectory(
    example,
) -> Dict[str, Any]:
    ex_dict = {
        "context": [
            {
                "context_movie_id": batch 
            } for batch in example['context_movie_id'].numpy().tolist()
            # {
            #     "context_movie_rating": batch 
            # } for batch in example['context_movie_rating'].numpy().tolist()
            # {
            #     "context_rating_timestamp": batch 
            # } for batch in example['context_rating_timestamp'].numpy().tolist()
            # {
            #     "context_movie_genre": batch 
            # } for batch in example['context_movie_genre'].numpy().tolist()
            # {
            #     "context_movie_year": batch 
            # } for batch in example['context_movie_year'].numpy().tolist()
            # {
            #     "context_movie_title": batch 
            # } for batch in example['context_movie_title'].numpy().tolist()
        ],
        "target_movie_id": example['target_movie_id'].numpy().tolist(),
        "target_movie_rating": example['target_movie_rating'].numpy().tolist(),
        "target_rating_timestamp": example['target_rating_timestamp'].numpy().tolist(),
        "target_movie_genres": example['target_movie_genres'].numpy().tolist(),
        "target_movie_year": example['target_movie_year'].numpy().tolist(),
        "target_movie_title": example['target_movie_title'].numpy().tolist(),
    }
    return ex_dict

In [83]:
for x in mv_dataset.batch(10).take(1):
    # test_traj = my_trajectory_fn(x)
    test_dict = build_dict_from_trajectory(x)
    break

x

{'context_movie_genre': <tf.Tensor: shape=(10, 10), dtype=string, numpy=
 array([[b'Animation', b'Comedy', b'Thriller', b'Animation',
         b"Children's", b'Comedy', b'Musical', b'Animation',
         b"Children's", b'Comedy'],
        [b'Drama', b'War', b'Drama', b'Comedy', b'Drama', b'Drama',
         b'Romance', b'Action', b'Drama', b'War'],
        [b'Horror', b'Drama', b'UNK', b'UNK', b'UNK', b'UNK', b'UNK',
         b'UNK', b'UNK', b'UNK'],
        [b'Drama', b'Romance', b'Drama', b'Comedy', b'Sci-Fi', b'Horror',
         b'Sci-Fi', b'Thriller', b'Action', b'Comedy'],
        [b'Comedy', b'Drama', b'Comedy', b'Thriller', b'Comedy', b'Drama',
         b'Drama', b'Action', b'Adventure', b'Horror'],
        [b'Comedy', b'Drama', b'Action', b'Crime', b'Drama',
         b'Documentary', b'Crime', b'Drama', b'Drama', b'Comedy'],
        [b'Drama', b'Drama', b'Drama', b'Drama', b'Adventure', b'Drama',
         b'Adventure', b'Drama', b'Drama', b'Adventure'],
        [b'Drama', b'Sci-F

In [85]:
test_dict

{'context': [{'context_movie_id': [b'735',
    b'2009',
    b'2012',
    b'2965',
    b'3682',
    b'907',
    b'1268',
    b'2231',
    b'887',
    b'902']},
  {'context_movie_id': [b'40',
    b'1170',
    b'1223',
    b'3599',
    b'2993',
    b'3192',
    b'3580',
    b'3002',
    b'1885',
    b'2329']},
  {'context_movie_id': [b'2390',
    b'1104',
    b'0',
    b'0',
    b'0',
    b'0',
    b'0',
    b'0',
    b'0',
    b'0']},
  {'context_movie_id': [b'890',
    b'1282',
    b'1061',
    b'2484',
    b'1202',
    b'1081',
    b'3691',
    b'0',
    b'0',
    b'0']},
  {'context_movie_id': [b'367',
    b'1434',
    b'2734',
    b'2226',
    b'365',
    b'145',
    b'2548',
    b'2547',
    b'1469',
    b'556']},
  {'context_movie_id': [b'2928',
    b'2890',
    b'2843',
    b'3113',
    b'3714',
    b'3339',
    b'3827',
    b'3841',
    b'3823',
    b'3684']},
  {'context_movie_id': [b'2329',
    b'2860',
    b'1771',
    b'3002',
    b'3099',
    b'1069',
    b'3278',
    b'586'

In [76]:
# type(example)
example

{'context_movie_genre': array([[b'Animation', b'Comedy', b'Thriller', b'Animation',
         b"Children's", b'Comedy', b'Musical', b'Animation',
         b"Children's", b'Comedy'],
        [b'Drama', b'War', b'Drama', b'Comedy', b'Drama', b'Drama',
         b'Romance', b'Action', b'Drama', b'War'],
        [b'Horror', b'Drama', b'UNK', b'UNK', b'UNK', b'UNK', b'UNK',
         b'UNK', b'UNK', b'UNK'],
        [b'Drama', b'Romance', b'Drama', b'Comedy', b'Sci-Fi', b'Horror',
         b'Sci-Fi', b'Thriller', b'Action', b'Comedy'],
        [b'Comedy', b'Drama', b'Comedy', b'Thriller', b'Comedy', b'Drama',
         b'Drama', b'Action', b'Adventure', b'Horror']], dtype=object),
 'context_movie_id': array([[b'735', b'2009', b'2012', b'2965', b'3682', b'907', b'1268',
         b'2231', b'887', b'902'],
        [b'40', b'1170', b'1223', b'3599', b'2993', b'3192', b'3580',
         b'3002', b'1885', b'2329'],
        [b'2390', b'1104', b'0', b'0', b'0', b'0', b'0', b'0', b'0', b'0'],
        [b'

### Load tmp file to BigQuery table

In [74]:
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("user_id", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("user_gender", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("user_age", bigquery.enums.SqlTypeNames.INTEGER),
        bigquery.SchemaField("user_occupation_text", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("user_zip_code", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField(
            "a",
            "RECORD",
            mode="REPEATED",
            fields=[
                bigquery.SchemaField("context_movie_id", bigquery.enums.SqlTypeNames.STRING, mode="REPEATED"),
                bigquery.SchemaField("context_movie_id", bigquery.enums.SqlTypeNames.STRING, mode="REPEATED"),
                bigquery.SchemaField("context_movie_rating", bigquery.enums.SqlTypeNames.FLOAT, mode="REPEATED"),
                bigquery.SchemaField("context_rating_timestamp", bigquery.enums.SqlTypeNames.INTEGER, mode="REPEATED"),
                bigquery.SchemaField("context_movie_genre", bigquery.enums.SqlTypeNames.STRING, mode="REPEATED"),
                bigquery.SchemaField("context_movie_year", bigquery.enums.SqlTypeNames.INTEGER, mode="REPEATED"),
                bigquery.SchemaField("context_movie_title", bigquery.enums.SqlTypeNames.STRING, mode="REPEATED"),
            ]
        ),
        bigquery.SchemaField("target_movie_id", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("target_movie_rating", bigquery.enums.SqlTypeNames.FLOAT),
        bigquery.SchemaField("target_rating_timestamp", bigquery.enums.SqlTypeNames.INTEGER),
        bigquery.SchemaField("target_movie_genres", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("target_movie_year", bigquery.enums.SqlTypeNames.INTEGER),
        bigquery.SchemaField("target_movie_title", bigquery.enums.SqlTypeNames.STRING),
    ],
    # source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)

In [75]:
## using tmp json file
# with open(BIGQUERY_TMP_FILE, "rb") as source_file:
#     load_job = bqclient.load_table_from_file(
#         source_file, BQ_TABLE_REF, job_config=job_config
#     )

# using dataframe
job = bqclient.load_table_from_dataframe(
    df_all, BQ_TABLE_REF, job_config=job_config
)
    
job.result()  # Wait for the job to complete.

table = bqclient.get_table(BQ_TABLE_REF)  # Make an API request.
print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), BQ_TABLE_REF
    )
)

ValueError: bq_schema contains fields not present in dataframe: {'a'}

In [71]:
BQ_TABLE_REF

'hybrid-vertex.jt_towers_v1.test_tfrecord_dump'

## Run Dataflow pipeline

In [None]:
# ! python3 main.py \
#     $PROJECT_ID \        # 1
#     $VPC_NETWORK_NAME \  # 2
#     $REGION \            # 3
#     $DATA_VERSION \      # 4
#     $BUCKET_NAME \       # 5
#     $CANDIDATE_PREFIX \  # 6
#     $BQ_DATASET \        # 7
#     $BQ_TABLE_CANDIDATES # 8

# gitignore

In [None]:
rm -rf `find . -name "*.ipynb_checkpoints" -o -name "*.cpython-310.pyc" -o -name "__pycache__"`

In [26]:
%%writefile .gitignore
*.cpython-310.pyc
*checkpoint*
*.ipynb_checkpoints/*
*/data/local_data/*
*.csv
*__pycache__
*cpython-37.pyc
.gitignore

Writing .gitignore


# Clean up

In [None]:
# (optional) delete local
# cd src/data
# rm -rf local_data

**finished**