In [2]:
from google.cloud import aiplatform
from google.cloud import bigquery

In [3]:
from google.cloud.aiplatform_v1beta1 import (FeatureOnlineStoreAdminServiceClient, FeatureRegistryServiceClient)

In [4]:
from google.cloud.aiplatform_v1beta1.types import feature as feature_pb2
from google.cloud.aiplatform_v1beta1.types import feature_group as feature_group_pb2
from google.cloud.aiplatform_v1beta1.types import feature_online_store as feature_online_store_pb2
from google.cloud.aiplatform_v1beta1.types import feature_view as feature_view_pb2

In [5]:
from google.cloud.aiplatform_v1beta1.types import feature_online_store_admin_service as feature_online_store_admin_service_pb2
from google.cloud.aiplatform_v1beta1.types import feature_registry_service as feature_registry_service_pb2
from google.cloud.aiplatform_v1beta1.types import featurestore_service as featurestore_service_pb2
from google.cloud.aiplatform_v1beta1.types import io as io_pb2
from google.cloud.aiplatform_v1beta1.types import service_networking as service_networking_pb2
from vertexai.resources.preview import (FeatureOnlineStore, FeatureView,FeatureViewBigQuerySource)
     

In [18]:
# SQL that (re)builds the feature table `bq_feature_store2.product_data`.
# It joins the product catalogue with per-product order counts taken over the
# 30 full days before today and stamps every row with `feature_timestamp`.
# NOTE: this is a DDL statement (CREATE OR REPLACE TABLE ... AS), so executing
# it overwrites the table and yields no result rows of its own.
# NOTE: because of the LEFT OUTER JOIN, products without any order in the
# window get NULL (not 0) for good_order_count / bad_order_count.
feature_extract_query = """
CREATE OR REPLACE TABLE `innov-mtwr-6eqv-1.bq_feature_store2.product_data` AS
WITH
 product_order_agg AS (
   SELECT cast(product_id as string) as entity_id,
     countif(status in ("Shipped", "Complete")) as good_order_count,
     countif(status in ("Returned", "Cancelled")) as bad_order_count
   FROM `bigquery-public-data.thelook_ecommerce.order_items`
   WHERE
     timestamp_trunc(created_at, day) >= timestamp_trunc(timestamp_sub(CURRENT_TIMESTAMP(), interval 30 day), day) and
     timestamp_trunc(created_at, day) < timestamp_trunc(CURRENT_TIMESTAMP(), day)
   group by 1
   order by entity_id), 
 product_basic AS (
   SELECT cast(id as string) AS entity_id,
     lower(name) as name,
     lower(category) as category,
     lower(brand) as brand,
     cost,
     retail_price
   FROM  `bigquery-public-data.thelook_ecommerce.products`)
SELECT product_basic.entity_id,name,category,brand,cost,retail_price,good_order_count,bad_order_count, current_timestamp() as feature_timestamp
FROM product_basic
LEFT OUTER JOIN product_order_agg
USING (entity_id)
"""

In [19]:
# Notebook-wide configuration: target project, region, and the regional
# Vertex AI API host derived from the region.
PROJECT_ID   = 'innov-mtwr-6eqv-1'
LOCATION     = "us-central1"  # @param {type:"string"}
API_ENDPOINT = f"{LOCATION}-aiplatform.googleapis.com"

# NOTE: `aiplatform` is already imported in the first cell; this re-import is redundant.
from google.cloud import aiplatform
# Set the project and location the SDK uses from here on.
aiplatform.init(project=PROJECT_ID, location=LOCATION)

In [20]:
# BigQuery client bound to the notebook's project; reused by the query and view cells below.
bq_client = bigquery.Client(project=PROJECT_ID)

In [None]:
# Execute the DDL so the `product_data` feature table is (re)created.
# NOTE(review): `query_and_wait` presumably returns the query's row iterator rather
# than a job object, so the name `job` is loose — confirm against the client docs.
job = bq_client.query_and_wait(query=feature_extract_query)

