In [None]:
# get page ranges

#  round datetime to day, get ± 7 days inclusive

#  merge any date ranges that overlap

#  reorganise data into dict where key is date range and value is a list of pages

In [17]:
import pandas as pd
from urllib.parse import unquote
import wikitoolkit as wt
import pickle
import sqlite3
import time
import os
from urllib3.exceptions import MaxRetryError


In [18]:
# User-agent string handed to every wikitoolkit session created in this notebook.
my_agent = 'wikireddit <p.gildersleve@exeter.ac.uk>'
# SQLite connection used by the export cell further down.
conn = sqlite3.connect('wikireddit.db')

# Date-range tables: one for canonical titles, one for raw titles.
ranges_df, raw_ranges_df = map(pd.read_hdf, ('data/ranges_df.h5', 'data/raw_ranges_df.h5'))

# Page maps indexed by language code.
# NOTE(review): pickle.load executes arbitrary code; only load files this project produced.
with open('data/langpagemaps.pkl', 'rb') as pagemaps_file:
    pagemapsdict = pickle.load(pagemaps_file)

Get ranges for pageviews and edits

In [19]:
def _nest_by_lang(consolidated):
    """Turn a (lang, date...)-indexed Series of title lists into a dict of dicts.

    Parameters
    ----------
    consolidated : pandas.Series
        Series whose MultiIndex starts with ``lang`` followed by one or more
        date levels, and whose values are lists of titles.

    Returns
    -------
    dict
        ``{lang: {date_key: titles}}``. ``date_key`` is the bare value when
        there is a single date level, otherwise a tuple of the date levels.
    """
    nested = {}
    for (lang, *dates), titles in consolidated.items():
        date_key = dates[0] if len(dates) == 1 else tuple(dates)
        nested.setdefault(lang, {})[date_key] = titles
    return nested


# Canonical titles: {lang: {(start_date, end_date): [title, ...]}}
ranges_df_consol = ranges_df.groupby(['lang', 'start_date', 'end_date'])['title'].apply(list)
date_article_dict = _nest_by_lang(ranges_df_consol)

# Raw titles: {lang: {(start_date, end_date): [title, ...]}}
raw_ranges_df_consol = raw_ranges_df.groupby(['lang', 'start_date', 'end_date'])['title'].apply(list)
raw_date_article_dict = _nest_by_lang(raw_ranges_df_consol)

# Canonical titles keyed by range start only: {lang: {start_date: [title, ...]}}
starts_df_consol = ranges_df.groupby(['lang', 'start_date'])['title'].apply(list)
startdate_article_dict = _nest_by_lang(starts_df_consol)

Get revisions in range [from, to] (inclusive), as well as revisions at start of range

In [None]:
if os.path.exists('data/done_langdateranges_rev.pkl'):
    with open('data/done_langdateranges_rev.pkl', 'rb') as f:
        done_langdateranges = pickle.load(f)
else:
    done_langdateranges = {}

