In [None]:
import os

# Install java
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
#! pip install --ignore-installed pyspark==2.4.4

! pip install --upgrade -q pyspark==3.1.2 

# Install Spark NLP
! pip install  spark-nlp

openjdk version "1.8.0_292"
OpenJDK Runtime Environment (build 1.8.0_292-8u292-b10-0ubuntu1~18.04-b10)
OpenJDK 64-Bit Server VM (build 25.292-b10, mixed mode)


In [None]:
! pip install neo4j
! pip install tqdm



### Building Knowledge Graph
This application organizes information so that it is easy for both humans and computers to access; a store of information organized this way is called a knowledge base. Knowledge bases have become less prominent in NLP in recent decades, as the focus has shifted from "expert systems" to statistical machine-learning approaches. Here, movie infobox fields from Simple English Wikipedia are extracted with Spark and loaded into Neo4j as a graph of Movie and Person nodes.


In [None]:
! wget https://dumps.wikimedia.org/simplewiki/latest/simplewiki-latest-pages-articles-multistream.xml.bz2

--2021-12-10 18:25:53--  https://dumps.wikimedia.org/simplewiki/latest/simplewiki-latest-pages-articles-multistream.xml.bz2
Resolving dumps.wikimedia.org (dumps.wikimedia.org)... 208.80.154.7, 2620:0:861:1:208:80:154:7
Connecting to dumps.wikimedia.org (dumps.wikimedia.org)|208.80.154.7|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 231325259 (221M) [application/octet-stream]
Saving to: ‘simplewiki-latest-pages-articles-multistream.xml.bz2.2’


2021-12-10 18:26:38 (4.99 MB/s) - ‘simplewiki-latest-pages-articles-multistream.xml.bz2.2’ saved [231325259/231325259]



### Implement the solution

In [None]:
import json
import re
import pandas as pd
import sparknlp

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import lit, col,udf,explode , split

import sparknlp
from sparknlp import DocumentAssembler, Finisher
from sparknlp.annotator import *

In [None]:
from pyspark.sql.types import MapType, StringType, IntegerType, ArrayType 

In [None]:
import pandas as pd

In [None]:
from neo4j import GraphDatabase, basic_auth
import time
from tqdm import tqdm

In [None]:
from urllib.request import urlopen
import urllib.request

In [None]:
from datetime import datetime

In [None]:
# Maven coordinates Spark resolves at startup: Spark NLP, and spark-xml for
# reading the Wikipedia XML dump.
packages = [
    "com.johnsnowlabs.nlp:spark-nlp_2.12:3.3.4",
    "com.databricks:spark-xml_2.12:0.9.0",
]

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Knowledge Graph")
    .config("spark.driver.memory", "12g")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate()
)

In [None]:
# spark.sparkContext.getConf().getAll()

##### Read the Wikipedia dump

In [None]:
# Wall-clock timestamp (seconds since the epoch) taken before the dump is read.
data_start_time = time.time()
print("{}".format(data_start_time))

1639160798.752957


In [None]:
# Parse the bz2-compressed dump with spark-xml: one row per <page> element under
# the <mediawiki> root. Persisted because later cells join and filter it again.
df = (
    spark.read.format('xml')
    .option("rootTag", "mediawiki")
    .option("rowTag", "page")
    .load("simplewiki-latest-pages-articles-multistream.xml.bz2")
    .persist()
)

In [None]:
# sections = df.filter('redirect IS NULL').filter('ns == 0').select(col('title'),col('revision.text._VALUE').alias('text'))\
# .filter('title =  "Titanic (1997 movie)"')