In [None]:
# `feature_extract_query` is a CREATE OR REPLACE TABLE statement: it produces no
# result rows, so turning its result into a DataFrame gives an empty frame (and
# running it again needlessly rebuilds the table). Load the rows from the table
# that the DDL populated instead.
PRODUCT_TABLE_ID = "innov-mtwr-6eqv-1.bq_feature_store2.product_data"
product_data = (
    bq_client.query(f"SELECT * FROM `{PRODUCT_TABLE_ID}`")
    .result()
    .to_dataframe()
)

In [None]:
# Sanity check: print (rows, columns), then render the first rows as the cell output.
print(product_data.shape)
product_data.head()

In [None]:
# Show the column dtypes pandas assigned to the loaded feature columns.
product_data.dtypes

In [None]:
# Create a BigQuery view exposing the feature query.
BQ_VIEW_ID = "innov-mtwr-6eqv-1.bq_feature_store2.product_features"  # @param {type:"string"}

# A view definition must be a plain query, but `feature_extract_query` starts
# with a "CREATE OR REPLACE TABLE ... AS" header. Drop that header (everything up
# to and including the first " AS" that ends a line) and keep the WITH/SELECT
# body. A query that is already a plain SELECT is passed through unchanged.
ddl_header, as_keyword, select_statement = feature_extract_query.partition(" AS\n")
if not (as_keyword and ddl_header.lstrip().upper().startswith("CREATE")):
    select_statement = feature_extract_query

view = bigquery.Table(BQ_VIEW_ID)
view.view_query = select_statement
# exists_ok=True: an already existing view is returned as-is instead of raising.
view = bq_client.create_table(view, exists_ok=True)

print(f"Created {view.table_type}: {str(view.reference)}")

### Set up and start online serving

### Initialize Admin Service Client

In [21]:
# Both admin-plane clients talk to the same regional Vertex AI endpoint.
regional_endpoint_options = {"api_endpoint": API_ENDPOINT}

# Manages online stores and feature views.
admin_client = FeatureOnlineStoreAdminServiceClient(
    client_options=regional_endpoint_options
)

# Manages feature groups and features.
registry_client = FeatureRegistryServiceClient(
    client_options=regional_endpoint_options
)

## Create online store instance

### Option 1: Create an online store instance for Optimized online serving with private endpoint

In [None]:
# Short ID of the online store and the project allowed to reach its private endpoint.
FEATURE_ONLINE_STORE_ID = "thelook_opt_private_unique"  # @param {type:"string"}
ALLOW_LISTED_PROJECT    = f"{PROJECT_ID}"  # @param {type:"string"}

# Private Service Connect settings: PSC enabled, with a single allow-listed project.
psc_config = service_networking_pb2.PrivateServiceConnectConfig(
    enable_private_service_connect=True,
    project_allowlist=[ALLOW_LISTED_PROJECT],
)
private_serving_endpoint = feature_online_store_pb2.FeatureOnlineStore.DedicatedServingEndpoint(
    private_service_connect_config=psc_config,
)

# Optimized online store served through the dedicated (private) endpoint.
online_store_config = feature_online_store_pb2.FeatureOnlineStore(
    optimized=feature_online_store_pb2.FeatureOnlineStore.Optimized(),
    dedicated_serving_endpoint=private_serving_endpoint,
)

create_store_request = feature_online_store_admin_service_pb2.CreateFeatureOnlineStoreRequest(
    parent=f"projects/{PROJECT_ID}/locations/{LOCATION}",
    feature_online_store_id=FEATURE_ONLINE_STORE_ID,
    feature_online_store=online_store_config,
)
create_store_lro = admin_client.create_feature_online_store(create_store_request)

# Printing the long-running operation's result shows the created store.
print(create_store_lro.result())

In [None]:
# Look the store up through the SDK wrapper to confirm it exists.
# NOTE(review): with a short ID the lookup presumably relies on the project/location
# given to aiplatform.init — confirm.
FeatureOnlineStore(FEATURE_ONLINE_STORE_ID)

In [7]:
# List the online stores visible to the SDK; the output below shows each one's full resource name.
FeatureOnlineStore.list()

