<a href="https://colab.research.google.com/github/kanawanttotimetravel/INT3229-final-project/blob/main/Interact_with_Neo4j_Aura.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Configure Neo4j Aura parameters

In [None]:
import os
from getpass import getpass

# Connection settings for the Neo4j Aura instance.
# They are read from the environment so that no secret is stored in the notebook;
# the URL and user keep their previous values as defaults, and the password falls
# back to an interactive prompt when NEO4J_PASSWORD is not set.
# NOTE(review): the password that used to be hard-coded here has been published
# with the notebook and should be rotated in the Aura console.
neo4j_url = os.environ.get("NEO4J_URL", "neo4j+s://14efed30.databases.neo4j.io")
neo4j_user = os.environ.get("NEO4J_USER", "neo4j")
neo4j_password = os.environ.get("NEO4J_PASSWORD") or getpass("Neo4j Aura password: ")

# Configure the Spark Environment

In [None]:
# Spark release to install; interpolated into the download URL, the archive
# name and SPARK_HOME in the cells below.
spark_version = '3.4.4'

In [None]:
# Install a headless OpenJDK 17 (matches JAVA_HOME set below); output is discarded.
!apt-get install openjdk-17-jdk-headless -qq > /dev/null

In [None]:
# Download the Spark binary distribution for `spark_version` (Hadoop 3 build).
# NOTE(review): dlcdn.apache.org may only host current releases; if this download
# fails for an older version, check archive.apache.org/dist/spark — confirm.
!wget -q https://dlcdn.apache.org/spark/spark-$spark_version/spark-$spark_version-bin-hadoop3.tgz

In [None]:
# Unpack the downloaded archive into the current working directory.
!tar xf spark-$spark_version-bin-hadoop3.tgz

In [None]:
# findspark is imported in the "Create Spark Session" section below.
# NOTE(review): the package version is not pinned.
!pip install -q findspark

In [None]:
import os
# JDK location matching the openjdk-17 package installed above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
# Directory produced by extracting the Spark archive; the /content prefix
# presumably assumes the Colab working directory — confirm when running elsewhere.
os.environ["SPARK_HOME"] = f"/content/spark-{spark_version}-bin-hadoop3"

# Create Spark Session

In [None]:
import findspark
# Must run before pyspark is imported in the next cell.
# NOTE(review): presumably this uses the SPARK_HOME set above to make pyspark
# importable — confirm against the findspark documentation.
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Settings applied to the session builder, in order. The Neo4j entries are
# registered once here because every query in this notebook targets the same
# database instance, so they do not have to be repeated on each read/write.
_session_settings = [
    ('spark.ui.port', '4050'),
    # Optional, only affects how dataframes are shown as tables:
    # ('spark.sql.repl.eagerEval.enabled', False),
    ('spark.jars.packages', 'org.neo4j:neo4j-connector-apache-spark_2.12:5.1.0_for_spark_3'),
    ("neo4j.url", neo4j_url),
    ("neo4j.authentication.type", "basic"),
    ("neo4j.authentication.basic.username", neo4j_user),
    ("neo4j.authentication.basic.password", neo4j_password),
]

# Local-mode session using all available cores.
_builder = SparkSession.builder.master('local[*]')
_builder = _builder.appName('Data science workflow with Neo4j and Spark')
for _key, _value in _session_settings:
    _builder = _builder.config(_key, _value)

spark = _builder.getOrCreate()
spark

In [None]:
# import utility functions that we'll use in the notebook
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Run the queries

In [None]:
# Uniqueness constraints on the `id` property of each node label used by the imports below.
# Must be run directly on Neo4j: this string is not passed to either helper in the cells shown here.
# NOTE(review): the four statements are not separated by `;` — confirm how they are meant to be submitted.
constraint_query = """
CREATE CONSTRAINT FOR (u:User) REQUIRE u.id IS UNIQUE
CREATE CONSTRAINT FOR (t:Team) REQUIRE t.id IS UNIQUE
CREATE CONSTRAINT FOR (c:TeamChatSession) REQUIRE c.id IS UNIQUE
CREATE CONSTRAINT FOR (i:ChatItem) REQUIRE i.id IS UNIQUE"""

