## <center> Import Job: Load the Marvel CSV Files into Spark and Create the Neo4j Data </center>

## Imports

In [1]:
# Imports

import timeit
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import Window
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType,StructField, StringType

# imports  - to connect to neo4j DB
from py2neo import Graph,Node,Relationship

# Draw the neo4j data
##  from scripts.vis import draw

In [2]:
# Reference timestamp; later cells report elapsed time relative to it.
time_start = timeit.default_timer()
print(f"Start Time: {time_start}")

Start Time: 21254.533222461


In [3]:
# Make the Neo4j Spark connector jar available to the Spark session started below.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    "--jars",
    "/home/jovyan/jars/neo4j-connector-apache-spark_2.12-4.1.2_for_spark_3.jar",
    "pyspark-shell",
])

In [4]:
# Create (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('Marvel Graph CSV test data')
    .getOrCreate()
)

# Report how long session start-up took and which Spark version is running.
print("Get Spark Time: {}".format(timeit.default_timer() - time_start))
print("spark version: " + str(spark.version))


22/07/18 21:06:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/07/18 21:06:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/07/18 21:06:02 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


Get Spark Time: 2.2902986530025373
spark version: 3.1.2


In [5]:
## Read Nodes data (the first CSV row is treated as the header)
marvel_nodes_df = spark.read.csv(path="./data/nodes.csv", header=True)

In [6]:
# Strip leading/trailing whitespace from the node names.
marvel_nodes_df = marvel_nodes_df.withColumn(
    "node",
    F.trim(marvel_nodes_df["node"]),
)

In [7]:
# Preview the first rows of the nodes data.
marvel_nodes_df.show(n=5)

+--------------------+-----+
|                node| type|
+--------------------+-----+
|             2001 10|comic|
|              2001 8|comic|
|              2001 9|comic|
|24-HOUR MAN/EMMANUEL| hero|
|3-D MAN/CHARLES CHAN| hero|
+--------------------+-----+
only showing top 5 rows



In [8]:
## Read Edges data (hero -> comic pairs) and trim whitespace from both columns
marvel_edges_df = (
    spark.read
    .csv(path="./data/edges.csv", header=True)
    .withColumn("hero", F.trim(F.col("hero")))
    .withColumn("comic", F.trim(F.col("comic")))
)
marvel_edges_df.show(n=5)

+--------------------+------+
|                hero| comic|
+--------------------+------+
|24-HOUR MAN/EMMANUEL|AA2 35|
|3-D MAN/CHARLES CHAN| AVF 4|
|3-D MAN/CHARLES CHAN| AVF 5|
|3-D MAN/CHARLES CHAN| COC 1|
|3-D MAN/CHARLES CHAN|H2 251|
+--------------------+------+
only showing top 5 rows



In [9]:
## Read hero network data (hero1 -> hero2 pairs) and trim whitespace from both columns
marvel_network_df = (
    spark.read
    .csv(path="./data/hero-network.csv", header=True)
    .withColumn("hero1", F.trim(F.col("hero1")))
    .withColumn("hero2", F.trim(F.col("hero2")))
)

marvel_network_df.show(n=5)

+--------------------+--------------------+
|               hero1|               hero2|
+--------------------+--------------------+
|       LITTLE, ABNER|      PRINCESS ZANDA|
|       LITTLE, ABNER|BLACK PANTHER/T'CHAL|
|BLACK PANTHER/T'CHAL|      PRINCESS ZANDA|
|       LITTLE, ABNER|      PRINCESS ZANDA|
|       LITTLE, ABNER|BLACK PANTHER/T'CHAL|
+--------------------+--------------------+
only showing top 5 rows



## PreProcessing and Pre- Analysis

### Syncing hero and comic names across all three data files (nodes, edges, hero network) and finding the unmatched names.

##### Check that every hero in the hero network exists in the nodes (hero data)

In [10]:
# Names of all nodes typed as heroes; this is the reference list for the checks below.
hero_df = marvel_nodes_df.where(F.col("type") == 'hero').select("node")

## hero1 names in the network that have no matching hero node
(
    marvel_network_df
    .select("hero1")
    .distinct()
    .join(hero_df, F.col("hero1") == F.col("node"), "left_anti")
    .show()
)

