In [None]:
# Copy the Neo4j Spark connector jar into Spark's jars directory. Unlike the Mongo,
# Cassandra and Elasticsearch connectors, it is NOT listed in `spark.jars.packages`
# in the SparkSession cell, so this copy is how the "org.neo4j.spark.DataSource"
# format becomes available (presumably via Spark's default jar directory — confirm
# /usr/local/spark/jars is SPARK_HOME/jars in this image).
! sudo cp /home/jovyan/work/jars/neo4j-connector-apache-spark_2.12-4.1.0_for_spark_3.jar /usr/local/spark/jars/neo4j-connector-apache-spark_2.12-4.1.0_for_spark_3.jar

In [None]:
# Install the Cassandra Python driver used by the keyspace/table DDL cell.
# `%pip` (unlike `!pip`) installs into the environment of the running kernel.
%pip install -q cassandra-driver

In [None]:
import pyspark
from pyspark.sql import SparkSession

In [None]:
# Madhumitha Saravanan
# msaravan@syr.edu

### Question 1

In the cell below, configure a Spark session that can connect to `mongodb`, `minio`, `cassandra`, `elasticsearch`, and `neo4j`.

In [None]:
#1 Spark session
# Build one SparkSession wired to MongoDB, MinIO (via S3A), Cassandra and
# Elasticsearch. The Neo4j connector is not in `spark.jars.packages`: its jar is
# copied into Spark's jar directory by the first cell, and the graph cells pass
# the Bolt URL (`bolt_url`) on each read/write with `.option("url", ...)`.
import os

import pyspark
from pyspark.sql import SparkSession

cassandra_host = "cassandra"
s3_host = "minio"
s3_server = f"http://{s3_host}:9000"
# Credentials may be supplied through the environment instead of living in the
# notebook; the defaults are the lab values the notebook was written against.
s3_access_key = os.environ.get("S3_ACCESS_KEY", "minio")
s3_secret_key = os.environ.get("S3_SECRET_KEY", "SU2orange!")
s3_bucket = "enrollments"
elastic_host = "elasticsearch"
elastic_port = "9200"
mongo_uri = os.environ.get(
    "MONGO_URI", "mongodb://admin:mongopw@mongo:27017/admin?authSource=admin"
)
bolt_url = "bolt://neo4j:7687"
jars = [
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    "com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.1.0",
    "org.elasticsearch:elasticsearch-spark-20_2.12:7.15.0",
]

spark = (
    SparkSession.builder
    .master("local")
    .appName("jupyter-pyspark")
    .config("spark.jars.packages", ",".join(jars))
    .config("spark.cassandra.connection.host", cassandra_host)
    .config("spark.es.nodes", elastic_host)
    .config("spark.es.port", elastic_port)
    .config("spark.es.nodes.wan.only", "true")
    .config("spark.mongodb.input.uri", mongo_uri)
    .config("spark.mongodb.output.uri", mongo_uri)
    .config("spark.hadoop.fs.s3a.endpoint", s3_server)
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key)
    # Pass boolean settings as the literal "true" rather than a Python bool,
    # which would be stringified as "True".
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("ERROR")  # Keeps the noise down








In [None]:
# Read the process-oriented data enrollments and sections from minio using PySpark
#2a enrollments
# Load enrollments.csv from the MinIO bucket. The term/course/section columns get
# an "enrollment" prefix so they stay distinct from the identically named columns
# of the wide `section` frame this is joined with later.
enrollments = (
    spark.read.csv("s3a://enrollments/enrollments.csv", header=True)
    .withColumnRenamed("term", "enrollmentterm")
    .withColumnRenamed("course", "enrollmentcourse")
    .withColumnRenamed("section", "enrollmentsection")
)
enrollments.show()



+--------------+-----------------+----------------+-----------------+-----------------+-----+------------+
|enrollmentterm|course_enrollment|enrollmentcourse|enrollmentsection|       student_id|grade|grade_points|
+--------------+-----------------+----------------+-----------------+-----------------+-----+------------+
|          1221|                1|          IST659|             M001|      orenjouglad|    C|         2.0|
|          1221|                2|          IST659|             M001|      billmelator|    A|         4.0|
|          1221|                3|          IST659|             M001|       morrisless|    A|         4.0|
|          1221|                4|          IST659|             M001|amberwavesofgrain|   A-|       3.667|
|          1221|                5|          IST659|             M001|         abbykuss|    A|         4.0|
|          1221|                6|          IST659|             M001|       tallyitupp|    A|         4.0|
|          1221|                7|   

In [None]:
#2b sections
# Load sections.csv from the MinIO bucket; the header row supplies column names
# and every column is read as a string (no schema inference).
sections = spark.read.csv(
    "s3a://enrollments/sections.csv",
    header=True,
)
sections.show()


