In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=e208d670c8412500c634abe8f5320875bb799cdc77bfe64d48790da78e5f6d5d
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark.sql import SparkSession

In [7]:
# Create (or reuse) the Spark session for the inverted-index job.
# Executor settings: 1 GB of memory and 2 cores.
spark = (
    SparkSession.builder
    .appName("IR-Inverted-index")
    .config("spark.executor.memory", "1g")
    .config("spark.executor.cores", 2)
    .getOrCreate()
)

In [5]:
import zipfile

In [6]:
# Location of the uploaded archive and the directory it is unpacked into.
zip_file_path = "/content/data2.zip"
extract_folder = "/content/extracted_files/"

# Unpack every member of the archive; the context manager closes the file.
with zipfile.ZipFile(zip_file_path, mode="r") as archive:
    archive.extractall(path=extract_folder)

In [8]:
# Read each matching .txt file as a single (file path, file content) record.
bigram_rdd = spark.sparkContext.wholeTextFiles(
    "/content/extracted_files/data/devdata/*.txt"
)

In [9]:
# Turn the (path, content) pairs into a DataFrame; with no schema given the
# columns get the default names `_1` (path) and `_2` (content).
bigram_df = spark.createDataFrame(bigram_rdd)

In [10]:
# Peek at the first five raw (path, content) rows.
bigram_df.show(n=5)

+--------------------+--------------------+
|                  _1|                  _2|
+--------------------+--------------------+
|file:/content/ext...|5722018101\tEvery...|
|file:/content/ext...|5722018235\tThen ...|
|file:/content/ext...|5722018301\tKevin...|
|file:/content/ext...|5722018496\t"It i...|
|file:/content/ext...|5722018508\tQuali...|
+--------------------+--------------------+



In [12]:
from pyspark.sql.functions import col, regexp_replace, lower,expr,explode, split,concat

In [13]:
# Derive `text_cleaned` from the raw file content in `_2`.
# NOTE(review): the pattern is a zero-width lookbehind for a tab and the
# replacement is an empty string, so this appears to leave the text unchanged
# (the displayed `text_cleaned` values match `_2`) — confirm the intent.
bigram_df = bigram_df.withColumn(
    "text_cleaned",
    regexp_replace(col("_2"), "(?<=\t)", ""),
)

In [14]:
# Document id: everything before the first tab of the record.
doc_id_expr = expr("split(text_cleaned, '\t')[0]")

# Body text: the part after "<doc_id><tab>", trimmed, with every character that
# the bracket class does not allow replaced by a space, then lower-cased.
# NOTE(review): Spark SQL's default string-literal unescaping may reduce `\s`
# to a plain `s`, in which case tabs/newlines are also turned into spaces —
# confirm before changing the pattern, since splitting on " " later may rely
# on that.
final_text_expr = expr("lower(regexp_replace(trim(substring(text_cleaned, length(split(text_cleaned, '\t')[0])+2)), '[^a-zA-Z\\s]', ' '))")

bigram_df = (
    bigram_df
    .withColumn("doc_id", doc_id_expr)
    .withColumn("final_text", final_text_expr)
)

In [15]:
# Inspect the derived `doc_id` and `final_text` columns on the first five rows.
bigram_df.show(n=5)

+--------------------+--------------------+--------------------+----------+--------------------+
|                  _1|                  _2|        text_cleaned|    doc_id|          final_text|
+--------------------+--------------------+--------------------+----------+--------------------+
|file:/content/ext...|5722018101\tEvery...|5722018101\tEvery...|5722018101|every person has ...|
|file:/content/ext...|5722018235\tThen ...|5722018235\tThen ...|5722018235|then jon falls fo...|
|file:/content/ext...|5722018301\tKevin...|5722018301\tKevin...|5722018301|kevin  take a loo...|
|file:/content/ext...|5722018496\t"It i...|5722018496\t"It i...|5722018496| it is an area wi...|
|file:/content/ext...|5722018508\tQuali...|5722018508\tQuali...|5722018508|qualified candida...|
+--------------------+--------------------+--------------------+----------+--------------------+



In [16]:
from pyspark.sql.functions import col, explode, lag, concat_ws, collect_list
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [17]:
# Split each document into tokens and keep every token's position (`pos`).
# The original ordered the window by `doc_id`, which is also the partition key,
# so the order of rows inside a document was undefined and `lag` could pair
# words that are not adjacent in the text. Ordering by the token position makes
# `prev_word` really the word that precedes `word`.
words_df = bigram_df.select(
    "doc_id",
    F.posexplode(F.split(F.col("final_text"), " ")).alias("pos", "word"),
)

# Runs of spaces (left behind by stripped punctuation) yield empty tokens.
# Drop them before computing the lag so bigrams bridge those gaps, exactly as
# the original filter-then-lag sequence intended.
words_df = words_df.filter(F.col("word") != "")

# One window per document, ordered by position within the document text.
window = Window.partitionBy("doc_id").orderBy("pos")
words_df = words_df.withColumn("prev_word", F.lag("word").over(window))

# The first word of each document has no predecessor; `pos` was only needed
# for ordering, so drop it to keep the schema (doc_id, word, prev_word).
words_df = words_df.filter(F.col("prev_word").isNotNull()).drop("pos")

In [18]:
# Count how many times each bigram occurs in each document.
bigram_index = words_df.groupby("prev_word", "word", "doc_id").agg(
    F.count("doc_id").alias("doc_id_count")
)

# Build one "doc_id:count" posting per (bigram, document) pair and gather all
# postings of a bigram into a list.
posting = F.concat_ws(':', F.col("doc_id"), F.col("doc_id_count"))
bigram_index_formatted = bigram_index.groupBy("prev_word", "word").agg(
    F.collect_list(posting).alias("doc_ids")
)

# Render each index entry as a single line: "prev_word word posting posting ...".
postings_joined = F.expr("concat_ws(' ', doc_ids)")
output_bigram = bigram_index_formatted.withColumn(
    "bigram_data",
    F.concat_ws(' ', F.col("prev_word"), F.col("word"), postings_joined),
)

# Write the whole index as text through a single partition, replacing any
# previous output directory.
(
    output_bigram.select("bigram_data")
    .repartition(1)
    .write.mode("overwrite")
    .text("output_bigram")
)

In [19]:
# Keep only the formatted index line of every bigram.
final_result = output_bigram.select(F.col("bigram_data"))

In [20]:
from pyspark.sql.functions import col

# One formatted line per bigram: "prev_word word doc_id:count ...".
final_result = output_bigram.select("bigram_data")

# Bigrams whose index entries should be exported.
phrases = ["computer science", "information retrieval", "power politics", "los angeles", "bruce willis"]

# Each line starts with the bigram itself, so the match is anchored to the
# start of the line and must be followed by a space (or the end of the line).
# An unanchored alternation would also select entries such as
# "computer sciences ..." or "superpower politics ...", which merely contain a
# phrase as a substring. The phrases are plain lower-case letters and spaces,
# so they need no regex escaping.
phrase_pattern = "^(?:" + "|".join(phrases) + ")(?: |$)"
condition = col("bigram_data").rlike(phrase_pattern)

filtered_df = final_result.filter(condition)

# Unwrap each Row into its single string value.
rdd = filtered_df.rdd.map(lambda row: row[0])

output_path = "selected_bigram_index.txt"

# Write through a single partition. saveAsTextFile creates a directory named
# `output_path` and raises if it already exists, so remove it before re-running.
rdd.coalesce(1).saveAsTextFile(output_path)