In [None]:
from pyspark.sql import SparkSession
from google.colab import userdata
# Connection settings for the Neo4j instance. The password is fetched at
# runtime through google.colab's userdata.get() rather than written here.
neo4j_password = userdata.get('neo4j_password')

url = "neo4j+s://7ffc03b9.databases.neo4j.io:7687"
username = "neo4j"

# Create (or reuse) the Spark session. The Neo4j URL and basic-auth
# credentials are set once at session level, so the reads later in this
# notebook do not pass them again. "spark.jars.packages" lists the Neo4j
# connector and GraphFrames as a single comma-separated string.
spark = (
    SparkSession.builder
    .config(
        "spark.jars.packages",
        "org.neo4j:neo4j-connector-apache-spark_2.12:5.3.3_for_spark_3,"
        "graphframes:graphframes:0.8.4-spark3.5-s_2.12",
    )
    .config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", username)
    .config("neo4j.authentication.basic.password", neo4j_password)
    .getOrCreate()
)

Let's reimplement our fraud ring example with Spark DataFrames.

In [None]:
# Read every HAS_ADDRESS relationship whose source node is labelled
# :AccountHolder and whose target node is labelled :Address. The resulting
# DataFrame has one row per relationship, with source and target node
# properties exposed as "source.*" / "target.*" columns.
has_address_read_options = {
    'relationship': 'HAS_ADDRESS',
    'relationship.source.labels': ':AccountHolder',
    'relationship.target.labels': ':Address',
}

has_address_df = (
    spark.read
    .format('org.neo4j.spark.DataSource')
    .options(**has_address_read_options)
    .load()
)

has_address_df.show()

+-------------------+-----------+-----------+---------------+---------------+---------------+----------------+-----------+---------------+--------------+-----------------+------------+-------------+
|           <rel.id>| <rel.type>|<source.id>|<source.labels>|source.UniqueId|source.LastName|source.FirstName|<target.id>|<target.labels>|target.ZipCode|    target.Street|target.State|  target.City|
+-------------------+-----------+-----------+---------------+---------------+---------------+----------------+-----------+---------------+--------------+-----------------+------------+-------------+
|1152921504606846998|HAS_ADDRESS|         22|[AccountHolder]|        JohnDoe|            Doe|            John|         25|      [Address]|         94101|123 NW 1st Street|  California|San Francisco|
|1152921504606846999|HAS_ADDRESS|         23|[AccountHolder]|  JaneAppleseed|      Appleseed|            Jane|         25|      [Address]|         94101|123 NW 1st Street|  California|San Francisco|
|1152

In [None]:
from pyspark.sql import functions as F

# Flatten the relationship rows into a plain account/address table.
# Each pair is (connector column name, friendly output name). The connector
# column names contain dots, so they are wrapped in backticks to stop Spark
# from treating "source.UniqueId" as a struct field access.
account_address_columns = [
    ("source.UniqueId", "account_id"),
    ("source.FirstName", "account_firstname"),
    ("source.LastName", "account_lastname"),
    ("target.ZipCode", "address_zip"),
    ("target.Street", "address_street"),
    ("target.City", "address_city"),
    ("target.State", "address_state"),
]

account_address_df = has_address_df.select(
    *[
        F.col(f"`{connector_name}`").alias(output_name)
        for connector_name, output_name in account_address_columns
    ]
)

account_address_df.show()

+-------------+-----------------+----------------+-----------+-----------------+-------------+-------------+
|   account_id|account_firstname|account_lastname|address_zip|   address_street| address_city|address_state|
+-------------+-----------------+----------------+-----------+-----------------+-------------+-------------+
|      JohnDoe|             John|             Doe|      94101|123 NW 1st Street|San Francisco|   California|
|JaneAppleseed|             Jane|       Appleseed|      94101|123 NW 1st Street|San Francisco|   California|
|    MattSmith|             Matt|           Smith|      94101|123 NW 1st Street|San Francisco|   California|
+-------------+-----------------+----------------+-----------+-----------------+-------------+-------------+



