In [1]:
import numpy as np
import pandas as pd

import wmfdata as wmf
from wmfdata.utils import get_dblist

# Parameters

In [2]:
# Data will be generated for the 12 months ending with and including SNAPSHOT
# This is also the mediawiki_history snapshot necessary for the calculations.
SNAPSHOT = "2023-01"

# An ordered list of wikis for which to output each metric after it is generated. 
# This is useful for spot checking the values generated against the previous snapshot.
WIKIS_TO_CHECK = [
    "enwiki",
    "eswiki",
    "jawiki",
    "dewiki",
    "frwiki",
    "ruwiki"
]

In [3]:
snapshot = pd.Period(SNAPSHOT)
start = (snapshot - 11).start_time
end = (snapshot + 1).start_time
# Used to name the output files
file_stem = snapshot.strftime("%b %Y")

# Start included, end excluded
query_vars = dict(
    snapshot=snapshot.strftime("%Y-%m"),
    start=start.strftime('%Y-%m-%d'), 
    end=end.strftime('%Y-%m-%d'),
    ym_start=start.strftime("%Y-%m"),
    ym_end=end.strftime("%Y-%m"),
    pv_start=start.strftime("%Y%m"),
    pv_end=end.strftime("%Y%m"),
    # New editor retention needs different time boundaries, since we define retention
    # in a given month as the status of the new users who registered two months prior
    ner_cohort_start=(snapshot - 14).strftime("%Y-%m"),
    ner_cohort_end=(snapshot - 1).strftime("%Y-%m")
)

# Spark setup

In [6]:
wmf.spark.create_session(type="yarn-large")

SPARK_HOME: /usr/lib/spark3
Using Hadoop client lib jars at 3.2.0, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/07 02:04:40 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
23/02/07 02:04:41 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
23/02/07 02:04:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/02/07 02:04:51 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13000. Attempting port 13001.
23/02/07 02:04:51 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


# List of wikis

In [7]:
# The canonical_data.wikis table is not automatically updated,
# so you should first update it manually by running the notebook:
# https://github.com/wikimedia-research/canonical-data/blob/master/generation/wikis.ipynb
#
# This is important since new wikis are regularly opened and old ones are sometimes closed.
wikis = wmf.spark.run("""
SELECT
  database_code,
  database_group AS project_code,
  language_code,
  domain_name,
  language_name,
  english_name as wiki_name
FROM canonical_data.wikis
WHERE
  database_group IN (
    "commons", "incubator", "foundation", "mediawiki", "meta", "sources",
    "species","wikibooks", "wikidata", "wikinews", "wikipedia", "wikiquote",
    "wikisource", "wikiversity", "wikivoyage", "wiktionary"
  )
  AND status = "open"
  AND visibility = "public"
  AND editability = "public"
""")

                                                                                

# Check for missing wikis in mediawiki_history

Sometimes, wikis are not added to mediawiki_history after they're created (e.g. [T299548](https://phabricator.wikimedia.org/T299548), [T220456](https://phabricator.wikimedia.org/T220456)). Let's check for that.

In [8]:
mwh_wikis = wmf.spark.run("""
SELECT DISTINCT wiki_db AS database_code
FROM wmf.mediawiki_history
""".format(**query_vars))

23/02/07 02:05:44 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
23/02/07 02:06:08 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

In [9]:
mwh_missing_wikis = set(wikis["database_code"]) - set(mwh_wikis["database_code"])
mwh_missing_wikis

{'aswikiquote',
 'bclwikiquote',
 'bjnwiktionary',
 'blkwiki',
 'bnwikiquote',
 'gorwiktionary',
 'guwwikiquote',
 'guwwiktionary',
 'igwikiquote',
 'igwiktionary',
 'pcmwiki',
 'shnwikibooks',
 'tlwikiquote'}

If there are missing wikis, they should be reported to Data Engineering. In the meantime, let's continue generating the data for the remaining wikis.

