# <span style="font-width:bold; font-size: 3rem; color:#1EB182;"> **Hopsworks Feature Store** </span>
<span style="font-width:bold; font-size: 2rem; color:#333;">Part 01 - Feature Pipeline: Load, Engineer & Connect</span>

## <span style="color:#ff5f27;"> 📝 Imports </span>

In [1]:
import hopsworks
import pandas as pd
from graphdatascience import GraphDataScience
from datetime import datetime
import great_expectations as ge
from great_expectations.core import ExpectationSuite, ExpectationConfiguration
from keys import *

In [2]:
'''
NEO4J_URI = "neo4j://..."
NEO4J_USER = "..."
NEO4J_PASSWORD = "..."
DATABASE_NAME = "..."
'''

'\nNEO4J_URI = "neo4j://..."\nNEO4J_USER = "..."\nNEO4J_PASSWORD = "..."\nDATABASE_NAME = "..."\n'

## <span style="color:#ff5f27;"> 💽 Loading Graph from Neo4J </span>

In [3]:
gds = GraphDataScience(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD), database=DATABASE_NAME)

In [4]:
gds.graph.list()

Unnamed: 0,degreeDistribution,graphName,database,memoryUsage,sizeInBytes,nodeCount,relationshipCount,configuration,density,creationTime,modificationTime,schema,schemaWithOrientation
0,"{'min': 0, 'max': 124, 'p90': 4, 'p999': 124, ...",portRoutes2,citi-supply-chain-supplychain-nov-3-2023-11-17-23,2559 KiB,2620584,729,1305,{},0.002459,2023-11-03T18:46:50.246652087+00:00,2023-11-03T18:46:50.598936154+00:00,"{'graphProperties': {}, 'nodes': {'Port': {}},...","{'graphProperties': {}, 'nodes': {'Port': {}},..."


In [5]:
port_routes = gds.graph.get("portRoutes2")

## <span style="color:#ff5f27;"> 🔎 Explore nodes </span>

In [6]:
port_routes

Graph({'graphName': 'portRoutes2', 'nodeCount': 729, 'relationshipCount': 1305, 'database': 'citi-supply-chain-supplychain-nov-3-2023-11-17-23', 'configuration': {}, 'schema': {'graphProperties': {}, 'nodes': {'Port': {}}, 'relationships': {'HAS_TRADE_ROUTE': {'distance': 'Float (DefaultValue(NaN), PERSISTENT, Aggregation.NONE)'}}}, 'memoryUsage': '2559 KiB'})

In [7]:
port_routes.node_properties()

Port    []
dtype: object

In [8]:
port_routes.name()

'portRoutes2'

In [9]:
# Shortest path between 2 example nodes
display(gds.shortestPath.dijkstra.stream(port_routes, sourceNode=2406, targetNode=2404))
display(gds.shortestPath.dijkstra.stream(port_routes, sourceNode=2358, targetNode=2378))

Unnamed: 0,index,sourceNode,targetNode,totalCost,nodeIds,costs,path
0,0,2406,2404,3.0,"[2406, 2360, 2386, 2404]","[0.0, 1.0, 2.0, 3.0]","((cost), (cost), (cost))"


Unnamed: 0,index,sourceNode,targetNode,totalCost,nodeIds,costs,path
0,0,2358,2378,1.0,"[2358, 2378]","[0.0, 1.0]",((cost))


## <span style="color:#ff5f27;"> 🛠️ Graph Feature Engineering using Neo4J APIs</span>


# Retrieve all trips

In [10]:
'''
trip_df = gds.run_cypher(
    "MATCH (p) RETURN p.trip_id, p.arrival_time, p.destination_port, p.lon, p.departure_time, p.lat, p.departure_port"
)
trip_df.dropna(subset = ["p.trip_id"], inplace=True)
trip_df.dropna(inplace=True)

trip_df.to_csv('tmp/trip_df.csv', index=False)
'''

