In [None]:
# Name of the GCS bucket that later cells read the corpus from and write
# every generated artifact to.
BUCKET_NAME = "ir-sagi-bucket"

# Optional cleanup, disabled by default: uncomment a line to delete the
# corresponding folder of a previous run before rebuilding. `|| true` keeps
# the cell from failing when the folder does not exist.
#!gsutil -m rm -r gs://{BUCKET_NAME}/postings_gcp || true
#!gsutil -m rm -r gs://{BUCKET_NAME}/pr || true
#!gsutil -m rm -r gs://{BUCKET_NAME}/notebooks || true
#!gsutil -m rm -r gs://{BUCKET_NAME}/ir_project_indexes || true

#!gsutil -m mkdir gs://{BUCKET_NAME}/ir_project_indexes || true


In [None]:
# Sanity check: list the Dataproc clusters in the us-central1 region.
!gcloud dataproc clusters list --region us-central1
# Client library used below to list and open blobs in the bucket (version pinned).
!pip install -q google-cloud-storage==1.43.0
# Python bindings for GraphFrames (used for PageRank); failure is tolerated.
# NOTE(review): this install is not version-pinned.
!pip install -q graphframes || true
# Show the GraphFrames jar(s) available to Spark, if any.
!ls -l /usr/lib/spark/jars/graph*


In [None]:
import pyspark
import hashlib
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
from google.cloud import storage

nltk.download('stopwords')

from nltk.corpus import stopwords

words = stopwords.words('english')

with open("stopwords_en.txt", "w", encoding="utf-8") as f:
    for w in words:
        f.write(w + "\n")

!gsutil cp stopwords_en.txt gs://ir-sagi-bucket/ir_project_indexes/stopwords_en.txt




In [None]:

def _hash(s: str) -> str:
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Token pattern: one leading '#', '@' or word character, followed by 2 to 24
# further word characters, each optionally preceded by an apostrophe or hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

# Generic English stop words shipped with NLTK.
english_stopwords = frozenset(stopwords.words('english'))

# Additional words excluded for this corpus.
corpus_stopwords = frozenset({
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
})

# Every token in this set is dropped by tokenize().
all_stopwords = english_stopwords | corpus_stopwords

def tokenize(text: str):
    """Split ``text`` into lower-cased tokens with stop words removed.

    Tokens are the matches of ``RE_WORD`` over the lower-cased text, kept in
    their order of appearance; any token found in ``all_stopwords`` is dropped.
    """
    kept = []
    for match in RE_WORD.finditer(text.lower()):
        token = match.group()
        if token not in all_stopwords:
            kept.append(token)
    return kept


In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Display the Spark session. `spark` (and `sc`, used later) is never created in
# this notebook — presumably injected by the cluster's PySpark kernel; confirm.
spark

In [None]:

# Collect the gs:// path of every corpus file in the bucket.
# The bucket name comes from BUCKET_NAME so the configuration cell is the single
# source of truth (the original repeated the literal here).
bucket_name = BUCKET_NAME
full_path = f"gs://{bucket_name}/"

client = storage.Client()

# Skip the cluster init script and everything this notebook itself writes under
# ir_project_indexes/.
# NOTE(review): every other blob is handed to spark.read.parquet below, so any
# non-parquet leftovers in the bucket (e.g. under notebooks/ or pr/) would
# break the read — confirm the bucket holds only corpus files besides these.
paths = [
    full_path + b.name
    for b in client.list_blobs(bucket_name)
    if b.name != 'graphframes.sh' and not b.name.startswith('ir_project_indexes/')
]

print("num files =", len(paths))



In [None]:
# Load all corpus files collected in `paths` as a single DataFrame.
parquetFile = spark.read.parquet(*paths)
# RDD of rows with two fields, in this order: (text, id).
doc_text_pairs = parquetFile.select("text", "id").rdd

In [None]:
# Count the number of rows (wiki pages) in the corpus; this triggers a Spark job.
N_DOCS = parquetFile.count()
print(N_DOCS)

In [None]:
# Switch to the home directory and check that inverted_index_gcp.py is there.
# If nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir.
%cd -q /home/dataproc
!ls inverted_index_gcp.py

In [None]:
# Ship our Python module to the cluster through the Spark context, and put the
# SparkFiles root directory first on sys.path so it can be imported here too.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [None]:
from inverted_index_gcp import InvertedIndex

**END OF SETUP**

**avgdl_body**

In [None]:
bucket_name = BUCKET_NAME
BASE_GCS_DIR = "ir_project_indexes"
BODY_BASE_DIR = "ir_project_indexes/body_index"

# =========================
# Build AVGDL (single file) on GCS
# =========================

import tempfile, os, glob, subprocess

# doc_text_pairs: RDD of (text, doc_id)

# Length of every document, measured in tokens kept by tokenize().
dl_rdd = doc_text_pairs.map(lambda x: len(tokenize(x[0])))

