Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 88 additions & 41 deletions integrations/neo4j/1_feature_pipeline.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,29 @@
"party_labels.head(5)"
]
},
{
"cell_type": "markdown",
"id": "4eb1c080-b777-4d2c-bd8c-7542e5ca37f9",
"metadata": {},
"source": [
"#### Convert date time to unix epoch milliseconds "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b22d10f4-d642-44db-8236-a995600c3807",
"metadata": {},
"outputs": [],
"source": [
"# Preserve the original timestamps as `event_time` before tran_timestamp is overwritten\n",
"transaction_labels[\"event_time\"] = transaction_labels[\"tran_timestamp\"]\n",
"\n",
"# Transactions: int64 view of the timestamps, floor-divided by 10**6\n",
"# (nanoseconds -> milliseconds, assuming a datetime64[ns] column - TODO confirm)\n",
"transaction_labels[\"tran_timestamp\"] = np.floor_divide(\n",
"    transaction_labels[\"tran_timestamp\"].values.astype(np.int64), 10 ** 6\n",
")\n",
"\n",
"# Parties: per-value POSIX seconds scaled to milliseconds, then cast to int64\n",
"# NOTE(review): datetime.datetime.timestamp reads naive values as local time, while the\n",
"# int64 view above does no timezone conversion - confirm both tables should agree\n",
"party_labels[\"tran_timestamp\"] = (\n",
"    party_labels[\"tran_timestamp\"]\n",
"    .map(lambda ts: datetime.datetime.timestamp(ts) * 1000)\n",
"    .values.astype(np.int64)\n",
")\n",
"\n",
"# Month number derived back from the millisecond timestamps\n",
"transaction_labels[\"month\"] = pd.to_datetime(transaction_labels[\"tran_timestamp\"], unit=\"ms\").dt.month"
]
},
{
"cell_type": "markdown",
"id": "2b2f5b51-6ad2-4017-8773-fd7ddf9d3161",
Expand Down Expand Up @@ -393,7 +416,7 @@
"outputs": [],
"source": [
"# Connect to Neo4j database\n",
"gds = GraphDataScience('bolt://localhost:7687', auth=('neo4j', 'hopsworks'))"
"gds = GraphDataScience('bolt://localhost:7687', auth=('your_user', 'your_password'))"
]
},
{
Expand Down Expand Up @@ -427,7 +450,9 @@
" Returned processed edges (relationships).\n",
" \"\"\"\n",
" \n",
" relationships = df[['source', 'target', 'tran_id']]\n",
" relationships = df[['source', 'target', 'tran_id', 'tran_timestamp']].copy()\n",
" relationships[\"tran_timestamp\"] = relationships[\"tran_timestamp\"].map(lambda x: int(x) // 100000)\n",
" \n",
" relationships = relationships.rename(columns={\"source\": \"sourceNodeId\",\n",
" \"target\": \"targetNodeId\"},\n",
" errors=\"raise\")\n",
Expand All @@ -450,7 +475,8 @@
"metadata": {},
"outputs": [],
"source": [
"def pupulate_graph(input_df):\n",
"# populate graph database\n",
"def populate_graph(input_df):\n",
" \"\"\"\n",
" Build Neo4j graph and return node embeddings\n",
" \"\"\"\n",
Expand All @@ -465,78 +491,100 @@
" # Check if the number of nodes is correctly stored\n",
" assert G.node_count() == len(nodes)\n",
"\n",
" # Compute embeddings\n",
" graph_embdeddings_df = gds.node2vec.stream(G)\n",
"\n",
" # Delete graph for next partition\n",
" G.drop()\n",
"\n",
" # Convert integer node ID back to the original ID\n",
" graph_embdeddings_df['nodeId'] = [convertFromNumber(nodeId) for nodeId in graph_embdeddings_df['nodeId']]\n",
"\n",
" return {\"id\": graph_embdeddings_df.nodeId.to_numpy(), \"graph_embeddings\": graph_embdeddings_df.embedding.to_numpy()}"
"populate_graph(transaction_labels)\n",
"transaction_labels.drop([\"event_time\"], axis=1, inplace=True)"
]
},
{
"cell_type": "markdown",
"id": "f288f063",
"cell_type": "code",
"execution_count": null,
"id": "5ea8cdb1-5f9d-4139-ba08-7574e28976ad",
"metadata": {},
"outputs": [],
"source": [
"#### Compute graph embeddings"
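"# Optional cleanup: uncomment to drop the projected graph.\n",
"# G is only assigned in the next cell (gds.graph.get), so run that cell first.\n",
"#G.drop()"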
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "45f95f3b",
"id": "cb83623b-7542-4254-845d-1237eabd16fd",
"metadata": {},
"outputs": [],
"source": [
"tqdm.pandas()\n",
"transaction_graphs_by_month = transaction_labels.groupby(pd.Grouper(key='tran_timestamp', freq='M')).progress_apply(lambda x: pupulate_graph(x)) "
"# get graph object\n",
"G = gds.graph.get(\"transactions-graph\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c6f7108c",
"id": "a0e6ddf0-9f39-4d28-a4b7-fc560bb6389b",
"metadata": {},
"outputs": [],
"source": [
"# Build embeddings data frame\n",
"# compute node embeddings\n",
"def compute_node_embeddings_subgraph(G, start_date, end_date):\n",
"    \"\"\"\n",
"    Project a time-filtered Neo4j sub graph and return its node embeddings.\n",
"\n",
"    All nodes of G are kept (node filter '*'); a relationship is kept when\n",
"    start <= r.tran_timestamp < end. node2vec is streamed on the temporary\n",
"    projection 'tmp-sub-graph', which is dropped again before returning.\n",
"\n",
"    Args:\n",
"        G: existing projected graph whose relationships carry tran_timestamp.\n",
"        start_date: datetime, inclusive lower bound of the window.\n",
"        end_date: datetime, exclusive upper bound of the window.\n",
"\n",
"    Returns:\n",
"        dict with 'id' (node ids converted back with convertFromNumber),\n",
"        'graph_embeddings' (the streamed embedding column) and\n",
"        'tran_timestamp' (start_date, unchanged).\n",
"    \"\"\"\n",
"    # The edge builder in this notebook stores tran_timestamp as\n",
"    # int(epoch milliseconds) // 100000, so the bounds must use the same unit.\n",
"    # Dividing epoch seconds by 1e9 produced bounds of about 1.6 that matched no\n",
"    # relationship. Kept as float so the literal has a decimal point, as before.\n",
"    # NOTE(review): .timestamp() reads naive datetimes as local time - confirm this\n",
"    # agrees with how tran_timestamp was converted to milliseconds.\n",
"    start = float(int(start_date.timestamp() * 1000) // 100000)\n",
"    end = float(int(end_date.timestamp() * 1000) // 100000)\n",
"\n",
"    # Project sub Graph (half-open window so consecutive windows neither overlap nor leave gaps)\n",
"    gds.graph.filter(\n",
"        \"tmp-sub-graph\",  # new projected graph\n",
"        G,  # existing projected graph\n",
"        \"*\",\n",
"        f\"r.tran_timestamp >= {start} AND r.tran_timestamp < {end}\"\n",
"    )\n",
"    subG = gds.graph.get(\"tmp-sub-graph\")\n",
"\n",
"    try:\n",
"        # Compute embeddings\n",
"        graph_embdeddings_df = gds.node2vec.stream(subG)\n",
"    finally:\n",
"        # Always delete the temporary projection, even on failure, so the\n",
"        # next window can reuse the name 'tmp-sub-graph'\n",
"        subG.drop()\n",
"\n",
"    # Convert integer node ID back to the original ID\n",
"    graph_embdeddings_df['nodeId'] = [convertFromNumber(nodeId) for nodeId in graph_embdeddings_df['nodeId']]\n",
"\n",
"    return {\"id\": graph_embdeddings_df.nodeId.to_numpy(), \"graph_embeddings\": graph_embdeddings_df.embedding.to_numpy(), \"tran_timestamp\": start_date}\n",
"    "
]
},
{
"cell_type": "markdown",
"id": "89521da4",
"id": "94125a6b-0808-4247-a32f-1b53a1416615",
"metadata": {},
"source": [
"#### Convert date time to unix epoc milliseconds "
"#### Compute graph embeddings"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "87b0af80",
"id": "e2bd9de6-0814-4e1d-a6e5-5b3573104dd9",
"metadata": {},
"outputs": [],
"source": [
"transaction_labels.tran_timestamp = transaction_labels.tran_timestamp.values.astype(np.int64) // 10 ** 6\n",
"graph_embdeddings_df.tran_timestamp = graph_embdeddings_df.tran_timestamp.values.astype(np.int64) // 10 ** 6\n",
"party_labels.tran_timestamp = party_labels.tran_timestamp.map(lambda x: datetime.datetime.timestamp(x) * 1000)\n",
"party_labels.tran_timestamp = party_labels.tran_timestamp.values.astype(np.int64)\n",
"# Compute embeddings by month and build data frame\n",
"\n",
"transaction_labels['month'] = pd.to_datetime(transaction_labels['tran_timestamp'], unit='ms').dt.month"
"emb_df_list = []\n",
"start_date = datetime.datetime(2020, 1, 1)\n",
"end_date = datetime.datetime(2021, 12, 31)\n",
"\n",
"# Walk the range one calendar month at a time; one embeddings frame per month\n",
"while start_date <= end_date:\n",
"    # Day 1 plus 32 days always lands in the following month; snap back to its first day\n",
"    next_month_start = (start_date.replace(day=1) + datetime.timedelta(days=32)).replace(day=1)\n",
"    # Use the first day of the next month as the exclusive upper bound. Passing the last\n",
"    # day of the month at 00:00 dropped every transaction made on that last day, because\n",
"    # the sub graph filter compares with a strict `<`.\n",
"    emb_df_list.append(pd.DataFrame(compute_node_embeddings_subgraph(G, start_date, next_month_start)))\n",
"    start_date = next_month_start\n",
"\n",
"graph_embdeddings_df = pd.concat(emb_df_list)\n",
"# Month start as Unix epoch milliseconds (assumes the column is datetime64[ns] - TODO confirm)\n",
"graph_embdeddings_df[\"tran_timestamp\"] = graph_embdeddings_df.tran_timestamp.values.astype(np.int64) // 10 ** 6\n",
"graph_embdeddings_df.head()"
]
},
{
Expand Down Expand Up @@ -568,7 +616,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "a6a0128f-01c8-4e18-b72b-d1e13744c431",
"id": "627bfbbf-1dea-4898-8dd2-a8e139f0f497",
"metadata": {},
"outputs": [],
"source": [
Expand Down Expand Up @@ -664,7 +712,6 @@
" name = \"transactions_monthly\",\n",
" version = 1,\n",
" primary_key = [\"id\"],\n",
" partition_key = [\"tran_timestamp\"], \n",
" description = \"transactions monthly aggregates features\",\n",
" event_time = 'tran_timestamp',\n",
" online_enabled = True,\n",
Expand Down Expand Up @@ -804,9 +851,9 @@
],
"metadata": {
"kernelspec": {
"display_name": "myenv",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "myenv"
"name": "python3"
},
"language_info": {
"codemirror_mode": {
Expand Down
8 changes: 4 additions & 4 deletions integrations/neo4j/2_training_pipeline.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
")\n",
"\n",
"graph_embeddings_fg = fs.get_feature_group(\n",
" name=\"transactions_emb\",\n",
" name=\"graph_embeddings\",\n",
" version=1,\n",
") \n",
"\n",
Expand Down Expand Up @@ -113,9 +113,9 @@
" \"monthly_out_mean_amount\", \n",
" \"monthly_out_std_amount\",\n",
" ]\n",
" ).join(\n",
" )).join(\n",
" graph_embeddings_fg.select([\"graph_embeddings\"]),\n",
" )"
" )\n"
]
},
{
Expand Down Expand Up @@ -564,7 +564,7 @@
" feature_vector_exploded_emb = [*self.flat2gen(feature_vector)]\n",
"\n",
" # Reshape feature vector to match the model's input shape\n",
" feature_vector_reshaped = np.array(feature_vector_exploded_emb).reshape(1, 41)\n",
" feature_vector_reshaped = np.array(feature_vector_exploded_emb).reshape(1, 137)\n",
"\n",
" # Convert the feature vector to a TensorFlow constant\n",
" input_vector = tf.constant(feature_vector_reshaped, dtype=tf.float32)\n",
Expand Down
8 changes: 4 additions & 4 deletions integrations/neo4j/3_online_inference.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
"metadata": {},
"outputs": [],
"source": [
"deployment.get_logs()"
"#deployment.get_logs()"
]
},
{
Expand Down Expand Up @@ -168,7 +168,7 @@
"metadata": {},
"outputs": [],
"source": [
"deployment.get_logs()"
"#deployment.get_logs()"
]
},
{
Expand Down Expand Up @@ -279,9 +279,9 @@
],
"metadata": {
"kernelspec": {
"display_name": "myenv",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "myenv"
"name": "python3"
},
"language_info": {
"codemirror_mode": {
Expand Down