In [1]:
import numpy as np
import pandas as pd
import wmfdata as wmf

# Show every column when displaying DataFrames (no "..." truncation).
pd.set_option('display.max_columns', None)

import warnings
import json
import time




You are using Wmfdata v2.0.0, but v2.0.1 is available.

To update, run `pip install --upgrade git+https://github.com/wikimedia/wmfdata-python.git@release`.

To see the changes, refer to https://github.com/wikimedia/wmfdata-python/blob/release/CHANGELOG.md.


In [2]:
# wmf.mediawiki_history snapshot partition (YYYY-MM) passed to the user-data query as HIVE_SNAPSHOT.
snapshot = "2023-08"

## Spark session

In [3]:
# Stop any already-active Spark session before a custom one is created below.
spark_session = wmf.spark.get_active_session()

# Idiomatic None check (was `type(spark_session) != type(None)`).
if spark_session is not None:
    spark_session.stop()
else:
    print('no active session')

no active session


In [4]:
# YARN-backed Spark session used for the mediawiki_history queries below.
spark_session = wmf.spark.create_custom_session(
    master="yarn",
    app_name="cws-user-data",
    spark_config={
        # driver: results are collected to pandas, so cap their size
        "spark.driver.memory": "4g",
        "spark.driver.maxResultSize": "2g",
        # executors
        "spark.executor.memory": "16g",
        "spark.executor.cores": 4,
        "spark.dynamicAllocation.maxExecutors": 64,
        # shuffle parallelism for the window functions / joins
        "spark.sql.shuffle.partitions": 256,
    },
)

SPARK_HOME: /usr/lib/spark3
Using Hadoop client lib jars at 3.2.0, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/21 07:11:04 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
23/09/21 07:11:05 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
23/09/21 07:11:05 WARN Utils: Service 'sparkDriver' could not bind on port 12001. Attempting port 12002.
23/09/21 07:11:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/09/21 07:11:05 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/09/21 07:11:13 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13000. Attempting port 13001.
23/09/21 07:11:13 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on 

In [5]:
# Display the session object to confirm it was created.
spark_session

In [6]:
# Only surface ERROR-level Spark logs from here on (the WARN port-binding logs are noise).
spark_session.sparkContext.setLogLevel("ERROR")

## Load survey data and build user-data queries

In [7]:
# Per-survey-year page links plus the `data_end` cutoff date used by the queries.
# NOTE(review): the 'Unnamed: 0' column looks like a pandas index saved with the file.
cws_links = pd.read_table('data/cws_page_links.tsv')
cws_links.head()

Unnamed: 0,year,data_end,main_page,main_category,category_subpages,proposals_category,proposal_template,results_page,archive_page,ar_contains_reject_reason,notes
0,2015,2015-12-31,Community Wishlist Survey 2015,Community Wishlist Survey 2015,Community Wishlist Survey 2015,,non_standard,Community Wishlist Survey 2015/Results,Community Wishlist Survey 2015/Archive,False,proposals listed as sections; discussions in ...
1,2016,2016-12-31,Community Wishlist Survey 2016,Community Wishlist Survey 2016,Community Wishlist Survey 2016/Proposals,,non_standard,Community Wishlist Survey 2016/Results,Community Wishlist Survey 2016/Archive,False,proposals listed as sections; sections were a...
2,2017,2017-12-31,Community Wishlist Survey 2017,Community Wishlist Survey 2017,Community Wishlist Survey 2017/Categories,Community Wishlist Survey 2017/Proposals,standard,Community Wishlist Survey 2017/Results,Community Wishlist Survey 2017/Archive,True,proposals as subpages of categories; standard ...
3,2018,,,,,,,,,,
4,2019,2018-11-30,Community Wishlist Survey 2019,Community Wishlist Survey 2019,Community Wishlist Survey 2019/Categories,Community Wishlist Survey 2019/Proposals,standard,Community Wishlist Survey 2019/Results,Community Wishlist Survey 2019/Archive,True,proposals as subpages of categories; standard ...


In [8]:
# Proposal data keyed as {year: {category: {proposal: {...}}}} (the shape users_by_year walks).
# JSON is UTF-8 by spec; be explicit so non-ASCII usernames don't depend on the locale.
with open('data/01-cws_proposals_data.json', encoding='utf-8') as file:
    cws_data = json.load(file)

