In [1]:
import mwapi
import wmfdata
import requests
import re
import glob
from time import time
import mwparserfromhell as mwp
import difflib

In [2]:
#CONFIG
# Language edition to process; also used to build the wiki_db name ("<code>wiki") in later cells.
LANGUAGE_CODE='id'
# Text file holding one English reliability-template name per line.
TEMPLATES_FILE_EN='../data/reliability_templates_list.txt'
# Snapshot partition used when querying the wmf.mediawiki_* tables.
SNAPSHOT ="2021-04"

#HADOOP OUTPUT DIRS
# NOTE: outputHDFS ends with '/' and every sub-path starts with '/', so the
# resulting paths contain doubled slashes (e.g. './HDFSout//revisions_meta/id/').
outputHDFS = './HDFSout/'
REVISIONS_OUTDIR=outputHDFS+'/revisions_template/{}/'.format(LANGUAGE_CODE)
META_OUTDIR=outputHDFS+'/revisions_meta/{}/'.format(LANGUAGE_CODE)
PAIRS_OUTDIR=outputHDFS+'/revisions_pairs/{}/'.format(LANGUAGE_CODE)
# The pairs directory is split once more by the language of the template name:
# one sub-directory for LANGUAGE_CODE template names and one for English ('en') names.
PAIRS_OUTDIR_lang = PAIRS_OUTDIR+'/{}/'.format(LANGUAGE_CODE)
PAIRS_OUTDIR_en = PAIRS_OUTDIR+'/{}/'.format('en')
TEXT_OUTDIR=outputHDFS+'/revisions_txt/{}/'.format(LANGUAGE_CODE)
FEATURES_OUTDIR=outputHDFS+'/revisions_features/{}/'.format(LANGUAGE_CODE)

#LOCAL OUTPUT DIRS
# Local mirror of the HDFS layout; the gzipped copies of the HDFS results are written here.
OUTDIR_LOCAL = '../data/out/'
META_OUTDIR_LOCAL = OUTDIR_LOCAL+'/revisions_meta/{}/'.format(LANGUAGE_CODE)

PAIRS_OUTDIR_LOCAL=OUTDIR_LOCAL+'/revisions_pairs/{}/'.format(LANGUAGE_CODE)
PAIRS_OUTDIR_lang_LOCAL = PAIRS_OUTDIR_LOCAL+'/{}/'.format(LANGUAGE_CODE)
PAIRS_OUTDIR_en_LOCAL = PAIRS_OUTDIR_LOCAL+'/{}/'.format('en')

TEXT_OUTDIR_LOCAL=OUTDIR_LOCAL+'/revisions_txt/{}/'.format(LANGUAGE_CODE)
TEXT_OUTDIR_lang_LOCAL = TEXT_OUTDIR_LOCAL+'/{}/'.format(LANGUAGE_CODE)
TEXT_OUTDIR_en_LOCAL = TEXT_OUTDIR_LOCAL+'/{}/'.format('en')

FEATURES_OUTDIR_LOCAL=OUTDIR_LOCAL+'/revisions_features/{}/'.format(LANGUAGE_CODE)
FEATURES_OUTDIR_lang_LOCAL = FEATURES_OUTDIR_LOCAL+'/{}/'.format(LANGUAGE_CODE)
FEATURES_OUTDIR_en_LOCAL = FEATURES_OUTDIR_LOCAL+'/{}/'.format('en')

#get reliability related templates
# One template name per line. The context manager closes the file handle once the
# list is built, and blank lines are skipped so that an empty string never ends up
# in the list of templates to match.
with open(TEMPLATES_FILE_EN) as templates_file:
    TEMPLATES_EN = [line.strip() for line in templates_file if line.strip()]

In [39]:
# Create the HDFS output directories defined in the config cell (-p also creates missing parents).
!hadoop fs -mkdir -p $outputHDFS $REVISIONS_OUTDIR $META_OUTDIR $PAIRS_OUTDIR $PAIRS_OUTDIR_lang $PAIRS_OUTDIR_en $TEXT_OUTDIR $FEATURES_OUTDIR

In [40]:
# Create the matching local output directories (-p: create parents, no error if they already exist).
!mkdir -p $OUTDIR_LOCAL $META_OUTDIR_LOCAL $PAIRS_OUTDIR_LOCAL $PAIRS_OUTDIR_lang_LOCAL $PAIRS_OUTDIR_en_LOCAL $TEXT_OUTDIR_LOCAL $TEXT_OUTDIR_lang_LOCAL $TEXT_OUTDIR_en_LOCAL $FEATURES_OUTDIR_LOCAL $FEATURES_OUTDIR_lang_LOCAL $FEATURES_OUTDIR_en_LOCAL

#### CONNECT TO SESSIONS

In [3]:
# mwapi session
# Points at English Wikipedia: get_langlink_templates() uses this session to look up
# the language links of the English `Template:` pages.
MWAPI_SESSION = mwapi.Session('https://en.wikipedia.org', user_agent='wikireliability -- kaywong@wikimedia')
# get spark session
# `spark` is used as a global by every Spark function defined in the cells below.
spark = wmfdata.spark.get_session(type='yarn-large')
# Spark column helpers and types used by the UDFs and DataFrame code below.
from pyspark.sql.functions import udf,col, array
from pyspark.sql.types import ArrayType, StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import explode

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


## 1. Process revisions for templates

