# Import

In [1]:
from functools import reduce
from pathlib import Path
import time
import datetime
import pandas as pd
import requests
from wmfdata import hive
from wmfdata import spark 
from wmfdata.utils import print_err, pd_display_all

# Parameters

In [2]:
# TSV file where metrics are or will be saved
FILENAME = "metrics/metrics.tsv"

# Metric month. The mediawiki_history snapshot must be from the metrics month or later.
last_month = datetime.date.today().replace(day=1) - datetime.timedelta(days=1)

METRICS_MONTH_TEXT = last_month.strftime("%Y-%m")
MEDIAWIKI_HISTORY_SNAPSHOT = last_month.strftime("%Y-%m")

# Preparation

In [3]:
# Convert our metrics month to all the formats we need and provide them in a dict
# so we can easily use them to format strings
metrics_month = METRICS_MONTH_TEXT
date_params = {
    "mediawiki_history_snapshot": MEDIAWIKI_HISTORY_SNAPSHOT,
    "metrics_month": str(metrics_month), 
    "metrics_month_first_day": str((datetime.date.today()- datetime.timedelta(days=31)).replace(day=1)),
    "metrics_month_last_day": str(last_month),
    "metrics_year": last_month.year,
    "metrics_cur_month" : last_month.month
}

# Load any previous results
try:
    old_metrics = (
        pd.read_csv(FILENAME, sep="\t", parse_dates = ["month"])
        .set_index("month")
    )
except FileNotFoundError:
    old_metrics = None
    
def prepare_query(filename):
    return (
        Path(filename)
        .read_text()
        .format(**date_params)
    )

# Query metrics


In [4]:
queries = {
    "structured_data_used": {
        "file": "queries/structured_data_used.hql",
        "engine": "spark"
    },
    "wikidata_items": {
        "file": "queries/wikidata_items.hql",
        "engine": "spark"
    },
    "wikidata_items_being_reused": {
        "file": "queries/wikidata_items_being_reused.hql",
        "engine": "spark"
    }
    
       
}


for key, val in queries.items():
    query = prepare_query(val["file"])
    engine = val["engine"]
    print_err("Running {} on {}...".format(key, engine))
    
    if engine == "mariadb":
        result = mariadb.run(query)
    elif engine == "spark":
        result = spark.run(query)
    else:
        raise ValueError("Unknown engine specified.") 
    
    result = result.assign(month=lambda df: pd.to_datetime(df["month"]))
    val["result"] = result

Running structured_data_used on spark...
PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.
Running wikidata_items on spark...
PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.
Running wikidata_items_being_reused on spark...
PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


In [5]:
#"num_commons_files_used_content_pages": {
#        "file": "queries/num_commons_files.hql",
#        "engine": "spark"
#    },



num_commons_files_query = """ 
WITH

    wiki_content_namespaces as (
        select
            dbname,
            namespace
        FROM
            wmf_raw.mediawiki_project_namespace_map
        WHERE
            namespace_is_content = 1
            AND snapshot = '{metrics_month}'
    )

   
        SELECT
            '{metrics_month_first_day}' AS month,
            COUNT(DISTINCT il_to) AS num_commons_files_used_content_pages
        FROM
            wmf_raw.mediawiki_imagelinks as m
            INNER JOIN wiki_content_namespaces AS ns
                ON ( ns.namespace = m.il_from_namespace)
                AND ns.dbname = m.wiki_db -- recommended by Mikhail to properly run the code
        WHERE
            m.snapshot = '{metrics_month}'

"""

In [6]:
#num_commons_files_used_content_pages = spark.sql(num_commons_files_query.format(**date_params)).toPandas()
#merge_in(mce)

num_commons_files_df = spark.run(num_commons_files_query.format(**date_params))

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


# Combining and saving metrics

In [7]:
# Assemble list of result dataframes
results = [val["result"] for _, val in queries.items()]

# Merge them all, assuming that the month is the only common column
new_metrics_subset = reduce(lambda l, r: pd.merge(l, r, how="outer"), results)

#add in num_commons_files_used_content_pages
new_metrics_prep1 = pd.concat([new_metrics_subset, num_commons_files_df['num_commons_files_used_content_pages']], axis=1)

#reorder columns 
new_metrics_prep = new_metrics_prep1[['month', 'structured_data_used', 'num_commons_files_used_content_pages','wikidata_items']]

#Set the month as an index so combine_first works properly
new_metrics = new_metrics_prep.set_index("month").sort_index()

In [8]:
if old_metrics is None:
    metrics = new_metrics
else:
    metrics = new_metrics.combine_first(old_metrics)
    
pd_display_all(metrics.tail(10))

Unnamed: 0_level_0,num_commons_files_used_content_pages,structured_data_used,wikidata_items,wikidata_items_being_reused
month,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2021-07-01,33853754.0,118873343.0,93808863.0,19464104.0
2021-08-01,35339547.0,120411293.0,94347261.0,19540328.0
2021-09-01,35748687.0,121413643.0,94646283.0,19640044.0
2021-10-01,36017280.0,123036144.0,95086219.0,19801319.0
2021-11-01,36208503.0,123918750.0,95420570.0,20031718.0
2021-12-01,36843454.0,124842611.0,95826220.0,20161422.0
2022-01-01,37096017.0,125887906.0,96153653.0,20266273.0
2022-02-01,37124441.0,127025510.0,96332960.0,20548393.0
2022-03-01,37488533.0,127991760.0,96647383.0,20647318.0
2022-04-01,37898030.0,128971911.0,96908499.0,


In [30]:
#get last value & replace as needed
metrics.iloc[-1,c.columns.get_loc("wikidata_items_being_reused")] = #missing value

In [32]:
metrics.to_csv(FILENAME, sep="\t")