'\ntrip_df = gds.run_cypher(\n    "MATCH (p) RETURN p.trip_id, p.arrival_time, p.destination_port, p.lon, p.departure_time, p.lat, p.departure_port"\n)\ntrip_df.dropna(subset = ["p.trip_id"], inplace=True)\ntrip_df.dropna(inplace=True)\n\ntrip_df.to_csv(\'tmp/trip_df.csv\', index=False)\n'

In [11]:
trip_df = pd.read_csv('tmp/trip_df.csv')
trip_df

Unnamed: 0,p.trip_id,p.arrival_time,p.destination_port,p.lon,p.departure_time,p.lat,p.departure_port
0,b56053ae-3dab-429b-8edd-fb5d00a294b2,2019-05-31T17:45:00.000Z,INDONESIA,110.985657,2019-05-30T22:54:30.000Z,-4.464749,IDPJG
1,50f3fa2b-5b62-44d2-b955-7da8c3464eb9,2022-02-23T03:43:00.000Z,INDONESIA,106.833300,2022-02-22T22:34:00.000Z,-6.133300,IDJKT
2,ffc87c9b-78e7-4109-bb78-d3dee96d5974,2022-04-04T12:40:00.000Z,INDONESIA,106.833300,2022-04-03T21:07:00.000Z,-6.133300,IDJKT
3,cbc7f3fa-e13f-4601-99bb-48a68c77edbe,2019-05-31T17:45:00.000Z,INDONESIA,110.985657,2019-05-30T22:54:30.000Z,-4.464749,IDPJG
4,dcb8214c-b245-46f5-935b-2003d55060b9,2020-01-21T16:17:00.000Z,CNFOC,119.663002,2020-01-21T10:21:00.000Z,25.935101,CNFOC
...,...,...,...,...,...,...,...
773000,7200af38-9555-4fd2-b3a1-78f7cb53db03,2022-05-07T01:27:00.000Z,MYKUA,-90.258003,2022-03-27T05:50:00.000Z,29.993401,USMSY
773001,58bd9881-1862-4df0-a38b-0f2639851e34,2022-05-07T01:30:00.000Z,MYKUA,-90.258003,2022-03-27T06:14:00.000Z,29.993401,USMSY
773002,31993d0f-d1ec-443c-a87d-4c5d9c590364,2021-11-11T11:03:20.000Z,TRTEK,4.416700,2021-10-31T23:54:32.727Z,51.216700,BEANR
773003,6b57c1f4-8f0d-4a7a-bc7e-f4f069dc7fc1,2021-11-11T11:03:20.000Z,TRTEK,4.416700,2021-10-31T23:54:32.727Z,51.216700,BEANR


# Compute paths (nodes to cross)

In [12]:
distances = []
existent = [] # counter list to avoid repetitions
trip_df = trip_df[trip_df["p.departure_port"] != trip_df["p.destination_port"]] # remove trips where origin and destination are the same

for departure_port,destination_port in trip_df[["p.departure_port", "p.destination_port"]].values.tolist():
    if not [departure_port,destination_port] in existent: # check if port pair already computed
        existent.append([departure_port,destination_port]) # increase counter list
        distances.append(gds.shortestPath.dijkstra.stream(port_routes, # graph
                                                          sourceNode=gds.find_node_id(["Port"], {"port_code": departure_port}), # departure
                                                          targetNode=gds.find_node_id(["Port"], {"port_code": destination_port})) # arrival
                         [["sourceNode", "targetNode", "totalCost", "nodeIds", "costs"]].to_dict())

In [22]:
# Reformat the distances
records = []
for r in distances:
    try:
        records.append({"sourceNode": r['sourceNode'][0], "targetNode": r['targetNode'][0], "nodeIds": r['nodeIds'][0], "costs": r['costs'][0]})
    except:
        pass