# Compute the sum and the count in ONE pass over the corpus. The previous
# sum() followed by count() launched two Spark jobs, and because dl_rdd is not
# cached each of them re-tokenized every document.
total_dl, doc_cnt = dl_rdd.aggregate(
    (0, 0),
    lambda acc, dl: (acc[0] + dl, acc[1] + 1),            # fold one length into a partition total
    lambda left, right: (left[0] + right[0], left[1] + right[1]),  # merge partition totals
)
# Guard against an empty corpus to avoid a division by zero.
avgdl = total_dl / float(doc_cnt) if doc_cnt else 0.0

print("DOC_COUNT =", doc_cnt)
print("TOTAL_DL  =", total_dl)
print("AVGDL     =", avgdl)

# Write the single value to a local temp file, then copy it to GCS.
tmp_dir = tempfile.mkdtemp(prefix="avgdl_")
local_avgdl = os.path.join(tmp_dir, "avgdl_body.txt")

with open(local_avgdl, "w") as f:
    f.write(f"{avgdl}\n")

AVGDL_GCS_PATH = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/avgdl_body.txt"
subprocess.check_call(["gsutil", "cp", local_avgdl, AVGDL_GCS_PATH])

print("Uploaded AVGDL to:", AVGDL_GCS_PATH)


**body_index**





In [None]:
# Bucket and GCS folders for the body index. These repeat the assignments made
# in the avgdl cell so this section also works when that cell was skipped.
bucket_name = BUCKET_NAME
BASE_GCS_DIR = "ir_project_indexes"
BODY_BASE_DIR = "ir_project_indexes/body_index"


In [None]:
# generate and write the dl_body (dict of doc_id -> length), and avgdl (average document length)
#client = storage.Client()
#bucket = client.bucket(BUCKET_NAME)

# create tuples of (doc_id, length)
#doc_len_rdd = doc_text_pairs.map(lambda x: (x[1], len(tokenize(x[0]))))
# dictionary doc_id -> length
#dl_body = dict(doc_len_rdd.collect())
#avgdl_body = sum(dl_body.values()) /  (len(dl_body))

#with bucket.blob(f"{BASE_GCS_DIR}/dl_body.pkl").open("wb") as f:
 #   pickle.dump(dl_body, f)

#with bucket.blob(f"{BASE_GCS_DIR}/avgdl_body.pkl").open("wb") as f:
 #   pickle.dump(avgdl_body, f)

In [None]:
# Number of buckets the posting lists are partitioned into before writing.
NUM_BUCKETS = 124


def token2bucket_id(token):
    """Map ``token`` to a bucket id in ``[0, NUM_BUCKETS)``.

    The hex digest returned by ``_hash`` is read as a base-16 integer and
    reduced modulo ``NUM_BUCKETS``.
    """
    hex_digest = _hash(token)
    return int(hex_digest, 16) % NUM_BUCKETS

In [None]:
from collections import Counter
from operator import add

def word_count(text, doc_id):
    """Count term frequencies of one document.

    Returns a list of ``(token, (doc_id, tf))`` tuples, one per distinct token
    produced by ``tokenize(text)``; an empty list when no token survives.
    """
    tokens = tokenize(text)
    if len(tokens) == 0:
        return []
    postings_for_doc = []
    for token, tf in Counter(tokens).items():
        postings_for_doc.append((token, (doc_id, tf)))
    return postings_for_doc

def reduce_word_counts(unsorted_pl):
    """Return the postings of ``unsorted_pl`` as a new list ordered by doc id.

    Each posting is a sequence whose first element is the document id. The
    sort is stable, so postings with equal ids keep their relative order.
    """
    ordered = list(unsorted_pl)
    ordered.sort(key=lambda posting: posting[0])
    return ordered

def calculate_df(postings):
    """Compute the document frequency of every token.

    ``postings`` is an RDD of ``(token, posting_list)`` pairs. The result is an
    RDD of ``(token, df)`` pairs where ``df`` is the length of the token's
    posting list.
    """
    def _token_and_df(token_and_pl):
        return (token_and_pl[0], len(token_and_pl[1]))

    return postings.map(_token_and_df)

def partition_postings_and_write(postings, relevant_base_dir):
    """Group posting lists into buckets and write each bucket out.

    ``postings`` is an RDD of ``(token, posting_list)`` pairs. Every pair is
    keyed by ``token2bucket_id(token)``, the pairs are grouped per bucket, and
    each ``(bucket_id, pairs)`` group is handed to
    ``InvertedIndex.write_a_posting_list`` together with ``relevant_base_dir``
    and the global ``bucket_name``.

    Returns the resulting RDD (one element per bucket). Nothing is written
    until the caller runs an action such as ``collect()`` on it.
    """
    def _key_by_bucket(token_and_pl):
        return (token2bucket_id(token_and_pl[0]), token_and_pl)

    def _write_bucket(bucket_and_its_posting_lists):
        return InvertedIndex.write_a_posting_list(
            bucket_and_its_posting_lists, relevant_base_dir, bucket_name
        )

    keyed = postings.map(_key_by_bucket)
    per_bucket = keyed.groupByKey()
    return per_bucket.map(_write_bucket)