In [None]:
def cleanInfoBox(answer):
  """Parse the first ``{{Infobox ...}}`` template of a page's wikitext into a dict.

  The infobox text is taken from the first ``{{Infobox`` up to the first bold
  marker (``'''``) that follows it. References, HTML comments, ``&nbsp``,
  ``*``/``;``, link brackets, braces and ``<br>`` tags are stripped, and the
  remaining ``|key=value`` pairs are collected.

  Args:
    answer: raw wikitext of one page (may be None).

  Returns:
    dict mapping field name -> value. A value is a string, or a list of
    non-empty strings when the cleaned value contains commas (e.g. several
    links). The dict always has ``'responseCode'``: ``"Success"``, or
    ``"Parsing error"`` when no infobox / bold marker is found or the input
    is not a string.
  """
  infoboxDict = {}
  try:
    start = answer.index("{{Infobox")
    # Look for the bold marker *after* the infobox; a "'''" earlier on the page
    # would otherwise produce an empty slice and silently drop every field.
    end = answer.index("'''", start)
    answer = answer[start:end].replace("\n", "")
    # Remove references. The self-closing form goes first and cannot cross a '>';
    # <ref>...</ref> is non-greedy so that, on this single-line text, one match
    # cannot swallow every field between the first and the last reference.
    answer = re.sub(r"<ref[^>]*?\/>|<ref.*?<\/ref>|\*|\;", "", answer)
    answer = re.sub(r"\s\s+|&nbsp", " ", answer)
    answer = re.sub(r" = ", "=", answer)
    answer = re.sub(r"\<\!\-\-.*?\-\-\>", "", answer)
    # [[Target|label]] -> "Target," and [[Target]] -> "Target,". Dropping the
    # label keeps its "|" from being mistaken for a field separator below.
    answer = re.sub(r"\[\[([^\]|]*)(?:\|[^\]]*)?\]\]", "\\1,", answer)
    answer = re.sub(r"={{(.*?)\|(.*?)}}", "=\\2,", answer)
    answer = re.sub(r"{|}|<br />|<br/>|<br>", "", answer)
    answer = re.sub(r"\|\|", "|", answer)
    for item in answer.split("|"):
      if "=" in item:
        # Split on the first "=" only, so values such as URLs keep theirs.
        key, value = item.split("=", 1)
        key = key.strip(' ')
        value = value.rstrip(',').strip(' ')
        if "," in value:
          value = list(filter(None, value.split(",")))
        if len(value) > 0:
          infoboxDict[key] = value
    infoboxDict['responseCode'] = "Success"
    return infoboxDict
  except Exception:
    # Best-effort parser used inside a Spark UDF: report the failure in the
    # result instead of failing the whole job (ValueError when a marker is
    # missing, AttributeError/TypeError for non-string input).
    infoboxDict['responseCode'] = "Parsing error"
    return infoboxDict
# Spark UDF wrapping cleanInfoBox. StringType is written out here; it is also
# what `udf` uses when no return type is passed.
# NOTE(review): with a StringType result Spark turns the returned dict into text
# (the `infodata.printSchema()` output shows `info: string`), so UDFs that later
# index `info` by key do not receive a dict. Switching to a MapType would first
# require normalising cleanInfoBox's mixed str/list values.
getInfoboxUDF = udf(lambda text: cleanInfoBox(text), StringType())

In [None]:
import time


In [None]:
def getPageIndexinCat(category, continueFlag, gcmcontinue, list_pageIndex):
    """Collect the page ids of every member of a Simple English Wikipedia category.

    Queries the MediaWiki API (``generator=categorymembers``, 500 per request)
    and follows the ``continue`` block of each response until none is returned.

    Args:
        category: category name without the ``Category:`` prefix,
            e.g. ``"Movies_based_on_books"``.
        continueFlag: ``"TRUE"`` to resume from ``gcmcontinue``; any other value
            starts at the beginning of the category.
        gcmcontinue: continuation token to resume from (used only when
            ``continueFlag == "TRUE"``).
        list_pageIndex: list that page ids (strings) are appended to. It is
            modified in place and also returned.

    Returns:
        ``list_pageIndex`` with the page ids of all members appended.
    """
    from urllib.parse import urlencode

    api_url = "https://simple.wikipedia.org/w/api.php"
    params = {
        "action": "query",
        "generator": "categorymembers",
        "gcmlimit": 500,
        "gcmtitle": "Category:" + category,
        "format": "json",
    }
    if continueFlag == "TRUE":
        params["continue"] = "gcmcontinue||"
        params["gcmcontinue"] = gcmcontinue

    list_local = list_pageIndex
    while True:
        # urlencode escapes the category and the continuation token (which
        # contains "|"); `with` closes the HTTP response.
        with urlopen(api_url + "?" + urlencode(params)) as response:
            json_object = json.loads(response.read())
        # Iterating the "pages" dict yields its keys, the page ids. An empty
        # category returns no "query" key at all.
        list_local.extend(json_object.get("query", {}).get("pages", {}))
        continuation = json_object.get("continue")
        if not continuation:
            # No "continue" block: this was the last batch (normal termination).
            break
        params.update(continuation)

    print("{} - Fetching page index for category : {}, Records found: {}".format(datetime.now(), category, len(list_local)))
    return list_local