In [10]:
def get_langlink_templates(templates, langcode, mwapi_session):
    """
    Get language linked title for list of `templates` from `langcode` wiki

    For every name in `templates`, the page `Template:<name>` is queried through
    `mwapi_session` and its language link to `langcode` (if there is one) is read.

    `templates`: list of templates to get language linked title
    `langcode`: language code of wikiproject to search
    `mwapi_session`: session object whose `get(**params)` performs the API query
                     (the notebook passes an `mwapi.Session`)

    Returns a tuple `(template_titles_map, templates_lang)`:
    * `template_titles_map`: dict mapping each input template to the lower-cased
      linked title without its namespace prefix, or None when there is no link
    * `templates_lang`: list of the values of that dict that are not None
    """
    base_params = {'action':'query',
                   'prop':'langlinks',
                   'llprop': 'url|langname|autonym',
                   'lllang': langcode,
                   'format': 'json'
                  }
    template_titles_map = {}

    for template in templates:
        # Query parameters are passed as keyword arguments (the documented mwapi
        # calling convention) and built per request instead of mutating a shared dict.
        langlinks = mwapi_session.get(titles='Template:{}'.format(template), **base_params)
        page = next(iter(langlinks['query']['pages'].values()))
        # Pages without a link to `langcode` have no 'langlinks' entry at all.
        links = page.get('langlinks') or [{}]
        langtitle = links[0].get('*', None)
        if langtitle:
            # Strip only the leading namespace prefix: splitting once keeps titles
            # that themselves contain ':' intact, and a title without any ':' is
            # kept whole instead of raising IndexError.
            langtitle = langtitle.split(':', 1)[-1].lower()
        template_titles_map[template] = langtitle

    templates_lang = [t for t in template_titles_map.values() if t is not None]

    return template_titles_map, templates_lang

def getTemplatesRegexReliability(wikitext, templates):
    """
    Extract list of templates from `wikitext` for an article via simple regex,
    filtered by list of `templates`.

    `wikitext`: wikitext to parse for templates
    `templates`: list of templates to filter by (compared against the stripped,
                 lower-cased template name found in the wikitext)

    Returns the matching template names (lower-cased, each at most once, in order
    of first appearance), or None when nothing matches or `wikitext` is not a
    string (e.g. a NULL revision text).

    Known Issues:
    * Doesn't handle nested templates (just gets the first)
    -- e.g., '{{cite web|url=http://www.allmusic.com/|ref={{harvid|AllMusic}}}}' would be just cite web
    * Names are only stripped and lower-cased before comparison; underscores and
      spaces are NOT treated as equivalent.
    """
    if not isinstance(wikitext, str):
        return None

    wanted = set(templates)
    matching_templates = []
    seen = set()
    # Raw string: '\{' and '\}' are not valid escapes in a normal string literal.
    for body in re.findall(r'(?<=\{\{)(.*?)(?=\}\})', wikitext, flags=re.DOTALL):
        name = body.split('|')[0].strip().lower()
        # De-duplicate AFTER normalising, so '{{POV}}' and '{{pov}}' count once;
        # duplicates here would become duplicate rows once the column is exploded.
        if name in wanted and name not in seen:
            seen.add(name)
            matching_templates.append(name)

    return matching_templates if matching_templates else None

In [114]:
print("=============================================================")
print("Processing revision templates for {}".format(LANGUAGE_CODE))

t1 = time()        

# Get language linked template titles
template_titles_map, templates_lang= get_langlink_templates(TEMPLATES_EN, LANGUAGE_CODE, MWAPI_SESSION)
# Match against both the English names and their LANGUAGE_CODE equivalents,
# lower-cased and de-duplicated (the regex extractor lower-cases what it finds).
templates_ALL= TEMPLATES_EN+templates_lang
templates_ALL=list(set([t.lower() for t in templates_ALL]))
print("LANG TEMPLATES: \n{}\nEN TEMPLATES: \n{}\nALL TEMPLATES: \n{}".format(templates_lang, TEMPLATES_EN, templates_ALL))

print("\nGet revisions containing reliability templates...")
# Get wikitexthistory for this language wiki
wikidb = "{}wiki".format(LANGUAGE_CODE)
wikitext_history = spark.sql('''SELECT page_id,revision_id,revision_text,page_namespace FROM wmf.mediawiki_wikitext_history 
    WHERE snapshot ="{snapshot}" and wiki_db ="{wikidb}"'''.format(wikidb=wikidb,snapshot=SNAPSHOT))

# Get revisions containing the reliability templates
# curry getTemplatesRegexReliability for specific templates
getTemplatesRegexReliability_lang = udf(lambda wktext: getTemplatesRegexReliability(wktext, templates_ALL), \
                              returnType=ArrayType(StringType()) )
wikitext_history = wikitext_history.withColumn("templates",getTemplatesRegexReliability_lang(col('revision_text')))
# explode() emits one row per matched template in a column named 'col'; revisions
# whose `templates` value is NULL (no match) produce no rows.
revisions_with_templates = wikitext_history.select(wikitext_history.page_id,wikitext_history.revision_id,explode(wikitext_history.templates))

# Get template counts
template_counts= revisions_with_templates.select('col').groupBy('col').count().orderBy('count', ascending=False)
template_counts.show()
template_counts.coalesce(1).write.mode('overwrite').option('header','true').csv(REVISIONS_OUTDIR+'{}_templatecounts.csv'.format(LANGUAGE_CODE))

print("\nWriting to parquet file...")
# Write to parquet file
# This parquet output is what the revision-metadata cell reads back in.
revisions_with_templates.write.parquet(REVISIONS_OUTDIR+'/{}_templates.parquet'.format(LANGUAGE_CODE),mode='overwrite')

t2 = time()
print('Time taken: ',t2-t1)

Processing revision templates for ms
LANG TEMPLATES: 
['pertikaian', 'tipu', 'pengesahan', 'tiada rujukan', 'penyelidikan asli', 'sumber tidak dipercayai', 'citation needed']
EN TEMPLATES: 
['pov', 'disputed', 'third-party', 'contradict', 'hoax', 'more_citations_needed', 'unreferenced', 'original_research', 'unreliable_sources', 'one_source', 'citation_needed']
ALL TEMPLATES: 
['more_citations_needed', 'pov', 'pengesahan', 'penyelidikan asli', 'third-party', 'unreliable_sources', 'citation needed', 'sumber tidak dipercayai', 'tiada rujukan', 'original_research', 'unreferenced', 'one_source', 'citation_needed', 'hoax', 'tipu', 'pertikaian', 'disputed', 'contradict']