In [None]:
# Word counts map: one (token, (doc_id, tf)) record per distinct token of each document body.
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
# Group the records per token and sort every posting list by doc id.
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# Keep only tokens whose posting list has more than 50 entries, then calculate df.
postings_filtered = postings.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
# Bring the token -> df mapping to the driver as a dict.
w2df_dict = w2df.collectAsMap()
# Partition the posting lists into buckets and write them out; collect() is the
# action that actually triggers the writes. Its result is not used.
_ = partition_postings_and_write(postings_filtered, BODY_BASE_DIR).collect()


In [None]:
# Merge the posting-location dictionaries of all buckets into one mapping.
# Only blobs under BODY_BASE_DIR whose name ends with "pickle" are read.
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix=BODY_BASE_DIR):
    if blob.name.endswith("pickle"):
        # NOTE: pickle.load executes arbitrary code from the file; these blobs
        # are expected to have been written by this notebook.
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
        for term, locations in posting_locs.items():
            super_posting_locs[term].extend(locations)

In [None]:
# Create an inverted index instance
inverted = InvertedIndex()
# Attach the merged posting-locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Attach the token -> df dictionary to the inverted index
inverted.df = w2df_dict
# Write the global stats out to the current directory under the name 'body'
inverted.write_index('.', 'body')
# Upload the written file (expected to be body.pkl) to the body-index folder on GCS
index_src = "body.pkl"
index_dst = f'gs://{bucket_name}/{BODY_BASE_DIR}/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
# GCS folder (inside the bucket) for the title index.
TITLE_BASE_DIR = "ir_project_indexes/title_index"
# RDD of rows with two fields, in this order: (title, id).
doc_title_pairs = parquetFile.select("title", "id").rdd
# Sanity check: show the first three rows.
doc_title_pairs.take(3)

**doc-id -> doc length mapping by tsv + idx**:

In [None]:
# =========================
# Build dl_body.tsv parts (sorted by id) on GCS
# =========================
from pyspark.sql import functions as F

# Where Spark writes the part-files, and where the merged TSV / IDX will end up.
DL_TMP_DIR  = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/dl_body_tsv_parts"
DL_TSV_GCS  = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/dl_body.tsv"
DL_IDX_GCS  = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/dl_body.idx"

# doc_text_pairs: RDD of (text, id), as created earlier in this notebook.
# Map every document to (id, number of tokens kept by tokenize()).
dl_rdd = doc_text_pairs.map(
    lambda pair: (int(pair[1]), int(len(tokenize(pair[0]))))
)
dl_df = spark.createDataFrame(dl_rdd, ["id", "dl"])

# Globally sort by id, then emit one "id<TAB>dl" text line per document.
dl_sorted = dl_df.sort("id")
dl_lines = dl_sorted.selectExpr(
    "concat(cast(id as string), '\t', cast(dl as string)) as line"
)
dl_lines.write.mode("overwrite").text(DL_TMP_DIR)

print("Wrote DL TSV parts to:", DL_TMP_DIR)


In [None]:
import os, glob, struct, subprocess, tempfile

# Local scratch directory on the driver and the two files built in it.
LOCAL_DIR = tempfile.mkdtemp(prefix="dl_body_build_")
LOCAL_TSV = os.path.join(LOCAL_DIR, "dl_body.tsv")
LOCAL_IDX = os.path.join(LOCAL_DIR, "dl_body.idx")

# Download every part-file, then process them in filename order.
subprocess.check_call(["gsutil", "-m", "cp", f"{DL_TMP_DIR}/part-*", LOCAL_DIR])
parts = sorted(glob.glob(os.path.join(LOCAL_DIR, "part-*")))

# Concatenate the parts into one TSV, making sure every line ends with "\n".
with open(LOCAL_TSV, "wb") as out:
    for p in parts:
        with open(p, "rb") as f:
            out.writelines(line.rstrip(b"\n") + b"\n" for line in f)

REC = struct.Struct("<I Q I")  # uint32 doc_id | uint64 offset | uint32 length

# One index record per TSV line that contains a tab; lines without a tab are
# not indexed but still advance the running byte offset.
with open(LOCAL_TSV, "rb") as f_tsv, open(LOCAL_IDX, "wb") as f_idx:
    offset = 0
    for line in f_tsv:
        length = len(line)
        tab = line.find(b"\t")
        if tab >= 0:
            doc_id = int(line[:tab])
            f_idx.write(REC.pack(doc_id, offset, length))
        offset += length

print("Local DL TSV/IDX:", LOCAL_TSV, LOCAL_IDX)

# Upload both files to their final GCS locations.
for local_path, gcs_path in ((LOCAL_TSV, DL_TSV_GCS), (LOCAL_IDX, DL_IDX_GCS)):
    subprocess.check_call(["gsutil", "cp", local_path, gcs_path])