[<vertexai.resources.preview.feature_store.feature_online_store.FeatureOnlineStore object at 0x7f6d48ca5fc0> 
 resource name: projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique]

In [32]:
# Rebind FEATURE_ONLINE_STORE_ID to the store's *full resource name* (copied from the
# listing output above). NOTE: earlier in the notebook this name held the short store
# ID, and the project number is hard-coded here.
FEATURE_ONLINE_STORE_ID = 'projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique'

### Create feature view instance

In [14]:
# ID for the feature view that is created directly from the BigQuery table.
FEATURE_VIEW_ID = "product_ftr_vw"  # @param {type:"string"}

In [11]:
# SDK handle for the existing online store; FEATURE_ONLINE_STORE_ID holds the full resource name here.
my_fos = FeatureOnlineStore(FEATURE_ONLINE_STORE_ID)

In [12]:
# Display the handle to confirm which store resource it points at.
my_fos

<vertexai.resources.preview.feature_store.feature_online_store.FeatureOnlineStore object at 0x7f6d31d83ee0> 
resource name: projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique

In [15]:
# BigQuery table that backs the view; each row is keyed by its `entity_id` column.
product_table_source = FeatureViewBigQuerySource(
    uri="bq://innov-mtwr-6eqv-1.bq_feature_store2.product_data",
    entity_id_columns=["entity_id"],
)

# sync_config=None creates the view without a sync schedule; pass CRON_SCHEDULE
# here instead to set up automatic syncs.
my_fv = my_fos.create_feature_view(
    FEATURE_VIEW_ID,
    product_table_source,
    sync_config=None,
)

Creating FeatureView
Create FeatureView backing LRO: projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/product_ftr_vw/operations/8586254835819479040
FeatureView created. Resource name: projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/product_ftr_vw
To use this FeatureView in another session:
feature_view = aiplatform.FeatureView('projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/product_ftr_vw')


### Option 2: Create a FeatureView from a FeatureGroup and its Features

#### Create FeatureGroup/Features

In [16]:
# ID of the feature group to register on top of the `product_data` table.
FEATURE_GROUP_ID = "product_features_unique"  # @param {type: "string"}
# Features to register in the group; each ID matches a column name produced by
# the feature query (entity_id and feature_timestamp are not listed).
FEATURE_IDS = [
    "good_order_count",
    "bad_order_count",
    "category",
    "name",
    "brand",
    "cost",
    "retail_price",
]

In [22]:
# Now, create the featureGroup

# The feature group reads its data from the materialized `product_data` table.
product_table = io_pb2.BigQuerySource(
    input_uri="bq://innov-mtwr-6eqv-1.bq_feature_store2.product_data"
)
feature_group_config = feature_group_pb2.FeatureGroup(
    big_query=feature_group_pb2.FeatureGroup.BigQuery(big_query_source=product_table)
)

# Submit the creation request; the returned long-running operation is resolved in the next cell.
create_group_request = feature_registry_service_pb2.CreateFeatureGroupRequest(
    parent=f"projects/{PROJECT_ID}/locations/{LOCATION}",
    feature_group_id=FEATURE_GROUP_ID,
    feature_group=feature_group_config,
)
create_group_lro = registry_client.create_feature_group(create_group_request)
    

In [23]:
# Resolve the feature-group creation operation and print the created resource.
print(create_group_lro.result())

name: "projects/932910293196/locations/us-central1/featureGroups/product_features_unique"



In [24]:
# Register one Feature per entry in FEATURE_IDS under the feature group and keep
# the returned long-running operations so they can be resolved afterwards.
# The loop variable is named `feature_id` (not `id`) so the builtin `id()` is
# not shadowed or left rebound in the notebook namespace.
feature_group_path = (
    f"projects/{PROJECT_ID}/locations/{LOCATION}/featureGroups/{FEATURE_GROUP_ID}"
)
create_feature_lros = [
    registry_client.create_feature(
        featurestore_service_pb2.CreateFeatureRequest(
            parent=feature_group_path,
            feature_id=feature_id,
            # Empty Feature message: no extra per-feature settings are supplied.
            feature=feature_pb2.Feature(),
        )
    )
    for feature_id in FEATURE_IDS
]