Get revisions containing reliability templates...
+--------------------+-----+
|                 col|count|
+--------------------+-----+
|     citation needed|26587|
|        unreferenced|10249|
|          pengesahan| 3391|
|                 pov| 2080|
|       tiada rujukan| 1265|
|            disputed|  447|
|   penyelidik

## 2. Get revision metadata

In [7]:
def get_reverted(revisions_with_template, template, outf, localoutf): 
    """
    Get meta-info for revisions from the mediawiki_history table, w/ revertion information
    
    `revisions_with_template`: Spark dataframe of revisions
    `template`: Template to process
    `outf`: output file path on HDFS
    `localoutf`: output file path on local machine
    """
    t1 = time()
    print(template)
    df = revisions_with_template.where(revisions_with_template['col']==template)
    df.cache()
    t2 = time()
    print('read table, done',t2-t1)
    t1 = time()        
    page_ids = df.select('page_id').distinct() # get all unique pages transcluding a template
    page_ids.createOrReplaceTempView('tmp_page_ids')
    revision_ids = df.select('revision_id').distinct()
    revision_ids.createOrReplaceTempView('tmp_revision_ids')
    reverts= spark.sql('''
        SELECT w.event_timestamp, w.page_title,w.page_id, w.page_namespace,
        w.revision_id, w.revision_is_identity_reverted, 
        w.revision_minor_edit, w.revision_text_bytes, 
        w.revision_first_identity_reverting_revision_id, w.revision_seconds_to_identity_revert,
        w.event_user_id,w.event_user_registration_timestamp, 
        w.event_user_is_anonymous,w.event_user_revision_count,
        CASE WHEN r.revision_id IS NOT NULL  THEN 1 ELSE 0 END has_template,
        w.event_comment

        FROM mediawiki_history_subset w LEFT OUTER JOIN tmp_revision_ids r 
                                    ON (w.revision_id = r.revision_id)

        WHERE  w.page_id IN (
                            SELECT  page_id FROM tmp_page_ids) 
        ORDER BY page_id, w.revision_id
        ''') 

    if not reverts.rdd.isEmpty():
        reverts.repartition(1).write.csv(outf,header=True,mode='overwrite',sep='\t')
        !hadoop fs -text "$outf/*"  | gzip -c > $localoutf-meta.csv.gz
        print("Saved table")
    else: 
        print("empty df")
    t2 = time()
    print('done',t2-t1)

In [180]:
print("==============================================================================================")
print("Processing revision templates for {}".format(LANGUAGE_CODE))     

# Get language linked template titles
template_titles_map, templates_lang = get_langlink_templates(TEMPLATES_EN, LANGUAGE_CODE, MWAPI_SESSION)
# Reverse lookup (language-specific name -> English name): output files are always
# named after the English template, whichever language version matched.
lang2en = {v: k for k, v in template_titles_map.items()}
print("LANG TEMPLATES: \n{}\nEN TEMPLATES: \n{}\n".format(templates_lang, TEMPLATES_EN))

# Read in revisions_with_template file
rev_inf=REVISIONS_OUTDIR+'/{}_templates.parquet'.format(LANGUAGE_CODE)
print("Read in revisions file")
revisions_with_template = spark.read.parquet(rev_inf)
revisions_with_template.cache()

# Get subset of pages from mwhistory which have >=1 occurrence of reliability template
pages_templates_subset = revisions_with_template.select('page_id').distinct()
pages_templates_subset.createOrReplaceTempView('pages_templates_subset')

# Revision events of those pages only; registered below as the temp view that
# get_reverted() queries for every template.
mediawiki_history_subset =  spark.sql('''
        SELECT w.event_timestamp, w.page_title,w.page_id,w.page_namespace, 
        w.revision_id, w.revision_is_identity_reverted, 
        w.revision_minor_edit, w.revision_text_bytes, 
        w.revision_first_identity_reverting_revision_id, w.revision_seconds_to_identity_revert,
        w.event_user_id,w.event_user_registration_timestamp, 
        w.event_user_is_anonymous,w.event_user_revision_count,

        w.event_comment
        FROM wmf.mediawiki_history w
        WHERE w.snapshot ="{0}" and w.wiki_db ="{1}wiki" AND  
      w.event_entity = 'revision' AND w.page_id IN (
                    SELECT  page_id FROM pages_templates_subset)                   
        '''.format(SNAPSHOT, LANGUAGE_CODE))
mediawiki_history_subset.cache()
mediawiki_history_subset.createOrReplaceTempView('mediawiki_history_subset')

# For lang version of templates
# Output name: <English template name>-<LANGUAGE_CODE>
for template in templates_lang:
    try:
        outf=META_OUTDIR+"/{}-{}".format(lang2en[template], LANGUAGE_CODE)
        localoutf = META_OUTDIR_LOCAL+"/{}-{}".format(lang2en[template], LANGUAGE_CODE)
        get_reverted(revisions_with_template, template, outf, localoutf)

    # A failure for one template is only printed, so the remaining templates still run.
    except Exception as e:
        print('error',e)

# For EN version of templates
# Output name: <English template name>-en
for template in TEMPLATES_EN:
    try:
        outf = META_OUTDIR+"/{}-en".format(template)
        localoutf = META_OUTDIR_LOCAL+"/{}-en".format(template)
        get_reverted(revisions_with_template, template, outf, localoutf)

    # Same best-effort handling as in the loop above.
    except Exception as e:
        print('error',e)