print("Uploaded:", DL_TSV_GCS, DL_IDX_GCS)


**doc_id -> doc norm : tsv + idx**:

In [None]:
# Re-create the (lazy) body postings so the doc-norm cells below can run
# WITHOUT re-running the body-index section above. Nothing is written here.
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# Keep only tokens whose posting list has more than 50 entries (same filter as the body index).
postings_filtered = postings.filter(lambda x: len(x[1])>50)

In [None]:
import math
from operator import add
from pyspark.sql import functions as F

N_DOCS = parquetFile.count()  # total number of documents (like DOC_COUNT on the server)

# Where Spark writes the part-files, and where the merged TSV / IDX will end up.
NORMS_TMP_DIR = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/docnorms_body_tsv_parts"
NORMS_TSV_GCS = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/docnorms_body.tsv"
NORMS_IDX_GCS = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/docnorms_body.idx"

def term_to_doc_squares(term_and_pl, n_docs=None):
    """Emit the squared tf-idf weight of one term for every document it occurs in.

    Args:
        term_and_pl: ``(term, posting_list)`` pair, where the posting list is a
            sequence of ``(doc_id, tf)`` tuples.
        n_docs: total number of documents used for the idf. When omitted, the
            global ``N_DOCS`` is used, so existing single-argument callers
            (e.g. ``flatMap(term_to_doc_squares)``) behave as before.

    Returns:
        A list of ``(doc_id, weight ** 2)`` tuples with
        ``weight = (1 + ln(tf)) * ln(n_docs / df)`` and ``df`` the length of the
        posting list. Postings with ``tf <= 0`` are skipped; an empty posting
        list yields an empty list.
    """
    _, pl = term_and_pl
    df = len(pl)
    if df <= 0:
        return []
    total_docs = N_DOCS if n_docs is None else n_docs
    # Matches the server if it uses log(N/df) (swap in a smoothed variant here if desired).
    idf = math.log(total_docs / df)
    out = []
    for doc_id, tf in pl:
        if tf <= 0:
            continue
        w = (1.0 + math.log(tf)) * idf
        out.append((int(doc_id), float(w * w)))
    return out

# Sum the squared term weights per document, then take the square root to get
# each document's vector norm.
doc_sumsq = postings_filtered.flatMap(term_to_doc_squares).reduceByKey(add)
doc_norms = doc_sumsq.mapValues(math.sqrt)

# Globally sort by id and emit one "id<TAB>norm" text line per document.
norms_df = spark.createDataFrame(doc_norms, ["id", "norm"]).sort("id")
norm_lines = norms_df.selectExpr(
    "concat(cast(id as string), '\t', cast(norm as string)) as line"
)
norm_lines.write.mode("overwrite").text(NORMS_TMP_DIR)

print("Wrote NORMS TSV parts to:", NORMS_TMP_DIR)


In [None]:
import os, glob, struct, subprocess, tempfile

# Local scratch directory on the driver and the two files built in it.
LOCAL_DIR = tempfile.mkdtemp(prefix="docnorms_body_build_")
LOCAL_TSV = os.path.join(LOCAL_DIR, "docnorms_body.tsv")
LOCAL_IDX = os.path.join(LOCAL_DIR, "docnorms_body.idx")

# Download every part-file, then process them in filename order.
subprocess.check_call(["gsutil", "-m", "cp", f"{NORMS_TMP_DIR}/part-*", LOCAL_DIR])
parts = sorted(glob.glob(os.path.join(LOCAL_DIR, "part-*")))

# Concatenate the parts into one TSV, making sure every line ends with "\n".
with open(LOCAL_TSV, "wb") as out:
    for p in parts:
        with open(p, "rb") as f:
            out.writelines(line.rstrip(b"\n") + b"\n" for line in f)

# Index record layout: uint32 doc_id | uint64 offset | uint32 length.
REC = struct.Struct("<I Q I")

# One index record per TSV line that contains a tab; other lines only advance
# the running byte offset.
with open(LOCAL_TSV, "rb") as f_tsv, open(LOCAL_IDX, "wb") as f_idx:
    offset = 0
    for line in f_tsv:
        length = len(line)
        tab = line.find(b"\t")
        if tab >= 0:
            doc_id = int(line[:tab])
            f_idx.write(REC.pack(doc_id, offset, length))
        offset += length

# Upload both files to their final GCS locations.
for local_path, gcs_path in ((LOCAL_TSV, NORMS_TSV_GCS), (LOCAL_IDX, NORMS_IDX_GCS)):
    subprocess.check_call(["gsutil", "cp", local_path, gcs_path])

print("Uploaded:", NORMS_TSV_GCS, NORMS_IDX_GCS)


**Generate doc_id -> title mapping (TSV + index) without exceeding memory limits**


In [None]:
#titles.pkl
#titles = dict(doc_title_pairs.map(lambda x: (x[1], x[0])).collect())