In [23]:
distances_df = pd.DataFrame.from_records(records) # make distances a dataframe
distances_df["hops"] = distances_df.nodeIds.map(lambda x: len(x)) # hops = number of nodes of the chain
distances_df = distances_df[distances_df.hops > 2] # select only trips with not adjacent origin-destination (at least 1 intermediate hop)

In [24]:
distances_df.columns = ['node_id', 'target_node_id', 'hop_node_ids', 'costs', 'hops']
distances_df

Unnamed: 0,node_id,target_node_id,hop_node_ids,costs,hops
6,6000,2360,"[6000, 2390, 2360]","[0.0, 1.0, 2.0]",3
8,2412,2360,"[2412, 2368, 2371, 2360]","[0.0, 1.0, 2.0, 3.0]",4
16,2383,2360,"[2383, 2371, 2360]","[0.0, 1.0, 2.0]",3
17,2364,2360,"[2364, 2371, 2360]","[0.0, 1.0, 2.0]",3
19,2405,2360,"[2405, 2388, 2360]","[0.0, 1.0, 2.0]",3
...,...,...,...,...,...
1531,2389,235522,"[2389, 2364, 235522]","[0.0, 1.0, 2.0]",3
1544,2364,321227,"[2364, 2366, 321227]","[0.0, 1.0, 2.0]",3
1557,4456,517443,"[4456, 2371, 2388, 517443]","[0.0, 1.0, 2.0, 3.0]",4
1560,2389,517449,"[2389, 2371, 2388, 517449]","[0.0, 1.0, 2.0, 3.0]",4


# Distances

#### Compute distances of each edge

In [25]:
distance_edge_df = gds.graph.relationshipProperty.stream(port_routes,  node_properties="distance")
distance_edge_df = distance_edge_df.dropna() # remove NAs
distance_edge_df.columns = ["node_id", "target_node_id", "relationshipType", "distance_km"] # rename columns
distance_edge_df = distance_edge_df[["node_id", "target_node_id", "distance_km"]] # select columns
distance_edge_df

Unnamed: 0,node_id,target_node_id,distance_km
1,2358,2378,6823.546747
2,2358,2404,7622.919047
3,2358,2405,8120.530186
4,2358,2406,6841.241767
5,2358,2419,7130.264141
...,...,...,...
1295,962513,2419,7255.514237
1296,963560,2419,7448.409956
1298,978248,2410,8796.975979
1299,984591,2410,5289.952167


In [26]:
# Example distances from source to target
distance_edge_df[(distance_edge_df.node_id == 2364) & (distance_edge_df.target_node_id ==2371)]

Unnamed: 0,node_id,target_node_id,distance_km
101,2364,2371,9296.306421


#### Compute distances for trips

In [27]:
from itertools import islice

def sliding_window(elements, window_size):
    if len(elements) <= window_size:
       return elements
    for i in range(len(elements)):
        result = elements[i:i+window_size]
        if len(result) > 1:
            yield(elements[i:i+window_size])

def estimate_dist(x):
    distance = 0
    for i in sliding_window(x, window_size=2):
        tmp_df = distance_edge_df[(distance_edge_df.node_id == i[0]) & (distance_edge_df.target_node_id ==i[1])]
        distance += tmp_df.distance_km.values[0]
    return distance 
        
distances_df["distance_km"] = distances_df.hop_node_ids.map(lambda x: estimate_dist(x))
distances_df

Unnamed: 0,node_id,target_node_id,hop_node_ids,costs,hops,distance_km
6,6000,2360,"[6000, 2390, 2360]","[0.0, 1.0, 2.0]",3,12710.743444
8,2412,2360,"[2412, 2368, 2371, 2360]","[0.0, 1.0, 2.0, 3.0]",4,27080.252706
16,2383,2360,"[2383, 2371, 2360]","[0.0, 1.0, 2.0]",3,13045.090625
17,2364,2360,"[2364, 2371, 2360]","[0.0, 1.0, 2.0]",3,16890.267257
19,2405,2360,"[2405, 2388, 2360]","[0.0, 1.0, 2.0]",3,10294.613418
...,...,...,...,...,...,...
1531,2389,235522,"[2389, 2364, 235522]","[0.0, 1.0, 2.0]",3,16208.056197
1544,2364,321227,"[2364, 2366, 321227]","[0.0, 1.0, 2.0]",3,6276.940293
1557,4456,517443,"[4456, 2371, 2388, 517443]","[0.0, 1.0, 2.0, 3.0]",4,17285.456048
1560,2389,517449,"[2389, 2371, 2388, 517449]","[0.0, 1.0, 2.0, 3.0]",4,20186.814340


