In [2]:
# !pip install apache-beam[gcp]

In [44]:
import os
import tensorflow as tf 
import apache_beam as beam 
import tensorflow_data_validation as tfdv
from google.cloud import bigquery
from datetime import datetime
import numpy as np

# from tensorflow_transform.tf_metadata import dataset_metadata
# from tensorflow_transform.tf_metadata import dataset_schema

print("TF version:", tf.__version__)
print("TFDV version:", tfdv.__version__)
print("Beam version:", beam.__version__)
print("BQ SDK version:", bigquery.__version__)

TF version: 2.9.0-rc2
TFDV version: 1.9.0
Beam version: 2.40.0
BQ SDK version: 2.34.4


# Apache Beam Pipeline -> write tf-records from BQ

### Table rows
* BigQueryIO read and write transforms produce and consume data as a `PCollection` of dictionaries, where each element in the `PCollection` represents a single row in the table

### Schemas
* When writing to BigQuery, you must supply a table schema for the destination table that you want to write to, unless you specify a create disposition of CREATE_NEVER. 
* See the "Creating a table schema" section of the Apache Beam BigQuery I/O guide for more detail on schemas.

In [8]:
# Project-level settings
PROJECT_ID = 'hybrid-vertex'
BUCKET_NAME = 'spotify-beam-v1'  # GCS bucket name (previously 'spotify-tfrecords-blog')
REGION = 'us-central1'  # region used for Dataflow jobs
VERSION = 'v1'

# Every path below hangs off this single versioned prefix.
ROOT = 'gs://' + BUCKET_NAME + '/' + VERSION

CANDIDATE_DIR = f'{ROOT}/candidates/'
DATA_DIR = f'{ROOT}/data/'  # data files
STATS_DIR = f'{ROOT}/stats/'  # statistics
STAGING_DIR = f'{ROOT}/job/staging/'  # Dataflow staging directory
TEMP_DIR = f'{ROOT}/job/temp/'  # Dataflow temporary directory

### Cleanup working directory

In [18]:
# test_root ='gs://spotify-tfrecords-blog/candidate_tracks_v2'
# files = np.array(tf.io.gfile.listdir(test_root))
# for file in files:
#     file_to_remove = f'{test_root}/{file}'
#     print(file_to_remove)
#     break
#     # tf.io.gfile.remove(f'{test_root}/{file})
# # files

In [19]:
# Start from an empty working directory so files from an earlier run cannot
# mix with the output of this one.
if tf.io.gfile.exists(ROOT):
    print("Removing {} contents...".format(ROOT))
    # rmtree deletes recursively. gfile.remove only deletes single files, so
    # it cannot clear the sub-directories that live under ROOT.
    tf.io.gfile.rmtree(ROOT)

print("Creating working directory: {}".format(ROOT))
# makedirs creates intermediate directories and succeeds if the path exists.
tf.io.gfile.makedirs(ROOT)

Creating working directory: gs://spotify-beam-v1/v1


## Extract data from BigQuery to GCS

* extract data from BigQuery, 
* convert data to TFRecord files, 
* store data files in Google Cloud Storage (GCS). 
> * This data file in GCS will then be used by TFDV for stats & vocabs

### Define SQL Query

In [20]:
def generate_query(project=None, bq_dataset=None, bq_table=None, limit=None):
    """Build a ``SELECT *`` query against one BigQuery table.

    Args:
        project: Project id that owns the table.
        bq_dataset: Dataset name.
        bq_table: Table name.
        limit: Optional maximum number of rows. ``None`` (the default) adds no
            LIMIT clause; ``0`` is honoured and produces ``LIMIT 0``.

    Returns:
        The SQL text as a string.

    Raises:
        ValueError: If ``limit`` is negative or cannot be converted to an int.
        TypeError: If ``limit`` is of a type ``int()`` does not accept.
    """
    query = f"""
        SELECT *
        FROM
          `{project}.{bq_dataset}.{bq_table}`
        """
    # Compare against None rather than truthiness so that limit=0 is not
    # silently turned into "no limit".
    if limit is not None:
        # int() guarantees that only a number is spliced into the SQL text.
        row_limit = int(limit)
        if row_limit < 0:
            raise ValueError("limit must be non-negative, got {}".format(limit))
        query += "LIMIT {}".format(row_limit)

    return query