#with bucket.blob(f"{BASE_GCS_DIR}/titles.pkl").open("wb") as f:
 #   pickle.dump(titles, f)


In [None]:
# Key Idea: The TSV file contains all document IDs and titles written sequentially.
# The index maps each document ID to its exact byte offset in the TSV file.
# After a binary search in the index, the title is read directly in O(1).

# Build titles.tsv (sorted by id) as text files on GCS
from pyspark.sql import functions as F

# GCS folder that receives the many part-files Spark writes
TITLES_TMP_DIR = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/titles_tsv_parts"
# GCS destinations of the merged TSV and of its binary index
TITLES_TSV_GCS = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/titles.tsv"
TITLES_IDX_GCS = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/titles.idx"

# Keep only the id (as long) and the title (as string) of every page
titles_df = parquetFile.select(
    F.col("id").cast("long").alias("id"),
    F.col("title").cast("string").alias("title"),
)

# A global sort by id is what makes the index binary-searchable later
titles_sorted = titles_df.sort("id")

# Emit one "id<TAB>title" text line per document.
# NOTE(review): SQL concat yields NULL when title is NULL, so such a row would
# lose its id in the output — confirm no page has a NULL title.
title_lines = titles_sorted.selectExpr("concat(cast(id as string), '\t', title) as line")
title_lines.write.mode("overwrite").text(TITLES_TMP_DIR)

print("Wrote TSV parts to:", TITLES_TMP_DIR)


In [None]:
import os, glob, struct, subprocess, tempfile

# Scratch directory on the Spark driver, plus the merged TSV and the binary
# index that will be built inside it
LOCAL_DIR = tempfile.mkdtemp(prefix="titles_build_")
LOCAL_TSV = os.path.join(LOCAL_DIR, "titles.tsv")
LOCAL_IDX = os.path.join(LOCAL_DIR, "titles.idx")

# Copy all Spark part-files from GCS to the driver's local disk
subprocess.check_call(["gsutil", "-m", "cp", f"{TITLES_TMP_DIR}/part-*", LOCAL_DIR])

# Process the part-files in filename order
parts = sorted(glob.glob(os.path.join(LOCAL_DIR, "part-*")))

# Concatenate them into one TSV file, making sure every line ends with "\n"
with open(LOCAL_TSV, "wb") as out:
    for p in parts:
        with open(p, "rb") as f:
            out.writelines(line.rstrip(b"\n") + b"\n" for line in f)

# Binary index record format:
# uint32 doc_id | uint64 offset | uint32 length  (16 bytes total)
REC = struct.Struct("<I Q I")

# Scan the TSV sequentially and write one record per line that contains a tab
with open(LOCAL_TSV, "rb") as f_tsv, open(LOCAL_IDX, "wb") as f_idx:
    # byte position of the current line inside the TSV file
    offset = 0
    for line in f_tsv:
        length = len(line)
        tab = line.find(b"\t")
        if tab >= 0:
            doc_id = int(line[:tab])
            f_idx.write(REC.pack(doc_id, offset, length))
        # lines without a tab are not indexed but still advance the offset
        offset += length

print("Local TSV/IDX:", LOCAL_TSV, LOCAL_IDX)

# Upload the merged TSV file and the binary index file to GCS
for local_path, gcs_path in ((LOCAL_TSV, TITLES_TSV_GCS), (LOCAL_IDX, TITLES_IDX_GCS)):
    subprocess.check_call(["gsutil", "cp", local_path, gcs_path])

print("Uploaded:", TITLES_TSV_GCS, TITLES_IDX_GCS)


In [None]:
# Check: walk the most recently built index (LOCAL_IDX, decoded with REC)
# record by record and make sure the doc ids never decrease.
prev = -1
bad_at = None
with open(LOCAL_IDX, "rb") as f:
    # iter() with a sentinel stops as soon as read() returns no more bytes
    for record in iter(lambda: f.read(REC.size), b""):
        doc_id, off, ln = REC.unpack(record)
        if doc_id < prev:
            bad_at = (prev, doc_id)
            break
        prev = doc_id

if bad_at is None:
    print("OK sorted")
else:
    print(f"NOT sorted: {bad_at[0]} -> {bad_at[1]}")


**title index**

In [None]:
# Titles index: same pipeline as the body index, applied to (title, id) pairs.

# Word counts map: one (token, (doc_id, tf)) record per distinct token of each title.
word_counts = doc_title_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)

# Keep only tokens whose posting list has more than 50 entries, then collect token -> df.
# NOTE(review): this is the same threshold as the body index; for titles it
# drops every term that appears in 50 titles or fewer — confirm this is intended.
postings_filtered = postings.filter(lambda x: len(x[1]) > 50)
w2df_dict = calculate_df(postings_filtered).collectAsMap()

# Write postings; collect() is the action that triggers the writes.
_ = partition_postings_and_write(postings_filtered, TITLE_BASE_DIR).collect()