In [None]:
def execute_write_cypher_query(query):
    """Run a Cypher write statement through the Neo4j Spark data source.

    A one-row placeholder DataFrame is written with ``query`` attached; the
    frame exists purely so that a write can be issued. The placeholder is shown
    after a successful save. Any exception is caught and printed, and the
    function returns ``None`` either way.

    Args:
        query: Cypher text passed to the connector's ``query`` option.
    """
    try:
        placeholder = spark.createDataFrame([("dummy",)], ["col"])
        writer = placeholder.write.format("org.neo4j.spark.DataSource")
        connector_options = (
            ("url", neo4j_url),
            ("authentication.basic.username", neo4j_user),
            ("authentication.basic.password", neo4j_password),
            ("query", query),
        )
        for option_name, option_value in connector_options:
            writer = writer.option(option_name, option_value)
        writer.mode("overwrite").save()
        placeholder.show()
    except Exception as e:
        print(f"Error executing query: {e}")

In [None]:
def execute_read_cypher_query(query, limit=10):
    """Run a Cypher read query through the Neo4j Spark data source and print the result.

    Args:
        query: Cypher text passed to the connector's ``query`` option.
        limit: Maximum number of rows to fetch and print. Defaults to 10, which
            is the cap this helper always applied before the parameter existed.

    Returns:
        None. The rows are printed with ``DataFrame.show``; any exception is
        caught and printed instead of being raised.
    """
    try:
        result_df = (
            spark.read.format("org.neo4j.spark.DataSource")
            .option("url", neo4j_url)
            .option("authentication.basic.username", neo4j_user)
            .option("authentication.basic.password", neo4j_password)
            .option("query", query)
            .load()
        )
        # Pass the cap to show() as well: its own default would otherwise
        # truncate the printed table when a larger limit is requested.
        result_df.limit(limit).show(limit)
    except Exception as e:
        print(f"Error executing query: {e}")

In [None]:
# Import chat_create_team_chat.csv. Column usage in the query:
#   row[0] -> User id, row[1] -> Team id, row[2] -> TeamChatSession id,
#   row[3] -> timeStamp stored on both CreatesSession and OwnedBy.
# Everything is MERGEd, so existing nodes/relationships are matched instead of duplicated.
chat_create_team_chat_queries = """LOAD CSV FROM "https://raw.githubusercontent.com/kanawanttotimetravel/INT3229-final-project/refs/heads/main/chat_data/chat_create_team_chat.csv" AS row
MERGE (u:User {id: toInteger(row[0])})
MERGE (t:Team {id: toInteger(row[1])})
MERGE (c:TeamChatSession {id: toInteger(row[2])})
MERGE (u)-[:CreatesSession {timeStamp: row[3]}]->(c)
MERGE (c)-[:OwnedBy {timeStamp: row[3]}]->(t)"""

execute_write_cypher_query(chat_create_team_chat_queries)

+-----+
|  col|
+-----+
|dummy|
+-----+



In [None]:
# Import chat_item_team_chat.csv in batches of 1000 via apoc.periodic.iterate.
# Column usage: row[0] -> User id, row[1] -> TeamChatSession id,
# row[2] -> ChatItem id, row[3] -> timeStamp on CreatesChat and PartOf.
# Must be run directly on Neo4j (see the note below).
chat_item_team_chat_query = """
CALL apoc.periodic.iterate('LOAD CSV FROM "https://raw.githubusercontent.com/kanawanttotimetravel/INT3229-final-project/refs/heads/main/chat_data/chat_item_team_chat.csv" AS row RETURN row',
'MERGE (u:User {id: toInteger(row[0])})
MERGE (t:TeamChatSession {id: toInteger(row[1])})
MERGE (c:ChatItem {id: toInteger(row[2])})
MERGE (u)-[:CreatesChat {timeStamp: row[3]}]->(c)
MERGE (c)-[:PartOf {timeStamp: row[3]}]->(t)', {batchSize:1000})
"""

# Can't run with Spark: the recorded output of this cell is
# "Error executing query: Please provide a valid WRITE query".
# execute_write_cypher_query(chat_item_team_chat_query)

Error executing query: Please provide a valid WRITE query