# Graph Centrality

#### Compute centralitities for each node 

In [28]:
# Closeness Centrality
closeness_centrality_df = gds.closeness.stream(port_routes)
closeness_centrality_df.columns = ['node_id', 'closeness_centrality']

# Betweenness Centrality
betweenness_centrality_df = gds.betweenness.stream(port_routes)
betweenness_centrality_df.columns = ['node_id', 'betweenness_centrality']

# Degree Centrality
degree_centrality_df = gds.degree.stream(port_routes)
degree_centrality_df.columns = ['node_id', 'degree_centrality']

In [29]:
centrality_df = closeness_centrality_df.merge(betweenness_centrality_df, on='node_id')
centrality_df = centrality_df.merge(degree_centrality_df, on='node_id')
centrality_df

Unnamed: 0,node_id,closeness_centrality,betweenness_centrality,degree_centrality
0,2357,0.267057,274.000000,1.0
1,2358,0.000000,0.000000,5.0
2,2359,0.290749,162.234203,31.0
3,2360,0.435644,3447.800196,38.0
4,2361,0.311321,215.558261,17.0
...,...,...,...,...
724,1035413,0.000000,0.000000,0.0
725,1036604,0.000000,0.000000,0.0
726,1044372,1.000000,0.000000,0.0
727,1044373,0.000000,0.000000,0.0


# Arrange datasets

In [30]:
# Add current event time column
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M')
distances_df['timestamp'] = pd.to_datetime(timestamp)
centrality_df['timestamp'] = pd.to_datetime(timestamp)

In [33]:
distances_df.head(1)

Unnamed: 0,node_id,target_node_id,hop_node_ids,costs,hops,distance_km,timestamp
6,6000,2360,"[6000, 2390, 2360]","[0.0, 1.0, 2.0]",3,12710.743444,2024-01-31 16:39:00


In [35]:
centrality_df.head(1)

Unnamed: 0,node_id,closeness_centrality,betweenness_centrality,degree_centrality,timestamp
0,2357,0.267057,274.0,1.0,2024-01-31 16:39:00


## <span style="color:#ff5f27;"> 🪄 Feature Groups Creation</span>

A `Feature Groups` is a logical grouping of features, and experience has shown, that this grouping generally originates from the features being derived from the same data source. The `Feature Group` lets you save metadata along features, which defines how the Feature Store interprets them, combines them and reproduces training datasets created from them.

Generally, the features in a feature group are engineered together in an ingestion job. However, it is possible to have additional jobs to append features to an existing feature group. Furthermore, `feature groups` provide a way of defining a namespace for features, such that you can define features with the same name multiple times, but uniquely identified by the group they are contained in.

> It is important to note that `feature groups` are not groupings of features for immediate training of Machine Learning models. Instead, to ensure reusability of features, it is possible to combine features from any number of groups into training datasets.

In [42]:
'''
# Connect to Hopsworks
project = hopsworks.login()

# Retrieve Feature Store
fs = project.get_feature_store()
'''

'\n# Connect to Hopsworks\nproject = hopsworks.login()\n\n# Retrieve Feature Store\nfs = project.get_feature_store()\n'