Processing revision templates for ms
LANG TEMPLATES: 
['pertikaian', 'tipu', 'pengesahan', 'tiada rujukan', 'penyelidikan asli', 'sumber tidak dipercayai', 'citation needed']
EN TEMPLATES: 
['pov', 'disputed', 'third-party', 'contradict', 'hoax', 'more_citations_needed', 'unreferenced', 'original_research', 'unreliable_sources', 'one_source', 'citation_needed']

Read in revisions file
pertikaian
read table, done 0.006998300552368164
21/06/21 14:08:18 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
Saved table
done 273.1199948787689
tipu
read table, done 0.0901646614074707
empty df
done 25.68256974220276
pengesahan
read table, done 0.07528209686279297
21/06/21 14:09:18 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
Saved table
done 34.868781089782715
tiada rujukan
read table, done 0.0652918815612793
21/06/21 14:09:36 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
Saved table
done 17.958820343017578
penyelidikan asli
read table, done 0.08086013

## 3. Get template addition/removal pairs

In [None]:
def getPosNegRevisionPairs(inputF, outF, outFlocal):
    '''
    Get all positive/negative revision pairs of template addition and removal
    
    `inputF`: File path to revisions meta info
    `outF`: output file path on HDFS
    `outFlocal`: output file path on local machine
    '''
    
    # Read input table
    t1 = time()
    revisions_meta = spark.read \
                     .option("header", "true") \
                     .option("delimiter", "\t") \
                     .csv(inputF)
    
    t2 = time()
    print('read table, done',t2-t1)

    # Check table is not empty    
    assert revisions_meta.rdd.isEmpty()==False, 'Empty table'

    # Remove reverted revisions
    t1 = time()
    rev_unreverted = revisions_meta.where(revisions_meta['revision_is_identity_reverted']==False)
    # Filter pages without has_template
    pages_hastemplate_subset = rev_unreverted.where(rev_unreverted['has_template']==1).select('page_id').distinct()
    pages_hastemplate_subset.createOrReplaceTempView('pages_hastemplate_subset')
    rev_filtered = rev_unreverted.join(pages_hastemplate_subset,'page_id','inner')
    rev_filtered = rev_filtered.select(col('page_id'), col('revision_id'), col('revision_minor_edit'), col('revision_text_bytes'), col('event_user_is_anonymous'), col('has_template'))
    t2 = time()
    print("\t Remove reverted count: {}".format(rev_unreverted.count()))
    print("\t Filtered pages count: {}".format(rev_filtered.count()))
    print('Filter reverted revisions, done',t2-t1)


    # Extract positive and negative revision pairs
    ## Gets all revisions with change in has template
    rev_filtered.createOrReplaceTempView('rev_filtered')
    t1 = time()
    rev_posneg = spark.sql(
        '''
        SELECT r.page_id, r.revision_id, r.revision_minor_edit, r.revision_text_bytes, r.event_user_is_anonymous, r.has_template
            FROM (SELECT *, 
                  LAG(has_template) OVER (PARTITION BY page_id ORDER BY CAST(revision_id AS INT)) as prev_has_template
                  FROM rev_filtered) r
        WHERE (has_template <> prev_has_template) OR (has_template=1 AND prev_has_template IS NULL)
        '''
    )

    ## Filters groups of pages to select pairs
    rev_posneg.createOrReplaceTempView('rev_posneg')
    rev_posneg_filtered = spark.sql(
        '''
        SELECT *
        FROM
            (/* Select all pages with even number of revisions*/
            SELECT R1.page_id, R1.revision_id, R1.revision_minor_edit, R1.revision_text_bytes, R1.event_user_is_anonymous, R1.has_template 
            FROM rev_posneg R1
            INNER JOIN 
                (SELECT page_id
                FROM rev_posneg
                GROUP BY page_id
                HAVING COUNT(page_id) % 2 = 0
               ) R2
            ON R1.page_id = R2.page_id)
        UNION ALL
            (/* Select all pages with odd number of revisions, but removing last row */
            SELECT R3.page_id, R3.revision_id, R3.revision_minor_edit, R3.revision_text_bytes, R3.event_user_is_anonymous, R3.has_template
                FROM (SELECT R4.page_id, R4.revision_id, R4.revision_minor_edit, R4.revision_text_bytes, R4.event_user_is_anonymous, R4.has_template, 
                      ROW_NUMBER() OVER (PARTITION BY R4.page_id ORDER BY CAST(R4.revision_id AS INT) DESC) AS GROUPEDROWNUM
                  FROM rev_posneg R4
                      INNER JOIN 
                      (SELECT page_id
                       FROM rev_posneg
                       GROUP BY page_id
                       HAVING COUNT(page_id) % 2 <> 0
                      ) R5
                  ON R4.page_id = R5.page_id
                ) R3    
            WHERE R3.GROUPEDROWNUM >1
            ORDER BY R3.page_id, CAST(R3.revision_id AS INT))
        '''
    )
    t2 = time()
    print('Extract template addition and removal pairs, done',t2-t1)

    # Add revision_id pair ID
    t1 = time()
    rev_posneg_filtered.createOrReplaceTempView('rev_posneg_filtered')

    rev_posneg_final = spark.sql(
            '''
            SELECT page_id, revision_id, revision_id_key, revision_minor_edit, revision_text_bytes, event_user_is_anonymous, has_template
            FROM 
                (SELECT R1.page_id, R1.revision_id, R1.revision_id_key, R1.revision_minor_edit, R1.revision_text_bytes, R1.event_user_is_anonymous, R1.has_template
                    FROM 
                        (SELECT *, 
                        LEAD(revision_id) OVER (ORDER BY page_id, CAST(revision_id AS INT)) AS revision_id_key,
                        ROW_NUMBER() OVER (ORDER BY page_id, CAST(revision_id AS INT)) AS ROWNUM
                        FROM rev_posneg_filtered) R1
                    WHERE R1.ROWNUM % 2 <>0)
                UNION ALL
                    (SELECT R2.page_id, R2.revision_id, R2.revision_id_key, R2.revision_minor_edit, R2.revision_text_bytes, R2.event_user_is_anonymous, R2.has_template
                    FROM 
                        (SELECT *, 
                        LAG(revision_id) OVER (ORDER BY page_id, CAST(revision_id AS INT)) AS revision_id_key,
                        ROW_NUMBER() OVER (ORDER BY page_id, CAST(revision_id AS INT)) AS ROWNUM
                        FROM rev_posneg_filtered) R2
                    WHERE R2.ROWNUM % 2 =0) 

            ORDER BY page_id, CAST(revision_id AS INT)

            '''
        )
    t2 = time()
    print('Template pairs, done',t2-t1)
    
    # Remove empty pages
    rev_posneg_final= rev_posneg_final.filter(~(col("txt_neg")== ""))

    t1 = time()
    if not rev_posneg_final.rdd.isEmpty():
        rev_posneg_final.repartition(1).write.csv(outF,header=True,mode='overwrite',sep='\t')
        !hadoop fs -text "$outF/*"  | gzip -c > $outFlocal-pairs.csv.gz
        print('Saved to {}'.format(outF))
    else:
        print("EMPTY TABLE")

    t2 = time()        
    print('done', t2-t1)

