In [1]:
import re, csv, datetime

import pyspark.sql.functions as sparkfunctions
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import SparkContext

MEDIAWIKI_ARTICLES_DUMPFILE_PATH = 'data/enwiki-latest-pages-articles1.xml'
MEDIAWIKI_ABSTRACTS_DUMPFILE_PATH = 'data/enwiki-latest-abstract.xml'
DBPEDIA_ABSTRACTS_DUMPFILE_PATH = '/data/long-abstracts_lang=en.ttl'

WIKI_ABSTRACTS_SAMPLE_PATH = 'data/raw_sample_data/wiki_abstracts_sample.xml'
DBPEDIA_ABSTRACTS_SAMPLE_PATH = 'data/raw_sample_data/dbpedia_abstracts'

PARSED_ABSTRACTS_PATH = '../../data/document_base/parsed_abstracts_spark.csv'

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
def parse_wiki_articles_file(spark, path):
    abstract_pattern = r"'''([^\=]*)(?=(={1,6})([^\n]+?)(={1,6})[ \t]*(\n|\Z))"
    customSchema = StructType([
        StructField('title', StringType()),
        StructField('revision', StructType([
            StructField('text', StructType([
                StructField('_VALUE', StringType())
            ]))
        ]))
    ])
    df = spark.read.format('com.databricks.spark.xml')\
        .options(rowTag='page').load(path, schema=customSchema)
    all_abstracts_rdd = df.rdd.map(
        lambda loop: (
            loop['title'].replace('Wikipedia:',''),
            abstract_text.group(0).replace('\n', ' ')
            if (abstract_text := re.search(pattern=abstract_pattern,
                                           string=loop['revision']['text']['_VALUE'],
                                           flags=re.MULTILINE | re.DOTALL)
                ) is not None else None
        ))
    all_abstracts = all_abstracts_rdd.toDF(['title', 'abstract'])
    valid_abstracts = all_abstracts.dropna()
    return valid_abstracts

def parse_wiki_abstracts_file(spark, path):
    title_pattern = r"^Wikipedia: ([^\n]*)"
    customSchema = StructType([
        StructField('title', StringType(), False),
        StructField('abstract', StringType(), False)
    ])
    df = spark.read.format("com.databricks.spark.xml")\
        .options(rowTag='doc').load(path, schema=customSchema)
    wiki_abstract_rdd = df.rdd.map(
        lambda loop: (
            re.search(pattern=title_pattern, string=loop['title']).group(1).replace('\n', ' '),
            loop['abstract']
        ))
    all_wiki_abstracts = wiki_abstract_rdd.toDF(['title', 'abstract'])
    valid_abstracts = all_wiki_abstracts.dropna()
    return valid_abstracts

def parse_dbpedia_abstract_file(spark, path):
    title_pattern = r"^<http:\/\/dbpedia\.org\/resource\/([^\>]*)> <"
    abstract_pattern = re.compile(r'"([^"]*)"')
    sc = spark.sparkContext
    rdd = sc.textFile(path)
    rdd = rdd.map(
        lambda loop: (
            title.group(1)
            if (title := re.search(title_pattern, loop)) is not None else None,
            abstract.group(1)
            if (abstract := re.search(abstract_pattern, loop)) is not None else None
        ))
    rdd = rdd.filter(lambda row: filter(None, row))
    dataframe = rdd.toDf(['title', 'abstract'])
    return dataframe

In [3]:
# Parse abstracts from articles
import time, os, glob
data_dir = "d:\Study\STU FIIT\ZS 21-22\Information Retrival\wikidata\wiki_articles"
result_dir = "d:\Study\STU FIIT\ZS 21-22\Information Retrival\wikidata\parsed_wiki_abstracts"

# get all xml files from directory
os.chdir(data_dir)
all_files = glob.glob(f"*.xml-*")
xml_files = []
for file_name in all_files:
    if re.match(r"(.*)(?=(\.bz2))", file_name): continue
    else: xml_files.append(file_name)
