In [1]:
import os
from collections import Counter

import pandas as pd
from numpy.random import randint
from py2neo import Graph
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import functions as F
from pyspark.sql.types import *
from sklearn.metrics import roc_curve, auc
from cycler import cycler

import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt

In [2]:
import findspark

# findspark locates the Spark installation and makes pyspark importable, so it
# has to run before Spark is started rather than after it.
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# getOrCreate() reuses an already running session, which keeps this cell
# re-runnable: constructing SparkContext('local') a second time in the same
# kernel raises because only one context may be active at once.
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext


In [3]:
# Neo4j connection settings are read from the environment so that real
# credentials do not have to be committed with the notebook. The fallbacks are
# the original local-development values, so behaviour is unchanged when the
# variables are not set.
graph = Graph(
    os.environ.get("NEO4J_URI", "bolt://localhost"),
    auth=(os.environ.get("NEO4J_USER", "neo4j"),
          os.environ.get("NEO4J_PASSWORD", "123")),
)
def down_sample(df, random_state=1):
    """Balance a binary-labelled frame by dropping rows of the larger class.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with a ``label`` column holding 0 (negative) and 1 (positive).
        The input frame is not modified.
    random_state : int or None, default 1
        Seed used both to pick the rows that are dropped and to shuffle the
        result, so repeated runs give the same frame. Pass ``None`` for a
        non-deterministic draw.

    Returns
    -------
    pandas.DataFrame
        A shuffled copy in which both classes have the same number of rows.
    """
    balanced = df.copy()
    is_negative = balanced["label"] == 0
    is_positive = balanced["label"] == 1

    # Positive when negatives dominate, negative when positives dominate.
    surplus = int(is_negative.sum()) - int(is_positive.sum())
    if surplus != 0:
        # Sample from whichever class is larger; asking pandas for a negative
        # number of rows (positives in the majority) would raise.
        majority = balanced[is_negative] if surplus > 0 else balanced[is_positive]
        to_drop = majority.sample(n=abs(surplus), random_state=random_state).index
        balanced = balanced.drop(to_drop)

    # Shuffle so the two classes are interleaved rather than stacked.
    return balanced.sample(frac=1, random_state=random_state)


In [4]:
def _fetch_link_examples(rel_type):
    """Query Neo4j for linked and unlinked Character pairs of one relationship type.

    Parameters
    ----------
    rel_type : str
        Relationship type to look at, e.g. ``"Interacts_early"``. Cypher does
        not accept a relationship type as a query parameter, so it is
        substituted into the query text; only pass trusted identifiers.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(existing_links, missing_links)``, both with the columns ``node1``,
        ``node2`` and ``label`` (1 for existing, 0 for missing). Duplicate
        rows are removed from ``missing_links``.
    """
    # Find positive examples: pairs joined by a `rel_type` relationship.
    existing_links = graph.run("""
    MATCH (a:Character)-[:{rel}]->(b:Character)
    RETURN id(a) AS node1, id(b) AS node2, 1 AS label
    """.format(rel=rel_type)).to_data_frame()

    # Find negative examples: pairs two or three hops apart that are not
    # directly connected.
    missing_links = graph.run("""
    MATCH (a:Character)
    WHERE (a)-[:{rel}]-()
    MATCH (a)-[:{rel}*2..3]-(other)
    WHERE not((a)-[:{rel}]-(other))
    RETURN id(a) AS node1, id(other) AS node2, 0 AS label
    """.format(rel=rel_type)).to_data_frame()

    # Remove duplicates
    return existing_links, missing_links.drop_duplicates()


def _build_examples(existing_links, missing_links):
    """Stack negative and positive examples and balance the two classes."""
    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
    examples = pd.concat([missing_links, existing_links], ignore_index=True)
    examples['label'] = examples['label'].astype('category')
    return down_sample(examples)


# Training examples come from the early interactions.
train_existing_links, train_missing_links = _fetch_link_examples("Interacts_early")
training_df = _build_examples(train_existing_links, train_missing_links)
training_data = spark.createDataFrame(training_df)

# Test examples come from the late interactions.
test_existing_links, test_missing_links = _fetch_link_examples("Interacts_late")
test_df = _build_examples(test_existing_links, test_missing_links)
test_data = spark.createDataFrame(test_df)

test_data.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0| 1329|
|    1| 1329|
+-----+-----+



In [5]:
def create_pipeline(fields):
    """Build the two-stage Spark ML pipeline used by every model in this notebook.

    Parameters
    ----------
    fields : list of str
        Names of the input columns to assemble into the ``features`` vector.

    Returns
    -------
    pyspark.ml.Pipeline
        An unfitted pipeline: a ``VectorAssembler`` followed by a
        ``RandomForestClassifier`` (30 trees, maximum depth 10) that reads the
        ``label`` and ``features`` columns.
    """
    feature_stage = VectorAssembler(inputCols=fields, outputCol="features")
    classifier_stage = RandomForestClassifier(
        labelCol="label",
        featuresCol="features",
        numTrees=30,
        maxDepth=10,
    )
    stages = [feature_stage, classifier_stage]
    return Pipeline(stages=stages)

In [6]:

# Cypher cannot take a relationship type as a query parameter, so the type is
# substituted into this template with str.format before the query is run.
_GRAPHY_FEATURES_QUERY = """
UNWIND $pairs AS pair
MATCH (p1) WHERE id(p1) = pair.node1
MATCH (p2) WHERE id(p2) = pair.node2
RETURN pair.node1 AS node1,
       pair.node2 AS node2,
       size([(p1)-[:{rel}]-(a)-[:{rel}]-(p2) | a]) AS CommonInteractions,
       size((p1)-[:{rel}]-()) * size((p2)-[:{rel}]-()) AS prefAttachment,
       size(apoc.coll.toSet(
           [(p1)-[:{rel}]-(a) | id(a)] + [(p2)-[:{rel}]-(a) | id(a)]
       )) AS totalNeighbors
"""


def _apply_graphy_features(data, rel_type):
    """Join the three neighbourhood features for `rel_type` onto `data`.

    Parameters
    ----------
    data : pyspark.sql.DataFrame
        Frame with ``node1`` and ``node2`` columns holding Neo4j node ids.
    rel_type : str
        Relationship type the features are computed over. It is inserted into
        the query text, so only pass trusted identifiers.

    Returns
    -------
    pyspark.sql.DataFrame
        `data` joined on ``node1``/``node2`` with the added columns
        ``CommonInteractions``, ``prefAttachment`` and ``totalNeighbors``.
    """
    query = _GRAPHY_FEATURES_QUERY.format(rel=rel_type)
    # Send each pair only once: a pair that occurs k times in `data` would
    # otherwise come back as k feature rows and turn into k*k rows in the join.
    pairs = [{"node1": row["node1"], "node2": row["node2"]}
             for row in data.select("node1", "node2").distinct().collect()]
    features = spark.createDataFrame(
        graph.run(query, {"pairs": pairs}).to_data_frame())
    return data.join(features, ["node1", "node2"])


def apply_graphy_training_features(data):
    """Add neighbourhood features computed over ``Interacts_early`` relationships."""
    return _apply_graphy_features(data, "Interacts_early")


def apply_graphy_test_features(data):
    """Add neighbourhood features computed over ``Interacts`` relationships."""
    return _apply_graphy_features(data, "Interacts")


# Replace both frames with their feature-enriched versions. These
# reassignments are not idempotent: re-running the cell joins the feature
# columns onto frames that already contain them.
training_data = apply_graphy_training_features(training_data)
test_data = apply_graphy_test_features(test_data)

In [7]:
# Side-by-side histograms of CommonInteractions for linked (label=1) and
# unlinked (label=0) training pairs, each normalised to relative frequency.
plt.style.use('fivethirtyeight')
fig, axs = plt.subplots(1, 2, figsize=(18, 7), sharey=True)
charts = [(1, "have interacted"), (0, "haven't interacted")]
for ax, (label, title) in zip(axs, charts):
    # Select the single column before converting to pandas so only that
    # column is transferred to the driver.
    interactions = (training_data
                    .filter(training_data["label"] == label)
                    .select("CommonInteractions")
                    .toPandas()["CommonInteractions"])
    histogram = interactions.value_counts(normalize=True).sort_index()
    # Series.plot ignores an `x` argument, so the axis label is set explicitly.
    histogram.plot(kind="bar", color="darkblue", ax=ax,
                   title=f"Character who {title} (label={label})")
    ax.xaxis.set_label_text("Common Interactions")

plt.tight_layout()
plt.show()

## Training of model

This baseline model uses a single feature, the number of common interactions between two characters, to predict whether they will interact in the future.

In [8]:
def train_model(fields, training_data):
    """Fit the notebook's random-forest pipeline on the given feature columns.

    Parameters
    ----------
    fields : list of str
        Feature column names passed on to ``create_pipeline``.
    training_data : pyspark.sql.DataFrame
        Frame the pipeline is fitted on.

    Returns
    -------
    The fitted pipeline model returned by ``Pipeline.fit``.
    """
    return create_pipeline(fields).fit(training_data)

# Fit the baseline classifier on the single CommonInteractions feature.
basic_model = train_model(["CommonInteractions"], training_data)

# Probe the fitted model with a few hand-picked feature values.
eval_rows = [(count,) for count in (0, 1, 2, 10, 100)]
eval_df = spark.createDataFrame(eval_rows, ['CommonInteractions'])

basic_probe = basic_model.transform(eval_df)
basic_probe.select("CommonInteractions", "probability", "prediction").show(truncate=False)

+------------------+----------------------------------------+----------+
|CommonInteractions|probability                             |prediction|
+------------------+----------------------------------------+----------+
|0                 |[0.8213024551457555,0.17869754485424455]|0.0       |
|1                 |[0.8213024551457555,0.17869754485424455]|0.0       |
|2                 |[0.09791873796806792,0.9020812620319322]|1.0       |
|10                |[0.09791873796806792,0.9020812620319322]|1.0       |
|100               |[0.09791873796806792,0.9020812620319322]|1.0       |
+------------------+----------------------------------------+----------+



In [9]:
def evaluate_model(model, test_data):
    """Score a fitted model on `test_data`.

    Parameters
    ----------
    model
        Fitted pipeline model; its ``transform`` output must contain the
        ``label``, ``prediction`` and ``probability`` columns.
    test_data : pyspark.sql.DataFrame
        Labelled frame to evaluate on.

    Returns
    -------
    dict
        ``fpr`` / ``tpr``: ROC curve points from sklearn's ``roc_curve``.
        ``roc_auc``: area under that curve.
        ``accuracy``: value returned by a default ``BinaryClassificationEvaluator``.
        ``recall`` / ``precision``: computed from the confusion counts; 0.0
        when the corresponding denominator is zero.
    """
    # Execute the model against the test set
    predictions = model.transform(test_data)

    # Collect label, hard prediction and probability together. Two separate
    # collect() calls (one per column) are independent Spark jobs with no
    # guarantee of returning rows in the same order, which could pair a label
    # with another row's probability.
    rows = predictions.select("label", "prediction", "probability").collect()
    labels = [row["label"] for row in rows]
    # Index 1 of the probability vector is the score for class 1.
    preds = [row["probability"][1] for row in rows]

    # Compute true positive, false positive, false negative counts
    tp = sum(1 for row in rows if row["label"] == 1 and row["prediction"] == 1)
    fp = sum(1 for row in rows if row["label"] == 0 and row["prediction"] == 1)
    fn = sum(1 for row in rows if row["label"] == 1 and row["prediction"] == 0)

    # Compute recall and precision manually; guard the denominators so a model
    # that predicts no positives (or a set without positives) does not raise.
    recall = float(tp) / (tp + fn) if (tp + fn) else 0.0
    precision = float(tp) / (tp + fp) if (tp + fp) else 0.0

    # NOTE(review): BinaryClassificationEvaluator is used with its defaults,
    # and its default metric in PySpark is areaUnderROC, so this number is an
    # area under the ROC curve rather than a classification accuracy. The
    # "accuracy" key is kept unchanged for existing callers.
    accuracy = BinaryClassificationEvaluator().evaluate(predictions)

    # Compute false positive rate and true positive rate using sklearn functions
    fpr, tpr, threshold = roc_curve(labels, preds)
    roc_auc = auc(fpr, tpr)
    return { "fpr": fpr, "tpr": tpr, "roc_auc": roc_auc, "accuracy": accuracy,
            "recall": recall, "precision": precision }


In [10]:
def display_results(results):
    """Tabulate the scalar scores of an evaluation result.

    Parameters
    ----------
    results : dict
        Mapping of measure name to value, as returned by ``evaluate_model``.

    Returns
    -------
    pandas.DataFrame
        One row per measure with the columns ``Measure`` and ``Score``. The
        ``fpr``, ``tpr`` and ``roc_auc`` entries are left out.
    """
    hidden = ("fpr", "tpr", "roc_auc")
    measures = []
    scores = []
    for name, value in results.items():
        if name in hidden:
            continue
        measures.append(name)
        scores.append(value)
    return pd.DataFrame({"Measure": measures, "Score": scores})

# Score the single-feature model on the held-out test pairs; the table on the
# last line is rendered as the cell output.
basic_results = evaluate_model(basic_model, test_data)
display_results(basic_results)

Unnamed: 0,Measure,Score
0,accuracy,0.806245
1,recall,0.765237
2,precision,0.833607


In [11]:
def create_roc_plot():
    """Create an empty ROC figure with a dashed random-classifier diagonal.

    Returns
    -------
    tuple
        ``(plt, fig)``: the pyplot module and the new figure. Curves are added
        afterwards by plotting onto the current axes.

    Notes
    -----
    Applies the ``classic`` style and changes the global ``axes.prop_cycle``
    rc setting, so later figures are affected as well.
    """
    plt.style.use('classic')
    # The colour cycle must be set after the style is applied (the style would
    # overwrite it) and before the axes are created (axes read the cycle when
    # they are built); setting it later leaves this figure on the style's own
    # colours.
    plt.rc('axes', prop_cycle=(cycler('color',
                                      ['r', 'g', 'b', 'c', 'm', 'y', 'k'])))
    fig = plt.figure(figsize=(13, 8))
    plt.xlim([0, 1])
    plt.ylim([0, 1])
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.plot([0, 1], [0, 1], linestyle='--', label='Random score (AUC = 0.50)')
    return plt, fig

def add_curve(plt, title, fpr, tpr, roc):
    """Plot one ROC curve, labelled with its title and AUC.

    Parameters
    ----------
    plt
        Object exposing ``plot`` (the pyplot module in this notebook).
    title : str
        Name shown in the legend.
    fpr, tpr
        False and true positive rates of the curve.
    roc : float
        Area under the curve, shown with two decimals.
    """
    # Fixed-point ".2f" always prints two decimals (0.90, 1.00), matching the
    # "AUC = 0.50" baseline label; a bare "0.2" means two significant digits
    # and would print 0.9 or 1.0.
    plt.plot(fpr, tpr, label=f"{title} (AUC = {roc:0.2f})")
    


In [12]:
# Draw the ROC curve of the single-feature model on top of the random baseline.
plt, fig = create_roc_plot()
add_curve(
    plt,
    title="Common Interactions",
    fpr=basic_results["fpr"],
    tpr=basic_results["tpr"],
    roc=basic_results["roc_auc"],
)
plt.legend(loc='lower right')
plt.show()

In [13]:
# Summary statistics of the three graph features, first for linked pairs
# (label 1) and then for unlinked pairs (label 0).
summary_columns = ["summary", "CommonInteractions", "prefAttachment", "totalNeighbors"]
for label_value in (1, 0):
    label_rows = training_data.filter(training_data["label"] == label_value)
    label_rows.describe().select(*summary_columns).show()

+-------+------------------+------------------+------------------+
|summary|CommonInteractions|    prefAttachment|    totalNeighbors|
+-------+------------------+------------------+------------------+
|  count|              1849|              1849|              1849|
|   mean| 6.472147106544078| 649.0016224986479|  47.7658193618172|
| stddev| 6.590397693758528|1042.7011892287317|31.153957172436126|
|    min|                 0|                 2|                 3|
|    max|                49|              7968|               159|
+-------+------------------+------------------+------------------+

+-------+------------------+-----------------+------------------+
|summary|CommonInteractions|   prefAttachment|    totalNeighbors|
+-------+------------------+-----------------+------------------+
|  count|              1849|             1849|              1849|
|   mean|0.4402379664683613|61.00378583017847| 17.35803136830719|
| stddev|0.9955654637902858|127.8862940523298|17.254947581369585|


## A more complex model with 3 features

In [14]:
# Feature columns fed to the three-feature model.
fields = ["CommonInteractions", "prefAttachment", "totalNeighbors"]
graphy_model = train_model(fields, training_data)

# Score on the test pairs; the table on the last line is the cell output.
graphy_results = evaluate_model(graphy_model, test_data)
display_results(graphy_results)


Unnamed: 0,Measure,Score
0,accuracy,0.884213
1,recall,0.808126
2,precision,0.811178


In [15]:
# Overlay the ROC curves of both models on one figure.
plt, fig = create_roc_plot()
for curve_title, curve_results in (("Common Interactions", basic_results),
                                   ("Graphy", graphy_results)):
    add_curve(plt, curve_title,
              curve_results["fpr"], curve_results["tpr"],
              curve_results["roc_auc"])
plt.legend(loc='lower right')
plt.show()

## Which features are important?

In [16]:
def plot_feature_importance(fields, feature_importances):
    """Show a bar chart of feature importances, largest first.

    Parameters
    ----------
    fields : list of str
        Feature names, in the same order as `feature_importances`.
    feature_importances
        One importance value per feature.
    """
    importance_table = pd.DataFrame(
        {"Feature": fields, "Importance": feature_importances}
    ).sort_values("Importance", ascending=False)
    axes = importance_table.plot.bar(x='Feature', y='Importance', legend=None)
    # The feature names on the ticks already identify the axis.
    axes.xaxis.set_label_text("")
    plt.tight_layout()
    plt.show()

In [17]:
# The last pipeline stage is the fitted random forest (create_pipeline puts
# the classifier after the assembler).
rf_model = graphy_model.stages[-1]
plot_feature_importance(fields, rf_model.featureImportances)

## Plotting a decision tree (WIP package on jupyter...)

In [19]:
# NOTE: the export code below is wrapped in a string literal, so it is never
# executed; the cell only echoes the string back as its output.
'''from spark_tree_plotting import export_graphviz
dot_string = export_graphviz(rf_model.trees[0],
                             featureNames=fields, categoryNames=[], classNames=["True", "False"],
                             filled=True, roundedCorners=True, roundLeaves=True)
with open("/tmp/rf.dot", "w") as file:
    file.write(dot_string)'''

'from spark_tree_plotting import export_graphviz\ndot_string = export_graphviz(rf_model.trees[0],\n                             featureNames=fields, categoryNames=[], classNames=["True", "False"],\n                             filled=True, roundedCorners=True, roundLeaves=True)\nwith open("/tmp/rf.dot", "w") as file:\n    file.write(dot_string)'

## Adding new features: triangles and clustering coefficients

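Before running the next code cell, run the following Graph Data Science (`gds`) procedures in Neo4j. They write the triangle-count and clustering-coefficient node properties that the feature query below reads.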

```cypher
// Project every Character node and the Interacts relationships (undirected,
// with their 'weight' property) into an in-memory graph named "G".
CALL gds.graph.create(
  "G",
  "Character",
  {
    Interacts: {
      type: "Interacts",
      properties: 'weight',
      orientation: 'UNDIRECTED'
    }
  }
)
```
    
```cypher
// Run triangle counting on graph "G" and write the result to the
// 'trianglesTest' node property.
CALL gds.triangleCount.write("G", {
writeProperty:'trianglesTest'})
```
```cypher
// Run the local clustering coefficient on graph "G" and write the result to
// the 'coefficientTest' node property.
CALL gds.localClusteringCoefficient.write("G", {
writeProperty:'coefficientTest'})
```


```cypher
// Project every Character node and the Interacts_early relationships
// (undirected, with their 'weight' property) into an in-memory graph named
// "G_ea".
CALL gds.graph.create(
  "G_ea",
  "Character",
  {
    Interacts_early: {
      type: "Interacts_early",
      properties: 'weight',
      orientation: 'UNDIRECTED'
    }
  }
)
```

```cypher
// Run triangle counting on graph "G_ea" and write the result to the
// 'trianglesTrain' node property.
CALL gds.triangleCount.write("G_ea", {
writeProperty:'trianglesTrain'})
```

```cypher
// Run the local clustering coefficient on the early-interaction graph "G_ea"
// and write the result to the 'coefficientTrain' node property. The training
// coefficient uses the same projection as 'trianglesTrain'; computing it on
// the full graph "G" would mix later interactions into the training features.
CALL gds.localClusteringCoefficient.write("G_ea", {
writeProperty:'coefficientTrain'})
```

In [21]:
def apply_triangles_features(data, triangles_prop, coefficient_prop):
    """Join per-pair triangle and clustering-coefficient features onto `data`.

    Parameters
    ----------
    data : pyspark.sql.DataFrame
        Frame with ``node1`` and ``node2`` columns holding Neo4j node ids.
    triangles_prop : str
        Name of the node property to read for the triangle features.
    coefficient_prop : str
        Name of the node property to read for the coefficient features.

    Returns
    -------
    pyspark.sql.DataFrame
        `data` joined on ``node1``/``node2`` with the added columns
        ``minTriangles``, ``maxTriangles``, ``minCoefficient`` and
        ``maxCoefficient`` (minimum / maximum of the property over the two
        nodes of the pair).
    """
    query = """
    UNWIND $pairs AS pair
    MATCH (p1) WHERE id(p1) = pair.node1
    MATCH (p2) WHERE id(p2) = pair.node2
    RETURN pair.node1 AS node1,
            pair.node2 AS node2,
            apoc.coll.min([p1[$trianglesProp], p2[$trianglesProp]])
            AS minTriangles,
            apoc.coll.max([p1[$trianglesProp], p2[$trianglesProp]])
            AS maxTriangles,
            apoc.coll.min([p1[$coefficientProp], p2[$coefficientProp]])
            AS minCoefficient,
            apoc.coll.max([p1[$coefficientProp], p2[$coefficientProp]])
            AS maxCoefficient
    """
    # Send each pair only once: a pair that occurs k times in `data` would
    # otherwise come back as k feature rows and turn into k*k rows in the join.
    distinct_pairs = data.select("node1", "node2").distinct().collect()
    params = {
        "pairs": [{"node1": row["node1"], "node2": row["node2"]}
                  for row in distinct_pairs],
        "trianglesProp": triangles_prop,
        "coefficientProp": coefficient_prop
    }
    features = spark.createDataFrame(graph.run(query, params).to_data_frame())
    return data.join(features, ["node1", "node2"])

# Training pairs read the *Train node properties, test pairs the *Test ones.
# These reassignments are not idempotent: re-running the cell joins the
# feature columns onto frames that already contain them.
training_data = apply_triangles_features(training_data, "trianglesTrain", "coefficientTrain")
test_data = apply_triangles_features(test_data, "trianglesTest", "coefficientTest")