In [None]:
!pip install mwapi mwparserfromhell

In [None]:
import mwapi
from mwapi.errors import APIError
import mwparserfromhell as parser
import re
import pandas as pd

In [None]:
def get_transcluded_pages(session, template):
    """Return the titles of all pages that transclude ``Template:<template>``.

    Args:
        session: Object whose ``get(**params)`` method, called with
            ``continuation=True``, returns an iterable of response batches
            (dicts). In this notebook it is an ``mwapi.Session``.
        template: Template name without the ``Template:`` namespace prefix.

    Returns:
        list[str]: Titles of the transcluding pages, in the order received.

    Raises:
        ValueError: If the API reports an error while batches are consumed.
            The original ``APIError`` is attached as ``__cause__``.
    """
    continued = session.get(
        formatversion=2,
        action='query',
        prop='transcludedin',
        titles=f"Template:{template}",
        # Ask for the largest batch the API allows instead of the small
        # default, so heavily used templates need far fewer round trips.
        tilimit='max',
        continuation=True
    )

    pages = []
    try:
        for portion in continued:
            if 'query' not in portion:
                print("MediaWiki returned empty result batch.")
                continue
            for page in portion['query']['pages']:
                # A page entry has no 'transcludedin' key when nothing
                # transcludes it (or in batches that carry other data);
                # treat that as "no results" rather than hiding all errors.
                for transcluded in page.get('transcludedin', []):
                    pages.append(transcluded["title"])
    except APIError as error:
        raise ValueError(f"MediaWiki returned an error: {error}") from error

    return pages

In [None]:
def extract_sparql(session, p, t):
    """Fetch page ``p`` and return the first-parameter value of each template matching ``t``.

    Args:
        session: Object with a ``get(**params)`` method returning the API
            response dict (an ``mwapi.Session`` in this notebook).
        p: Title of the page to fetch.
        t: Template name; a template is selected when ``t`` occurs anywhere
            in its wikitext.

    Returns:
        list[str]: For every selected template whose first parameter is named
        (``key=value``), the text after the first ``=`` of that parameter.
        Templates without such a parameter are skipped.

    Raises:
        KeyError / IndexError: If the response lacks the expected
            ``query.pages[0].revisions[0].slots.main.content`` structure.
    """
    resp = session.get(
        formatversion=2,
        action='query',
        prop='revisions',
        rvslots='*',
        rvprop='content',
        titles=p
    )

    content = resp['query']['pages'][0]['revisions'][0]['slots']['main']['content']
    wikitext = parser.parse(content)

    out = []
    for template in wikitext.filter_templates():
        text = str(template)
        if t not in text:
            continue
        # The closing marker of a Wikidata list also contains the name but holds no query.
        if t == "Wikidata list" and text == "{{Wikidata list end}}":
            continue

        params = text.split("|")
        if len(params) < 2 or "=" not in params[1]:
            # No named first parameter, so there is nothing to extract.
            continue

        # Split on the first "=" only: SPARQL itself frequently contains "=".
        value = params[1].split("=", 1)[1]
        if len(params) == 2 and value.endswith("}}"):
            # Sole parameter: the trailing braces belong to the template, not the value.
            value = value[:-2]
        out.append(value)

    return out

In [None]:
def check_templates(template):
    """Tell whether ``template`` contains any name from the module-level ``templates`` list.

    The check is a plain, case-sensitive containment test of each name
    against ``template``; it returns True on the first hit, False otherwise.
    """
    return any(name in template for name in templates)

def split_string_and_extract_preceding(s, delimiter, max_chars=300):
    """Return the text immediately preceding the first ``delimiter`` in ``s``.

    Args:
        s: Text to search. It is converted with ``str()`` first, so
            string-like objects are accepted.
        delimiter: Non-empty substring marking the split point.
        max_chars: Maximum number of preceding characters to return.
            Defaults to 300, the window this function has always used.

    Returns:
        str: Up to ``max_chars`` characters ending right before the first
        occurrence of ``delimiter`` (empty if ``s`` starts with it).

    Raises:
        ValueError: If ``delimiter`` is empty.
        IndexError: If ``delimiter`` does not occur in ``s``.
    """
    text = str(s)
    delimiter = str(delimiter)
    if not delimiter:
        raise ValueError("empty separator")

    # Only the first occurrence matters, so a single find() replaces
    # splitting the whole text and walking every part.
    split_point = text.find(delimiter)
    if split_point == -1:
        # Kept as IndexError for compatibility with the previous behaviour.
        raise IndexError("delimiter not found in text")

    # Clamp at 0 so a short prefix does not produce a negative start index.
    return text[max(0, split_point - max_chars):split_point]

