In [None]:
import wmfdata

spark = wmfdata.spark.create_custom_session(
    master="yarn",
    ship_python_env = True,
    app_name=f'mediawiki history',
    spark_config={
        "spark.driver.memory": "16g",
        "spark.dynamicAllocation.maxExecutors": 64,
        "spark.executor.memory": "8g",
        "spark.executor.memoryOverhead": "12g",
        "spark.executor.cores":2,
        "spark.sql.shuffle.partitions": 256, #2048
        "spark.sql.autoBroadcastJoinThreshold": 20971520,
        "spark.driver.maxResultSize": "2g"
    }
)

In [None]:
#Check the recent partitions
spark.sql('SHOW PARTITIONS wmf.mediawiki_wikitext_history').toPandas()

In [2]:
from pyspark.sql.functions import udf, broadcast
from pyspark.sql.types import StringType
from pyspark.sql.functions import rand
import re 
import pyspark.sql.functions as F
from pyspark.sql import types as T
from collections.abc import Iterable
import pandas as pd


@udf("String")
def findProp(t):
    try:
        return re.search("Property:(\w+)\]",t).group(1)
    except:
        return ''
    
@udf("String")
def findCommentSummary(t):
    try:
        return re.search("\* ([\w-]+)",t).group(1)
    except:
        return ''

@udf("String")
def getObject(t):
    try:
        return re.search(r"(Q\w+)",t).group(1)
    except:
        return None
    
    
@udf("String")
def getValue(t):
    try:
        return eval(t).get('id',None)
    except:
        return None

In [None]:
#Check the recent partitions
spark.sql('SHOW PARTITIONS wmf.mediawiki_history').toPandas()

In [None]:
#Check the recent partitions
spark.sql('SHOW PARTITIONS wmf.wikidata_item_page_link').toPandas()

In [None]:
#Check the recent partitions
res = spark.sql('SHOW PARTITIONS wmf.mediawiki_wikitext_history').toPandas()
[i for i in res.partition.values if "wikidata" in i]

In [6]:
snapshot_small = "2024-05-06"
snapshot_big = "2024-04"
lang='en'

In [None]:
itemsWithSiteLink =  spark.sql (f'''
SELECT DISTINCT item_id FROM  wmf.wikidata_item_page_link WHERE 
snapshot = '{snapshot_small}'
''')


claims =  spark.sql(f'''SELECT TIMESTAMP(event_timestamp), 
                                    revision_id,
                                    page_title,
                                    event_comment,
                                    revision_parent_id, 
                                    event_user_is_bot_by,
                                    SIZE(event_user_is_bot_by) as is_bot,
                                    revision_is_identity_reverted,
                                    revision_is_identity_revert,
                                    revision_seconds_to_identity_revert,
                                    revision_first_identity_reverting_revision_id,
                                    event_user_groups,
                                    event_user_text,
                                    SIZE(event_user_groups) as number_of_groups,
                                    event_user_revision_count,
                                    event_user_is_anonymous,                                    
                                    page_revision_count,
                                    page_seconds_since_previous_revision,
                                    datediff(TIMESTAMP(event_timestamp),TIMESTAMP(event_user_creation_timestamp)) AS user_age,
                                    datediff(TIMESTAMP(event_timestamp),TIMESTAMP(page_first_edit_timestamp)) AS page_age,
                                    revision_tags
                                    
            FROM wmf.mediawiki_history WHERE event_entity = 'revision' 
            AND wiki_db = 'wikidatawiki' 
            AND page_namespace=0
            AND snapshot = '{snapshot_big}' 
            AND TIMESTAMP(event_timestamp) BETWEEN TIMESTAMP('2021-01-01') AND TIMESTAMP('2024-01-01')
            ''')
    
# collecting info about reverting revision
columns_to_select = claims.columns
changes_new = claims.alias("t1") \
    .join(claims.alias("t2"), F.col("t1.revision_first_identity_reverting_revision_id") == F.col("t2.revision_id"), "left") \
    .select(
        *[f"t1.{c}" for c in claims.columns],
        F.col("t2.revision_is_identity_reverted").alias("reverting_revision_is_reverted_revision"),
        F.col("t2.event_user_text").alias("reverting_revision_event_user_text"),
    )
changes_new = changes_new.withColumn(
    "self_revert", 
    (F.col("event_user_text") == F.col("reverting_revision_event_user_text")
    )
)
changes_new = changes_new.filter(
    F.array_contains("revision_tags", "wikidata-ui") 
)


