In [3]:
"""
This notebook prepares structured data to fed for further analysis.
It creates these datasets:

Page based:

page_info.parquet: 
PageId, PageName, TextLength, NumUniqueWords, NumFiles, 
NumExternalLinks, NumInfoBoxes, NumSections, Categories

this is spark dataframe and partitioned on PageId.

page_links.tsv:
SourcePageId, DestinationPageId

page_inlink_counts.tsv:
PageId, NumInlinks

Category based:

-> Category pages

These are jsons (one per line) with the following structure:
{
    "category_name": "Category name",
    "sub_categories": ["Subcategory 1", "Subcategory 2", ...],
    "articles": [("Article Title 1", page_id_1), ("Article Title 2", page_id_2), ...],
    "parent_categories": ["Parent Category 1", "Parent Category 2", ...],
    "internal_links": ["link 1", "link 2", ...]
}

-> category_id_to_parent_category_ids.tsv

This is a tsv file with the following structure:
CategoryId, ParentCategoryId

Run DFS on the category tree (starting from categories with no parents) to get the following data:

-> category_id_to_dfs_metadata.tsv

This is a tsv file with the following structure:
CategoryId, DiscoveryTime, FinishingTime, Depth, NumDescendants

-> category_edges_dfs_classification.tsv

This is a tsv file with the following structure:
CategoryId, SubCategoryId, EdgeType

EdgeType is one of the following: TreeEdge, BackEdge, ForwardEdge, CrossEdge

-> category_id_to_page_ids.tsv

This is a tsv file with the following structure:
CategoryId, PageId

Note: this mapping is as-is from the category json. If a page is in category c then it may 
or may not be in its parent categories.

-> category_id_to_stats.tsv

This is a tsv file with the following structure:
CategoryId, NumPages, NumChildCategories, NumParentCategories, NumDescendantCategories

Note: NumPages is the sum of number of pages in the category and its descendants.
"""
_ = 1

In [4]:
import pandas as pd
import time, json, os, random, traceback, pyperclip, importlib
import src.wiki_analysis_utils as wiki_analysis_utils
from collections import Counter
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
import json

_ = importlib.reload(wiki_analysis_utils)

In [5]:
data_root_dir = r'C:\Users\mohitvyas\MyDesktop\WikipediaDataset\data\\'
NUM_DASK_PARTITIONS = 20

In [6]:
# Initialize SparkSession
spark = SparkSession.builder.appName("PrepareStructuredData")\
    .enableHiveSupport()\
    .getOrCreate()

In [8]:
numbers_rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
squared_numbers_rdd = numbers_rdd.map(lambda x: x*x)
print(squared_numbers_rdd.collect())

[1, 4, 9, 16, 25]


In [9]:
"""
Save page_info.parquet: 
PageName, RedirectTitle, PageId, TextLength, NumInternalLinks, NumUniqueWords, NumFiles,
NumExternalLinks, NumInfoBoxes, NumSections
"""

schema = ["PageName", "RedirectTitle", "PageId", "TextLength", "NumInternalLinks", "NumUniqueWords", "NumFiles", "NumExternalLinks", "NumInfoBoxes", "NumSections", "Categories"]

def process_line(line):
    page = json.loads(line)
    if page['namespace'] == 0:
        return [(page['title'], page.get('redirect_title', None), page['page_id'], page.get('text_length', None), len(page.get('internal_links', [])), page.get('num_unique_words', None), page.get('number_of_files', None), page.get('number_of_external_links', None), page.get('number_of_info_boxes', None), page.get('number_of_sections', None), json.dumps(page.get('categories', [])))]
    return []

# Read and process the files
df = spark.sparkContext\
    .textFile(data_root_dir + 'processed_summaries/part-*.txt')\
    .flatMap(process_line)\
    .toDF(schema)

# Write the DataFrame to a Parquet file, partitioned by PageId
df.write\
    .bucketBy(20, "PageId")\
    .sortBy("PageId")\
    .option("path", data_root_dir + f'structured_data/page_info')\
    .saveAsTable("page_info", format='parquet')

Py4JJavaError: An error occurred while calling o90.saveAsTable.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 119 in stage 3.0 failed 1 times, most recent failure: Lost task 119.0 in stage 3.0 (TID 136) (MININT-FGLE9V0 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:511)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:229)
	at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:183)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:700)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:678)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:571)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more


In [9]:
page_name_to_page_id, page_id_to_page_name, error_counts = \
    wiki_analysis_utils.load_page_name_to_id_map(data_root_dir)

Processed 1000000 lines. Fail count: 0. Success count: 1000000. Time taken: 0.03520254294077555 minutes.
Processed 2000000 lines. Fail count: 0. Success count: 2000000. Time taken: 0.0746461828549703 minutes.
Processed 3000000 lines. Fail count: 0. Success count: 3000000. Time taken: 0.11801429986953735 minutes.
Processed 4000000 lines. Fail count: 0. Success count: 4000000. Time taken: 0.17254188060760497 minutes.
Processed 5000000 lines. Fail count: 0. Success count: 5000000. Time taken: 0.224252720673879 minutes.
Processed 6000000 lines. Fail count: 0. Success count: 6000000. Time taken: 0.27663596868515017 minutes.
Processed 7000000 lines. Fail count: 0. Success count: 7000000. Time taken: 0.3174879471460978 minutes.
Processed 8000000 lines. Fail count: 0. Success count: 8000000. Time taken: 0.36652742624282836 minutes.
Processed 9000000 lines. Fail count: 0. Success count: 9000000. Time taken: 0.4145349780718485 minutes.
Processed 10000000 lines. Fail count: 0. Success count: 1000