+--------------------+
|               hero1|
+--------------------+
|               BLADE|
|SPIDER-MAN/PETER PAR|
|               SABRE|
+--------------------+



In [11]:
## hero2 names in the network that have no matching hero node
(
    marvel_network_df
    .select("hero2")
    .distinct()
    .join(hero_df, F.col("hero2") == F.col("node"), "left_anti")
    .show()
)

+--------------------+
|               hero2|
+--------------------+
|               BLADE|
|SPIDER-MAN/PETER PAR|
|               SABRE|
+--------------------+



##### Check that every hero in the edges data exists in the nodes (hero data)

In [12]:
## hero names in the edges data that have no matching hero node
(
    marvel_edges_df
    .select("hero")
    .distinct()
    .join(hero_df, F.col("hero") == F.col("node"), "left_anti")
    .show()
)

+--------------------+
|                hero|
+--------------------+
|SPIDER-MAN/PETER ...|
+--------------------+



### *** Cleaning data ***

### Correcting the hero name values in the nodes and hero network dataframes


In [13]:
## Cleaning Nodes data: rewrite the 'PARKERKER' spelling of Spider-Man to 'PARKER';
## every other node name is kept as-is.
marvel_nodes_df = marvel_nodes_df.withColumn(
    "node",
    F.when(
        F.col("node") == 'SPIDER-MAN/PETER PARKERKER',
        F.lit("SPIDER-MAN/PETER PARKER"),
    ).otherwise(F.col("node")),
)

###### Cleaning Network data

In [14]:
# Rewrite the hero names that the anti-join checks above reported as unmatched.
# The same corrections apply to both endpoints of a relation, so the mapping is
# declared once and applied to both columns instead of repeating a
# when/otherwise chain per column.
# DataFrame.replace only rewrites exact whole-value matches; every other value
# (including nulls) is left untouched.
marvel_network_df = marvel_network_df.replace(
    {
        'SPIDER-MAN/PETER PAR': "SPIDER-MAN/PETER PARKER",
        'BLADE': "BLADE//",
        'SABRE': "SABRE//",
    },
    subset=["hero1", "hero2"],
)

In [15]:
# Count the distinct node names (heroes and comics together).
nodes_count = marvel_nodes_df.select("node").distinct().count()
print("Total Unique nodes: {}".format(nodes_count))


Total Unique nodes: 19090


In [16]:
# Number of nodes per node type
marvel_nodes_df.groupBy("type").count().show()

+-----+-----+
| type|count|
+-----+-----+
|comic|12651|
| hero| 6439|
+-----+-----+



In [17]:
# Check for duplicate node names: True when de-duplicating on "node"
# leaves fewer rows than the full nodes data.

(marvel_nodes_df.dropDuplicates(["node"]).count() < marvel_nodes_df.count())

False

In [18]:
## Number of distinct heroes in the edges data
marvel_edges_df.dropDuplicates(["hero"]).count()

6439

In [19]:
## Number of distinct comics in the edges data
marvel_edges_df.dropDuplicates(["comic"]).count()

12651

In [20]:
## Hero network data analysis - the 5 hero1 values with the most network rows
(
    marvel_network_df
    .groupBy("hero1")
    .count()
    .orderBy(F.col("count").desc())
    .show(n=5)
)

+--------------------+-----+
|               hero1|count|
+--------------------+-----+
|     CAPTAIN AMERICA| 8149|
|SPIDER-MAN/PETER ...| 6652|
| IRON MAN/TONY STARK| 5850|
|THOR/DR. DONALD BLAK| 5712|
|THING/BENJAMIN J. GR| 5369|
+--------------------+-----+
only showing top 5 rows



In [21]:
## Flag self-relations: rows where hero1 and hero2 are the same hero.
## The flag is stored as the strings "True"/"False" (not booleans); the cells
## below filter on those string values.
marvel_network_df = marvel_network_df.withColumn(
    "duplicate_network",
    F.when(F.col("hero1") == F.col("hero2"), F.lit("True")).otherwise(F.lit("False")),
)
duplicate_count = marvel_network_df.filter(F.col("duplicate_network") == 'True').count()
print("{} duplicate relations found in hero network data where hero1 and hero2 are same".format(duplicate_count))