In [186]:
# Local gzipped metadata files written by the revision-metadata step; their names
# ("<template>-<lang>-meta.csv.gz") tell us which HDFS inputs exist.
TEMPLATE_FNAMES = glob.glob(META_OUTDIR_LOCAL+'/*.csv.gz')

# The pattern is the same for every file, so it is built once.
searchstr = "(.*)-({0}|en)-meta.csv.gz".format(LANGUAGE_CODE)

for template_fname in TEMPLATE_FNAMES:
    template_fname = template_fname.split('/')[-1]
    template_match = re.match(searchstr, template_fname)
    if template_match is None:
        # Not one of our metadata files: skip it rather than building
        # "None-None" paths and failing later with a confusing error.
        print("\nSkipping unrecognised file {}".format(template_fname))
        continue
    template = template_match[1]
    template_lang = template_match[2]
    
    inputF = META_OUTDIR+"/{0}-{1}".format(template, template_lang)
    outF = PAIRS_OUTDIR+'/{0}/{1}-{0}-pairs.csv'.format(template_lang, template)
    outFlocal = PAIRS_OUTDIR_LOCAL+'/{0}/{1}-{0}'.format(template_lang, template)
    
    print("\nGet template addition/removal pairs for {}, {}".format(template, template_lang))
    # Best effort: a failure for one template (e.g. an empty table) is printed
    # and the loop continues with the next file.
    try:
        getPosNegRevisionPairs(inputF, outF, outFlocal) 
    except Exception as e:
        print(e)


Get template addition/removal pairs for more_citations_needed, ms
read table, done 1.8162105083465576
	 Remove reverted count: 14763
	 Filtered pages count: 14614
Filter reverted revisions, done 0.07820963859558105
Extract template addition and removal pairs, done 0.15042448043823242
Template pairs, done 0.08742117881774902
21/06/21 14:18:29 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
Saved to ./HDFSout//revisions_pairs/ms//ms/more_citations_needed-ms-pairs.csv
done 11.594306468963623

Get template addition/removal pairs for original_research, ms
read table, done 0.2519402503967285
	 Remove reverted count: 1518
	 Filtered pages count: 849
Filter reverted revisions, done 0.05218029022216797
Extract template addition and removal pairs, done 0.11555790901184082
Template pairs, done 0.04957413673400879
21/06/21 14:18:39 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
Saved to ./HDFSout//revisions_pairs/ms//ms/original_research-ms-pairs.csv
done 8.3641870021

## 4. Get revision text and diff text

In [6]:
@udf(returnType=StringType())
def getContentText(wikicode):
    """
    Parse wikicode for plain content text

    The text is split into sections, markup is stripped from each section, and
    sections whose stripped text starts with one of a fixed set of English
    headings ("See also", "References", ...) are dropped.  The remaining sections
    are joined with newlines.  A NULL input yields an empty string.

    Note: the excluded headings are English only, so the equivalent sections of
    other language editions are kept.

    `wikicode`: wikicode/revision text to parse
    """
    excluded_headings = ("See also", "References", "External links", "Footnotes","Further reading", "Bibliography" )

    parsed_wc = mwp.parse(wikicode or "")
    # flat=True makes every section end at the next heading of any level.  With the
    # default (nested) mode a section also contains all of its subsections, and the
    # subsections are returned again on their own, so their text appeared twice.
    sections = parsed_wc.get_sections(flat=True)
    sections = [section.strip_code().strip() for section in sections]
    filtered_sections = [section for section in sections if not section.startswith(excluded_headings)]
    content_txt = "\n".join(filtered_sections)

    return content_txt

def getDiffText(pos_txt, neg_txt):
    """
    Get diff'd sections between two versions of text

    The two texts are compared line by line with `difflib.ndiff`.  Lines that only
    occur in `pos_txt` and lines that only occur in `neg_txt` are collected
    separately and each group is joined with single spaces.

    `pos_txt`: Positive instance of template addition
    `neg_txt`: Negative instance of template removal

    Returns a tuple `(difftxt_pos, difftxt_neg)` of strings.  A missing (None)
    text is treated as empty, so the function never fails on NULL input.
    """
    pos_lines = (pos_txt or "").splitlines()
    neg_lines = (neg_txt or "").splitlines()

    diff_pos = []
    diff_neg = []
    for line in difflib.ndiff(pos_lines, neg_lines):
        # ndiff prefixes every line with a two-character code: '- ' only in the
        # first sequence, '+ ' only in the second; '  ' (common) and '? ' (hint)
        # lines are ignored.
        if line.startswith('-'):
            diff_pos.append(line[2:])
        elif line.startswith('+'):
            diff_neg.append(line[2:])

    difftxt_pos = " ".join(diff_pos)
    difftxt_neg = " ".join(diff_neg)

    return (difftxt_pos, difftxt_neg)

