Data collection logic: 
- event_entity = 'revision'
- snapshot dated 2022-07
- the observation period is 2021-07-01 – 2022-08-01 (1 month)
- consider only revisions with page_namespace = 0 (content)
- filter bots
- leave only revisions by anonymous users
- add a binary feature that revert was done by "experienced" user
- add filter for "revision-wars"



In [None]:
import wmfdata
spark = wmfdata.spark.get_session(
    type='yarn-regular',
    app_name ='mediawiki_history wikitext en',
    ship_python_env=True,
    extra_settings={
        "spark.executor.cores": 2,
    #     "spark.sql.shuffle.partitions": 195,
        "spark.sql.broadcastTimeout": "3600",
        "spark.sql.autoBroadcastJoinThreshold": 10485760 # 10Mb
    }
)

In [2]:
import pyspark
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import udf
from collections.abc import Iterable

In [3]:
### Collecting data
snapshot = "'2022-07'"
wiki_dbs = ['enwiki']
wiki_dbs_str = '("' + '", "'.join(wiki_dbs) + '")'
min_timestamp = "'2022-01-01'"
max_timestamp = "'2022-08-01'"
dump_name = "mediawiki_history_en_07_2022.parquet"

features_to_observe = [
    "wiki_db",
    "event_comment",
    "event_user_text_historical",
    "event_user_is_bot_by",
    "event_user_is_anonymous",
    "event_user_groups",
    "event_user_seconds_since_previous_revision",
    "revision_id",
    "page_title",
    "page_revision_count",
    "revision_text_bytes_diff",
    "revision_is_identity_reverted",
    "event_timestamp",
    "revision_tags",
    "revision_parent_id",
    "revision_first_identity_reverting_revision_id"
]

changes = spark.sql(
    f'''
       SELECT {','.join(features_to_observe)}
       FROM wmf.mediawiki_history 
       WHERE event_entity = 'revision' AND wiki_db IN {wiki_dbs_str} AND snapshot = {snapshot} 
       AND event_timestamp >= {min_timestamp} AND event_timestamp < {max_timestamp}
       AND page_namespace = 0
    '''
)

changes.write.parquet(dump_name, mode="overwrite") 


# collected in 03_Initial_EDA.ipynb, reading dump
changes=spark.read.parquet(dump_name)
changes.count()

22/09/24 12:48:11 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
22/09/24 13:07:59 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

24664831

In [4]:
# change type of target column for stats calculation
target_column = "revision_is_identity_reverted"
changes = changes.withColumn(target_column, changes[target_column].cast(T.IntegerType()))

In [5]:
columns_to_select = changes.columns
changes_new = changes.alias("t1") \
    .join(changes.alias("t2"), F.col("t1.revision_first_identity_reverting_revision_id") == F.col("t2.revision_id"), "left") \
    .select(
        *[f"t1.{c}" for c in changes.columns],
        F.col("t2.revision_is_identity_reverted").alias("reverting_revision_is_reverted_revision"),
        F.col("t2.event_user_groups").alias("reverting_revision_event_user_groups"),
    )
changes_new.count()

                                                                                

24664831

In [6]:
# rate of "bad" reverts (reverts, that was later also reverted) -> we should filter out those
changes_new.where(F.col("revision_is_identity_reverted") == True) \
    .select(F.mean(F.col("reverting_revision_is_reverted_revision").cast(T.IntegerType()))).show()

[Stage 11:>                                                         (0 + 1) / 1]

+---------------------------------------------------------+
|avg(CAST(reverting_revision_is_reverted_revision AS INT))|
+---------------------------------------------------------+
|                                       0.1798579791724813|
+---------------------------------------------------------+



                                                                                

# Adding filters:

### Filter out revision wars:

In [7]:
# filter out only "not_reverted" revisions and reverted revisions that were reverted by "good" revert
changes_new = changes_new.fillna({'reverting_revision_is_reverted_revision': False})
changes_new = changes_new.where(changes_new["reverting_revision_is_reverted_revision"] != True)
print(changes_new.count())



24251117


                                                                                

### Filter out bots:

In [None]:
@udf("Integer")
def is_bot(user_groups):
    if isinstance(user_groups, Iterable):
        return 1 if len(user_groups) > 0 else 0
    else:
        return 0
    
changes_new = changes_new.withColumn("is_bot", is_bot(F.col("event_user_is_bot_by")))
changes_new = changes_new.where(changes_new["is_bot"]==0)
print(changes_new.count())