In [9]:
# Spark SQL template, formatted with HIVE_SNAPSHOT (mediawiki_history snapshot),
# END_OF_DATA (cutoff date, 'YYYY-MM-DD') and USERS_LIST (SQL tuple of usernames).
# Returns one row per user:
#   * home_wiki: wiki with the largest edit_count (Wikidata edits weighted 0.1)
#     over the 365*2 days up to END_OF_DATA, ties broken by the latest edit;
#   * hw_* / mw_*: cumulative revision count, its bucket and (home wiki only) the
#     union of current and historical user groups, read from the user's latest
#     revision on or before END_OF_DATA on the home wiki / on metawiki.
# Users with no edits in the 2-year window, or with no metawiki revision up to
# END_OF_DATA, are dropped by the inner joins.
# edit_bucket_data is restricted to revision-create events because
# event_user_revision_count is only populated for those; any remaining NULL
# count yields a NULL bucket instead of being labelled '5000+'.
# The mw_bucket alias is left unchanged in case downstream files rely on it.
user_data_query = """
WITH
    yearly_edits AS (
        SELECT
            event_user_text AS username,
            wiki_db,
            SUM(IF(wiki_db = 'wikidatawiki', 0.1, 1)) AS edit_count,
            MAX(event_timestamp) AS last_edit
        FROM
            wmf.mediawiki_history
        WHERE
            snapshot='{HIVE_SNAPSHOT}'
            AND NOT event_user_is_anonymous
            AND event_type = 'create'
            AND event_entity = 'revision'
            AND event_user_text IN {USERS_LIST}
            AND DATE(event_timestamp) BETWEEN DATE_SUB('{END_OF_DATA}', 365*2) AND DATE('{END_OF_DATA}')
        GROUP BY
            event_user_text,
            wiki_db
    ),

    home_wiki_ranked AS (
        SELECT
            *,
            ROW_NUMBER() OVER (
                PARTITION BY username
                ORDER BY edit_count DESC, last_edit DESC) AS rank
        FROM yearly_edits
    ),

    home_wiki AS (
        SELECT username, wiki_db AS home_wiki
        FROM home_wiki_ranked
        WHERE rank = 1
    ),

    edit_bucket_data AS (
        SELECT
            mwh.revision_id,
            mwh.event_user_text,
            mwh.event_user_revision_count AS edit_count,
            CASE
                WHEN mwh.event_user_revision_count < 100 THEN '0-99'
                WHEN mwh.event_user_revision_count < 500 THEN '100-499'
                WHEN mwh.event_user_revision_count < 1000 THEN '500-999'
                WHEN mwh.event_user_revision_count < 5000 THEN '1000-4999'
                WHEN mwh.event_user_revision_count >= 5000 THEN '5000+'
            END AS edit_bucket,
            mwh.event_timestamp,
            ROW_NUMBER() OVER (
                PARTITION BY mwh.event_user_text, mwh.wiki_db
                ORDER BY mwh.event_timestamp DESC) AS rank,
            CASE
                WHEN mwh.wiki_db = 'metawiki' THEN NULL
                ELSE ARRAY_DISTINCT(ARRAY_UNION(mwh.event_user_groups, mwh.event_user_groups_historical))
            END AS user_groups,
            mwh.wiki_db,
            hw.home_wiki
        FROM
            wmf.mediawiki_history mwh
        JOIN
            home_wiki hw
            ON mwh.event_user_text = hw.username
        WHERE
            mwh.snapshot = '{HIVE_SNAPSHOT}'
            AND mwh.event_entity = 'revision'
            AND mwh.event_type = 'create'
            AND mwh.event_user_text IN {USERS_LIST}
            AND DATE(mwh.event_timestamp) <= DATE('{END_OF_DATA}')
            AND (mwh.wiki_db = hw.home_wiki
                OR mwh.wiki_db = 'metawiki')
    ),

    home_wiki_activity AS (
        SELECT
            *
        FROM
            edit_bucket_data
        WHERE
            wiki_db = home_wiki
            AND rank = 1
    ),

    meta_wiki_activity AS (
        SELECT
            *
        FROM
            edit_bucket_data
        WHERE
            wiki_db = 'metawiki'
            AND rank = 1
    )

SELECT
    hw.event_user_text AS username,
    hw.home_wiki,
    hw.edit_count AS hw_edit_count,
    hw.edit_bucket AS hw_edit_bucket,
    hw.user_groups AS hw_user_groups,
    mw.edit_count AS mw_edit_count,
    mw.edit_bucket AS mw_bucket
FROM
    home_wiki_activity hw
JOIN
    meta_wiki_activity mw
    ON hw.event_user_text = mw.event_user_text
"""

# MariaDB (centralauth) template, formatted with USERS_LIST: global account age in
# whole months.
# NOTE(review): NOW() measures age at query time, not at the survey's END_OF_DATA,
# so earlier survey years get inflated ages — confirm this is intended.
guc_query = """
SELECT
    gu_name AS username,
    TIMESTAMPDIFF(MONTH, gu_registration, NOW()) AS account_age_months
FROM
    globaluser
WHERE
    gu_name IN {USERS_LIST}
"""

