# Using Feature Store for training and serving

In [1]:
import os

# The Google Cloud Notebook product has specific requirements
IS_GOOGLE_CLOUD_NOTEBOOK = os.path.exists("/opt/deeplearning/metadata/env_version")

# Google Cloud Notebook requires dependencies to be installed with '--user'
USER_FLAG = ""
if IS_GOOGLE_CLOUD_NOTEBOOK:
    USER_FLAG = "--user"

In [1]:
#!pip install --user kfp==1.6.4
#!pip install --user tfx==1.0.0-rc1
#!pip install --user google-cloud-bigquery-datatransfer==3.2.0
#!pip install --user google-cloud-pipeline-components==0.1.1
#!pip install --user google-cloud-aiplatform==1.1.1

In [2]:
#!pip3 install {USER_FLAG} --upgrade git+https://github.com/googleapis/python-aiplatform.git@main-test

In [3]:
import copy
import numpy as np
import os
import pprint
import pandas as pd
import random
import tensorflow as tf
import time

from google.cloud import aiplatform
from google.cloud import bigquery_datatransfer
from google.cloud import bigquery
from google.cloud import exceptions

from google.cloud.aiplatform_v1beta1 import (
    FeaturestoreOnlineServingServiceClient, FeaturestoreServiceClient)
from google.cloud.aiplatform_v1beta1.types import FeatureSelector, IdMatcher
from google.cloud.aiplatform_v1beta1.types import \
    entity_type as entity_type_pb2
from google.cloud.aiplatform_v1beta1.types import feature as feature_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore as featurestore_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_monitoring as featurestore_monitoring_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_online_service as featurestore_online_service_pb2
from google.cloud.aiplatform_v1beta1.types import \
    featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import io as io_pb2
from google.protobuf.duration_pb2 import Duration

## Configure lab settings

In [4]:
PROJECT_ID = 'jsb-demos'
REGION = 'us-central1'
PREFIX = 'jsb_taxi'

API_ENDPOINT = f'{REGION}-aiplatform.googleapis.com'

## Create Featurestore clients

Admin client for CRUD operations.

In [5]:
admin_client = FeaturestoreServiceClient(client_options={"api_endpoint": API_ENDPOINT})
BASE_RESOURCE_PATH = admin_client.common_location_path(PROJECT_ID, REGION)

Data client for accessing features.

In [6]:
data_client = FeaturestoreOnlineServingServiceClient(
    client_options={"api_endpoint": API_ENDPOINT}
)


## Create Featurestore and define schemas

### Create a feature store

In [7]:
FEATURESTORE_ID = f'{PREFIX}_featurestore'

In [8]:
create_lro = admin_client.create_featurestore(
    featurestore_service_pb2.CreateFeaturestoreRequest(
        parent=BASE_RESOURCE_PATH,
        featurestore_id=FEATURESTORE_ID,
        featurestore=featurestore_pb2.Featurestore(
            online_serving_config=featurestore_pb2.Featurestore.OnlineServingConfig(
                fixed_node_count=3
            ),
        ),
    )
)

In [9]:
create_lro.result() # Wait

name: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore"

#### List feature stores

In [10]:
admin_client.list_featurestores(parent=BASE_RESOURCE_PATH)

ListFeaturestoresPager<featurestores {
  name: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore"
  create_time {
    seconds: 1627542424
    nanos: 234977000
  }
  update_time {
    seconds: 1627542424
    nanos: 310537000
  }
  etag: "AMEw9yN_YKsFNlzKT-_4Vj4pqdWUIKZnyl3QjmuS07sjq8gL-DveCfvRJum6mv_lbcTl"
  online_serving_config {
    fixed_node_count: 3
  }
  state: STABLE
}
featurestores {
  name: "projects/917455532365/locations/us-central1/featurestores/movie_prediction_20210728220735"
  create_time {
    seconds: 1627510307
    nanos: 181123000
  }
  update_time {
    seconds: 1627510307
    nanos: 256385000
  }
  etag: "AMEw9yPlTm7vewh0T_fj8VJKq05G_oBzRyvfMAi4Dipg5wFeXphi7cfQvRWcKXucX_o="
  online_serving_config {
    fixed_node_count: 3
  }
  state: STABLE
}
featurestores {
  name: "projects/917455532365/locations/us-central1/featurestores/sales_propensity_20210728220735"
  create_time {
    seconds: 1627512752
    nanos: 27044000
  }
  update_time

#### Get your feature store

In [11]:
admin_client.get_featurestore(
    name=admin_client.featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID)
)