# Instantiate getDiffText UDF which returns multiple fields
# The struct mirrors the (difftxt_pos, difftxt_neg) tuple returned by getDiffText;
# both fields are declared non-nullable.
schema = StructType([
    StructField("difftxt_pos", StringType(), False),
    StructField("difftxt_neg", StringType(), False)
    ])

# Callers expand the struct into two columns by selecting 'diffResults.*'.
getDiffText_udf = udf(getDiffText, schema)

In [56]:
def getPosNegTextPairs(inputF, outputF, outFlocal):
    """
    Processes posneg text pairs for `inputF` and writes output to JSON
    
    `inputF`: input file name of pairs data
    `outputF`: output filepath on HDFS
    `outFlocal`: output filepath local
    """
    # Get rev posneg pairs
    rev_posneg = spark.read \
                     .option("header", "true") \
                     .option("delimiter", "\t") \
                     .csv(inputF)

    # Get wikitexthistory for this language wiki
    wikidb = "{}wiki".format(LANGUAGE_CODE)
    wikitext_history = spark.sql('''SELECT page_id,revision_id,revision_text,page_namespace FROM wmf.mediawiki_wikitext_history 
        WHERE snapshot ="{snapshot}" and wiki_db ="{wikidb}"'''.format(wikidb=wikidb,snapshot=SNAPSHOT))

    # Join on both to get revtext
    rev_posneg_txt = rev_posneg.join(wikitext_history,['page_id', 'revision_id'],'left')

    # Parse wikitext for plain content text
    rev_posneg_txt = rev_posneg_txt.withColumn("txt", getContentText(col('revision_text')))
    rev_posneg_txt = rev_posneg_txt.select(col('page_id'), col('revision_id'), col('revision_id_key'), col('has_template'), col('txt'))

    # Join posneg pair entries as a single row
    rev_pos_txt = rev_posneg_txt.where(rev_posneg_txt['has_template']==1)
    rev_pos_txt = rev_pos_txt.withColumnRenamed("revision_id", "revision_id_pos")\
                             .withColumnRenamed("revision_id_key", "revision_id_neg")\
                             .withColumnRenamed("txt", "txt_pos")

    rev_neg_txt = rev_posneg_txt.where(rev_posneg_txt['has_template']==0)
    rev_neg_txt = rev_neg_txt.withColumnRenamed("revision_id", "revision_id_neg")\
                             .withColumnRenamed("revision_id_key", "revision_id_pos")\
                             .withColumnRenamed("txt", "txt_neg")

    rev_pairs_txt = rev_pos_txt.join(rev_neg_txt, ['page_id', 'revision_id_pos', 'revision_id_neg'], 'inner')
    rev_pairs_txt = rev_pairs_txt.select(col('page_id'), col('revision_id_pos'), col('revision_id_neg'), col('txt_pos'), col('txt_neg'))
        
    # Write txt to output file
    rev_pairs_txt.repartition(1).write.format('json').save(outputF, mode='overwrite')
    !hadoop fs -text "$outputF/*"  | gzip -c > $outFlocal-txt.json.gz

    # Get diff'd text sections between pos and neg text and write to output file   
    rev_pairs_difftxt = rev_pairs_txt.withColumn("diffResults", getDiffText_udf(rev_pairs_txt.txt_pos, rev_pairs_txt.txt_neg))
    diff_outputF = outputF.replace("txt", "difftxt")
    rev_pairs_difftxt = rev_pairs_difftxt.select(col('page_id'), col('revision_id_pos'), col('revision_id_neg'), col('diffResults.*'))
    rev_pairs_difftxt.repartition(1).write.format('json').save(diff_outputF, mode='overwrite')
    !hadoop fs -text "$diff_outputF/*"  | gzip -c > $outFlocal-difftxt.json.gz

In [57]:
# Raw string: "\-" is not a valid escape sequence in a normal string literal.
# The "+" lets the language part match doubled codes such as "idid" / "iden".
searchstr = r"(.*)-({0}|en)+\-pairs.csv".format(LANGUAGE_CODE)

# Local gzipped pairs files tell us which templates have pairs available:
# one directory for LANGUAGE_CODE template names, one for English names.
TEMPLATE_FNAMES_lang = glob.glob(PAIRS_OUTDIR_lang_LOCAL+'/*pairs.csv.gz')
TEMPLATE_FNAMES_en = glob.glob(PAIRS_OUTDIR_en_LOCAL+'/*pairs.csv.gz')

# NOTE(review): the pairs cell writes "<template>-<lang>-pairs.csv", while the input
# name built below is "<template>-<LANGUAGE_CODE><lang>-pairs.csv" (e.g. "-idid-").
# Confirm which naming the files on HDFS actually use.
for template_lang, pairs_outdir, template_fnames in (
        (LANGUAGE_CODE, PAIRS_OUTDIR_lang, TEMPLATE_FNAMES_lang),  # Language version of template
        ('en', PAIRS_OUTDIR_en, TEMPLATE_FNAMES_en)):              # En version of template
    for template_fname in template_fnames:
        template_fname = template_fname.split('/')[-1]
        template_match = re.match(searchstr, template_fname)
        if template_match is None:
            # Skip files that do not follow the naming scheme instead of
            # building paths that contain the string "None".
            print("Skipping unrecognised pairs file {}".format(template_fname))
            continue
        template = template_match[1]

        inputF = pairs_outdir+"/{0}-{1}{2}-pairs.csv".format(template, LANGUAGE_CODE, template_lang)
        outputF = TEXT_OUTDIR+'{0}/{1}-{2}{0}-txt.json'.format(template_lang, template,LANGUAGE_CODE)
        outFlocal = TEXT_OUTDIR_LOCAL+'{0}/{1}-{2}{0}'.format(template_lang, template, LANGUAGE_CODE)
        print("Get text for template addition/removal pairs {}, {}".format(inputF, outputF))
        getPosNegTextPairs(inputF, outputF, outFlocal)