In [None]:
@udf
def cleanInfoBoxtemp(answer):
  """Spark UDF (StringType, via bare ``@udf``) returning ``str(cleanInfoBox(answer))``.

  Delegates to cleanInfoBox instead of carrying a line-for-line copy of the
  parser, so the two cannot drift apart.
  """
  return str(cleanInfoBox(answer))

In [None]:
# Timestamp taken before fetching the category's page index from the API.
index_start_time = time.time()
print("{}".format(index_start_time))

1639160988.5006332


In [None]:
category = "Movies_based_on_books"

# Page ids of the category members, as a one-column string DataFrame.
pageIndexdf = spark.createDataFrame(getPageIndexinCat(category, "N", "", []), StringType()).withColumnRenamed("value", "PageIndex")

# Persist before the first action, so the count below fills the cache that
# later cells reuse. Persisting after count() ran the join once for the count
# and then again on first reuse.
pageContentdf = (
    pageIndexdf
    .join(df, pageIndexdf.PageIndex == df.id, "inner")
    .select('PageIndex', 'revision.text._VALUE')
    .persist()
)

print("Category: {}, Total pages: {}".format(category, pageContentdf.count()))






2021-12-10 18:29:50.189683 - ERROR Fetching page index for category : Movies_based_on_books, Records found: 1058
2021-12-10 18:29:50.190877 - Fetching page index for category : Movies_based_on_books, Records found: 1058
2021-12-10 18:29:50.191825 - Fetching page index for category : Movies_based_on_books, Records found: 1058
2021-12-10 18:29:50.192145 - Fetching page index for category : Movies_based_on_books, Records found: 1058
Category: Movies_based_on_books, Total pages: 1055


DataFrame[PageIndex: string, _VALUE: string]

In [None]:
# Ids (as ints, for comparison with the dump's `id` column) of category pages
# whose wikitext contains an infobox.
pageidDF = pageContentdf.filter(col('_VALUE').contains('{{Infobox')).select('PageIndex')
pageidlist = list(map(int, (row['PageIndex'] for row in pageidDF.collect())))

In [None]:
# Number of category pages that carry an infobox.
num_infobox_pages = len(pageidlist)
num_infobox_pages

227

In [None]:
# Timestamp taken before infobox parsing and relation extraction begin.
start_time = time.time()
print("{}".format(start_time))

1639161148.131105


In [None]:
# Article pages only: drop redirects and keep the main namespace (ns 0), then
# project the columns used downstream. Filter before projecting: `redirect` and
# `ns` are not among the selected columns, and one redirect filter is enough.
data = (
    df.filter('redirect IS NULL')
    .filter('ns == 0')
    .selectExpr('id', 'title', 'revision.text._VALUE AS text')
)

In [None]:
# Articles whose id is in the infobox page list; cached because it feeds several cells.
InfoData = data.where(col('id').isin(pageidlist)).cache()

In [None]:
# InfoData.select(getInfoboxUDF('text').alias('info')).collect()

In [None]:
def extractMovieEntity(info, attributes=('name', 'budget', 'released', 'runtime', 'gross')):
  """Select the movie attributes of interest from a parsed infobox.

  Args:
    info: mapping of infobox field -> value.
    attributes: field names to keep, in output order; absent fields are skipped.

  Returns:
    dict with the entries of ``info`` whose key is in ``attributes``, or
    ``{'responseCode': 'Parsing error'}`` when ``info`` cannot be searched and
    indexed by key (e.g. it is None, or a string containing one of the names).
  """
  try:
    return {k: info[k] for k in attributes if k in info}
  except Exception:
    # Best-effort inside a Spark UDF: flag the row instead of failing the job.
    return {'responseCode': "Parsing error"}


In [None]:
# Spark UDF wrapping extractMovieEntity; returns a string -> string map.
movieEntityUDF = udf(lambda info: extractMovieEntity(info), returnType=MapType(StringType(), StringType()))

In [None]:
# Page title plus the cleanInfoBox output for its wikitext.
infodata = InfoData.select(col('title'), getInfoboxUDF('text').alias('info'))

In [None]:
# Collect (title, movie-attribute map) pairs to the driver as a pandas frame.
movieEntityNeodf = (
    infodata
    .select('title', movieEntityUDF('info').alias('MovieEntity'))
    .toPandas()
)


In [None]:
# Plain Python list of the per-movie attribute dicts.
movieEntityDFlist = movieEntityNeodf['MovieEntity'].tolist()