name: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore"
create_time {
  seconds: 1627542424
  nanos: 234977000
}
update_time {
  seconds: 1627542424
  nanos: 310537000
}
etag: "AMEw9yOr1EmCBWfym6bbxPAi7t8lTbYOqXbhLEO4xLdftBqUWRKhpUHExjNzTGiFnRx8"
online_serving_config {
  fixed_node_count: 3
}
state: STABLE

### Create Entity Type

You can specify a monitoring config which will by default be inherited by all Features under this EntityType.

In [12]:
ENTITY_TYPE_ID = 'trips'
DESCRIPTION = 'Taxi trips'

In [13]:
entity_type_lro = admin_client.create_entity_type(
    featurestore_service_pb2.CreateEntityTypeRequest(
        parent=admin_client.featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID),
        entity_type_id=ENTITY_TYPE_ID,
        entity_type=entity_type_pb2.EntityType(
            description=DESCRIPTION,
            monitoring_config=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig(
                snapshot_analysis=featurestore_monitoring_pb2.FeaturestoreMonitoringConfig.SnapshotAnalysis(
                    monitoring_interval=Duration(seconds=86400),  # 1 day
                ),
            ),
        ),
    )
)

# Similarly, wait for EntityType creation operation.
print(entity_type_lro.result())

name: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips"
etag: "AMEw9yNsvE_zQ-w1F6U-sb6_qyu5_4R8pppBJn8g41mqNqfGpOg_"



### Create Features

In [14]:
features=[
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Month of a trip",
            ),
            feature_id="trip_month",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Day of a trip",
            ),
            feature_id="trip_day",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Day of a week",
            ),
            feature_id="trip_day_of_week",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Hour of a trip",
            ),
            feature_id="trip_hour",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Trip duration in seconds",
            ),
            feature_id="trip_seconds",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.STRING,
                description="Payment type",
            ),

            feature_id="payment_type",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.STRING,
                description="Pick location",
            ),
            feature_id="pickup_grid",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.STRING,
                description="Dropoff location",
            ),
            feature_id="dropoff_grid",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="Euclidean distance between pick up and dropoff",
            ),
            feature_id="euclidean",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.DOUBLE,
                description="Miles travelled during the trip",
            ),
            feature_id="trip_miles",
        ),
        featurestore_service_pb2.CreateFeatureRequest(
            feature=feature_pb2.Feature(
                value_type=feature_pb2.Feature.ValueType.INT64,
                description="Trip tip classification",
            ),
            feature_id="tip_bin",
        ),
    ]

In [15]:
admin_client.batch_create_features(
    parent=admin_client.entity_type_path(PROJECT_ID, REGION, FEATURESTORE_ID, ENTITY_TYPE_ID),
    requests=features
).result()