In [10]:
wikis = wikis.drop(
    index=wikis.query("database_code in @mwh_missing_wikis").index
)

# Data collection

In [11]:
def merge_in(df, on="database_code"):
    global wikis
    wikis = pd.merge(wikis, df, how="left", on=on).fillna(0)

wikis_to_check = (
    pd.DataFrame({"database_code": WIKIS_TO_CHECK})
    .merge(wikis, how="left", on="database_code")
)

def check(df, index_col="database_code"):
    indexes = wikis_to_check[index_col].to_frame()
    return indexes.merge(df, how="left", on=index_col)

## Unique devices

In [12]:
ud = wmf.spark.run("""
SELECT
    -- Strip mobile subdomains so mobile and desktop sites are combined. 
    REGEXP_REPLACE(
        REGEXP_REPLACE(
            -- The canonical domains for Wikidata and MediaWiki.org start with `www`, which 
            -- gets _replaced_ by the mobile subdomain. Combine the two possibilites for each site.
            REGEXP_REPLACE(
                REGEXP_REPLACE(domain, "^m\\\\.wikidata", "www.wikidata"),
            "^m\\\\.mediawiki", "www.mediawiki"),
        "^m\\\\.", ""),
    "\\\\.m\\\\.", ".") AS domain_name,
    SUM(uniques_estimate) / 12 AS monthly_unique_devices,
    SUM(IF(
        (domain REGEXP "^m\\\\." OR  domain REGEXP "\\\\.m\\\\."),
        uniques_estimate,
        0
    )) / SUM(uniques_estimate) AS mobile_unique_devices
FROM wmf.unique_devices_per_domain_monthly
WHERE 
    CONCAT(year, LPAD(month, 2, "0")) >= "{pv_start}" 
    AND CONCAT(year, LPAD(month, 2, "0")) < "{pv_end}"  
GROUP BY    
    REGEXP_REPLACE(
        REGEXP_REPLACE(
            -- The canonical domains for Wikidata and MediaWiki.org start with `www`, which 
            -- gets _replaced_ by the mobile subdomain. Combine the two possibilites for each site.
            REGEXP_REPLACE(
                REGEXP_REPLACE(domain, "^m\\\\.wikidata", "www.wikidata"),
            "^m\\\\.mediawiki", "www.mediawiki"),
        "^m\\\\.", ""),
    "\\\\.m\\\\.", ".")
""".format(**query_vars))

                                                                                

In [13]:
check(ud, index_col="domain_name")

Unnamed: 0,domain_name,monthly_unique_devices,mobile_unique_devices
0,en.wikipedia.org,806906000.0,0.701502
1,es.wikipedia.org,145458600.0,0.736539
2,ja.wikipedia.org,102381500.0,0.744152
3,de.wikipedia.org,97089160.0,0.62665
4,fr.wikipedia.org,93493820.0,0.678406
5,ru.wikipedia.org,107290200.0,0.716601


In [14]:
merge_in(ud, on="domain_name")

## Pageviews

In [15]:
pv = wmf.spark.run("""
SELECT
    IF(
        project IN ("mediawiki", "wikidata"),
        CONCAT("www.", project, ".org"),
        CONCAT(project, ".org")
    ) AS domain_name, 
    SUM(view_count) / 12 AS monthly_pageviews,
    SUM(CASE WHEN access_method = "mobile web" THEN view_count END)
        / SUM(view_count) AS mobile_web_pageviews,
    SUM(CASE WHEN access_method = "mobile app" THEN view_count END)
        / SUM(view_COUNT) AS mobile_app_pageviews
FROM wmf.projectview_hourly
WHERE
    agent_type = "user" 
    AND CONCAT(year, LPAD(month, 2, "0")) >= "{pv_start}" 
    AND CONCAT(year, LPAD(month, 2, "0")) < "{pv_end}"
GROUP BY
    IF(
        project IN ("mediawiki", "wikidata"),
        CONCAT("www.", project, ".org"),
        CONCAT(project, ".org")
    )
""".format(**query_vars))

                                                                                