In [21]:
from tensorflow.train import BytesList, Feature, FeatureList, Int64List, FloatList
from tensorflow.train import SequenceExample, FeatureLists


def string_array(value):
    """Build a ``bytes_list`` Feature from an iterable of strings and/or bytes.

    Args:
        value: Iterable whose items are ``str`` or ``bytes``.

    Returns:
        A ``tf.train.Feature`` whose ``bytes_list`` holds one entry per item.
    """
    # tf.compat.as_bytes UTF-8 encodes str and passes bytes through unchanged,
    # so items that are already bytes no longer fail on a missing ``.encode``.
    encoded = [tf.compat.as_bytes(v) for v in value]
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=encoded))


def float_feature(value):
    """Pack an iterable of numbers into a ``float_list`` Feature.

    Every item is passed through ``float()`` before it is stored.
    """
    as_floats = list(map(float, value))
    float_list = tf.train.FloatList(value=as_floats)
    return tf.train.Feature(float_list=float_list)


def int64_feature(value):
    """Pack an iterable of values into an ``int64_list`` Feature.

    Every item is passed through ``int()`` before it is stored.
    """
    as_ints = list(map(int, value))
    int64_list = tf.train.Int64List(value=as_ints)
    return tf.train.Feature(int64_list=int64_list)


def float_feature_list(value):
    """Wrap ``value`` in a ``float_list`` Feature as-is.

    Unlike ``float_feature``, the items are handed to ``FloatList`` without a
    per-item ``float()`` conversion.
    """
    float_list = tf.train.FloatList(value=value)
    return tf.train.Feature(float_list=float_list)

def get_type_map(query):
    """Map every result column of ``query`` to its BigQuery field type.

    A dry run is tried first so the schema can be read without executing the
    query. If the dry run yields no schema, the query is executed as before.

    Args:
        query: SQL text whose result schema is wanted.

    Returns:
        dict mapping column name to the ``field_type`` string of its
        ``SchemaField``.
    """
    bq_client = bigquery.Client()

    dry_run_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    dry_run_job = bq_client.query(query, job_config=dry_run_config)
    # getattr keeps this working on client releases without QueryJob.schema.
    result_schema = getattr(dry_run_job, "schema", None)

    if not result_schema:
        # Fallback: run the query and read the schema off the result rows.
        result_schema = bq_client.query(query).result().schema

    return {field.name: field.field_type for field in result_schema}

# def get_mode_map(query):
#     bq_client = bigquery.Client()
#     query_job = bq_client.query(f"{query}")
#     results = query_job.result()
    
#     mode_map = {}
    
#     for field in results.schema:
#         mode_map[field.name] = field.mode
    
#     return mode_map