2232 duplicate relations found in hero network data where hero1 and hero2 are same


In [22]:
# Row counts per value of the self-relation flag.
marvel_network_df.groupBy("duplicate_network").count().show()

+-----------------+------+
|duplicate_network| count|
+-----------------+------+
|            False|572235|
|             True|  2232|
+-----------------+------+



In [23]:
## Sample of the flagged self-relations
marvel_network_df.filter(F.col("duplicate_network") == 'True').show(n=2)

+--------------------+--------------------+-----------------+
|               hero1|               hero2|duplicate_network|
+--------------------+--------------------+-----------------+
|MISS AMERICA/MADELIN|MISS AMERICA/MADELIN|             True|
|MISS AMERICA/MADELIN|MISS AMERICA/MADELIN|             True|
+--------------------+--------------------+-----------------+
only showing top 2 rows



##### Dropping self-referencing network records (hero1 = hero2)

In [24]:
## Keep only relations between two different heroes, then discard the helper flag
marvel_hero_network_unique_df = (
    marvel_network_df
    .filter(F.col("duplicate_network") == 'False')
    .drop("duplicate_network")
)
marvel_hero_network_unique_df.show(n=2)

+-------------+--------------------+
|        hero1|               hero2|
+-------------+--------------------+
|LITTLE, ABNER|      PRINCESS ZANDA|
|LITTLE, ABNER|BLACK PANTHER/T'CHAL|
+-------------+--------------------+
only showing top 2 rows



#### Removing duplicate network records

In [25]:
# Collapse repeated (hero1, hero2) rows into one and count what remains.
marvel_hero_network_final_df = marvel_hero_network_unique_df.dropDuplicates()
marvel_hero_network_final_df.count()

224099

In [26]:
# Number of distinct hero1 values in the de-duplicated network.
marvel_hero_network_final_df.dropDuplicates(["hero1"]).count()

                                                                                

6206

# Prepare data to import into NEO4J database

###### Import Nodes

In [27]:
## Extract all the heroes and give each one a numeric key.
## nodeId is used below as the Neo4j node key and as the join key for relations,
## so it must come out identical every time this DataFrame is re-evaluated.
## monotonically_increasing_id() derives its values from the partition layout of
## the input; row_number() over a name-ordered window instead yields consecutive
## ids 1..N that depend only on the hero names.
## The window has no partitionBy, so Spark moves all rows into one partition
## (and logs a warning); this node list is only a few thousand rows.
marvels_hero_node_df = (
    marvel_nodes_df
    .where(F.col("type") == 'hero')
    .withColumnRenamed("node", "name")
    .withColumn("nodeId", F.row_number().over(Window.orderBy("name")).cast("long"))
)
marvels_hero_node_df.show(5)

+--------------------+----+------+
|                name|type|nodeId|
+--------------------+----+------+
|24-HOUR MAN/EMMANUEL|hero|     1|
|3-D MAN/CHARLES CHAN|hero|     2|
|    4-D MAN/MERCURIO|hero|     3|
|             8-BALL/|hero|     4|
|        ABBOTT, JACK|hero|     5|
+--------------------+----+------+
only showing top 5 rows



In [28]:
## Extract all the comics
marvels_comic_node_df = marvel_nodes_df.where(F.col("type") == 'comic')
marvels_comic_node_df = marvels_comic_node_df.withColumnRenamed("node","name").withColumn("nodeId",F.monotonically_increasing_id()+1)
marvels_comic_node_df.show(5)

+-------+-----+------+
|   name| type|nodeId|
+-------+-----+------+
|2001 10|comic|     1|
| 2001 8|comic|     2|
| 2001 9|comic|     3|
|  A '00|comic|     4|
|  A '01|comic|     5|
+-------+-----+------+
only showing top 5 rows



#### Write Hero Nodes