features {
  name: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/trip_month"
  etag: "AMEw9yPshozOPojdCGBvdgQK4NrfTcxzr_zbSZk8SVTgDr2HcAtv"
}
features {
  name: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/trip_day"
  etag: "AMEw9yMUsdoa2SMggP6VPUhsPg-i_XDYS-ILjmo1niwa6eR0Rtg_"
}
features {
  name: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/trip_day_of_week"
  etag: "AMEw9yOAnwXWkhV3PYCgocpeFQZ4aVVRf72dQI9qhfGRxzI62p2v"
}
features {
  name: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/trip_hour"
  etag: "AMEw9yNyvoEN-dK5Be_sWSzBCaNKzbcDaiE07qS1kvVUGjuuCoPr"
}
features {
  name: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/trip_seconds"
  etag: "AMEw9yOvWm_2VvhrNNZnCkTmp8YfqIZ

### Discover features

#### Search for all features across all featurestores

In [16]:
for feature in admin_client.search_features(location=BASE_RESOURCE_PATH):
    print(feature.description)
    print(feature.name)

Dropoff location
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/dropoff_grid
Euclidean distance between pick up and dropoff
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/euclidean
Payment type
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/payment_type
Pick location
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/pickup_grid
Trip tip classification
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/tip_bin
Day of a trip
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/trip_day
Day of a week
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/trip_day_of_week
Hour of a trip
projects/91

#### Search for all features that are of type DOUBLE

In [17]:
features = admin_client.search_features(
    featurestore_service_pb2.SearchFeaturesRequest(
        location=BASE_RESOURCE_PATH, query="value_type=DOUBLE"
    )
)

for feature in features:
    print(feature.description)
    print(feature.name)

Euclidean distance between pick up and dropoff
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/euclidean
Miles travelled during the trip
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/trip_miles
The average rating for the movie, range is [1.0-5.0]
projects/917455532365/locations/us-central1/featurestores/movie_prediction_20210728220735/entityTypes/movies/features/average_rating
Total value of all purchases made so far
projects/917455532365/locations/us-central1/featurestores/sales_propensity_20210728220735/entityTypes/customer/features/lifetime_purchases


#### Search for all features with specific keywords in their ID

In [18]:
features = admin_client.search_features(
    featurestore_service_pb2.SearchFeaturesRequest(
        location=BASE_RESOURCE_PATH, query="feature_id:grid AND value_type=STRING"
    )
)

for feature in features:
    print(feature.description)
    print(feature.name)

Dropoff location
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/dropoff_grid
Pick location
projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips/features/pickup_grid


## Import Feature Values

### Prepare import table

In [19]:
BQ_DATASET_NAME = f'{PREFIX}_dataset' 
BQ_TABLE_NAME = 'feature_staging_table'
BQ_LOCATION = 'US'
SAMPLE_SIZE = 500000
YEAR = 2020

In [20]:
client = bigquery.Client()

dataset_id = f'{PROJECT_ID}.{BQ_DATASET_NAME}'
dataset = bigquery.Dataset(dataset_id)
dataset.location = BQ_LOCATION

try:
    dataset = client.create_dataset(dataset, timeout=30)
    print('Created dataset: ', dataset_id)
except exceptions.Conflict:
    print('Dataset {} already exists'.format(dataset_id))

Created dataset:  jsb-demos.jsb_taxi_dataset


In [21]:
sql_script_template = '''
CREATE OR REPLACE TABLE `@PROJECT.@DATASET.@TABLE` 
AS (
    WITH
      taxitrips AS (
      SELECT
        unique_key AS trip_id,
        FORMAT_DATETIME('%Y-%d-%m', trip_start_timestamp) AS date,
        trip_start_timestamp,
        trip_seconds,
        trip_miles,
        payment_type,
        pickup_longitude,
        pickup_latitude,
        dropoff_longitude,
        dropoff_latitude,
        tips,
        fare
      FROM
        `bigquery-public-data.chicago_taxi_trips.taxi_trips`
      WHERE 1=1 
      AND pickup_longitude IS NOT NULL
      AND pickup_latitude IS NOT NULL
      AND dropoff_longitude IS NOT NULL
      AND dropoff_latitude IS NOT NULL
      AND trip_miles > 0
      AND trip_seconds > 0
      AND fare > 0
      AND EXTRACT(YEAR FROM trip_start_timestamp) = @YEAR
    )
    SELECT
      trip_id,
      trip_start_timestamp,
      EXTRACT(MONTH from trip_start_timestamp) as trip_month,
      EXTRACT(DAY from trip_start_timestamp) as trip_day,
      EXTRACT(DAYOFWEEK from trip_start_timestamp) as trip_day_of_week,
      EXTRACT(HOUR from trip_start_timestamp) as trip_hour,
      trip_seconds,
      trip_miles,
      payment_type,
      ST_AsText(
          ST_SnapToGrid(ST_GeogPoint(pickup_longitude, pickup_latitude), 0.1)
      ) AS pickup_grid,
      ST_AsText(
          ST_SnapToGrid(ST_GeogPoint(dropoff_longitude, dropoff_latitude), 0.1)
      ) AS dropoff_grid,
      ST_Distance(
          ST_GeogPoint(pickup_longitude, pickup_latitude), 
          ST_GeogPoint(dropoff_longitude, dropoff_latitude)
      ) AS euclidean,
      IF((tips/fare >= 0.2), 1, 0) AS tip_bin,
      CASE (ABS(MOD(FARM_FINGERPRINT(date),10))) 
          WHEN 9 THEN 'TEST'
          WHEN 8 THEN 'VALIDATE'
          ELSE 'TRAIN' END AS data_split
    FROM
      taxitrips
    LIMIT @LIMIT
)
'''


sql_script = sql_script_template.replace(
    '@PROJECT', PROJECT_ID).replace(
    '@DATASET', BQ_DATASET_NAME).replace(
    '@TABLE', BQ_TABLE_NAME).replace(
    '@YEAR', str(YEAR)).replace(
    '@LIMIT', str(SAMPLE_SIZE))

job = client.query(sql_script)
job.result()

<google.cloud.bigquery.table._EmptyRowIterator at 0x7fd0346dfd50>

### Import features

In [22]:
entity_id_field = 'trip_id'
bq_table = f'bq://{PROJECT_ID}.{BQ_DATASET_NAME}.{BQ_TABLE_NAME}'

import_request = featurestore_service_pb2.ImportFeatureValuesRequest(
    entity_type=admin_client.entity_type_path(
        PROJECT_ID, REGION, FEATURESTORE_ID, ENTITY_TYPE_ID
    ),
    bigquery_source=io_pb2.BigQuerySource(
        input_uri=bq_table
    ),
    entity_id_field=entity_id_field,
    feature_specs=[
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="tip_bin"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="trip_month"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="trip_day"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="trip_day_of_week"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="trip_hour"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="payment_type"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="pickup_grid"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="dropoff_grid"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="euclidean"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="trip_seconds"),
        featurestore_service_pb2.ImportFeatureValuesRequest.FeatureSpec(id="trip_miles"),
    ],
    feature_time_field="trip_start_timestamp",
    worker_count=1,
)

