In [64]:
w = [
["itwiki", "IT"],
["zhwiki", "TW"],
["ptwiki", "BR"],
["plwiki", "PL"],
["ukwiki", "UA"],
["kowiki", "KR"],
["hewiki", "IL"],
["cswiki", "CZ"],
["idwiki", "ID"],
["fiwiki", "FI"]]

wikis = pd.DataFrame(w, columns = ["wiki", "country"])

In [2]:
#!/usr/bin/python
import pymysql
import pandas as pd
from impala.dbapi import connect as impala_conn
from impala.util import as_pandas

def try_decode(cell):
    try:
        return cell.decode(encoding = "utf-8")
    except AttributeError:
        return cell
    
def decode_data(d):
    return [{try_decode(key): try_decode(val) for key, val in item.items()} for item in d]

def query_db(query, db = "mariadb", fmt = "pandas"):
    if db not in ["mariadb", "hadoop"]:
        raise ValueError("The db should be `mariadb` or `hadoop`.")
    if fmt not in ["pandas", "raw"]:
        raise ValueError("The format should be either `pandas` or `raw`.")
    
    if db == "mariadb":
        try:
            conn = pymysql.connect(
                host = "analytics-store.eqiad.wmnet",
                read_default_file = '/etc/mysql/conf.d/research-client.cnf',
                charset = 'utf8mb4',
                db='staging',
                cursorclass=pymysql.cursors.DictCursor
            )
            if fmt == "pandas":
                result = pd.read_sql_query(query, conn)
                # Turn any binary data into strings
                result = result.applymap(try_decode)
            elif fmt == "raw":
                cursor = conn.cursor()
                cursor.execute(query)
                result = cursor.fetchall()
                result = decode_data(result)
        finally:
            conn.close()
        
    elif db == "hadoop":
        try:
            hive_conn = impala_conn(host='analytics1003.eqiad.wmnet', port=10000, auth_mechanism='PLAIN')
            hive_cursor = hive_conn.cursor()
            hive_cursor.execute(query)
            if fmt == "pandas":
                result = as_pandas(hive_cursor)
            elif fmt == "raw":
                result = hive_cursor.fetchall()
        finally:
            hive_conn.close()
    
    return result

# Retention rates

In [None]:
q = """
select (count(2nd_month.user_id) / count(*)) as retention_rate
from
(select distinct(event_user_id) as user_id
from wmf.mediawiki_history
where 
wiki_db = "{wiki}" and
event_entity = "revision" and
event_type = "create" and
event_user_creation_timestamp >= "20160901" and
event_user_creation_timestamp < "20161101" and
unix_timestamp(event_timestamp, "YYYYMMddHHmmss") <
(unix_timestamp(event_user_creation_timestamp, "YYYYMMddHHmmss") + (30*24*60))
) 1st_month
left join
(select distinct(event_user_id) as user_id
from wmf.mediawiki_history
where 
wiki_db = "{wiki}" and
event_entity = "revision" and
event_type = "create" and
event_user_creation_timestamp >= "20160901" and
event_user_creation_timestamp < "20161101" and
unix_timestamp(event_timestamp, "YYYYMMddHHmmss") >=
(unix_timestamp(event_user_creation_timestamp, "YYYYMMddHHmmss") + (30*24*60)) and
unix_timestamp(event_timestamp, "YYYYMMddHHmmss") <
(unix_timestamp(event_user_creation_timestamp, "YYYYMMddHHmmss") + (60*24*60))
) 2nd_month
on (1st_month.user_id = 2nd_month.user_id)
"""

rates = []

for row in wikis.itertuples():
    results = query_db(q.format(wiki = row.wiki), db = "hadoop")
    rate = results.iloc[0, 0]
    rates.append(rate)

wikis["retention_rate"] = rates
wikis

In [67]:
wikis

Unnamed: 0,wiki,country,retention_rate
0,itwiki,IT,0.021011
1,zhwiki,TW,0.035955
2,ptwiki,BR,0.033258
3,plwiki,PL,0.035674
4,ukwiki,UA,0.031875
5,kowiki,KR,0.040188
6,hewiki,IL,0.034457
7,cswiki,CZ,0.018182
8,idwiki,ID,0.035577
9,fiwiki,FI,0.011314


