# Fashion Retail Recommendations Using Neo4j Graph Data Science and Apache Arrow

## Download the dataset

In [None]:
!kaggle competitions download -c h-and-m-personalized-fashion-recommendations -f articles.csv -p data
!kaggle competitions download -c h-and-m-personalized-fashion-recommendations -f customers.csv -p data 
!kaggle competitions download -c h-and-m-personalized-fashion-recommendations -f transactions_train.csv -p data
!cd data && unzip -n '*.zip'

## Import the dataset

In [1]:
import pyarrow as pa
import pyarrow.flight as flight
import base64
from pyarrow import compute as pacompute
from pyarrow import csv as pacsv
from config import NEO4J_URI,NEO4J_USER,NEO4J_PASSWORD

In [2]:
token = base64.b64encode(f"""{NEO4J_USER}:{NEO4J_PASSWORD}""".encode('utf8'))
options = flight.FlightCallOptions(headers=[
            (b'authorization', b'Basic ' + token)
        ])
client = pa.flight.connect("grpc+tcp://localhost:8491")

In [3]:
print(client.list_actions(options=options))

[ActionType(type='CREATE_GRAPH', description=''), ActionType(type='CREATE_DATABASE', description=''), ActionType(type='NODE_LOAD_DONE', description=''), ActionType(type='RELATIONSHIP_LOAD_DONE', description=''), ActionType(type='ABORT', description='')]


### Initialize the process

In [4]:
graph_name_import = "HEM.NODES2022.NEW"

In [5]:
# # CREATE A IN-MEMORY GRAPH
# action = flight.Action("CREATE_GRAPH", """{
#     "name": "HEMNODES2022",
#     "database_name": "hem",
#     "concurrency": 4
# }""".encode('utf-8'))

# CREATE A DATABASE
action = flight.Action("CREATE_DATABASE", f"""{{
    "name": "{graph_name_import}",
    "concurrency": 4
}}""".encode('utf-8'))

In [6]:
res = client.do_action(action, options = options)
for r in res:
    print(r.body.to_pybytes().decode('utf-8'))

{"name":"HEM.NODES2022.NEW"}


### Write nodes

#### Load the csv

In [7]:
articles = pacsv.read_csv('data/articles.csv')
customers = pacsv.read_csv('data/customers.csv')

#### write articles

In [8]:
articles