In [29]:
# ## Write Hero nodes
# Each row becomes a :Hero node keyed on nodeId.
# Connection settings can be overridden with the NEO4J_URL, NEO4J_USERNAME and
# NEO4J_PASSWORD environment variables so real credentials never have to be
# stored in the notebook; the fallbacks are the values previously hard-coded here.
(
    marvels_hero_node_df
    .write
    .format("org.neo4j.spark.DataSource")
    .option("url", os.environ.get("NEO4J_URL", "bolt://neo4j:7687"))
    .option("authentication.type", "basic")
    .option("authentication.basic.username", os.environ.get("NEO4J_USERNAME", "neo4j"))
    .option("authentication.basic.password", os.environ.get("NEO4J_PASSWORD", "test"))
    .option("labels", ":Hero")
    .option("node.keys", "nodeId")
    .mode("Overwrite")
    .save()
)

                                                                                

#### Write Comic nodes

In [30]:
# NOTE(review): this write is commented out, so running the notebook as-is
# does not create any :Comic nodes in Neo4j.
# ## Write comic nodes
# marvels_comic_node_df \
#         .write \
#         .format("org.neo4j.spark.DataSource") \
#         .option("url", "bolt://neo4j:7687") \
#         .option("authentication.type", "basic") \
#         .option("authentication.basic.username", "neo4j") \
#         .option("authentication.basic.password", "test") \
#         .option("labels", ":Comic") \
#         .option("node.keys", "nodeId") \
#         .mode("Overwrite") \
#         .save()


## Write Hero --> Appeared In --> Comic (Relations/Edges)


In [31]:
# Reminder of the edge layout: one (hero, comic) pair per row.
marvel_edges_df.show(n=2)

+--------------------+------+
|                hero| comic|
+--------------------+------+
|24-HOUR MAN/EMMANUEL|AA2 35|
|3-D MAN/CHARLES CHAN| AVF 4|
+--------------------+------+
only showing top 2 rows



##### Get the nodeIds for each Hero and Comic

In [32]:
# Swap each edge's hero name for that hero's nodeId.
# Inner join: edges whose hero has no matching hero node are dropped.
marvel_hero_interim_df = (
    marvel_edges_df
    .join(marvels_hero_node_df, F.col("hero") == F.col("name"), "inner")
    .select(F.col("nodeId").alias('nodeId_hero'), "comic")
)
marvel_hero_interim_df.show(n=5)

+-----------+------+
|nodeId_hero| comic|
+-----------+------+
|          1|AA2 35|
|          2| AVF 4|
|          2| AVF 5|
|          2| COC 1|
|          2|H2 251|
+-----------+------+
only showing top 5 rows



In [33]:
# Swap each edge's comic name for that comic's nodeId, leaving (nodeId_hero, nodeId_comic) pairs.
# Inner join: edges whose comic has no matching comic node are dropped.
marvel_relation_AppearedIn_df = (
    marvel_hero_interim_df
    .join(marvels_comic_node_df, F.col("name") == F.col("comic"), "inner")
    .select("nodeId_hero", F.col("nodeId").alias("nodeId_comic"))
)

marvel_relation_AppearedIn_df.show(n=5)

+-----------+------------+
|nodeId_hero|nodeId_comic|
+-----------+------------+
|          1|         510|
|          2|        1388|
|          2|        1389|
|          2|        2410|
|          2|        4745|
+-----------+------------+
only showing top 5 rows



In [34]:
# Report how many APPEARED_IN relations were resolved to node id pairs.
print("Total {} Relations/edges (Appeared_In) are found".format(marvel_relation_AppearedIn_df.count()))

Total 96104 Relations/edges (Appeared_In) are found


In [35]:
# NOTE(review): this APPEARED_IN relationship write is commented out, so the
# notebook does not currently create these relationships in Neo4j.
# It also uses a different URL (bolt://0.0.0.0:7687) and password than the
# Hero node write (bolt://neo4j:7687) - reconcile both before re-enabling.
# marvel_relation_AppearedIn_df \
#         .write \
#         .format("org.neo4j.spark.DataSource") \
#         .option("url", "bolt://0.0.0.0:7687") \
#         .option("authentication.type", "basic") \
#         .option("authentication.basic.username", "neo4j") \
#         .option("authentication.basic.password", "1234512345") \
#         .option("relationship", "APPEARED_IN") \
#         .option("relationship.save.strategy", "keys") \
#         .option("relationship.source.labels", ":Hero") \
#         .option("relationship.source.node.keys", "nodeId_hero:nodeId") \
#         .option("relationship.source.save.mode", "Overwrite") \
#         .option("relationship.target.labels", ":Comic") \
#         .option("relationship.target.node.keys", "nodeId_comic:nodeId") \
#         .option("relationship.target.save.mode", "Overwrite") \
#         .mode("Overwrite") \
#         .save()