def row_to_example(instance, type_map):
    """Convert one BigQuery row into a ``tf.train.Example``.

    Args:
        instance: dict mapping column name to the value of that column.
        type_map: dict mapping column name to its BigQuery field type string.

    Returns:
        A ``tf.train.Example`` with one feature per column:
        * ``None`` values become an empty ``tf.train.Feature``;
        * integer and boolean types become an ``int64_list``;
        * float and numeric types become a ``float_list``;
        * everything else becomes a ``bytes_list`` (non-string values are
          converted with ``str()`` first).

    Raises:
        KeyError: If a column of ``instance`` is missing from ``type_map``.
    """
    # Both the legacy and the standard-SQL spellings are accepted.
    int_types = ('INTEGER', 'INT64', 'BOOLEAN', 'BOOL')
    float_types = ('FLOAT', 'FLOAT64', 'NUMERIC', 'BIGNUMERIC')

    feature = {}
    for key, value in instance.items():
        data_type = type_map[key]
        if value is None:
            feature[key] = tf.train.Feature()
            continue

        # Treat scalars as one-item lists so a single code path serves both.
        # NOTE(review): assumes REPEATED columns arrive as Python lists - confirm.
        values = value if isinstance(value, (list, tuple)) else [value]

        if data_type in int_types:
            feature[key] = tf.train.Feature(
                int64_list=tf.train.Int64List(value=[int(v) for v in values]))
        elif data_type in float_types:
            feature[key] = tf.train.Feature(
                float_list=tf.train.FloatList(value=[float(v) for v in values]))
        else:
            # as_bytes only accepts text/bytes; stringify anything else
            # (dates, timestamps, ...) instead of raising TypeError.
            encoded = [
                tf.compat.as_bytes(v if isinstance(v, (bytes, bytearray, str)) else str(v))
                for v in values
            ]
            feature[key] = tf.train.Feature(
                bytes_list=tf.train.BytesList(value=encoded))

    return tf.train.Example(features=tf.train.Features(feature=feature))

## Submit Pipeline

In [22]:
def run_pipeline(args):
    """Read rows from BigQuery and write them to GCS as TFRecord files.

    Args:
        args: dict of settings. Keys used directly by this function:
            * 'source_query': SQL text to read from BigQuery.
            * 'sink_data_location': output prefix; files are written as
              ``<sink_data_location>candidates*.tfrecord``.
            * 'runner': runner passed to ``beam.Pipeline``.
            * 'type_map' (optional): column name -> BigQuery type mapping for
              ``row_to_example``; when absent, the notebook-level ``type_map``
              variable is used.
            All remaining keys are forwarded as pipeline options. The dict
            itself is not modified.

    Raises:
        KeyError: If 'source_query', 'sink_data_location' or 'runner' is missing.
    """
    # Work on a copy: popping from the caller's dict made a second call with
    # the same ``args`` fail with KeyError.
    options = dict(args)
    source_query = options.pop('source_query')
    sink_data_location = options.pop('sink_data_location')
    field_types = options.pop('type_map', None)
    if field_types is None:
        # Backward compatibility with callers that relied on the global.
        field_types = type_map
    runner = options['runner']

    pipeline_options = beam.options.pipeline_options.GoogleCloudOptions(**options)
    print(pipeline_options)

    # NOTE(review): row_to_example and its ``tf`` import are defined in the
    # notebook itself; a remote runner may need 'save_main_session': True in
    # ``args`` to see them - confirm.
    with beam.Pipeline(runner, options=pipeline_options) as pipeline:
        (pipeline
         | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                    query=source_query, use_standard_sql=True)
         # The mapping is handed over as an explicit argument rather than
         # captured from a global inside a lambda.
         | 'Convert to tf Example' >> beam.Map(row_to_example, field_types)
         | 'Serialize to String' >> beam.Map(lambda example: example.SerializeToString(deterministic=True))
         | "Write as TFRecords to GCS" >> beam.io.WriteToTFRecord(
                    file_path_prefix=sink_data_location + "candidates",
                    file_name_suffix=".tfrecord")
        )

In [23]:
# Runner plus a job name that carries the UTC launch time (yymmdd-HHMMSS).
runner = 'DataflowRunner'
job_name = 'spotify-tfrecord-beam-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))


print("Generating source query...")
# Source table and row cap used to build the query.
LIMIT = 10000
PROJECT_ID='hybrid-vertex'
BQ_DATASET='spotify_train_4'
BQ_TABLE = 'candidate_features_v2'

# Output location for the TFRecord files.
data_location = CANDIDATE_DIR

source_query = generate_query(project=PROJECT_ID, bq_dataset=BQ_DATASET, bq_table=BQ_TABLE, limit=LIMIT)

print("Retrieving data type...")
# Column name -> BigQuery field type for the columns returned by source_query.
type_map = get_type_map(source_query)