In [25]:
# Resolve every feature-creation operation in submission order and print each created Feature.
for lro in create_feature_lros:
    print(lro.result())

name: "projects/932910293196/locations/us-central1/featureGroups/product_features_unique/features/good_order_count"

name: "projects/932910293196/locations/us-central1/featureGroups/product_features_unique/features/bad_order_count"

name: "projects/932910293196/locations/us-central1/featureGroups/product_features_unique/features/category"

name: "projects/932910293196/locations/us-central1/featureGroups/product_features_unique/features/name"

name: "projects/932910293196/locations/us-central1/featureGroups/product_features_unique/features/brand"

name: "projects/932910293196/locations/us-central1/featureGroups/product_features_unique/features/cost"

name: "projects/932910293196/locations/us-central1/featureGroups/product_features_unique/features/retail_price"



In [28]:
# NOTE: rebinds FEATURE_VIEW_ID; from here on it names the registry-backed view,
# not the BigQuery-backed "product_ftr_vw" created earlier.
FEATURE_VIEW_ID = "registry_product_ftr_vw"  # @param {type:"string"}
# Sync schedule in cron syntax: minute 56 of every hour, with the time zone given
# by the TZ= prefix.
CRON_SCHEDULE = "TZ=America/Los_Angeles 56 * * * *"  # @param {type:"string"}

In [35]:
# Select every registered feature of the product feature group ...
product_group_selection = feature_view_pb2.FeatureView.FeatureRegistrySource.FeatureGroup(
    feature_group_id=FEATURE_GROUP_ID,
    feature_ids=FEATURE_IDS,
)

# ... and use that single group as the registry source of the feature view.
feature_registry_source = feature_view_pb2.FeatureView.FeatureRegistrySource(
    feature_groups=[product_group_selection]
)

In [36]:
# Sync configuration carrying the cron schedule defined in CRON_SCHEDULE.
sync_config = feature_view_pb2.FeatureView.SyncConfig(cron=CRON_SCHEDULE)

In [39]:
# Check the current value: it must be the store's full resource name, because the
# next cells pass it directly as `parent`.
FEATURE_ONLINE_STORE_ID

'projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique'

In [40]:
# Feature view sourced from the feature registry and synced on the cron schedule.
registry_feature_view = feature_view_pb2.FeatureView(
    feature_registry_source=feature_registry_source,
    sync_config=sync_config,
)

# FEATURE_ONLINE_STORE_ID already holds the store's full resource name, so it is
# used as `parent` directly rather than interpolated into a projects/... path.
create_view_request = feature_online_store_admin_service_pb2.CreateFeatureViewRequest(
    parent=FEATURE_ONLINE_STORE_ID,
    feature_view_id=FEATURE_VIEW_ID,
    feature_view=registry_feature_view,
)
create_view_lro = admin_client.create_feature_view(create_view_request)

# Wait for LRO to complete and show result
print(create_view_lro.result())

name: "projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/registry_product_ftr_vw"



### Verify FeatureView instance creation

In [42]:
# List all feature views under the online store to confirm the new view exists.
admin_client.list_feature_views(
    # FEATURE_ONLINE_STORE_ID already holds the full resource name, so it is passed
    # as-is instead of being interpolated into a projects/.../featureOnlineStores/... path.
    parent=FEATURE_ONLINE_STORE_ID
)