# Merge the posting_locs dictionaries of all buckets (blobs under
# TITLE_BASE_DIR whose name ends with "pickle") into one mapping.
from collections import defaultdict
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix=TITLE_BASE_DIR):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        # NOTE: pickle.load executes arbitrary code from the file; these blobs
        # are expected to have been written by this notebook.
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs[k].extend(v)

# Assemble the index object from the merged locations and the df dictionary.
inverted = InvertedIndex()
inverted.posting_locs = super_posting_locs
inverted.df = w2df_dict

# Write the global stats locally under the name 'title', then upload title.pkl to GCS.
inverted.write_index('.', 'title')
!gsutil cp title.pkl gs://{bucket_name}/{TITLE_BASE_DIR}/title.pkl



**ANCHOR INDEX**

In [None]:
# GCS folder (inside the bucket) for the anchor-text index.
ANCHOR_BASE_DIR = "ir_project_indexes/anchor_index"


In [None]:
# anchor_index = term -> [(doc_id, tf)]. doc_id is the id of each doc that has been reached with a link that the link's text contains the term.
from pyspark.sql.functions import col
from pyspark.sql import Row

# pages: RDD of (source_id, anchor_array). anchor_array is an array of structs,
# each with fields id (long) and text (string); the struct's id is the TARGET id.
pages = parquetFile.select("id", "anchor_text").rdd.map(lambda r: (r["id"], r["anchor_text"]))


def _anchor_text_and_target(row):
    """Return the (text, target_id) pairs of one page's anchors.

    Pages without an anchor array yield nothing; anchors that are empty, have
    no text, or have no target id are skipped. The source id is not used.
    """
    anchors = row[1]
    if anchors is None:
        return []
    return [(a["text"], a["id"]) for a in anchors if a and a["text"] and a["id"] is not None]


# RDD of (text, target_id), one element per usable anchor in the corpus.
doc_anchor_pairs = pages.flatMap(_anchor_text_and_target)



In [None]:
# Largest tf kept per posting.
# NOTE(review): presumably InvertedIndex stores tf in 16 bits — confirm against
# inverted_index_gcp.py and adjust or remove this cap accordingly.
MAX_ANCHOR_TF = 0xFFFF

# Per-anchor word counts: one (token, (target_id, tf)) record for EVERY anchor.
anchor_word_counts = doc_anchor_pairs.flatMap(lambda x: word_count(x[0], x[1]))

# A target page is usually linked from many anchors, so the same
# (token, target_id) pair occurs many times. Sum those occurrences so each
# target appears at most once per posting list; without this step the lists
# contain duplicate doc ids and df counts anchors instead of documents.
word_counts = (
    anchor_word_counts
    .map(lambda rec: ((rec[0], rec[1][0]), rec[1][1]))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1] if kv[1] < MAX_ANCHOR_TF else MAX_ANCHOR_TF)))
)

# postings: token -> [(doc_id, tf), ...] sorted by doc_id
postings = word_counts.groupByKey().mapValues(reduce_word_counts)

# Filter postings and calculate df (as in A3): keep tokens with more than 50 target documents.
postings_filtered = postings.filter(lambda x: len(x[1]) > 50)

w2df_dict = calculate_df(postings_filtered).collectAsMap()

# Write postings bins + posting_locs pickles; collect() triggers the writes.
_ = partition_postings_and_write(postings_filtered, ANCHOR_BASE_DIR).collect()


In [None]:
# Fresh storage client, so this cell does not depend on the one created earlier.
client = storage.Client()
super_posting_locs = defaultdict(list)

# Merge the posting_locs dictionaries of all buckets (blobs under
# ANCHOR_BASE_DIR whose name ends with "pickle") into one mapping.
for blob in client.list_blobs(bucket_name, prefix=ANCHOR_BASE_DIR):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        # NOTE: pickle.load executes arbitrary code from the file; these blobs
        # are expected to have been written by this notebook.
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs[k].extend(v)

# Create inverted index instance and attach the merged locations and the df dictionary
inverted = InvertedIndex()
inverted.posting_locs = super_posting_locs
inverted.df = w2df_dict

# Write the global stats out to the current directory under the name 'anchor'
inverted.write_index('.', 'anchor')

# Upload anchor.pkl to the anchor-index folder on GCS
!gsutil cp anchor.pkl gs://{bucket_name}/{ANCHOR_BASE_DIR}/anchor.pkl


**PageRank**

