# Google Storage Read CSV with Neo4j Spark Connector and PySpark

In the examples that follow, we use the Neo4j Spark Connector running under PySpark
([Neo4j Spark Connector under Python](https://neo4j.com/docs/spark/current/python/)).  This notebook has no dependencies on any other Neo4j client library (Python client or GDS client).

Please run this notebook from a valid Spark environment.  It was tested under [DataProc](https://cloud.google.com/dataproc).

## Setup Neo4j instance
Create a free account at https://sandbox.neo4j.com. Choose the “Blank Sandbox - Graph Data Science” option.
When your sandbox has been created, fill in the Bolt URL and password below.

## Environment Setup

In [None]:
pip install --upgrade seaborn

In [None]:
pip install --upgrade matplotlib

## Setup Neo4j Spark Connector imports

In [None]:
from pyspark.sql import SparkSession
import seaborn as sns
from matplotlib import pyplot as plt
import pandas as pd

Define Neo4j connection variables.  Yours will be different.

In [None]:
neo4j_url = "bolt://***removed***:7687"
neo4j_user = "neo4j"
neo4j_password = "***removed***"
neo4j_database= "neo4j"
tmp_working_bucket = "neo4j-sandbox/dataproc-working"
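
As an alternative to hardcoding credentials in the notebook, the connection values can be read from environment variables. The sketch below is only an illustration: the function `load_neo4j_settings` and the variable names `NEO4J_URL`, `NEO4J_USER`, `NEO4J_PASSWORD`, and `NEO4J_DATABASE` are assumptions, not something the connector requires.

```python
import os


def load_neo4j_settings(env=os.environ):
    """Read connection settings from a mapping (the process environment by default).

    The variable names are hypothetical; pick whatever fits your deployment.
    """
    return {
        "url": env.get("NEO4J_URL", "bolt://localhost:7687"),
        "user": env.get("NEO4J_USER", "neo4j"),
        "password": env.get("NEO4J_PASSWORD", ""),
        "database": env.get("NEO4J_DATABASE", "neo4j"),
    }


settings = load_neo4j_settings()
# Print only the non-secret values so the password never lands in cell output.
print(settings["url"], settings["user"], settings["database"])
```

The resulting values could then be assigned to `neo4j_url`, `neo4j_user`, `neo4j_password`, and `neo4j_database` in place of the literals above.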

Create Spark Session, seeded with Neo4j packages.  If you don't want to wait for the download each time, load the connector into the master node using SSH.

In [None]:
spark = (SparkSession.builder
        .appName('Leverage Neo4j with Apache Spark')
        .master('local[*]')
        # Just to show dataframes as tables
        .config('spark.sql.repl.eagerEval.enabled', True)
        # On DataProc we must use spark 2.x
        .config('spark.jars.packages', 'org.neo4j:neo4j-connector-apache-spark_2.12:4.1.3_for_spark_2.4')       
        .config("neo4j.url", neo4j_url)
        .config("neo4j.authentication.type", "basic")
        .config("neo4j.authentication.basic.username", neo4j_user)
        .config("neo4j.authentication.basic.password", neo4j_password)
        .getOrCreate())
# output spark version
spark

Here we create a utility class for the Spark Connector.  On DataProc the global configuration is not picked up, so we need to supply credentials on every call (clunky).  Also, some valid Cypher statements will not run as queries in the Spark Connector.  They must be executed as side effects through the "script" option of a dummy write operation.

In [None]:
class SparkConnector:
    
    def __init__( self, spark_session, uri, user, pwd):
        
        self.__spark_session = spark_session
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__count_cypher = "MATCH (n) RETURN count(n)"
        self.__no_side_effects = "MATCH (n) WHERE 1 = 2 SET n.placeholder = false"
        self.__df = self.read_cypher(self.__count_cypher)
            
    def read_cypher(self, query):
        
        response = None
        try: 
            response = self.__spark_session.read.format("org.neo4j.spark.DataSource") \
                .option("query", query) \
                .option("url", self.__uri) \
                .option("authentication.type", "basic") \
                .option("authentication.basic.username", self.__user) \
                .option("authentication.basic.password", self.__pwd) \
                .option("partitions", "1") \
                .load()
        except Exception as e:
            print("Read query failed:", e)

        return response
    
    def write_cypher(self, query):
        
        response = None
        try: 
            response = self.__df.write.format("org.neo4j.spark.DataSource") \
                .option("query", query) \
                .option("url", self.__uri) \
                .option("authentication.type", "basic") \
                .option("authentication.basic.username", self.__user) \
                .option("authentication.basic.password", self.__pwd) \
                .option("partitions", "1") \
                .save()
        except Exception as e:
            print("Write query failed:", e)

        return response
    
    def write_script(self,cypher):
        
        response = None
        try: 
            response = self.__df.write.format("org.neo4j.spark.DataSource") \
              .option("url", self.__uri) \
              .option("authentication.type", "basic") \
              .option("authentication.basic.username", self.__user) \
              .option("authentication.basic.password", self.__pwd) \
              .option("query", self.__no_side_effects) \
              .option("script",cypher) \
              .option("partitions", "1") \
              .save()
        except Exception as e:
            print("Write query failed:", e)

        return response
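
The three methods above repeat the same connection options on every call. One possible way to reduce the repetition is to build the options once as a dictionary and pass it with `.options(**opts)`, which PySpark readers and writers accept. The helper name `neo4j_options` is hypothetical, and the URL and password below are placeholders.

```python
def neo4j_options(uri, user, pwd, **extra):
    """Return the connection options that every read and write call repeats."""
    options = {
        "url": uri,
        "authentication.type": "basic",
        "authentication.basic.username": user,
        "authentication.basic.password": pwd,
        "partitions": "1",
    }
    # Per-call options such as "query" or "script" are merged in last.
    options.update(extra)
    return options


opts = neo4j_options(
    "bolt://localhost:7687", "neo4j", "secret",
    query="MATCH (n) RETURN count(n)",
)
# With a live session this could be used as:
# spark.read.format("org.neo4j.spark.DataSource").options(**opts).load()
print(sorted(opts))
```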
     

             

Let's now check that GDS is running on the server by executing this Cypher query.
We only need to supply credentials once per notebook.

In [None]:
sparkConnector = SparkConnector(spark_session=spark,uri=neo4j_url, user=neo4j_user, pwd=neo4j_password)

In [None]:
df = sparkConnector.read_cypher("RETURN gds.version() AS gds_version")
df.show()

In [None]:
sparkConnector.read_cypher("MATCH (n:MSA) RETURN count(n)").show()

Optional: if the database is not empty, reset it. This deletes all data in the `neo4j` database.

In [None]:
reset_db_query = """CREATE OR REPLACE DATABASE `neo4j`"""
sparkConnector.write_script(reset_db_query)

Check that it's empty now.

In [None]:
sparkConnector.read_cypher("MATCH (n:MSA) RETURN count(n)").show()

# Load MSA data

Create a node key constraint on the MSA name, so that every MSA node has a unique `name`.

In [None]:
sparkConnector.write_script("""
CREATE CONSTRAINT msa_name IF NOT EXISTS ON (m:MSA) ASSERT m.name IS NODE KEY
""")

Load CSV data

In [None]:
load_csv_query = """
LOAD CSV WITH HEADERS FROM 'https://raw.githubusercontent.com/smithna/datasets/main/CensusDemographicsByMetroArea.csv' 
AS row
WITH row WHERE row.name CONTAINS 'Metro'
MERGE (m:MSA {name:row.name})
SET m.population = toInteger(row.population),
m.medianHouseholdIncome = toInteger(row.medianHouseholdIncome),
m.medianHomePrice = toInteger(row.medianHomePrice),
m.percentOver25WithBachelors = toFloat(row.percentOver25WithBachelors)
RETURN count(m) as msaCount"""

sparkConnector.write_script(load_csv_query)

In [None]:
msa_df_query = """
MATCH (m:MSA)
RETURN m.name AS msa, 
m.population AS population,
m.medianHouseholdIncome AS medianHouseholdIncome,
m.medianHomePrice AS medianHomePrice,
m.percentOver25WithBachelors as percentOver25WithBachelors"""

msa_df=sparkConnector.read_cypher(msa_df_query)
msa_df.show()

Convert the Spark DataFrame to pandas to display histograms.

In [None]:
pandas_msa_df=msa_df.toPandas()
print(pandas_msa_df)

In [None]:
fig, axes = plt.subplots(4, 2)
fig.set_size_inches(15,30)
for i in range(1,5):
    sns.histplot(pandas_msa_df.iloc[:,i], ax=axes[i-1,0])
    sns.histplot(pandas_msa_df.iloc[:, i], log_scale=True, ax=axes[i-1,1])

That log transformation looks like it will help. Run the Cypher to store the transformed values in the graph.

In [None]:
msa_df_update_query = """
MATCH (m:MSA)
SET 
m.logPopulation = log(m.population),
m.logMedianHouseholdIncome = log(m.medianHouseholdIncome),
m.logMedianHomePrice = log(m.medianHomePrice),
m.logPercentOver25WithBachelors = log(m.percentOver25WithBachelors)
"""

sparkConnector.write_cypher(msa_df_update_query)
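
To see why the log transformation helps, here is a small sketch in plain Python. The population values are made up for illustration and do not come from the dataset. Cypher's `log()` is the natural logarithm, which `math.log` matches.

```python
import math

# Hypothetical population values spanning several orders of magnitude.
populations = [60_000, 250_000, 1_200_000, 19_000_000]
logged = [math.log(p) for p in populations]

# Ratio between the largest and smallest value, before and after the transform.
raw_ratio = max(populations) / min(populations)
log_ratio = max(logged) / min(logged)

print(f"raw max/min ratio: {raw_ratio:.1f}")
print(f"log max/min ratio: {log_ratio:.2f}")
```

The raw values differ by a factor of several hundred, while the logged values stay within the same order of magnitude, so a few very large metros no longer dominate the scale.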

Check that logs were committed to the database

In [None]:
msa_df_log_query = """
MATCH (m:MSA)
RETURN m.name AS msa, 
m.population AS population,
m.logPopulation,
m.medianHouseholdIncome AS medianHouseholdIncome
"""

msa_df = sparkConnector.read_cypher(msa_df_log_query)

msa_df.show()

## Create in-memory graph projection
Passing `"*"` as the third argument to `gds.graph.project` tells GDS to include any relationships that exist in the database in the in-memory graph. Because no relationships have been created in the graph yet, there will be no relationships in the in-memory graph projection when it is created.

In [None]:
graph_project_query = """
    CALL gds.graph.project(
    'msa-graph', 
    'MSA', 
    '*', 
    {nodeProperties: ["logPopulation", 
        "logMedianHouseholdIncome", 
        "logMedianHomePrice", 
        "logPercentOver25WithBachelors"]})
"""

sparkConnector.write_script(graph_project_query)


## Apply MinMax scaler to property values

In [None]:
graph_scale_properties_mutations = """
CALL gds.alpha.scaleProperties.mutate("msa-graph", {
                                 nodeProperties: [
                                     "logPopulation", 
                                     "logMedianHouseholdIncome", 
                                     "logMedianHomePrice", 
                                     "logPercentOver25WithBachelors"], 
                                 scaler : "MinMax",
                                 mutateProperty : "scaledProperties"
                                 })
                                 """

sparkConnector.write_script(graph_scale_properties_mutations)
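
For reference, min-max scaling maps each value to `(x - min) / (max - min)`, so every property ends up in the range 0 to 1. The function below is a minimal plain-Python sketch of that formula, not the GDS implementation; returning zeros for a constant column is an assumption made here to avoid dividing by zero.

```python
def min_max_scale(values):
    """Scale values linearly so the smallest becomes 0.0 and the largest 1.0."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Assumption for this sketch: a constant column scales to all zeros.
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


print(min_max_scale([2.0, 4.0, 6.0]))
```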

This next line streams node properties to the procedure caller.

In [None]:
graph_stream_scaled_properties_query = """
CALL gds.graph.streamNodeProperty('msa-graph', 'scaledProperties')
YIELD nodeId, propertyValue
RETURN nodeId, propertyValue
                                 """
sp_df = sparkConnector.read_cypher(graph_stream_scaled_properties_query)

pandas_sp_df=sp_df.toPandas()
pd.DataFrame(list(pandas_sp_df['propertyValue'])).iloc[:,0].hist()

Plot histograms of the remaining scaled properties.

In [None]:
pd.DataFrame(list(pandas_sp_df['propertyValue'])).iloc[:,1].hist()

In [None]:
pd.DataFrame(list(pandas_sp_df['propertyValue'])).iloc[:,2].hist()

In [None]:
pd.DataFrame(list(pandas_sp_df['propertyValue'])).iloc[:,3].hist()

Run KNN to create relationships to nearest neighbors.
First, run in stats mode and look at the similarity distribution.

In [None]:
knn_stats_query = """CALL gds.knn.stats("msa-graph",
   {
      nodeProperties:{
      scaledProperties:"EUCLIDEAN"},
      topK:15,
      similarityCutoff: 0.8350143432617188,
      sampleRate:1,
      randomSeed:42,
      concurrency:1
   }
) 
YIELD similarityDistribution 
RETURN similarityDistribution """
                                    
knn_stats=sparkConnector.read_cypher(knn_stats_query)
knn_stats.collect()[0]
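
The similarity values reported above are derived from Euclidean distance. The sketch below assumes that GDS converts distance to similarity as `1 / (1 + distance)`; please check the GDS KNN documentation for your version before relying on it. The function name and the input vectors are made up for illustration.

```python
import math


def euclidean_similarity(a, b):
    """Similarity in (0, 1]: 1.0 for identical vectors, lower as they move apart.

    Assumes the 1 / (1 + distance) conversion described in the text above.
    """
    distance = math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))
    return 1.0 / (1.0 + distance)


# Two hypothetical scaled property vectors, four properties each.
print(euclidean_similarity([0.2, 0.4, 0.1, 0.7], [0.25, 0.35, 0.15, 0.65]))
```

Under this assumption, a cutoff near 0.835 corresponds to a Euclidean distance of roughly 0.2 in the scaled property space.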

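Add the KNN nearest-neighbor relationships to the in-memory graph (mutate mode), using the 1st percentile of the similarity distribution as the cutoff.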

In [None]:
knn_write = f"""CALL gds.knn.mutate("msa-graph",
               {{nodeProperties: {{scaledProperties: "EUCLIDEAN"}},
               topK: 15,
               mutateRelationshipType: "IS_SIMILAR",
               mutateProperty: "similarity",
               similarityCutoff: {knn_stats.collect()[0]['similarityDistribution']['p1']},
               sampleRate:1,
               randomSeed:42,
               concurrency:1}}
              ) """

#print(knn_write)
sparkConnector.write_script(knn_write)
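
The query above is a Python f-string, so literal Cypher map braces are written as `{{` and `}}`, while single braces interpolate Python values. The sketch below isolates that pattern in a hypothetical helper, `build_knn_mutate_query`, with a trimmed set of parameters.

```python
def build_knn_mutate_query(graph_name, cutoff, top_k=15):
    """Build a gds.knn.mutate call; doubled braces become literal Cypher braces."""
    return f"""CALL gds.knn.mutate("{graph_name}",
  {{nodeProperties: {{scaledProperties: "EUCLIDEAN"}},
   topK: {top_k},
   mutateRelationshipType: "IS_SIMILAR",
   mutateProperty: "similarity",
   similarityCutoff: {cutoff}}}
)"""


print(build_knn_mutate_query("msa-graph", 0.835))
```

Printing the generated query before running it, as the commented-out `print` in the cell above suggests, is a quick way to catch a brace that was not doubled.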

Write back to Neo4j graph

In [None]:
similarity_relationship_writeback = """CALL gds.graph.writeRelationship(
    "msa-graph",
    "IS_SIMILAR",
    "similarity"
)"""

sparkConnector.write_script(similarity_relationship_writeback)

Add rank updates

In [None]:
add_rank_update = """
MATCH (m:MSA)-[s:IS_SIMILAR]->()
WITH m, s ORDER BY s.similarity DESC
WITH m, collect(s) as similarities, range(0, 11) AS ranks
UNWIND ranks AS rank
WITH rank, similarities[rank] AS rel
SET rel.rank = rank + 1
"""

sparkConnector.write_cypher(add_rank_update)
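
The rank query sorts each node's outgoing `IS_SIMILAR` relationships by similarity, highest first, and numbers the first twelve of them from 1 to 12 (`range(0, 11)` is inclusive in Cypher). The same logic for a single node can be sketched in plain Python; the function name and sample values are hypothetical.

```python
def rank_similarities(similarities, max_rank=12):
    """Return (rank, similarity) pairs, rank 1 being the most similar neighbor."""
    ordered = sorted(similarities, reverse=True)
    return [(rank, s) for rank, s in enumerate(ordered[:max_rank], start=1)]


# Hypothetical similarity scores on one node's outgoing relationships.
print(rank_similarities([0.90, 0.95, 0.85]))
```

A node with fewer than twelve neighbors simply gets fewer ranks, and any neighbors beyond the twelfth are left unranked.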

## Run Louvain Community Detection

See how many communities Louvain is going to recommend

In [None]:
read_louvain = """
CALL gds.louvain.stats('msa-graph',
{relationshipTypes: ["IS_SIMILAR"],
relationshipWeightProperty:"similarity"})
YIELD communityCount, modularities
RETURN communityCount, modularities
"""
sparkConnector.read_cypher(read_louvain).show()

Now write the Louvain communities to the database.

In [None]:
write_louvain = """
CALL gds.louvain.write('msa-graph',
{relationshipTypes: ["IS_SIMILAR"],
relationshipWeightProperty:"similarity",
 writeProperty:"communityId"})
YIELD communityCount, modularities
RETURN communityCount, modularities
"""

sparkConnector.write_cypher(write_louvain)

In [None]:
community_query = """
MATCH (m:MSA)
WITH m 
ORDER BY apoc.coll.sum([(m)-[s:IS_SIMILAR]->(m2) 
WHERE m.communityId = m2.communityId | s.similarity]) desc
RETURN m.communityId as communityId,
count(m) as msaCount, 
avg(m.population) as avgPopulation,
avg(m.medianHomePrice) as avgHomePrice,
avg(m.medianHouseholdIncome) as avgIncome,
avg(m.percentOver25WithBachelors) as avgPctBachelors,
collect(m.name)[..3] as exampleMSAs
"""

## Removed final sort because this doesn't work with Spark 
## ORDER BY avgPopulation DESC
## post sorting in spark
                                      
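# Sort descending to match the ORDER BY avgPopulation DESC that was removed from the query
community_df=sparkConnector.read_cypher(community_query).sort("avgPopulation", ascending=False)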

In [None]:
pd_community_df=community_df.toPandas()
pd_community_df

In [None]:
fig, axes = plt.subplots(5, 1)
fig.set_size_inches(6,20)
for i in range(1,6):
    sns.barplot(data=pd_community_df, x="communityId", y=pd_community_df.columns[i], ax=axes[i-1])

The mean gives a quick overview of each property, but it can be skewed by outliers. Compare the empirical cumulative distribution function (ECDF) of each community to get a more complete picture of the distributions.

In [None]:
# we need to remove sort by here
detail_query="""
MATCH (m:MSA)
RETURN "community " + m.communityId as communityId,
m.population as population,
m.medianHomePrice as medianHomePrice,
m.medianHouseholdIncome as medianIncome,
m.percentOver25WithBachelors as pctBachelors
"""

## post sorting in spark

detail_df=sparkConnector.read_cypher(detail_query).sort("communityId")


In [None]:
pd_detail_df=detail_df.toPandas()
pd_detail_df

In [None]:
fig, axes = plt.subplots(4, 1)
fig.set_size_inches(6,20)
for i in range(1,5):
    sns.ecdfplot(data=pd_detail_df, hue="communityId", x=pd_detail_df.columns[i], log_scale=True, ax=axes[i-1])
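
As a reminder of what the ECDF plots show: for each observed value `x`, the curve gives the proportion of observations that are less than or equal to `x`. A minimal plain-Python sketch with made-up income values follows; the function name `ecdf` is hypothetical and this is not how seaborn computes it internally.

```python
def ecdf(values):
    """Return (value, proportion of observations <= value) pairs in ascending order.

    Assumes distinct values; ties would need the proportion of the last duplicate.
    """
    ordered = sorted(values)
    n = len(ordered)
    return [(x, (i + 1) / n) for i, x in enumerate(ordered)]


# Hypothetical median household incomes for one community.
for value, proportion in ecdf([52_000, 48_000, 61_000, 75_000]):
    print(value, proportion)
```

A community whose curve sits further to the right has larger values for that property across its whole distribution, not only on average.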

Compare two dimensions at a time on scatter plots.

In [None]:
splot = sns.scatterplot(data=pd_detail_df, x="medianIncome", y="population", hue="communityId")
splot.set(yscale="log")
splot.set(xscale="log")

In [None]:
splot = sns.scatterplot(data=pd_detail_df, x="pctBachelors", y="medianHomePrice", hue="communityId")
splot.set(yscale="log")
splot.set(xscale="log")

## Optional: assign human-friendly names to the clusters discovered.
The Louvain community detection algorithm is not deterministic. You should get roughly the same clusters as in previous runs, but some edge cases might be assigned to different communities, and the community IDs might be shuffled across runs.  
**This step requires adjustment by hand: choose from community IDs above.**

In [None]:
update_community_name_query="""
MATCH (m:MSA) 
  SET m.communityName = CASE m.communityId 
  WHEN 56 THEN "Large mid-cost metros"
  WHEN 83 THEN "College towns"
  WHEN 254 THEN "Large high-cost metros"
  WHEN 266 THEN "Mid-size metros"
  WHEN 277 THEN "Small metros"
  WHEN 315 THEN "Mid-price metros"
  WHEN 333 THEN "Low-income metros"
  END
"""

sparkConnector.write_cypher(update_community_name_query)
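
Because the community IDs change between runs, it can be easier to keep the mapping in a Python dictionary and generate the `CASE` expression from it. The helper below is a hypothetical sketch; it assumes integer community IDs and names that contain no double quotes.

```python
def build_community_name_query(names_by_id):
    """Generate the SET ... CASE statement from a {communityId: name} mapping."""
    when_lines = "\n".join(
        f'  WHEN {int(cid)} THEN "{name}"'
        for cid, name in sorted(names_by_id.items())
    )
    return (
        "MATCH (m:MSA)\n"
        "SET m.communityName = CASE m.communityId\n"
        f"{when_lines}\n"
        "  END"
    )


# Example IDs copied from the cell above; yours will differ.
community_names = {56: "Large mid-cost metros", 83: "College towns"}
print(build_community_name_query(community_names))
```

The generated string could then be passed to `sparkConnector.write_cypher` in the same way as the handwritten query.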

Check on updates

In [None]:
describe_query="""
MATCH (m:MSA)
return m.communityName, m.communityId, count(*)
"""

sparkConnector.read_cypher(describe_query)

Create an index on the communityName property to make it searchable in Bloom.

In [None]:
sparkConnector.write_script("""
CREATE INDEX msa_community_name IF NOT EXISTS
FOR (m:MSA) ON (m.communityName)
""")

Now open Bloom and do some additional analysis!

## Cleanup

In [None]:
graph_project_drop = """
    CALL gds.graph.drop(
    'msa-graph')
"""
sparkConnector.write_script(graph_project_drop)