In [None]:
# A "ring" here is a set of account holders that share one postal address.
# Group by the full address and gather the account holders found at each one.
address_key_columns = [
    "address_zip",
    "address_street",
    "address_city",
    "address_state",
]

# NOTE(review): ring_size counts distinct account ids, while collect_list
# keeps duplicates and the three lists are collected independently - confirm
# that duplicates / element alignment are not a concern for this data.
accounts_per_address_df = account_address_df.groupBy(*address_key_columns).agg(
    F.countDistinct("account_id").alias("ring_size"),
    F.collect_list("account_id").alias("account_ids"),
    F.collect_list("account_firstname").alias("first_names"),
    F.collect_list("account_lastname").alias("last_names"),
)

# Only addresses shared by more than one AccountHolder count as a fraud ring.
fraud_rings_df = accounts_per_address_df.filter("ring_size > 1")

fraud_rings_df.show()

+-----------+-----------------+-------------+-------------+---------+--------------------+------------------+--------------------+
|address_zip|   address_street| address_city|address_state|ring_size|         account_ids|       first_names|          last_names|
+-----------+-----------------+-------------+-------------+---------+--------------------+------------------+--------------------+
|      94101|123 NW 1st Street|San Francisco|   California|        3|[JaneAppleseed, M...|[Jane, Matt, John]|[Appleseed, Smith...|
+-----------+-----------------+-------------+-------------+---------+--------------------+------------------+--------------------+



Now let's use GraphFrames to solve the same problem.

In [None]:
pip install graphframes



In [None]:
from graphframes import GraphFrame

# --- Edges -----------------------------------------------------------------
# Every (:AccountHolder)-[:HAS_ADDRESS]->(:Address) relationship in Neo4j.
has_address_edges_df = (
    spark.read
    .format('org.neo4j.spark.DataSource')
    .option('relationship', 'HAS_ADDRESS')
    .option('relationship.source.labels', ':AccountHolder')
    .option('relationship.target.labels', ':Address')
    .load()
)

# --- Vertices --------------------------------------------------------------
# Read each node label separately and keep only the Neo4j internal id and the
# label list, renamed to the plain column names "id" and "labels".
account_df = (
    spark.read
    .format('org.neo4j.spark.DataSource')
    .option('labels', ':AccountHolder')
    .load()
    .select(
        F.col("<id>").alias("id"),
        F.col("<labels>").alias("labels"),
    )
)

address_df = (
    spark.read
    .format('org.neo4j.spark.DataSource')
    .option('labels', ':Address')
    .load()
    .select(
        F.col("<id>").alias("id"),
        F.col("<labels>").alias("labels"),
    )
)

# Both projections have the same (id, labels) column order, so a positional
# union is enough to stack them into one vertex table.
vertices_df = account_df.union(address_df)

# Rename the relationship endpoints to the src/dst columns used for the
# GraphFrame edge table, and carry the relationship type along.
edges_df = has_address_edges_df.select(
    F.col("`<source.id>`").alias("src"),
    F.col("`<target.id>`").alias("dst"),
    F.col("`<rel.type>`").alias("relationship"),
)

g = GraphFrame(vertices_df, edges_df)

g.vertices.show()
g.edges.show()

+---+---------------+
| id|         labels|
+---+---------------+
| 22|[AccountHolder]|
| 23|[AccountHolder]|
| 24|[AccountHolder]|
| 25|      [Address]|
+---+---------------+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
| 22| 25| HAS_ADDRESS|
| 23| 25| HAS_ADDRESS|
| 24| 25| HAS_ADDRESS|
+---+---+------------+



In [None]:
# Motif search: match every directed edge r from a vertex A to a vertex C.
# The graph built earlier in this notebook contains only HAS_ADDRESS edges,
# so A is an account holder and C is the address it points to.
edge_motif = "(A)-[r]->(C)"
df_AC = g.find(edge_motif).select("A", "C", "r")
df_AC.show(truncate=False)

# For each target vertex C, count the distinct source vertices pointing at it
# and collect the matching edges; keep only targets shared by more than one
# source.
ring_size_col = F.countDistinct("A").alias("RingSize")
ring_edges_col = F.collect_list("r").alias("relationships")

df_grouped = (
    df_AC
    .groupBy("C")
    .agg(ring_size_col, ring_edges_col)
    .filter("RingSize > 1")
)

df_grouped.show(truncate=False)


+---------------------+---------------+---------------------+
|A                    |C              |r                    |
+---------------------+---------------+---------------------+
|{24, [AccountHolder]}|{25, [Address]}|{24, 25, HAS_ADDRESS}|
|{23, [AccountHolder]}|{25, [Address]}|{23, 25, HAS_ADDRESS}|
|{22, [AccountHolder]}|{25, [Address]}|{22, 25, HAS_ADDRESS}|
+---------------------+---------------+---------------------+

+---------------+--------+---------------------------------------------------------------------+
|C              |RingSize|relationships                                                        |
+---------------+--------+---------------------------------------------------------------------+
|{25, [Address]}|3       |[{22, 25, HAS_ADDRESS}, {24, 25, HAS_ADDRESS}, {23, 25, HAS_ADDRESS}]|
+---------------+--------+---------------------------------------------------------------------+



You can also use a Cypher query to read data from Neo4j into a DataFrame. This example reads the entire graph, which in most cases is probably overkill.

In [None]:
# Cypher that returns every directed relationship together with its start
# node (n) and end node (m) - i.e. the whole graph.
full_graph_query = '''
              MATCH (n)-[r]->(m)
              RETURN n, r, m
              '''

# Run the query through the Neo4j connector; each returned variable becomes
# one column (n, r, m) of the DataFrame.
full_graph_reader = spark.read.format('org.neo4j.spark.DataSource')
full_graph_df = full_graph_reader.option('query', full_graph_query).load()

full_graph_df.show()

+--------------------+--------------------+--------------------+
|                   n|                   r|                   m|
+--------------------+--------------------+--------------------+
|{22, [AccountHold...|{1152921504606846...|{25, [Address], N...|
|{22, [AccountHold...|{1152922604118474...|{26, [PhoneNumber...|
|{22, [AccountHold...|{1152923703630102...|{32, [SSN], 241-2...|
|{22, [AccountHold...|{1152924803141730...|{33, [CreditCard]...|
|{22, [AccountHold...|{1152925902653358...|{34, [BankAccount...|
|{23, [AccountHold...|{1152921504606846...|{25, [Address], N...|
|{23, [AccountHold...|{1152922604118474...|{26, [PhoneNumber...|
|{23, [AccountHold...|{1152923703630102...|{27, [SSN], 241-2...|
|{23, [AccountHold...|{1152924803141730...|{35, [CreditCard]...|
|{23, [AccountHold...|{1152925902653358...|{36, [BankAccount...|
|{23, [AccountHold...|{1152927002164985...|{37, [UnsecuredLo...|
|{24, [AccountHold...|{1152921504606847...|{25, [Address], N...|
|{24, [AccountHold...|{11

In [None]:
# Print the schema Spark holds for full_graph_df, i.e. the layout of the
# n / r / m columns produced by the Cypher query.
full_graph_df.printSchema()

root
 |-- n: struct (nullable = true)
 |    |-- <id>: long (nullable = false)
 |    |-- <labels>: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- UniqueId: string (nullable = true)
 |    |-- LastName: string (nullable = true)
 |    |-- FirstName: string (nullable = true)
 |-- r: struct (nullable = true)
 |    |-- <rel.id>: long (nullable = false)
 |    |-- <rel.type>: string (nullable = false)
 |    |-- <source.id>: long (nullable = false)
 |    |-- <target.id>: long (nullable = false)
 |-- m: struct (nullable = true)
 |    |-- <id>: long (nullable = false)
 |    |-- <labels>: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- SSN: string (nullable = true)