In [None]:
# Import chat_join_team_chat.csv: row[0] -> User id, row[1] -> TeamChatSession id,
# row[2] -> timeStamp on the Joins relationship.
chat_join_team_chat_query = """LOAD CSV FROM "https://raw.githubusercontent.com/kanawanttotimetravel/INT3229-final-project/refs/heads/main/chat_data/chat_join_team_chat.csv" AS row
MERGE (u:User {id: toInteger(row[0])})
MERGE (c:TeamChatSession {id: toInteger(row[1])})
MERGE (u)-[:Joins {timeStamp: row[2]}]->(c)"""

execute_write_cypher_query(chat_join_team_chat_query)

+-----+
|  col|
+-----+
|dummy|
+-----+



In [None]:
# Import chat_leave_team_chat.csv: row[0] -> User id, row[1] -> TeamChatSession id,
# row[2] -> timeStamp on the Leaves relationship.
chat_leave_team_chat_query = """LOAD CSV FROM "https://raw.githubusercontent.com/kanawanttotimetravel/INT3229-final-project/refs/heads/main/chat_data/chat_leave_team_chat.csv" AS row
MERGE (u:User {id: toInteger(row[0])})
MERGE (c:TeamChatSession {id: toInteger(row[1])})
MERGE (u)-[:Leaves {timeStamp: row[2]}]->(c)"""

execute_write_cypher_query(chat_leave_team_chat_query)

+-----+
|  col|
+-----+
|dummy|
+-----+



In [None]:
# Import chat_mention_team_chat.csv: row[0] -> ChatItem id, row[1] -> mentioned User id,
# row[2] -> timeStamp on the Mentioned relationship (ChatItem -> User).
chat_mention_team_chat_query = """LOAD CSV FROM "https://raw.githubusercontent.com/kanawanttotimetravel/INT3229-final-project/refs/heads/main/chat_data/chat_mention_team_chat.csv" AS row
MERGE (c:ChatItem {id: toInteger(row[0])})
MERGE (u:User {id: toInteger(row[1])})
MERGE (c)-[:Mentioned {timeStamp: row[2]}]->(u)"""

execute_write_cypher_query(chat_mention_team_chat_query)

+-----+
|  col|
+-----+
|dummy|
+-----+



In [None]:
# Import chat_respond_team_chat.csv: row[0] -> responding ChatItem id,
# row[1] -> ChatItem id being responded to, row[2] -> timeStamp on ResponseTo.
chat_respond_team_chat_query = """LOAD CSV FROM "https://raw.githubusercontent.com/kanawanttotimetravel/INT3229-final-project/refs/heads/main/chat_data/chat_respond_team_chat.csv" AS row
MERGE (c1:ChatItem {id: toInteger(row[0])})
MERGE (c2:ChatItem {id: toInteger(row[1])})
MERGE (c1)-[:ResponseTo {timeStamp: row[2]}]->(c2)"""

execute_write_cypher_query(chat_respond_team_chat_query)

+-----+
|  col|
+-----+
|dummy|
+-----+



In [None]:
# Sanity check after the imports: count every node in the graph, regardless of label.
check_node_query = "MATCH (n) RETURN COUNT(n) AS NumberOfNode"

execute_read_cypher_query(check_node_query)

+------------+
|NumberOfNode|
+------------+
|       45463|
+------------+



In [None]:
# Sanity check after the imports: count every relationship in the graph, regardless of type.
check_edge_query = "MATCH()-[r]->() RETURN COUNT(r) AS numberOfEdges"

execute_read_cypher_query(check_edge_query)

+-------------+
|numberOfEdges|
+-------------+
|       118502|
+-------------+



In [None]:
# Q1(a): longest conversation chain. Matches every ResponseTo path of any length
# between ChatItems, orders by path length and keeps the single longest one,
# returning its nodes and its length (number of relationships).
query_Q1_a = """MATCH p=(start:ChatItem)-[:ResponseTo*]->(end:ChatItem)
WITH p, length(p) AS length
ORDER BY length DESC
WITH nodes(p) AS longestChain, length AS longestChainLength
LIMIT 1
RETURN longestChain, longestChainLength
"""

execute_read_cypher_query(query=query_Q1_a)