In [16]:
check(pv, index_col="domain_name")

Unnamed: 0,domain_name,monthly_pageviews,mobile_web_pageviews,mobile_app_pageviews
0,en.wikipedia.org,7244414000.0,0.614975,0.023044
1,es.wikipedia.org,842508000.0,0.689464,0.00744
2,ja.wikipedia.org,994408300.0,0.662674,0.0101
3,de.wikipedia.org,816103700.0,0.549553,0.049144
4,fr.wikipedia.org,754624100.0,0.536628,0.015572
5,ru.wikipedia.org,909630300.0,0.641799,0.014268


In [17]:
merge_in(pv, on="domain_name")

## Monthly editors

In [18]:
me = wmf.spark.run("""
SELECT
    wiki AS database_code,
    COUNT(*) / 12 AS monthly_editors,
    SUM(CAST(content_edits >= 5 AS INT)) / 12 AS monthly_active_editors,
    SUM(CAST(
        content_edits >= 5
        AND TRUNC(user_registration, 'MM') = TRUNC(month, 'MM') 
    AS INT)) / 12 AS monthly_new_active_editors
FROM wmf_product.editor_month
WHERE
    month >= "{start}" 
    AND month < "{end}" 
    AND user_id != 0
    -- Despite the name, this field identifies bots using both the name and group strategies
    AND NOT bot_by_group
GROUP BY wiki
""".format(**query_vars))

                                                                                

In [19]:
check(me)

Unnamed: 0,database_code,monthly_editors,monthly_active_editors,monthly_new_active_editors
0,enwiki,126515.75,31491.75,4522.916667
1,eswiki,15689.916667,4319.166667,916.25
2,jawiki,15324.5,5330.25,857.833333
3,dewiki,18414.166667,5134.916667,355.416667
4,frwiki,18530.25,4933.5,670.75
5,ruwiki,11823.583333,3462.166667,552.666667


In [20]:
merge_in(me)

## Monthly active administrators

In [21]:
maa = wmf.spark.run("""
SELECT
    wiki AS database_code,
    SUM(monthly_active_administrators) / 12 AS monthly_active_administrators
FROM (
    SELECT
        wiki_db AS wiki,
        SUBSTR(log_timestamp, 1, 6) AS month,
        COUNT(DISTINCT log_actor) AS monthly_active_administrators
    FROM wmf_raw.mediawiki_logging
    WHERE
        log_type IN ("block", "delete", "protect", "rights")
        -- Omit the "delete_redir", "move_prot", and "autopromote" actions, which can be done by regular users
        AND log_action NOT IN ("autopromote", "delete_redir", "move_prot")
        AND log_timestamp >= "{pv_start}" 
        AND log_timestamp < "{pv_end}" 
        AND snapshot = "{snapshot}"
    GROUP BY wiki_db, SUBSTR(log_timestamp, 1, 6)
) mae
GROUP BY wiki
""".format(**query_vars))

23/02/07 21:47:44 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/02/07 21:48:05 WARN DAGScheduler: Broadcasting large task binary with size 1069.7 KiB
                                                                                

In [22]:
check(maa)

Unnamed: 0,database_code,monthly_active_administrators
0,enwiki,408.166667
1,eswiki,46.416667
2,jawiki,32.583333
3,dewiki,130.75
4,frwiki,94.166667
5,ruwiki,89.916667


In [23]:
merge_in(maa)

## Majority-mobile editors proportion

