diff --git a/integrations/neo4j/1_feature_pipeline.ipynb b/integrations/neo4j/1_feature_pipeline.ipynb index 94558371..c59815c0 100644 --- a/integrations/neo4j/1_feature_pipeline.ipynb +++ b/integrations/neo4j/1_feature_pipeline.ipynb @@ -341,6 +341,29 @@ "party_labels.head(5)" ] }, + { + "cell_type": "markdown", + "id": "4eb1c080-b777-4d2c-bd8c-7542e5ca37f9", + "metadata": {}, + "source": [ + "#### Convert date time to unix epoch milliseconds " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b22d10f4-d642-44db-8236-a995600c3807", + "metadata": {}, + "outputs": [], + "source": [ + "transaction_labels[\"event_time\"] = transaction_labels.tran_timestamp\n", + "transaction_labels.tran_timestamp = transaction_labels.tran_timestamp.values.astype(np.int64) // 10 ** 6\n", + "party_labels.tran_timestamp = party_labels.tran_timestamp.map(lambda x: datetime.datetime.timestamp(x) * 1000)\n", + "party_labels.tran_timestamp = party_labels.tran_timestamp.values.astype(np.int64)\n", + "\n", + "transaction_labels['month'] = pd.to_datetime(transaction_labels['tran_timestamp'], unit='ms').dt.month" + ] + }, { "cell_type": "markdown", "id": "2b2f5b51-6ad2-4017-8773-fd7ddf9d3161", @@ -393,7 +416,7 @@ "outputs": [], "source": [ "# Connect to Neo4j database\n", - "gds = GraphDataScience('bolt://localhost:7687', auth=('neo4j', 'hopsworks'))" + "gds = GraphDataScience('bolt://localhost:7687', auth=('your_user', 'your_password'))" ] }, { @@ -427,7 +450,9 @@ " Returned processed edges (relationships).\n", " \"\"\"\n", " \n", - " relationships = df[['source', 'target', 'tran_id']]\n", + " relationships = df[['source', 'target', 'tran_id', 'tran_timestamp']].copy()\n", + " relationships[\"tran_timestamp\"] = relationships[\"tran_timestamp\"].map(lambda x: int(x) // 100000)\n", + " \n", " relationships = relationships.rename(columns={\"source\": \"sourceNodeId\",\n", " \"target\": \"targetNodeId\"},\n", " errors=\"raise\")\n", @@ -450,7 +475,8 @@ "metadata": {}, "outputs": [], "source": [ - "def pupulate_graph(input_df):\n", + "# populate graph database\n", + "def populate_graph(input_df):\n", " \"\"\"\n", " Build Neo4j graph and return node embeddings\n", " \"\"\"\n", @@ -465,78 +491,100 @@ " # Check if the number of nodes is correctly stored\n", " assert G.node_count() == len(nodes)\n", "\n", - " # Compute embeddings\n", - " graph_embdeddings_df = gds.node2vec.stream(G)\n", - "\n", - " # Delete graph for next partition\n", - " G.drop()\n", - "\n", - " # Convert integer node ID back to the original ID\n", - " graph_embdeddings_df['nodeId'] = [convertFromNumber(nodeId) for nodeId in graph_embdeddings_df['nodeId']]\n", - "\n", - " return {\"id\": graph_embdeddings_df.nodeId.to_numpy(), \"graph_embeddings\": graph_embdeddings_df.embedding.to_numpy()}" + "populate_graph(transaction_labels)\n", + "transaction_labels.drop([\"event_time\"], axis=1, inplace=True)" ] }, { - "cell_type": "markdown", - "id": "f288f063", + "cell_type": "code", + "execution_count": null, + "id": "5ea8cdb1-5f9d-4139-ba08-7574e28976ad", "metadata": {}, + "outputs": [], "source": [ - "#### Compute graph embeddings" + "#G.drop()" ] }, { "cell_type": "code", "execution_count": null, - "id": "45f95f3b", + "id": "cb83623b-7542-4254-845d-1237eabd16fd", "metadata": {}, "outputs": [], "source": [ - "tqdm.pandas()\n", - "transaction_graphs_by_month = transaction_labels.groupby(pd.Grouper(key='tran_timestamp', freq='M')).progress_apply(lambda x: pupulate_graph(x)) " + "# get graph object\n", + "G = gds.graph.get(\"transactions-graph\")" ] }, { "cell_type": "code", "execution_count": null, - "id": "c6f7108c", + "id": "a0e6ddf0-9f39-4d28-a4b7-fc560bb6389b", "metadata": {}, "outputs": [], "source": [ - "# Build embeddings data frame\n", + "# compute node embeddings\n", + "def compute_node_embeddings_subgraph(G, start_date, end_date):\n", + " \"\"\"\n", + " Poject Neo4j sub graph and return node embeddings\n", + " \"\"\"\n", + " start = float(start_date.timestamp() / 1000000000)\n", + " end = float(end_date.timestamp() / 1000000000)\n", + "\n", + " # Project sub Graph\n", + " gds.graph.filter(\n", + " \"tmp-sub-graph\", # new projected graph\n", + " G, # existing projected graph\n", + " \"*\",\n", + " f\"r.tran_timestamp > {start} AND r.tran_timestamp < {end}\"\n", + " )\n", + " subG = gds.graph.get(\"tmp-sub-graph\")\n", "\n", - "timestamps = transaction_graphs_by_month.index.values\n", - "graph_embeddings = transaction_graphs_by_month.tolist()\n", "\n", - "graph_embdeddings_df = pd.DataFrame()\n", - "for timestamp, graph_embedding in zip(timestamps, graph_embeddings):\n", - " df_tmp = pd.DataFrame(graph_embedding)\n", - " df_tmp[\"tran_timestamp\"] = timestamp\n", - " graph_embdeddings_df = pd.concat([graph_embdeddings_df, df_tmp]) \n", - "graph_embdeddings_df.head(5)" + " # Check if the number of nodes is correctly stored\n", + " # Compute embeddings\n", + " graph_embdeddings_df = gds.node2vec.stream(subG)\n", + "\n", + " # Delete graph for next partition\n", + " subG.drop()\n", + "\n", + " # Convert integer node ID back to the original ID\n", + " graph_embdeddings_df['nodeId'] = [convertFromNumber(nodeId) for nodeId in graph_embdeddings_df['nodeId']]\n", + "\n", + " return {\"id\": graph_embdeddings_df.nodeId.to_numpy(), \"graph_embeddings\": graph_embdeddings_df.embedding.to_numpy(), \"tran_timestamp\": start_date}\n", + " " ] }, { "cell_type": "markdown", - "id": "89521da4", + "id": "94125a6b-0808-4247-a32f-1b53a1416615", "metadata": {}, "source": [ - "#### Convert date time to unix epoc milliseconds " + "#### Compute graph embeddings" ] }, { "cell_type": "code", "execution_count": null, - "id": "87b0af80", + "id": "e2bd9de6-0814-4e1d-a6e5-5b3573104dd9", "metadata": {}, "outputs": [], "source": [ - "transaction_labels.tran_timestamp = transaction_labels.tran_timestamp.values.astype(np.int64) // 10 ** 6\n", - "graph_embdeddings_df.tran_timestamp = graph_embdeddings_df.tran_timestamp.values.astype(np.int64) // 10 ** 6\n", - "party_labels.tran_timestamp = party_labels.tran_timestamp.map(lambda x: datetime.datetime.timestamp(x) * 1000)\n", - "party_labels.tran_timestamp = party_labels.tran_timestamp.values.astype(np.int64)\n", + "# Compute embeddings by month and build data frame\n", "\n", - "transaction_labels['month'] = pd.to_datetime(transaction_labels['tran_timestamp'], unit='ms').dt.month" + "emb_df_list = []\n", + "start_date = datetime.datetime(2020, 1, 1)\n", + "end_date = datetime.datetime(2021, 12, 31)\n", + "\n", + "while start_date <= end_date:\n", + " last_day_of_month = datetime.datetime(start_date.year, start_date.month, 1) + datetime.timedelta(days=32)\n", + " end_date_of_month = last_day_of_month - datetime.timedelta(days=last_day_of_month.day)\n", + " emb_df_list.append( pd.DataFrame(compute_node_embeddings_subgraph(G, start_date, end_date_of_month)))\n", + " start_date = end_date_of_month + datetime.timedelta(days=1)\n", + "\n", + "graph_embdeddings_df = pd.concat(emb_df_list)\n", + "graph_embdeddings_df[\"tran_timestamp\"] = graph_embdeddings_df.tran_timestamp.values.astype(np.int64) // 10 ** 6\n", + "graph_embdeddings_df.head()" ] }, { @@ -568,7 +616,7 @@ { "cell_type": "code", "execution_count": null, - "id": "a6a0128f-01c8-4e18-b72b-d1e13744c431", + "id": "627bfbbf-1dea-4898-8dd2-a8e139f0f497", "metadata": {}, "outputs": [], "source": [ @@ -664,7 +712,6 @@ " name = \"transactions_monthly\",\n", " version = 1,\n", " primary_key = [\"id\"],\n", - " partition_key = [\"tran_timestamp\"], \n", " description = \"transactions monthly aggregates features\",\n", " event_time = 'tran_timestamp',\n", " online_enabled = True,\n", @@ -804,9 +851,9 @@ ], "metadata": { "kernelspec": { - "display_name": "myenv", + "display_name": "Python 3 (ipykernel)", "language": "python", - "name": "myenv" + "name": "python3" }, "language_info": { "codemirror_mode": { diff --git a/integrations/neo4j/2_training_pipeline.ipynb b/integrations/neo4j/2_training_pipeline.ipynb index fad69bfe..cbb7aef2 100644 --- a/integrations/neo4j/2_training_pipeline.ipynb +++ b/integrations/neo4j/2_training_pipeline.ipynb @@ -84,7 +84,7 @@ ")\n", "\n", "graph_embeddings_fg = fs.get_feature_group(\n", - " name=\"transactions_emb\",\n", + " name=\"graph_embeddings\",\n", " version=1,\n", ") \n", "\n", @@ -113,9 +113,9 @@ " \"monthly_out_mean_amount\", \n", " \"monthly_out_std_amount\",\n", " ]\n", - " ).join(\n", + " )).join(\n", " graph_embeddings_fg.select([\"graph_embeddings\"]),\n", - " )" + " )\n" ] }, { @@ -564,7 +564,7 @@ " feature_vector_exploded_emb = [*self.flat2gen(feature_vector)]\n", "\n", " # Reshape feature vector to match the model's input shape\n", - " feature_vector_reshaped = np.array(feature_vector_exploded_emb).reshape(1, 41)\n", + " feature_vector_reshaped = np.array(feature_vector_exploded_emb).reshape(1, 137)\n", "\n", " # Convert the feature vector to a TensorFlow constant\n", " input_vector = tf.constant(feature_vector_reshaped, dtype=tf.float32)\n", diff --git a/integrations/neo4j/3_online_inference.ipynb b/integrations/neo4j/3_online_inference.ipynb index 43f52195..c2ddbdef 100644 --- a/integrations/neo4j/3_online_inference.ipynb +++ b/integrations/neo4j/3_online_inference.ipynb @@ -124,7 +124,7 @@ "metadata": {}, "outputs": [], "source": [ - "deployment.get_logs()" + "#deployment.get_logs()" ] }, { @@ -168,7 +168,7 @@ "metadata": {}, "outputs": [], "source": [ - "deployment.get_logs()" + "#deployment.get_logs()" ] }, { @@ -279,9 +279,9 @@ ], "metadata": { "kernelspec": { - "display_name": "myenv", + "display_name": "Python 3 (ipykernel)", "language": "python", - "name": "myenv" + "name": "python3" }, "language_info": { "codemirror_mode": {