# Longer term retention rates

In [68]:
w = [
["kowiki"],
["zhwiki"],
["ukwiki"],
["cswiki"],
["enwiki"],
["commonswiki"],
["dewiki"],
["eswiki"],
["jawiki"],
["ruwiki"]
]

wikis = pd.DataFrame(w, columns = ["wiki"])

q = """
select (count(2nd_month.user_id) / count(*)) as retention_rate
from
(select distinct(event_user_id) as user_id
from wmf.mediawiki_history
where 
wiki_db = "{wiki}" and
event_entity = "revision" and
event_type = "create" and
event_user_creation_timestamp >= "20151101" and
event_user_creation_timestamp < "20161101" and
unix_timestamp(event_timestamp, "YYYYMMddHHmmss") <
(unix_timestamp(event_user_creation_timestamp, "YYYYMMddHHmmss") + (30*24*60))
) 1st_month
left join
(select distinct(event_user_id) as user_id
from wmf.mediawiki_history
where 
wiki_db = "{wiki}" and
event_entity = "revision" and
event_type = "create" and
event_user_creation_timestamp >= "20151101" and
event_user_creation_timestamp < "20161101" and
unix_timestamp(event_timestamp, "YYYYMMddHHmmss") >=
(unix_timestamp(event_user_creation_timestamp, "YYYYMMddHHmmss") + (30*24*60)) and
unix_timestamp(event_timestamp, "YYYYMMddHHmmss") <
(unix_timestamp(event_user_creation_timestamp, "YYYYMMddHHmmss") + (60*24*60))
) 2nd_month
on (1st_month.user_id = 2nd_month.user_id)
"""

In [None]:
rates = []

for row in wikis.itertuples():
    results = query_db(q.format(wiki = row.wiki), db = "hadoop")
    rate = results.iloc[0, 0]
    rates.append(rate)

wikis["retention_rate"] = rates

In [73]:
wikis

Unnamed: 0,wiki,retention_rate
0,kowiki,0.041925
1,zhwiki,0.039836
2,ukwiki,0.028717
3,cswiki,0.022216
4,enwiki,0.03709
5,commonswiki,0.019375
6,dewiki,0.020587
7,eswiki,0.039828
8,jawiki,0.057843
9,ruwiki,0.021159


# Prospect search #

In [110]:
q = """
select user_id, ip
from
(select user_id, count(*) as edits, cuc_ip as ip
from
(select *
from log.ServerSideAccountCreation_5487345 ssac
inner join {wiki}.cu_changes cuc on cuc_user = event_userId
inner join {wiki}.user u on user_id = event_userId
where
ssac.wiki = "{wiki}" and
ssac.timestamp >= date_format(date_sub(now(), interval 90 day), "%Y%m%d%H%i%S") and
u.user_email is not null) a
group by user_id
having edits >= {n}) b
inner join {wiki}.cu_changes on user_id = cuc_user
;
"""

In [111]:
import geoip2.database
import numpy as np

reader = geoip2.database.Reader("/usr/share/GeoIP/GeoIP2-Country.mmdb")

prospects_col = []

for row in wikis.itertuples():
    def country_match(ip):
        country_code = row.country
        try:
            r = reader.country(ip)
            return float(r.country.iso_code == country_code)
        except:
            return 0
    
    wiki = row.wiki
    user_ips = query_db(q.format(wiki = wiki, n = "5"))
    user_ips["in_country"] = user_ips["ip"].apply(country_match)
    country_pct = user_ips.groupby("user_id").agg({"in_country": np.mean})
    n_prospects = len(country_pct[country_pct["in_country"] == 1])
    prospects_col.append(n_prospects)
    
wikis["prospects"] = prospects_col
wikis

Unnamed: 0,wiki,country,prospects
0,itwiki,IT,3026
1,zhwiki,TW,1330
2,ptwiki,BR,4030
3,plwiki,PL,1254
4,nlwiki,NL,1179
5,ukwiki,UA,1343
6,kowiki,KR,1159
7,hewiki,IL,1232
8,arwiki,EG,489
9,svwiki,SE,1726