##### Sampling strategy: 
# 1. Leave all the reverts
changes_new_reverts = changes_new.where(F.col("revision_is_identity_reverted") == True)
n_reverts = changes_new_reverts.count()
print("Number of reverted revisions: ", n_reverts)

# 2. Leave all the ids from testset
holdout_test = pd.read_csv("../data/holdout/test_holdout.csv", sep="\t")
revisions_to_check = list(set(holdout_test.rev))
changes_new_holdout = changes_new.filter(F.col("revision_id").isin(revisions_to_check))

# 3. Sample not-reverted revisions (10 times number of reverts)
changes_new_non_reverts = changes_new.where(F.col("revision_is_identity_reverted") == False)
changes_new_non_reverts = changes_new_non_reverts.sample(fraction=1.0, seed=42).limit(n_reverts * 5)

# 4. Union dataframes: 
changes_new = changes_new_reverts.union(changes_new_holdout).union(changes_new_non_reverts)

# 5. Drop duplicates: 
changes_new = changes_new.dropDuplicates()
####


#### Additional features: 
instanceof = spark.sql(f"""
SELECT subject as page_title, rank, object FROM (
    SELECT
      subject,
      claim.rank,
      claim.mainSnak.property AS predicate,
      claim.mainSnak.dataValue.value AS object
    FROM (
          SELECT
            id as subject,
            explode(claims) as claim
          FROM wmf.wikidata_entity
          WHERE snapshot = '{snapshot_small}'
    ) t
 ) 
 WHERE predicate = "P31"
""")

labels = spark.sql(f"""SELECT id, labels.{lang} as label_en
        FROM wmf.wikidata_entity
      WHERE snapshot = '{snapshot_small}'
      """)

changes_new = changes_new.join(labels,changes_new['page_title']==labels['id'],'left')
instanceof = instanceof.withColumn("instanceof", getValue(F.col("object"))).drop('object')

instanceof = instanceof.groupby('page_title').agg(F.collect_list('instanceof').alias('instanceof'),F.collect_list('rank').alias('rank'))
changes_new = changes_new.join(instanceof,'page_title','left')

# changes_new = changes_new.join(itemsWithSiteLink, changes_new['page_title']==itemsWithSiteLink['item_id'])  # Avoid filtering data from the holdout test


######### More additional features
def listToOneHot(df,tag_list_col,exlude_tags=[], include_tags=None):
    tags = [
        x[0]
        for x in df.select(F.explode(tag_list_col).alias(tag_list_col))
        .distinct()
        .orderBy(tag_list_col)
        .collect()
    ]
    if include_tags:
        tags = include_tags
    else:
        tags = [tag for tag in tags if tag not in exlude_tags]
    print(tags)
    df = df.select(
        "*",
        *[
            F.array_contains(tag_list_col, tag).alias(f"{tag_list_col.replace(' ', '-')}-{tag.replace(' ', '-')}").cast("integer")
            for tag in tags
        ]
    )
    df = df.drop(tag_list_col)
    return df

changes_new.cache()

changes_new = listToOneHot(changes_new,'event_user_groups')
changes_new = listToOneHot(changes_new,'event_user_is_bot_by')
include_tags = [
       'revision_tags-InfoboxExport gadget',
       'revision_tags-Unexpected value for IMDb identifier',
       'revision_tags-Unexpected value for IUCN conservation status',
       'revision_tags-Unexpected value for gender',
       'revision_tags-deprecated property',
       'revision_tags-description-phone-number', 'revision_tags-emoji',
       'revision_tags-mw-replace',
       'revision_tags-new editor changing statement',
       'revision_tags-new editor removing statement',
       'revision_tags-possible test edit', 'revision_tags-possible vandalism',
       'revision_tags-removal of gender property',
       'revision_tags-removing deprecated statement',
       'revision_tags-self-referencing',
       'revision_tags-unexpected value for phone number',
       'revision_tags-unsourced ethnicity addition by IP',
       'revision_tags-wikisyntax']
changes_new = listToOneHot(changes_new,'revision_tags', exlude_tags=['mw-manual-revert','mw-reverted','wikidata-ui'], include_tags=include_tags)

# changing columns to dump intermediate results
for c in changes_new.columns:
    changes_new = changes_new.withColumnRenamed(c, c.replace(" ", "-"))

changes_new.cache()

24/06/10 12:24:35 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
24/06/10 12:24:56 WARN YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