In [24]:
mmep = wmf.spark.run("""
WITH user AS (
    SELECT 
        wiki_db AS database_code,
        SUM(CAST(ARRAY_CONTAINS(revision_tags, "mobile edit") AS INT))
            / COUNT(*) AS mobile_editing_proportion
    FROM wmf.mediawiki_history
    WHERE
        event_entity = "revision" 
        AND event_type = "create" 
        AND snapshot = "{snapshot}" 
        AND event_timestamp >= "{start}" 
        AND event_timestamp < "{end}"
        AND NOT event_user_is_anonymous
        AND SIZE(event_user_is_bot_by_historical) = 0
    GROUP BY
        wiki_db,
        event_user_text
)
SELECT
   database_code,
   SUM(CAST(mobile_editing_proportion > 0.5 AS INT))
       / COUNT(*) AS majority_mobile_editors
FROM user
GROUP BY database_code
""".format(**query_vars))

                                                                                

In [25]:
check(mmep)

Unnamed: 0,database_code,majority_mobile_editors
0,enwiki,0.266922
1,eswiki,0.323329
2,jawiki,0.300403
3,dewiki,0.161901
4,frwiki,0.202347
5,ruwiki,0.369455


In [26]:
merge_in(mmep)

## New editor retention

In [27]:
ner = wmf.spark.run(
"""
SELECT
    wiki AS database_code,
    SUM(CAST(2nd_month_edits >= 1 AS INT))
        / SUM(CAST(1st_month_edits >= 1 AS INT)) AS second_month_new_editor_retention
FROM wmf_product.new_editors
WHERE 
    cohort >= "{ner_cohort_start}" and
    cohort < "{ner_cohort_end}"
GROUP BY wiki
""".format(**query_vars))

                                                                                

In [28]:
check(ner, "database_code")

Unnamed: 0,database_code,second_month_new_editor_retention
0,enwiki,0.077388
1,eswiki,0.049207
2,jawiki,0.106374
3,dewiki,0.075687
4,frwiki,0.061977
5,ruwiki,0.06096


In [29]:
merge_in(ner)

## Monthly non-bot edits

In [30]:
mnbe = wmf.spark.run("""
SELECT
    CONCAT(project, ".org") AS domain_name,
    SUM(edit_count) / 12 AS monthly_nonbot_edits,
    SUM(CASE WHEN ARRAY_CONTAINS(revision_tags, "mobile edit") THEN edit_count END)
        / SUM(edit_count) AS mobile_edits,
    SUM(CASE WHEN ARRAY_CONTAINS(revision_tags, "visualeditor") THEN edit_count END)
        / SUM(edit_count) AS visual_edits,
    SUM(CASE WHEN user_is_anonymous THEN edit_count END) / SUM(edit_count) AS anonymous_edits
FROM wmf.edit_hourly
WHERE
    ts >= "{start}"
    AND ts < "{end}"
    AND NOT user_is_bot
    AND snapshot = "{snapshot}"
GROUP BY CONCAT(project, ".org")
""".format(**query_vars))

                                                                                

In [31]:
check(mnbe, index_col="domain_name")

Unnamed: 0,domain_name,monthly_nonbot_edits,mobile_edits,visual_edits,anonymous_edits
0,en.wikipedia.org,4246871.0,0.151764,0.09239,0.162143
1,es.wikipedia.org,563508.9,0.261083,0.164849,0.265549
2,ja.wikipedia.org,442413.9,0.262757,0.151111,0.23874
3,de.wikipedia.org,699083.4,0.055543,0.110093,0.089826
4,fr.wikipedia.org,707189.2,0.110177,0.172266,0.119487
5,ru.wikipedia.org,500737.0,0.142449,0.157279,0.181969


In [32]:
merge_in(mnbe, on="domain_name")

## Bot editing proportion

In [33]:
bep = wmf.spark.run("""
SELECT
   CONCAT(project, ".org") AS domain_name,
   SUM(CASE WHEN user_is_bot THEN edit_count END)
        / SUM(edit_count) AS bot_edits
FROM wmf.edit_hourly
WHERE
    ts  >= "{start}" 
    AND ts  < "{end}"
    AND snapshot = "{snapshot}"
GROUP BY CONCAT(project, ".org")
""".format(**query_vars))

                                                                                

In [34]:
check(bep, index_col="domain_name")