print(xml_files)

#
absolute_path = data_dir + '/' + xml_files[0]
if os.path.exists(absolute_path):
        start_time_single = time.time()
        parsed_wikipedia_abstracts_0 = parse_wiki_articles_file(spark, path=data_dir + '/' + xml_files[0])
        # res = parsed_wikipedia_abstracts_0.union(parsed_wikipedia_abstracts_1)
        print(f"parisng time: {time.time() - start_time_single}")
        parsed_wikipedia_abstracts_0.printSchema()

        try:
            parsed_wikipedia_abstracts_0.write.csv(path=result_dir,sep='\t',mode='overwrite')
        except:
            print(f'document {0} - Error, cannot write this file to csv')
        else:
            print(f"document{0} elapsed: {time.time() - start_time_single}")

#multiple articles dump
# start_time = time.time()
# for idx, file in enumerate(xml_files):
#     absolute_path = data_dir + "/" + file
#     mode = 'overwrite' if idx == 0 else 'append'
#     if os.path.exists(absolute_path):
#         start_time_single = time.time()
#         parsed_wikipedia_abstracts = parse_wiki_articles_file(spark, path=absolute_path)
#         print(f"parisng time: {time.time() - start_time_single}")
#         try:
#             parsed_wikipedia_abstracts.write.csv(path=result_dir,sep='\t',mode=mode)
#         except:
#             print(f'document {file} - Error, cannot write this file to csv')
#         else:
#             print(f"document {file} elapsed: {time.time() - start_time_single}")
#
# print(f"elapsed: {time.time() - start_time}")

['enwiki-20211101-pages-articles1.xml-p1p41242', 'enwiki-20211101-pages-articles10.xml-p4045403p5399366', 'enwiki-20211101-pages-articles2.xml-p41243p151573', 'enwiki-20211101-pages-articles3.xml-p151574p311329', 'enwiki-20211101-pages-articles4.xml-p311330p558391', 'enwiki-20211101-pages-articles6.xml-p958046p1483661', 'enwiki-20211101-pages-articles7.xml-p1483662p2134111', 'enwiki-20211101-pages-articles8.xml-p2134112p2936260', 'enwiki-20211101-pages-articles9.xml-p2936261p4045402']
parisng time: 10.799468040466309
root
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)

document0 elapsed: 50.28661775588989


In [6]:
absolute_path = data_dir + '/' + xml_files[1]
start_time_single = time.time()
erroring_parsed_wikipedia_abstracts = parse_wiki_articles_file(spark, path=absolute_path)
print(f"parisng time: {time.time() - start_time_single}")
erroring_parsed_wikipedia_abstracts.printSchema()
erroring_parsed_wikipedia_abstracts.count()

parisng time: 2.6326537132263184
root
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)