In [23]:
ingestion_lro = admin_client.import_feature_values(import_request)

In [37]:
ingestion_lro.result()

imported_entity_count: 500000
imported_feature_value_count: 5500000

## Online serving

The
[Online Serving APIs](https://cloud.google.com/vertex-ai/featurestore/docs/reference/rpc/google.cloud.aiplatform.v1beta1#featurestoreonlineservingservice)
lets you serve feature values for small batches of entities. It's designed for latency-sensitive service, such as online model prediction. For example, for a movie service, you might want to quickly shows movies that the current user would most likely watch by using online predictions.

### Read one entity per request

The ReadFeatureValues API is used to read feature values of one entity; hence
its custom HTTP verb is `readFeatureValues`. By default, the API will return the  latest value of each feature, meaning the feature values with the most recent  timestamp.

To read feature values, specify the entity ID and features to read. The response
contains a `header` and an `entity_view`. Each row of data in the `entity_view`
contains one feature value, in the same order of features as listed in the response header.

In [41]:
feature_selector = FeatureSelector(
    id_matcher=IdMatcher(ids=["tip_bin", "trip_miles", "trip_day","payment_type"])
)

features = data_client.read_feature_values(
    featurestore_online_service_pb2.ReadFeatureValuesRequest(
        # Fetch from the following feature store/entity type
        entity_type=admin_client.entity_type_path(
            PROJECT_ID, REGION, FEATURESTORE_ID, ENTITY_TYPE_ID
        ),
        # Fetch the user features whose ID is "alice"
        entity_id="13311b767c033d82e37439228ef23fd1d018d061",
        #trip_miles = 100,
        feature_selector=feature_selector,
    )
)
features

header {
  entity_type: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips"
  feature_descriptors {
    id: "tip_bin"
  }
  feature_descriptors {
    id: "trip_miles"
  }
  feature_descriptors {
    id: "trip_day"
  }
  feature_descriptors {
    id: "payment_type"
  }
}
entity_view {
  entity_id: "13311b767c033d82e37439228ef23fd1d018d061"
  data {
    value {
      int64_value: 0
      metadata {
        generate_time {
          seconds: 1592524800
        }
      }
    }
  }
  data {
    value {
      double_value: 1.1
      metadata {
        generate_time {
          seconds: 1592524800
        }
      }
    }
  }
  data {
    value {
      int64_value: 19
      metadata {
        generate_time {
          seconds: 1592524800
        }
      }
    }
  }
  data {
    value {
      string_value: "Prcard"
      metadata {
        generate_time {
          seconds: 1592524800
        }
      }
    }
  }
}

### Read multiple entities per request

In [26]:
response_stream = data_client.streaming_read_feature_values(
    featurestore_online_service_pb2.StreamingReadFeatureValuesRequest(
        entity_type=admin_client.entity_type_path(
            PROJECT_ID, REGION, FEATURESTORE_ID, ENTITY_TYPE_ID
        ),
        entity_ids=["13311b767c033d82e37439228ef23fd1d018d061", "0e9be1edf79c3d88b4da5b9d11c2651538fb33b4"],
        feature_selector=feature_selector,
    )
)

In [27]:
for response in response_stream:
    print(response)

header {
  entity_type: "projects/917455532365/locations/us-central1/featurestores/jsb_taxi_featurestore/entityTypes/trips"
  feature_descriptors {
    id: "tip_bin"
  }
  feature_descriptors {
    id: "trip_miles"
  }
  feature_descriptors {
    id: "trip_day"
  }
}

entity_view {
  entity_id: "0e9be1edf79c3d88b4da5b9d11c2651538fb33b4"
  data {
    value {
      int64_value: 0
      metadata {
        generate_time {
          seconds: 1592524800
        }
      }
    }
  }
  data {
    value {
      double_value: 0.9
      metadata {
        generate_time {
          seconds: 1592524800
        }
      }
    }
  }
  data {
    value {
      int64_value: 19
      metadata {
        generate_time {
          seconds: 1592524800
        }
      }
    }
  }
}

entity_view {
  entity_id: "13311b767c033d82e37439228ef23fd1d018d061"
  data {
    value {
      int64_value: 0
      metadata {
        generate_time {
          seconds: 1592524800
        }
      }
    }
  }
  data {
    value {

## Batch serving

Batch Serving is used to fetch a large batch of feature values for high-throughput, typically for training a model or batch prediction. In this section, you will learn how to prepare for training examples by calling the BatchReadFeatureValues API.

### Use case



In [90]:
INPUT_CSV_FILE = "gs://cloud-samples-data-us-central1/ai-platform-unified/datasets/featurestore/movie_prediction.csv"
INPUT_CSV_FILE = "gs://jsb-demos-central1/vertex-ai-demo/feature_store/taxi_trip_ids.csv"


In [91]:
from datetime import datetime

In [104]:
# Output dataset
DESTINATION_DATA_SET = "jsb_taxi_dataset2"  # @param {type:"string"}
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
#DESTINATION_DATA_SET = "{prefix}_{timestamp}".format(
#    prefix=DESTINATION_DATA_SET, timestamp=TIMESTAMP)

# Output table. Make sure that the table does NOT already exist; the BatchReadFeatureValues API cannot overwrite an existing table
DESTINATION_TABLE_NAME = "training_data"  # @param {type:"string"}
DESTINATION_TABLE_NAME = "{prefix}_{timestamp}".format(
    prefix=DESTINATION_TABLE_NAME, timestamp=TIMESTAMP)

DESTINATION_PATTERN = "bq://{project}.{dataset}.{table}"
DESTINATION_TABLE_URI = DESTINATION_PATTERN.format(
    project=PROJECT_ID, dataset=DESTINATION_DATA_SET, table=DESTINATION_TABLE_NAME
)

In [105]:
batch_serving_request = featurestore_service_pb2.BatchReadFeatureValuesRequest(
    # featurestore info
    featurestore=admin_client.featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID),
    # URL for the label data, i.e., Table 1.
    csv_read_instances=io_pb2.CsvSource(
        gcs_source=io_pb2.GcsSource(uris=[INPUT_CSV_FILE])
    ),
    destination=featurestore_service_pb2.FeatureValueDestination(
        bigquery_destination=io_pb2.BigQueryDestination(
            # Output to BigQuery table created earlier
            output_uri=DESTINATION_TABLE_URI
        )
    ),
    entity_type_specs=[
        featurestore_service_pb2.BatchReadFeatureValuesRequest.EntityTypeSpec(
            # Read the 'age', 'gender' and 'liked_genres' features from the 'users' entity
            entity_type_id="trips",
            feature_selector=FeatureSelector(
                id_matcher=IdMatcher(
                    ids=[
                        # features, use "*" if you want to select all features within this entity type
                        #"trip_month",
                        #"trip_day",
                        "trip_hour",
                        "pickup_grid",
                        "payment_type",
                        #"*",
   
                    ]
                )
            ),
        ),
        #Showcasing how to get feature data from second Feature Store Instance
        featurestore_service_pb2.BatchReadFeatureValuesRequest.EntityTypeSpec(
            # Read the 'average_rating' and 'genres' feature values of the 'movies' entity
            entity_type_id="trips",
            feature_selector=FeatureSelector(
                id_matcher=IdMatcher(ids=["trip_month", "trip_day"])
            ),
        ),
    ],
)

In [106]:
# Execute the batch read
batch_serving_lro = admin_client.batch_read_feature_values(batch_serving_request)

In [109]:
# This long runing operation will poll until the batch read finishes.
batch_serving_lro.result()



In [123]:

SAMPLE_SIZE = "10"

In [122]:
sql_script_template = '''

SELECT *
      FROM
`@PROJECT.@DATASET.@TABLE`
LIMIT @LIMIT
'''

sql_script = sql_script_template.replace(
    '@PROJECT', PROJECT_ID).replace(
    '@DATASET', DESTINATION_DATA_SET).replace(
    '@TABLE', DESTINATION_TABLE_NAME).replace(
    '@LIMIT', str(SAMPLE_SIZE))

job = client.query(sql_script)
job.to_dataframe()

AttributeError: 'BigQueryReadClient' object has no attribute '_transport'

In [119]:
%%bigquery data
SELECT * 
FROM
jsb-demos.jsb_taxi_dataset2.training_data_20210729173804
LIMIT 10

Query complete after 0.00s: 100%|██████████| 1/1 [00:00<00:00, 743.67query/s] 
Downloading:   0%|          | 0/5 [00:00<?, ?rows/s]

AttributeError: 'BigQueryReadClient' object has no attribute '_transport'

In [29]:
%%bigquery
SELECT * 
FROM
`jk-test-1002.jktest2_dataset.feature_staging_table`
LIMIT 10

Executing query with job ID: 6c5e15ec-bbfe-464c-95e5-083d508ebae4
Query executing: 1.27s


ERROR:
 404 Not found: Project jk-test-1002

(job ID: 6c5e15ec-bbfe-464c-95e5-083d508ebae4)

             -----Query Job SQL Follows-----             

    |    .    |    .    |    .    |    .    |    .    |
   1:SELECT * 
   2:FROM
   3:`jk-test-1002.jktest2_dataset.feature_staging_table`
   4:LIMIT 10
    |    .    |    .    |    .    |    .    |    .    |


AttributeError: 'BigQueryReadClient' object has no attribute '_transport'

## Clean up

In [None]:
admin_client.delete_featurestore(
    featurestore_service_pb2.DeleteFeaturestoreRequest(
        name=admin_client.featurestore_path(PROJECT_ID, REGION, FEATURESTORE_ID),
        force=True
    )
)