+----+------+-------+----------+--------+
|term|course|section|enrollment|capacity|
+----+------+-------+----------+--------+
|1221|IST659|   M001|        20|      20|
|1221|IST659|   M002|        20|      20|
|1221|IST722|   M001|        25|      28|
|1221|IST615|   M001|        22|      28|
|1221|IST621|   M001|        22|      24|
|1221|IST687|   M001|        20|      20|
|1221|IST687|   M002|        21|      24|
|1221|IST707|   M001|        28|      28|
|1222|IST659|   M001|        24|      24|
|1222|IST769|   M001|        19|      24|
|1222|IST615|   M001|        19|      24|
|1222|IST714|   M001|        17|      20|
|1222|IST621|   M001|        28|      28|
|1222|IST621|   M002|        22|      24|
|1222|IST687|   M001|        18|      20|
|1222|IST687|   M002|        20|      20|
|1222|IST718|   M001|        28|      28|
|1231|IST659|   M001|        20|      20|
|1231|IST659|   M002|        20|      20|
|1231|IST722|   M001|        23|      28|
+----+------+-------+----------+--

In [None]:
# Read the reference-oriented data terms, students, courses, and program reference data from MongoDb using PySpark
#3a terms
# Load the terms collection and rename its generic fields (code, name, _id) so they
# remain unambiguous once terms are joined onto sections.
terms = (
    spark.read.format("mongo")
    .options(database="ischooldb", collection="terms")
    .load()
    .withColumnRenamed("code", "icode")
    .withColumnRenamed("name", "termname")
    .withColumnRenamed("_id", "termid")
)
terms.show()


+------+-------------+-----+-----------+--------+----+
|termid|academic_year|icode|   termname|semester|year|
+------+-------------+-----+-----------+--------+----+
|  1221|    2021-2022| 1221|  Fall 2021|    Fall|2021|
|  1222|    2021-2022| 1222|Spring 2022|  Spring|2022|
|  1231|    2022-2023| 1231|  Fall 2022|    Fall|2022|
|  1232|    2022-2023| 1232|Spring 2023|  Spring|2023|
+------+-------------+-----+-----------+--------+----+



In [None]:
#3b courses
# Load the courses collection and rename its generic fields (code, name, _id) so
# they remain unambiguous once courses are joined onto sections.
courses = (
    spark.read.format("mongo")
    .options(database="ischooldb", collection="courses")
    .load()
    .withColumnRenamed("code", "ischoolcode")
    .withColumnRenamed("name", "coursename")
    .withColumnRenamed("_id", "courseid")
)
courses.show()

+--------+-----------+-------+--------------------+--------------------+----------------+--------------------+-------------+--------------------+
|courseid|ischoolcode|credits|         description|elective_in_programs| key_assignments|          coursename|prerequisites|required_in_programs|
+--------+-----------+-------+--------------------+--------------------+----------------+--------------------+-------------+--------------------+
|  IST659|     IST659|      3|Definition, devel...|                  []|       [project]|Data Administrati...|           []|            [IS, DS]|
|  IST722|     IST722|      3|Introduction to c...|                [IS]| [project, exam]|    Data Warehousing|     [IST659]|                  []|
|  IST769|     IST769|      3|Analyze relationa...|                [DS]| [project, exam]|Advanced Big Data...|     [IST659]|                  []|
|  IST615|     IST615|      3|Cloud services cr...|                  []|[project, paper]|    Cloud Management|           []|

In [None]:
#3c Programs
# Load the programs collection and rename name/_id/credits so they do not clash
# with the students' `name` and the courses' `credits` columns after joins.
programs = (
    spark.read.format("mongo")
    .option("database", "ischooldb")
    .option("collection", "programs")
    .load()
    .withColumnRenamed("name", "programname")
    .withColumnRenamed("_id", "programid")
    .withColumnRenamed("credits", "programcredits")
)

programs.show()

