# <span style="font-width:bold; font-size: 3rem; color:#1EB182;">**Hopsworks Feature Store** </span><span style="font-width:bold; font-size: 3rem; color:#333;">- Part 01: Load, Engineer & Connect</span>

<span style="font-width:bold; font-size: 1.4rem;"> This is the first part of the Neo4j and Hopsworks Feature Store integration. As part of this first module, we will work with data related to credit card transactions. 
The objective of this tutorial is to demonstrate how to work with **Neo4j** and the **Hopworks Feature Store**  for batch data with a goal of training and deploying a model that can predict fraudulent transactions.</span>

## **🗒️ This notebook is divided in 4 sections:** 
1. Import data into Neo4j
2. Use Neo4j's GDS library and `node2vec` to calculate graph node embeddings
3. More Feature Engineering of transactions
5. Create Feature Groups in Hopsworks Feature Store from Neo4j embeddings and processed features

![tutorial-flow](../../images/01_featuregroups.png)

### 📝 Import libraries 

In [1]:
import datetime
import neo4j
from neo4j import GraphDatabase
from graphdatascience import GraphDataScience
import pandas as pd
import numpy as np
from features.transactions import get_in_out_transactions
from features.party import get_transaction_labels, get_party_labels

## <span style="color:#ff5f27;"> 💽 Loading the Data </span>

The data we will use comes from three different CSV files:

- `transactions.csv`: transaction information such as timestamp, location, and the amount. 
- `alert_transactions.csv`: Suspicious Activity Report (SAR) transactions.
- `party.csv`: User profile information.

In a production system, these CSV files would originate from separate data sources or tables, and probably separate data pipelines. **All three files have a customer id column `id` in common, which we can use for joins.**

Let's go ahead and load the data.

## Loading the Data into Neo4j