maxgroupsize = 10000
for lang, da_dict in date_article_dict.items():
    print()
    try:
        l_revisions_df = [pd.read_hdf('data/revisions.h5', key=f'/{lang}')]
    except (KeyError, FileNotFoundError):
        l_revisions_df = []
    if lang not in done_langdateranges:
        done_langdateranges[lang] = set()
    wtsession = wt.WTSession(f'{lang}.wikipedia', user_agent=my_agent)
    for n, (daterange, articles) in enumerate(da_dict.items()):
        if n % 100 == 0:
            print(lang, n/len(da_dict), end='\r')
        if daterange in done_langdateranges[lang]:
            continue

        grouprevs = {}
        groupsize = maxgroupsize
        while len(articles) > 0:
            try:
                revisions = await wt.get_revisions(wtsession, articles[:groupsize], pagemaps=pagemapsdict[lang],
                                                start=daterange[0].isoformat(), stop=daterange[1].isoformat())
                grouprevs.update(revisions)
                articles = articles[groupsize:]
                groupsize = min(int(round(groupsize * (2**0.6), 0)), maxgroupsize)
            except ValueError as e:
                print(e, 'Reducing group size to', groupsize // 2)
                time.sleep(0.1)
                groupsize = groupsize // 2

        revisions = pd.concat({k: pd.DataFrame(v) for k, v in grouprevs.items()}).reset_index(
            level=1, drop=True).reset_index().rename(columns={'index': 'title'})
        done_langdateranges[lang].add(daterange)
        if len(revisions) == 0:
            continue
        revisions['lang'] = lang
        revisions = revisions[['lang', 'title', 'revid', 'parentid', 'timestamp']]
        l_revisions_df.append(revisions)

        if (n%5000 == 0)&(n > 0): # save every 5000 for very large langs for safety
            if len(l_revisions_df) > 0:
                l_revisions_df_i = pd.concat(l_revisions_df, ignore_index=True)
                l_revisions_df_i.to_hdf('data/revisions.h5', key=f'/{lang}', mode='a')
                del l_revisions_df_i
                with open('data/done_langdateranges_rev.pkl', 'wb') as f:
                    pickle.dump(done_langdateranges, f)
        
    if len(l_revisions_df) > 0:
        l_revisions_df = pd.concat(l_revisions_df, ignore_index=True)
        l_revisions_df.to_hdf('data/revisions.h5', key=f'/{lang}', mode='a')
        with open('data/done_langdateranges_rev.pkl', 'wb') as f:
            pickle.dump(done_langdateranges, f)
    await wtsession.close()

Get revisions at start of range

In [None]:
if os.path.exists('data/done_langdatestarts_rev.pkl'):
    with open('data/done_langdatestarts_rev.pkl', 'rb') as f:
        done_langdatestarts = pickle.load(f)
else:
    done_langdatestarts = {}

maxgroupsize = 1000
for lang, da_dict in startdate_article_dict.items():
    print()
    try:
        l_revisions_df = [pd.read_hdf('data/revisions.h5', key=f'/{lang}')]
    except (KeyError, FileNotFoundError):
        l_revisions_df = []
    if lang not in done_langdatestarts:
        done_langdatestarts[lang] = set()

    wtsession = wt.WTSession(f'{lang}.wikipedia', user_agent=my_agent)
    for n, (date, articles) in enumerate(da_dict.items()):
        if n % 10 == 0:
            print(lang, n/len(da_dict), end='\r')
        if date in done_langdatestarts[lang]:
            continue
        grouprevs = {}
        groupsize = maxgroupsize
        while len(articles) > 0:
            try:
                revisions = await wt.get_revision(wtsession, titles=articles[:groupsize], pagemaps=pagemapsdict[lang],
                                                 date=date.isoformat())
                grouprevs.update(revisions)
                articles = articles[groupsize:]
                groupsize = min(int(round(groupsize * (2**0.6), 0)), maxgroupsize)
            except ValueError as e:
                print('\n', n, e, 'Reducing group size to', groupsize // 2)
                time.sleep(0.1)
                groupsize = groupsize // 2

        date_revisions = pd.DataFrame({k: v for k, v in grouprevs.items() if v}).T.reset_index().rename(columns={'index': 'title'})
        done_langdatestarts[lang].add(date)
        if len(date_revisions) == 0:
            continue
        date_revisions['lang'] = lang
        date_revisions = date_revisions[['lang', 'title', 'revid', 'parentid', 'timestamp']]
        l_revisions_df.append(date_revisions)

        if (n%100 == 0)&(n > 0):
            if len(l_revisions_df) > 0:
                l_revisions_df_i = pd.concat(l_revisions_df, ignore_index=True)
                l_revisions_df_i = l_revisions_df_i.astype({'revid': int, 'parentid': int}).drop_duplicates(subset=['revid'])
                l_revisions_df_i.to_hdf('data/revisions.h5', key=f'/{lang}', mode='a')
                del l_revisions_df_i
                with open('data/done_langdatestarts_rev.pkl', 'wb') as f:
                    pickle.dump(done_langdatestarts, f)

    if len(l_revisions_df) > 0:
        l_revisions_df = pd.concat(l_revisions_df, ignore_index=True)
        l_revisions_df = l_revisions_df.astype({'revid': int, 'parentid': int}).drop_duplicates(subset=['revid'])
        l_revisions_df.to_hdf('data/revisions.h5', key=f'/{lang}', mode='a')
        with open('data/done_langdatestarts_rev.pkl', 'wb') as f:
            pickle.dump(done_langdatestarts, f)

    await wtsession.close()

Get pageviews for canonical and raw titles

In [None]:
if os.path.exists('data/done_langdateranges_raw_pv.pkl'):
    with open('data/done_langdateranges_raw_pv.pkl', 'rb') as f:
        done_langdateranges = pickle.load(f)
else:
    done_langdateranges = {}

errors = []
maxgroupsize = 10000
for lang, da_dict in raw_date_article_dict.items():

    if lang not in done_langdateranges:
        done_langdateranges[lang] = set()
    try:
        l_pvdf = [pd.read_hdf('data/pageviews_raw.h5', key=f'/{lang}')]
        print(lang, 'successful read')
    except (KeyError, FileNotFoundError):
        l_pvdf = []

    wtsession = wt.WTSession(f'{lang}.wikipedia', user_agent=my_agent)
    
    for n, (daterange, articles) in enumerate(da_dict.items()):
        if n % 100 == 0:
            print(lang, n/len(da_dict), end='\r')
        if daterange in done_langdateranges[lang]:
            continue
        grouppvs = []
        groupsize = maxgroupsize
        while len(articles) > 0:
            try:
                try:
                    pageviews = await wt.api_article_views(wtsession, f'{lang}.wikipedia', articles[:groupsize], pagemaps=pagemapsdict[lang],
                            start=daterange[0].strftime('%Y%m%d'), end=(daterange[1] - pd.Timedelta(days=1)).strftime('%Y%m%d'),
                            agent='user', redirects=False, process=False)
                except Exception as e:
                    if e.args[0][:44] == 'The pageview API returned nothing useful at:':
                        pageviews = {d.to_pydatetime(): {x: 0 for x in articles}
                                for d in pd.date_range(start=daterange[0], end=daterange[1] - pd.Timedelta(days=1))}
                    else:
                        raise e
                    
                pvdf = pd.DataFrame(pageviews).T.reset_index().rename(columns={'index': 'date'})
                pvdf = pvdf.melt(id_vars='date', var_name='title', value_name='pageviews')
                pvdf['lang'] = lang
                pvdf = pvdf[['lang', 'title', 'date', 'pageviews']]
                grouppvs.append(pvdf)
                articles = articles[groupsize:]
                groupsize = min(int(round(groupsize * (2**0.6), 0)), maxgroupsize)

            except (ValueError, TypeError, MaxRetryError, IndexError) as e:
                print(n/len(da_dict), 'Reducing group size to', groupsize // 2, end='\r')
                errors.append(e)
                time.sleep(0.1)
                groupsize = groupsize // 2

        gpvdf = pd.concat(grouppvs, ignore_index=True)
        l_pvdf.append(gpvdf)
        done_langdateranges[lang].add(daterange)

        if (len(l_pvdf) > 0) & (n % 5000 == 0) & (n > 0):
            print('writing intermediate')
            l_pvdfw = pd.concat(l_pvdf, ignore_index=True)
            l_pvdfw.to_hdf('data/pageviews_raw.h5', key=f'/{lang}', mode='a')
            with open('data/done_langdateranges_raw_pv.pkl', 'wb') as f:
                pickle.dump(done_langdateranges, f)

    if len(l_pvdf) > 0:
        l_pvdf = pd.concat(l_pvdf, ignore_index=True)
        l_pvdf.to_hdf('data/pageviews_raw.h5', key=f'/{lang}', mode='a')
        with open('data/done_langdateranges_raw_pv.pkl', 'wb') as f:
            pickle.dump(done_langdateranges, f)  
                  
    await wtsession.close()

In [None]:
if os.path.exists('data/done_langdateranges_pv.pkl'):
    with open('data/done_langdateranges_pv.pkl', 'rb') as f:
        done_langdateranges = pickle.load(f)
else:
    done_langdateranges = {}

errors = []
maxgroupsize = 10000
for lang, da_dict in date_article_dict.items():

    if lang not in done_langdateranges:
        done_langdateranges[lang] = set()
    try:
        l_pvdf = [pd.read_hdf('data/pageviews.h5', key=f'/{lang}')]
        print(lang, 'successful read')
    except (KeyError, FileNotFoundError):
        l_pvdf = []

    wtsession = wt.WTSession(f'{lang}.wikipedia', user_agent=my_agent)
    
    for n, (daterange, articles) in enumerate(da_dict.items()):
        if n % 100 == 0:
            print(lang, n/len(da_dict), end='\r')
        if daterange in done_langdateranges[lang]:
            continue
        grouppvs = []
        groupsize = maxgroupsize
        while len(articles) > 0:
            try:
                try:
                    pageviews = await wt.api_article_views(wtsession, f'{lang}.wikipedia', articles[:groupsize], pagemaps=pagemapsdict[lang],
                            start=daterange[0].strftime('%Y%m%d'), end=(daterange[1] - pd.Timedelta(days=1)).strftime('%Y%m%d'),
                            agent='user', redirects=False)
                except Exception as e:
                    if e.args[0][:44] == 'The pageview API returned nothing useful at:':
                        pageviews = {d.to_pydatetime(): {x: 0 for x in articles}
                                for d in pd.date_range(start=daterange[0], end=daterange[1] - pd.Timedelta(days=1))}
                    else:
                        raise e
                    
                pvdf = pd.DataFrame(pageviews).T.reset_index().rename(columns={'index': 'date'})
                pvdf = pvdf.melt(id_vars='date', var_name='title', value_name='pageviews')
                pvdf['lang'] = lang
                pvdf = pvdf[['lang', 'title', 'date', 'pageviews']]
                grouppvs.append(pvdf)
                articles = articles[groupsize:]
                groupsize = min(int(round(groupsize * (2**0.6), 0)), maxgroupsize)

            except (ValueError, TypeError, MaxRetryError, IndexError) as e:
                print(n/len(da_dict), 'Reducing group size to', groupsize // 2, end='\r')
                errors.append(e)
                time.sleep(0.1)
                groupsize = groupsize // 2

        gpvdf = pd.concat(grouppvs, ignore_index=True)
        l_pvdf.append(gpvdf)
        done_langdateranges[lang].add(daterange)

        if (len(l_pvdf) > 0) & (n % 5000 == 0) & (n > 0):
            print('writing intermediate')
            l_pvdfw = pd.concat(l_pvdf, ignore_index=True)
            l_pvdfw.to_hdf('data/pageviews.h5', key=f'/{lang}', mode='a')
            with open('data/done_langdateranges_pv.pkl', 'wb') as f:
                pickle.dump(done_langdateranges, f)

    if len(l_pvdf) > 0:
        l_pvdf = pd.concat(l_pvdf, ignore_index=True)
        l_pvdf.to_hdf('data/pageviews.h5', key=f'/{lang}', mode='a')
        with open('data/done_langdateranges_pv.pkl', 'wb') as f:
            pickle.dump(done_langdateranges, f)  
                  
    await wtsession.close()

In [None]:
# save to sql db

import sqlite3
import time

def save_to_sql_with_retry(df, conn, table_name, retries=5, delay=1):
    """Append ``df`` to ``table_name``, retrying while SQLite reports a locked database.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to append; written without the index.
    conn : sqlite3.Connection
        Connection passed straight through to ``df.to_sql``.
    table_name : str
        Destination table; created by ``to_sql`` if it does not exist.
    retries : int, default 5
        Maximum number of write attempts (at least one attempt is always made).
    delay : float, default 1
        Seconds to sleep between attempts.

    Raises
    ------
    sqlite3.OperationalError
        Immediately for any error other than "database is locked", and after
        the last attempt if the database is still locked. Previously the
        function returned silently in that case without writing anything.
    """
    attempts = max(1, retries)
    for attempt in range(1, attempts + 1):
        try:
            df.to_sql(table_name, conn, if_exists='append', index=False)
            return
        except sqlite3.OperationalError as e:
            if 'database is locked' not in str(e) or attempt == attempts:
                raise
            print(f"Database is locked, retrying in {delay} seconds...")
            time.sleep(delay)

# Erase any existing pageviews table so the export starts from scratch.
conn.execute('DROP TABLE IF EXISTS pageviews;')

# Write pageviews to SQL, one language at a time. Canonical rows are listed
# first, so drop_duplicates keeps them over raw rows with the same key.
for lang in date_article_dict:
    print(lang)
    pvdf = pd.read_hdf('data/pageviews.h5', key=f'/{lang}')
    try:
        pvrdf = pd.read_hdf('data/pageviews_raw.h5', key=f'/{lang}')
    except (KeyError, FileNotFoundError):
        # No raw pageviews for this language, or the raw file was never written.
        final_pvdf = pvdf.drop_duplicates(subset=['lang', 'title', 'date'])
    else:
        final_pvdf = pd.concat([pvdf, pvrdf], ignore_index=True).drop_duplicates(subset=['lang', 'title', 'date'])

    save_to_sql_with_retry(final_pvdf, conn, 'pageviews')



In [None]:
# Report how much disk space the SQLite database occupies.
import os
mb_size = os.stat('wikireddit.db').st_size / (1024 * 1024)
print(f"Database size: {mb_size:.2f} MB")

topics

In [None]:
lang_title_dict = ranges_df[['lang', 'title']].drop_duplicates().groupby('lang')['title'].apply(list).to_dict()

maxgroupsize = 1000
l_topics_df = []
xx = None
for lang, articles in lang_title_dict.items():

    if lang not in pagemapsdict:
        pagemapsdict[lang] = wt.PageMaps()

    wtsession = wt.WTSession(f'{lang}.wikipedia', user_agent=my_agent)

    grouptopics = {}
    groupsize = maxgroupsize
    counter = 0
    la = None
    while len(articles) > 0:
        print(f'{lang}: {len(articles)} articles remaining. {len(grouptopics)} articles processed.', end='\r')
        try:

            a_topics = wt.get_articles_topics_sync(wtsession, articles[:groupsize],
                                        lang=lang, tf_args={'threshold': 0},
                                        pagemaps=pagemapsdict[lang])
            grouptopics.update(a_topics)
            articles = articles[groupsize:]
            groupsize = min(int(round(groupsize * (2**0.6), 0)), maxgroupsize)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            if len(articles) == la:
                counter += 1
                time.sleep(10)
                if counter > 10:
                    print(f'{lang}: {len(articles)} articles remaining. {len(grouptopics)} articles processed.')
                    print(e, 'emergency_save')
                    l_topics_df = pd.concat(l_topics_df, ignore_index=True)
                    l_topics_df.to_hdf('data/topics_df_exit.h5', key='df')
                    with open('data/grouptopics.pkl_exit', 'wb') as f:
                        pickle.dump(grouptopics, f)

            print(e, 'Reducing group size to', groupsize // 2)
            time.sleep(0.1)
            groupsize = groupsize // 2

    grouptopics = pd.concat({k: pd.Series(v) for k, v in grouptopics.items()}).reset_index().rename(
                    columns={'level_0': 'article', 'level_1': 'topic', 0: 'score'})
    grouptopics = grouptopics.pivot(index='article', columns='topic', values='score').reset_index().rename_axis(None, axis=1)
    grouptopics['lang'] = lang
    grouptopics = grouptopics.set_index(['lang', 'article'])
    l_topics_df.append(grouptopics)
    
    await wtsession.close()

l_topics_df = pd.concat(l_topics_df, ignore_index=True)