+---------+----+--------------+--------------------+--------------------+--------------------+-----------+
|programid|code|programcredits|    elective_courses|         programname|    required_courses|       type|
+---------+----+--------------+--------------------+--------------------+--------------------+-----------+
|       IS|  IS|            36|[IST722, IST714, ...| Information Systems|[IST659, IST615, ...|    Masters|
|       DS|  DS|            34|    [IST769, IST714]|        Data Science|[IST659, IST615, ...|    Masters|
|      BDC| BDC|             9|                null|Data Engineering ...|[IST659, IST722, ...|Certificate|
|      CCC| CCC|             9|                null|Cloud Computing C...|[IST621, IST615, ...|Certificate|
|      MLC| MLC|             9|                null|Machine Learning ...|[IST687, IST707, ...|Certificate|
+---------+----+--------------+--------------------+--------------------+--------------------+-----------+



In [None]:
#3d students
# Load student records from MongoDB; the document `_id` (the student's id,
# matched later against enrollments.student_id) becomes `studentid`.
students = (
    spark.read.format("mongo")
    .options(database="ischooldb", collection="students")
    .load()
    .withColumnRenamed("_id", "studentid")
)
students.show(5)



+------------+-------------+-------+
|   studentid|         name|program|
+------------+-------------+-------+
|    abbykuss|    Abby Kuss|     DS|
|  adamantium|  Adam Antium|     IS|
|   addieowse|   Addie Owse|     IS|
|aidensomewun|Aiden Somewun|     IS|
|aidenknowone|Aiden Knowone|     DS|
+------------+-------------+-------+
only showing top 5 rows



In [None]:
# Prepared the section data for loading into cassandra and elasticsearch with Spark or Spark SQL
#4 wide_sections
# Widen each section row with its term attributes, then its course attributes.
# Inner joins keep only sections whose term and course exist in the reference data.
sectionNew = sections.join(terms, on=sections.term == terms.termid, how="inner")
section = sectionNew.join(courses, on=sectionNew.course == courses.courseid, how="inner")

section.show()
section.printSchema()





+----+------+-------+----------+--------+------+-------------+-----+-----------+--------+----+--------+-----------+-------+--------------------+--------------------+----------------+--------------------+-------------+--------------------+
|term|course|section|enrollment|capacity|termid|academic_year|icode|   termname|semester|year|courseid|ischoolcode|credits|         description|elective_in_programs| key_assignments|          coursename|prerequisites|required_in_programs|
+----+------+-------+----------+--------+------+-------------+-----+-----------+--------+----+--------+-----------+-------+--------------------+--------------------+----------------+--------------------+-------------+--------------------+
|1221|IST615|   M001|        22|      28|  1221|    2021-2022| 1221|  Fall 2021|    Fall|2021|  IST615|     IST615|      3|Cloud services cr...|                  []|[project, paper]|    Cloud Management|           []|            [IS, DS]|
|1222|IST615|   M001|        19|      24|  1

                                                                                

In [None]:
# Connect to Cassandra from Python, make sure the `ischooldb` keyspace exists, and
# (re)create an empty `ischooldb.sections` table to receive the wide section rows.
from cassandra.cluster import Cluster

# Columns mirror the wide `section` DataFrame: the CSV-sourced columns (term,
# course, section, enrollment, capacity) were read as strings, while year and
# credits arrive from MongoDB as integers.
create_table_query = """
CREATE TABLE IF NOT EXISTS ischooldb.sections (
    term text,
    course text,
    section text,
    enrollment text,
    capacity text,
    termid text,
    academic_year text,
    icode text,
    termname text,
    coursename text,
    semester text,
    year int,
    courseid text,
    ischoolcode text,
    credits int,
    description text,
    elective_in_programs list<text>,
    key_assignments list<text>,
    prerequisites list<text>,
    required_in_programs list<text>,
    PRIMARY KEY ((section, term), course)
);
"""

with Cluster([cassandra_host]) as cluster:
    session = cluster.connect()

    # Create the keyspace first so the DROP/CREATE TABLE statements below always
    # refer to a keyspace that exists.
    session.execute("CREATE KEYSPACE IF NOT EXISTS ischooldb WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};")

    # Drop any earlier copy so a re-run starts from an empty table.
    session.execute("DROP TABLE IF EXISTS ischooldb.sections;")

    session.execute(create_table_query)


In [None]:
# Read the freshly created ischooldb.sections table through the Spark Cassandra
# connector to confirm the schema Spark sees. It gets its own name: reassigning
# `sections` here would overwrite the MinIO DataFrame, and re-running the
# wide_sections cell would then silently join against this table instead.
cassandra_sections = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table="sections", keyspace="ischooldb") \
    .load()

cassandra_sections.show()
cassandra_sections.printSchema()

+-------+----+------+-------------+--------+--------+----------+-------+-----------+--------------------+----------+-----+-----------+---------------+-------------+--------------------+--------+------+--------+----+
|section|term|course|academic_year|capacity|courseid|coursename|credits|description|elective_in_programs|enrollment|icode|ischoolcode|key_assignments|prerequisites|required_in_programs|semester|termid|termname|year|
+-------+----+------+-------------+--------+--------+----------+-------+-----------+--------------------+----------+-----+-----------+---------------+-------------+--------------------+--------+------+--------+----+
+-------+----+------+-------------+--------+--------+----------+-------+-----------+--------------------+----------+-----+-----------+---------------+-------------+--------------------+--------+------+--------+----+

root
 |-- section: string (nullable = false)
 |-- term: string (nullable = false)
 |-- course: string (nullable = true)
 |-- academic_y

In [None]:
#6 load wide_sections into cassandra
# Append every row of the wide `section` frame into ischooldb.sections.
(
    section.write
    .format("org.apache.spark.sql.cassandra")
    .options(table="sections", keyspace="ischooldb")
    .mode("Append")
    .save()
)



                                                                                

In [None]:
# Read ischooldb.sections back through Spark to confirm the appended rows are in
# Cassandra. Named explicitly instead of the generic `df`, which later cells
# reassign to unrelated frames.
cassandra_sections_loaded = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table="sections", keyspace="ischooldb") \
    .load()
cassandra_sections_loaded.show()

+-------+----+------+-------------+--------+--------+--------------------+-------+--------------------+--------------------+----------+-----+-----------+----------------+-------------+--------------------+--------+------+-----------+----+
|section|term|course|academic_year|capacity|courseid|          coursename|credits|         description|elective_in_programs|enrollment|icode|ischoolcode| key_assignments|prerequisites|required_in_programs|semester|termid|   termname|year|
+-------+----+------+-------------+--------+--------+--------------------+-------+--------------------+--------------------+----------+-----+-----------+----------------+-------------+--------------------+--------+------+-----------+----+
|   M002|1232|IST615|    2022-2023|      24|  IST615|    Cloud Management|      3|Cloud services cr...|                  []|        20| 1232|     IST615|[project, paper]|           []|            [IS, DS]|  Spring|  1232|Spring 2023|2023|
|   M002|1232|IST621|    2022-2023|      24|

In [None]:
# Flattened the Nested data to load data into Elastic Search
from pyspark.sql.functions import col, expr, array_contains

# Elasticsearch receives flat documents: each program-membership array becomes one
# "true"/"false" string flag per (array, program) pair, then the nested array
# columns and the duplicated join/code columns are dropped.
flag_specs = [
    ("course_is_elective_for_IS", "elective_in_programs", "IS"),
    ("course_is_elective_for_DS", "elective_in_programs", "DS"),
    ("course_is_required_for_IS", "required_in_programs", "IS"),
    ("course_is_required_for_DS", "required_in_programs", "DS"),
]

elastic_sections = section
for flag_name, array_column, program_code in flag_specs:
    elastic_sections = elastic_sections.withColumn(
        flag_name,
        array_contains(col(array_column), program_code).cast("string"),
    )
elastic_sections = elastic_sections.drop(
    "elective_in_programs", "required_in_programs", "prerequisites",
    "key_assignments", "icode", "ischoolcode", "termid", "courseid",
)

elastic_sections.show(34)


+----+------+-------+----------+--------+-------------+-----------+--------+----+-------+--------------------+--------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|term|course|section|enrollment|capacity|academic_year|   termname|semester|year|credits|         description|          coursename|course_is_elective_for_IS|course_is_elective_for_DS|course_is_required_for_IS|course_is_required_for_DS|
+----+------+-------+----------+--------+-------------+-----------+--------+----+-------+--------------------+--------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|1221|IST615|   M001|        22|      28|    2021-2022|  Fall 2021|    Fall|2021|      3|Cloud services cr...|    Cloud Management|                    false|                    false|                     true|                     true|
|1222|IST615|   M001|        19|      24|    2021-2022|S

In [None]:
#8 load wide_sections_flattened into elasticsearch
# Write the flattened rows to the "sections" index in Overwrite mode, then read
# the index back to check what was stored.
sections_es_resource = "sections/_doc"
elastic_sections.write.format("es").mode("Overwrite").save(sections_es_resource)

sectionsEs = spark.read.format("es").load(sections_es_resource)
sectionsEs.toPandas()


                                                                                

Unnamed: 0,academic_year,capacity,course,course_is_elective_for_DS,course_is_elective_for_IS,course_is_required_for_DS,course_is_required_for_IS,coursename,credits,description,enrollment,section,semester,term,termname,year
0,2021-2022,28,IST615,False,False,True,True,Cloud Management,3,Cloud services creation and management. Practi...,22,M001,Fall,1221,Fall 2021,2021
1,2021-2022,24,IST615,False,False,True,True,Cloud Management,3,Cloud services creation and management. Practi...,19,M001,Spring,1222,Spring 2022,2022
2,2022-2023,24,IST615,False,False,True,True,Cloud Management,3,Cloud services creation and management. Practi...,21,M001,Fall,1231,Fall 2022,2022
3,2022-2023,24,IST615,False,False,True,True,Cloud Management,3,Cloud services creation and management. Practi...,20,M002,Spring,1232,Spring 2023,2023
4,2022-2023,28,IST615,False,False,True,True,Cloud Management,3,Cloud services creation and management. Practi...,21,M001,Spring,1232,Spring 2023,2023
5,2021-2022,20,IST659,False,False,True,True,Data Administration Concepts and Database Mana...,3,"Definition, development, and management of dat...",20,M002,Fall,1221,Fall 2021,2021
6,2021-2022,20,IST659,False,False,True,True,Data Administration Concepts and Database Mana...,3,"Definition, development, and management of dat...",20,M001,Fall,1221,Fall 2021,2021
7,2021-2022,24,IST659,False,False,True,True,Data Administration Concepts and Database Mana...,3,"Definition, development, and management of dat...",24,M001,Spring,1222,Spring 2022,2022
8,2022-2023,20,IST659,False,False,True,True,Data Administration Concepts and Database Mana...,3,"Definition, development, and management of dat...",20,M002,Fall,1231,Fall 2022,2022
9,2022-2023,20,IST659,False,False,True,True,Data Administration Concepts and Database Mana...,3,"Definition, development, and management of dat...",20,M001,Fall,1231,Fall 2022,2022


In [None]:
#9 create wide_enrollments
# Attach program details to each student, then drop columns the wide enrollment
# rows do not need (`program` duplicates `programid` after the join).
programjoin = (
    programs
    .join(students, on=programs.programid == students.program, how="inner")
    .drop("code", "program", "required_courses", "elective_courses")
)
programjoin.show()


+---------+--------------+------------+-------+-----------------+------------------+
|programid|programcredits| programname|   type|        studentid|              name|
+---------+--------------+------------+-------+-----------------+------------------+
|       DS|            34|Data Science|Masters|         abbykuss|         Abby Kuss|
|       DS|            34|Data Science|Masters|     aidenknowone|     Aiden Knowone|
|       DS|            34|Data Science|Masters|         alfrecso|         Al Frecso|
|       DS|            34|Data Science|Masters|          alkohol|          Al Kohol|
|       DS|            34|Data Science|Masters|amberwavesofgrain|Amber Wavesofgrain|
|       DS|            34|Data Science|Masters|      anitashower|      Anita Shower|
|       DS|            34|Data Science|Masters|    anitasandwich|    Anita Sandwich|
|       DS|            34|Data Science|Masters|       aprilfirst|       April First|
|       DS|            34|Data Science|Masters|       arialphoto|

In [None]:
# Attach the wide section attributes to every enrollment row. Each enrollment row
# carries its own term, course AND section, so the join must match on all three;
# joining on term alone paired every enrollment with every section offered that
# term and multiplied the row count.
enrollmentJoin = enrollments.join(
    section,
    (enrollments.enrollmentterm == section.term)
    & (enrollments.enrollmentcourse == section.course)
    & (enrollments.enrollmentsection == section.section),
    "inner",
)
# Drop the enrollment-side keys (now redundant with term/course/section) and the
# reference/array columns not needed downstream.
enrollmentJoin = enrollmentJoin.drop("enrollmentcourse","termid","icode","ischoolcode","courseid","enrollmentterm","enrollmentsection","elective_in_programs","key_assignments","prerequisites","required_in_programs")
enrollmentJoin.show()

+-----------------+----------------+-----+------------+----+------+-------+----------+--------+-------------+---------+--------+----+-------+--------------------+----------------+
|course_enrollment|      student_id|grade|grade_points|term|course|section|enrollment|capacity|academic_year| termname|semester|year|credits|         description|      coursename|
+-----------------+----------------+-----+------------+----+------+-------+----------+--------+-------------+---------+--------+----+-------+--------------------+----------------+
|               28|      peteterpan|    A|         4.0|1221|IST615|   M001|        22|      28|    2021-2022|Fall 2021|    Fall|2021|      3|Cloud services cr...|Cloud Management|
|               27|      robinbanks|    B|         3.0|1221|IST615|   M001|        22|      28|    2021-2022|Fall 2021|    Fall|2021|      3|Cloud services cr...|Cloud Management|
|               26|    elieenonyewe|    A|         4.0|1221|IST615|   M001|        22|      28|    2

In [None]:
# Attach each enrolled student's program details, matching on the student id.
enrollmentsNew = enrollmentJoin.join(
    programjoin,
    on=enrollmentJoin.student_id == programjoin.studentid,
    how="inner",
)

enrollmentsNew.printSchema()
enrollmentsNew.show()


root
 |-- course_enrollment: string (nullable = true)
 |-- student_id: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- grade_points: string (nullable = true)
 |-- term: string (nullable = true)
 |-- course: string (nullable = true)
 |-- section: string (nullable = true)
 |-- enrollment: string (nullable = true)
 |-- capacity: string (nullable = true)
 |-- academic_year: string (nullable = true)
 |-- termname: string (nullable = true)
 |-- semester: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- credits: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- coursename: string (nullable = true)
 |-- programid: string (nullable = true)
 |-- programcredits: integer (nullable = true)
 |-- programname: string (nullable = true)
 |-- type: string (nullable = true)
 |-- studentid: string (nullable = true)
 |-- name: string (nullable = true)



                                                                                

+-----------------+----------+-----+------------+----+------+-------+----------+--------+-------------+-----------+--------+----+-------+--------------------+--------------------+---------+--------------+-------------------+-------+----------+-----------+
|course_enrollment|student_id|grade|grade_points|term|course|section|enrollment|capacity|academic_year|   termname|semester|year|credits|         description|          coursename|programid|programcredits|        programname|   type| studentid|       name|
+-----------------+----------+-----+------------+----+------+-------+----------+--------+-------------+-----------+--------+----+-------+--------------------+--------------------+---------+--------------+-------------------+-------+----------+-----------+
|                3|  nattural|    A|         4.0|1232|IST615|   M002|        20|      24|    2022-2023|Spring 2023|  Spring|2023|      3|Cloud services cr...|    Cloud Management|       IS|            36|Information Systems|Masters|

In [None]:
#10 wide_enrollments to elastic search
# Write the wide enrollment rows to the "enrollments" index in Overwrite mode and
# read them back for inspection.
enrollments_es_resource = "enrollments/_doc"
(
    enrollmentsNew.write
    .format("es")
    .mode("Overwrite")
    .save(enrollments_es_resource)
)
enrollmentEs = spark.read.format("es").load(enrollments_es_resource)
enrollmentEs.toPandas()


                                                                                

Unnamed: 0,academic_year,capacity,course,course_enrollment,coursename,credits,description,enrollment,enrollmentterm,grade,...,programid,programname,section,semester,student_id,studentid,term,termname,type,year
0,2021-2022,28,IST615,5,Cloud Management,3,Cloud services creation and management. Practi...,22,,A,...,DS,Data Science,M001,Fall,dustindewinned,dustindewinned,1221,Fall 2021,Masters,2021
1,2021-2022,28,IST615,10,Cloud Management,3,Cloud services creation and management. Practi...,22,,B-,...,DS,Data Science,M001,Fall,dustindewinned,dustindewinned,1221,Fall 2021,Masters,2021
2,2021-2022,24,IST615,15,Cloud Management,3,Cloud services creation and management. Practi...,19,,A-,...,DS,Data Science,M001,Spring,dustindewinned,dustindewinned,1222,Spring 2022,Masters,2022
3,2022-2023,24,IST615,21,Cloud Management,3,Cloud services creation and management. Practi...,21,,A,...,DS,Data Science,M001,Fall,dustindewinned,dustindewinned,1231,Fall 2022,Masters,2022
4,2022-2023,24,IST615,16,Cloud Management,3,Cloud services creation and management. Practi...,21,,A,...,DS,Data Science,M001,Fall,dustindewinned,dustindewinned,1231,Fall 2022,Masters,2022
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6331,2021-2022,28,IST718,8,Big Data Analytics,3,A broad introduction to big data analytical an...,28,,A,...,DS,Data Science,M001,Spring,tydibol,tydibol,1222,Spring 2022,Masters,2022
6332,2022-2023,28,IST718,21,Big Data Analytics,3,A broad introduction to big data analytical an...,28,,A,...,DS,Data Science,M001,Spring,tydibol,tydibol,1232,Spring 2023,Masters,2023
6333,2021-2022,20,IST714,9,Cloud Architecture,3,"Advanced, lab-based exploration of enterprise ...",17,,A,...,DS,Data Science,M001,Spring,tydibol,tydibol,1222,Spring 2022,Masters,2022
6334,2021-2022,20,IST714,8,Cloud Architecture,3,"Advanced, lab-based exploration of enterprise ...",17,,A,...,DS,Data Science,M001,Spring,tydibol,tydibol,1222,Spring 2022,Masters,2022


In [None]:
# Clear the Neo4j database of all nodes and relationships so the graph-loading
# cells below start from an empty graph. This uses the Neo4j Python driver, not Spark.
%pip install -q neo4j

import os

from neo4j import GraphDatabase


# Placeholder credentials; override through the environment when auth is enabled.
neo4j_user = os.environ.get("NEO4J_USER", "neo4j_username")
neo4j_password = os.environ.get("NEO4J_PASSWORD", "neo4j_password")


def clear_neo4j_database(url=None, user=None, password=None):
    """Delete every node (detaching its relationships) in the Neo4j database.

    Args:
        url: Bolt URL; defaults to the notebook-level ``bolt_url``.
        user: user name; defaults to ``neo4j_user``.
        password: password; defaults to ``neo4j_password``.
    """
    url = bolt_url if url is None else url
    user = neo4j_user if user is None else user
    password = neo4j_password if password is None else password

    driver = GraphDatabase.driver(url, auth=(user, password))
    try:
        with driver.session() as session:
            # Consume the result explicitly so the delete has finished (and any
            # error is raised) before the session closes.
            session.run("MATCH (n) DETACH DELETE n").consume()
    finally:
        # Release the driver's connections; previously the driver was never closed.
        driver.close()


# Call the function to clear the database
clear_neo4j_database()




In [None]:
#12a Loading courses into Neo4j

# One :Courses node per course. MERGE matches on the courseid key only and the
# remaining properties are SET afterwards. MERGE-ing on every property would create
# a second node whenever any non-key value differed, and Neo4j rejects a MERGE whose
# property map contains a null value.
cypher = """
MERGE (c:Courses {courseid: event.courseid})
SET c.credits = event.credits,
    c.description = event.description,
    c.coursename = event.coursename
"""
coursesN = courses.select("courseid", "credits", "description", "coursename")
coursesN.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
  .option("url", bolt_url) \
  .option("query", cypher) \
  .save()

# Read the nodes back for verification (displayed in the following cell).
course = "MATCH (c:Courses) RETURN c.coursename, c.credits, c.courseid, c.description"

coursecheck = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", bolt_url) \
    .option("query", course) \
    .load()



In [None]:

   # Display the :Courses nodes read back from Neo4j by the previous cell.
   coursecheck.show()

+--------------------+---------+----------+--------------------+
|        c.coursename|c.credits|c.courseid|       c.description|
+--------------------+---------+----------+--------------------+
|Data Administrati...|        3|    IST659|Definition, devel...|
|    Data Warehousing|        3|    IST722|Introduction to c...|
|Advanced Big Data...|        3|    IST769|Analyze relationa...|
|    Cloud Management|        3|    IST615|Cloud services cr...|
|  Cloud Architecture|        3|    IST714|Advanced, lab-bas...|
|Information Manag...|        3|    IST621|Information and t...|
|Introduction to D...|        3|    IST687|Introduces inform...|
|Applied Machine L...|        3|    IST707|General overview ...|
|  Big Data Analytics|        3|    IST718|A broad introduct...|
+--------------------+---------+----------+--------------------+



In [None]:
#12b Loading programs into neo4j

# One :Program node per program. MERGE on the programid key only and SET the other
# properties, so a changed non-key value updates the node instead of duplicating it
# and a null value cannot make the MERGE fail.
cypher = """
MERGE (p:Program {programid: event.programid})
SET p.programname = event.programname,
    p.programcredits = event.programcredits,
    p.type = event.type
"""
program = programs.select("programname", "programcredits", "programid", "type")
program.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
  .option("url", bolt_url) \
  .option("query", cypher) \
  .save()

# Read the nodes back to verify the load.
programcheck = "MATCH (p:Program) RETURN p.programname, p.programcredits, p.programid, p.type"

programdf = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", bolt_url) \
    .option("query", programcheck) \
    .load()

programdf.show()


+--------------------+----------------+-----------+-----------+
|       p.programname|p.programcredits|p.programid|     p.type|
+--------------------+----------------+-----------+-----------+
| Information Systems|              36|         IS|    Masters|
|        Data Science|              34|         DS|    Masters|
|Data Engineering ...|               9|        BDC|Certificate|
|Cloud Computing C...|               9|        CCC|Certificate|
|Machine Learning ...|               9|        MLC|Certificate|
+--------------------+----------------+-----------+-----------+



In [None]:
# Define the Cypher query with parameters to demonstrate relationships in Neo4j
# Create one (:Program)-[:REQUIRES]->(:Courses) edge per (program, required course).
from pyspark.sql.functions import explode

# One row per (program, required course). `explode` (unlike `explode_outer`) emits
# nothing for a null/empty list, so no null rows are sent that could never match.
required_edges = programs.select(
    "programid", explode("required_courses").alias("required_course")
)

# Look both endpoints up by key rather than matching every Program/Courses pair
# and filtering with WHERE (a cartesian product).
cql = '''
MATCH (p:Program {programid: event.programid})
MATCH (c:Courses {courseid: event.required_course})
MERGE (p)-[:REQUIRES]->(c)
'''

required_edges.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
  .option("url", bolt_url) \
  .option("query", cql) \
  .save()
required_edges.show()

# Read the REQUIRES edges back together with both endpoints' properties.
q4 = '''
MATCH (p:Program)-[:REQUIRES]->(c:Courses)
RETURN p.programname, p.programid, p.programcredits, p.type, c.coursename as required_courses, c.credits, c.description, c.courseid
'''

requiredcourse = spark.read.format("org.neo4j.spark.DataSource") \
  .option("url", bolt_url) \
  .option("query", q4) \
  .load()
requiredcourse.toPandas()


+---------+---------------+
|programid|required_course|
+---------+---------------+
|       IS|         IST659|
|       IS|         IST615|
|       IS|         IST621|
|       DS|         IST659|
|       DS|         IST615|
|       DS|         IST687|
|       DS|         IST718|
|       DS|         IST707|
|      BDC|         IST659|
|      BDC|         IST722|
|      BDC|         IST769|
|      CCC|         IST621|
|      CCC|         IST615|
|      CCC|         IST714|
|      MLC|         IST687|
|      MLC|         IST707|
|      MLC|         IST718|
+---------+---------------+



Unnamed: 0,p.programname,p.programid,p.programcredits,p.type,required_courses,c.credits,c.description,c.courseid
0,Information Systems,IS,36,Masters,Data Administration Concepts and Database Mana...,3,"Definition, development, and management of dat...",IST659
1,Information Systems,IS,36,Masters,Cloud Management,3,Cloud services creation and management. Practi...,IST615
2,Information Systems,IS,36,Masters,Information Management and Technology,3,Information and technology management overview...,IST621
3,Data Science,DS,34,Masters,Data Administration Concepts and Database Mana...,3,"Definition, development, and management of dat...",IST659
4,Data Science,DS,34,Masters,Cloud Management,3,Cloud services creation and management. Practi...,IST615
5,Data Science,DS,34,Masters,Introduction to Data Science,3,Introduces information professionals to fundam...,IST687
6,Data Science,DS,34,Masters,Big Data Analytics,3,A broad introduction to big data analytical an...,IST718
7,Data Science,DS,34,Masters,Applied Machine Learning,3,General overview of industry standard machine ...,IST707
8,Data Engineering Certificate,BDC,9,Certificate,Data Administration Concepts and Database Mana...,3,"Definition, development, and management of dat...",IST659
9,Data Engineering Certificate,BDC,9,Certificate,Data Warehousing,3,Introduction to concepts of business intellige...,IST722


In [None]:
#13b program course electives
# Create one (:Program)-[:ELECTIVE]->(:Courses) edge per (program, elective course).
from pyspark.sql.functions import explode

# One row per (program, elective course). Certificate programs have a null
# elective list; `explode` emits no rows for them, whereas `explode_outer` sent
# null rows that could never match a course.
elective_edges = programs.select(
    "programid", explode("elective_courses").alias("elective_course")
)

# Look both endpoints up by key instead of a cartesian MATCH filtered by WHERE.
cql = '''
MATCH (p:Program {programid: event.programid})
MATCH (c:Courses {courseid: event.elective_course})
MERGE (p)-[:ELECTIVE]->(c)
'''

elective_edges.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
  .option("url", bolt_url) \
  .option("query", cql) \
  .save()
elective_edges.show()

# Read the ELECTIVE edges back together with both endpoints' properties.
q4 = '''
MATCH (p:Program)-[:ELECTIVE]->(c:Courses)
RETURN p.programname, p.programid, p.programcredits, p.type, c.coursename as elective_courses , c.credits, c.description, c.courseid
'''

electivecourse = spark.read.format("org.neo4j.spark.DataSource") \
  .option("url", bolt_url) \
  .option("query", q4) \
  .load()
electivecourse.toPandas()



+---------+---------------+
|programid|elective_course|
+---------+---------------+
|       IS|         IST722|
|       IS|         IST714|
|       IS|         IST687|
|       IS|         IST707|
|       DS|         IST769|
|       DS|         IST714|
|      BDC|           null|
|      CCC|           null|
|      MLC|           null|
+---------+---------------+



Unnamed: 0,p.programname,p.programid,p.programcredits,p.type,elective_courses,c.credits,c.description,c.courseid
0,Information Systems,IS,36,Masters,Data Warehousing,3,Introduction to concepts of business intellige...,IST722
1,Information Systems,IS,36,Masters,Cloud Architecture,3,"Advanced, lab-based exploration of enterprise ...",IST714
2,Information Systems,IS,36,Masters,Introduction to Data Science,3,Introduces information professionals to fundam...,IST687
3,Information Systems,IS,36,Masters,Applied Machine Learning,3,General overview of industry standard machine ...,IST707
4,Data Science,DS,34,Masters,Advanced Big Data Management,3,Analyze relational and non-relational database...,IST769
5,Data Science,DS,34,Masters,Cloud Architecture,3,"Advanced, lab-based exploration of enterprise ...",IST714


In [None]:
#14 course prerequisites
# Create one (:Courses)-[:PREREQUISITES]->(:Courses) edge per (course, prerequisite).
from pyspark.sql.functions import explode

# One row per (course, prerequisite). Courses with an empty prerequisite list
# produce no rows with `explode`, so no null endpoints are sent to Neo4j.
prereq_edges = courses.select("courseid", explode("prerequisites").alias("preq"))

# Look both courses up by key instead of a cartesian MATCH filtered by WHERE.
cql = '''
MATCH (c:Courses {courseid: event.courseid})
MATCH (k:Courses {courseid: event.preq})
MERGE (c)-[:PREREQUISITES]->(k)
'''

prereq_edges.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
  .option("url", bolt_url) \
  .option("query", cql) \
  .save()
prereq_edges.show()

+--------+------+
|courseid|  preq|
+--------+------+
|  IST659|  null|
|  IST722|IST659|
|  IST769|IST659|
|  IST615|  null|
|  IST714|IST615|
|  IST621|  null|
|  IST687|  null|
|  IST707|IST687|
|  IST718|IST687|
+--------+------+



In [None]:
#15 Cypher query courses required in DS and IS
# A course qualifies when it is REQUIRED by both the IS and the DS program nodes
# (distinct requiring programs among the two == 2).
CQL = '''
MATCH (p:Program)
WHERE p.programid IN ['IS', 'DS']
MATCH (p)-[:REQUIRES]->(c:Courses)
WITH c, COUNT(DISTINCT p) AS program_count
WHERE program_count = 2
RETURN c.courseid AS courseid, c.coursename AS course_title
'''

df = (
    spark.read
    .format("org.neo4j.spark.DataSource")
    .options(url=bolt_url, query=CQL)
    .load()
)
df.show()


+--------+--------------------+
|courseid|        course_title|
+--------+--------------------+
|  IST615|    Cloud Management|
|  IST659|Data Administrati...|
+--------+--------------------+



In [None]:
#16 Cypher to spark table
# For every course, count how many programs list it as REQUIRED.
# - COUNT(DISTINCT p) counts requiring programs (the previous COUNT(c.coursename)
#   counted non-null course names instead).
# - The result gets its own name rather than overwriting `requiredcourse` from the
#   REQUIRES cell.
# - Rows are sorted explicitly so the table order is deterministic.
required_counts_query = '''
MATCH (p:Program)-[:REQUIRES]->(c:Courses)
RETURN c.courseid as course_code , c.coursename as course_title , COUNT(DISTINCT p) as required_courses_count
'''

required_course_counts = spark.read.format("org.neo4j.spark.DataSource") \
  .option("url", bolt_url) \
  .option("query", required_counts_query) \
  .load() \
  .orderBy(["required_courses_count", "course_code"], ascending=[False, True])
required_course_counts.toPandas()


Unnamed: 0,course_code,course_title,required_courses_count
0,IST659,Data Administration Concepts and Database Mana...,3
1,IST615,Cloud Management,3
2,IST621,Information Management and Technology,2
3,IST687,Introduction to Data Science,2
4,IST718,Big Data Analytics,2
5,IST707,Applied Machine Learning,2
6,IST722,Data Warehousing,1
7,IST769,Advanced Big Data Management,1
8,IST714,Cloud Architecture,1