In [36]:
import hsfs
conn = hsfs.connection(
    host='staging.cloud.hopsworks.ai',                 # DNS of your Feature Store instance
    port=443,                           # Port to reach your Hopsworks instance, defaults to 443
    project='neo4j_tutorial_new',               # Name of your Hopsworks Feature Store project
    api_key_value='OzvDK7qnG4VlhPxX.3wzZfJWWFmuYegomtWDRXAAvvGhlJEPkg4bn9kSbQaWPDgC3akbtERQOojLtEl5r',            # The API key to authenticate with the feature store
    hostname_verification=True          # Disable for self-signed certificates
)
fs = conn.get_feature_store()           # Get the project's default feature store

Connected. Call `.close()` to terminate connection gracefully.


In [71]:
# Create Feature Group
port_embeddings_fg = fs.get_or_create_feature_group(
    name="port_embeddings",
    description="Embedded Spatial Positions of Ports",
    version=1,
    online_enabled=True,
    primary_key=["source_node_id", "target_node_id"],
    statistics_config=False,
    event_time='timestamp'
)

In [72]:
# Insert data in Feature Group
port_embeddings_fg.insert(df_embeddings)

Feature Group created successfully, explore it at 
https://staging.cloud.hopsworks.ai:443/p/122/fs/70/fg/29


Uploading Dataframe: 0.00% |          | Rows 0/320 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: port_embeddings_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://staging.cloud.hopsworks.ai/p/122/jobs/named/port_embeddings_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x176c931c0>, None)

In [73]:
# Update feature descriptions
feature_descriptions = [
    {"name": "source_node_id", "description": "Origin Port ID"}, 
    {"name": "target_node_id", "description": "Destination Port ID"}, 
    {"name": "source_node_embedding", "description": "Origin Port Embedding"}, 
    {"name": "target_node_embedding", "description": "Destination Port Embedding"}, 
    {"name": "timestamp", "description": "Timestamp of Feature Group Injection"}
]

for desc in feature_descriptions: 
    port_embeddings_fg.update_feature_description(desc["name"], desc["description"])

#### Distances and Hops

In [37]:
# Create Feature Group
port_distance_fg = fs.get_or_create_feature_group(
    name="port_distances",
    description="Distance between Nodes of Trips",
    version=1,
    online_enabled=True,
    primary_key=["node_id", "target_node_id"],
    statistics_config=True,
    event_time='timestamp'
)

In [38]:
# Create new Great Expectation
expectation_suite = ge.core.ExpectationSuite(expectation_suite_name="expectation_suite")

# Define Great Expectation
expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(expectation_type="expect_column_min_to_be_between",
                                     kwargs={"column":"distance_km",
                                             "min_value":0.0 # Distance Cannot be Negative
                                            }
                                    )
)
expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(expectation_type="expect_column_min_to_be_between",
                                     kwargs={"column":"hops",
                                             "min_value":3 # At least one intermediate hops
                                            }
                                    )
)

# Add Great Expectation to Feature Group
port_distance_fg.save_expectation_suite(
    expectation_suite=expectation_suite,
    validation_ingestion_policy="STRICT" # do not ingest data if validation failed
)

In [40]:
# Insert data in Feature Group
port_distance_fg.insert(distances_df)

Feature Group created successfully, explore it at 
https://staging.cloud.hopsworks.ai:443/p/125/fs/73/fg/46
Validation succeeded.
Validation Report saved successfully, explore a summary at https://staging.cloud.hopsworks.ai:443/p/125/fs/73/fg/46


