In [13]:
#!/usr/bin/env python3
import wmfdata as wmf
import pandas as pd

spark = wmf.spark.get_custom_session(
    master="yarn",
    spark_config={
        "spark.driver.memory": "16g",
        "spark.dynamicAllocation.maxExecutors": 128,
        "spark.executor.memory": "16g",
        "spark.executor.cores": 4,
        "spark.sql.shuffle.partitions": 512
    }
)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


In [15]:
LANGUAGE_EDITIONS = "('enwiki','eswiki','jawiki','arwiki','dewiki','ptwiki','viwiki','frwiki','cawiki','ruwiki','idwiki','bnwiki','hiwiki','cswiki')"

query = """
WITH active_editors AS (
SELECT
    cast(substr(event_timestamp,1,4) as int) as year, 
    cast(substr(event_timestamp,6,2) as int) as month,
    wiki_db,
    COUNT(DISTINCT event_user_id) AS num_active_editors
FROM wmf.mediawiki_history  
WHERE snapshot = '2022-07'
AND wiki_db in {0}
AND array_contains(event_user_groups,'bot') = FALSE
AND revision_is_identity_reverted = FALSE
GROUP BY
    cast(substr(event_timestamp,1,4) as int),
    cast(substr(event_timestamp,6,2) as int),
    wiki_db
),

bytes_added AS (
SELECT
    cast(substr(event_timestamp ,1,4) as int) as year, 
    cast(substr(event_timestamp,6,2) as int) as month,
    wiki_db,
    SUM(revision_text_bytes_diff) AS num_bytes_added
FROM wmf.mediawiki_history   
WHERE snapshot = '2022-07'
AND wiki_db in {0}
AND array_contains(event_user_groups,'bot') = FALSE
AND revision_is_identity_reverted = FALSE
AND revision_text_bytes_diff > 0
GROUP BY
    cast(substr(event_timestamp,1,4) as int),
    cast(substr(event_timestamp,6,2) as int),
    wiki_db
),

articles_created AS (
SELECT
    cast(substr(start_timestamp ,1,4) as int) as year, 
    cast(substr(start_timestamp,6,2) as int) as month,
    wiki_db,
    COUNT(*) AS num_articles_created
FROM wmf.mediawiki_page_history 
WHERE snapshot = '2022-07'
AND page_namespace = 1
AND wiki_db in {0}
GROUP BY
    cast(substr(start_timestamp,1,4) as int),
    cast(substr(start_timestamp,6,2) as int),
    wiki_db
),

articles_deleted AS (
SELECT
    cast(substr(end_timestamp ,1,4) as int) as year, 
    cast(substr(end_timestamp,6,2) as int) as month,
    wiki_db,
    COUNT(*) AS num_articles_deleted
FROM wmf.mediawiki_page_history 
WHERE snapshot = '2022-07'
AND page_namespace = 1
AND wiki_db in {0}
GROUP BY
    cast(substr(end_timestamp,1,4) as int),
    cast(substr(end_timestamp,6,2) as int),
    wiki_db
),

num_articles AS (
SELECT
    articles_created.month,
    articles_created.year,
    articles_created.wiki_db,
    COALESCE(articles_deleted.num_articles_deleted,0) AS articles_deleted,
    articles_created.num_articles_created,
    articles_created.num_articles_created-COALESCE(articles_deleted.num_articles_deleted,0) AS article_diff,
    SUM(articles_created.num_articles_created-COALESCE(articles_deleted.num_articles_deleted,0)) OVER (PARTITION BY articles_created.wiki_db ORDER BY articles_created.year, articles_created.month) AS num_articles 
FROM articles_created
LEFT JOIN articles_deleted
ON (articles_created.month = articles_deleted.month AND articles_created.year = articles_deleted.year AND articles_created.wiki_db = articles_deleted.wiki_db)
WHERE articles_created.month IS NOT NULL
AND articles_created.year IS NOT NULL
)

SELECT active_editors.month, active_editors.year, active_editors.wiki_db, active_editors.num_active_editors, bytes_added.num_bytes_added, num_articles.num_articles
FROM active_editors
INNER JOIN bytes_added
ON active_editors.year = bytes_added.year AND active_editors.month = bytes_added.month AND active_editors.wiki_db = bytes_added.wiki_db
INNER JOIN num_articles
ON active_editors.year = num_articles.year AND active_editors.month = num_articles.month AND active_editors.wiki_db = num_articles.wiki_db
""".format(
    LANGUAGE_EDITIONS
)

df = wmf.spark.run(query)
df

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.
[Stage 33:=====>                                               (28 + 228) / 256]6144]]22/08/23 21:01:55 WARN TaskSetManager: Lost task 40.0 in stage 33.0 (TID 44842, an-worker1094.eqiad.wmnet, executor 368): FetchFailed(BlockManagerId(378, analytics1066.eqiad.wmnet, 7337, None), shuffleId=22, mapId=91, reduceId=40, message=
org.apache.spark.shuffle.FetchFailedException: java.util.concurrent.TimeoutException: Timeout waiting for task.
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$11.hasN

Unnamed: 0,month,year,wiki_db,num_active_editors,num_bytes_added,num_articles
0,9,2004,enwiki,9864,320410731,72187
1,9,2004,eswiki,587,16554581,2295
2,4,2007,viwiki,757,16012165,6400
3,9,2008,cawiki,1098,29859840,16355
4,1,2009,cawiki,1594,33984840,21061
...,...,...,...,...,...,...
3266,10,2014,viwiki,1171,24488060,1091677
3267,6,2017,bnwiki,486,26183234,61545
3268,7,2017,eswiki,14072,132919273,501030
3269,12,2019,arwiki,4198,87225529,1064872


In [41]:
FILEPATH = '/home/jmads/datasets/momentum/language_edition_first_edit_timestamps_8-23-22.csv'
min_age = pd.read_csv(FILEPATH)

def getWikiAge(df,min_age_df):
    min_age_df = min_age_df.rename(columns={'year':'year_1','month':'month_1'})
    df = df.merge(min_age_df, on='wiki_db')
    df['wiki_age'] = df['year'].multiply(12).add(df['month']).subtract(df['year_1'].multiply(12).add(df['month_1']))
    return df

result_df = df.copy()
result_df = getWikiAge(result_df,min_age)
result_df = result_df.sort_values(['wiki_db','year','month'])

In [50]:
FILEPATH = '/home/jmads/datasets/momentum/active_editors_content_added_multi-wiki_8-24-22.csv'

result_df.to_csv(FILEPATH,index=False)