# Analytics for Section Level Image Suggestions Notifications

[T331290](https://phabricator.wikimedia.org/T331290)

# Purpose

Section-level image suggestions were sent out on July 5th, 2023. These suggestions are sent weekly as notifications to experienced contributors for pages on their watchlist, and can be page-level, section-level, or bundled together.

In this report, we look at July 2023 data from Structured Content's pilot wikis (CA, PT, RU, ID, NO, HU, and FI) and answer the following questions:

- Number of notifications sent
    - Filtered by article vs section-level suggestion
- Percentage of notifications read
    - Filtered by article vs section-level suggestion
- Notification opt-out rate
- Number of images suggested that are added to the matched article within a month of receiving the notification
    - Filtered by article vs section-level suggestion
- Revert rate for image additions
    - Filtered by article vs section-level suggestion

# Data Preparation

In [1]:
import re

from wmfdata import hive, mariadb, spark
import wmfdata 

import math
import pandas as pd
import numpy as np

from datetime import datetime, timedelta, date




The check for a newer release of Wmfdata failed to complete. Consider checking manually.


In [2]:
spark_session = wmfdata.spark.create_session(app_name='pyspark regular; media-changes',
                                  type='yarn-large')  

SPARK_HOME: /usr/lib/spark3
Using Hadoop client lib jars at 3.2.0, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3



## Parameters

In [28]:
mw_snapshot = '2023-07'  
wiki_dbs = ('ptwiki', 'ruwiki', 'idwiki','fiwiki', 'huwiki','cawiki', 'nowiki')

wiki_db_str = "('" + "','".join(wiki_dbs) + "')"  # otherwise single wiki leads to confusing syntax errors
wiki_db_list = list(wiki_dbs)

notification_timestamp = 20220720000000

#notification ts
m_start_timestamp = 20230701000000
n_start_timestamp = 20230601000000
n_end_timestamp = 20230801000000

##edits ts
start_timestamp = '2023-07-01' 
end_timestamp = '2023-08-01'

media_list_table = 'cchen.media_july_2023'
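
The comment on `wiki_db_str` refers to how Python renders a one-element tuple. The following is a minimal sketch of that pitfall; `wiki_dbs_single` is a hypothetical one-wiki configuration that is not part of this analysis.

```python
# Hypothetical single-wiki run (the report itself uses seven wikis).
wiki_dbs_single = ('ptwiki',)

# str() of a one-element tuple keeps the trailing comma, which is not valid in a SQL IN (...) list.
naive = str(wiki_dbs_single)

# Joining the names explicitly yields a valid list regardless of how many wikis are configured.
joined = "('" + "','".join(wiki_dbs_single) + "')"

print(naive)
print(joined)
```

With two or more wikis both forms produce a usable `IN` list, so the difference only shows up when the analysis is narrowed to one wiki.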

## Notification Data

In [4]:
notification_query = """

SELECT
   notification_event,
   notification_bundle_hash,
   notification_user,
   notification_timestamp,
   notification_read_timestamp,
   event_extra,
   event_page_id
FROM echo_notification
JOIN echo_event on event_id = notification_event
WHERE notification_timestamp >= {start_timestamp}
  AND notification_timestamp < {end_timestamp}
  AND event_type = 'image-suggestions'

"""

In [5]:
notification_data = pd.DataFrame()

for i in range(len(wiki_db_list)):
        
    print('getting data for %s' % wiki_db_list[i])
    
    
    noti_result = mariadb.run( 
        notification_query.format(
            start_timestamp = n_start_timestamp,
            end_timestamp = n_end_timestamp
        ),wiki_db_list[i],'wikishared','pandas'
    )
    
    noti_result.insert(0, 'wiki_db', wiki_db_list[i])
    
    notification_data = pd.concat([notification_data, noti_result], sort=False)

getting data for ptwiki
getting data for ruwiki
getting data for idwiki
getting data for fiwiki
getting data for huwiki
getting data for cawiki
getting data for nowiki

In [6]:
notification_data['event_extra'] = notification_data['event_extra'].astype(str)
# extract name of suggested images from data
notification_data['suggested_image'] = notification_data['event_extra'].str.extract(r'(?<=File:)(.+?)(?=\")')
# flag article-level image suggestion notifications (event_extra ends with a serialized null, ';N;}')
notification_data['is_article_level'] = notification_data['event_extra'].str.endswith(";N;}")
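
To make the two string operations above concrete, here is a small sketch on made-up `event_extra` values. The payloads are hypothetical: the key names (`file`, `section`) and the overall layout are assumptions for illustration, and only the `File:` prefix and the trailing `;N;}` are taken from the cell above.

```python
import pandas as pd

# Hypothetical PHP-serialized payloads; real event_extra keys and layout may differ.
samples = pd.DataFrame({'event_extra': [
    'a:2:{s:4:"file";s:16:"File:Example.jpg";s:7:"section";N;}',
    'a:2:{s:4:"file";s:12:"File:Map.svg";s:7:"section";s:7:"History";}',
]})

# Same pattern as above: everything between "File:" and the next double quote.
samples['suggested_image'] = samples['event_extra'].str.extract(
    r'(?<=File:)(.+?)(?=")', expand=False
)

# A payload whose last value is a serialized null (N;) is treated as article-level.
samples['is_article_level'] = samples['event_extra'].str.endswith(';N;}')

print(samples[['suggested_image', 'is_article_level']])
```

Because the check relies on the null being the last field, it would need revisiting if the order of fields in `event_extra` ever changed.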

In [7]:
# store in global temp view
notification_sdf = spark_session.createDataFrame(notification_data)
notification_sdf.createGlobalTempView("notification_data")

## User Pref Data

In [8]:
pref_query = """

SELECT 
    up_property,
    up_value,
    up_user AS local_user_id
FROM user_properties
WHERE up_property like '%image_suggestions%'

"""

In [9]:
local_pref_data = pd.DataFrame()

for i in range(len(wiki_db_list)):
        
    print('getting data for %s' % wiki_db_list[i])
    
    pref_result = mariadb.run( 
        pref_query,wiki_db_list[i]
    )
    
    pref_result.insert(0, 'wiki_db', wiki_db_list[i])
    
    local_pref_data = pd.concat([local_pref_data, pref_result], sort=False)

getting data for ptwiki
getting data for ruwiki
getting data for idwiki
getting data for fiwiki
getting data for huwiki
getting data for cawiki
getting data for nowiki

In [10]:
## convert up_value to a string in order to store in GlobalTempView
local_pref_data['up_value'] = local_pref_data['up_value'].astype(str)

In [11]:
# store in global temp view
local_pref_sdf = spark_session.createDataFrame(local_pref_data)
local_pref_sdf.createGlobalTempView("local_pref_temp")

In [12]:
global_pref_query = """

SELECT
    gp_user,
    gp_property,
    gp_value,
    lu_wiki,
    lu_local_id
FROM global_preferences gp RIGHT JOIN localuser lu ON gp_user = lu_global_id
WHERE gp_property LIKE '%image-suggestions%'
  AND lu_wiki IN {wiki_db}
  
"""

In [13]:
global_pref_data = mariadb.run( 
        global_pref_query.format(
          wiki_db = wiki_dbs
        ),'centralauth','pandas'
    )



In [14]:
## convert gp_value to a string in order to store in GlobalTempView
global_pref_data['gp_value'] = global_pref_data['gp_value'].astype(str)

In [15]:
global_pref_sdf = spark_session.createDataFrame(global_pref_data)
global_pref_sdf.createGlobalTempView("global_pref_temp")

## Notification User Data

In [16]:
notification_user_query = """
SELECT 
   notification_event,
   notification_user
FROM echo_notification
JOIN echo_event on event_id = notification_event
WHERE notification_timestamp >= {start_timestamp}
  AND notification_timestamp < {end_timestamp}
  AND event_type = 'image-suggestions'
"""

In [17]:
notification_user_data = pd.DataFrame()

for i in range(len(wiki_db_list)):
        
    print('getting data for %s' % wiki_db_list[i])
    
    
    noti_result = mariadb.run( 
        notification_user_query.format(
            start_timestamp = notification_timestamp,
            end_timestamp = n_end_timestamp
        ),wiki_db_list[i],'wikishared','pandas'
    )
    
    noti_result.insert(0, 'wiki_db', wiki_db_list[i])
    
    notification_user_data = pd.concat([notification_user_data, noti_result], sort=False)

getting data for ptwiki
getting data for ruwiki
getting data for idwiki
getting data for fiwiki
getting data for huwiki
getting data for cawiki
getting data for nowiki

In [18]:
# store in global temp view
notification_user_sdf = spark_session.createDataFrame(notification_user_data)
notification_user_sdf.createGlobalTempView("notification_user_data")

## Image Edits Data

In [19]:
MEDIA_PREFIXES = ['File', 'Image', 'Media']

MEDIA_ALIASES= {
    "ar": ["ميديا", "صورة", "وسائط", "ملف"],
    "bn": ["চিত্র", "মিডিয়া"],
    "bpy": ["ছবি", "মিডিয়া"],
    "br": ["Skeudenn", "Restr"],
    "bs": ["Mediji", "Slika", "Datoteka", "Medija"],
    "bug": ["Gambar", "Berkas"],
    "bxr": ["Файл", "Меди", "Изображение"],
    "ca": ["Fitxer", "Imatge"],
    "ce": ["Хlум", "Медиа", "Сурт", "Файл", "Медйа", "Изображение"],
    "ceb": ["Payl", "Medya", "Imahen"],
    "ch": ["Litratu"],
    "ckb": ["میدیا", "پەڕگە"],
    "co": ["Immagine"],
    "crh": ["Медиа", "Resim", "Файл", "Fayl", "Ресим"],
    "cs": ["Soubor", "Média", "Obrázok"],
    "csb": ["Òbrôzk", "Grafika"],
    "cu": ["Видъ", "Ви́дъ", "Дѣло", "Срѣдьства"],
    "cv": ["Медиа", "Ӳкерчĕк", "Изображение"],
    "cy": ["Delwedd"],
    "da": ["Billede", "Fil"],
    "de": ["Medium", "Datei", "Bild"],
    "din": ["Ciɛl", "Apamduööt"],
    "diq": ["Medya", "Dosya"],
    "dsb": ["Wobraz", "Dataja", "Bild", "Medija"],
    "dty": ["चित्र", "मिडिया"],
    "dv": ["ފައިލު", "މީޑިއާ", "ފައިލް"],
    "el": ["Εικόνα", "Αρχείο", "Μέσο", "Μέσον"],
    "eml": ["Immagine"],
    "eo": ["Dosiero", "Aŭdvidaĵo"],
    "es": ["Medio", "Archivo", "Imagen"],
    "et": ["Pilt", "Fail", "Meedia"],
    "eu": ["Irudi", "Fitxategi"],
    "ext": ["Archivu", "Imagen", "Mediu"],
    "fa": ["رسانه", "تصویر", "مدیا", "پرونده", "رسانه‌ای"],
    "ff": ["Média", "Fichier"],
    "fi": ["Kuva", "Tiedosto"],
    "fiu-vro": ["Pilt", "Meediä"],
    "fo": ["Miðil", "Mynd"],
    "fr": ["Média", "Fichier"],
    "frp": ["Émâge", "Fichiér", "Mèdia"],
    "frr": ["Medium", "Datei", "Bild"],
    "fur": ["Immagine", "Figure"],
    "fy": ["Ofbyld"],
    "hak": ["文件", "媒體", "圖像", "檔案"],
    "haw": ["Kiʻi", "Waihona", "Pāpaho"],
    "he": ["תמונה", "קו", "מדיה", "קובץ"],
    "hi": ["मीडिया", "चित्र"],
    "hif": ["file", "saadhan"],
    "hr": ["Mediji", "DT", "Slika", "F", "Datoteka"],
    "hsb": ["Wobraz", "Dataja", "Bild"],
    "ht": ["Imaj", "Fichye", "Medya"],
    "hu": ["Kép", "Fájl", "Média"],
    "hy": ["Պատկեր", "Մեդիա"],
    "ia": ["Imagine", "Multimedia"],
    "id": ["Gambar", "Berkas"],
    "ig": ["Nká", "Midia", "Usòrò", "Ákwúkwó orünotu", "Ákwúkwó_orünotu"],
    "ii": ["媒体文件", "文件", "档案", "图像", "媒体"],
    "ilo": ["Midia", "Papeles"],
    "inh": ["Медиа", "Файл", "Изображение"],
    "io": ["Imajo", "Arkivo"],
    "is": ["Miðill", "Mynd"],
    "it": ["Immagine"],
    "ja": ["メディア", "ファイル", "画像"],
    "jbo": ["velsku", "datnyvei"],
    "jv": ["Barkas", "Medhia", "Gambar", "Médhia"],
    "ka": ["მედია", "სურათი", "ფაილი"],
    "kaa": ["Swret", "Таспа", "سۋرەت", "Taspa", "Su'wret", "Сурет", "تاسپا"],
    "kab": ["Tugna"],
    "kbd": ["Медиа", "Файл"],
    "kbp": ["Média", "Fichier"],
    "kg": ["Fisye"],
    "kk": ["Swret", "سۋرەت", "Таспа", "Taspa", "Сурет", "تاسپا"],
    "kl": ["Billede", "Fiileq", "Fil"],
    "km": ["ឯកសារ", "រូបភាព", "មេឌា", "មីឌា"],
    "kn": ["ಚಿತ್ರ", "ಮೀಡಿಯ"],
    "ko": ["미디어", "파일", "그림"],
    "koi": ["Медиа", "Файл", "Изображение"],
    "krc": ["Медиа", "Файл", "Изображение"],
    "ks": ["میڈیا", "فَیِل"],
    "ksh": ["Beld", "Meedije", "Medie", "Belld", "Medium", "Datei", "Meedijum", "Bild"],
    "ku": ["میدیا", "پەڕگە", "Medya", "Wêne"],
    "kv": ["Медиа", "Файл", "Изображение"],
    "kw": ["Restren"],
    "ky": ["Медиа", "Файл"],
    "nah": ["Mēdiatl", "Īxiptli", "Imagen"],
    "nap": ["Fiùra", "Immagine"],
    "nds": ["Datei", "Bild"],
    "nds-nl": ["Ofbeelding", "Afbeelding", "Bestaand"],
    "ne": ["मीडिया", "चित्र"],
    "new": ["किपा", "माध्यम"],
    "nl": ["Bestand", "Afbeelding"],
    "nn": ["Fil", "Bilde", "Filpeikar"],
    "no": ["Fil", "Medium", "Bilde"],
    "nov": [],
    "nrm": ["Média", "Fichier"],
    "nso": ["Seswantšho"],
    "nv": ["Eʼelyaaígíí"],
    "oc": ["Imatge", "Fichièr", "Mèdia"],
    "olo": ["Kuva", "Medii", "Failu"],
    "or": ["ମାଧ୍ୟମ", "ଫାଇଲ"],
    "os": ["Ныв", "Медиа", "Файл", "Изображение"],
    "pa": ["ਤਸਵੀਰ", "ਮੀਡੀਆ"],
    "pcd": ["Média", "Fichier"],
    "pdc": ["Medium", "Datei", "Bild", "Feil"],
    "pfl": ["Dadai", "Medium", "Datei", "Bild"],
    "pi": ["मीडिया", "पटिमा"],
    "pl": ["Plik", "Grafika"],
    "pms": ["Figura", "Immagine"],
    "pnb": ["میڈیا", "تصویر", "فائل"],
    "pnt": ["Εικόνα", "Αρχείον", "Εικόναν", "Μέσον"],
    "ps": ["انځور", "رسنۍ", "دوتنه"],
    "pt": ["Multimédia", "Ficheiro", "Arquivo", "Imagem"],
    "qu": ["Midya", "Imagen", "Rikcha"],
    "rm": ["Multimedia", "Datoteca"],
    "rmy": ["Fişier", "Mediya", "Chitro", "Imagine"],
    "ro": ["Fişier", "Imagine", "Fișier"],
    "roa-rup": ["Fişier", "Imagine", "Fișier"],
    "roa-tara": ["Immagine"],
    "ru": ["Медиа", "Файл", "Изображение"],
    "rue": ["Медіа", "Медиа", "Файл", "Изображение", "Зображення"],
    "rw": ["Dosiye", "Itangazamakuru"],
    "ta": ["படிமம்", "ஊடகம்"],
    "tcy": ["ಮಾದ್ಯಮೊ", "ಫೈಲ್"],
    "te": ["ఫైలు", "దస్త్రం", "బొమ్మ", "మీడియా"],
    "tet": ["Imajen", "Arquivo", "Imagem"],
    "tg": ["Акс", "Медиа"],
    "th": ["ไฟล์", "สื่อ", "ภาพ"],
    "ti": ["ፋይል", "ሜድያ"],
    "tk": ["Faýl"],
    "tl": ["Midya", "Talaksan"],
    "tpi": ["Fail"],
    "tr": ["Medya", "Resim", "Dosya", "Ortam"],
    "tt": ["Медиа", "Рәсем", "Файл", "Räsem", "Изображение"],
    "ty": ["Média", "Fichier"],
    "tyv": ["Медиа", "Файл", "Изображение"],
    "udm": ["Медиа", "Файл", "Суред", "Изображение"],
    "ug": ["ۋاسىتە", "ھۆججەت"],

}

# https://commons.wikimedia.org/wiki/Commons:File_types
IMAGE_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.svg', '.gif','.tif', '.bmp', '.webp', '.xcf','.djvu', '.pdf']
VIDEO_EXTENSIONS = ['.ogv', '.webm', '.mpg', '.mpeg']
AUDIO_EXTENSIONS = ['.ogg', '.mp3', '.mid', '.webm', '.flac', '.wav', '.oga']
MEDIA_EXTENSIONS = list(set(IMAGE_EXTENSIONS + VIDEO_EXTENSIONS + AUDIO_EXTENSIONS))

In [20]:
# raw strings avoid invalid-escape warnings; re.escape turns '.jpg' into '\.jpg' as before
exten_regex = '(' + '|'.join([re.escape(e) + r'\b' for e in MEDIA_EXTENSIONS]) + ')'
extension_pattern = re.compile(rf'([\w ,\(\)\.-]+){exten_regex}', flags=re.UNICODE)
bracket_pattern = re.compile(r'(?<=\[\[)(.*?)(?=\]\])', flags=re.DOTALL)

# NOTE: I explored several approaches to this function and how they impacted speed:
# * mwparserfromhell parsing substantially increases processing time, even compared to many regexes
# * Reducing down the number of extensions considered has a very minimal impact on time
# * Removing the first regex that extracts links has a very minimal impact on time. In theory it should be mostly unnecessary but will catch some rare file extensions.
# * Ignoring upper-case file extensions (e.g., .JPG) by not lower-casing the wikitext and just doing .findall over the iterative .search has very little impact on time

def getMedia(wikitext, wiki_db='enwiki', max_link_length=240):
    """Gather the list of media files found directly in wikitext.
    
    See https://phabricator.wikimedia.org/T299712 for more details.
    Link length: https://commons.wikimedia.org/wiki/Commons:File_naming#Length
    """
    lang = wiki_db.replace('wiki', '')
    try:
        # find standard bracket-syntax links -- this likely could be dropped but adds minimal overhead
        med_prefixes = MEDIA_PREFIXES + MEDIA_ALIASES.get(lang, [])
        links = bracket_pattern.findall(wikitext)
        bracket_links = set([l.split(':', maxsplit=1)[1].split('|', maxsplit=1)[0].strip() for l in links if l.split(':', maxsplit=1)[0] in med_prefixes])
        
        # supplement with links outside brackets as determined via known file extensions
        # lower-case to handle e.g., .JPG instead of .jpg when searching for file extensions
        lc_wt = wikitext.lower()
        exten_links = []
        end = 0
        while True:
            m = extension_pattern.search(lc_wt, pos=end)
            if m is None:
                break
            start, end = m.span()
            exten_links.append(wikitext[start:end].strip())
        return [l.replace('\n', ' ') for l in bracket_links.union(exten_links) if len(l) <= max_link_length]
    except Exception:
        return None
    
spark_session.udf.register('getMedia', getMedia, 'ARRAY<String>')

<function __main__.getMedia(wikitext, wiki_db='enwiki', max_link_length=240)>
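
As an illustration of the bracket-link step in `getMedia`, the following is a reduced, standalone sketch. `PREFIXES`, `BRACKETS`, and `bracket_media` are hypothetical names introduced here; the sketch omits the extension-based search and the length filter, and the sample wikitext is made up.

```python
import re

# Generic prefixes plus the Catalan aliases listed in MEDIA_ALIASES above.
PREFIXES = ['File', 'Image', 'Media', 'Fitxer', 'Imatge']
BRACKETS = re.compile(r'(?<=\[\[)(.*?)(?=\]\])', flags=re.DOTALL)

def bracket_media(wikitext):
    """Return file names from [[Prefix:Name|...]] links whose prefix is a media prefix."""
    names = set()
    for link in BRACKETS.findall(wikitext):
        prefix, sep, rest = link.partition(':')
        if sep and prefix in PREFIXES:
            # drop display options such as |thumb|caption
            names.add(rest.split('|', maxsplit=1)[0].strip())
    return names

sample = "Intro [[Fitxer:Gat negre.jpg|thumb|Un gat]] and [[Barcelona]] plus [[File:Map.svg]]."
found = bracket_media(sample)
print(sorted(found))
```

Ordinary article links such as `[[Barcelona]]` are ignored because they carry no media prefix.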

In [21]:
def compareMediaLists(curr_media, prev_media):
    """Compare two media lists to determine what changed."""
    try:
        changes = []
        unaligned = set(curr_media) ^ set(prev_media)
        for m in unaligned:
            if m in curr_media:
                changes.append((m, 1))
            elif m in prev_media:
                changes.append((m, -1))
        return changes
    except Exception:
        return None
    
spark_session.udf.register('compareMediaLists', compareMediaLists, 'ARRAY<STRUCT<filename:STRING, action:INT>>')

<function __main__.compareMediaLists(curr_media, prev_media)>
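
The comparison above can be tried outside Spark. This is a small standalone sketch of the same idea; `media_changes` is a hypothetical name, and unlike the UDF it sorts its output so the result is deterministic.

```python
def media_changes(curr_media, prev_media):
    """Return (filename, 1) for files added and (filename, -1) for files removed."""
    curr, prev = set(curr_media), set(prev_media)
    added = [(m, 1) for m in curr - prev]
    removed = [(m, -1) for m in prev - curr]
    return sorted(added + removed)

# Parent revision has B.png and C.gif; current revision has A.jpg and B.png.
changes = media_changes(['A.jpg', 'B.png'], ['B.png', 'C.gif'])
print(changes)
```

Files present in both revisions produce no entry, so only rows with `action = 1` later count as image additions.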

In [29]:
create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {media_list_table} (
        wiki_db                         STRING         COMMENT 'Wiki -- e.g., enwiki for English',
        event_timestamp                 STRING         COMMENT 'When the edits occurred',
        page_id                         INT            COMMENT 'Article page ID',
        user_id                         INT         COMMENT 'User id of who made edit',
        user_text                       STRING         COMMENT 'User name of who made edit',
        revision_id                     BIGINT         COMMENT 'Revision ID',
        revision_parent_id              BIGINT         COMMENT 'Revision ID of parent revision',
        revision_is_identity_reverted   BOOLEAN        COMMENT 'Was revision reverted?',
        revision_is_identity_revert     BOOLEAN        COMMENT 'Did revision restore a previous revision?',
        revision_seconds_to_identity_revert    BIGINT        COMMENT 'seconds elapsed between revision posting and its revert',
        revision_tags                   ARRAY<STRING>  COMMENT 'Edit tags associated with revision',
        cur_rev_media_array             ARRAY<STRING>  COMMENT 'List of images in current revision',
        par_rev_media_array             ARRAY<STRING>  COMMENT 'List of images in parent revision'
    )
    """

spark_session.sql(create_table_query)

23/09/04 20:45:33 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.


DataFrame[]

In [40]:
"""
Explanation of CTEs:
* revisions: get all revisions + metadata from desired wikis / timeframe.
  * only main articles and filter out bots / anonymous users
* all_revision_ids: build deduplicated lists of all revision + parent revision IDs
* media_lists: for each revision ID, extract images from associated wikitext
* INSERT INTO...: join back in media lists with revisions + metadata

# TODO: are newlines in revision_text causing NULLs?
"""

print_for_hive = False
do_execute = True

query = f"""
WITH revisions AS (
    SELECT
      wiki_db,
      event_timestamp,
      page_id,
      event_user_id AS user_id,
      event_user_text AS user_text,
      revision_id,
      revision_parent_id,
      revision_is_identity_reverted,
      revision_is_identity_revert,
      revision_seconds_to_identity_revert,
      revision_tags
    FROM wmf.mediawiki_history
    WHERE
      snapshot = '{mw_snapshot}'
      AND wiki_db IN ('ptwiki', 'idwiki', 'cawiki')
      AND page_namespace = 0
      AND event_type = 'create'
      AND event_entity = 'revision'
      AND event_timestamp >= '{start_timestamp}'
      AND event_timestamp < '{end_timestamp}'
      AND SIZE(event_user_is_bot_by) < 1
      AND SIZE(event_user_is_bot_by_historical) < 1
      AND NOT event_user_is_anonymous
      AND NOT page_is_redirect
),
all_revision_ids AS (
    SELECT DISTINCT
      wiki_db,
      rev_id
    FROM (
        SELECT
          wiki_db,
          revision_id AS rev_id
        FROM revisions
        UNION ALL
        SELECT
          wiki_db,
          revision_parent_id AS rev_id
        FROM revisions
    ) r
),
media_lists AS (
    SELECT
      r.wiki_db,
      r.rev_id,
      getMedia(revision_text, wt.wiki_db) AS media_array
    FROM wmf.mediawiki_wikitext_history wt
    INNER JOIN all_revision_ids r
      ON (wt.wiki_db = r.wiki_db
          AND wt.revision_id = r.rev_id)
    WHERE
      snapshot = '{mw_snapshot}'
      AND wt.wiki_db IN ('ptwiki', 'idwiki', 'cawiki')
)

INSERT INTO TABLE {media_list_table}     
SELECT
  r.wiki_db,
  event_timestamp,
  page_id,
  user_id,
  user_text,
  revision_id,
  revision_parent_id,
  revision_is_identity_reverted,
  revision_is_identity_revert,
  revision_seconds_to_identity_revert,
  revision_tags,
  c.media_array AS cur_rev_media_array,
  p.media_array AS par_rev_media_array
FROM revisions r
LEFT JOIN media_lists c
  ON (r.wiki_db = c.wiki_db AND r.revision_id = c.rev_id)
LEFT JOIN media_lists p
  ON (r.wiki_db = p.wiki_db AND r.revision_parent_id = p.rev_id)
"""

if do_execute:
    spark_session.sql(query)

In [46]:
"""
Explanation of CTEs:
* revisions: get all revisions + metadata from desired wikis / timeframe.
  * only main articles and filter out bots / anonymous users
* all_revision_ids: build deduplicated lists of all revision + parent revision IDs
* media_lists: for each revision ID, extract images from associated wikitext
* INSERT INTO...: join back in media lists with revisions + metadata

# TODO: are newlines in revision_text causing NULLs?
"""

print_for_hive = False
do_execute = True

query = f"""
WITH revisions AS (
    SELECT
      wiki_db,
      event_timestamp,
      page_id,
      event_user_id AS user_id,
      event_user_text AS user_text,
      revision_id,
      revision_parent_id,
      revision_is_identity_reverted,
      revision_is_identity_revert,
      revision_seconds_to_identity_revert,
      revision_tags
    FROM wmf.mediawiki_history
    WHERE
      snapshot = '2023-07'
      AND wiki_db IN ('fiwiki', 'huwiki', 'nowiki')
      AND page_namespace = 0
      AND event_type = 'create'
      AND event_entity = 'revision'
      AND event_timestamp >= '2023-07-01'
      AND event_timestamp < '2023-08-01'
      AND SIZE(event_user_is_bot_by) < 1
      AND SIZE(event_user_is_bot_by_historical) < 1
      AND NOT event_user_is_anonymous
      AND NOT page_is_redirect
),
all_revision_ids AS (
    SELECT DISTINCT
      wiki_db,
      rev_id
    FROM (
        SELECT
          wiki_db,
          revision_id AS rev_id
        FROM revisions
        UNION ALL
        SELECT
          wiki_db,
          revision_parent_id AS rev_id
        FROM revisions
    ) r
),
media_lists AS (
    SELECT
      r.wiki_db,
      r.rev_id,
      getMedia(revision_text, wt.wiki_db) AS media_array
    FROM wmf.mediawiki_wikitext_history wt
    INNER JOIN all_revision_ids r
      ON (wt.wiki_db = r.wiki_db
          AND wt.revision_id = r.rev_id)
    WHERE
      snapshot = '2023-07'
      AND wt.wiki_db IN ('fiwiki', 'huwiki', 'nowiki')
)

INSERT INTO TABLE cchen.media_july_2023 
SELECT
  r.wiki_db,
  event_timestamp,
  page_id,
  user_id,
  user_text,
  revision_id,
  revision_parent_id,
  revision_is_identity_reverted,
  revision_is_identity_revert,
  revision_seconds_to_identity_revert,
  revision_tags,
  c.media_array AS cur_rev_media_array,
  p.media_array AS par_rev_media_array
FROM revisions r
LEFT JOIN media_lists c
  ON (r.wiki_db = c.wiki_db AND r.revision_id = c.rev_id)
LEFT JOIN media_lists p
  ON (r.wiki_db = p.wiki_db AND r.revision_parent_id = p.rev_id)
"""

if do_execute:
    spark_session.sql(query)

23/09/05 03:56:17 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.

In [47]:
image_edits_query = """

SELECT
      m.wiki_db,
      event_timestamp,
      m.user_text,
      m.user_id,
      revision_id,
      page_id,
      revision_is_identity_reverted,
      revision_is_identity_revert,
      revision_seconds_to_identity_revert,
      revision_tags,
      u.user_editcount,
      INLINE(compareMediaLists(cur_rev_media_array, par_rev_media_array)),
      SIZE(par_rev_media_array) AS illustrated
    FROM {media_list_table} m
    LEFT JOIN wmf_raw.mediawiki_user u 
      ON (m.user_text = u.user_name AND m.wiki_db = u.wiki_db)
    WHERE revision_id IS NOT NULL
      AND m.wiki_db IN {wiki_db_str}
      AND u.snapshot = '{mw_snapshot}'
      
"""

In [48]:
image_edits_data = spark.run( 
        image_edits_query.format(
          media_list_table = media_list_table,
          wiki_db_str = wiki_db_str,
          mw_snapshot = mw_snapshot
        )
    )

                                                                                

In [49]:
image_edits_sdf = spark_session.createDataFrame(image_edits_data)
image_edits_sdf.createGlobalTempView("image_edit_temp")

In [50]:
match_image_query = """

WITH image_edits AS (
    SELECT
        wiki_db,
        event_timestamp,
        revision_id,
        user_id,
        page_id,
        revision_is_identity_reverted,
        revision_seconds_to_identity_revert,
        REPLACE(filename, ' ', '_') AS filename
    FROM global_temp.image_edit_temp
    WHERE action = 1
      AND ! revision_is_identity_revert
      AND ! ARRAY_CONTAINS(revision_tags, 'newcomer task')
), 
noti_type_one AS (
    SELECT 
        wiki_db,
        notification_user,
        notification_bundle_hash,
        MIN(notification_timestamp) AS send_ts,
        MIN(notification_read_timestamp) AS read_ts,
        SUM(CASE WHEN is_article_level THEN 1 ELSE 0 END) AS noti_type, 
        COUNT(*) AS count
    FROM global_temp.notification_data
    GROUP BY wiki_db, notification_user,notification_bundle_hash
),
noti_type_two AS (
    SELECT
        wiki_db,
        notification_user,
        notification_bundle_hash,
        send_ts,
        read_ts,
        CASE WHEN noti_type = 1 AND count = 1 THEN 'article_level'
             WHEN noti_type = 0 THEN 'section_level'
             WHEN noti_type = 1 AND count > 1 THEN 'combined'
        END AS noti_type
    FROM noti_type_one
),
suggested_images AS (
    SELECT
        d.wiki_db,
        d.notification_bundle_hash,
        n.send_ts,
        n.read_ts, 
        noti_type,
        event_page_id,
        d.notification_user,
        suggested_image
    FROM global_temp.notification_data d LEFT JOIN noti_type_two n 
       ON (d.wiki_db = n.wiki_db AND d.notification_user= n.notification_user AND d.notification_bundle_hash = n.notification_bundle_hash)
)
SELECT 
    i.wiki_db,
    revision_id,
    user_id,
    noti_type,
    filename,
    IF(revision_is_identity_reverted AND revision_seconds_to_identity_revert <= 172800, TRUE, FALSE) AS reverted
FROM image_edits i
LEFT JOIN suggested_images s 
    ON (i.wiki_db = s.wiki_db AND i.page_id = s.event_page_id AND i.user_id = s.notification_user AND i.filename = s.suggested_image)
WHERE notification_bundle_hash IS NOT NULL
  AND read_ts IS NOT NULL

"""


# Calculation

## Number of Notifications


In [19]:
daily_notification_query = """

WITH noti AS (
    SELECT 
        wiki_db,
        notification_user,
        notification_bundle_hash,
        MIN(notification_timestamp) AS send_ts,
        MIN(notification_read_timestamp) AS read_ts,
        SUM(CASE WHEN is_article_level THEN 1 ELSE 0 END) AS noti_type, 
        COUNT(*) AS count
    FROM global_temp.notification_data
    WHERE notification_timestamp >= {noti_timestamp}
    GROUP BY wiki_db, notification_user,notification_bundle_hash
)

SELECT 
   wiki_db,
   FROM_UNIXTIME(UNIX_TIMESTAMP(SUBSTR(send_ts,0,8), 'yyyyMMdd')) AS date,
   COUNT(1) AS all_noti,
   COUNT(read_ts) AS all_noti_read,
   COUNT(CASE WHEN noti_type = 1 AND count = 1 THEN 1 END) AS article_noti,
   COUNT(CASE WHEN noti_type = 1 AND count = 1 THEN read_ts END) AS article_noti_read,
   COUNT(CASE WHEN noti_type = 0  THEN 1 END) AS section_noti,
   COUNT(CASE WHEN noti_type = 0  THEN read_ts END) AS section_read,
   COUNT(CASE WHEN noti_type = 1 AND count > 1 THEN 1 END) AS combined_noti,
   COUNT(CASE WHEN noti_type = 1 AND count > 1 THEN read_ts END) AS combined_noti_read
FROM noti
GROUP BY wiki_db, FROM_UNIXTIME(UNIX_TIMESTAMP(SUBSTR(send_ts,0,8), 'yyyyMMdd'))
   
"""

In [21]:
daily_notification = spark.run(
        daily_notification_query.format(
            noti_timestamp = m_start_timestamp))

23/08/01 05:20:01 WARN TaskSetManager: Stage 0 contains a task of very large size (11947 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [22]:
notification_stats = daily_notification.groupby('wiki_db').agg(
     notification_sent = ('all_noti', 'sum'),
     notification_read = ('all_noti_read', 'sum'),
     article_sent = ('article_noti', 'sum'),
     article_read = ('article_noti_read', 'sum'),
     section_sent = ('section_noti', 'sum'),
     section_read = ('section_read', 'sum'),
     combined_sent = ('combined_noti', 'sum'),
     combined_read = ('combined_noti_read', 'sum')
).reset_index()
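
The article / section / combined split used in the queries can be reproduced on toy data. The sketch below is a simplified pandas version under stated assumptions: it groups by bundle hash only (the SQL also groups by wiki and user), and the rows and the `classify` helper are made up for illustration.

```python
import pandas as pd

# Toy notification rows: h1 is a lone article-level row, h2 a lone section-level row,
# h3 bundles one article-level row with one section-level row.
toy = pd.DataFrame({
    'notification_bundle_hash': ['h1', 'h2', 'h3', 'h3'],
    'is_article_level': [True, False, True, False],
})

per_bundle = toy.groupby('notification_bundle_hash').agg(
    article_rows=('is_article_level', 'sum'),   # plays the role of noti_type in the SQL
    rows=('is_article_level', 'size'),          # plays the role of count in the SQL
)

def classify(row):
    if row['article_rows'] == 1 and row['rows'] == 1:
        return 'article_level'
    if row['article_rows'] == 0:
        return 'section_level'
    if row['article_rows'] == 1 and row['rows'] > 1:
        return 'combined'
    return None  # any other combination falls through, as in the SQL CASE

per_bundle['noti_type'] = per_bundle.apply(classify, axis=1)
print(per_bundle)
```

A bundle with more than one article-level row would match none of the branches, so it is counted in the overall total but in none of the three categories.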

In [23]:
notification_stats

| wiki_db | notification_sent | notification_read | article_sent | article_read | section_sent | section_read | combined_sent | combined_read |
|---|---:|---:|---:|---:|---:|---:|---:|---:|
| cawiki | 4007 | 1066 | 96 | 41 | 3889 | 1019 | 22 | 6 |
| fiwiki | 1217 | 277 | 1217 | 277 | 0 | 0 | 0 | 0 |
| huwiki | 6205 | 1173 | 221 | 77 | 5934 | 1081 | 50 | 15 |
| idwiki | 3843 | 802 | 101 | 51 | 3690 | 738 | 52 | 13 |
| nowiki | 4070 | 647 | 138 | 30 | 3886 | 609 | 46 | 8 |
| ptwiki | 13950 | 2213 | 570 | 181 | 13233 | 2000 | 147 | 32 |
| ruwiki | 32994 | 7075 | 1070 | 346 | 31590 | 6636 | 334 | 93 |
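
The percentage of notifications read follows directly from these counts. A minimal sketch, re-entering a subset of the columns from the table above by hand (`stats` and `read_pct` are names introduced here):

```python
import pandas as pd

# Counts copied from the notification_stats output above.
stats = pd.DataFrame({
    'wiki_db': ['cawiki', 'fiwiki', 'huwiki', 'idwiki', 'nowiki', 'ptwiki', 'ruwiki'],
    'notification_sent': [4007, 1217, 6205, 3843, 4070, 13950, 32994],
    'notification_read': [1066, 277, 1173, 802, 647, 2213, 7075],
    'section_sent': [3889, 0, 5934, 3690, 3886, 13233, 31590],
    'section_read': [1019, 0, 1081, 738, 609, 2000, 6636],
}).set_index('wiki_db')

read_pct = pd.DataFrame({
    'all_read_pct': 100 * stats['notification_read'] / stats['notification_sent'],
    'section_read_pct': 100 * stats['section_read'] / stats['section_sent'],
}).round(1)

print(read_pct)
```

fiwiki has no section-level notifications in this data, so its section-level percentage is undefined (NaN) rather than zero.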


## Number of opt-outs

In [24]:
pref_type_list = ("push", "web")

In [25]:
pref_query = """

WITH noti_users AS ( --total notification users

SELECT 
    wiki_db,
    notification_user
FROM global_temp.notification_user_data
GROUP BY wiki_db, notification_user

), local_pref AS ( -- local preference 

SELECT
    wiki_db,
    local_user_id,
    up_value AS local_pref
FROM global_temp.local_pref_temp
WHERE up_property = "echo-subscriptions-{type}-image-suggestions"
  and up_value != ''

), global_pref AS ( -- global preference

SELECT
    lu_wiki,
    lu_local_id,
    gp_value AS global_pref
FROM global_temp.global_pref_temp
WHERE gp_property  = "echo-subscriptions-{type}-image-suggestions"
  and gp_value != ''

), local_ex AS ( -- local exceptions

SELECT
    wiki_db,
    local_user_id,
    up_value AS local_ex   
FROM global_temp.local_pref_temp
WHERE up_property = "echo-subscriptions-{type}-image-suggestions-local-exception"
  and up_value != ''

), global_all_pref AS ( -- compare local exception and global preference

SELECT 
    COALESCE(lu_wiki,wiki_db) AS wiki,
    COALESCE(lu_local_id,local_user_id) AS user_id,
    COALESCE(local_ex,global_pref) AS all_pref
FROM global_pref gp FULL OUTER JOIN local_ex le ON (gp.lu_wiki = le.wiki_db AND gp.lu_local_id = le.local_user_id)

)

SELECT 
    nu.wiki_db,
    nu.notification_user,
    gp.all_pref,
    lp.local_pref
FROM noti_users nu 
    LEFT JOIN local_pref lp ON (nu.wiki_db = lp.wiki_db AND nu.notification_user = lp.local_user_id)
    LEFT JOIN global_all_pref gp ON (nu.wiki_db = gp.wiki AND nu.notification_user = gp.user_id)
"""

In [26]:
pref_stats = pd.DataFrame()

for i in range(len(pref_type_list)):
               
    pref_result = spark.run(pref_query.format(
                           type = pref_type_list[i]
                        ))
    
    pref_result.insert(0, 'type', pref_type_list[i])
    
    pref_stats = pd.concat([pref_stats, pref_result], sort=False)

23/08/01 05:20:11 WARN TaskSetManager: Stage 4 contains a task of very large size (2486 KiB). The maximum recommended task size is 1000 KiB.
23/08/01 05:20:17 WARN TaskSetManager: Stage 13 contains a task of very large size (2486 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [27]:
## total number of users who received notifications
total_users = pref_stats[(pref_stats['type'] == "web")].groupby(
    ['wiki_db']
).agg(
    total_users = ('notification_user', 'count')
).reset_index()

In [28]:
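## opt-out for push notifications and web notifications
## dropna=False keeps users who have only one of the two preferences set;
## by default, groupby drops rows whose key columns contain missing values
pref = pref_stats.groupby(
    ['wiki_db','type', 'all_pref', 'local_pref'], dropna=False
).agg(
    optout_users = ('wiki_db', 'count')
).reset_index()

pref = pref[(pref['all_pref'] == '0')|(pref['local_pref'] == '0')]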

In [29]:
mobile_optout = pref_stats[(pref_stats['type'] == "push")]

mobile_optout = mobile_optout[(mobile_optout['all_pref'] == '0') | (mobile_optout['local_pref'] == '0')].groupby(
    ['wiki_db']
).agg(
    mobile_optouts = ('wiki_db', 'count')
).reset_index()

In [30]:
desktop_optout = pref_stats[(pref_stats['type'] == "web")]

desktop_optout = desktop_optout[(desktop_optout['all_pref'] == '0') | (desktop_optout['local_pref'] == '0')].groupby(
    ['wiki_db']
).agg(
    desktop_optout = ('wiki_db', 'count')
).reset_index()

In [31]:
pd.merge(pd.merge(total_users,mobile_optout,on='wiki_db'),desktop_optout,on='wiki_db')

,wiki_db,total_users,mobile_optouts,desktop_optout
0,cawiki,789,7,5
1,fiwiki,1059,38,44
2,huwiki,1278,21,26
3,idwiki,959,13,14
4,nowiki,817,21,21
5,ptwiki,3990,36,38
6,ruwiki,7299,162,171
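
The table above reports opt-out counts, while the question posed in the Purpose section is about the opt-out rate. A minimal sketch of that last step follows, using the counts copied from the table. The helper name `optout_rates` and the choice of denominator (all users who received a notification on that wiki) are assumptions made for illustration.

```python
# Counts copied from the merged table above:
# wiki_db -> (total_users, mobile_optouts, desktop_optout)
optout_counts = {
    "cawiki": (789, 7, 5),
    "fiwiki": (1059, 38, 44),
    "huwiki": (1278, 21, 26),
    "idwiki": (959, 13, 14),
    "nowiki": (817, 21, 21),
    "ptwiki": (3990, 36, 38),
    "ruwiki": (7299, 162, 171),
}


def optout_rates(counts):
    """Hypothetical helper: opt-out percentages per wiki, rounded to 2 decimals."""
    rates = {}
    for wiki, (total, mobile, desktop) in counts.items():
        rates[wiki] = {
            "mobile_optout_pct": round(100 * mobile / total, 2),
            "desktop_optout_pct": round(100 * desktop / total, 2),
        }
    return rates


rates = optout_rates(optout_counts)
for wiki, pct in rates.items():
    print(wiki, pct["mobile_optout_pct"], pct["desktop_optout_pct"])
```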


## Number of images added

In [51]:
match_image_query = """

WITH image_edits AS (
    SELECT
        wiki_db,
        event_timestamp,
        revision_id,
        user_id,
        page_id,
        revision_is_identity_reverted,
        revision_seconds_to_identity_revert,
        REPLACE(filename, ' ', '_') AS filename
    FROM global_temp.image_edit_temp
    WHERE action = 1
      AND ! revision_is_identity_revert
      AND ! ARRAY_CONTAINS(revision_tags, 'newcomer task')
), 
noti_type_one AS (
    SELECT 
        wiki_db,
        notification_user,
        notification_bundle_hash,
        MIN(notification_timestamp) AS send_ts,
        MIN(notification_read_timestamp) AS read_ts,
        SUM(CASE WHEN is_article_level THEN 1 ELSE 0 END) AS noti_type, 
        COUNT(*) AS count
    FROM global_temp.notification_data
    WHERE notification_timestamp >= {noti_timestamp}
    GROUP BY wiki_db, notification_user,notification_bundle_hash
),
noti_type_two AS (
    SELECT
        wiki_db,
        notification_user,
        notification_bundle_hash,
        send_ts,
        read_ts,
        CASE WHEN noti_type = 1 AND count = 1 THEN 'article_level'
             WHEN noti_type = 0 THEN 'section_level'
             WHEN noti_type = 1 AND count > 1 THEN 'combined'
        END AS noti_type
    FROM noti_type_one
),

suggested_images AS (
    SELECT
        d.wiki_db,
        d.notification_bundle_hash,
        n.send_ts,
        n.read_ts, 
        noti_type,
        event_page_id,
        d.notification_user,
        suggested_image
    FROM global_temp.notification_data d LEFT JOIN noti_type_two n 
       ON (d.wiki_db = n.wiki_db AND d.notification_user= n.notification_user AND d.notification_bundle_hash = n.notification_bundle_hash)
),

change_count AS (
SELECT 
    i.wiki_db,
    revision_id,
    user_id,
    noti_type,
    filename,
    -- count an edit as reverted only if the revert happened within 48 hours (172800 seconds)
    IF(revision_is_identity_reverted AND revision_seconds_to_identity_revert <= 172800, TRUE, FALSE) AS reverted
FROM image_edits i
LEFT JOIN suggested_images s 
    ON (i.wiki_db = s.wiki_db AND i.page_id = s.event_page_id AND i.user_id = s.notification_user AND i.filename = s.suggested_image)
WHERE notification_bundle_hash IS NOT NULL
  AND read_ts IS NOT NULL
)

SELECT 
    wiki_db,
    noti_type,
    COUNT(DISTINCT(revision_id)) AS num_edits,
    COUNT(DISTINCT(filename)) AS image_added,
    COUNT(DISTINCT(CASE WHEN reverted THEN revision_id ELSE NULL END)) reverted
FROM change_count c
GROUP BY wiki_db,noti_type

"""

In [52]:
img_data = spark.run(
    match_image_query.format(
        noti_timestamp = m_start_timestamp
    )
)


In [54]:
## overall
img_data.groupby(['wiki_db'])[['image_added','reverted']].sum()


wiki_db,image_added,reverted
cawiki,61,0
fiwiki,24,0
huwiki,51,0
idwiki,19,0
nowiki,10,1
ptwiki,52,0
ruwiki,160,1
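
To turn the overall counts above into the revert rate asked for in the Purpose section, the reverted count can be divided by the number of images added. The sketch below uses the values copied from the table. The helper `revert_rate_pct` is hypothetical, and using `image_added` as the denominator is an assumption. Reverts here are those that happened within 48 hours, as defined in the query. With counts this small, the percentages should be read with caution.

```python
# Counts copied from the overall table above: wiki_db -> (image_added, reverted)
overall_counts = {
    "cawiki": (61, 0),
    "fiwiki": (24, 0),
    "huwiki": (51, 0),
    "idwiki": (19, 0),
    "nowiki": (10, 1),
    "ptwiki": (52, 0),
    "ruwiki": (160, 1),
}


def revert_rate_pct(image_added: int, reverted: int) -> float:
    """Hypothetical helper: share of added images that were reverted, in percent."""
    if image_added == 0:
        return 0.0
    return 100 * reverted / image_added


revert_rates = {
    wiki: revert_rate_pct(added, reverted)
    for wiki, (added, reverted) in overall_counts.items()
}

for wiki, rate in revert_rates.items():
    print(f"{wiki}: {rate:.2f}%")
```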


In [57]:
# by notification type 

img_data.sort_values(by=['wiki_db','noti_type'])


,wiki_db,noti_type,num_edits,image_added,reverted
14,cawiki,article_level,1,1,0
2,cawiki,combined,1,1,0
1,cawiki,section_level,59,59,0
11,fiwiki,article_level,24,24,0
15,huwiki,article_level,8,8,0
9,huwiki,section_level,43,43,0
8,idwiki,article_level,3,3,0
16,idwiki,combined,2,2,0
4,idwiki,section_level,14,14,0
10,nowiki,article_level,2,2,0