Uploading Dataframe: 0.00% |          | Rows 0/320 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: port_distances_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://staging.cloud.hopsworks.ai/p/125/jobs/named/port_distances_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x2c4da4220>,
 {
   "results": [
     {
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-01-31T03:40:41.000964Z"
       },
       "result": {
         "observed_value": 3042.4064814550816,
         "element_count": 320,
         "missing_count": null,
         "missing_percent": null
       },
       "success": true,
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       },
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "meta": {
           "expectationId": 19
         },
         "kwargs": {
           "column": "distance_km",
           "min_value": 0.0
         }
       }
     },
     {
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-01-31T03:40:41.000964Z"
       },
       "result": {
         "observed_value": 3,
         "element_cou

In [41]:
# Update feature descriptions
feature_descriptions = [
    {"name": "node_id", "description": "Origin Port ID"}, 
    {"name": "target_node_id", "description": "Destination Port ID"},
    {"name": "distance_km", "description": "Shortest Distance [km] between the two Ports (Dijkstra algorithm)"},
    {"name": "hop_node_ids", "description": "Nodes to Cross, from Origin to Destination Included"},
    {"name": "costs", "description": "Costs of All Nodes to Cross, from Origin to Destination Included"},
    {"name": "hops", "description": "Number of Nodes to Cross, from Origin to Destination Included"},
    {"name": "timestamp", "description": "Timestamp of Feature Group Injection"}
]

for desc in feature_descriptions: 
    port_distance_fg.update_feature_description(desc["name"], desc["description"])

#### Centralities

In [42]:
# Create Feature Group
centrality_fg = fs.get_or_create_feature_group(
    name="port_centrality",
    description="Node Centralities of Ports",
    version=1,
    online_enabled=True,
    primary_key=["node_id"],
    statistics_config=True,
    event_time='timestamp'
)

In [43]:
# Create new Great Expectations
expectation_suite = ge.core.ExpectationSuite(expectation_suite_name="expectation_suite")

# Define Great Expectation
expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(expectation_type="expect_column_min_to_be_between",
                                     kwargs={"column":"closeness_centrality",
                                             "min_value":0.0 # Closeness Centrality Cannot be Negative
                                            }
                                    )
)
expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(expectation_type="expect_column_min_to_be_between",
                                     kwargs={"column":"degree_centrality",
                                             "min_value":0.0 # Degree Centrality Cannot be Negative
                                            }
                                    )
)

# Add Great Expectation to Feature Group
centrality_fg.save_expectation_suite(
    expectation_suite=expectation_suite,
    validation_ingestion_policy="STRICT" # do not ingest data if validation failed
)

In [46]:
# Insert data in Feature Group
centrality_fg.insert(centrality_df)

Feature Group created successfully, explore it at 
https://staging.cloud.hopsworks.ai:443/p/125/fs/73/fg/47
Validation succeeded.
Validation Report saved successfully, explore a summary at https://staging.cloud.hopsworks.ai:443/p/125/fs/73/fg/47


Uploading Dataframe: 0.00% |          | Rows 0/729 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: port_centrality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://staging.cloud.hopsworks.ai/p/125/jobs/named/port_centrality_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x2c4b88850>,
 {
   "results": [
     {
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-01-31T03:41:41.000869Z"
       },
       "result": {
         "observed_value": 0.0,
         "element_count": 729,
         "missing_count": null,
         "missing_percent": null
       },
       "success": true,
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       },
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "meta": {
           "expectationId": 22
         },
         "kwargs": {
           "column": "closeness_centrality",
           "min_value": 0.0
         }
       }
     },
     {
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-01-31T03:41:41.000869Z"
       },
       "result": {
         "observed_value": 0.0,
         "element_count":

In [47]:
# Update feature descriptions
feature_descriptions = [
    {"name": "node_id", "description": "Origin Port ID"}, 
    {"name": "closeness_centrality", "description": "Closeness Centrality"},
    {"name": "betweenness_centrality", "description": "Betweenness Centrality"},
    {"name": "degree_centrality", "description": "Degree Centrality"},
    {"name": "timestamp", "description": "Timestamp of Feature Group Injection"}
]

for desc in feature_descriptions: 
    centrality_fg.update_feature_description(desc["name"], desc["description"])

## <span style="color:#ff5f27;">⏭️ Next: Training Pipeline</span>
In the [following notebook](2_training_pipeline.ipynb) you will use your feature groups to create a train dataset, train a model and add a trained model to model registry.