<a href="https://colab.research.google.com/github/neohack22/IASD/blob/graphs/graphs/22_04_IASD_Model_Feature_Engineering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

Install the necessary libraries in your Colab notebook environment and connect to your hosted Neo4J Sandbox.

In [None]:
# -y skips the interactive "Proceed (Y/n)?" confirmation prompt
!pip uninstall -y pyspark

Found existing installation: pyspark 3.3.0
Uninstalling pyspark-3.3.0:

In [None]:
!pip install neo4j pyspark

In [None]:
# Connection details of your Neo4J Sandbox (replace with your own)
ip = "54.174.38.179"
bolt_port = "7687"
username = "neo4j"
password = "spots-carrier-wires"

In [None]:
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://" + ip + ":" + bolt_port, auth=(username, password))

print(driver.address) # your-sandbox-ip:your-sandbox-bolt-port




54.174.38.179:7687


In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
save_folder = '/content/gdrive/My Drive/IASD04/IASD_link_prediction/link-prediction/notebooks/data/'

# Objective

We are going to train a binary classifier to predict whether a link should exist between two *Author* nodes.

Each pair of *Author*s will be described with a feature vector and labeled with either 1 (if these two authors have collaborated) or 0 (if they have not).

# Feature Engineering

Let's generate features for our link prediction classifier. These features will describe a pair of *Author*s by using:
- graph topology measures
- community detection measures


We will identify the nodes by their ID, compute graph measures for these nodes in Neo4J and return a DataFrame with these new features describing each pair of nodes in the train and test set.

Load the CSV files saved in the train/test notebook.

In [None]:
df_train_under = spark.read.csv(save_folder + 'df_train_under.csv/*.csv', header=True, inferSchema=True).cache()
df_test_under = spark.read.csv(save_folder + 'df_test_under.csv/*.csv', header=True, inferSchema=True).cache()

In [None]:
df_train_under.show(10)

+-----+-----+------+
|label|node1| node2|
+-----+-----+------+
|    1|11797|102784|
|    1|11847|122413|
|    1|13024| 13048|
|    1|13265| 13616|
|    1|13323|244987|
|    1|13340| 13343|
|    1|13356| 13357|
|    1|13403|173386|
|    1|13526| 13527|
|    1|13600|234808|
+-----+-----+------+
only showing top 10 rows



In [None]:
df_test_under.show(5)

+-----+-----+------+
|label|node1| node2|
+-----+-----+------+
|    0|11381|217768|
|    0|11550|341313|
|    0|11653|268422|
|    0|11653|327525|
|    0|11763|100317|
+-----+-----+------+
only showing top 5 rows



In [None]:
df_train_under.select("node1").union(df_train_under.select("node2")).distinct().count()

45018

In [None]:
df_test_under.select("node1").union(df_test_under.select("node2")).distinct().count()

19947

Firstly, for Neo4J to be able to manipulate our train and test pairs, we need to feed them as lists of dictionaries. This will enable us to consider each element in this list as a parameter for a Neo4J query. This element's attributes will be accessible to the query.


- Run the following cell to transform each DataFrame into a list of dictionaries of the form:

```
[
  {
  "node1": 15589,
  "node2": 2567,
  "label": 1
  } ,
  ... ,
  {
  "node1": 5466,
  "node2": 78122,
  "label": 0
  }
]

```

In [None]:
df_train_under_pairs = [{"node1": node1, "node2": node2, 'label':label}  for node1, node2, label in df_train_under.select("node1", "node2", "label").collect()]
df_test_under_pairs = [{"node1": node1, "node2": node2, 'label':label}  for node1, node2, label in df_test_under.select("node1", "node2", "label").collect()]

In [None]:
df_train_under.count()

162153

In [None]:
def queryex(q, d):
    """Run the Cypher query `q` with the Neo4J driver `d` and print every returned record."""
    with d.session() as session:
        result = session.run(q)
        for row in result:
            print(row)

In [None]:
# Sanity check: print the communityId property of 30 nodes (None means it is not set)


query ="""
match (l) return l.communityId limit 30;
"""

queryex(query,driver)



<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>
<Record l.communityId=None>


## Generating graphy features

We will start by creating 3 features extracted from the **graph topology** to describe each pair of nodes: 
- [common neighbors](https://neo4j.com/docs/graph-data-science/current/alpha-algorithms/common-neighbors/)
- [preferential attachment](https://neo4j.com/docs/graph-data-science/current/alpha-algorithms/preferential-attachment/)
- [total neighbors](https://neo4j.com/docs/graph-data-science/current/alpha-algorithms/total-neighbors/)
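To make the three measures concrete, here is a plain-Python sketch of what they compute on a small undirected graph, following the definitions in the linked GDS pages: common neighbors is |N(x) ∩ N(y)|, preferential attachment is |N(x)|·|N(y)| and total neighbors is |N(x) ∪ N(y)|. The adjacency map, the toy edges and the helper names are illustrative assumptions, not part of the GDS API.

```python
from typing import Dict, Iterable, Set, Tuple

# Undirected graph stored as an adjacency map: node ID -> set of neighbor IDs.
Graph = Dict[int, Set[int]]


def build_graph(edges: Iterable[Tuple[int, int]]) -> Graph:
    """Build an undirected adjacency map from (u, v) edge pairs."""
    graph: Graph = {}
    for u, v in edges:
        graph.setdefault(u, set()).add(v)
        graph.setdefault(v, set()).add(u)
    return graph


def neighbors(graph: Graph, node: int) -> Set[int]:
    """Neighbor set of `node` (empty if the node has no edges)."""
    return graph.get(node, set())


def common_neighbors(graph: Graph, x: int, y: int) -> int:
    # |N(x) ∩ N(y)|: authors who co-wrote with both x and y
    return len(neighbors(graph, x) & neighbors(graph, y))


def preferential_attachment(graph: Graph, x: int, y: int) -> int:
    # |N(x)| * |N(y)|: product of the two degrees
    return len(neighbors(graph, x)) * len(neighbors(graph, y))


def total_neighbors(graph: Graph, x: int, y: int) -> int:
    # |N(x) ∪ N(y)|: distinct authors adjacent to x or y
    return len(neighbors(graph, x) | neighbors(graph, y))


# Toy co-authorship edges (hypothetical data)
g = build_graph([(1, 3), (1, 4), (2, 3), (2, 4), (2, 5)])
print(common_neighbors(g, 1, 2))         # 2 -> {3, 4}
print(preferential_attachment(g, 1, 2))  # 6 -> 2 * 3
print(total_neighbors(g, 1, 2))          # 3 -> {3, 4, 5}
```

Since tn = |N(x)| + |N(y)| − cn, the three features are linked: a row with cn = 2, pa = 98, tn = 19 in the outputs below is consistent with degrees 7 and 14 (7 × 14 = 98, 7 + 14 − 2 = 19).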

We want a final Data Frame with the following structure:

| **node1** | **node2** | **label** | **cn** | **pa** | **tn** |
|---|---|---|---|---|---|

By using the *UNWIND* clause, we can manipulate each element in a list as an individual row in Cypher.

For example:
```
# A list of student dictionaries
my_list = [{"id": 1, "age": 28}, {"id": 2, "age": 35}]

# A parameterized query to retrieve each student's name from its attributes
query = """
  UNWIND $list_of_students AS student // We use a dollar sign to denote parameters
    MATCH (s:Student)
    WHERE ID(s) = student.id AND s.age = student.age
    RETURN s.name
"""

with driver.session() as session:
    result = session.run(query, parameters={"list_of_students": my_list})
    # Consume the result inside the session
    names = [record["s.name"] for record in result]
```

- Complete the following function to compute the 3 graph measures for pairs of nodes.





In [None]:
# Test the individual queries in your Neo4J Browser first.
# Then complete the cell to define this function for future execution in Colab.

def add_graphy_features(pairs, rel_type):
    # Common neighbors
    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = pair.node1
        MATCH (p2) WHERE ID(p2) = ...
        RETURN 
          pair.node1 AS node1,
          ... AS node2,
          ... AS label,
          gds.alpha.linkprediction.commonNeighbors(... , ... , {relationshipQuery: $relType}) AS cn
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_cn = spark.createDataFrame([dict(record) for record in result]) 

    # Preferential attachment
    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = pair.node1
        MATCH (p2) WHERE ID(p2) = ...
        RETURN 
          pair.node1 AS node1,
          ... AS node2,
          ... AS label,
          gds.alpha.linkprediction.preferentialAttachment(... , ... , {relationshipQuery: $relType}) AS pa
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_pa = spark.createDataFrame([dict(record) for record in result])

    # Total neighbors
    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = pair.node1
        MATCH (p2) WHERE ID(p2) = ...
        RETURN 
          pair.node1 AS node1,
          ... AS node2,
          ... AS label,
          gds.alpha.linkprediction.totalNeighbors(... , ... , {relationshipQuery: $relType}) AS tn
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_tn = spark.createDataFrame([dict(record) for record in result])  

    # Join the three feature dfs
    final_df = ... .join(
        ... , on=["node1", "node2", "label"], how='inner'
        ).join(
        ... , on=["node1", "node2", "label"], how='inner'
        )

    return final_df

In [None]:
#@title Solution

def add_graphy_features(pairs, rel_type):
    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = pair.node1
        MATCH (p2) WHERE ID(p2) = pair.node2
        RETURN 
          pair.node1 AS node1,
          pair.node2 AS node2,
          pair.label AS label,
          gds.alpha.linkprediction.commonNeighbors(p1, p2, {relationshipQuery: $relType}) AS cn
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_cn = spark.createDataFrame([dict(record) for record in result]) 

    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = pair.node1
        MATCH (p2) WHERE ID(p2) = pair.node2
        RETURN 
          pair.node1 AS node1,
          pair.node2 AS node2,
          pair.label AS label,
          gds.alpha.linkprediction.preferentialAttachment(p1, p2, {relationshipQuery: $relType}) AS pa
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_pa = spark.createDataFrame([dict(record) for record in result])

    query = """
      UNWIND $pairs AS pair
        MATCH (p1) WHERE ID(p1) = pair.node1
        MATCH (p2) WHERE ID(p2) = pair.node2
        RETURN 
          pair.node1 AS node1,
          pair.node2 AS node2,
          pair.label AS label,
          gds.alpha.linkprediction.totalNeighbors(p1, p2, {relationshipQuery: $relType}) AS tn
    """
    params = {
        "pairs": pairs, 
        "relType": rel_type
        }
    with driver.session() as session:
        result = session.run(query, params)
        features_tn = spark.createDataFrame([dict(record) for record in result])  

    final_df = features_cn.join(
        features_pa, on=["node1", "node2", "label"], how='inner'
        ).join(
        features_tn, on=["node1", "node2", "label"], how='inner'
        )

    return final_df

Let's apply the function to our training and test DataFrames and do a quick sanity check on the number of resulting rows (it should be unchanged).



In [None]:
print('Counts before applying graph features engineering: ')
print(df_train_under.count())
print(df_test_under.count())

df_train_under_graph_f = add_graphy_features(df_train_under_pairs, "CO_AUTHOR_EARLY")
df_test_under_graph_f = add_graphy_features(df_test_under_pairs, "CO_AUTHOR_LATE")

print('Counts after applying graph features engineering: ')
print(df_train_under_graph_f.count())
print(df_test_under_graph_f.count())

Counts before applying graph features engineering: 
162153
129716
Counts after applying graph features engineering: 
162153
129716


Let's see how it looks:

In [None]:
df_train_under_graph_f.filter(F.col('label') == 0).show(5)
df_test_under_graph_f.filter(F.col('label') == 1).show(5)

+------+------+-----+---+----+----+
| node1| node2|label| cn|  pa|  tn|
+------+------+-----+---+----+----+
|102696|234576|    0|0.0|24.0|14.0|
| 13208|300830|    0|0.0|33.0|14.0|
|114271|202618|    0|2.0|98.0|19.0|
|101611|234390|    0|0.0| 6.0| 5.0|
| 13487|172807|    0|0.0|30.0|11.0|
+------+------+-----+---+----+----+
only showing top 5 rows

+------+------+-----+----+-----+----+
| node1| node2|label|  cn|   pa|  tn|
+------+------+-----+----+-----+----+
|123600|224242|    1| 1.0|  6.0| 4.0|
|113972|113973|    1| 2.0| 54.0|19.0|
| 90497| 90498|    1| 2.0|  9.0| 4.0|
|322963|343616|    1|21.0|484.0|23.0|
|213213|213214|    1| 5.0| 36.0| 7.0|
+------+------+-----+----+-----+----+
only showing top 5 rows



## Generating community features

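Community detection algorithms partition a graph into groups (communities) of nodes that are more densely connected to each other than to the rest of the graph. Nodes are considered more similar to nodes in their own community than to nodes in other communities, so two authors in the same community are plausible future collaborators.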

We will extract one feature based on a community detection algorithm:

- [Louvain community](https://neo4j.com/docs/graph-data-science/current/algorithms/louvain/)
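Louvain works by greedily maximizing *modularity*, a standard score of how good a partition is. For an undirected graph with adjacency matrix $A$, node degrees $k_i$, $m$ edges and community assignment $c_i$, modularity is:

$$
Q = \frac{1}{2m} \sum_{i,j} \left[ A_{ij} - \frac{k_i k_j}{2m} \right] \delta(c_i, c_j)
$$

where $\delta(c_i, c_j) = 1$ if nodes $i$ and $j$ are in the same community and $0$ otherwise. A positive $Q$ means that more edges fall inside communities than expected in a random graph with the same degrees. Each Louvain level merges the communities of the previous level, which is where intermediate communities come from.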

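The Louvain algorithm is hierarchical: besides the final community of each node, it can return the intermediate communities it found at each level (when run with `includeIntermediateCommunities: true`). We will add a property to each node containing the first community that the algorithm found for this specific node. It will thus constitute a categorical feature: the first community to which each node belongs. As we are considering pairs of nodes, we will derive a *'same_community_louvain'* binary feature (1 or 0, stored in the `sl` column below) to further describe each pair of nodes. Note that the cells below stream the final `communityId` rather than the first intermediate community.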
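A plain-Python sketch of the "first community" step, assuming rows shaped like the `nodeId`, `communityId` and `intermediateCommunityIds` columns yielded by `gds.louvain.stream` (intermediate IDs are only filled in when they are requested, so the sketch falls back to the final community). The sample values and the `first_communities` helper are hypothetical.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# One row per node, mirroring the (nodeId, communityId, intermediateCommunityIds)
# columns yielded by gds.louvain.stream. The values below are made up.
StreamRow = Tuple[int, int, Optional[List[int]]]


def first_communities(rows: Iterable[StreamRow]) -> Dict[int, int]:
    """Map each node ID to the first community Louvain assigned it.

    Uses intermediateCommunityIds[0] (the first level) when available and
    falls back to the final communityId when intermediate IDs are missing.
    """
    mapping: Dict[int, int] = {}
    for node_id, community_id, intermediate_ids in rows:
        mapping[node_id] = intermediate_ids[0] if intermediate_ids else community_id
    return mapping


rows = [
    (1, 12, [5, 12]),
    (2, 12, [5, 12]),
    (3, 12, [9, 12]),
    (4, 7, None),  # intermediate IDs not requested: keep the final community
]
print(first_communities(rows))  # {1: 5, 2: 5, 3: 9, 4: 7}
```

Nodes 1 to 3 share the final community 12, but nodes 1 and 3 differ at the first level, so the level you pick changes the derived pair feature: the first level is finer-grained than the final one.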

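Note that we need to restrict the execution of the Louvain community detection algorithm to the train and test subgraphs separately, so that the communities describing training pairs are computed from *CO_AUTHOR_EARLY* edges only and no *CO_AUTHOR_LATE* (test) edges leak into the training features.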

- Set a property on each node in the *CO_AUTHOR_EARLY* subgraph, containing the first community ID that the Louvain algorithm found for this node.

In [None]:
query ="""CALL gds.graph.project.cypher(
  'early',
  'MATCH (n:Author)-[:CO_AUTHOR_EARLY]-() RETURN DISTINCT id(n) AS id, labels(n) AS labels',
  'MATCH (n:Author)-[r:CO_AUTHOR_EARLY]->(m:Author) RETURN id(n) AS source, id(m) AS target, type(r) AS type')
YIELD
  graphName AS graph, nodeQuery, nodeCount AS nodes, relationshipQuery, relationshipCount AS rels
  """


queryex(query,driver)

In [None]:
query = """
CALL gds.louvain.stream('early')
YIELD nodeId, communityId, intermediateCommunityIds
RETURN  nodeId AS identity, communityId
"""


with driver.session() as session:
        result = session.run(query)
        features_lv_early = spark.createDataFrame([dict(record) for record in result])

In [None]:
features_lv_early.show(10)

+-----------+--------+
|communityId|identity|
+-----------+--------+
|      12258|   11419|
|      12258|   11420|
|      12258|   11421|
|      42092|   11424|
|          7|   11434|
|          7|   11435|
|          7|   11436|
|          7|   11437|
|          9|   11456|
|          9|   11457|
+-----------+--------+
only showing top 10 rows



In [None]:
features_lv_early.count()

45018

In [None]:
df_train_under_graph_f.show(10)

+------+------+-----+----+-----+----+
| node1| node2|label|  cn|   pa|  tn|
+------+------+-----+----+-----+----+
|123735|138301|    1| 3.0| 20.0| 6.0|
|101210|173721|    1| 3.0| 20.0| 6.0|
| 98840|172025|    1| 2.0| 18.0| 7.0|
|138574|305377|    1| 1.0| 20.0| 8.0|
| 14914| 14917|    1| 4.0| 35.0| 8.0|
|114221|115543|    1| 9.0|832.0|49.0|
|130378|130382|    1|13.0|196.0|15.0|
| 90258| 90259|    1| 1.0|  6.0| 4.0|
|122146|190243|    1| 3.0| 35.0| 9.0|
|101499|101507|    1| 6.0| 49.0| 8.0|
+------+------+-----+----+-----+----+
only showing top 10 rows



In [None]:
df_train_under_join1 = df_train_under_graph_f.join(features_lv_early,  df_train_under_graph_f.node1 == features_lv_early.identity).withColumnRenamed("communityId", "CI1")


In [None]:
df_train_under_join1.show(5)

+------+------+-----+---+----+---+-----+--------+
| node1| node2|label| cn|  pa| tn|  CI1|identity|
+------+------+-----+---+----+---+-----+--------+
|123735|138301|    1|3.0|20.0|6.0|10311|  123735|
|101210|173721|    1|3.0|20.0|6.0|37965|  101210|
| 98840|172025|    1|2.0|18.0|7.0|20977|   98840|
|138574|305377|    1|1.0|20.0|8.0|20899|  138574|
| 14914| 14917|    1|4.0|35.0|8.0|17522|   14914|
+------+------+-----+---+----+---+-----+--------+
only showing top 5 rows



In [None]:
# Use the communities computed on the train ('early') graph for node2 as well
df_train_under_join2 = df_train_under_graph_f.join(features_lv_early, df_train_under_graph_f.node2 == features_lv_early.identity).select("node1", "node2", "communityId").withColumnRenamed("communityId", "CI2")


In [None]:
df_train_under_join2.show(5)

+------+------+-----+
| node1| node2|  CI2|
+------+------+-----+
| 98840|172025|12322|
|138574|305377|30184|
|114221|115543|27151|
|122146|190243|30947|
| 14806|101037|10480|
+------+------+-----+
only showing top 5 rows



In [None]:
total_early= df_train_under_join1.join(df_train_under_join2, on=["node1", "node2"], how='inner' )

In [None]:
total_early.where("CI1<CI2").show(5)

+------+------+-----+---+-----+----+-----+--------+-----+
| node1| node2|label| cn|   pa|  tn|  CI1|identity|  CI2|
+------+------+-----+---+-----+----+-----+--------+-----+
| 14042|264342|    1|1.0|  4.0| 3.0|34865|   14042|38869|
|224437|224438|    1|1.0|  4.0| 3.0|29674|  224437|37005|
|145480|153405|    1|2.0|112.0|20.0|12258|  145480|17923|
|101409|285445|    1|6.0|162.0|21.0|16121|  101409|28404|
|152877|224652|    1|1.0| 14.0| 8.0|20899|  152877|32745|
+------+------+-----+---+-----+----+-----+--------+-----+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import when

In [None]:
final_early = total_early.withColumn("sl", when(total_early.CI1==total_early.CI2, 1).otherwise(0)).drop("CI1", "CI2", "identity")

In [None]:
final_early.show(5)

+------+------+-----+---+----+----+---+
| node1| node2|label| cn|  pa|  tn| sl|
+------+------+-----+---+----+----+---+
|191715|191716|    1|2.0|12.0| 5.0|  0|
| 14042|264342|    1|1.0| 4.0| 3.0|  0|
|108213|300750|    1|1.0| 4.0| 3.0|  0|
|108385|179319|    1|6.0|70.0|11.0|  0|
|224437|224438|    1|1.0| 4.0| 3.0|  0|
+------+------+-----+---+----+----+---+
only showing top 5 rows



Let's now do the same for the test set, using the *CO_AUTHOR_LATE* subgraph.

In [None]:
query ="""CALL gds.graph.project.cypher(
  'late',
  'MATCH (n:Author)-[:CO_AUTHOR_LATE]-() RETURN DISTINCT id(n) AS id, labels(n) AS labels',
  'MATCH (n:Author)-[r:CO_AUTHOR_LATE]->(m:Author) RETURN id(n) AS source, id(m) AS target, type(r) AS type')
YIELD
  graphName AS graph, nodeQuery, nodeCount AS nodes, relationshipQuery, relationshipCount AS rels
  """


queryex(query,driver)

In [None]:
query = """
CALL gds.louvain.stream('late')
YIELD nodeId, communityId, intermediateCommunityIds
RETURN  nodeId AS identity, communityId 
"""


with driver.session() as session:
        result = session.run(query)
        features_lv = spark.createDataFrame([dict(record) for record in result])

In [None]:
features_lv.count()

39238

In [None]:
features_lv.select("identity").distinct().count()

39238

In [None]:
features_lv.show(10)

+-----------+--------+
|communityId|identity|
+-----------+--------+
|      39237|    8627|
|          2|   11365|
|          2|   11366|
|          5|   11367|
|          5|   11368|
|          5|   11369|
|      39222|   11370|
|      38670|   11371|
|      25148|   11372|
|      19949|   11373|
+-----------+--------+
only showing top 10 rows



In [None]:
df_test_under_join1 = df_test_under_graph_f.join(features_lv,  df_test_under_graph_f.node1 == features_lv.identity).withColumnRenamed("communityId", "CI1")




In [None]:
df_test_under_join1.show(10)

+------+------+-----+---+-----+----+-----+--------+
| node1| node2|label| cn|   pa|  tn|  CI1|identity|
+------+------+-----+---+-----+----+-----+--------+
| 13372|288913|    0|0.0| 25.0|10.0|27865|   13372|
| 14491|207393|    0|1.0|828.0|80.0|30976|   14491|
|102566|139550|    0|1.0| 16.0| 9.0|32041|  102566|
| 13278|138252|    0|1.0| 24.0|10.0|28917|   13278|
| 13625|273490|    0|1.0|152.0|41.0|30976|   13625|
| 14599|293386|    0|0.0|156.0|25.0|13621|   14599|
|107579|289527|    0|0.0| 12.0| 7.0|28961|  107579|
| 13552|323940|    0|1.0|126.0|22.0|28917|   13552|
| 15179|276876|    0|1.0| 72.0|21.0|22183|   15179|
|115266|337955|    0|0.0| 27.0|12.0|35601|  115266|
+------+------+-----+---+-----+----+-----+--------+
only showing top 10 rows



In [None]:
df_test_under_join2 = df_test_under_graph_f.join(features_lv,  df_test_under_graph_f.node2 == features_lv.identity).select("node1", "node2", "communityId").withColumnRenamed("communityId", "CI2")
df_test_under_join2.show(10)

+------+------+-----+
| node1| node2|  CI2|
+------+------+-----+
| 13372|288913|30976|
| 14491|207393|28961|
|102566|139550|27631|
| 13278|138252|28917|
| 13625|273490|37440|
| 14599|293386|13621|
|107579|289527|28961|
| 13552|323940|28917|
| 15179|276876|27865|
|115266|337955|35601|
+------+------+-----+
only showing top 10 rows



In [None]:
total_late = df_test_under_join1.join(df_test_under_join2, on=["node1", "node2"], how='inner' )

In [None]:
total_late.show(5)

+------+------+-----+---+-----+----+-----+--------+-----+
| node1| node2|label| cn|   pa|  tn|  CI1|identity|  CI2|
+------+------+-----+---+-----+----+-----+--------+-----+
| 13625|273490|    0|1.0|152.0|41.0|30976|   13625|37440|
|114962|185863|    0|0.0| 12.0| 8.0|28917|  114962|28917|
| 14511|240201|    0|1.0| 42.0|12.0|27865|   14511|27865|
|113192|173584|    0|1.0| 45.0|13.0|36384|  113192|36384|
| 11461| 11544|    0|1.0|  3.0| 3.0|  171|   11461|  171|
+------+------+-----+---+-----+----+-----+--------+-----+
only showing top 5 rows



In [None]:
df_test_under_graph_f.count()

129716

In [None]:
total_late.count()

129716

In [None]:
total_late.where("CI1<CI2").show(5)

+-----+------+-----+---+-----+----+-----+--------+-----+---+-----+----+-----+--------+
|node1| node2|label| cn|   pa|  tn|  CI1|identity|label| cn|   pa|  tn|  CI2|identity|
+-----+------+-----+---+-----+----+-----+--------+-----+---+-----+----+-----+--------+
|13625|273490|    0|1.0|152.0|41.0|30976|   13625|    0|1.0|152.0|41.0|37440|  273490|
|13478|300759|    0|0.0| 36.0|15.0|27865|   13478|    0|0.0| 36.0|15.0|28545|  300759|
|15199|301055|    0|0.0|  9.0| 6.0|30976|   15199|    0|0.0|  9.0| 6.0|35601|  301055|
|13362|173239|    0|1.0|714.0|54.0|32041|   13362|    0|1.0|714.0|54.0|32878|  173239|
|13572|296083|    0|0.0| 27.0|12.0|28961|   13572|    0|0.0| 27.0|12.0|30976|  296083|
+-----+------+-----+---+-----+----+-----+--------+-----+---+-----+----+-----+--------+
only showing top 5 rows



In [None]:
final_late = total_late.withColumn("sl", when(total_late.CI1==total_late.CI2, 1).otherwise(0)).drop("CI1", "CI2", "identity")

In [None]:
final_late.show(5)

+------+------+-----+---+-----+----+---+
| node1| node2|label| cn|   pa|  tn| sl|
+------+------+-----+---+-----+----+---+
| 13625|273490|    0|1.0|152.0|41.0|  0|
|114962|185863|    0|0.0| 12.0| 8.0|  1|
| 14511|240201|    0|1.0| 42.0|12.0|  1|
|113192|173584|    0|1.0| 45.0|13.0|  1|
| 11461| 11544|    0|1.0|  3.0| 3.0|  1|
+------+------+-----+---+-----+----+---+
only showing top 5 rows



In [None]:
final_late.count()

129716

- Similarly, set a property on each node in the *CO_AUTHOR_LATE* subgraph, containing the first community ID that the Louvain algorithm found for this node.

- Now, each node in our graph contains 2 new properties. What are the names of these properties? 


**Hint:** Feel free to use the Neo4J Browser if it doesn't look intuitive from the code blocks above.

Let's now build a derived feature to express, for each pair, whether the nodes belong to the same Louvain community or not.

- Complete the function below to create this derived feature for each pair of nodes
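The derived feature itself is a simple comparison. Below is a plain-Python sketch on the list-of-dictionaries pair format used earlier in this notebook, assuming a node to community mapping such as the one returned by `gds.louvain.stream`. `add_same_community` is a hypothetical helper, not the `add_community_feature` function the exercise asks for.

```python
from typing import Dict, List


def add_same_community(pairs: List[dict], communities: Dict[int, int]) -> List[dict]:
    """Return copies of the pair dicts with an `sl` flag (1 = same community).

    Nodes missing from `communities` (e.g. absent from the projected
    subgraph) never match, so their pairs get sl = 0.
    """
    result = []
    for pair in pairs:
        c1 = communities.get(pair["node1"])
        c2 = communities.get(pair["node2"])
        result.append({**pair, "sl": int(c1 is not None and c1 == c2)})
    return result


communities = {1: 5, 2: 5, 3: 9}  # node ID -> Louvain community (hypothetical)
pairs = [
    {"node1": 1, "node2": 2, "label": 1},
    {"node1": 1, "node2": 3, "label": 0},
    {"node1": 3, "node2": 99, "label": 0},  # node 99 has no community
]
for row in add_same_community(pairs, communities):
    print(row)
```

One design choice to note: this sketch keeps pairs whose nodes have no community and gives them `sl = 0`, whereas the inner Spark joins used earlier silently drop such pairs.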

Let's apply the function to our training and test DataFrames and do a quick sanity check on the number of resulting rows (it should be unchanged).

In [None]:
print('Counts before applying community feature engineering: ')
print(df_train_under.count())
print(df_test_under.count())

df_train_under_community_f = add_community_feature(df_train_under_pairs, "louvainTrain")
df_test_under_community_f = add_community_feature(df_test_under_pairs, "louvainTest")

print('Counts after applying community feature engineering: ')
print(df_train_under_community_f.count())
print(df_test_under_community_f.count())

Let's see how it looks:

In [None]:
df_train_under_community_f.filter(F.col('label') == 0).show(5)
df_test_under_community_f.filter(F.col('label') == 1).show(5)

# Save train and test DataFrames

- Join the graph topology features df with the community feature df.

In [None]:
print('Counts before joining df: ')
print(df_train_under_graph_f.count())
print(df_train_under_community_f.count())

print(df_test_under_graph_f.count())
print(df_test_under_community_f.count())

df_train_under = df_train_under_graph_f.join(df_train_under_community_f, on=['node1', 'node2', 'label'], how='inner')
df_test_under = df_test_under_graph_f.join(df_test_under_community_f, on=['node1', 'node2', 'label'], how='inner')

print('Counts after joining df: ')
print(df_train_under.count())
print(df_test_under.count())

Save our final train and test DataFrames to CSV files for use in the next notebook.


In [None]:
# Save both the train (early) and test (late) feature DataFrames
final_early.write.csv(save_folder + "df_train_under_all.csv", mode='overwrite', header=True)
final_late.write.csv(save_folder + "df_test_under_all.csv", mode='overwrite', header=True)

Please check that both datasets have been written to your Drive at the desired location because we are going to need them later for model training and testing.