ListFeatureViewsPager<feature_views {
  name: "projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/registry_product_ftr_vw"
  create_time {
    seconds: 1717884874
    nanos: 623047000
  }
  update_time {
    seconds: 1717884875
    nanos: 736456000
  }
  etag: "AMEw9yO6OS0f_MgA5dgP7PB2uXvBIqd9z1Jfsr2-CrbMNBUElus3DykfUaWlYuhRttnu"
  sync_config {
    cron: "TZ=America/Los_Angeles 56 * * * *"
  }
  feature_registry_source {
    feature_groups {
      feature_group_id: "product_features_unique"
      feature_ids: "good_order_count"
      feature_ids: "bad_order_count"
      feature_ids: "category"
      feature_ids: "name"
      feature_ids: "brand"
      feature_ids: "cost"
      feature_ids: "retail_price"
    }
  }
}
feature_views {
  name: "projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/product_ftr_vw"
  create_time {
    seconds: 1717883280
    nanos: 192094000
  }
  update_t

In [43]:
# Same listing through the SDK wrapper; FEATURE_ONLINE_STORE_ID holds the full resource name here.
fv_list = FeatureView.list(feature_online_store_id=FEATURE_ONLINE_STORE_ID)

In [45]:
# Print each view's short name followed by its underlying API resource message.
for fv in fv_list:
    print(fv.name)
    print(fv.gca_resource)

registry_product_ftr_vw
name: "projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/registry_product_ftr_vw"
create_time {
  seconds: 1717884874
  nanos: 623047000
}
update_time {
  seconds: 1717884875
  nanos: 736456000
}
etag: "AMEw9yOZ3JbVjinmNt9qB2XM4GaiQX5zML01tWehvm_kYi38_cJ9dYdpqhtstkO6RvtE"
sync_config {
  cron: "TZ=America/Los_Angeles 56 * * * *"
}
feature_registry_source {
  feature_groups {
    feature_group_id: "product_features_unique"
    feature_ids: "good_order_count"
    feature_ids: "bad_order_count"
    feature_ids: "category"
    feature_ids: "name"
    feature_ids: "brand"
    feature_ids: "cost"
    feature_ids: "retail_price"
  }
}

product_ftr_vw
name: "projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/product_ftr_vw"
create_time {
  seconds: 1717883280
  nanos: 192094000
}
update_time {
  seconds: 1717883281
  nanos: 106434000
}
etag: "AMEw9yNF1zj2ClYFW_Tw6q

In [49]:
# Rebind `my_fv` to the registry-backed view via its full resource name.
# NOTE: `my_fv` previously referred to the BigQuery-backed view created earlier.
my_fv =FeatureView('projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/registry_product_ftr_vw')
     

In [50]:
# Display the handle to confirm which feature view it points at.
my_fv

<vertexai.resources.preview.feature_store.feature_view.FeatureView object at 0x7f6d286e34c0> 
resource name: projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/registry_product_ftr_vw

In [51]:
# Trigger a sync of the feature view now; the returned object identifies the sync
# run and is polled in a later cell.
fv_sync = my_fv.sync()

In [54]:
# Display the sync handle; its resource name ends with the sync ID.
fv_sync

<vertexai.resources.preview.feature_store.feature_view.FeatureView.FeatureViewSync object at 0x7f6d286d5360> 
resource name: projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/registry_product_ftr_vw/featureViewSyncs/4542907219603095552

In [53]:
import time

# Seconds to wait between polls (and once more after the sync has finished).
POLL_INTERVAL_SECONDS = 30

# The sync ID is the last path segment of the sync's resource name. It never
# changes, so resolve it once up front instead of re-splitting on every poll,
# and take the last segment rather than relying on a fixed positional index.
sync_id = fv_sync.resource_name.rsplit("/", 1)[-1]

# Poll until the sync reports an end time, then print whether it succeeded.
while True:
    feature_view_sync = my_fv.get_sync(sync_id).gca_resource
    # A non-zero end time is treated as "the sync run has finished".
    if feature_view_sync.run_time.end_time.seconds > 0:
        # final_status.code == 0 is reported as success, anything else as failure.
        status = "Succeed" if feature_view_sync.final_status.code == 0 else "Failed"
        print(f"Sync {status} for {feature_view_sync.name}.")
        # wait a little more for the job to properly shutdown
        time.sleep(POLL_INTERVAL_SECONDS)
        break
    print(f"Sync ongoing, waiting for {POLL_INTERVAL_SECONDS} seconds.")
    time.sleep(POLL_INTERVAL_SECONDS)

Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync ongoing, waiting for 30 seconds.
Sync Succeed for projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/registry_product_ftr_vw/featureViewSyncs/4542907219603095552.


In [55]:
# Fetch the optimized online store by its full resource name; the output includes
# the dedicated serving endpoint's service attachment.
admin_client.get_feature_online_store(
    name=FEATURE_ONLINE_STORE_ID
)

name: "projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique"
create_time {
  seconds: 1717881252
  nanos: 109462000
}
update_time {
  seconds: 1717881252
  nanos: 184087000
}
etag: "AMEw9yM66B8k3lFNCn1wwHlpZHfP1PEtdPVOxOLOqf44Z6Wdupf0YNZ7SORWYPJLK7k="
state: STABLE
dedicated_serving_endpoint {
  private_service_connect_config {
    enable_private_service_connect: true
    project_allowlist: "innov-mtwr-6eqv-1"
  }
  service_attachment: "projects/z2e035513e31d706e-tp/regions/us-central1/serviceAttachments/gkedpm-ed36888bd5615ebe09511e751234a9-sa"
}
embedding_management {
  enabled: true
}
optimized {
}

In [56]:
from google.cloud.aiplatform_v1beta1.services.feature_online_store_service.transports.grpc import FeatureOnlineStoreServiceGrpcTransport
from google.cloud.aiplatform_v1beta1 import FeatureOnlineStoreServiceClient
import grpc

# Add the IP address of the Endpoint you just created.
PSC_ENDPOINT_ADDRESS = "10.0.0.19:10002"

# Serving client that talks gRPC straight to that address over a plaintext
# (insecure) channel instead of the public API host.
psc_channel = grpc.insecure_channel(PSC_ENDPOINT_ADDRESS)
psc_transport = FeatureOnlineStoreServiceGrpcTransport(channel=psc_channel)
data_client = FeatureOnlineStoreServiceClient(transport=psc_transport)

In [59]:
from google.cloud.aiplatform_v1beta1.types import feature_online_store_service as feature_online_store_service_pb2

In [60]:
# NOTE: rebinds FEATURE_ONLINE_STORE_ID back to the *short* store ID (it held the
# full resource name in earlier cells); the fetch cell below interpolates both
# short IDs into a full feature-view path.
FEATURE_ONLINE_STORE_ID = 'thelook_opt_private_unique'
FEATURE_VIEW_ID         = 'registry_product_ftr_vw'

In [61]:
# Full resource path of the feature view, assembled from the short store/view IDs.
feature_view_path = (
    f"projects/{PROJECT_ID}/locations/{LOCATION}"
    f"/featureOnlineStores/{FEATURE_ONLINE_STORE_ID}/featureViews/{FEATURE_VIEW_ID}"
)

# Look up the feature values stored for entity "16050".
fetch_request = feature_online_store_service_pb2.FetchFeatureValuesRequest(
    feature_view=feature_view_path,
    id="16050",
)
data_client.fetch_feature_values(request=fetch_request)

key_values {
  features {
    name: "feature_timestamp"
    value {
      int64_value: 1717885174314456
    }
  }
  features {
    name: "good_order_count"
    value {
      int64_value: 2
    }
  }
  features {
    name: "bad_order_count"
    value {
      int64_value: 0
    }
  }
  features {
    name: "category"
    value {
      string_value: "tops & tees"
    }
  }
  features {
    name: "name"
    value {
      string_value: "fred perry men\'s sharp stripe t-shirt"
    }
  }
  features {
    name: "brand"
    value {
      string_value: "fred perry"
    }
  }
  features {
    name: "cost"
    value {
      double_value: 36.30000002682209
    }
  }
  features {
    name: "retail_price"
    value {
      double_value: 60.0
    }
  }
}

In [None]:
# Re-create the handle for the registry-backed feature view from its full resource
# name. NOTE: this repeats an earlier cell verbatim.
my_fv =FeatureView('projects/932910293196/locations/us-central1/featureOnlineStores/thelook_opt_private_unique/featureViews/registry_product_ftr_vw')