## Write Hero --> Appeared With --> Hero (Relations/Edges)


In [36]:
# Reminder of the network layout, including the self-relation flag column.
marvel_network_df.show(n=5)

+--------------------+--------------------+-----------------+
|               hero1|               hero2|duplicate_network|
+--------------------+--------------------+-----------------+
|       LITTLE, ABNER|      PRINCESS ZANDA|            False|
|       LITTLE, ABNER|BLACK PANTHER/T'CHAL|            False|
|BLACK PANTHER/T'CHAL|      PRINCESS ZANDA|            False|
|       LITTLE, ABNER|      PRINCESS ZANDA|            False|
|       LITTLE, ABNER|BLACK PANTHER/T'CHAL|            False|
+--------------------+--------------------+-----------------+
only showing top 5 rows



In [37]:
## Keep only relations between two different heroes, then discard the helper flag
marvel_hero_network_unique_df = (
    marvel_network_df
    .filter(F.col("duplicate_network") == 'False')
    .drop("duplicate_network")
)

In [38]:
# Preview the relations that remain after removing self-relations.
marvel_hero_network_unique_df.show(n=3)

+--------------------+--------------------+
|               hero1|               hero2|
+--------------------+--------------------+
|       LITTLE, ABNER|      PRINCESS ZANDA|
|       LITTLE, ABNER|BLACK PANTHER/T'CHAL|
|BLACK PANTHER/T'CHAL|      PRINCESS ZANDA|
+--------------------+--------------------+
only showing top 3 rows



In [39]:
## Collapse repeated (hero1, hero2) rows into one and count what remains

marvel_hero_network_final_df = marvel_hero_network_unique_df.dropDuplicates()
marvel_hero_network_final_df.count()

224099

In [40]:
# Preview the de-duplicated hero-hero relations.
marvel_hero_network_final_df.show(n=3)

+-------------------+--------------------+
|              hero1|               hero2|
+-------------------+--------------------+
|   ROM, SPACEKNIGHT|       RESTON, CLIVE|
|               UATU|THING/BENJAMIN J. GR|
|ANDROMEDA/ANDROMEDA|     CAPTAIN AMERICA|
+-------------------+--------------------+
only showing top 3 rows



##### Get Hero nodeIds

In [41]:
# Reminder of the hero node layout: name, type and the numeric nodeId.
marvels_hero_node_df.show(n=3)

+--------------------+----+------+
|                name|type|nodeId|
+--------------------+----+------+
|24-HOUR MAN/EMMANUEL|hero|     1|
|3-D MAN/CHARLES CHAN|hero|     2|
|    4-D MAN/MERCURIO|hero|     3|
+--------------------+----+------+
only showing top 3 rows



In [42]:
# Resolve both endpoints of every hero-hero relation to their nodeIds.
# Step 1: hero1 name -> nodeId_hero1 (hero2 stays a name for the next join).
marvel_network_df1 = (
    marvel_hero_network_final_df
    .join(marvels_hero_node_df, F.col("hero1") == F.col("name"), "inner")
    .select(F.col("nodeId").alias('nodeId_hero1'), "hero2")
)
# Step 2: hero2 name -> nodeId_hero2.
marvel_network_df_final = (
    marvel_network_df1
    .join(marvels_hero_node_df, F.col("hero2") == F.col("name"), "inner")
    .select("nodeId_hero1", F.col("nodeId").alias('nodeId_hero2'))
)

marvel_network_df_final.show(n=3)

+------------+------------+
|nodeId_hero1|nodeId_hero2|
+------------+------------+
|        4727|        4622|
|        5917|        5704|
|         127|         858|
+------------+------------+
only showing top 3 rows



In [43]:
# Number of hero-hero relations left after both inner joins; relations whose
# hero1 or hero2 name has no matching hero node are not included.
marvel_network_df_final.count()

224012