Unnamed: 0,domain_name,bot_edits
0,en.wikipedia.org,0.245877
1,es.wikipedia.org,0.111687
2,ja.wikipedia.org,0.09167
3,de.wikipedia.org,0.149992
4,fr.wikipedia.org,0.195407
5,ru.wikipedia.org,0.292654


In [35]:
merge_in(bep, on="domain_name")

## Revert rate

In [36]:
rr = wmf.spark.run("""
    SELECT
        wiki_db AS database_code,
        SUM(CAST(revision_is_identity_reverted AS INT)) / COUNT(*) AS revert_rate
    FROM
        wmf.mediawiki_history
    WHERE
        event_entity = "revision" 
        AND event_type = "create" 
        AND snapshot = "{snapshot}" 
        AND event_timestamp >= "{start}" 
        AND event_timestamp < "{end}" 
        AND SIZE(event_user_is_bot_by_historical) = 0 
    GROUP BY wiki_db
""".format(**query_vars))

                                                                                

In [37]:
check(rr)

Unnamed: 0,database_code,revert_rate
0,enwiki,0.10452
1,eswiki,0.166822
2,jawiki,0.069074
3,dewiki,0.07257
4,frwiki,0.061446
5,ruwiki,0.121988


In [38]:
merge_in(rr)

## Edits Gini coefficient

In [39]:
user_edits = wmf.spark.run("""
SELECT
    wiki_db AS wiki,
    COUNT(*) AS user_edits
FROM
    wmf.mediawiki_history
WHERE
    event_entity = "revision" 
    AND event_type = "create" 
    AND snapshot = "{snapshot}" 
    AND event_timestamp >= "{start}" 
    AND event_timestamp < "{end}" 
    AND SIZE(event_user_is_bot_by_historical) = 0
GROUP BY
    wiki_db,
    event_user_id
""".format(**query_vars))

                                                                                

In [40]:
# from https://github.com/oliviaguest/gini
# licensed under CC0 (public domain)
def gini(array):
    """Calculate the Gini coefficient of a numpy array."""
    # based on bottom eq:
    # http://www.statsdirect.com/help/generatedimages/equations/equation154.svg
    # FROM:
    # http://www.statsdirect.com/help/default.htm#nonparametric_methods/gini.htm
    # All values are treated equally, arrays must be 1d:
    array = array.flatten()
    if np.amin(array) < 0:
        # Values cannot be negative:
        array -= np.amin(array)
    # Values cannot be 0:
    array = array + 0.0000001
    # Values must be sorted:
    array = np.sort(array)
    # Index per array element:
    index = np.arange(1, array.shape[0] + 1)
    # Number of array elements:
    n = array.shape[0]
    # Gini coefficient:
    return ((np.sum((2 * index - n - 1) * array)) / (n * np.sum(array)))

egc = (
    user_edits
    .groupby("wiki")
    .apply(lambda g: gini(g["user_edits"].values))
    .reset_index()
)

egc.columns = ["database_code", "edits_Gini_coefficient"]

In [42]:
check(egc)

Unnamed: 0,database_code,edits_Gini_coefficient
0,enwiki,0.953873
1,eswiki,0.957572
2,jawiki,0.936955
3,dewiki,0.959207
4,frwiki,0.960226
5,ruwiki,0.961327


In [43]:
merge_in(egc)

## Content pages

Note that this query gives the number of content pages _at query time_, not at the end of the snapshot period. Unless the gap between those times is many months, the difference should be pretty small.