In [10]:
(len(page_name_to_page_id), len(page_id_to_page_name), len(error_counts))

(16812294, 17753527, 0)

In [11]:
# load source_page_id to destination_page_id for redirect pages
# this will be used to resolve redirects in the cells below

redirect_pid_to_pid, error_counts = \
    wiki_analysis_utils.load_page_redirect_mapping(data_root_dir, page_name_to_page_id)

Processed 1000000 lines. Fail count: 70. Success count: 999930. Time taken: 0.03364824851353963 minutes.
Processed 2000000 lines. Fail count: 204. Success count: 1999796. Time taken: 0.06828335920969646 minutes.
Processed 3000000 lines. Fail count: 391. Success count: 2999609. Time taken: 0.10596334934234619 minutes.
Processed 4000000 lines. Fail count: 587. Success count: 3999413. Time taken: 0.1513728141784668 minutes.
Processed 5000000 lines. Fail count: 770. Success count: 4999230. Time taken: 0.19528584082921346 minutes.
Processed 6000000 lines. Fail count: 956. Success count: 5999044. Time taken: 0.23620920578638713 minutes.
Processed 7000000 lines. Fail count: 1182. Success count: 6998818. Time taken: 0.2848101774851481 minutes.
Processed 8000000 lines. Fail count: 1470. Success count: 7998530. Time taken: 0.33345146973927814 minutes.
Processed 9000000 lines. Fail count: 1709. Success count: 8998291. Time taken: 0.3767818530400594 minutes.
Processed 10000000 lines. Fail count: 1

In [12]:
(len(redirect_pid_to_pid), len(set(redirect_pid_to_pid.values())), len(error_counts))

(10904676, 3690970, 29764)

In [13]:
"""
Save page_links.tsv:
SourcePageId, DestinationPageId
"""

start_time = time.time()
processed_line_count = 0
num_pages_processed = 0
failure_counts = {
    'Unknown Page': 0
}
total_link_count = 0
log_level = 'ERROR'
with open(data_root_dir + 'page_links.tsv', 'w') as page_links_f:
    page_links_f.write("SourcePageId\tDestinationPageId\n")
    for i in range(10):
        with open(data_root_dir + f'processed_summaries/part-{i}.txt', 'r') as f:
            for line in f:
                page = json.loads(line)
                namespace = page['namespace']
                title = page['title']
                page_id = page['page_id']
                redirect_title = page['redirect_title']

                if namespace == 0 and not redirect_title:
                    for link in page['internal_links']:
                        link = wiki_analysis_utils.normalized_page_name(link)
                        if link.startswith(':category:'):
                            continue
                        total_link_count += 1
                        if link not in page_name_to_page_id:
                            if log_level in ['WARN', 'INFO']:
                                print(f"Page {link} in page {page_id} not found in page name to id mapping.")
                            failure_counts['Unknown Page'] += 1
                            continue
                        dest_page_id = page_name_to_page_id[link]
                        while dest_page_id in redirect_pid_to_pid:
                            dest_page_id = redirect_pid_to_pid[dest_page_id]
                        page_links_f.write(f"{page_id}\t{dest_page_id}\n")
                    num_pages_processed += 1
                processed_line_count += 1
                if processed_line_count % 1000000 == 0:
                    print(f"Processed {processed_line_count} lines. {num_pages_processed} pages found till now.")
                    #break

        print(f"Finished part {i} in {(time.time() - start_time) / 60} minutes")
        #break

print (f"Failure counts: {failure_counts}")
print (f"{sum(failure_counts.values())} / {total_link_count} ({(sum(failure_counts.values()) / total_link_count) * 100}%) links couldn't be captured.")

Processed 1000000 lines. 520258 pages found till now.
Processed 2000000 lines. 967758 pages found till now.
Finished part 0 in 7.403570981820424 minutes
Processed 3000000 lines. 1375964 pages found till now.
Processed 4000000 lines. 1777375 pages found till now.
Finished part 1 in 10.953632120291392 minutes
Processed 5000000 lines. 2103953 pages found till now.
Processed 6000000 lines. 2488972 pages found till now.
Finished part 2 in 13.118326369921366 minutes
Processed 7000000 lines. 2860437 pages found till now.
Processed 8000000 lines. 3191915 pages found till now.
Finished part 3 in 15.521819802125295 minutes
Processed 9000000 lines. 3503469 pages found till now.
Processed 10000000 lines. 3842350 pages found till now.
Finished part 4 in 17.97135885953903 minutes
Processed 11000000 lines. 4156171 pages found till now.
Processed 12000000 lines. 4498292 pages found till now.
Finished part 5 in 19.871130939324697 minutes
Processed 13000000 lines. 4834557 pages found till now.
Processed

