diff --git a/api_examples/hsfs/feature_group_change_notification_cdc.ipynb b/api_examples/hsfs/feature_group_change_notification_cdc.ipynb new file mode 100644 index 00000000..238f147e --- /dev/null +++ b/api_examples/hsfs/feature_group_change_notification_cdc.ipynb @@ -0,0 +1,253 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "9e9da978", + "metadata": {}, + "source": [ + "![Screenshot from 2022-06-16 14-24-57.png]()" + ] + }, + { + "cell_type": "markdown", + "id": "55b19afd", + "metadata": {}, + "source": [ + "## 💽 CDC (change data capture) for Feature Groups \n", + "\n", + "This notebook shows you how to write a CDC client that subscribes to changes to an Online Feature Group.\n", + "\n", + "The notebook performs the following steps:\n", + "\n", + "1. Create the Kafka topic in your Hopsworks Project for the CDC notifications for the Feature Group\n", + "2. Create the Feature Group, providing the name of the CDC notification topic you just created\n", + "3. Insert some data into the feature group\n", + "4. Run a Kafka Client that consumes the changes to the Feature Group (from the notification topic)" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "46ee5e6e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Connected. Call `.close()` to terminate connection gracefully.\n", + "\n", + "Logged in to project, explore it here https://hopsworks0.logicalclocks.com/p/2167\n", + "Connected. 
Call `.close()` to terminate connection gracefully.\n" + ] + } + ], + "source": [ + "import hopsworks\n", + "import pandas as pd\n", + "\n", + "project = hopsworks.login()\n", + "fs = project.get_feature_store()" + ] + }, + { + "cell_type": "markdown", + "id": "c7b04834", + "metadata": {}, + "source": [ + "### Create the CDC Kafka Topic for Notifications" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "b951fca3", + "metadata": {}, + "outputs": [], + "source": [ + "feature_group_name = \"cdc\"\n", + "notification_topic = f\"{feature_group_name}_notification\"\n", + "partitions = 10\n", + "\n", + "if not any(topic.name == notification_topic for topic in project.get_kafka_api().get_topics()):\n", + " project.get_kafka_api().create_topic(notification_topic, None, None, partitions=partitions)" + ] + }, + { + "cell_type": "markdown", + "id": "c5b95586", + "metadata": {}, + "source": [ + "### Create the Feature Group, providing the CDC topic name " + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "ba5fc19f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Feature Group created successfully, explore it at \n", + "https://hopsworks0.logicalclocks.com/p/2167/fs/2115/fg/2066\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "7c478f19a06247b38633c023263993df", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "Uploading Dataframe: 0.00% | | Rows 0/10 | Elapsed Time: 00:00 | Remaining Time: ?" 
+ ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Launching job: cdc_1_offline_fg_materialization\n", + "Job started successfully, you can follow the progress at \n", + "https://hopsworks0.logicalclocks.com/p/2167/jobs/named/cdc_1_offline_fg_materialization/executions\n" + ] + }, + { + "data": { + "text/plain": [ + "(, None)" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "fg = fs.get_or_create_feature_group(name=feature_group_name, \n", + " version=1, \n", + " primary_key=[\"id\"], \n", + " online_enabled=True, \n", + " notification_topic_name=notification_topic)\n", + "\n", + "size = 10\n", + "fg_data = {'id': range(0, size), 'text': \"test\"}\n", + "fg_df = pd.DataFrame.from_dict(fg_data)\n", + "fg.insert(fg_df)" + ] + }, + { + "cell_type": "markdown", + "id": "150370b6", + "metadata": {}, + "source": [ + "### Reading the contents of the CDC topic" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "4444e032", + "metadata": {}, + "outputs": [], + "source": [ + "import confluent_kafka\n", + "\n", + "# After partition assignment, make the consumer read from the beginning\n", + "def my_assign(consumer, partitions):\n", + " for partition in partitions:\n", + " partition.offset = confluent_kafka.OFFSET_BEGINNING\n", + " consumer.assign(partitions)\n", + "\n", + "storage_connector = fs._storage_connector_api.get_kafka_connector(fs.id)\n", + "\n", + "config = storage_connector.confluent_options()\n", + "# You have to define a 'group.id'. 
If multiple consumers with the same group.id are subscribed to the same topic,\n", + "# they will share consumption of its messages\n", + "config[\"group.id\"] = feature_group_name\n", + "consumer = confluent_kafka.Consumer(config)\n", + "consumer.subscribe([notification_topic], on_assign=my_assign)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "09cc84f2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "b'{\"projectId\":2167,\"featureStoreId\":2115,\"featureGroupId\":2066,\"entry\":{\"id\":\"6\",\"text\":\"test\"},\"featureViews\":[]}'\n", + "b'{\"projectId\":2167,\"featureStoreId\":2115,\"featureGroupId\":2066,\"entry\":{\"id\":\"2\",\"text\":\"test\"},\"featureViews\":[]}'\n", + "b'{\"projectId\":2167,\"featureStoreId\":2115,\"featureGroupId\":2066,\"entry\":{\"id\":\"5\",\"text\":\"test\"},\"featureViews\":[]}'\n", + "b'{\"projectId\":2167,\"featureStoreId\":2115,\"featureGroupId\":2066,\"entry\":{\"id\":\"7\",\"text\":\"test\"},\"featureViews\":[]}'\n", + "b'{\"projectId\":2167,\"featureStoreId\":2115,\"featureGroupId\":2066,\"entry\":{\"id\":\"0\",\"text\":\"test\"},\"featureViews\":[]}'\n", + "b'{\"projectId\":2167,\"featureStoreId\":2115,\"featureGroupId\":2066,\"entry\":{\"id\":\"9\",\"text\":\"test\"},\"featureViews\":[]}'\n", + "b'{\"projectId\":2167,\"featureStoreId\":2115,\"featureGroupId\":2066,\"entry\":{\"id\":\"3\",\"text\":\"test\"},\"featureViews\":[]}'\n", + "b'{\"projectId\":2167,\"featureStoreId\":2115,\"featureGroupId\":2066,\"entry\":{\"id\":\"8\",\"text\":\"test\"},\"featureViews\":[]}'\n", + "b'{\"projectId\":2167,\"featureStoreId\":2115,\"featureGroupId\":2066,\"entry\":{\"id\":\"4\",\"text\":\"test\"},\"featureViews\":[]}'\n", + "b'{\"projectId\":2167,\"featureStoreId\":2115,\"featureGroupId\":2066,\"entry\":{\"id\":\"1\",\"text\":\"test\"},\"featureViews\":[]}'\n", + "done\n" + ] + } + ], + "source": [ + "run = True\n", + "while run:\n", + " msg = 
consumer.poll(timeout=10)\n", + " if msg is not None:\n", + " print(msg.value())\n", + " else:\n", + " run = False\n", + "print(\"done\")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "ed56a62d", + "metadata": {}, + "outputs": [], + "source": [ + "consumer.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "60e148c0", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}