Number of reverted revisions:  1114789


24/06/10 12:28:54 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                ]]

['bot', 'bureaucrat', 'checkuser', 'confirmed', 'flood', 'interface-admin', 'ipblock-exempt', 'propertycreator', 'rollbacker', 'suppress', 'sysop', 'translationadmin', 'wikidata-staff']
['group', 'name']
['revision_tags-InfoboxExport gadget', 'revision_tags-Unexpected value for IMDb identifier', 'revision_tags-Unexpected value for IUCN conservation status', 'revision_tags-Unexpected value for gender', 'revision_tags-deprecated property', 'revision_tags-description-phone-number', 'revision_tags-emoji', 'revision_tags-mw-replace', 'revision_tags-new editor changing statement', 'revision_tags-new editor removing statement', 'revision_tags-possible test edit', 'revision_tags-possible vandalism', 'revision_tags-removal of gender property', 'revision_tags-removing deprecated statement', 'revision_tags-self-referencing', 'revision_tags-unexpected value for phone number', 'revision_tags-unsourced ethnicity addition by IP', 'revision_tags-wikisyntax']


DataFrame[page_title: string, event_timestamp: timestamp, revision_id: bigint, event_comment: string, revision_parent_id: bigint, is_bot: int, revision_is_identity_reverted: boolean, revision_is_identity_revert: boolean, revision_seconds_to_identity_revert: bigint, revision_first_identity_reverting_revision_id: bigint, event_user_text: string, number_of_groups: int, event_user_revision_count: bigint, event_user_is_anonymous: boolean, page_revision_count: bigint, page_seconds_since_previous_revision: bigint, user_age: int, page_age: int, reverting_revision_is_reverted_revision: boolean, reverting_revision_event_user_text: string, self_revert: boolean, id: string, label_en: string, instanceof: array<string>, rank: array<string>, event_user_groups-bot: int, event_user_groups-bureaucrat: int, event_user_groups-checkuser: int, event_user_groups-confirmed: int, event_user_groups-flood: int, event_user_groups-interface-admin: int, event_user_groups-ipblock-exempt: int, event_user_groups-prope

In [None]:
# dump data to parquet:
dump_name = f"wikidata_{snapshot_big}_training_sample_full"
changes_new = changes_new.repartition(256, "revision_id")
changes_new.write.parquet(dump_name + ".parquet", mode="overwrite")
# dump data to csv:
changes_new = spark.read.parquet(dump_name + ".parquet")
changes_new_df = changes_new.toPandas()
changes_new_df.to_csv(f"../data/2024-04_metadata.csv", index=False)

                                                                                

In [9]:
changes_new.count()

                                                                                

6689025

In [10]:
len(changes_new_df[changes_new_df.revision_id.isin(revisions_to_check)].revision_id.unique())

1296

In [11]:
print(len(changes_new_df[changes_new_df.revision_id.isin(revisions_to_check)]))

1296


## Getting ORES scores: 

In [None]:
ores = spark.sql('''SELECT rev_id as revision_id, scores.damaging.probability['true'] as pred FROM event_sanitized.mediawiki_revision_score
                 WHERE database ='wikidatawiki' AND year IN (2021, 2022, 2023, 2024)  AND page_namespace=0''') #getting ORES data to compare with test 
orespd = ores.join(changes_new.select('revision_id'), 'revision_id').toPandas()
orespd

In [None]:
orespd.to_csv("../data/ores_scores.csv", index=False)

## Getting text data: 

In [15]:
import json
import re
from typing import Any, Optional, Union
from deepdiff import DeepDiff


def _parse_nested_change(old_dict: dict, new_dict: dict) -> Any:
    hash_pattern = re.compile(r".*\['hash'\]")
    id_pattern = re.compile(r".*\['id'\]")
    diff = DeepDiff(
        new_dict,
        old_dict,
        ignore_order=True,
        exclude_regex_paths=[hash_pattern, id_pattern],
    )
    return diff.get("values_changed", {})


def _find_key_in_nested_dict(
    dictionary: Union[dict, list], target_key: str
) -> Optional[str]:
    # If the input is a dictionary, search it
    if isinstance(dictionary, dict):
        # Iterate through each key-value pair in the dictionary
        for key, value in dictionary.items():
            # If the current value is another dictionary or list, recursively search it
            if isinstance(value, (dict, list)):
                nested_result = _find_key_in_nested_dict(value, target_key)
                if nested_result is not None:
                    return nested_result
            # If the current key matches the target key, return the corresponding value
            elif key == target_key:
                return value
    # If the input is a list, search each element in the list
    elif isinstance(dictionary, list):
        for item in dictionary:
            if isinstance(item, (dict, list)):
                nested_result = _find_key_in_nested_dict(item, target_key)
                if nested_result is not None:
                    return nested_result
    else:
        pass
    # If the key is not found in the entire dictionary or list, return None
    return None


def _parse_type_changes(type_changes_dict: dict) -> tuple[dict, dict]:
    """
    Function to process diff in case type was changed from dict to list or vice versa
    """
    adds = dict()
    removes = dict()
    list_to_look = ["id", "value", "title"]
    try:
        for key, value in type_changes_dict.items():
            # case 1: old is empty list, new is dict
            if len(value["old_value"]) == 0 and len(value["new_value"]) != 0:
                # search for proporty
                p = _find_key_in_nested_dict(value["new_value"], "property")
                # search for new value
                for k in list_to_look:
                    v = _find_key_in_nested_dict(value["new_value"], k)
                    if v:
                        break
                if p:
                    adds[f"{key}['{p}']"] = v
                else:
                    adds[key] = v
            # case 2: new is empty list, old is dict
            if len(value["new_value"]) == 0 and len(value["old_value"]) != 0:
                # search for proporty
                p = _find_key_in_nested_dict(value["old_value"], "property")
                # search for deleted value
                for k in list_to_look:
                    v = _find_key_in_nested_dict(value["old_value"], k)
                    if v:
                        break
                if p:
                    removes[f"{key}['{p}']"] = v
                else:
                    removes[key] = v
    except Exception:
        pass
    return adds, removes


def _add_iterable_items(result: Any) -> dict:
    added_keys = result.keys()
    parsed_result = {}
    for k in added_keys:
        parsed_result[k] = result[k]
    return parsed_result


def _get_element_by_path(obj: Any, path: str) -> str:
    # Split the path and iterate through the keys to access the value
    keys = re.findall(r"\['(.*?)'\]", path)
    current_value = obj
    for key in keys:
        if key in current_value:
            current_value = current_value[key]
        else:
            current_value = None
            break
    return current_value


def parse_wikidata_revision_difference(
    old_revision, new_revision
) -> list[str]:
    """
    Main function to parse the difference between two Wikidata revisions
    """
    try:
        # Parse JSON strings into Python dictionaries if needed
        if isinstance(old_revision, str):
            dict_old = json.loads(old_revision)
            dict_new = json.loads(new_revision)
        else:
            dict_old = old_revision
            dict_new = new_revision

        # Find the difference using deepdiff
        hash_pattern = re.compile(r"root\['claims'\]\['.*?'\]\[\d*\].*\['hash'\]")
        id_pattern = re.compile(r"root\['claims'\]\['.*?'\]\[\d*\].*\['id'\]")
        diff = DeepDiff(
            dict_old,
            dict_new,
            ignore_order=True,
            exclude_regex_paths=[hash_pattern, id_pattern],
        )

        # Process the differences and create a structured representation
        parsed_diff = {
            "added": diff.get("dictionary_item_added", []),
            "removed": diff.get("dictionary_item_removed", []),
            "type_changes": diff.get("type_changes", {}),
            "iterable_added": _add_iterable_items(diff.get("iterable_item_added", {})),
            "iterable_removed": _add_iterable_items(
                diff.get("iterable_item_removed", {})
            ),
            "changed": diff.get("values_changed", {}),
        }

        # Process type chenges:
        additional_add, additional_removes = _parse_type_changes(
            parsed_diff["type_changes"]
        )
        
        # check if descriptions are in changes (optional step): 
        is_desc = False
        for r in parsed_diff["removed"]:
            if "descriptions" in r:
                is_desc = True
                break
        for a in parsed_diff["added"]:
            if ("descriptions" in a) or is_desc:
                is_desc = True
                break
        for c in parsed_diff["changed"].keys():
            if ("descriptions" in c) or is_desc:
                is_desc = True
                break
        for c in parsed_diff["type_changes"].keys():
            if ("descriptions" in c) or is_desc:
                is_desc = True
                break
        parsed_diff["descriptions"] = dict_old["descriptions"] if is_desc else {}
        
        # check if labels are in changes (optional step): 
        is_label = False
        for r in parsed_diff["removed"]:
            if "labels" in r:
                is_label = True
                break
        for a in parsed_diff["added"]:
            if ("labels" in a) or is_label:
                is_label = True
                break
        for c in parsed_diff["changed"].keys():
            if ("labels" in c) or is_label:
                is_label = True
                break
        for c in parsed_diff["type_changes"].keys():
            if ("labels" in c) or is_label:
                is_label = True
                break
        parsed_diff["labels"] = dict_old["labels"] if is_label else {}
        

        # processing nested changes (only one level change):
        processed_changes = {}
        keys_to_delete = []
        for key, value in parsed_diff["changed"].items():
            if isinstance(value["new_value"], dict) and isinstance(
                value["old_value"], dict
            ):
                parsed_changes = _parse_nested_change(
                    value["new_value"], value["old_value"]
                )
                keys_to_delete.append(key)
                for key_c, value_c in parsed_changes.items():
                    processed_changes[key + key_c] = value_c
        parsed_diff["changed"].update(processed_changes)
        for k in keys_to_delete:
            del parsed_diff["changed"][k]

        # getting removed values
        precessed_removes = {}
        for key in parsed_diff["removed"]:
            precessed_removes[key] = _get_element_by_path(dict_old, key)
        parsed_diff["removed"] = precessed_removes
        parsed_diff["removed"].update(parsed_diff["iterable_removed"])
        parsed_diff["removed"].update(additional_removes)

        # getting added values
        precessed_add = {}
        for key in parsed_diff["added"]:
            precessed_add[key] = _get_element_by_path(dict_new, key)
        parsed_diff["added"] = precessed_add
        parsed_diff["added"].update(parsed_diff["iterable_added"])
        parsed_diff["added"].update(additional_add)

        return [
            str(parsed_diff["added"]), str(parsed_diff["removed"]), str(parsed_diff["changed"]), 
            str(parsed_diff["descriptions"]), str(parsed_diff["labels"])]
    except Exception as e:
        print(e)
        return [str({}), str({}), str({}), str({}), str({})]


In [16]:
# Step 0: Filter columns + setup batching
n_batch = 15
selected_columns = [
    'page_title', 'event_timestamp', 'event_user_is_anonymous', 'label_en',
    "revision_id", "revision_parent_id", 
    "revision_is_identity_reverted", "self_revert", 
    "reverting_revision_is_reverted_revision"]

changes_new = changes_new.select(selected_columns).withColumn("batch_id", (rand() * n_batch + 1).cast("int"))

In [17]:
# #Check the recent partitions
# spark.sql('SHOW PARTITIONS wmf.mediawiki_wikitext_history').toPandas()

In [None]:
# Developing functions to take a diffs
schema = T.StructType([
    T.StructField("added", T.StringType(), True),
    T.StructField("removed", T.StringType(), True),
    T.StructField("changed", T.StringType(), True),
    T.StructField("descriptions", T.StringType(), True),
    T.StructField("labels", T.StringType(), True),
])
udf_get_changes = F.udf(parse_wikidata_revision_difference, schema)


for i in range(1, n_batch+1):
    # Step 1: Get ids to filter:
    changes_new_sample = changes_new.where(changes_new.batch_id==i)
    print("Lenght of batch: ", changes_new_sample.count())
    
    selected_revision_df = changes_new_sample.select(F.col("revision_parent_id").alias("revision_id")) \
        .union(changes_new_sample.select(F.col("revision_id").alias("revision_id"))
    ).distinct()
    
    print(f"Processing batch {i}...")
    print("Number of revisions to look for: ", selected_revision_df.count())
    
    # Step 2: Get texts for selected revisions
    wikitext_df = spark.sql(f"""SELECT revision_id, revision_text 
        FROM wmf.mediawiki_wikitext_history
        WHERE snapshot = '{snapshot_big}' 
        AND wiki_db = "wikidatawiki" 
        AND page_namespace = 0 
        """)
    # Perform a broadcast join and filter the DataFrame
    wikitext_df = (
        wikitext_df
        .join(broadcast(selected_revision_df), "revision_id", "inner")
        .select("revision_id", "revision_text")
    ).dropDuplicates(subset=["revision_id"])
    # print("Number of revisions found: ", wikitext_df.count())
    
    # Step 3: Merging logic implementation: 
    history_columns = changes_new_sample.columns
    revisions_text_1 = wikitext_df.alias("text") \
        .join(
        broadcast(changes_new_sample.alias("history")),
        F.col("text.revision_id") == F.col("history.revision_id"),
        "right"
    ).select(
        *[F.col(f"history.{c}") for c in history_columns],
        F.col("text.revision_text").alias("revision_id_revision_text"),
    ).alias("t1")

    revisions_text_2 = wikitext_df.alias("text") \
        .join(
        broadcast(changes_new_sample.alias("history")),
        F.col("text.revision_id") == F.col("history.revision_parent_id"),
        "right"
    ).select(
        F.col(f"history.revision_id"),
        F.col("text.revision_text").alias("revision_parent_id_revision_text")
    ).alias("t2")

    revisions_text_all = revisions_text_1 \
        .join(revisions_text_2, F.col("t1.revision_id") == F.col("t2.revision_id")) \
        .select(
        F.col("t1.revision_id_revision_text").alias("revision_id_revision_text"),
        F.col("t2.revision_parent_id_revision_text").alias("revision_parent_id_revision_text"),
        *[F.col(f"t1.{c}") for c in history_columns],
    )
    
    # print("Number of revisions after merging: ", revisions_text_all.count())

    # drop empty text revisions
    revisions_text_all = revisions_text_all.where(F.col("revision_parent_id_revision_text").isNotNull())
    revisions_text_all = revisions_text_all.where(F.col("revision_id_revision_text").isNotNull())

    # print("Number of revisions after merging & filtering: ", revisions_text_all.count())
    
    # comment to work with sample only
    revisions_text_all_sample = revisions_text_all #.repartition(1024, "revision_id")
    revisions_text_all_sample = revisions_text_all_sample \
        .withColumn("udf_res", udf_get_changes(
        revisions_text_all_sample['revision_parent_id_revision_text'],
        revisions_text_all_sample['revision_id_revision_text'],
        )) \
        .select(
            *history_columns,
            F.col("udf_res.added"),
            F.col("udf_res.removed"),
            F.col("udf_res.changed"),
            F.col("udf_res.descriptions"),
            F.col("udf_res.labels")
        )
    
    # Step 4: Saving: 
    # dump data to parquet:
    dump_name = f"full_wikidata_{snapshot_big}_training_sample_batch_{i}"
    revisions_text_all_sample.write.parquet(dump_name + ".parquet", mode="overwrite")

    # dump data to csv:
    revisions_text_all_sample = spark.read.parquet(dump_name + ".parquet")
    revisions_text_all_sample_df = revisions_text_all_sample.toPandas()
    revisions_text_all_sample_df.to_csv(f"../data/{dump_name}.csv", index=False)
    print("Number of revisions saved: ", revisions_text_all_sample_df)

# Labels collection: 

In [None]:
import re
from tqdm import tqdm

dfs = [pd.read_csv(f"../data/{snapshot_big}_content_batch_{i}.csv") for i in tqdm(range(1,16))]
df = pd.concat(dfs).reset_index(drop=True)

print(df.revision_id.value_counts())

def extract_all_P(text): 
    pattern = r'P\d+'
    matches = re.findall(pattern, text)
    return matches

def extract_all_Q(text): 
    pattern = r'Q\d+'
    matches = re.findall(pattern, text)
    return matches

all_Qs = set(df.page_title)
all_Ps = set()

for text in tqdm(df.added.values):
    all_Qs |= set(extract_all_Q(text))
    all_Ps |= set(extract_all_P(text))
for text in tqdm(df.removed.values):
    all_Qs |= set(extract_all_Q(text))
    all_Ps |= set(extract_all_P(text))
for text in tqdm(df.changed.values):
    all_Qs |= set(extract_all_Q(text))
    all_Ps |= set(extract_all_P(text))

In [None]:
## Extracting list of labels
labels = spark.sql(f"""SELECT id, labels.{lang} as label_en
      FROM wmf.wikidata_entity
      WHERE snapshot = '{snapshot_small}'
      """)

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

ids_to_extract = list(all_Ps | all_Qs)

# Define schema for DataFrame containing IDs
schema = StructType([StructField("id", StringType(), True)])
# Create DataFrame containing IDs to filter by
df_ids = spark.createDataFrame([(id_,) for id_ in ids_to_extract], schema)

# Perform join operation
filtered_labels = labels.join(df_ids, "id", "inner")

# Saving all labels text
labels_df = filtered_labels.toPandas()
labels_df.head()

print("Rate of None: ", labels_df.label_en.isna().mean())

labels_df.to_csv(f"../data/full_labels_{snapshot_big}_text_en.csv", index=False)
print("Number of labels saved: ", labels_df)