In [14]:
pdf = pd.read_csv(data_root_dir + 'page_links.tsv', sep='\t', nrows=4000)
pdf['SourcePageName'] = pdf['SourcePageId'].map(page_id_to_page_name)
pdf['DestinationPageName'] = pdf['DestinationPageId'].map(page_id_to_page_name)
pdf['SourcePageId'].value_counts()

303    826
316    802
307    549
308    514
305    428
12     387
290    242
39     146
309    106
Name: SourcePageId, dtype: int64

In [15]:
internal_links = pdf.query('SourcePageId==290')['DestinationPageName'].to_list()
print (len(internal_links))
pyperclip.copy(json.dumps(sorted(internal_links), indent=2))
sorted(internal_links)

242


['a (cyrillic)',
 'a (cyrillic)',
 'a (indic)',
 'a with breve (cyrillic)',
 'a-list',
 'aardvark',
 'abo blood group system',
 'afrikaans',
 'aleph',
 'aleph',
 'aleph',
 'aleph number',
 'algebra',
 'allograph',
 'alpha',
 'alpha',
 'alphabet',
 'anarchist symbolism',
 'ancient greece',
 'angstrom',
 'ansuz (rune)',
 'apple',
 'argentine austral',
 'argentine austral',
 'armenian alphabet',
 'article (grammar)',
 'ascii',
 'asymmetry',
 'at sign',
 'at sign',
 'australian english',
 'ayb (armenian letter)',
 'ayb (armenian letter)',
 'azerbaijani language',
 'bar (diacritic)',
 'bashkir language',
 'bengali alphabet',
 'blackletter',
 'bra',
 'bulgarian language',
 'carolingian minuscule',
 'caron',
 'catalan dialects',
 'catalan dialects',
 'catalan language',
 'chemnitz dialect',
 'chuvash language',
 'close-mid front unrounded vowel',
 'code point',
 'combining character',
 'consonant',
 'coptic script',
 'cursive',
 'cyrillic script',
 'czech language',
 'danish language',
 'deci

In [16]:
"""
Save page_inlink_counts.tsv:
PageId, NumInlinks
"""

page_inlink_counts = {}
start_time = time.time()
line_no = 0
with open(data_root_dir + 'page_links.tsv', 'r') as f:
    for line in f:
        line_no += 1
        if line_no==1: continue
        source_page, destination_page = line.split('\t')
        destination_page = int(destination_page)
        page_inlink_counts[destination_page] = page_inlink_counts.get(destination_page, 0) + 1
        if line_no % 10000000 == 0:
            print(f"Processed {line_no} lines. Time taken: {(time.time() - start_time) / 60} minutes.")
            # break

print (f"Processed {line_no} lines. Time taken: {(time.time() - start_time) / 60} minutes.")

with open(data_root_dir + 'page_inlink_counts.tsv', 'w') as f:
    f.write("PageId\tNumInlinks\n")
    for page_id, num_inlinks in sorted(page_inlink_counts.items()):
        f.write(f"{page_id}\t{num_inlinks}\n")

Processed 10000000 lines. Time taken: 0.2552937984466553 minutes.
Processed 20000000 lines. Time taken: 0.4910214861234029 minutes.
Processed 30000000 lines. Time taken: 0.7426513274510701 minutes.
Processed 40000000 lines. Time taken: 1.0243276198705038 minutes.
Processed 50000000 lines. Time taken: 1.3267032305399578 minutes.
Processed 60000000 lines. Time taken: 1.6137513518333435 minutes.
Processed 70000000 lines. Time taken: 1.9017590920130412 minutes.
Processed 80000000 lines. Time taken: 2.191321623325348 minutes.
Processed 90000000 lines. Time taken: 2.490134263038635 minutes.
Processed 100000000 lines. Time taken: 2.7884846766789755 minutes.
Processed 110000000 lines. Time taken: 3.0721547563870746 minutes.
Processed 120000000 lines. Time taken: 3.351078001658122 minutes.
Processed 130000000 lines. Time taken: 3.6386491854985556 minutes.
Processed 140000000 lines. Time taken: 3.9113255540529885 minutes.
Processed 150000000 lines. Time taken: 4.21607338587443 minutes.
Processed

In [17]:
len(page_inlink_counts)

6349743

In [40]:
# combine information from full dataset and save category pages from the processed summaries

"""
prepare category page details
1. List of sub-categories
2. List of articles
3. List of parent categories
4. List of internal links on category page

To parallelize, 
1. dump data for each partition separately
2. process categories based on their hashes in N parts and dumps complete info for each category
"""

def get_empty_category_data():
    return {
        'sub_categories': set(),
        'internal_links': [],
        'articles': [],
        'parent_categories': set()
    }

start_time = time.time()
processed_line_count = 0
norm = wiki_analysis_utils.normalized_page_name
for i in range(10):
    category_name_to_data = {}
    with open(data_root_dir + f'processed_summaries/part-{i}.txt', 'r') as f:
        for line in f:
            page = json.loads(line)
            namespace = page['namespace']
            title = page['title']
            page_id = page['page_id']
            redirect_title = page['redirect_title']

            if namespace == 14:
                category_name = norm(title)
                category_name_to_data[category_name] = category_name_to_data\
                    .get(category_name, get_empty_category_data())
                category_data = category_name_to_data[category_name]
                category_data['internal_links'].extend(map(norm, page['internal_links']))
                category_data['parent_categories'].update(map(norm, page['categories']))
                category_data['category_id'] = int(page_id)
                for parent_category in page['categories']:
                    parent_category = norm(parent_category)
                    category_name_to_data[parent_category] = category_name_to_data\
                        .get(parent_category, get_empty_category_data())
                    category_name_to_data[parent_category]['sub_categories'].add(category_name)
            elif namespace == 0 and not redirect_title:
                for category in page['categories']:
                    category = norm(category)
                    category_name_to_data[category] = category_name_to_data\
                        .get(category, get_empty_category_data())
                    page_id = int(page_id)
                    page_id = redirect_pid_to_pid.get(page_id, page_id)
                    category_name_to_data[category]['articles'].append((title, page_id))
            processed_line_count += 1
            if processed_line_count % 1000000 == 0:
                print(f"Processed {processed_line_count} lines in {(time.time() - start_time) / 60} minutes.")
    os.makedirs(data_root_dir+'tmp/', exist_ok=True)
    print (f"Dumping data for part {i} containing {len(category_name_to_data)} category partial jsons.")
    with open(data_root_dir + f'tmp/partial_category_page_data_part-{i}.txt', 'w') as out_f:
        for cat_name, data in category_name_to_data.items():
            data["category_name"] = cat_name
            data['sub_categories'] = list(data['sub_categories'])
            data['parent_categories'] = list(data['parent_categories'])
            out_f.write(json.dumps(data)+"\n")
    print(f"Finished part {i} in {(time.time() - start_time) / 60} minutes")
    #break

Processed 1000000 lines in 0.6364568471908569 minutes.
Processed 2000000 lines in 1.1012573758761088 minutes.
Dumping data for part 0 containing 934933 category partial jsons.
Finished part 0 in 1.5530202349026998 minutes
Processed 3000000 lines in 1.9946431159973144 minutes.
Processed 4000000 lines in 2.4322856227556864 minutes.
Dumping data for part 1 containing 885607 category partial jsons.
Finished part 1 in 2.8973546028137207 minutes
Processed 5000000 lines in 3.1667434334754945 minutes.
Processed 6000000 lines in 3.5835325280825296 minutes.
Dumping data for part 2 containing 830371 category partial jsons.
Finished part 2 in 3.9203997294108075 minutes
Processed 7000000 lines in 4.218890984853108 minutes.
Processed 8000000 lines in 4.613288362820943 minutes.
Dumping data for part 3 containing 855587 category partial jsons.
Finished part 3 in 4.940384646256764 minutes
Processed 9000000 lines in 5.235450784365336 minutes.
Processed 10000000 lines in 5.667127799987793 minutes.
Dumpin

In [41]:
os.makedirs(data_root_dir+'category_pages/', exist_ok=True)
start_time = time.time()
for partition in range(NUM_PARTITIONS):
    category_name_to_data = {}
    for i in range(10):
        with open(data_root_dir + f'tmp/partial_category_page_data_part-{i}.txt', 'r') as f:
            for line in f:
                line = line.strip()
                if line=="": continue
                data = json.loads(line)
                if (hash(data['category_name']) % NUM_PARTITIONS) != partition:
                    continue
                if data['category_name'] not in category_name_to_data:
                    category_name_to_data[data['category_name']] = get_empty_category_data()
                category_data = category_name_to_data[data['category_name']]
                if 'category_id' in data:
                    category_data['category_id'] = data['category_id']
                category_data['sub_categories'].update(data['sub_categories'])
                category_data['parent_categories'].update(data['parent_categories'])
                category_data['internal_links'].extend(data['internal_links'])
                category_data['articles'].extend(data['articles'])
    with open(data_root_dir + f'category_pages/part-{partition}.txt', 'w') as out_f:
        for cat_name, data in category_name_to_data.items():
            data["category_name"] = cat_name
            data['sub_categories'] = list(data['sub_categories'])
            data['parent_categories'] = list(data['parent_categories'])
            out_f.write(json.dumps(data)+"\n")
    print(f"Finished part {partition} in {(time.time() - start_time) / 60} minutes")

                

Finished part 0 in 2.061130122343699 minutes
Finished part 1 in 3.9293649673461912 minutes
Finished part 2 in 5.823503355185191 minutes
Finished part 3 in 8.542663621902467 minutes
Finished part 4 in 10.408614214261373 minutes
Finished part 5 in 12.252984670797984 minutes
Finished part 6 in 14.039662718772888 minutes
Finished part 7 in 15.66956444978714 minutes
Finished part 8 in 17.287773271401722 minutes
Finished part 9 in 18.950868968168894 minutes


In [42]:
# check for a few category pages that they look good

selected_categories = ["Platonic solids", "Sampling (statistics)", "Drama films", "Comedy novels"]
category_pages = wiki_analysis_utils.load_category_pages(data_root_dir, selected_categories=selected_categories)

Processed till part 0 in 0.07508935133616129 minutes
Processed till part 1 in 0.1482577443122864 minutes
Processed till part 2 in 0.2128618319829305 minutes
Processed till part 3 in 0.2742927074432373 minutes
Processed till part 4 in 0.35173648595809937 minutes
Processed till part 5 in 0.47813774744669596 minutes
Processed till part 6 in 0.5374097108840943 minutes
Processed till part 7 in 0.595884398619334 minutes
Processed till part 8 in 0.6622572024663289 minutes
Processed till part 9 in 0.7481602390607198 minutes


In [43]:
category_pages['selected'][1]
# category_pages['random'][2]

{'sub_categories': ['sampling techniques',
  'empirical evidence',
  'sample statistics',
  'survey methodology'],
 'internal_links': [],
 'articles': [['Census', 6889],
  ['Sampling bias', 17692],
  ['Rock paper scissors', 27032],
  ['Statistical unit', 27580],
  ['Stratified sampling', 27596],
  ['Infrastructure bias', 47280],
  ['Sampling (statistics)', 160361],
  ['Autodidacticism', 255591],
  ['Opinion poll', 277315],
  ['Margin of error', 277379],
  ['Self-selection bias', 292154],
  ['Lottery machine', 379930],
  ['Selection bias', 394392],
  ['Coin flipping', 494410],
  ['Sampling distribution', 520670],
  ['Recall bias', 1360950],
  ['Survivorship bias', 1745325],
  ['Sample size determination', 1776839],
  ['Sampling error', 1955561],
  ['Sampling frame', 2050041],
  ['Odds and evens (hand game)', 2234844],
  ['Sampling fraction', 2719222],
  ["Whipple's index", 4039291],
  ['Selective recruitment', 5054888],
  ['Scale analysis (statistics)', 6055749],
  ['Expander walk sampl

In [7]:
# load category name to id mappings from the category pages
categories, failure_counts = wiki_analysis_utils.load_category_name_to_id_map(data_root_dir, silent=True)

In [7]:
"""
-> category_id_to_parent_category_ids.tsv

This is a tsv file with the following structure:
CategoryId, ParentCategoryId
"""

category_id_to_data = {}

start_time = time.time()
failure_counts = {
    'Unknown Category': 0,
    'Unknown ParentCategory': 0
}
total_counts = {
    'Category': 0,
    'ParentCategory': 0
}
log_level = 'ERROR'
norm = wiki_analysis_utils.normalized_page_name
with open(data_root_dir + 'category_id_to_parent_category_ids.tsv', 'w') as out_f:
    out_f.write("CategoryId\tParentCategoryId\n")
    for partition in range(NUM_PARTITIONS):
        with open(data_root_dir + f'category_pages/part-{partition}.txt', 'r') as f:
            for line in f:
                if line=='': continue
                category_json = json.loads(line)
                total_counts['Category'] += 1
                category_id = category_json.get('category_id', None)
                if category_id is None:
                    if log_level in ['WARN', 'INFO']:
                        print(f"Category id not present in {line}.")
                    failure_counts['Unknown Category'] += 1
                    continue
                for parent_category in set(map(norm, category_json['parent_categories'])):
                    total_counts['ParentCategory'] += 1
                    parent_category_id = categories['name_to_id'].get(parent_category, None)
                    if parent_category_id is None:
                        if log_level in ['WARN', 'INFO']:
                            print(f"Parent category {parent_category} not found in category name to id mapping.")
                        failure_counts['Unknown ParentCategory'] += 1
                        continue
                    out_f.write(f"{category_id}\t{parent_category_id}\n")
        print(f"Processed till part {partition} in {(time.time() - start_time) / 60} minutes")

print (f"Failure counts: {failure_counts}")
print (f"Total counts: {total_counts}")


Processed till part 0 in 0.12108764251073202 minutes
Processed till part 1 in 0.23310102224349977 minutes
Processed till part 2 in 0.3451609174410502 minutes
Processed till part 3 in 0.4603490591049194 minutes
Processed till part 4 in 0.6193542083104452 minutes
Processed till part 5 in 0.7521270553270976 minutes
Processed till part 6 in 0.9076026757558187 minutes
Processed till part 7 in 1.0632949948310852 minutes
Processed till part 8 in 1.2504449168841043 minutes
Processed till part 9 in 1.5214941342671713 minutes
Failure counts: {'Unknown Category': 22359, 'Unknown ParentCategory': 170572}
Total counts: {'Category': 2369155, 'ParentCategory': 4766447}


In [4]:
parent_graph_adj_lists, child_graph_adj_lists = wiki_analysis_utils\
    .load_category_graph(data_root_dir, silent=True)

Loaded 500000 edges in 0.49964474042256674 minutes
Loaded 1000000 edges in 1.0060873905817667 minutes
Loaded 1500000 edges in 1.493864639600118 minutes
Loaded 2000000 edges in 2.1164226730664573 minutes
Loaded 2500000 edges in 2.6897119919459027 minutes
Loaded 3000000 edges in 3.182141168912252 minutes
Loaded 3500000 edges in 3.659866400559743 minutes
Loaded 4000000 edges in 4.230882132053376 minutes
Loaded 4500000 edges in 4.730266630649567 minutes


In [5]:
len(parent_graph_adj_lists), len(child_graph_adj_lists)

(1790399, 866913)

In [8]:
# lots of categories are isolated, i.e. they have no parent or child categories
# this is mainly due to parsing errors due to unexpanded templates
root_categories = [cat for cat in categories['id_to_name'] if cat not in parent_graph_adj_lists]
print (f"There are {len(root_categories)} root categories")

roots_with_children = [cat for cat in root_categories if cat in child_graph_adj_lists]
print (f"There are {len(roots_with_children)} root categories with children")

roots_with_no_children = [cat for cat in root_categories if cat not in child_graph_adj_lists]
print (f"There are {len(roots_with_no_children)} root categories with no children")

There are 556397 root categories
There are 72888 root categories with children
There are 483509 root categories with no children
There are 483509 root categories with no children


In [9]:
# # sample 10 random categories from the root categories of different types to see what they are
# sample_roots_children = random.sample(roots_with_children, 10)
# print ("Root categories with children:")
# for cat in sample_roots_children:
#     print (cat, categories['id_to_name'][cat], len(child_graph_adj_lists[cat]))

# print ("\nRoot categories without children")
# sample_roots_no_children = random.sample(roots_with_no_children, 10)
# for cat in sample_roots_no_children:
#     print (cat, categories['id_to_name'][cat])

Before proceeding, lets first understand the category DAG a little:

1. Verify that there are no cycles in the category DAG. 
    - label each edges as forward, backward, cross, or tree edges.
    - count and investigate. Remove the cycles (if any)
2. Do the root categorizes share a lot of descendants or it is a loose collection of small DAGs?
    - show distribution of number of descendants for roots
3. How deep is the DAG?
    - show distribution of depth of nodes


In [67]:
dfs_color = {}
dfs_discovery_time = {}
dfs_finishing_time = {}
dfs_depth = {}
edge_to_type = {}

def dfs_visit(category_id, dfs_time, depth):
    dfs_color[category_id] = 'GRAY'
    dfs_time += 1
    dfs_discovery_time[category_id] = dfs_time
    dfs_depth[category_id] = depth
    for child_category_id in child_graph_adj_lists.get(category_id, []):
        if dfs_color.get(child_category_id, 'WHITE') == 'WHITE':
            edge_to_type[(category_id, child_category_id)] = 'TreeEdge'
            dfs_time = dfs_visit(child_category_id, dfs_time, depth + 1)
        elif dfs_color[child_category_id] == 'GRAY':
            edge_to_type[(category_id, child_category_id)] = 'BackEdge'
        elif dfs_discovery_time[category_id] < dfs_discovery_time[child_category_id]:
            edge_to_type[(category_id, child_category_id)] = 'ForwardEdge'
        else:
            edge_to_type[(category_id, child_category_id)] = 'CrossEdge'
    dfs_color[category_id] = 'BLACK'
    dfs_time += 1
    dfs_finishing_time[category_id] = dfs_time
    return dfs_time

dfs_time = 0
for root_category in root_categories:
    if dfs_color.get(root_category, 'WHITE') == 'WHITE':
        dfs_time = dfs_visit(root_category, dfs_time, 0)

print (f"{len(dfs_color)} categories discovered based on root categories. ({len(dfs_color) / len(categories['id_to_name']) * 100}%)")
# there are some very few isolated cycles in the graph. So we need to run dfs on them as well
for category_id in categories['id_to_name']:
    if dfs_color.get(category_id, 'WHITE') == 'WHITE':
        dfs_time = dfs_visit(category_id, dfs_time, 0)

2346789 categories discovered based on root categories. (99.99970172098469%)


In [68]:
# calculate number of descendants for each category
dfs_num_descendants = {}

def dfs_num_descendants_visit(cid):
    num_descendants = 1
    for child_cid in child_graph_adj_lists.get(cid, []):
        if child_cid not in dfs_num_descendants and edge_to_type[(cid, child_cid)] == 'TreeEdge':
            dfs_num_descendants_visit(child_cid)
            num_descendants += dfs_num_descendants[child_cid]
    dfs_num_descendants[cid] = num_descendants

for cid in categories['id_to_name']:
    if cid not in dfs_num_descendants:
        dfs_num_descendants_visit(cid)

In [88]:
max(dfs_num_descendants.values())

718360

In [55]:
# analyze edge types
edge_type_counts = Counter(edge_to_type.values())
# wow! there are some back edges in the graph. 
# Fortunate not a lot of them. Same for forward edges.
print (edge_type_counts)


Counter({'CrossEdge': 2763815, 'TreeEdge': 1790392, 'ForwardEdge': 37798, 'BackEdge': 3862})


In [56]:
# lets look at some cycles to see if they actually exist in the data
cycles = []
for edge, edge_type in edge_to_type.items():
    if edge_type == 'BackEdge':
        u, v = edge
        # trace v -> u path by following tree edges starting from u
        path = [u]
        while path[-1] != v:
            for parent in parent_graph_adj_lists[path[-1]]:
                if edge_to_type[parent, path[-1]] == 'TreeEdge':
                    path.append(parent)
                    break
        path.append(u)
        cycles.append(path)

In [57]:
# see distribution of cycle lengths
# wow! median is 225! what are these crazy chains of categories?
pd.DataFrame([len(cycle) for cycle in cycles], columns=['CycleLength']).describe()

Unnamed: 0,CycleLength
count,3862.0
mean,276.170637
std,255.122625
min,2.0
25%,14.0
50%,225.0
75%,473.0
max,976.0


In [58]:
cycle_idx = 345
cycle_cat_names = [categories['id_to_name'][cat] for cat in cycles[cycle_idx]]
print (cycles[cycle_idx])
print (cycle_cat_names)

[2209407, 728172, 743165, 62779982, 2360882, 36231667, 11908750, 3199407, 743167, 31292581, 742730, 43934264, 42630814, 37654739, 36513227, 30939454, 945188, 2209407]
['languages of kuwait', 'culture of kuwait', 'kuwait', 'eastern arabia', 'persian gulf', 'bodies of water of bahrain', 'landforms of bahrain', 'geography of bahrain', 'bahrain', 'member states of the arab league', 'arab league', 'pan-arabist organizations', 'arab nationalist organizations', 'arab organizations', 'arab culture', 'arabic-language culture', 'arabic language', 'languages of kuwait']


In [59]:
"""
Cycle examples in catagory graph:

[36248715, 62373302, 36248715]
['kootenay ice players', 'winnipeg ice players', 'kootenay ice players']

[4216522, 4285903, 38997059, 4216009, 4299011, 4216522]
['colorado desert', 'deserts of the lower colorado river valley', 'natural history of the lower colorado river valley', 'lower colorado river valley', 'colorado river', 'colorado desert']

[2209407, 728172, 743165, 62779982, 2360882, 36231667, 11908750, 3199407, 743167, 31292581, 742730, 43934264, 42630814, 37654739, 36513227, 30939454, 945188, 2209407]
['languages of kuwait', 'culture of kuwait', 'kuwait', 'eastern arabia', 'persian gulf', 'bodies of water of bahrain', 'landforms of bahrain', 'geography of bahrain', 'bahrain', 'member states of the arab league', 'arab league', 'pan-arabist organizations', 'arab nationalist organizations', 'arab organizations', 'arab culture', 'arabic-language culture', 'arabic language', 'languages of kuwait']
"""
_=1

In [60]:
# lets look at some forward edges
forward_edge_dup_paths = []
for edge, edge_type in edge_to_type.items():
    if edge_type == 'ForwardEdge':
        u, v = edge
        # trace u -> v path by following tree edges starting from u
        path = [v]
        while path[-1] != u:
            for parent in parent_graph_adj_lists[path[-1]]:
                if edge_to_type[parent, path[-1]] == 'TreeEdge':
                    path.append(parent)
                    break
        forward_edge_dup_paths.append(path)

In [61]:
# see distribution of forward edge tree path lengths
# forward edges don't hop too much. Median is 3. These look much more benign.
pd.DataFrame([len(path) for path in forward_edge_dup_paths], columns=['PathLength']).describe()

Unnamed: 0,PathLength
count,37798.0
mean,38.99283
std,120.625247
min,3.0
25%,3.0
50%,3.0
75%,4.0
max,979.0


In [62]:
path_idx = 0
path_cat_names = [categories['id_to_name'][cat] for cat in forward_edge_dup_paths[path_idx]]
print (forward_edge_dup_paths[path_idx])
print (path_cat_names)

[68451063, 75485509, 3021025]
['residential schools in saskatchewan', 'former schools in saskatchewan', 'schools in saskatchewan']


In [73]:
# see distribution of number of descendants
# as expected, most categories have no descendants and some have a lot
pd.DataFrame(list(dfs_num_descendants.values()), columns=['NumDescendants']).describe()

Unnamed: 0,NumDescendants
count,2346796.0
mean,141.4041
std,7997.034
min,1.0
25%,1.0
50%,1.0
75%,1.0
max,718360.0


In [74]:
cats_descendants = []
for cid, num_descendants in dfs_num_descendants.items():
    if num_descendants > 1:
        cats_descendants.append({
            'CategoryName': categories['id_to_name'][cid],
            'NumDescendants': num_descendants
        })
cats_descendants = pd.DataFrame(cats_descendants)

In [78]:
cats_descendants.sort_values('NumDescendants', ascending=False).head(1000).tail(10)

Unnamed: 0,CategoryName,NumDescendants
41070,federal government of the united states,26332
138044,problem solving,25747
108099,liberalism by country,25606
108095,republicanism by country,25580
108093,republics,25564
324052,post–cold war era,25510
30067,islands of the pacific ocean,25366
137889,decision-making,25285
137877,policy,25261
137874,social policy,25246


In [81]:
# show distribution of depth
# display without scientific notation
with pd.option_context('float_format', '{:.2f}'.format):
    display(pd.DataFrame(list(dfs_depth.values()), columns=['Depth']).describe())

# category depth is very high. Median is 364

Unnamed: 0,Depth
count,2346796.0
mean,339.28
std,275.89
min,0.0
25%,1.0
50%,364.0
75%,532.0
max,1001.0


In [82]:
# save all dfs related data

# category_id_to_dfs_metadata.tsv: CategoryId, DiscoveryTime, FinishingTime, Depth, NumDescendants
with open(data_root_dir + 'category_id_to_dfs_metadata.tsv', 'w') as f:
    f.write("CategoryId\tDiscoveryTime\tFinishingTime\tDepth\tNumDescendants\n")
    for category_id in dfs_color:
        f.write(f"{category_id}\t{dfs_discovery_time[category_id]}\t{dfs_finishing_time[category_id]}\t{dfs_depth[category_id]}\t{dfs_num_descendants[category_id]}\n")


In [83]:
# category_edges_dfs_classification.tsv: CategoryId, SubCategoryId, EdgeType
with open(data_root_dir + 'category_edges_dfs_classification.tsv', 'w') as f:
    f.write("CategoryId\tSubCategoryId\tEdgeType\n")
    for (category_id, sub_category_id), edge_type in edge_to_type.items():
        f.write(f"{category_id}\t{sub_category_id}\t{edge_type}\n")

In [84]:
"""
Obtain and save these mappings:

category_id_to_page_ids.tsv: CategoryId, PageId
category_id_to_stats.tsv: CategoryId, NumPages, NumChildCategories, NumParentCategories, NumDescendantCategories
"""

category_id_to_data = {}

start_time = time.time()
failure_counts = {
    'Unknown Category': 0,
    'Unknown SubCategory': 0,
    'Unknown ParentCategory': 0
}
log_level = 'ERROR'
for partition in range(NUM_PARTITIONS):
    with open(data_root_dir + f'category_pages/part-{partition}.txt', 'r') as f:
        for line in f:
            if line=='': continue
            category_json = json.loads(line)
            category_data = {
                'page_ids': [],
                'sub_category_ids': [],
                'parent_category_ids': []
            }
            category_id = categories['name_to_id'].get(category_json['category_name'], None)
            if category_id is None:
                if log_level in ['WARN', 'INFO']:
                    print(f"Category {category_json['category_name']} not found in category name to id mapping.")
                failure_counts['Unknown Category'] += 1
                continue
            category_id_to_data[category_id] = category_data
            for _, page_id in category_json['articles']:
                category_data['page_ids'].append(page_id)
            for sub_category in category_json['sub_categories']:
                sub_category_id = categories['name_to_id'].get(sub_category, None)
                if sub_category_id is None:
                    if log_level in ['WARN', 'INFO']:
                        print(f"Sub category {sub_category} not found in category name to id mapping.")
                    failure_counts['Unknown SubCategory'] += 1
                    continue
                category_data['sub_category_ids'].append(sub_category_id)
            for parent_category in category_json['parent_categories']:
                parent_category_id = categories['name_to_id'].get(parent_category, None)
                if parent_category_id is None:
                    if log_level in ['WARN', 'INFO']:
                        print(f"Parent category {parent_category} not found in category name to id mapping.")
                    failure_counts['Unknown ParentCategory'] += 1
                    continue
                category_data['parent_category_ids'].append(parent_category_id)
    print(f"Processed till part {partition} in {(time.time() - start_time) / 60} minutes")

print (f"Failure counts: {failure_counts}")


Processed till part 0 in 0.14392542441685993 minutes
Processed till part 1 in 0.25730487902959187 minutes
Processed till part 2 in 0.3715030550956726 minutes
Processed till part 3 in 0.49416927099227903 minutes
Processed till part 4 in 0.6536876241366069 minutes
Processed till part 5 in 0.7914778232574463 minutes
Processed till part 6 in 0.9500364224116008 minutes
Processed till part 7 in 1.083427627881368 minutes
Processed till part 8 in 1.232020664215088 minutes
Processed till part 9 in 1.3414887547492982 minutes
Failure counts: {'Unknown Category': 22359, 'Unknown SubCategory': 0, 'Unknown ParentCategory': 170572}


In [89]:
# save these mappings to tsv files

with open(data_root_dir + 'category_id_to_stats.tsv', 'w') as f:
    f.write("CategoryId\tNumPages\tNumChildCategories\tNumParentCategories\tNumDescendantCategories\n")
    for category_id, data in category_id_to_data.items():
        num_descendant_categories = dfs_num_descendants[category_id]
        f.write(f"{category_id}\t{len(data['page_ids'])}\t{len(data['sub_category_ids'])}\t{len(data['parent_category_ids'])}\t{num_descendant_categories}\n")

In [86]:
with open(data_root_dir + 'category_id_to_page_ids.tsv', 'w') as f:
    f.write("CategoryId\tPageId\n")
    for category_id, data in category_id_to_data.items():
        for page_id in data['page_ids']:
            f.write(f"{category_id}\t{page_id}\n")