# Everything run_pipeline needs, collected in one dict.
args = {
    'job_name': job_name,
    'runner': runner,
    'source_query': source_query,
    'type_map': type_map,
    'sink_data_location': data_location,
    'project': PROJECT_ID,
    'region': REGION,
    'staging_location': STAGING_DIR,
    'temp_location': TEMP_DIR,
    # 'save_main_session': True,
    # 'setup_file': './setup.py'
}
print("Pipeline args are set.")

Generating source query...
Retrieving data type...
Pipeline args are set.


In [25]:
# tf.logging.set_verbosity(tf.logging.ERROR)

# Run the pipeline described by `args`; the final message is printed only
# after run_pipeline has returned.
print("Running data extraction pipeline...")
run_pipeline(args)
print("Pipeline is done.")

Running data extraction pipeline...
GoogleCloudOptions(create_from_snapshot=None, dataflow_endpoint=https://dataflow.googleapis.com, dataflow_kms_key=None, dataflow_service_options=None, enable_artifact_caching=False, enable_hot_key_logging=False, enable_streaming_engine=False, flexrs_goal=None, impersonate_service_account=None, job_name=spotify-tfrecord-beam-220711-142630, labels=None, no_auth=False, project=hybrid-vertex, region=us-central1, service_account_email=None, staging_location=gs://spotify-beam-v1/v1/job/staging/, temp_location=gs://spotify-beam-v1/v1/job/temp/, template_location=None, transform_name_mapping=None, update=False)


  from ipykernel import kernelapp as app


  temp_location = pcoll.pipeline.options.view_as(


AssertionError: Job did not reach to a terminal state after waiting indefinitely. Console URL: https://console.cloud.google.com/dataflow/jobs/<RegionId>/2022-07-11_07_27_47-12338818112686689332?project=<ProjectId>

In [None]:
# List the TFRecord files written by the pipeline. CANDIDATE_DIR is a gs://
# URI, which the local `ls` command cannot read, and it already ends with "/",
# so the wildcard is appended without an extra separator.
tf.io.gfile.glob(f"{CANDIDATE_DIR}*")

# Version 2

### Reading from BigQuery

* To read an entire BigQuery table, use the table parameter with the BigQuery table name.

### Define Table

In [31]:
# Source table for this section, identified by project, dataset and table name.
PROJECT_ID = 'hybrid-vertex'
BQ_DATASET = 'spotify_train_3'
BQ_TABLE = 'train_flatten'

# Dotted reference: project.dataset.table
table_spec = '.'.join([PROJECT_ID, BQ_DATASET, BQ_TABLE])

In [39]:
# Pipeline configuration.
#   "data"  - the source table plus one entry per column. Each entry gives the
#             column "name", its "kind" ("string" or "float32") and its "mode"
#             ("NULLABLE" or "REPEATED").
#   "modes" - the named modes of the pipeline with their per-mode settings.
CONFIG = {
    "data": {
        "bq_source_table": table_spec,
        "schema": [
            # playlist - context features
            { "name": "name", "kind": "string", "mode": "NULLABLE" },
            { "name": "collaborative", "kind": "string", "mode": "NULLABLE" },
            { "name": "n_songs_pl", "kind": "float32", "mode": "NULLABLE" },
            { "name": "num_artists_pl", "kind": "float32", "mode": "NULLABLE" },
            { "name": "num_albums_pl", "kind": "float32", "mode": "NULLABLE" },
            { "name": "description_pl", "kind": "string", "mode": "NULLABLE" },
            # seed track - context features
            { "name": "track_name_seed_track", "kind": "string", "mode": "NULLABLE" },
            { "name": "artist_name_seed_track", "kind": "string", "mode": "NULLABLE" },
            { "name": "album_name_seed_track", "kind": "string", "mode": "NULLABLE" },
            { "name": "track_uri_seed_track", "kind": "string", "mode": "NULLABLE" },
            { "name": "artist_uri_seed_track", "kind": "string", "mode": "NULLABLE" },
            { "name": "album_uri_seed_track", "kind": "string", "mode": "NULLABLE" },
            { "name": "duration_seed_track", "kind": "float32", "mode": "NULLABLE" },
            { "name": "track_pop_seed_track", "kind": "float32", "mode": "NULLABLE" },
            { "name": "artist_pop_seed_track", "kind": "float32", "mode": "NULLABLE" },
            { "name": "artist_genres_seed_track", "kind": "string", "mode": "NULLABLE" },
            { "name": "artist_followers_seed_track", "kind": "float32", "mode": "NULLABLE" },
            # candidate track features
            { "name": "track_name_can", "kind": "string", "mode": "NULLABLE" },
            { "name": "artist_name_can", "kind": "string", "mode": "NULLABLE" },
            { "name": "album_name_can", "kind": "string", "mode": "NULLABLE" },
            { "name": "track_uri_can", "kind": "string", "mode": "NULLABLE" },
            { "name": "artist_uri_can", "kind": "string", "mode": "NULLABLE" },
            { "name": "album_uri_can", "kind": "string", "mode": "NULLABLE" },
            { "name": "duration_ms_can", "kind": "float32", "mode": "NULLABLE" },
            { "name": "track_pop_can", "kind": "float32", "mode": "NULLABLE" },
            { "name": "artist_pop_can", "kind": "float32", "mode": "NULLABLE" },
            { "name": "artist_genres_can", "kind": "string", "mode": "NULLABLE" },
            { "name": "artist_followers_can", "kind": "float32", "mode": "NULLABLE" },
            # Ragged features: REPEATED columns, i.e. a list of values per row
            { "name": "track_name_pl", "kind": "string", "mode": "REPEATED" },
            { "name": "artist_name_pl", "kind": "string", "mode": "REPEATED" },
            { "name": "album_name_pl", "kind": "string", "mode": "REPEATED" },
            { "name": "track_uri_pl", "kind": "string", "mode": "REPEATED" },
            { "name": "duration_ms_songs_pl", "kind": "float32", "mode": "REPEATED" },
            { "name": "artist_pop_pl", "kind": "float32", "mode": "REPEATED" },
            { "name": "artists_followers_pl", "kind": "float32", "mode": "REPEATED" },
            { "name": "track_pop_pl", "kind": "float32", "mode": "REPEATED" },
            { "name": "artist_genres_pl", "kind": "string", "mode": "REPEATED" },
        ]
    },
    "modes": [
        { "name": "analysis" },
        # NOTE(review): "shuffle" is the string "True", not the boolean True -
        # confirm that whatever reads this key expects a string.
        { "name": "training", "transform": "analysis", "shuffle": "True" },
        { "name": "validation", "transform": "analysis" },
        { "name": "testing", "transform": "identity" }
    ]
}

In [45]:
# Look up the column schema and the source table reference in CONFIG
schema = CONFIG['data']['schema']
table = CONFIG['data']['bq_source_table']

# Build the BigQuery read transform for that table
source = beam.io.ReadFromBigQuery(
    table=table,
    flatten_results=False,
    use_standard_sql=True,
)
source
# Create metadata needed later
# spec = schema.to_feature_spec()
# meta = dataset_metadata.DatasetMetadata(
#     schema=dataset_schema.from_feature_spec(spec)
# )

<ReadFromBigQuery(PTransform) label=[ReadFromBigQuery] at 0x7f2aecbdcf10>

In [42]:
# Scratch alias for the schema entry of CONFIG, which is a plain list of dicts.
schema_test = CONFIG['data']['schema']
# schema_test.to_feature_spec()

In [None]:
# Read the SQL code
table = open(config['data']['bq_source_table']).read()

# Create a BigQuery source
source = beam.io.BigQuerySource(table=table, use_standard_sql=True, flatten_results=False)

# Create metadata needed later
spec = schema.to_feature_spec()
meta = dataset_metadata.DatasetMetadata(
    schema=dataset_schema.from_feature_spec(spec)
)

data = pipeline \
    | 'read' >> beam.io.Read(source)