Py4JJavaError: An error occurred while calling o248.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 11.0 failed 1 times, most recent failure: Lost task 12.0 in stage 11.0 (TID 64) (147.175.179.16 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\ollyt\PycharmProject\vinf\WikiAbstractParser\irenv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 619, in main
  File "C:\Users\ollyt\PycharmProject\vinf\WikiAbstractParser\irenv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 611, in process
  File "C:\Users\ollyt\PycharmProject\vinf\WikiAbstractParser\irenv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\ollyt\PycharmProject\vinf\WikiAbstractParser\irenv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\ollyt\AppData\Local\Temp/ipykernel_592/3782531833.py", line 17, in <lambda>
  File "C:\Users\ollyt\AppData\Local\Programs\Python\Python39\lib\re.py", line 201, in search
    return _compile(pattern, flags).search(string)
TypeError: expected string or bytes-like object

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\ollyt\PycharmProject\vinf\WikiAbstractParser\irenv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 619, in main
  File "C:\Users\ollyt\PycharmProject\vinf\WikiAbstractParser\irenv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 611, in process
  File "C:\Users\ollyt\PycharmProject\vinf\WikiAbstractParser\irenv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\ollyt\PycharmProject\vinf\WikiAbstractParser\irenv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\ollyt\AppData\Local\Temp/ipykernel_592/3782531833.py", line 17, in <lambda>
  File "C:\Users\ollyt\AppData\Local\Programs\Python\Python39\lib\re.py", line 201, in search
    return _compile(pattern, flags).search(string)
TypeError: expected string or bytes-like object

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [85]:
# parsed_wikipedia_abstracts.count()
 # .csv(path=result_dir, sep='\t', mode='overwrite', header=False)

In [None]:
# Parse abstracts from abstracts dump
abstracts_absolute_path = data_dir + '\enwiki-latest-abstract.xml'
if os.path.exists(abstracts_absolute_path):
    start_time = time.time()
    wiki_abstracts = parse_wiki_abstracts_file(spark, path=abstracts_absolute_path)
    print(f"elapsed: {time.time() - start_time}")
    wiki_abstracts.show(20)

In [5]:
# Parse abstracts from dbpedia dump
dbpedia_absolute_path = data_dir + '\long-abstracts_lang=en.ttl'
if os.path.exists(dbpedia_absolute_path):
    start_time = time.time()
    dbpedia_abstracts = parse_dbpedia_abstract_file(spark, path='../../' + DBPEDIA_ABSTRACTS_DUMPFILE_PATH)
    print(f"elapsed: {time.time() - start_time}")
    dbpedia_abstracts.show(20)

elapsed: 0.04381728172302246
+--------------------+--------------------+
|               title|            abstract|
+--------------------+--------------------+
|                null|                null|
|     Animalia_(book)|Animalia is an il...|
|Agricultural_science|Agricultural scie...|
|              Albedo|Albedo () (Latin:...|
|        Alain_Connes|Alain Connes (Fre...|
|International_Ato...|International Ato...|
|                   A|A or a is the fir...|
|An_American_in_Paris|An American in Pa...|
|List_of_Atlas_Shr...|This is a list of...|
|          Allan_Dwan|Allan Dwan (born ...|
|          Astronomer|An astronomer is ...|
|            Achilles|In Greek mytholog...|
|           Anarchism|Anarchism is a po...|
|        Anthropology|Anthropology is t...|
|              Autism|Autism is a devel...|
|      Academy_Awards|The Academy Award...|
|             Actrius|Actresses (Catala...|
|        Answer_(law)|In law, an Answer...|
|Academy_Award_for...|The Academy Award...|
|Ap

In [8]:
# print(f"count parsed wiki abstracts: {parsed_wikipedia_abstracts.count()}")
# print(f"count dbpedia {dbpedia_abstracts.count()}")
# print(f"count wiki abstracts {wiki_abstracts.count()}")



In [6]:
def write_df_to_csv(dataframe, path_to_csv):
    # dataframe = dataframe.coalesce(1)
    dataframe.write.csv(path_to_csv, sep='\t', header=False, mode='overwrite')

#### PARSED ABSTRACTS TO CSV

In [7]:
start_time = time.time()

write_df_to_csv(parsed_wikipedia_abstracts,
                path_to_csv='../../data/document_base/parsed_abstracts_spark')
print(f"elapsed: {time.time() - start_time}")

elapsed: 43.86532926559448


In [8]:
start_time = time.time()

write_df_to_csv(wiki_abstracts,
                path_to_csv='../../data/document_base/wiki_abstracts_spark')
print(f"elapsed: {time.time() - start_time}")

elapsed: 219.0451045036316


In [9]:

start_time = time.time()

write_df_to_csv(dbpedia_abstracts,
                path_to_csv='../../data/document_base/dbpedia_abstracts_spark')
print(f"elapsed: {time.time() - start_time}")

elapsed: 87.75686526298523


In [14]:
spark.stop()