In [10]:
def users_by_year(data):
    """Collect the distinct survey participants for each year.

    Parameters
    ----------
    data : dict
        ``{year: {category: {proposal: proposal_info}}}``. Each
        ``proposal_info`` dict may hold 'proposer', 'discussion_participants'
        and 'voters' entries; missing keys and ``None`` values are skipped.

    Returns
    -------
    dict
        ``{year: [username, ...]}``: the de-duplicated union of proposers,
        discussion participants and voters across all proposals of that year,
        sorted so the generated SQL is identical from run to run (set
        iteration order of strings varies between interpreter runs).
    """
    participants_by_year = {}

    for year, categories in data.items():
        users = set()

        for category in categories.values():
            for proposal in category.values():
                for key in ('proposer', 'discussion_participants', 'voters'):
                    participants = proposal.get(key)
                    # Entries may be absent or explicitly null in the JSON.
                    if participants is None:
                        continue
                    # A single username stored as a string (e.g. 'proposer')
                    # must not be split into characters by set.update.
                    if isinstance(participants, str):
                        users.add(participants)
                    else:
                        users.update(participants)

        participants_by_year[year] = sorted(users, key=str)

    return participants_by_year

# {year (str key from the JSON): [participant usernames]}
cws_users_by_year = users_by_year(data=cws_data)

In [14]:
def get_user_data(survey_year, cws_info=cws_links, usernames=cws_users_by_year, user_data_query=user_data_query, guc_query=guc_query, hive_snapshot=None):
    """Fetch activity and account-age data for one survey year's participants.

    Parameters
    ----------
    survey_year : int
        Year looked up in ``cws_info['year']``; its ``data_end`` date is the
        END_OF_DATA cutoff for ``user_data_query``.
    cws_info : pandas.DataFrame
        Survey metadata with at least ``year`` and ``data_end`` columns.
    usernames : dict
        ``{str(year): [username, ...]}`` participant lists.
    user_data_query : str
        Spark SQL template with HIVE_SNAPSHOT, END_OF_DATA and USERS_LIST fields.
    guc_query : str
        centralauth (MariaDB) template with a USERS_LIST field; it may also use
        END_OF_DATA, which is supplied but ignored by ``str.format`` otherwise.
    hive_snapshot : str, optional
        mediawiki_history snapshot; defaults to the notebook-level ``snapshot``
        (read at call time, as before).

    Returns
    -------
    pandas.DataFrame
        Rows from the Spark query left-joined on ``username`` with the
        centralauth account age, plus ``account_age_years`` and ``survey_year``.

    Raises
    ------
    ValueError
        If ``survey_year`` is not in ``cws_info`` or has no ``data_end``
        (previously an opaque IndexError, or a query run with 'nan' as the date).
    """
    if hive_snapshot is None:
        hive_snapshot = snapshot

    survey_rows = cws_info.query("year == @survey_year")
    if survey_rows.empty or pd.isna(survey_rows['data_end'].iloc[0]):
        raise ValueError(f"No data_end recorded for survey year {survey_year}")
    data_end = survey_rows['data_end'].iloc[0]

    participants = wmf.utils.sql_tuple(usernames[str(survey_year)])

    users_data = wmf.spark.run(
        user_data_query.format(HIVE_SNAPSHOT=hive_snapshot, END_OF_DATA=data_end, USERS_LIST=participants)
    )
    guc_data = wmf.mariadb.run(
        guc_query.format(USERS_LIST=participants, END_OF_DATA=data_end),
        dbs='centralauth',
        use_x1=True,
    )

    users_data = pd.merge(users_data, guc_data, how='left', on='username')

    users_data['account_age_years'] = round(users_data['account_age_months'] / 12, 2)
    users_data['survey_year'] = survey_year

    return users_data

In [15]:
%%time
warnings.filterwarnings('ignore')
users_data = pd.DataFrame()

for year in range(2015, 2023+1):
    if year != 2018:
        
        start_time = time.time()
        
        user_data_yearly = get_user_data(year)
        user_data_yearly.to_csv(f'secrets/cws_user_data_{year}.tsv', sep='\t')
        
        users_data = pd.concat([users_data, user_data_yearly], ignore_index=True)
        users_data.to_csv(f'secrets/cws_user_data_merged.tsv', sep='\t')
        
        end_time = time.time()
        elapsed_time = round((end_time - start_time)/60, 2)
        print(f"{year} data was extracted in {elapsed_time} minutes.")

                                                                                ]]

2015 data was extracted in 2.68 minutes.


                                                                                ]]

2016 data was extracted in 2.5 minutes.


                                                                                6]]]]

2017 data was extracted in 2.12 minutes.


                                                                                92]]]

2019 data was extracted in 4.16 minutes.


                                                                                ]]]]]

2020 data was extracted in 4.28 minutes.


                                                                                56]]]

2021 data was extracted in 5.93 minutes.


                                                                                ]]]]

2022 data was extracted in 6.73 minutes.


                                                                                2]]]]

2023 data was extracted in 4.82 minutes.
CPU times: user 5.98 s, sys: 1.35 s, total: 7.33 s
Wall time: 33min 13s