In [None]:
# One column per attribute; a movie missing an attribute gets NaN in that column.
movieEntityLoadDF = pd.DataFrame(data=movieEntityDFlist)

In [None]:

# movieEntityNeodf['MovieEntity'].to_csv('movieent.csv')

#### Extracting person entities and their roles

In [None]:
# Infobox fields whose values are people; read by extractPersonRelation.
# (A `global` statement at notebook top level is a no-op, so none is needed.)
person_role_list = ['starring', 'producer', 'writer', 'director', 'music']

In [None]:
def extractPersonRelation(info, roles=None):
  """Flatten the person-valued infobox fields of one movie into "role~name" strings.

  For each role in ``roles`` that is present in ``info``, a string value yields
  one entry and a list value yields one entry per element. Names are stripped
  of surrounding whitespace. The "~" separator is split apart again downstream.

  Args:
    info: mapping of infobox field -> str or list of str; None yields no relations.
    roles: field names to extract; defaults to the notebook-level ``person_role_list``.

  Returns:
    list of "role~name" strings, in role order and then value order.
  """
  if roles is None:
    roles = person_role_list
  if info is None:
    return []

  result_array = []
  for role in roles:
    if role not in info:
      continue
    dictValue = info[role]
    # `not isinstance(...)` semantics: the earlier `~isinstance(...) & ...` applied
    # bitwise NOT to a bool (-1/-2), which only worked by accident and is
    # deprecated since Python 3.12.
    if isinstance(dictValue, str):
      result_array.append(role + "~" + dictValue.strip())
    else:
      # A list contributes one relation per element; an empty list contributes none.
      result_array.extend(role + "~" + value.strip() for value in dictValue)
  return result_array

In [None]:
### Extracting person and relation

In [None]:
# Spark UDF wrapping extractPersonRelation; returns an array of "role~name" strings.
PersonRelationUDF = udf(lambda info: extractPersonRelation(info), returnType=ArrayType(StringType()))

In [None]:
# Inspect the schema of the parsed-infobox frame.
# NOTE(review): `info` is listed as a string because getInfoboxUDF is a
# StringType UDF, while extractMovieEntity / extractPersonRelation index `info`
# by key as if it were a dict.
infodata.printSchema()


root
 |-- title: string (nullable = true)
 |-- info: string (nullable = true)



In [None]:
# One row per (title, "role~person") pair: the UDF yields an array per movie and
# explode flattens it.
relationPersonDF = (
    infodata
    .select('title', PersonRelationUDF('info').alias('extractPersonRelation'))
    .select('title', explode(col('extractPersonRelation')).alias('PersonRelation'))
)

In [None]:
# Split "role~person" into separate columns and collect to pandas.
relation_parts = split(col('PersonRelation'), '~')
movie_person_rel_df = relationPersonDF.select(
    'title',
    relation_parts[0].alias('relation'),
    relation_parts[1].alias('person'),
).toPandas()

In [None]:
# Distinct person names, in order of first appearance.
person_entity = pd.DataFrame({'name': movie_person_rel_df['person'].unique()})

In [None]:
# movie_person_rel_df.groupby(['person']).count().sort_values(['title'],ascending=False).head(10)

In [None]:
# Timestamp taken after the person entity table has been built.
end_time = time.time()
print("{}".format(end_time))

1639165740.1410468


In [None]:
# Elapsed wall-clock seconds between the timestamps recorded above, labelled so
# the numbers can be read without cross-referencing the code.
print("start_time -> end_time (infobox parsing and relation extraction): {}".format(end_time - start_time))
print("index_start_time -> start_time (category page index and infobox filter): {}".format(start_time - index_start_time))
print("data_start_time -> end_time (whole run): {}".format(end_time - data_start_time))
print("data_start_time -> start_time (everything before infobox parsing): {}".format(start_time - data_start_time))

4592.00994181633
159.63047170639038
4941.38808965683
349.3781478404999


### Loading data into Neo4j

