## imports

In [1]:
import wmfdata as wmf
import pandas as pd
import numpy as np

In [2]:
import json
import re
import warnings

In [3]:
# Notebook-wide display settings.
# Never truncate columns when a DataFrame is rendered.
pd.set_option("display.max_columns", None)

# ANSI escape sequences: `bold` switches bold text on, `end` resets formatting.
bold, end = '\033[1m', '\033[0m'

## spark_session

In [4]:
# Stop any Spark session that is still active from a previous run, so the
# next session is created from a clean state.
spark_session = wmf.spark.get_active_session()

# Identity comparison is the idiomatic None check (PEP 8); comparing type
# objects with `!=` says the same thing in a roundabout way.
if spark_session is not None:
    spark_session.stop()
else:
    print('no active session')

no active session


In [6]:
# Spark settings handed to the custom YARN session: driver and executor
# sizing, the dynamic-allocation executor cap, shuffle parallelism and the
# limit on results collected back to the driver.
session_settings = {
    "spark.driver.memory": "4g",
    "spark.dynamicAllocation.maxExecutors": 64,
    "spark.executor.memory": "20g",
    "spark.executor.cores": 4,
    "spark.sql.shuffle.partitions": 256,
    "spark.driver.maxResultSize": "2g",
}

spark_session = wmf.spark.create_custom_session(
    master="yarn",
    app_name='bot-vandal-reverts',
    spark_config=session_settings,
)

SPARK_HOME: /usr/lib/spark3
Using Hadoop client lib jars at 3.2.0, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/25 06:48:08 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
23/07/25 06:48:09 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
23/07/25 06:48:09 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/07/25 06:48:18 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13000. Attempting port 13001.
23/07/25 06:48:18 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13001. Attempting port 13002.
23/07/25 06:48:18 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


In [5]:
# NOTE(review): the next two lines are leftover commented-out code. `ig_warn`
# is not defined in the visible part of this notebook, and the predefined
# 'yarn-large' session appears to have been superseded by the custom session.
# Consider deleting them.
# ig_warn()
# spark_session = wmf.spark.create_session(type='yarn-large')
# Echo the session object as the cell output.
spark_session

In [8]:
# Raise the log threshold from the default "WARN" to "ERROR" so that
# warning-level Spark messages no longer clutter the cell outputs.
spark_session.sparkContext.setLogLevel("ERROR")

## data

In [9]:
# Wiki database name -> user name of the bot whose reverts are counted there.
# The keys feed the `wiki_db IN ...` filter of the query and the values feed
# its `event_user_text IN ...` filter.
bots = {
    "enwiki": "ClueBot NG",
    "eswiki": "SeroBOT",
    "frwiki": "Salebot",
    "ptwiki": "Salebot",  # same bot name as on frwiki
    "fawiki": "Dexbot",
    "bgwiki": "PSS 9",
    "simplewiki": "ChenzwBot",
    "ruwiki": "Рейму Хакурей",
    "rowiki": "PatrocleBot",
}

In [29]:
%%time

query = """
WITH
    base AS (
        SELECT
            wiki_db,
            revision_id,
            event_timestamp,
            revision_first_identity_reverting_revision_id,
            revision_seconds_to_identity_revert
        FROM
            wmf.mediawiki_history
        WHERE
            snapshot = '2023-06' AND
            wiki_db IN {DBS} AND
            event_entity = 'revision' AND
            event_type = 'create' AND
            revision_is_identity_reverted AND
            page_namespace_is_content AND
            revision_seconds_to_identity_revert <= 24*60*60 AND
            DATE(event_timestamp) >= DATE('2020-07-01') AND
            DATE(event_timestamp) <= DATE('2023-06-30') AND
            NOT revision_parent_id = 0
        ),
            
    revert_counts AS (
        SELECT
            base.wiki_db,
            YEAR(base.event_timestamp) AS year,
            MONTH(base.event_timestamp) AS month,
            DAY(base.event_timestamp) AS day,
            COUNT(DISTINCT mwh.revision_id) AS all_reverts,
            COUNT(DISTINCT (CASE WHEN event_user_text IN {BOTS} THEN mwh.revision_id END)) AS bot_reverts
        FROM
            base
            JOIN wmf.mediawiki_history mwh
            ON base.revision_first_identity_reverting_revision_id = mwh.revision_id
        WHERE
            snapshot = '2023-06'
        GROUP BY
            YEAR(base.event_timestamp),
            MONTH(base.event_timestamp),
            DAY(base.event_timestamp),
            base.wiki_db
)
        
SELECT *
FROM revert_counts
"""

result = wmf.spark.run(query.format(DBS=wmf.utils.sql_tuple(bots.keys()),
                                   BOTS=wmf.utils.sql_tuple(set(bots.values()))))



CPU times: user 552 ms, sys: 120 ms, total: 672 ms
Wall time: 2min 41s


                                                                                

In [None]:
# Average the per-day counts for each wiki. Each row of `result` is one
# (wiki, day) pair, so days that produced no row are not part of the mean.
mean_of_daily_counts = {'all_reverts': 'mean', 'bot_reverts': 'mean'}
result.groupby('wiki_db').agg(mean_of_daily_counts)