Get text for template addition/removal pairs ./HDFSout//revisions_pairs/id//id//unreliable_sources-idid-pairs.csv, ./HDFSout//revisions_txt/id/id/unreliable_sources-idid-txt.json
21/07/01 07:39:04 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
21/07/01 07:40:02 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
Get text for template addition/removal pairs ./HDFSout//revisions_pairs/id//id//one_source-idid-pairs.csv, ./HDFSout//revisions_txt/id/id/one_source-idid-txt.json
21/07/01 07:40:44 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
21/07/01 07:41:28 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
Get text for template addition/removal pairs ./HDFSout//revisions_pairs/id//id//citation_needed-idid-pairs.csv, ./HDFSout//revisions_txt/id/id/citation_needed-idid-txt.json
21/07/01 07:42:23 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
21/07/01 07:43:55 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
Get t

## 5. Process features

In [73]:
# Shortened-footnote templates that are counted as references, lower-cased for comparison.
sfn_templates = [t.lower() for t in ["Shortened footnote template", "sfn", "Sfnp", "Sfnm", "Sfnmp"]]

#@udf(returnType=IntegerType())
def getNumRefs(wc):
    """
    Count the references of an article: `ref` tags plus shortened-footnote templates.

    Tag and template names are compared after stripping surrounding whitespace and
    lower-casing, so `<REF>` and a template written as `{{Sfn` followed by a line
    break are counted too.

    `wc`: Wikicode parsed by mwparser

    Returns the count, or None when `wc` cannot be processed.
    """  
    try:
        num_ref_tags = sum(1 for t in wc.filter_tags() if str(t.tag).strip().lower() == 'ref')
        num_sftn_templates = sum(1 for t in wc.filter_templates() if str(t.name).strip().lower() in sfn_templates)
        return num_ref_tags + num_sftn_templates
    except Exception:
        # Best effort: callers treat None as "could not be computed".
        return None
    
#@udf(returnType=IntegerType())
def getNumHeadings(wc, max_level=None):
    """
    Count the headings of an article.

    `wc`: Wikicode parsed by mwparser
    `max_level`: when given, only headings whose level is at most this value
                 are counted; by default every heading counts

    Returns the count, or None when `wc` cannot be processed.
    """
    try:
        headings = wc.filter_headings()
        if max_level is not None:
            headings = [heading for heading in headings if heading.level <= max_level]
        return sum(1 for _ in headings)
    except Exception:
        return None

def getImageNamespaceAlias(LANGUAGE_CODE):
    """
    Get the wikilink prefixes under which files/images can be linked in a langwiki

    Queries the wiki's siteinfo API and collects every name of namespace 6: its
    aliases, the wiki's own (local) name, the canonical name and "File".

    `LANGUAGE_CODE`: Language code of langwiki

    Returns a tuple of strings of the form '[[<name>:' (suitable for
    `str.startswith`).  Raises a `requests` exception when the request fails,
    times out or returns an HTTP error status.
    """
    
    URL = "https://{}.wikipedia.org/w/api.php".format(LANGUAGE_CODE)
    params = {'action':'query',
              'meta':'siteinfo',
              'siprop':'namespacealiases|namespaces',
              'format': 'json'
             }
    # The context manager closes the session (and its connection pool) again.
    with requests.Session() as S:
        R = S.get(url=URL, params=params, timeout=30)
        R.raise_for_status()
        DATA = R.json()
    
    # Get aliases for "Image" namespace in this langwiki
    image_ns_alias = [alias['*'] for alias in DATA['query']['namespacealiases'] if alias['id']==6 ]
    image_ns_alias.append("File")
    # The namespace's local name (and canonical name) is listed under 'namespaces',
    # not under the aliases, so it has to be added separately.
    file_namespace = DATA['query'].get('namespaces', {}).get('6', {})
    for key in ('*', 'canonical'):
        name = file_namespace.get(key)
        if name:
            image_ns_alias.append(name)
    # Drop duplicates while keeping the order.
    image_ns_alias = list(dict.fromkeys(image_ns_alias))
    image_ns_alias = tuple(['[['+alias+':' for alias in image_ns_alias])
    
    return image_ns_alias

def getNumImages(wc, image_ns_alias):    
    """
    Count the wikilinks of an article that point into the file/image namespace

    A link counts when its text starts with one of the prefixes in
    `image_ns_alias`; the comparison ignores case, so `[[file:...]]` is counted
    like `[[File:...]]`.

    `wc`: Wikicode parsed by mwparser
    `image_ns_alias`: tuple of namespace aliases for "Image" for specific languagewiki,
                      each of the form '[[<name>:'

    Returns the number of matching links.
    """
    prefixes = tuple(alias.lower() for alias in image_ns_alias)
    # No per-row debug printing here: this runs inside a Spark UDF for every revision.
    return sum(1 for link in wc.filter_wikilinks() if str(link).lower().startswith(prefixes))

def getRevisionFeatures(wikitext, LANGUAGE_CODE, image_ns_alias=None):
    """
    Extract revision features for a single revision wikitext

    `wikitext`: Wikitext of revision
    `LANGUAGE_CODE`: language code of languagewiki
    `image_ns_alias`: optional tuple of image-namespace link prefixes as returned
                      by getImageNamespaceAlias(); when omitted it is fetched for
                      `LANGUAGE_CODE` (one HTTP request per call)

    Returns a tuple `(page_len, num_refs, num_headings, num_images)`; all zeros
    when the wikitext is empty or missing.  `num_refs` and `num_headings` are None
    when the respective helper could not compute them.
    """

    wc = mwp.parse(wikitext)
    
    if wc: 
        page_len = len(wikitext)  #normalise??
        num_refs = getNumRefs(wc)
        num_headings = getNumHeadings(wc)
        if image_ns_alias is None:
            image_ns_alias = getImageNamespaceAlias(LANGUAGE_CODE)
        num_images = getNumImages(wc, image_ns_alias)
    else:
        page_len=0; num_refs=0; num_headings=0; num_images=0
        
    return (page_len, num_refs, num_headings, num_images)