+--------------------+------------------+
|        longestChain|longestChainLength|
+--------------------+------------------+
|[{7364, [ChatItem...|                 9|
+--------------------+------------------+



In [None]:
# Q1(b): number of distinct users who created a ChatItem on a longest chain.
# The maximum ResponseTo path length is computed first, then every path of that
# length is re-matched, so users are counted across all paths that tie for longest.
query_Q1_b = """MATCH p=(start:ChatItem)-[:ResponseTo*]->(end:ChatItem)
WITH length(p) AS longestChainLength
ORDER BY longestChainLength DESC
LIMIT 1

// Find all paths with this length
MATCH p=(start:ChatItem)-[:ResponseTo*]->(end:ChatItem)
WHERE length(p) = longestChainLength
WITH p

// Extract users involved in these paths
MATCH (u:User)-[:CreatesChat]->(ci:ChatItem)
WHERE ci IN nodes(p)
RETURN count(distinct u) AS numberOfUsers"""

execute_read_cypher_query(query_Q1_b)

+-------------+
|numberOfUsers|
+-------------+
|            5|
+-------------+



In [None]:
# Q2(a): chattiest users — number of ChatItems created per user, most active first.
# The query has no LIMIT; only the first 10 rows are printed because the read helper caps its output.
query_Q2_a = """MATCH (u:User)-[:CreatesChat]->(c:ChatItem)
RETURN u.id AS userId, count(c) AS chatCount
ORDER BY chatCount DESC
"""

execute_read_cypher_query(query_Q2_a)

+------+---------+
|userId|chatCount|
+------+---------+
|   394|      115|
|  2067|      111|
|   209|      109|
|  1087|      109|
|   554|      107|
|   999|      105|
|   516|      105|
|  1627|      105|
|   668|      104|
|   461|      104|
+------+---------+



In [None]:
# Q2(b): chattiest teams — number of ChatItems per team, reached through
# ChatItem -PartOf-> TeamChatSession -OwnedBy-> Team, most active first.
# The query has no LIMIT; only the first 10 rows are printed because the read helper caps its output.
query_Q2_b = """MATCH (ci:ChatItem)-[:PartOf]->(tcs:TeamChatSession)-[:OwnedBy]->(t:Team)
RETURN t.id AS teamId, count(ci) AS chatCount
ORDER BY chatCount DESC
"""

execute_read_cypher_query(query_Q2_b)

+------+---------+
|teamId|chatCount|
+------+---------+
|    82|     1324|
|   185|     1036|
|   112|      957|
|    18|      844|
|   194|      836|
|   129|      814|
|    52|      788|
|   136|      783|
|   146|      746|
|    81|      736|
+------+---------+



In [None]:
# Q2(c): which of the top-10 chattiest users posted in one of the top-10 chattiest teams.
# The two id lists are hard-coded and match the recorded outputs of Q2(a) and Q2(b);
# they must be updated by hand if the underlying data changes.
query_Q2_c = """
// Identify top 10 chattiest users
WITH [394, 2067, 1087, 209, 554, 1627, 516, 999, 668, 461] AS topChattiestUsers

// Identify top 10 chattiest teams
WITH topChattiestUsers, [82, 185, 112, 18, 194, 129, 52, 136, 146, 81] AS topChattiestTeams

// Find users who belong to the top chattiest teams
MATCH (u:User)-[:CreatesChat]->(:ChatItem)-[:PartOf]->(:TeamChatSession)-[:OwnedBy]->(t:Team)
WHERE u.id IN topChattiestUsers
AND t.id IN topChattiestTeams
RETURN DISTINCT u.id AS User, t.id AS Team"""

execute_read_cypher_query(query_Q2_c)

+----+----+
|User|Team|
+----+----+
| 999|  52|
+----+----+



In [None]:
# Q3(a): build InteractsWith edges u1 -> u2 when a ChatItem created by u1 mentions u2,
# or responds to a ChatItem created by u2; the third branch deletes self-loops.
# NOTE(review): CREATE (not MERGE) adds a new relationship for every match, so
# re-running this cell adds duplicate InteractsWith edges — confirm this is intended.
# NOTE(review): the three branches are joined with UNION and none has a RETURN
# clause — confirm the connector executes all of them as a single write query.
query_Q3_a = """MATCH (u1:User)-[:CreatesChat]->(:ChatItem)-[:Mentioned]->(u2:User)
CREATE (u1)-[:InteractsWith]->(u2)
UNION
MATCH (u1:User)-[:CreatesChat]->(:ChatItem)-[:ResponseTo]->(:ChatItem)<-[:CreatesChat]-(u2:User)
CREATE (u1)-[:InteractsWith]->(u2)
UNION
MATCH (u:User)-[r:InteractsWith]->(u)
DELETE r;
"""
execute_write_cypher_query(query_Q3_a)

+-----+
|  col|
+-----+
|dummy|
+-----+



In [None]:
# Q3(b): for each of the 10 users with the most CreatesChat relationships, collect the
# distinct users they point to via InteractsWith and report the neighbour count k.
# Note: `WITH [u] as ChattiestUsers` wraps each user in its own single-element list
# (one row per user), whereas Q3(c) below gathers them into one list with COLLECT(u).
query_Q3_b = """MATCH (u:User)-[c:CreatesChat]->()
WITH u, COUNT(c) as Chats
ORDER BY Chats DESC LIMIT 10 WITH [u] as ChattiestUsers
//Getting the neighbours of all Users and the count
MATCH (u1:User)-[:InteractsWith]->(u2:User)
WHERE u1 in ChattiestUsers
WITH u1.id AS UserID, COLLECT(DISTINCT u2.id) AS Neighbours RETURN UserID, Neighbours, SIZE(Neighbours) AS k"""

execute_read_cypher_query(query_Q3_b)

+------+--------------------+---+
|UserID|          Neighbours|  k|
+------+--------------------+---+
|   394|[1012, 2011, 1997...|  4|
|  2067|[516, 209, 1627, ...|  6|
|   209|[516, 1627, 63, 2...|  5|
|  1087|[1311, 426, 929, ...|  6|
|   554|[1959, 1687, 2018...|  5|
|   999|[1554, 1506, 1398...|  8|
|   516|[209, 1627, 63, 2...|  6|
|  1627|[516, 209, 63, 20...|  6|
|   668|    [698, 2034, 648]|  3|
|   461|   [1482, 1675, 482]|  3|
+------+--------------------+---+



In [None]:
# Q3(c): clustering coefficient for the 10 chattiest users.
# For each user with k distinct InteractsWith neighbours, NUM sums VALUE over the distinct
# ordered neighbour pairs (N1, N2) joined by an InteractsWith edge, DENUM is k*(k-1),
# and the coefficient is NUM / DENUM.
# NOTE(review): the CASE expression tests the same pattern the preceding MATCH already
# required, so VALUE looks like it is always 1 — confirm.
# NOTE(review): DENUM is 0 for a user with a single neighbour; none of the recorded
# rows hit that case — confirm whether it needs guarding.
query_Q3_c = """// Getting TOP 10 Chattiest Users
MATCH (u:User)-[c:CreatesChat]->()
WITH u, COUNT(c) AS Chats
ORDER BY Chats DESC
LIMIT 10
WITH COLLECT(u) AS ChattiestUsers

// Getting the neighbours of TOP 10 Users and the count
MATCH (u1:User)-[:InteractsWith]->(u2:User)
WHERE u1 IN ChattiestUsers
WITH u1.id AS UserID, COLLECT(DISTINCT u2.id) AS Neighbours, SIZE(COLLECT(DISTINCT u2.id)) AS k

// Find Intersecting Users
MATCH (u1:User)-[:InteractsWith]->(u2:User)
// Such that both belong in Neighbours list
WHERE u1.id IN Neighbours AND u2.id IN Neighbours
// For every valid combination of a User and its two neighbours,
// Value is 1 if neighbours have interacted at least once, k is no. of Neighbours
WITH DISTINCT UserID, u1.id AS N1, u2.id AS N2, CASE WHEN (u1)-[:InteractsWith]->(u2) THEN 1 ELSE 0 END AS VALUE, k
WITH UserID, SUM(VALUE) AS NUM, k, k*(k-1) AS DENUM
RETURN UserID, NUM, DENUM, NUM/(DENUM*1.0) AS ClusteringCoefficient
ORDER BY ClusteringCoefficient DESC"""

execute_read_cypher_query(query_Q3_c)

+------+---+-----+---------------------+
|UserID|NUM|DENUM|ClusteringCoefficient|
+------+---+-----+---------------------+
|   668|  6|    6|                  1.0|
|   209| 20|   20|                  1.0|
|   999| 53|   56|   0.9464285714285714|
|   516| 28|   30|   0.9333333333333333|
|  1627| 28|   30|   0.9333333333333333|
|  2067| 28|   30|   0.9333333333333333|
|   461|  5|    6|   0.8333333333333334|
|   554| 16|   20|                  0.8|
|   394|  9|   12|                 0.75|
|  1087| 22|   30|   0.7333333333333333|
+------+---+-----+---------------------+



# Interact with Neo4j using the Python driver

In [None]:
# Install the official Neo4j Python driver (the recorded run resolved to neo4j 5.27.0).
# NOTE(review): the version is not pinned, so a later run may install a different release.
!pip install neo4j

Collecting neo4j
  Downloading neo4j-5.27.0-py3-none-any.whl.metadata (5.9 kB)
Downloading neo4j-5.27.0-py3-none-any.whl (301 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m301.7/301.7 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: neo4j
Successfully installed neo4j-5.27.0


In [None]:
from neo4j import GraphDatabase
import pandas as pd

In [None]:
class Neo4jConnection:
    """Thin wrapper around a Neo4j driver for running Cypher queries.

    Can be used directly (remember to call ``close()``) or as a context
    manager, which closes the driver on exit::

        with Neo4jConnection(uri, user, password) as conn:
            rows = conn.query("MATCH (n) RETURN count(n) AS n")
    """

    def __init__(self, uri, user, password):
        """Create the underlying driver for ``uri`` using basic ``(user, password)`` auth."""
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def __enter__(self):
        """Return the connection itself so it can be bound in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver when the ``with`` block ends; exceptions are not suppressed."""
        self.close()
        return False

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    def query(self, query, parameters=None):
        """Run ``query`` in a fresh session and return all records as dictionaries.

        Args:
            query: Cypher text to execute.
            parameters: Optional query parameters, forwarded to ``session.run``.

        Returns:
            A list with one ``record.data()`` dictionary per result record.
        """
        with self.driver.session() as session:
            result = session.run(query, parameters)
            # Materialise inside the ``with`` block, while the session is still open.
            return [record.data() for record in result]

In [None]:
# Open a driver connection using the Aura parameters defined at the top of the notebook.
# NOTE(review): conn.close() is not called in any of the cells shown here.
conn = Neo4jConnection(neo4j_url, neo4j_user, neo4j_password)

In [None]:
# Count all nodes through the Python driver; the recorded result (45463) matches
# the count obtained through Spark earlier in the notebook.
que = "MATCH (n) RETURN COUNT(n) AS NumberOfNode"
result = conn.query(que)
result

[{'NumberOfNode': 45463}]

In [None]:
# Count all relationships through the Python driver. The recorded result (145167) is
# higher than the 118502 recorded in the Spark section, which was taken before the
# InteractsWith relationships were created.
que = "MATCH ()-[e]->() RETURN COUNT(e) AS NumberOfEdges"
result = conn.query(que)
result

[{'NumberOfEdges': 145167}]

In [None]:
# Create InteractsWith edges through the Python driver, one statement per source:
# mentions first, then responses.
# NOTE(review): both statements use CREATE, so they add to any InteractsWith edges that
# already exist (e.g. from query_Q3_a); the recorded edge count rises from 145167 to
# 167324 after this cell — confirm the duplication is intended.
# NOTE(review): unlike query_Q3_a, no self-loop cleanup (u)-[:InteractsWith]->(u) is done here.
create_interact_query_mention = """
MATCH (u1:User)-[:CreatesChat]->(:ChatItem)-[:Mentioned]->(u2:User)
CREATE (u1)-[:InteractsWith]->(u2);
"""

create_interact_query_response = """
MATCH (u1:User)-[:CreatesChat]->(:ChatItem)-[:ResponseTo]->(:ChatItem)<-[:CreatesChat]-(u2:User)
CREATE (u1)-[:InteractsWith]->(u2);
"""

conn.query(create_interact_query_mention)
conn.query(create_interact_query_response)

[]

In [None]:
# Re-count all relationships to see the effect of the InteractsWith creation above.
que = "MATCH ()-[e]->() RETURN COUNT(e) AS NumberOfEdges"
result = conn.query(que)
result

[{'NumberOfEdges': 167324}]