Ideally, we would query `mediawiki_history` or the AQS API instead for the count as of the end of the snapshot period. However, this would introduce a new problem: the official [content pages definition](https://www.mediawiki.org/wiki/Manual:Article_count) specifies that pages (in addition to being in a content namespace, not being deleted, and not being a redirect) must also contain one internal link. This information isn't available in `mediawiki_history`, and the article count available through the AQS API probably doesn't take it into account either.


In [None]:
wikis_list = wikis["database_code"].tolist()

ac = wmf.mariadb.run("""
SELECT
    DATABASE() AS database_code,
    ss_good_articles AS content_pages
FROM site_stats
""", wikis_list)

# Some wikis have sharded site_stats tables, which means the output will have multiple rows
# for that wiki that need to be summed up.
# Documented on https://www.mediawiki.org/wiki/Manual:Site_stats_table
ac = ac.groupby("database_code").sum().reset_index()

In [53]:
check(ac)

Unnamed: 0,database_code,content_pages
0,enwiki,6614510
1,eswiki,1837386
2,jawiki,1361481
3,dewiki,2769985
4,frwiki,2492047
5,ruwiki,1892103


In [54]:
merge_in(ac)

## All-time content edits

In [57]:
atce = wmf.spark.run("""
SELECT
    wiki_db AS database_code,
    COUNT(*) AS all_time_content_edits
FROM
    wmf.mediawiki_history
WHERE
    event_entity = "revision" 
    AND event_type = "create" 
    AND snapshot = "{snapshot}" 
    AND page_namespace_is_content
    AND SIZE(event_user_is_bot_by_historical) = 0 
GROUP BY wiki_db
""".format(**query_vars))

                                                                                

In [58]:
check(atce)

Unnamed: 0,database_code,all_time_content_edits
0,enwiki,696533897
1,eswiki,101324297
2,jawiki,71951024
3,dewiki,141779143
4,frwiki,117511241
5,ruwiki,77248137


In [59]:
merge_in(atce)

## Script direction

In [60]:
rtl_wikis = get_dblist("rtl")
rtl = pd.DataFrame({"database_code": rtl_wikis, "script_direction": "right-to-left"})
merge_in(rtl)
wikis["script_direction"] = wikis["script_direction"].replace({0: "left-to-right"})

## Unique devices per editor

In [61]:
wikis["unique_devices_per_editor"] = wikis["monthly_unique_devices"] / wikis["monthly_editors"]

In [62]:
wikis = wikis.replace([np.inf], 0)

## Overall size rank

In [63]:
size = np.sqrt(wikis["monthly_unique_devices"] * wikis["monthly_active_editors"])
rank = size.rank(method="min", na_option="bottom", ascending=False)
wikis["overall_size_rank"] = rank

## Edits per content page

In [64]:
wikis["all_time_edits_per_content_page"] = wikis["all_time_content_edits"] / wikis["content_pages"]

# Readying for output

In [65]:
wikis_formatted = (
    wikis[[
        "overall_size_rank",
        "monthly_unique_devices",
        "mobile_unique_devices",
        "monthly_pageviews",
        "mobile_web_pageviews",
        "mobile_app_pageviews",
        "unique_devices_per_editor",
        "monthly_editors",
        "majority_mobile_editors",
        "monthly_active_editors",
        "monthly_active_administrators",
        "monthly_new_active_editors",
        "second_month_new_editor_retention",
        "bot_edits",
        "monthly_nonbot_edits",
        "mobile_edits",
        "visual_edits",
        "anonymous_edits",
        "revert_rate",
        "edits_Gini_coefficient",
        "content_pages",
        "all_time_content_edits",
        "all_time_edits_per_content_page",
        "script_direction",
        "database_code",
        "project_code",
        "language_code",
        "language_name",
        "domain_name",
        "wiki_name"
    ]].astype({
        # These columns are not monthly averages and will never have a fractional component
        "overall_size_rank": int,
        "content_pages": int,
        "all_time_content_edits": int
    })
    .sort_values("overall_size_rank")
)

# Convert underscores to spaces in column names in the final step, to avoid having to
# work with spaces beforehand
wikis_formatted = wikis_formatted.rename(columns=lambda c: c.replace("_", " "))

## Output CSV

In [66]:
wikis_formatted.to_csv(
    f"snapshots/{file_stem}.csv",
    float_format="%0.4f",
    index=False
)