# Example PySpark Development using Neo4j

## Import Python Libraries

In [None]:
from pyspark.sql.functions import udf, struct, col, to_json, json_tuple
from pyspark.sql.types import * 
from pyspark import SparkFiles
from neo4j import GraphDatabase
import pyspark.sql.functions as func
import pandas as pd

## Helper Functions

In [None]:
# Spark -> Neo4j helper function

def write_df_query(df, neo4j_url, neo4j_username, neo4j_pw, neo4j_db, query, batch_size=5000):
    """
    Write a Spark DataFrame to Neo4j by running a Cypher query via the Neo4j Spark connector.

    The connector runs `query` for the rows of `df`; the queries in this notebook
    refer to the current row as `event` (e.g. `event.categoryID`).

    Args:
        df: Spark DataFrame whose rows feed the query.
        neo4j_url: connection URL passed to the connector's "url" option.
        neo4j_username / neo4j_pw: basic-auth credentials.
        neo4j_db: target database name.
        query: Cypher text passed to the connector's "query" option.
        batch_size: value for the connector's "batch.size" option.
    """
    connector_options = {
        "url": neo4j_url,
        "authentication.basic.username": neo4j_username,
        "authentication.basic.password": neo4j_pw,
        "database": neo4j_db,
        "batch.size": batch_size,
        "query": query,
    }
    writer = df.write.format("org.neo4j.spark.DataSource").mode("Overwrite")
    # dicts keep insertion order, so options are applied in the same order as before
    for option_name, option_value in connector_options.items():
        writer = writer.option(option_name, option_value)
    writer.save()

In [None]:
class Neo4jConnection:
    """Small wrapper around a Neo4j driver for running ad-hoc Cypher queries.

    Errors are reported with ``print`` rather than raised. If the driver cannot
    be created, the instance is left without a driver and ``query`` refuses to
    run. A failing query returns ``None``.

    The instance can also be used as a context manager (``with Neo4jConnection(...)
    as conn:``) so the driver is closed automatically.
    """

    def __init__(self, uri, user, pwd):
        """Create the underlying driver.

        Args:
            uri: connection URI passed to ``GraphDatabase.driver``.
            user: user name for basic authentication.
            pwd: password for basic authentication.

        If driver creation raises, the error is printed and the driver stays ``None``.
        """
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as e:
            # Deliberate best-effort: report and continue; query() will refuse later.
            print("Failed to create the driver:", e)

    def __enter__(self):
        """Return self so the connection can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the driver on leaving the ``with`` block; never suppresses exceptions."""
        self.close()
        return False

    def close(self):
        """Close the driver; does nothing if it was never created."""
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, parameters=None, db=None):
        """Run a Cypher query and return all of its records.

        Args:
            query: Cypher text.
            parameters: optional mapping of query parameters.
            db: database name. When None, no database is passed to the session,
                so the driver's default is used.

        Returns:
            A list of records, or None if the query failed (the error is printed).

        Raises:
            AssertionError: if the driver was not created in ``__init__``.
        """
        # Explicit check rather than ``assert``: asserts are stripped under ``python -O``,
        # which would turn this into an AttributeError on ``None`` further down.
        if self.__driver is None:
            raise AssertionError("Driver not initialized!")
        response = None
        try:
            # Only pass ``database`` when one was requested.
            session = self.__driver.session(database=db) if db is not None else self.__driver.session()
            with session:
                # list() consumes the result while the session is still open.
                response = list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
        return response

## Credentials

In [None]:
# Neo4j connection settings used by the cells below.
# Supply real values through environment variables so secrets are not saved in
# the notebook; the placeholders are only fallbacks and must be replaced otherwise.
# NOTE: the URI needs a scheme the Neo4j driver accepts, e.g. "neo4j://<host>:7687"
# or "bolt://<host>:7687" ("neo4j+s://" for encrypted connections).
import os

uri = os.environ.get("NEO4J_URI", "<URL>:7687")
user = os.environ.get("NEO4J_USER", "neo4j")
pwd = os.environ.get("NEO4J_PASSWORD", "<PWD>")
db = os.environ.get("NEO4J_DATABASE", "neo4j")

## Create node data

In [None]:
# Read the categories CSV: first line is the header, column types are inferred.
sdf_categories = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load('/mnt/neo4j/categories.csv')
)

In [None]:
# Create one Categories2 node per categoryID (MERGE reuses an existing node);
# categoryName is only set when the node is first created.
query = """
    MERGE (c:Categories2 {categoryID: toString(event.categoryID)})
    ON CREATE SET c.categoryName = event.categoryName
    """

write_df_query(
    sdf_categories.repartition(10),
    neo4j_url=uri,
    neo4j_username=user,
    neo4j_pw=pwd,
    neo4j_db=db,
    query=query,
)

## Connect Categories2 and Category

In [None]:
# Link each Categories2 node to the Category node with the same categoryID.
# Rows with no match on either side produce nothing, because MATCH finds no node.
query = '''
MATCH (c1:Categories2 {categoryID: toString(event.categoryID)})
MATCH (c2:Category {categoryID: toString(event.categoryID)})
MERGE (c1)-[:SAME_AS]->(c2)
'''

# Write relationships from a single partition: parallel transactions that MERGE
# relationships touching the same nodes can deadlock, so the Neo4j Spark
# connector recommends coalescing to one partition for relationship writes.
write_df_query(sdf_categories.coalesce(1), uri, user, pwd, db, query)

## Data Retrieval

In [None]:
# Driver-backed connection used for the ad-hoc Cypher lookups below.
conn = Neo4jConnection(uri, user, pwd)

In [None]:
# Fetch the Customer node whose customerID is passed as the $customerId parameter.
query = """
MATCH (c:Customer {customerID: $customerId})
RETURN c;
"""
lookup_params = {"customerId": "ANTON"}
result = conn.query(query, parameters=lookup_params, db=db)

In [None]:
# Neo4jConnection.query returns None when the query fails (after printing the
# error). Fall back to no records so this cell yields an empty DataFrame instead
# of raising "TypeError: 'NoneType' object is not iterable".
records = result if result is not None else []
# Each record's "c" value is the returned Customer node; dict(...) turns it into one row.
df_result = pd.DataFrame([dict(record["c"]) for record in records])
df_result

Unnamed: 0,phone,companyName,customerID,fax
0,(5) 555-3932,Antonio Moreno Taquería,ANTON,
