# Wikipedia Page Views Data Pipeline - Solutions

This notebook demonstrates a simple ETL pipeline that:
1. Extracts page view data from the Wikipedia REST API
2. Transforms it to JSON Lines format
3. Uploads directly to S3

## Choose Your Username

Pick a unique username (e.g., your name or initials) and use it consistently:

- **S3 Bucket**: `<username>-wikidata` (e.g., `johndoe-wikidata`)
- **Athena Database**: `<username>` (e.g., `johndoe`)
- **Lambda**: Use the same `<username>-wikidata` bucket

**Important:** No hyphens in database names! Use underscores if needed (e.g., `john_doe`).
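
The naming rules above can be sketched as a small function. `resource_names` is a hypothetical helper and is not used elsewhere in this notebook. The lowercasing step is an addition here, included because S3 bucket names must be lowercase.

```python
def resource_names(username):
    """Derive the bucket and database names from one username (hypothetical helper)."""
    username = username.lower()  # assumption: normalize case, since S3 bucket names must be lowercase
    return {
        "bucket": f"{username}-wikidata",         # hyphens are fine in bucket names
        "database": username.replace("-", "_"),   # no hyphens in database names
    }


names = resource_names("john-doe")
print(names["bucket"], names["database"])
```

A username without hyphens, such as `johndoe`, yields the same string for the database name and the bucket prefix.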

In [None]:
# Set your username here - use it consistently across all resources
USERNAME = "zoltanctothceu"

## Setup and Imports

In [None]:
import datetime
import json

import boto3
import requests

## Extract: Retrieve Data from Wikipedia Page Views API

We use the Wikimedia Analytics API to fetch the most viewed pages for a specific date.

**API Documentation:** https://doc.wikimedia.org/generated-data-platform/aqs/analytics-api/reference/page-views.html

In [None]:
# Try different dates to see how the data changes
DATE_PARAM = "2025-11-17"

date = datetime.datetime.strptime(DATE_PARAM, "%Y-%m-%d")

# Construct the API URL for top viewed pages
url = f"https://wikimedia.org/api/rest_v1/metrics/pageviews/top/en.wikipedia/all-access/{date.strftime('%Y/%m/%d')}"
print(f"Requesting REST API URL: {url}")

# Make the API request (the timeout keeps the cell from hanging on a stalled connection)
wiki_server_response = requests.get(url, headers={"User-Agent": "curl/7.68.0"}, timeout=30)
wiki_response_status = wiki_server_response.status_code
wiki_response_body = wiki_server_response.text

print(f"Wikipedia REST API Response body: {wiki_response_body[:500]}...")
print(f"Wikipedia REST API Response Code: {wiki_response_status}")

# Validate response
if wiki_response_status != 200:
    raise RuntimeError(f"Received non-OK status code from Wiki Server: {wiki_response_status}")
print(f"Successfully retrieved Wikipedia data, response length: {len(wiki_response_body)} characters")

## Transform: Process Raw Data into JSON Lines

Convert the raw API response into a structured JSON Lines format suitable for analytics. Each line is a valid JSON object representing one page's view statistics.

In [None]:
# Parse the API response and extract top views
wiki_response_parsed = wiki_server_response.json()
top_views = wiki_response_parsed["items"][0]["articles"]

# Transform to JSON Lines format: one JSON object per line
current_time = datetime.datetime.now(datetime.timezone.utc)
# Drop the UTC marker so the timestamp is stored as a naive ISO 8601 string
retrieved_at = current_time.replace(tzinfo=None).isoformat()
records = [
    {
        "title": page["article"],
        "views": page["views"],
        "rank": page["rank"],
        "date": date.strftime("%Y-%m-%d"),
        "retrieved_at": retrieved_at,
    }
    for page in top_views
]
json_lines = "".join(json.dumps(record) + "\n" for record in records)

print(f"Transformed {len(top_views)} records to JSON Lines")
print(f"First few lines:\n{json_lines[:500]}...")
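
A quick way to check that text really is valid JSON Lines is to parse it back line by line. The sketch below is illustrative only: `parse_json_lines` is a hypothetical helper, and the sample rows are made up to mirror the record fields used above.

```python
import json


def parse_json_lines(text):
    """Parse JSON Lines text into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.split("\n") if line.strip()]


# Made-up sample rows with the same kind of fields as the records built above
sample = (
    '{"title": "Main_Page", "views": 100, "rank": 1}\n'
    '{"title": "Example", "views": 50, "rank": 2}\n'
)
records_back = parse_json_lines(sample)
print(len(records_back), records_back[0]["title"])
```

Calling `parse_json_lines(json_lines)` in the notebook should return as many records as `len(top_views)`. A `json.JSONDecodeError` would point to a malformed line.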

---
## Upload to S3

Upload the JSON Lines data directly to S3.

In [None]:
S3_WIKI_BUCKET = f"{USERNAME}-wikidata"
s3 = boto3.client("s3")

# Upload json_lines directly to S3
s3_key = f"raw-views/raw-views-{date.strftime('%Y-%m-%d')}.json"
s3.put_object(
    Bucket=S3_WIKI_BUCKET,
    Key=s3_key,
    Body=json_lines,
)
print(f"Uploaded {len(top_views)} records to s3://{S3_WIKI_BUCKET}/{s3_key}")

In [None]:
# Verify upload
try:
    s3.head_object(Bucket=S3_WIKI_BUCKET, Key=s3_key)
    print(f"File uploaded successfully to s3://{S3_WIKI_BUCKET}/{s3_key}")
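except Exception as e:
    # head_object can fail for reasons other than a missing key (e.g. permissions), so report the error
    print(f"Could not verify s3://{S3_WIKI_BUCKET}/{s3_key}: {e}")
    raise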
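
`head_object` only confirms that the key exists. A stricter check could download the object and count its lines, as sketched below. `count_uploaded_records` and `FakeS3Client` are hypothetical names introduced for illustration. The fake client stands in for `boto3.client("s3")` so the example runs without AWS credentials.

```python
import io


def count_uploaded_records(s3_client, bucket, key):
    """Download an object and count its non-empty lines (one per JSON Lines record)."""
    response = s3_client.get_object(Bucket=bucket, Key=key)
    body = response["Body"].read().decode("utf-8")
    return sum(1 for line in body.split("\n") if line.strip())


class FakeS3Client:
    """Stand-in for a real S3 client; returns a fixed payload for any bucket and key."""

    def __init__(self, payload):
        self._payload = payload

    def get_object(self, Bucket, Key):
        # Mimics the shape of the real response: a dict whose "Body" has .read()
        return {"Body": io.BytesIO(self._payload)}


fake_s3 = FakeS3Client(b'{"rank": 1}\n{"rank": 2}\n')
print(count_uploaded_records(fake_s3, "example-bucket", "raw-views/example.json"))
```

In the notebook, passing the real `s3` client together with `S3_WIKI_BUCKET` and `s3_key` should return the same number as `len(top_views)`.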