pyarrow.Table
article_id: int64
product_code: int64
prod_name: string
product_type_no: int64
product_type_name: string
product_group_name: string
graphical_appearance_no: int64
graphical_appearance_name: string
colour_group_code: int64
colour_group_name: string
perceived_colour_value_id: int64
perceived_colour_value_name: string
perceived_colour_master_id: int64
perceived_colour_master_name: string
department_no: int64
department_name: string
index_code: string
index_name: string
index_group_no: int64
index_group_name: string
section_no: int64
section_name: string
garment_group_no: int64
garment_group_name: string
detail_desc: string
----
article_id: [[108775015,108775044,108775051,110065001,110065002,...,453272036,453272037,453272038,453272042,453272045],[453272047,453272048,453272049,453272051,453272053,...,506495004,506495006,506495009,506495012,506503002],...,[896477004,896506001,896506002,896509001,896522001,...,920012003,920016001,920016002,920084003,920084004],[920158001,9201590

In [9]:
articles_columns = articles.column_names

In [10]:
articles_columns = [name if name != 'article_id' else 'nodeId' for name in articles_columns]

In [11]:
articles = articles.rename_columns(articles_columns)

In [12]:
articles = articles.add_column(1, 'labels', pa.array(['Article'] * len(articles), pa.string()))

In [13]:
articles = articles.select(['nodeId', 'labels', 'product_code', 'product_type_no', 'graphical_appearance_no', 'colour_group_code', 'perceived_colour_value_id', 'perceived_colour_master_id', 'department_no', 'index_group_no', 'section_no', 'garment_group_no'])

In [14]:
articles.schema

nodeId: int64
labels: string
product_code: int64
product_type_no: int64
graphical_appearance_no: int64
colour_group_code: int64
perceived_colour_value_id: int64
perceived_colour_master_id: int64
department_no: int64
index_group_no: int64
section_no: int64
garment_group_no: int64

In [15]:
node_descriptor = pa.flight.FlightDescriptor.for_command(f"""{{
    "name": "{graph_name_import}",
    "entity_type": "node"
}}
""")

writer, _ = client.do_put(node_descriptor, articles.schema, options = options)

with writer:
        writer.write(articles)
#0.2s


In [16]:
article_max = pacompute.max(articles['nodeId']).as_py()

customer_columns = customers.column_names
customer_columns = [name if name != 'customer_id' else 'nodeId' for name in customer_columns]
customers = customers.add_column(1, 'labels', pa.array(['Customer'] * len(customers), pa.string()))
customers = customers.add_column(1, 'nodeId', pa.array(range(article_max, article_max + len(customers)), pa.int64()))
customers_import = customers.select(['nodeId', 'labels'])

writer, _ = client.do_put(node_descriptor, customers_import.schema, options = options)

with writer:
        writer.write(customers_import)
#0.3s

In [17]:
action = flight.Action("NODE_LOAD_DONE", f"""{{
    "name": "{graph_name_import}"
}}""".encode('utf-8'))
res = client.do_action(action, options = options)
for r in res:
    print(r.body.to_pybytes().decode('utf-8'))
#0.2s

{"name":"HEM.NODES2022.NEW","node_count":1477522}


### Load Relationships

In [18]:
transactions = pacsv.read_csv('data/transactions_train.csv')
#2.1s

In [19]:
transactions = transactions.join(customers, 'customer_id')
#3.2s

In [20]:
transactions_columns = [name if name != 'nodeId' else 'sourceNodeId' for name in transactions.column_names]
transactions_columns = [name if name != 'article_id' else 'targetNodeId' for name in transactions_columns]
transactions = transactions.rename_columns(transactions_columns)
transactions = transactions.append_column('relationshipType', pa.array(['BUY'] * len(transactions), pa.string()))
transactions = transactions.select(['sourceNodeId', 'targetNodeId', 'price', 'relationshipType'])

In [21]:
transactions

pyarrow.Table
sourceNodeId: int64
targetNodeId: int64
price: double
relationshipType: string
----
sourceNodeId: [[959994174,959994244,959994244,959994244,959994244,...,960458168,960458168,960458168,960458168,960458168],[960099850,960099850,960099850,960099850,960099850,...,960631979,960631979,960631979,960631985,960631985],...,[960736847,960736847,960736852,960736852,960736852,...,959469265,959469265,959469280,959469280,959469280],[960153231,960153272,960153272,960153459,960153459,...,960199274,960199596,960199596,960200309,960200309]]
targetNodeId: [[648173005,671513003,561445005,666080002,661166005,...,587782003,642105002,631770002,548926001,638443002],[623434013,569929001,569929001,569929001,661162003,...,399087019,673396002,673396002,624062003,626445017],...,[903647001,903647001,866714016,783346030,887761002,...,845241002,754267054,855706009,719957017,739363001],[915529005,821395005,915526002,783707116,783707112,...,568601044,900279001,785034009,827957003,827957003]]
price: [[0.010

In [22]:
rel_descriptor = pa.flight.FlightDescriptor.for_command(f"""{{
    "name": "{graph_name_import}",
    "entity_type": "relationship"
}}
""")

writer, _ = client.do_put(rel_descriptor, transactions.schema, options = options)

with writer:
        writer.write(transactions)
#11.5s

In [23]:
action = flight.Action("RELATIONSHIP_LOAD_DONE", f"""{{
    "name": "{graph_name_import}"
}}""".encode('utf-8'))
res = client.do_action(action, options = options)
for r in res:
    print(r.body.to_pybytes().decode('utf-8'))
#18.7s

{"name":"HEM.NODES2022.NEW","relationship_count":31788324}


## Create recommendation pipeline

In [24]:
from graphdatascience import GraphDataScience


In [25]:
# Use Neo4j URI and credentials according to your setup
# NEO4J_URI could look similar to "bolt://my-server.neo4j.io:7687"
gds = GraphDataScience(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

# Check the installed GDS version on the server
print(gds.version())


2.2.2


In [26]:
gds.set_database(graph_name_import)


In [27]:
G, _ = gds.graph.project(
    "hem",
    ["Article", "Customer"],
    {
        "BUY": {'orientation': 'UNDIRECTED'}
    },
    readConcurrency=4
)
# 4.3s

Loading:   0%|          | 0/100 [00:00<?, ?%/s]

In [36]:
pipe, res = gds.beta.pipeline.linkPrediction.create("recommendation")

In [37]:
pipe.addNodeProperty("fastRP", embeddingDimension=128, mutateProperty="embedding", randomSeed=42)

name                                                    recommendation
nodePropertySteps    [{'name': 'gds.fastRP.mutate', 'config': {'ran...
featureSteps                                                        []
splitConfig          {'negativeSamplingRatio': 1.0, 'testFraction':...
autoTuningConfig                                     {'maxTrials': 10}
parameterSpace       {'MultilayerPerceptron': [], 'RandomForest': [...
Name: 0, dtype: object

In [38]:
pipe.addFeature("hadamard", nodeProperties=["embedding"])

name                                                    recommendation
nodePropertySteps    [{'name': 'gds.fastRP.mutate', 'config': {'ran...
featureSteps         [{'name': 'HADAMARD', 'config': {'nodeProperti...
splitConfig          {'negativeSamplingRatio': 1.0, 'testFraction':...
autoTuningConfig                                     {'maxTrials': 10}
parameterSpace       {'MultilayerPerceptron': [], 'RandomForest': [...
Name: 0, dtype: object

In [39]:
pipe.feature_steps()


Unnamed: 0,name,config
0,HADAMARD,{'nodeProperties': ['embedding']}


In [40]:
pipe.configureSplit(trainFraction=0.6, testFraction=0.4, validationFolds=5)

name                                                    recommendation
nodePropertySteps    [{'name': 'gds.fastRP.mutate', 'config': {'ran...
featureSteps         [{'name': 'HADAMARD', 'config': {'nodeProperti...
splitConfig          {'negativeSamplingRatio': 1.0, 'testFraction':...
autoTuningConfig                                     {'maxTrials': 10}
parameterSpace       {'MultilayerPerceptron': [], 'RandomForest': [...
Name: 0, dtype: object

In [41]:
pipe = gds.pipeline.get("recommendation")
#pipe.addMLP(hiddenLayerSizes=[128, 64, 2])#,maxEpochs=100, patience=10)
pipe.addLogisticRegression()
#pipe.addRandomForest(numberOfDecisionTrees=30)

name                                                    recommendation
nodePropertySteps    [{'name': 'gds.fastRP.mutate', 'config': {'ran...
featureSteps         [{'name': 'HADAMARD', 'config': {'nodeProperti...
splitConfig          {'negativeSamplingRatio': 1.0, 'testFraction':...
autoTuningConfig                                     {'maxTrials': 10}
parameterSpace       {'MultilayerPerceptron': [], 'RandomForest': [...
Name: 0, dtype: object

In [42]:
G = gds.graph.get('hem')

In [43]:
buy_recommender, train_result = pipe.train(
    G,
    sourceNodeLabel='Customer',
    targetNodeLabel='Article',
    targetRelationshipType="BUY",
    modelName="buy-recommender",
    randomSeed=42
)
#4m 15.2s

Link Prediction Train Pipeline:   0%|          | 0/100 [00:00<?, ?%/s]

In [44]:
print(train_result)

modelSelectionStats    {'modelCandidates': [{'metrics': {'AUCPR': {'v...
trainMillis                                                       255080
modelInfo              {'pipeline': {'nodePropertySteps': [{'name': '...
configuration          {'pipeline': 'recommendation', 'randomSeed': 4...
Name: 0, dtype: object


In [45]:
train_result.modelSelectionStats

{'modelCandidates': [{'metrics': {'AUCPR': {'validation': {'avg': 0.6842737529178428,
      'min': 0.6839116918345676,
      'max': 0.6847192258178231},
     'train': {'avg': 0.6842619190715364,
      'min': 0.6841477530776381,
      'max': 0.6843503895143053}}},
   'parameters': {'maxEpochs': 100,
    'minEpochs': 1,
    'penalty': 0.0,
    'patience': 1,
    'methodName': 'LogisticRegression',
    'batchSize': 100,
    'tolerance': 0.001,
    'learningRate': 0.001}}],
 'bestParameters': {'maxEpochs': 100,
  'minEpochs': 1,
  'penalty': 0.0,
  'patience': 1,
  'methodName': 'LogisticRegression',
  'batchSize': 100,
  'tolerance': 0.001,
  'learningRate': 0.001},
 'bestTrial': 1}

In [46]:
model = gds.model.get('buy-recommender')

### put the prediction on the projected graph

In [49]:
mutate_results = model.predict_mutate(G, 
  mutateRelationshipType = 'BUY_APPROX_PREDICTED',
  mutateProperty = 'probability',
  topK = 1,
  sampleRate=0.5,
  randomJoins=2,
  maxIterations=3,
  #necessary for deterministic results
  concurrency = 1,
  randomSeed =  42
)
# 36.3s

Link Prediction Predict Pipeline:   0%|          | 0/100 [00:00<?, ?%/s]

### retrieve the results from the projection

In [50]:
reader = client.do_get(flight.Ticket("""{
    "graph_name": "hem",
    "database_name": "HEM.NODES2022.NEW", 
    "procedure_name": "gds.graph.relationshipProperty.stream",
    "configuration": {
        "relationship_types": "BUY_APPROX_PREDICTED",
        "relationship_property": "probability"
    }
    }""".encode('utf-8')), options = options)
table = reader.read_all()
# 0.6s

In [37]:
print(table)

pyarrow.Table
sourceNodeId: int64 not null
targetNodeId: int64 not null
relationshipType: dictionary<values=string, indices=int32, ordered=0> not null
propertyValue: double not null
----
sourceNodeId: [[0,0,0,0,0,...,717,717,717,717,717],[741013,741014,741015,741016,741017,...,750242,750243,750244,750245,750246],...,[359523,359524,359525,359526,359527,...,368793,368794,368795,368796,368797],[368798,368799,368800,368801,368802,...,371008,371009,371010,371011,371012]]
targetNodeId: [[313560,655679,705938,776295,886269,...,959255,971935,1184278,1295123,1296952],[90074,96329,4052,78809,54113,...,45878,18289,32755,48133,1202],...,[48792,93768,2963,48771,50271,...,100831,35047,55334,43129,48070],[59952,23225,65121,87769,37253,...,74524,86830,57917,66568,92610]]
relationshipType: [  -- dictionary:
["BUY_APPROX_PREDICTED"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],  -- dictionary:
["BUY_APPROX_PREDICTED"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],...,  -- dictionary:
["BUY_APPROX_PREDICTED"]  -- ind

## DGL LINK PREDICTION

In [7]:
G, _ = gds.graph.project(
    "hem_dgl",
    ["Article", "Customer"],
    {
        "BUY": {
        'orientation': 'UNDIRECTED',
        "properties": ["price"]
        }
    },
    readConcurrency=4
)
#9.5s

Loading:   0%|          | 0/100 [00:00<?, ?%/s]

In [8]:
result = gds.fastRP.mutate(
    G,
    mutateProperty="embedding",
    randomSeed=42,
    embeddingDimension=128,
    iterationWeights=[0.8, 1, 1, 1],
)
#15.2s

FastRP:   0%|          | 0/100 [00:00<?, ?%/s]

In [9]:
reader = client.do_get(flight.Ticket("""{
    "graph_name": "hem_dgl",
    "database_name": "HEM.NODES2022.NEW", 
    "procedure_name": "gds.graph.nodeProperty.stream",
    "configuration": {
        "node_labels":  ["Article", "Customer"],
        "node_property": "embedding"
    }
    }""".encode('utf-8')), options = options)
table_nodes = reader.read_all()
#1.6s

In [22]:
sorted_table_nodes = table_nodes.append_column('dglID', pa.array(range(0,len(table_nodes)), pa.int64()))

In [41]:
sorted_table_nodes = sorted_table_nodes.sort_by([("dglID", "ascending")])

In [23]:
print(sorted_table_nodes)

pyarrow.Table
nodeId: int64 not null
propertyValue: list<propertyValue.inner: float not null> not null
  child 0, propertyValue.inner: float not null
dglID: int64
----
nodeId: [[0,1,2,3,4,...,1478530,1478531,1478532,1478533,1478534]]
propertyValue: [[[-0.100073874,-0.26626396,-0.012616102,-0.23033217,0.35614634,...,-0.40032893,0.21652773,-0.028770644,-0.121459305,0.32951498],[-0.41985935,-0.023758918,-0.013264384,-0.2588179,0.11248251,...,-0.1512475,0.17956246,-0.017649047,-0.10805801,0.25117278],...,[-0.07483108,-0.07485901,0.15815175,-0.0558758,-0.12628093,...,-0.13341352,0.09667079,-0.15405637,0.046845216,0.3162402],[0.028021835,0.20552,-0.16797295,0.09843139,0.2908689,...,-0.15081756,0.13862059,-0.39557898,0.035192423,0.3521633]]]
dglID: [[0,1,2,3,4,...,1477517,1477518,1477519,1477520,1477521]]


In [12]:
reader = client.do_get(flight.Ticket("""{
    "graph_name": "hem_dgl",
    "database_name": "HEM.NODES2022.NEW", 
    "procedure_name": "gds.graph.relationshipProperty.stream",
    "configuration": {
        "relationship_types": "BUY",
        "relationship_property": "price"
    }
    }""".encode('utf-8')), options = options)
table_rels = reader.read_all()
#6.9s

In [13]:
import dgl
import torch

In [25]:
print(table_rels)

pyarrow.Table
sourceNodeId: int64 not null
targetNodeId: int64 not null
relationshipType: dictionary<values=string, indices=int32, ordered=0> not null
propertyValue: double not null
----
sourceNodeId: [[0,0,0,0,0,...,0,0,0,0,0],[741013,741013,741013,741013,741013,...,741459,741459,741459,741459,741459],...,[370544,370545,370545,370545,370545,...,370933,370933,370933,370933,370933],[370933,370933,370933,370933,370933,...,371011,371012,371012,371012,371012]]
targetNodeId: [[103055,103055,103055,107138,107138,...,1362453,1362667,1362731,1362731,1363169],[74,117,132,201,285,...,5821,7527,10359,14157,14157],...,[91495,2677,7548,30910,40190,...,71034,71034,73338,73679,75735],[75735,75735,75735,75891,76179,...,94579,32688,34554,41843,44065]]
relationshipType: [  -- dictionary:
["BUY"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],  -- dictionary:
["BUY"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],...,  -- dictionary:
["BUY"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],  -- dictionary:
["BUY"]  -- indices:
[0

In [35]:
tables_id = sorted_table_nodes.select(['nodeId', 'dglID'])
new_rels = table_rels.join(tables_id, keys='sourceNodeId', right_keys='nodeId')
new_rels = new_rels.join(tables_id, keys='targetNodeId', right_keys='nodeId', right_suffix="_target")

#7.1s

In [36]:
print(new_rels)

pyarrow.Table
sourceNodeId: int64
targetNodeId: int64
relationshipType: dictionary<values=string, indices=int32, ordered=0>
propertyValue: double
dglID: int64
dglID_target: int64
----
sourceNodeId: [[67,67,67,67,67,...,73,73,73,73,73],[374091,374091,374091,374091,374091,...,374490,374490,374490,374490,374490],...,[370106,370106,370106,370106,370106,...,370544,370544,370544,370544,370544],[362448,362448,362448,362448,362448,...,362931,362931,362931,362931,362931]]
targetNodeId: [[985765,985765,985765,986246,986284,...,160397,160580,160580,160580,160613],[46678,61589,65571,68990,72052,...,89485,89489,90149,90149,90912],...,[40238,40479,41107,41107,41107,...,53286,56440,56440,57073,91481],[71919,72381,72382,72383,72385,...,505,505,506,506,506]]
relationshipType: [  -- dictionary:
["BUY"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],  -- dictionary:
["BUY"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],...,  -- dictionary:
["BUY"]  -- indices:
[0,0,0,0,0,...,0,0,0,0,0],  -- dictionary:
["BUY"]  -- indi

In [37]:
dizionario = new_rels.to_pydict()
sources = torch.IntTensor(dizionario["dglID"])
targets = torch.IntTensor(dizionario["dglID_target"])

g = dgl.graph((sources, targets))
#3m 21.1s

In [39]:
print(g)

Graph(num_nodes=1477522, num_edges=63576648,
      ndata_schemes={}
      edata_schemes={})


In [42]:
dizionario = sorted_table_nodes.to_pydict()
g.ndata['embedding'] = torch.FloatTensor(dizionario['propertyValue'])
#1m 30.3s

In [48]:
import itertools
import numpy as np
import scipy.sparse as sp
import torch.nn as nn
import torch.nn.functional as F
import random

In [87]:
sampler = dgl.dataloading.MultiLayerFullNeighborSampler(2)
sampler = dgl.dataloading.as_edge_prediction_sampler(
    sampler, negative_sampler=dgl.dataloading.negative_sampler.Uniform(5))
indice = random.sample(range(len(g.ndata['embedding'])), 1000)
indice = torch.IntTensor(indice)
dataloader = dgl.dataloading.DataLoader(
    g, indice, sampler,
    batch_size=64,
    shuffle=True,
    drop_last=False,
    num_workers=1)

In [88]:
class StochasticTwoLayerGCN(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.conv1 = dgl.nn.GraphConv(in_features, hidden_features, allow_zero_in_degree=True)
        self.conv2 = dgl.nn.GraphConv(hidden_features, out_features, allow_zero_in_degree=True)

    def forward(self, blocks, x):
        x = F.relu(self.conv1(blocks[0], x))
        x = F.relu(self.conv2(blocks[1], x))
        return x

In [89]:
class ScorePredictor(nn.Module):
    def forward(self, edge_subgraph, x):
        with edge_subgraph.local_scope():
            edge_subgraph.ndata['x'] = x
            edge_subgraph.apply_edges(dgl.function.u_dot_v('x', 'x', 'score'))
            return edge_subgraph.edata['score']

In [90]:
class Model(nn.Module):
    def __init__(self, in_features, hidden_features, out_features):
        super().__init__()
        self.gcn = StochasticTwoLayerGCN(
            in_features, hidden_features, out_features)
        self.predictor = ScorePredictor()

    def forward(self, positive_graph, negative_graph, blocks, x):
        x = self.gcn(blocks, x)
        pos_score = self.predictor(positive_graph, x)
        neg_score = self.predictor(negative_graph, x)
        return pos_score, neg_score

In [91]:
def compute_loss(pos_score, neg_score):
    # an example hinge loss
    n = pos_score.shape[0]
    return (neg_score.view(n, -1) - pos_score.view(n, -1) + 1).clamp(min=0).mean()

model = Model(128, 128, 64)
model = model
opt = torch.optim.Adam(model.parameters())
#aggiungiamo self loop per non far rompere ogni cosa nell'universo
g = dgl.add_self_loop(g)
loss_n = 0
epochs = 5
with dataloader.enable_cpu_affinity():
    for epoch in range(epochs):
        for input_nodes, positive_graph, negative_graph, blocks in dataloader:
            positive_graph = positive_graph
            negative_graph = negative_graph
            input_features = blocks[0].srcdata['embedding']
            pos_score, neg_score = model(positive_graph, negative_graph, blocks, input_features)
            loss = compute_loss(pos_score, neg_score)
            loss_n += loss.mean().item()
            opt.zero_grad()
            loss.backward()
            opt.step()
        print(f"loss for epoch {epoch + 1} is: {loss_n}")
        loss_n = 0

1 DL workers are assigned to cpus [0], main process will use cpus [1, 2, 3, 4, 5]
loss for epoch 1 is: 280.06372582912445
loss for epoch 2 is: 7.661756634712219
loss for epoch 3 is: 6.506469696760178
loss for epoch 4 is: 5.929817616939545
loss for epoch 5 is: 5.21606719493866