In [None]:
def generate_graph(pages):
    """Build the link graph of the corpus.

    Args:
        pages: RDD of ``(article_id, anchors)`` rows, where ``anchors`` is a
            sequence of ``(target_id, text)`` items or ``None``.

    Returns:
        ``(edges, vertices)``: ``edges`` is an RDD of ``(article_id, target_id)``
        tuples, de-duplicated within each page; ``vertices`` is an RDD of
        one-element tuples ``(id,)`` holding every distinct id that links to
        another page or is linked to (the tuple form is what the later
        DataFrame conversion needs).
    """
    def _out_edges(row):
        source_id, anchors = row[0], row[1]
        # Pages without any anchors have no outgoing edges. The original
        # iterated row[1] directly and raised TypeError when it was None.
        if anchors is None:
            return set()
        # A set keeps a single edge per (source, target) however many anchors
        # point at the same target; broken anchors / missing targets are skipped.
        return {
            (source_id, anchor[0])
            for anchor in anchors
            if anchor is not None and anchor[0] is not None
        }

    # Articles that appear as a page in the corpus.
    sources = pages.map(lambda row: row[0])
    edges = pages.flatMap(_out_edges)
    # Articles that are pointed to by others (they may not be pages themselves).
    targets = edges.map(lambda edge: edge[1])
    vertices = sources.union(targets).distinct().map(lambda x: (x,))
    return edges, vertices

# Read the (id, anchor_text) columns of the corpus files. The bucket comes from
# BUCKET_NAME (instead of a second hard-coded literal) so that changing the
# configuration cell also redirects this read.
pages_links = spark.read.parquet(f"gs://{BUCKET_NAME}/multistream*").select("id", "anchor_text").rdd
# construct the graph
edges, vertices = generate_graph(pages_links)
# compute PageRank: both DataFrames are repartitioned into 124 partitions by their id column
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
# Fixed number of iterations (6) with reset probability 0.15.
pr = g.pageRank(resetProbability=0.15, maxIter=6)


**PageRank.tsv and PageRank.idx**

In [None]:
from pyspark.sql import functions as F

# Where Spark writes the part-files, and where the merged TSV / IDX will end up.
BASE_GCS_DIR = "ir_project_indexes"
PR_TMP_DIR   = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/pagerank_tsv_parts"
PR_TSV_GCS   = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/pagerank.tsv"
PR_IDX_GCS   = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/pagerank.idx"

# Keep (id as long, pagerank as double) of every vertex, globally sorted by id.
pr_df = (
    pr.vertices
    .select(
        F.col("id").cast("long").alias("id"),
        F.col("pagerank").cast("double").alias("pagerank"),
    )
    .sort("id")
)

# Emit one "id<TAB>pagerank" text line per vertex.
pr_lines = pr_df.selectExpr("concat(cast(id as string), '\t', cast(pagerank as string)) as line")
pr_lines.write.mode("overwrite").text(PR_TMP_DIR)

print("Wrote PageRank TSV parts to:", PR_TMP_DIR)
print("Will upload final TSV/IDX to:", PR_TSV_GCS, PR_IDX_GCS)


In [None]:
import os, glob, struct, subprocess, tempfile

# Local scratch directory on the driver and the two files built in it.
LOCAL_DIR = tempfile.mkdtemp(prefix="pagerank_build_")
LOCAL_TSV = os.path.join(LOCAL_DIR, "pagerank.tsv")
LOCAL_IDX = os.path.join(LOCAL_DIR, "pagerank.idx")

# Fetch all part files and process them in filename order
subprocess.check_call(["gsutil", "-m", "cp", f"{PR_TMP_DIR}/part-*", LOCAL_DIR])
parts = sorted(glob.glob(os.path.join(LOCAL_DIR, "part-*")))

# Concatenate the parts into one TSV (order is preserved when Spark wrote them
# from a global sort), making sure every line ends with "\n"
with open(LOCAL_TSV, "wb") as out:
    for p in parts:
        with open(p, "rb") as f:
            out.writelines(line.rstrip(b"\n") + b"\n" for line in f)

# Index record: uint32 doc_id | uint64 offset | uint32 length  (16 bytes)
REC = struct.Struct("<I Q I")

# Scan the TSV sequentially; one record per line that contains a tab
with open(LOCAL_TSV, "rb") as f_tsv, open(LOCAL_IDX, "wb") as f_idx:
    offset = 0
    for line in f_tsv:
        length = len(line)
        tab = line.find(b"\t")
        if tab >= 0:
            doc_id = int(line[:tab])  # assumes id fits uint32 (like Wikipedia ids)
            f_idx.write(REC.pack(doc_id, offset, length))
        # lines without a tab are not indexed but still advance the offset
        offset += length

print("Local TSV/IDX:", LOCAL_TSV, LOCAL_IDX)

# Upload final files
for local_path, gcs_path in ((LOCAL_TSV, PR_TSV_GCS), (LOCAL_IDX, PR_IDX_GCS)):
    subprocess.check_call(["gsutil", "cp", local_path, gcs_path])

print("Uploaded:", PR_TSV_GCS, PR_IDX_GCS)


**pageviews.tsv and pageviews.idx**

In [None]:
# Just as described in assignment 1, then write to TSV and then create the idx.
# Source dump: monthly page views for August 2021.
pv_path = "https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2"
p = Path(pv_path)
# File name of the dump, and the name of the filtered text file derived from it.
pv_name = p.name
pv_temp = f"{p.stem}-4dedup.txt"