### Filter out new pages (where there is no revision_parent_id)
Filter revisions without parent revision (as we are interested in text changes

In [9]:
changes_new = changes_new.where(F.col("revision_parent_id") != 0)
print(changes_new.count())



21764892


                                                                                

### Filter out anonymous users:

In [10]:
# filter only is_anonymous users
changes_new = changes_new.where(changes_new["event_user_is_anonymous"]==True)
print(changes_new.count())



4322554


                                                                                

### Artificial balancing (optional step)

In [11]:
import numpy as np

MAX_COUNT = 300000

revert_revisions = changes_new.where(changes_new[target_column]==1)
revert_revisions_count = revert_revisions.count()
not_revert_revisions = changes_new.where(changes_new[target_column]==0)
not_revert_revisions_count = not_revert_revisions.count()

not_revert_revisions_count_new = np.min([not_revert_revisions_count, MAX_COUNT])
not_revert_revisions = not_revert_revisions.sample(not_revert_revisions_count_new/not_revert_revisions_count)
revert_revisions = revert_revisions.sample(np.min([1, not_revert_revisions_count_new/revert_revisions_count]))

changes_new = revert_revisions.union(not_revert_revisions)
changes_new.select(F.mean(F.col(target_column))).show()



+----------------------------------+
|avg(revision_is_identity_reverted)|
+----------------------------------+
|                0.4994026224623952|
+----------------------------------+



                                                                                

# Feature engineering:

### Mark if reverted revision is reverted by experienced user

In [12]:
@udf("Integer")
def is_good_user(user_groups):
    good_users = ['sysop','oversight','editor','rollbacker','checkuser','abusefilter','bureaucrat']
    if isinstance(user_groups, Iterable):
        return 1 if len([i for i in user_groups if i in good_users]) > 0 else 0
    else:
        return 0

changes_new = \
    changes_new.withColumn("is_reverted_by_good_user", is_good_user(F.col("reverting_revision_event_user_groups")))

### Mark specific tags

In [13]:
# Add features for specific tags:
revision_tags_to_get = [
    "mobile edit",
    "mobile web edit",
    "visualeditor",
    "wikieditor",
    "mobile app edit"
    "android app edit",
    "ios app edit"
]

for tag in revision_tags_to_get:
    @udf("Integer")
    def is_tag(tags_list, tag=tag):
        if isinstance(tags_list, Iterable):
            return 1 if tag in tags_list else 0
        else:
            return 0
    
    changes_new = changes_new.withColumn(f"is_{tag.replace(' ', '_')}", is_tag(F.col("revision_tags")))

In [14]:
columns_to_save = [
 'wiki_db',
 'event_comment',
 'event_user_text_historical',
 'event_user_seconds_since_previous_revision',
 'revision_id',
 'page_title',
 'page_revision_count',
 'revision_text_bytes_diff',
 'revision_is_identity_reverted',
 'event_timestamp',
 'revision_parent_id',
 'revision_first_identity_reverting_revision_id',
 'is_reverted_by_good_user',
 'is_mobile_edit',
 'is_mobile_web_edit',
 'is_visualeditor',
 'is_wikieditor',
 'is_mobile_app_editandroid_app_edit',
 'is_ios_app_edit'
]

dump_name = "en_mediawiki_history_filtered_07_2022.parquet"
changes_new.select(columns_to_save).write.parquet(dump_name, mode="overwrite") 


# collected prepared mediawiki_history
changes_new=spark.read.parquet(dump_name)

                                                                                

In [15]:
dump_name = "en_mediawiki_history_filtered_07_2022.parquet"
changes_new=spark.read.parquet(dump_name)

In [16]:
changes_new.count()

                                                                                

600123

# Collecting text differences:

### Collecting text data

In [17]:
snapshot = "2022-07"
wiki_dbs = ['enwiki']
wiki_dbs_str = '("' + '", "'.join(wiki_dbs) + '")'

wikitext_df = spark.sql(f"""SELECT revision_id, wiki_db, revision_text 
    FROM wmf.mediawiki_wikitext_history
    WHERE snapshot = '{snapshot}' and wiki_db in {wiki_dbs_str} and page_namespace = 0
    """)

### Joining the tables and collecting texts: 

In [18]:
history_columns = changes_new.columns

revisions_text_1 = wikitext_df.alias("text") \
    .join(
        changes_new.alias("history"),
        F.col("text.revision_id") == F.col("history.revision_id"),
        "right"
    ).select(
        *[F.col(f"history.{c}") for c in history_columns],
        F.col("text.revision_text").alias("revision_id_revision_text"),
    ).alias("t1")

revisions_text_2 = wikitext_df.alias("text") \
    .join(
        changes_new.alias("history"),
        F.col("text.revision_id") == F.col("history.revision_parent_id"),
        "right"
    ).select(
        F.col(f"history.revision_id"),
        F.col("text.revision_text").alias("revision_parent_id_revision_text")
    ).alias("t2")


revisions_text_all = revisions_text_1 \
    .join(revisions_text_2, F.col("t1.revision_id") == F.col("t2.revision_id")) \
    .select(
        F.col("t1.revision_id_revision_text").alias("revision_id_revision_text"),
        F.col("t2.revision_parent_id_revision_text").alias("revision_parent_id_revision_text"),
        *[F.col(f"t1.{c}") for c in history_columns],
    )

revisions_text_all = revisions_text_all.where(F.col("revision_parent_id_revision_text").isNotNull())
revisions_text_all = revisions_text_all.where(F.col("revision_id_revision_text").isNotNull())

In [19]:
revisions_text_all

DataFrame[revision_id_revision_text: string, revision_parent_id_revision_text: string, wiki_db: string, event_comment: string, event_user_text_historical: string, event_user_seconds_since_previous_revision: bigint, revision_id: bigint, page_title: string, page_revision_count: bigint, revision_text_bytes_diff: bigint, revision_is_identity_reverted: int, event_timestamp: string, revision_parent_id: bigint, revision_first_identity_reverting_revision_id: bigint, is_reverted_by_good_user: int, is_mobile_edit: int, is_mobile_web_edit: int, is_visualeditor: int, is_wikieditor: int, is_mobile_app_editandroid_app_edit: int, is_ios_app_edit: int]

# Developing logic for taking text diffs

In [20]:
import pandas as pd
import mwparserfromhell as mwp
from mwedittypes.constants import CAT_ALIASES, CAT_PREFIXES
from mwedittypes import EditTypes
from mwedittypes.tree_differ import get_diff
from mwedittypes.node_differ import get_diff_count
from difflib import ndiff
from fuzzywuzzy import fuzz
import ast
import re

FUZZY_THRESHOLD = 60



In [21]:
def sentence_tokenize(text):
    return list(map(str.strip, re.split(r"[.!?](?!$)", text)))

def parse_change_result(res: dict, similarity_threshold: int = FUZZY_THRESHOLD):
    # collecting added/deleted sentences within text
    added = []
    deleted = []
    for d in ndiff(res['prev']['text'].split('\n'), res['curr']['text'].split('\n')):
        if d.startswith('- '):
            text = d[2:]
            if len(text) > 0: 
                # deleted.extend(tokenize.sent_tokenize(d[2:]))
                deleted.extend(sentence_tokenize(d[2:]))
        elif d.startswith('+ '):
            text = d[2:]
            if len(text) > 0: 
                # added.extend(tokenize.sent_tokenize(d[2:]))
                added.extend(sentence_tokenize(d[2:]))
    
    # process added/deleted sentences
    # ToDo: refactor this code, as it is not optimal 
    text_changes = []
    used_deleted_texts = []
    used_added_texts = []
    
    for added_text in added:
        if added_text in used_added_texts:
                continue
        for deleted_text in deleted:
            if deleted_text in used_deleted_texts:
                continue
            similarity = fuzz.ratio(added_text, deleted_text)
            if similarity > similarity_threshold and added_text != deleted_text:
                text_changes.append((deleted_text, added_text))
                used_deleted_texts.append(deleted_text)
                used_added_texts.append(added_text)
                break
            elif added_text == deleted_text:
                used_deleted_texts.append(deleted_text)
                used_added_texts.append(added_text)
                break
    
    added = list(set(added) - set(used_added_texts))
    deleted = list(set(deleted) - set(used_deleted_texts)) 
    
    ### filter same inserts-deletes:
    same = set(added) & set(deleted)
    added = list(set(added) - same)
    deleted = list(set(deleted) - same)

    return added, deleted, text_changes

def get_differences(tree_diff, keys_to_parse):
    
    texts_removed = {}
    for res in tree_diff["remove"]:
        if res["type"] in keys_to_parse:
            texts_removed[res["type"]] = texts_removed.get(res["type"], []) + [res["text"]]

    texts_insert = {}
    for res in tree_diff["insert"]:
        if res["type"] in keys_to_parse:
            texts_insert[res["type"]] = texts_insert.get(res["type"], []) + [res["text"]]

    texts_change = {}
    for res in tree_diff["change"]:
        type_ = res['prev']["type"]
        if type_ in keys_to_parse:
            added, deleted, changes = parse_change_result(res)
            texts_change[type_] = texts_change.get(type_, []) + changes
            texts_removed[type_] = texts_removed.get(type_, []) + deleted
            texts_insert[type_] = texts_insert.get(type_, []) + added
    
    return texts_removed, texts_insert, texts_change

def parse_categories(wikitext, lang):
    categories = []
    parsed_wikicode = mwp.parse(wikitext)
    cat_prefixes = [c.lower() for c in CAT_PREFIXES + CAT_ALIASES.get(lang, [])]
    
    for wikilink in parsed_wikicode.filter_wikilinks():
        
        elements = re.sub(r"[\[,\]]", "", str(wikilink)).split(':', maxsplit=1)
        
        if len(elements) == 2:
            prefix, category = elements
        else:
            continue
        
        if prefix.lower() in cat_prefixes:
            categories.append(category)
    return categories

def get_changes(curr_wikitext, prev_wikitext, wikidb):
    # ToDo: move to constanst
    keys_to_parse = [
        # "Wikilink",
        # "Heading",
        "Reference",
        # "Table",
        # "Section",
        "Text",
        # "Template",
        # "Media",
        # "Category",
    ]
    categories = parse_categories(curr_wikitext, wikidb[:2])
    try:
        et = EditTypes(prev_wikitext, curr_wikitext, lang=wikidb[:2], timeout=10)
        tree_diff = get_diff(et.prev_wikitext, et.curr_wikitext, lang=et.lang,
                                      timeout=et.timeout, debug=et.debug)
        actions = get_diff_count(tree_diff, lang=et.lang)

        texts_removed, texts_insert, texts_change = get_differences(tree_diff, keys_to_parse)

        return [str(res) for res in [texts_removed, texts_insert, texts_change, actions]] + [str(categories)]
    except:
        return [str(dict()) for i in range(4)] + [str(categories)]

In [22]:
# Developing functions to take a diffs

schema = T.StructType([
    T.StructField("texts_removed", T.StringType(), True), 
    T.StructField("texts_insert", T.StringType(), True),
    T.StructField("texts_change", T.StringType(), True),
    T.StructField("actions", T.StringType(), True),
    T.StructField("categories", T.StringType(), True)
])

udf_get_changes = F.udf(get_changes, schema)

# comment to work with sample only
revisions_text_all_sample = revisions_text_all.repartition(1024, "revision_id")

revisions_text_all_sample = revisions_text_all_sample \
    .withColumn("udf_res", udf_get_changes(revisions_text_all_sample['revision_id_revision_text'], 
                                           revisions_text_all_sample['revision_parent_id_revision_text'], 
                                           revisions_text_all_sample['wiki_db'])) \
    .select(
        *history_columns, 
        F.col("udf_res.texts_removed"), 
        F.col("udf_res.texts_insert"),  
        F.col("udf_res.texts_change"), 
        F.col("udf_res.actions"),
        F.col("udf_res.categories")
    )

In [23]:
revisions_text_all_sample

DataFrame[wiki_db: string, event_comment: string, event_user_text_historical: string, event_user_seconds_since_previous_revision: bigint, revision_id: bigint, page_title: string, page_revision_count: bigint, revision_text_bytes_diff: bigint, revision_is_identity_reverted: int, event_timestamp: string, revision_parent_id: bigint, revision_first_identity_reverting_revision_id: bigint, is_reverted_by_good_user: int, is_mobile_edit: int, is_mobile_web_edit: int, is_visualeditor: int, is_wikieditor: int, is_mobile_app_editandroid_app_edit: int, is_ios_app_edit: int, texts_removed: string, texts_insert: string, texts_change: string, actions: string, categories: string]

### Dump to .parquet

In [None]:
dump_name = "en_anonymous_text_07-2022"
revisions_text_all_sample.write.parquet(dump_name + ".parquet", mode="overwrite") 

### Dump to .csv

In [25]:
# Collecting dataset
# saving intermediate results
revisions_text_all_sample = spark.read.parquet(dump_name + ".parquet")

In [26]:
revisions_text_all_sample_df = revisions_text_all_sample.toPandas()
revisions_text_all_sample_df.to_csv(f"data/{dump_name}.csv", index=False)

                                                                                

In [27]:
revisions_text_all_sample_df

Unnamed: 0,wiki_db,event_comment,event_user_text_historical,event_user_seconds_since_previous_revision,revision_id,page_title,page_revision_count,revision_text_bytes_diff,revision_is_identity_reverted,event_timestamp,...,is_mobile_web_edit,is_visualeditor,is_wikieditor,is_mobile_app_editandroid_app_edit,is_ios_app_edit,texts_removed,texts_insert,texts_change,actions,categories
0,enwiki,,86.1.110.209,,1063065888,Georgia_Taylor-Brown,80,5,0,2022-01-01 00:56:51.0,...,0,0,1,0,0,{'Text': []},{'Text': []},{'Text': [('Georgia Taylor-Brown (born 15 Marc...,"{'Template': {'change': 1}, 'Whitespace': {'in...","['1994 births', 'Sportspeople from Manchester'..."
1,enwiki,/* External links */,2A00:23C8:1780:9201:BCA4:7D62:B01D:4490,,1063071140,Wiggly_Park,85,0,0,2022-01-01 01:42:21.0,...,0,0,1,0,0,{},{},{},"{'Category': {'change': 1}, 'Section': {'chang...","[""BBC children's television shows"", 'Nick Jr. ..."
2,enwiki,,209.93.148.173,,1063151921,1990_Prime_Minister's_Resignation_Honours,51,16,1,2022-01-01 15:10:55.0,...,0,0,1,0,0,{'Text': []},{'Text': []},"{'Text': [(""Andrew Turnbull, Principal Private...","{'Wikilink': {'insert': 3, 'change': 2}, 'Word...","['1990 awards in the United Kingdom', '1990 aw..."
3,enwiki,/* Closures */Fresh Choice isn't fully gone th...,2600:1010:B062:37DB:C007:7F60:5B2D:ED0D,,1063443456,Fresh_Choice,182,-1409,1,2022-01-03 03:16:48.0,...,1,0,0,0,0,{'Reference': ['<ref>{{cite web |url=http://ca...,"{'Text': ['In late 2012, Fresh Choice closed a...",{'Text': []},"{'Wikilink': {'remove': 6}, 'Text Formatting':...","['Buffet restaurants', 'Defunct restaurant cha..."
4,enwiki,Added content,2A02:C7F:6F05:1A00:5C6A:DEE4:20E8:3A34,,1063697235,Peter_Sutcliffe,3687,108,1,2022-01-04 12:12:25.0,...,1,0,0,0,0,{'Text': []},{'Text': ['Sutcliffe plays as a centre back fo...,{'Text': []},"{'Whitespace': {'insert': 18}, 'Punctuation': ...","['1946 births', '1980s trials', '2020 deaths',..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
598762,enwiki,/* Lyrics */,49.150.32.41,,1101067030,The_More_We_Get_Together,121,1,1,2022-07-29 04:20:11.0,...,1,0,0,0,0,{'Text': []},{'Text': []},{'Text': []},"{'Whitespace': {'insert': 1}, 'Section': {'cha...","[""Traditional children's songs"", 'American fol..."
598763,enwiki,/* Models of fitness */,141.136.230.143,,1101105227,Fitness_(biology),514,-10,0,2022-07-29 09:54:55.0,...,1,0,0,0,0,{},{},{},"{'Heading': {'change': 1}, 'Section': {'change...","['Evolutionary biology concepts', 'Genetics co..."
598764,enwiki,/* List */,39.32.228.237,,1101106513,List_of_countries_by_foreign-exchange_reserves...,768,1,1,2022-07-29 10:07:21.0,...,0,0,0,0,0,{'Text': []},{'Text': []},"{'Text': [('1 2,281,208 Sep 2020中國4月外匯佔款連三月...","{'Table': {'change': 1}, 'Word': {'change': 1}...",['Lists of countries by economic indicator|for...
598765,enwiki,,94.226.56.157,,1101169003,Laurit_Krasniqi,13,-1,1,2022-07-29 17:13:20.0,...,1,0,0,0,0,{'Text': []},{'Text': []},"{'Text': [('Born in Belgium, he has chosen to ...","{'Wikilink': {'change': 1}, 'Word': {'change':...","['2001 births', 'Living people', 'Association ..."