In [None]:
def _param_value(template_text, key_pattern):
    """Return the text after the first ``key_pattern`` match, up to the next ``|``."""
    # maxsplit=1 keeps everything after the key, so a value that itself
    # contains the key pattern (e.g. "=" inside SPARQL) is not cut short.
    remainder = re.split(key_pattern, template_text, maxsplit=1)[1]
    value, pipe, _ = remainder.partition("|")
    if not pipe and value.endswith("}}"):
        # Last parameter: the trailing braces close the template itself.
        value = value[:-2]
    return value


def _wikidata_list_results(page_text, template_text):
    """Return the table that follows a ``Wikidata list`` template, or None if there is none."""
    template_start = page_text.find(template_text)
    if template_start == -1:
        return None
    # Search only after the template so its own "|" / "}" characters are not
    # mistaken for table markup, and only up to the next end marker.
    body = page_text[template_start + len(template_text):]
    end_marker = re.search(r"\{\{wikidata list end\}\}", body, flags=re.IGNORECASE)
    if end_marker:
        body = body[:end_marker.start()]

    table_start = body.find("{|")
    if table_start == -1:
        return None
    table_end = body.find("|}", table_start)
    if table_end == -1:
        return body[table_start:]
    # As before, the closing "|}" itself is not included.
    return body[table_start:table_end]


def get_sparql_and_surrounding(title):
    """Fetch page ``title`` and extract each SPARQL query with its surrounding context.

    Relies on the module-level ``session`` for the API request and on
    ``check_templates`` / ``split_string_and_extract_preceding`` defined
    elsewhere in this notebook.

    Args:
        title: Title of the page to fetch.

    Returns:
        list[dict] | None: One dict per query with the keys ``title``,
        ``lede`` (first 250 characters of the page, or the ``label``
        parameter for complex constraints), ``preceding_text``, ``query``
        and ``results`` (the generated table for Wikidata lists, else None).
        None when the page contains no tracked template.

    Raises:
        KeyError / IndexError: If the API response lacks the expected
            structure, or a template lacks the parameter being extracted.
    """
    resp = session.get(
        formatversion=2,
        action='query',
        prop='revisions',
        rvslots='*',
        rvprop='content',
        titles=title
    )
    content = resp['query']['pages'][0]['revisions'][0]['slots']['main']['content']
    wikitext = parser.parse(content)
    page_text = str(wikitext)
    lede = page_text[:250]
    all_templates = wikitext.filter_templates()

    if '{{query page' in page_text:
        # Prefer the template's own text so the value cannot run past its
        # closing braces; fall back to the whole page if it was not parsed
        # as a template.
        source = next(
            (str(tpl) for tpl in all_templates if str(tpl).startswith('{{query page')),
            page_text,
        )
        query = _param_value(source, r"query\s*=\s*")
        # This record used to be built and then dropped by a fall-through
        # to `return None`.
        return [{"title": title, "lede": lede, 'preceding_text': None, 'query': query, 'results': None}]

    wikitext_templates = [
        tpl for tpl in all_templates
        if check_templates(tpl) and tpl != "{{Wikidata list end}}"
    ]
    if not wikitext_templates:
        return None

    out = []
    for wt in wikitext_templates:
        wt_text = str(wt)
        wt_lower = wt_text.lower()
        record_lede = lede
        text = split_string_and_extract_preceding(page_text, wt_text)
        results = None
        if "wdquery" in wt_lower:
            query = _param_value(wt_text, r"query\s*=\s*")
        elif "complex constraint" in wt_lower:
            # Complex constraints describe themselves; use their own
            # label/description instead of the page context.
            record_lede = _param_value(wt_text, r"label\s*=\s*")
            text = _param_value(wt_text, r"description\s*=\s*")
            query = _param_value(wt_text, r"sparql\s*=\s*")
        elif "wikidata list" in wt_lower:
            results = _wikidata_list_results(page_text, wt_text)
            query = _param_value(wt_text, r"=\s*")
        else:
            params = wt_text.split("|")
            query = params[1].split("=", 1)[1]
            if len(params) == 2 and query.endswith("}}"):
                # Sole parameter: strip the template's closing braces.
                query = query[:-2]
        out.append({"title": title, "lede": record_lede, 'preceding_text': text, 'query': query, 'results': results})
    return out