# Download (-N: only when the remote file is newer than the local copy)
!wget -N $pv_path

# filter english + keep article_id + views (fields 3 and 5) + remove lines with article id or page view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp

# Previous in-memory approach (a Counter built on the driver), kept for reference:

#wid2pv = Counter()
#with open(pv_temp, "rt") as f:
 #   for line in f:
  #      parts = line.split()
   #     wid2pv.update({int(parts[0]): int(parts[1])})

# Where Spark writes the part-files, and where the merged TSV / IDX will end up.
BASE_GCS_DIR = "ir_project_indexes"
PV_TMP_DIR   = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/pageviews_tsv_parts"
PV_TSV_GCS   = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/pageviews.tsv"
PV_IDX_GCS   = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/pageviews.idx"




In [None]:
from pyspark.sql import functions as F

import subprocess

# pv_temp was written by the shell into the driver's local working directory.
# A scheme-less path given to Spark is resolved against the cluster's default
# filesystem (HDFS on Dataproc by default), and a driver-local file is not
# visible to executors on other nodes, so reading it directly can fail with
# "path does not exist". Stage the file on GCS, which every node can read.
PV_RAW_GCS = f"gs://{BUCKET_NAME}/{BASE_GCS_DIR}/{pv_temp}"
subprocess.check_call(["gsutil", "cp", pv_temp, PV_RAW_GCS])

lines = spark.read.text(PV_RAW_GCS)

# Split every line on whitespace, keep lines with exactly two fields, and cast
# them to (id, views) as longs.
pv_df = (lines.select(F.split(F.col("value"), r"\s+").alias("parts")).filter(F.size("parts") == 2)
    .select(F.col("parts").getItem(0).cast("long").alias("id"),F.col("parts").getItem(1).cast("long").alias("views")))

# Sum the views of ids that appear on several lines, globally sorted by id.
pv_agg = pv_df.groupBy("id").agg(F.sum("views").alias("views")).sort("id")

# Emit one "id<TAB>views" text line per article.
pv_agg.selectExpr("concat(cast(id as string), '\t', cast(views as string)) as line").write.mode("overwrite").text(PV_TMP_DIR)

print("Wrote PageViews TSV parts to:", PV_TMP_DIR)
print("Will upload final TSV/IDX to:", PV_TSV_GCS, PV_IDX_GCS)


In [None]:
import os, glob, struct, subprocess, tempfile
from pathlib import Path


# Local scratch directory on the driver and the two files built in it.
LOCAL_DIR = tempfile.mkdtemp(prefix="pageviews_build_")
LOCAL_TSV = os.path.join(LOCAL_DIR, "pageviews.tsv")
LOCAL_IDX = os.path.join(LOCAL_DIR, "pageviews.idx")

# Download every part-file, then process them in filename order.
subprocess.check_call(["gsutil", "-m", "cp", f"{PV_TMP_DIR}/part-*", LOCAL_DIR])
parts = sorted(glob.glob(os.path.join(LOCAL_DIR, "part-*")))

# Concatenate the parts into one TSV, making sure every line ends with "\n".
with open(LOCAL_TSV, "wb") as out:
    for p in parts:
        with open(p, "rb") as f:
            out.writelines(line.rstrip(b"\n") + b"\n" for line in f)

# IDX record: uint32 doc_id | uint64 offset | uint32 length
REC = struct.Struct("<I Q I")

# One index record per TSV line that contains a tab; other lines only advance
# the running byte offset.
with open(LOCAL_TSV, "rb") as f_tsv, open(LOCAL_IDX, "wb") as f_idx:
    offset = 0
    for line in f_tsv:
        length = len(line)
        tab = line.find(b"\t")
        if tab >= 0:
            doc_id = int(line[:tab])
            f_idx.write(REC.pack(doc_id, offset, length))
        offset += length

# Upload both files to their final GCS locations.
for local_path, gcs_path in ((LOCAL_TSV, PV_TSV_GCS), (LOCAL_IDX, PV_IDX_GCS)):
    subprocess.check_call(["gsutil", "cp", local_path, gcs_path])
print("Uploaded:", PV_TSV_GCS, PV_IDX_GCS)


In [None]:
# Check: scan the most recently built TSV (LOCAL_TSV) and make sure the ids in
# front of the tab never decrease. Lines without a tab are ignored.
prev = -1
bad_at = None
line_no = 0

with open(LOCAL_TSV, "rb") as f:
    for line_no, line in enumerate(f, start=1):
        tab = line.find(b"\t")
        if tab >= 0:
            doc_id = int(line[:tab])
            if doc_id < prev:
                # remember the offending pair and the 1-based line number
                bad_at = (prev, doc_id, line_no)
                break
            prev = doc_id

if bad_at is None:
    print("OK sorted")
else:
    print(f"NOT sorted at line {bad_at[2]}: {bad_at[0]} -> {bad_at[1]}")