# Instantiate getRevisionFeatures UDF which returns multiple fields
# num_refs / num_headings are nullable because getNumRefs / getNumHeadings return
# None when they fail.
rev_features_schema = StructType([
    StructField("page_len", IntegerType(), False),
    StructField("num_refs", IntegerType(), True),
    StructField("num_headings", IntegerType(), True),
    StructField("num_images", IntegerType(), False)
    ])

# Look the namespace prefixes up ONCE, here, and hand them to the UDF through the
# closure; otherwise every single row would trigger its own HTTP request.
IMAGE_NS_ALIAS = getImageNamespaceAlias(LANGUAGE_CODE)

getRevisionFeatures_udf = udf(lambda wktext:getRevisionFeatures(wktext, LANGUAGE_CODE, IMAGE_NS_ALIAS), \
                              rev_features_schema)

In [81]:
def getPosNegFeatures(inputF,outputF, outFlocal):
    """
    Processes posneg feature pairs for `inputF` and writes output to CSV
    
    `inputF`: input file name of pairs data
    `outputF`: output filepath on HDFS
    `outFlocal`: output filepath local
    """
    # Get rev posneg pairs
    rev_posneg = spark.read \
                     .option("header", "true") \
                     .option("delimiter", "\t") \
                     .csv(inputF)

    # Get wikitexthistory for this language wiki
    wikidb = "{}wiki".format(LANGUAGE_CODE)
    wikitext_history = spark.sql('''SELECT page_id,revision_id,revision_text,page_namespace FROM wmf.mediawiki_wikitext_history 
        WHERE snapshot ="{snapshot}" and wiki_db ="{wikidb}"'''.format(wikidb=wikidb,snapshot=SNAPSHOT))
    rev_posneg = rev_posneg.join(wikitext_history,['page_id','revision_id'],'left')

    # Get language agnostic metadata features
    rev_posneg_features = rev_posneg.withColumn("revFeatures", getRevisionFeatures_udf(rev_posneg.revision_text))
    rev_posneg_features_expanded = rev_posneg_features.select(col('page_id'), col('revision_id'), col('revision_id_key'), col('has_template'), col('revFeatures.*'))

    # Write
    rev_posneg_features_expanded.repartition(1).write.csv(outputF,header=True,mode='overwrite',sep='\t')
    !hadoop fs -text "$outputF/*"  | gzip -c > $outFlocal-features.csv.gz

In [85]:
# Raw string: "\-" is not a valid escape sequence in a normal string literal.
# The "+" lets the language part match doubled codes such as "idid" / "iden".
searchstr = r"(.*)-({0}|en)+\-pairs.csv".format(LANGUAGE_CODE)

# Local gzipped pairs files tell us which templates have pairs available:
# one directory for LANGUAGE_CODE template names, one for English names.
TEMPLATE_FNAMES_lang = glob.glob(PAIRS_OUTDIR_lang_LOCAL+'/*pairs.csv.gz')
TEMPLATE_FNAMES_en = glob.glob(PAIRS_OUTDIR_en_LOCAL+'/*pairs.csv.gz')

# NOTE(review): the "[:1]" slices below restrict each language to its FIRST file only.
# This looks like a leftover from debugging; remove the slices to process every
# template.
# NOTE(review): the pairs cell writes "<template>-<lang>-pairs.csv", while the input
# name built below is "<template>-<LANGUAGE_CODE><lang>-pairs.csv" (e.g. "-idid-").
# Confirm which naming the files on HDFS actually use.
for template_lang, pairs_outdir, template_fnames in (
        (LANGUAGE_CODE, PAIRS_OUTDIR_lang, TEMPLATE_FNAMES_lang[:1]),  # Language version of template
        ('en', PAIRS_OUTDIR_en, TEMPLATE_FNAMES_en[:1])):              # En version of template
    for template_fname in template_fnames:
        template_fname = template_fname.split('/')[-1]
        template_match = re.match(searchstr, template_fname)
        if template_match is None:
            # Skip files that do not follow the naming scheme instead of
            # building paths that contain the string "None".
            print("Skipping unrecognised pairs file {}".format(template_fname))
            continue
        template = template_match[1]

        inputF = pairs_outdir+"/{0}-{1}{2}-pairs.csv".format(template, LANGUAGE_CODE, template_lang)
        outputF = FEATURES_OUTDIR+'{0}/{1}-{2}{0}-features.csv'.format(template_lang, template,LANGUAGE_CODE)
        outFlocal = FEATURES_OUTDIR_LOCAL+'{0}/{1}-{2}{0}'.format(template_lang, template, LANGUAGE_CODE)
        print("Get features for template addition/removal pairs {}, {}".format(inputF, outputF))
        getPosNegFeatures(inputF, outputF, outFlocal)

Get features for template addition/removal pairs ./HDFSout//revisions_pairs/id//id//unreliable_sources-idid-pairs.csv, ./HDFSout//revisions_features/id/id/unreliable_sources-idid-features.csv
21/07/24 05:03:18 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
Get features for template addition/removal pairs ./HDFSout//revisions_pairs/id//en//third-party-iden-pairs.csv, ./HDFSout//revisions_features/id/en/third-party-iden-features.csv
21/07/24 05:03:56 INFO compress.CodecPool: Got brand-new decompressor [.snappy]