In [None]:
# Names (without the "Template:" prefix) of the templates whose transclusions are harvested.
templates = [
    "Wikidata list",
    "SPARQL",
    "SPARQL2",
    "SPARQL5",
    "SPARQL Inline",
    "Wdquery",
    "Complex constraint"
]

# Regex alternation matching the opening of any listed template up to its
# first "|", e.g. "{{ wikidata list |". The first letter may be either case;
# the rest of the name is escaped so it is matched literally. The character
# class deliberately contains no "|": inside [...] it would be a literal pipe.
template_regex_string = "|".join(
    rf"\{{\{{\s*[{t[0].lower()}{t[0].upper()}]{re.escape(t[1:])}\s*\|"
    for t in templates
)

# Base URLs of every wiki to crawl.
wikis = set()

# wikis.txt is read as one wiki hostname per line.
with open('wikis.txt', 'r') as f:
    for line in f:
        # strip() rather than line[:-1]: the last line may lack a trailing
        # newline, and CRLF files would otherwise leave a stray "\r".
        host = line.strip()
        if host:
            wikis.add(f'https://{host}')

# Large projects that are always crawled, whether or not wikis.txt lists them.
# NOTE(review): 'https://wikidata.org' and 'https://mediawiki.org' have no
# "www." prefix - confirm they do not duplicate entries from wikis.txt.
big_wikis = [
    'https://en.wikipedia.org',
    'https://fr.wikipedia.org',
    'https://de.wikipedia.org',
    'https://ja.wikipedia.org',
    'https://ru.wikipedia.org',
    'https://pt.wikipedia.org',
    'https://it.wikipedia.org',
    'https://zh.wikipedia.org',
    'https://fa.wikipedia.org',
    'https://ar.wikipedia.org',
    'https://commons.wikimedia.org',
    'https://wikidata.org',
    'https://mediawiki.org'
]

wikis.update(big_wikis)

In [None]:
# Empty accumulator frame with one column per field of an extracted query record.
df = pd.DataFrame(
    columns=[
        'project',
        'title',
        'lede',
        'preceding_text',
        'query',
        'results',
    ],
)

In [None]:
# Crawl every wiki: collect the pages that transclude any tracked template,
# extract the SPARQL records from each page, and append them to `df`.
for w in wikis:
    fail_ctr = 0
    print(w)
    # `session` must stay module-level: get_sparql_and_surrounding() reads it as a global.
    session = mwapi.Session(w, user_agent="htriedman sparql corpus bot")
    all_pages = set()
    for t in templates:
        pages = get_transcluded_pages(session, t)
        print(f'template {t} occurs {len(pages)} times on {w}')
        all_pages.update(pages)
    print(f'there are a total of {len(all_pages)} sparql-related pages on {w}')

    wiki_records = []
    for i, p in enumerate(all_pages):
        if i % 500 == 0:
            print(f'templates seen: {i}')
        try:
            out = get_sparql_and_surrounding(p)
        except Exception:
            # Best effort: count the failed page and move on. Catching
            # Exception (not a bare except) lets Ctrl-C stop the crawl.
            fail_ctr += 1
            if fail_ctr % 50 == 0:
                print(f'failures: {fail_ctr}')
            continue
        if out is None:
            continue
        # Tag every record of the page, not only the first one.
        for record in out:
            record['project'] = w
        wiki_records.extend(out)

    # One concat per wiki instead of one per page avoids quadratic copying
    # while still keeping finished wikis in `df` if a later one fails.
    if wiki_records:
        df = pd.concat([df, pd.DataFrame(wiki_records)], ignore_index=True)

In [None]:
# failure-prone wikis: commons, cswiki, cawiki, nowiki

In [None]:
# Show the accumulated DataFrame as this cell's output.
df

In [None]:
# Persist the collected records as a pickle file, written to a path relative to the working directory.
df.to_pickle('wikidata-sparql-templates.pkl')