#### Neo4j setup
Before executing the next cells, the Neo4j database must be installed and initialized:
- Install Neo4j Desktop from https://neo4j.com/download/
- Create a new database project and server
- Install the APOC and GDS plugins
- BOLT protocol should be set (if already is not) in the the [neo4j.conf](https://neo4j.com/docs/operations-manual/current/configuration/neo4j-conf/) file

First, let's set a few parameters to connect with the Neo4j database.

In [2]:
URI = "bolt://localhost:7687"
AUTH = ("neo4j", "changeme")
DATABASE = "gdshopsworksdemo"

Then we create a few indexes in Neo4j.

In [4]:
with GraphDatabase.driver(URI, auth=AUTH) as driver:
    driver.execute_query("CREATE CONSTRAINT party_id_constraint FOR (p:Party) REQUIRE p.partyId IS UNIQUE", database_=DATABASE)
    driver.execute_query("CREATE TEXT INDEX party_type_index FOR (p:Party) ON (p.partyType)", database_=DATABASE)
    driver.execute_query("CREATE CONSTRAINT transaction_id_constraint FOR ()-[r:TRANSACTION]-() REQUIRE r.tran_id is UNIQUE", database_=DATABASE)
    driver.execute_query("CREATE TEXT INDEX transaction_timestamp_index FOR ()-[r:TRANSACTION]-() ON r.tran_timestamp", database_=DATABASE)

Then we do the first import of the first .csv file, holding the (:Party) nodes. This will finish very quickly, as there are only 7-8k nodes.

In [5]:
with driver.session(database=DATABASE) as session:
            result = session.run("""
                load csv with headers from "https://repo.hops.works/master/hopsworks-tutorials/data/aml/party.csv" as parties
                create (p:Party)
                set p = parties
            """)
print(result.consume().counters)

  with driver.session(database=DATABASE) as session:


{'_contains_updates': True, 'labels_added': 7347, 'nodes_created': 7347, 'properties_set': 14694}


Next we will import the relationshops. There are approx 430k [:TRANSACTION] relationships, and importing these will take a few minutes.

In [6]:
with driver.session(database=DATABASE) as session:
            result = session.run("""
                LOAD CSV WITH HEADERS FROM "https://repo.hops.works/master/hopsworks-tutorials/data/aml/transactions.csv" AS Transaction
                    MATCH (startNode:Party)
                    WHERE startNode.partyId = Transaction.src
                    CALL {
                        WITH Transaction, startNode
                        MATCH (endNode:Party)
                        WHERE endNode.partyId = Transaction.dst
                        CREATE (startNode)-[rel:TRANSACTION {tran_id: Transaction.tran_id, tx_type: Transaction.tx_type, base_amt: Transaction.base_amt, tran_timestamp: datetime(Transaction.tran_timestamp)}]->(endNode)
                    } IN TRANSACTIONS OF 2500 ROWS;
            """)
print(result.consume().counters)

  with driver.session(database=DATABASE) as session:


{'_contains_updates': True, 'relationships_created': 438386, 'properties_set': 1753544}


This completes the importing of the data into Neo4j.

## Loading remaining data

#### ⛳️ Transactions dataset

In [7]:
transactions_df = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/aml/transactions.csv", parse_dates = ['tran_timestamp'])
transactions_df.head(5)

Unnamed: 0,tran_id,tx_type,base_amt,tran_timestamp,src,dst
0,496,TRANSFER-FanOut,858.77,2020-01-01 00:00:00+00:00,3aa9646b,1e46e726
1,1342,TRANSFER-Mutual,386.86,2020-01-01 00:00:00+00:00,49203bc3,a74d1101
2,1580,TRANSFER-FanOut,616.43,2020-01-02 00:00:00+00:00,616d4505,99af2455
3,2866,TRANSFER-FanOut,146.44,2020-01-02 00:00:00+00:00,39be1ea2,e7ec7bdb
4,3997,TRANSFER-Mutual,439.09,2020-01-03 00:00:00+00:00,e2e0d938,afc399a9


#### ⛳️ Alert Transactions dataset

In [8]:
alert_transactions = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/aml/alert_transactions.csv")
alert_transactions.head()

Unnamed: 0,alert_id,alert_type,is_sar,tran_id
0,47,gather_scatter,True,11873
1,47,gather_scatter,True,11874
2,47,gather_scatter,True,11875
3,47,gather_scatter,True,13151
4,47,gather_scatter,True,23148


#### ⛳️ Party dataset

In [9]:
party = pd.read_csv("https://repo.hops.works/master/hopsworks-tutorials/data/aml/party.csv")
party.head()

Unnamed: 0,partyId,partyType
0,5628bd6c,Organization
1,a1fcba39,Organization
2,f56c9501,Individual
3,9969afdd,Organization
4,b356eeae,Individual


## <span style="color:#ff5f27;"> 🛠️ Feature Engineering </span>
## Calculating the node embeddings in Neo4j
For each month of transactions, compute and store embeddings using the `node2vec` library. This uses the GDS library of Neo4j, which [needs to be installed](https://neo4j.com/docs/graph-data-science/current/installation/) on the Neo4j server.

In [10]:
with GraphDatabase.driver(URI, auth=AUTH) as driver:
    try:
        gds = GraphDataScience(URI, auth=AUTH, database=DATABASE)

        # Determine months of transactions
        start_date = transactions_df['tran_timestamp'].min().to_pydatetime().replace(tzinfo=None)
        end_date = transactions_df['tran_timestamp'].max().to_pydatetime().replace(tzinfo=None)

        # For each month of transactions
        while start_date <= end_date:
            last_day_of_month = datetime.datetime(start_date.year, start_date.month, 1) + datetime.timedelta(days=32)
            end_date_of_month = last_day_of_month - datetime.timedelta(days=last_day_of_month.day)

            # Convert dates as milliseconds
            start = float(start_date.timestamp() / 10 ** 9)
            end = float(end_date_of_month.timestamp() / 10 ** 9)

            # Retrieve transactions within the month
            gds.run_cypher(
            f"MATCH (p1:Party)-[t:TRANSACTION]->(p2:Party) WHERE t.tran_timestamp >= {start} AND t.tran_timestamp < {end} RETURN p1, t, p2")

            # Create a temporary graph
            G, project_result = gds.graph.project("transaction_graph", "Party", "TRANSACTION")

            # Use the temporary graph to compute embeddings and save them as a property in the Neo4j database
            node2vec_result = gds.node2vec.write(
                G,                                #  Graph object
                embeddingDimension=16,
                walkLength=80,
                inOutFactor=1,
                returnFactor=1,
                writeProperty="node2vec"
            )

            # Increment to next month
            start_date = end_date_of_month + datetime.timedelta(days=1)

            # Remove the current monthly graph to generate a graph for the following month in the subsequent iteration
            gds.run_cypher(
                """
                CALL gds.graph.drop('transaction_graph') YIELD graphName
                """
            )

    except Exception as e:
        print(e)

## Feature Engineering outside Neo4j

#### To investigate patterns of suspicious activities you will make time window aggregates such monthly frequency, total, mean and standard deviation of amount of incoming and outgoing transasactions.  

In [11]:
# Renaming columns for clarity
transactions_df.columns = ['tran_id', 'tx_type', 'base_amt', 'tran_timestamp', 'source', 'target']

# Reordering columns for better readability
transactions_df = transactions_df[["source", "target", "tran_timestamp", "tran_id", "base_amt"]]

# Displaying the first few rows of the DataFrame
transactions_df.head(3)

Unnamed: 0,source,target,tran_timestamp,tran_id,base_amt
0,3aa9646b,1e46e726,2020-01-01 00:00:00+00:00,496,858.77
1,49203bc3,a74d1101,2020-01-01 00:00:00+00:00,1342,386.86
2,616d4505,99af2455,2020-01-02 00:00:00+00:00,1580,616.43


#### Incoming and Outgoing transactions

In [12]:
# Generating a DataFrame with monthly incoming and outgoing transaction statistics
in_out_df = get_in_out_transactions(transactions_df)

# Displaying the first few rows of the resulting DataFrame
in_out_df.head(3)

Unnamed: 0,tran_timestamp,id,monthly_in_count,monthly_in_total_amount,monthly_in_mean_amount,monthly_in_std_amount,monthly_out_count,monthly_out_total_amount,monthly_out_mean_amount,monthly_out_std_amount
0,1580428800000,0016359b,4.0,1872.92,468.23,175.2747,4.0,1843.32,460.83,252.951744
1,1580428800000,001dcc27,9.0,5874.64,652.737778,271.236889,0.0,0.0,0.0,0.0
2,1580428800000,00298665,1.0,755.64,755.64,0.0,1.0,521.11,521.11,0.0


#### Transactions identified as suspicious activity 

In [13]:
# Displaying the first few rows of the 'alert_transactions' DataFrame
alert_transactions.head(3)

Unnamed: 0,alert_id,alert_type,is_sar,tran_id
0,47,gather_scatter,True,11873
1,47,gather_scatter,True,11874
2,47,gather_scatter,True,11875


In [14]:
# Generating transaction labels based on transaction and alert transaction data
transaction_labels = get_transaction_labels(
    transactions_df, 
    alert_transactions,
)

# Displaying the first three rows of the resulting DataFrame
transaction_labels.head(3)

Unnamed: 0,source,target,id,tran_timestamp,is_sar
322886,cee9cf6d,79c248ae,2,2020-01-01 00:00:00+00:00,0
307052,65ab2f44,b20ce84b,3,2020-01-01 00:00:00+00:00,0
181198,2a39b731,a07edae4,4,2020-01-01 00:00:00+00:00,0


#### Party dataset

In [15]:
# Renaming columns for clarity
party.columns = ["id", "type"]

# Mapping 'type' values to numerical values for better representation
party.type = party.type.map({"Individual": 0, "Organization": 1})

# Displaying the first three rows of the DataFrame
party.head(3)

Unnamed: 0,id,type
0,5628bd6c,1
1,a1fcba39,1
2,f56c9501,0


In [16]:
# Filtering transactions with SAR(Suspicious Activity Reports) labels from the generated transaction labels DataFrame
alert_transactions = transaction_labels[transaction_labels.is_sar == 1]

# Displaying the first few rows of transactions flagged as SAR
alert_transactions.head(3)

Unnamed: 0,source,target,id,tran_timestamp,is_sar
41322,5e7442f1,0bffd1da,11873,2020-01-09 00:00:00+00:00,1
62128,65c7b5a1,0bffd1da,11874,2020-01-09 00:00:00+00:00,1
57575,04128f28,0bffd1da,11875,2020-01-09 00:00:00+00:00,1


In [17]:
# Generating party labels based on transaction labels and party information
party_labels = get_party_labels(
    transaction_labels, 
    party,
)

# Displaying the first three rows of the resulting DataFrame
party_labels.head(3)

Unnamed: 0,id,type,tran_timestamp,is_sar
0,5628bd6c,1,2021-12-20 00:00:00,0
1,a1fcba39,1,2021-12-20 00:00:00,0
2,f56c9501,0,2021-12-20 00:00:00,0


#### Convert date time to unix epoc milliseconds 

In [18]:
# Converting 'tran_timestamp' values to milliseconds for consistency
transaction_labels.tran_timestamp = transaction_labels.tran_timestamp.values.astype(np.int64) // 10 ** 6

# Converting 'tran_timestamp' values in 'party_labels' to milliseconds
party_labels.tran_timestamp = party_labels.tran_timestamp.map(lambda x: datetime.datetime.timestamp(x) * 1000)
party_labels.tran_timestamp = party_labels.tran_timestamp.values.astype(np.int64)

# Displaying the first three rows of the DataFrame
transaction_labels.head(3)

Unnamed: 0,source,target,id,tran_timestamp,is_sar
322886,cee9cf6d,79c248ae,2,1577836800000,0
307052,65ab2f44,b20ce84b,3,1577836800000,0
181198,2a39b731,a07edae4,4,1577836800000,0


---

# <span style="color:#ff5f27;">👮🏼‍♀️ Data Validation</span>

Before you define [feature groups](https://docs.hopsworks.ai/latest/generated/feature_group/) lets define [validation rules](https://docs.hopsworks.ai/latest/generated/feature_validation/) for features. You do expect some of the features to comply with certain *rules* or *expectations*. For example: a transacted amount must be a positive value. In the case of a transacted amount arriving as a negative value you can decide whether to stop it to `write` into a feature group and throw an error or allow it to be written but provide a warning. In the next section you will create feature store `expectations`, attach them to feature groups, and apply them to dataframes being appended to said feature group.

#### Data validation with Greate Expectations in Hopsworks
You can use GE library for validation in Hopsworks features store. 

##  <img src="../../images/icon102.png" width="18px"></img> Hopsworks feature store

The Hopsworks feature feature store library is Apache V2 licensed and available [here](https://github.com/logicalclocks/feature-store-api). The library is currently available for Python and JVM languages such as Scala and Java.
In this notebook, we are going to cover Python part.

You can find the complete documentation of the library here: 

The first step is to establish a connection with your Hopsworks feature store instance and retrieve the object that represents the feature store you'll be working with. 

> By default `project.get_feature_store()` returns the feature store of the project we are working with. However, it accepts also a project name as parameter to select a different feature store.

In [19]:
# import hopsworks

# project = hopsworks.login()

# fs = project.get_feature_store()


import hopsworks
conn = hopsworks.connection(
    host="snurran.hops.works",                                # DNS of your Feature Store instance
    hostname_verification=False,                     # Disable for self-signed certificates
    api_key_value="mHcmlu3RlZY6VqRu.loXm26Aq1TaygwyS8wb4qrK90Fc0Rmzve1ziE7FresJEgf0DxFHfIUOMthohuh52"          # Feature store API key value 
)
project = conn.get_project('GraphEmbeddingsDemo') # specify your project name
fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.
Connected. Call `.close()` to terminate connection gracefully.


### 🔬 Expectations suite

In [20]:
# Define Expectation Suite - no use of HSFS
import great_expectations as ge
from pprint import pprint
import json

expectation_suite = ge.core.ExpectationSuite(expectation_suite_name="aml_project_validations")
pprint(expectation_suite.to_json_dict(), indent=2)

{ 'data_asset_type': None,
  'expectation_suite_name': 'aml_project_validations',
  'expectations': [],
  'ge_cloud_id': None,
  'meta': {'great_expectations_version': '0.15.12'}}


In [21]:
expectation_suite.add_expectation(
  ge.core.ExpectationConfiguration(
  expectation_type="expect_column_max_to_be_between",
  kwargs={"column": "monthly_in_count", "min_value": 0, "max_value": 10000000}) 
)

{"kwargs": {"column": "monthly_in_count", "min_value": 0, "max_value": 10000000}, "meta": {}, "expectation_type": "expect_column_max_to_be_between"}

In [22]:
pprint(expectation_suite)

{
  "expectations": [
    {
      "kwargs": {
        "column": "monthly_in_count",
        "min_value": 0,
        "max_value": 10000000
      },
      "meta": {},
      "expectation_type": "expect_column_max_to_be_between"
    }
  ],
  "data_asset_type": null,
  "meta": {
    "great_expectations_version": "0.15.12"
  },
  "ge_cloud_id": null,
  "expectation_suite_name": "aml_project_validations"
}


## <span style="color:#ff5f27;"> 🪄 Register Feature Groups in Hopsworks </span>

A `Feature Groups` is a logical grouping of features, and experience has shown, that this grouping generally originates from the features being derived from the same data source. The `Feature Group` lets you save metadata along features, which defines how the Feature Store interprets them, combines them and reproduces training datasets created from them.

Generally, the features in a feature group are engineered together in an ingestion job. However, it is possible to have additional jobs to append features to an existing feature group. Furthermore, `feature groups` provide a way of defining a namespace for features, such that you can define features with the same name multiple times, but uniquely identified by the group they are contained in.

> It is important to note that `feature groups` are not groupings of features for immediate training of Machine Learning models. Instead, to ensure reusability of features, it is possible to combine features from any number of groups into training datasets.

#### Read embeddings from Neo4j, and store them in the Hopsworks feature store

In [23]:
# Connecting to Neo4j, getting the embeddings, and putting them into a data frame

with GraphDatabase.driver(URI, auth=AUTH) as driver:
    graph_embeddings_df = driver.execute_query(
        """MATCH (p:Party)-[t:TRANSACTION]->(:Party) 
            return 
            p.partyId as id, 
            p.node2vec as party_graph_embedding, 
            datetime(t.tran_timestamp).epochmillis as tran_timestamp""",
        database_=DATABASE,
        result_transformer_=neo4j.Result.to_df
    )
    
    print(type(graph_embeddings_df))  # <class 'pandas.core.frame.DataFrame'>
    print(graph_embeddings_df.head())

<class 'pandas.core.frame.DataFrame'>
         id                              party_graph_embedding  tran_timestamp
0  5628bd6c  [-0.813572108745575, 0.016053855419158936, 0.0...   1603843200000
1  5628bd6c  [-0.813572108745575, 0.016053855419158936, 0.0...   1602028800000
2  5628bd6c  [-0.813572108745575, 0.016053855419158936, 0.0...   1599609600000
3  5628bd6c  [-0.813572108745575, 0.016053855419158936, 0.0...   1587513600000
4  5628bd6c  [-0.813572108745575, 0.016053855419158936, 0.0...   1605657600000


In [24]:
# Increase data size to store embeddings
from hsfs import engine
features = engine.get_instance().parse_schema_feature_group(graph_embeddings_df)
for f in features:
    if f.type == "array<double>" or f.type == "array<float>":
        f.online_type = "VARBINARY(20000)"

# Define Feature Group
graph_embeddings_fg = fs.get_or_create_feature_group(name="graph_embeddings",
                                       version=1,
                                       primary_key=["id"],
                                       description="node embeddings from transactions graph",
                                       event_time = 'tran_timestamp',     
                                       online_enabled=True,
                                       features=features,
                                       statistics_config={"enabled": False, "histograms": False, "correlations": False, "exact_uniqueness": False}
                                       )

# Insert data frame into Feature Group
graph_embeddings_fg.insert(graph_embeddings_df)

Feature Group created successfully, explore it at 
https://snurran.hops.works:443/p/10375/fs/10322/fg/12309


Uploading Dataframe: 0.00% |          | Rows 0/438386 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: graph_embeddings_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://snurran.hops.works/p/10375/jobs/named/graph_embeddings_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x288b85940>, None)

#### Transactions monthly aggregates feature group

In [25]:
# Define Feature Group
transactions_fg = fs.get_or_create_feature_group(
    name = "transactions_monthly",
    version = 1,
    primary_key = ["id"],
    partition_key = ["tran_timestamp"],   
    description = "transactions monthly aggregates features",
    event_time = 'tran_timestamp',
    online_enabled = True,
    statistics_config = {"enabled": True, "histograms": True, "correlations": True, "exact_uniqueness": False},
    expectation_suite=expectation_suite
)   

# Insert data frame into Feature Group
transactions_fg.insert(in_out_df)

Feature Group created successfully, explore it at 
https://snurran.hops.works:443/p/10375/fs/10322/fg/12310
Validation succeeded.
Validation Report saved successfully, explore a summary at https://snurran.hops.works:443/p/10375/fs/10322/fg/12310


Uploading Dataframe: 0.00% |          | Rows 0/170876 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: transactions_monthly_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://snurran.hops.works/p/10375/jobs/named/transactions_monthly_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x1715b2f10>,
 {
   "results": [
     {
       "result": {
         "observed_value": 26.0,
         "element_count": 170876,
         "missing_count": null,
         "missing_percent": null
       },
       "expectation_config": {
         "kwargs": {
           "column": "monthly_in_count",
           "min_value": 0,
           "max_value": 10000000
         },
         "meta": {
           "expectationId": 9237
         },
         "expectation_type": "expect_column_max_to_be_between"
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-04-11T01:54:37.000273Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       },
       "success": true
     }
   ],
   "evaluation_parameters": {},
   "meta": {
     "great_expectations_version": "0.15.12",
     "expectation_suite_name": "aml_project_validations",
     "run_id": {
  

#### Party feature group

In [26]:
# Define Feature Group
party_fg = fs.get_or_create_feature_group(
    name = "party_labels",
    version = 1,
    primary_key = ["id"],
    description = "party fg with labels",
    event_time = 'tran_timestamp',        
    online_enabled = True,
    statistics_config = {"enabled": True, "histograms": True, "correlations": True, "exact_uniqueness": False}
)

# Insert data frame into Feature Group
party_fg.insert(party_labels)

Feature Group created successfully, explore it at 
https://snurran.hops.works:443/p/10375/fs/10322/fg/12311


Uploading Dataframe: 0.00% |          | Rows 0/7347 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: party_labels_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://snurran.hops.works/p/10375/jobs/named/party_labels_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x149172ac0>, None)

## <span style="color:#ff5f27;"> ⏭️ **Next:** Part 02 </span>
    
In the following notebook you will use feature groups to create feature views and training dataset.