In [None]:
class Neo4jConnection:
    """Small wrapper around a Neo4j driver that runs each query in its own session.

    Failures are printed rather than raised: a driver that cannot be created
    leaves the connection unusable (``query`` asserts), and a failing query
    makes ``query`` return ``None``.
    """

    def __init__(self, uri, user, pwd):
        self.__uri, self.__user, self.__pwd = uri, user, pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(uri, auth=(user, pwd))
        except Exception as e:
            print("Failed to create the driver:", e)

    def close(self):
        """Close the underlying driver if one was created."""
        if self.__driver is None:
            return
        self.__driver.close()

    def query(self, query, parameters=None, db=None):
        """Run ``query`` in a fresh session and return every record as a list.

        Args:
            query: Cypher text.
            parameters: dict of query parameters, or None.
            db: database name; None opens a session without a database argument.

        Returns:
            list of records, or None if opening the session or running the query raised.
        """
        assert self.__driver is not None, "Driver not initialized!"
        session_kwargs = {} if db is None else {"database": db}
        session = None
        try:
            session = self.__driver.session(**session_kwargs)
            return list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
            return None
        finally:
            if session is not None:
                session.close()

In [None]:
import getpass

# Connection settings come from the environment instead of being written into
# the notebook; the password falls back to an interactive prompt.
# NOTE(review): treat the password that used to be hard-coded here as exposed
# and rotate it.
uri = os.environ.get('NEO4J_URI', 'bolt://3.87.48.16:7687')
user = os.environ.get('NEO4J_USER', 'neo4j')
pwd = os.environ.get('NEO4J_PASSWORD') or getpass.getpass('Neo4j password: ')
conn = Neo4jConnection(uri=uri, user=user, pwd=pwd)

In [None]:
# Wipe the whole target database (nodes and their relationships). The load cells
# below use CREATE, so re-running them without this step would duplicate nodes.
delete_all_nodes = 'MATCH (n) DETACH DELETE n;'
conn.query(query=delete_all_nodes)

[]

In [None]:
# Create one :Movie node per extracted infobox, 1000 rows per request.
# `gross` is extracted by extractMovieEntity too, so it is stored as well.
query = '''
UNWIND $rows as row
 CREATE (e:Movie {  title : row.name, name : row.name ,budget : row.budget, released :row.released ,runtime :row.runtime, gross : row.gross } )
 '''
# pd.DataFrame(list_of_dicts) fills attributes missing from an infobox with NaN,
# which would be sent to Neo4j as a float NaN property. Send None instead so
# Cypher receives null and the property is simply not set.
movie_rows = movieEntityLoadDF.astype(object).where(movieEntityLoadDF.notna(), None)
batch_size = 1000
for start in range(0, len(movie_rows), batch_size):
  res = conn.query(query, parameters={'rows': movie_rows.iloc[start:start + batch_size].to_dict('records')})

In [None]:
# Create one :Person node per distinct name, 1000 rows per request.
query = '''
UNWIND $rows as row
 CREATE (e:Person { name : row.name } )
 '''
batch_size = 1000
for offset in range(0, len(person_entity), batch_size):
  batch = person_entity[offset: offset + batch_size].reset_index()
  res = conn.query(query, parameters={'rows': batch.to_dict('records')})

In [None]:
# query = ''' 
# UNWIND $rows as row
# MATCH (entity1:Movie {name: row.title}),(entity2:Person {name: row.person})
# CREATE (entity1)-[:LINKED {value: row.relation}]->(entity2)
# '''
# batch_size = 100
# batch_id = 1 

# while batch_id < len(movie_person_rel_df)/batch_size:

#   res = conn.query(query, parameters = {'rows':movie_person_rel_df[batch_id*batch_size: (batch_id+1)*batch_size].reset_index(drop=True).to_dict('records')})
#   batch_id += 1

In [None]:
# Link each Movie to each Person with a relationship whose type is the infobox
# role; apoc.create.relationship takes that type from `row.relation` at runtime.
# NOTE(review): Movie nodes are created with `name` = the infobox "name" field,
# while `row.title` is the Wikipedia page title (e.g. "Titanic (1997 movie)"),
# so this MATCH misses movies whose two titles differ. Consider storing the page
# title on the Movie node and matching on it. Indexes on :Movie(name) and
# :Person(name) would likely speed up the per-row MATCH.
query = ''' 
UNWIND $rows as row
MATCH (entity1:Movie {name: row.title}),(entity2:Person {name: row.person})
CALL apoc.create.relationship(entity1, row.relation,NULL, entity2) YIELD rel
RETURN entity1.name, type(rel), entity2.name 
'''

batch_size = 10
# Batches start at row 0 so the first `batch_size` relationships are sent too.
for start in range(0, len(movie_person_rel_df), batch_size):
  res = conn.query(query, parameters={'rows': movie_person_rel_df.iloc[start:start + batch_size].to_dict('records')})

['starring', 'producer', 'writer', 'director', 'music']