# Wikipedia Page Views Data Pipeline

This notebook extracts the **top viewed Wikipedia pages** between 2025-11-23 and 2025-11-25, transforms the data into **JSON Lines**, and uploads it to **S3**.



## Imports

In [1]:
import requests
import json
import boto3
from datetime import datetime, timedelta, timezone

## Configuration

Use the **same username as in the in-class Wikipedia Edits assignment**.

- **Username**: `shirind76`
- **S3 Bucket**: `shirind76-wikidata` (format: `<username>-wikidata`)
- **Athena Database**: `shirind76` (format: `<username>`)
- **Lambda**: Write output to the same S3 bucket (`shirind76-wikidata`)


In [2]:
# =====================
# CONFIG
# =====================
USERNAME = "shirind76"          
BUCKET = f"{USERNAME}-wikidata"
PROJECT = "en.wikipedia"
ACCESS = "all-access"


## Extract: Retrieve Top Viewed Pages from the Wikimedia Pageviews API

API docs: https://doc.wikimedia.org/generated-data-platform/aqs/analytics-api/reference/page-views.html

A base date is defined once, and the code loops over `N_DAYS` consecutive days ending on that date: the base date itself first, then each earlier day, obtained by subtracting days from the base date. For each date:
- The API is called with the corresponding year, month, and day
- The response is parsed to extract article title, view count, and rank
- The data is enriched with the query date and a retrieval timestamp
- One JSON Lines file is written and uploaded to S3 per day

This approach avoids manual date changes and ensures consistent, repeatable data extraction.
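
The date loop can be sketched on its own, without calling the API or S3. The helper names `dates_to_fetch` and `pageviews_url` are hypothetical and not part of this notebook; the URL shape mirrors the one built inside `fetch_and_upload`.

```python
from datetime import datetime, timedelta


def dates_to_fetch(base_date, n_days):
    """Return base_date and the n_days - 1 days before it, newest first."""
    return [base_date - timedelta(days=i) for i in range(n_days)]


def pageviews_url(date_obj, project="en.wikipedia.org", access="all-access"):
    """Build the top-pageviews URL for one day (zero-padded YYYY/MM/DD)."""
    return (
        "https://wikimedia.org/api/rest_v1/metrics/pageviews/top/"
        f"{project}/{access}/{date_obj.strftime('%Y/%m/%d')}"
    )


# Print the date and URL for each day in the window
for d in dates_to_fetch(datetime(2025, 11, 20), 3):
    print(d.strftime("%Y-%m-%d"), pageviews_url(d))
```

With a base date of 2025-11-20 and three days, the window is 2025-11-20, 2025-11-19, and 2025-11-18, so the base date itself is included.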

In [5]:
# Choose a base date (change once if needed)
BASE_DATE = datetime(2025, 11, 20)

# Number of days to fetch
N_DAYS = 3

s3 = boto3.client("s3")

# =====================
# FUNCTION
# =====================
def fetch_and_upload(date_obj):
    date_str = date_obj.strftime("%Y-%m-%d")

    # Build the URL from the PROJECT and ACCESS settings in the config cell
    url = (
        "https://wikimedia.org/api/rest_v1/metrics/pageviews/top/"
        f"{PROJECT}/{ACCESS}/{date_obj.strftime('%Y/%m/%d')}"
    )

    print(f"Fetching {date_str}")
    resp = requests.get(
        url,
        headers={"User-Agent": "curl/7.68.0"},
        timeout=30,  # fail instead of hanging if the API does not respond
    )
    resp.raise_for_status()
    data = resp.json()

    retrieved_at = datetime.now(timezone.utc).isoformat()

    records = []
    for item in data["items"][0]["articles"]:
        records.append({
            "title": item["article"],
            "views": item["views"],
            "rank": item["rank"],
            "date": date_str,
            "retrieved_at": retrieved_at
        })

    key = f"raw-views/raw-views-{date_str}.json"

    body = "\n".join(json.dumps(r) for r in records)

    s3.put_object(
        Bucket=BUCKET,
        Key=key,
        Body=body.encode("utf-8")
    )

    print(f"Uploaded s3://{BUCKET}/{key}")


# =====================
# RUN FOR 3 DAYS
# =====================
for i in range(N_DAYS):
    fetch_and_upload(BASE_DATE - timedelta(days=i))


Fetching 2025-11-20


ClientError: An error occurred (InvalidAccessKeyId) when calling the PutObject operation: The AWS Access Key Id you provided does not exist in our records.

## Explore the Response Structure

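The article data is inside `items[0].articles`.

The step-by-step cells from here on repeat what `fetch_and_upload` does, for a single day. They assume `raw` holds the parsed JSON response for that day (the value of `resp.json()`), and that `date` is the matching `datetime.date` object.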

In [None]:
items = raw.get("items", [])
len(items), items[0].keys() if items else None


In [None]:
articles = items[0].get("articles", []) if items else []
len(articles), articles[0].keys() if articles else None
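
To make the nesting concrete, here is a minimal sketch that walks a hand-written sample instead of a live response. The titles and view counts are invented for illustration, and only the keys this notebook reads (`items`, `articles`, `article`, `views`, `rank`) are included; a real response may carry additional fields.

```python
# Hand-written sample with the same nesting the notebook relies on.
# Titles and view counts are invented, not real API output.
sample = {
    "items": [
        {
            "articles": [
                {"article": "Main_Page", "views": 1000, "rank": 1},
                {"article": "Example_Article", "views": 500, "rank": 2},
            ]
        }
    ]
}

# Same defensive access pattern as the cells above
sample_items = sample.get("items", [])
sample_articles = sample_items[0].get("articles", []) if sample_items else []
top_article = sample_articles[0] if sample_articles else None

print(len(sample_items), len(sample_articles), top_article)
```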


## Transform: Convert to JSON Lines

Required fields:
- `title`
- `views`
- `rank`
- `date`
- `retrieved_at`

In [None]:
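# `datetime` is the class imported via `from datetime import datetime, timezone`,
# so call it directly; utcnow() is also deprecated in favor of now(timezone.utc).
retrieved_at = (
    datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
)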

records = []
for a in articles:
    # In this endpoint, the title key is usually called 'article'
    title = a.get("article")
    views = a.get("views")
    rank = a.get("rank")

    # Skip any records missing the essentials
    if title is None or views is None or rank is None:
        continue

    records.append({
        "title": title,
        "views": int(views),
        "rank": int(rank),
        "date": date.isoformat(),
        "retrieved_at": retrieved_at,
    })

# Preview a few
records[:5]


In [None]:
# Convert to JSON Lines string (one JSON object per line)
json_lines = "\n".join(json.dumps(r, ensure_ascii=False) for r in records)

print("Lines:", len(records))
print("First line:", json_lines.splitlines()[0][:200])
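
One way to sanity-check the JSON Lines output is to parse it back and confirm that every record carries the required fields. The sketch below uses two invented records rather than real page view data, and the names `sample_records`, `REQUIRED`, and `parsed` are hypothetical.

```python
import json

# Two hypothetical records with the required fields (values are invented).
sample_records = [
    {"title": "Main_Page", "views": 1000, "rank": 1,
     "date": "2025-11-20", "retrieved_at": "2025-11-21T00:00:00Z"},
    {"title": "Café", "views": 500, "rank": 2,
     "date": "2025-11-20", "retrieved_at": "2025-11-21T00:00:00Z"},
]

REQUIRED = {"title", "views", "rank", "date", "retrieved_at"}

# Serialize: one JSON object per line, no enclosing array.
sample_lines = "\n".join(json.dumps(r, ensure_ascii=False) for r in sample_records)

# Parse back line by line. Split on "\n" only: str.splitlines() also breaks
# on other Unicode line separators that may appear unescaped in a title.
parsed = [json.loads(line) for line in sample_lines.split("\n")]

assert parsed == sample_records
assert all(REQUIRED <= set(rec) for rec in parsed)
print(len(parsed), "records round-tripped")
```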


## Load: Upload JSON Lines to S3

Target key format:
`raw-views/raw-views-YYYY-MM-DD.json`

In [None]:
s3 = boto3.client("s3")

s3_key = f"raw-views/raw-views-{date.strftime('%Y-%m-%d')}.json"

# BUCKET is defined in the Configuration cell
s3.put_object(
    Bucket=BUCKET,
    Key=s3_key,
    Body=json_lines.encode("utf-8"),
    ContentType="application/json",
)

print(f"Uploaded to s3://{BUCKET}/{s3_key}")


## Test: Confirm the File Exists in S3

In [None]:
try:
    s3.head_object(Bucket=BUCKET, Key=s3_key)
    print(f"✅ File found at s3://{BUCKET}/{s3_key}")
except Exception as e:
    # head_object also fails on permission errors, so show the cause
    print(f"❌ Could not confirm s3://{BUCKET}/{s3_key}: {e}")
    raise


## What to do next (required)

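Three different files, one per date, must end up in S3. The loop in the Extract section uploads all three in a single run when `N_DAYS = 3`. If you use the step-by-step cells instead, re-execute them **two more times**, each time with a different date, so the bucket contains: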

- `raw-views/raw-views-YYYY-MM-DD.json`
- `raw-views/raw-views-YYYY-MM-DD.json`
- `raw-views/raw-views-YYYY-MM-DD.json`

(Three distinct dates.)