In [1]:
# Do an initial test of Spark to make sure it works.
import findspark
findspark.init()
import pyspark

In [2]:
pyspark.__version__

'3.5.0'

In [3]:
import os
import re

path = 'data2/data/devdata/'
files = os.listdir(path)
files = [path+f for f in files]
#print(files)

In [4]:
#Removing all \t from the files
for f in files:
    #print(f)
    lines = ''
    with open(f, 'r', encoding="utf8") as file:
        lines = file.readline()
        #print(lines)
        lines = re.sub(r'\t', ' ', lines)
        lines = lines.split()[0] + "\t" + ' '.join(lines.split()[1:])
        file.close()
    with open(f, 'w', encoding="utf8") as file:
        file.write(lines)
        file.close()

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode, col, concat_ws, collect_list
from pyspark.sql.types import StringType, ArrayType, StructType, StructField

# Create a Spark session
spark = SparkSession.builder.appName("InvertedIndex").getOrCreate()

In [6]:
spark

In [7]:
# Specify the bigram words
bigram_words = ["computer science", "information retrieval", "power politics", "los angeles", "bruce willis"]

# Read data
data = spark.read.text(files[:]).rdd.map(lambda x: x.value)
print("Data read successfully")

# Define a function to clean and tokenize the text
def clean_and_tokenize(text):
    # Replacing \t, special characters, and numerals with space
    processed_text = re.sub(r'\t', ' ', text)    
    processed_text = re.sub(r'[^a-zA-Z]+', ' ', processed_text)

    processed_text = processed_text.lower()
    processed_text = re.sub(r'\s+', ' ', processed_text)
    words = processed_text.split()

    return words

# Define a UDF for the cleaning and tokenizing function
clean_and_tokenize_udf = udf(clean_and_tokenize, ArrayType(StringType()))

# Define a UDF to extract bigrams
def extract_bigrams(words):
    return [" ".join(bigram) for bigram in zip(words[:-1], words[1:])]

extract_bigrams_udf = udf(extract_bigrams, ArrayType(StringType()))

# Split the data into docId and text
split_data = data.map(lambda x: x.split('\t')).map(lambda x: (x[0], x[1]))

# Creating a DataFrame with the defined schema
schema = StructType([StructField("docId", StringType(), True), StructField("text", StringType(), True)])
df = spark.createDataFrame(split_data, schema=schema)

# Applying the cleaning and tokenizing UDF to the 'text' column
df = df.withColumn("words", clean_and_tokenize_udf(df["text"]))

# Applying the bigram extraction UDF to the 'words' column
df = df.withColumn("bigrams", extract_bigrams_udf(df["words"]))
df_exploded = df.select("docId", "bigrams").withColumn("bigram", explode("bigrams"))

# Filtering only the specified bigram words
df_filtered = df_exploded.filter(col("bigram").isin(bigram_words))

# Group by bigram, docId, and count the occurrences
inverted_index = df_filtered.groupBy("bigram", "docId").count()
inverted_index = inverted_index.withColumn("count", col("count").cast("string"))
inverted_index = inverted_index.withColumn("output", concat_ws(": ", "docId", "count"))

# Group by bigram and collect list of docId: count values
result = inverted_index.groupBy("bigram").agg(collect_list("output").alias("output_list"))
#print(result.count())
#result.show(5, truncate=False)
#print(result.describe())

df_text = result.select(concat_ws(" ", *result.columns).alias("text"))
#df_text.show(5, truncate=False)
#print(df_text.describe())
# df_text.write.mode("overwrite").text("bigram_index.txt")

tmp = df_text.toPandas()
print(tmp.head())

Data read successfully
                                                text
0  computer science 5722018235: 1 5722018508: 1 5...
1  information retrieval 5722018235: 1 5722018508...
2           bruce willis 5722018235: 4 5722018301: 3
3  power politics 5722018235: 3 5722018508: 1 572...
4  los angeles 5722018235: 6 5722018508: 13 57220...


In [8]:
import csv
tmp.to_csv('selected_bigram_index.txt', sep=' ', index=False, header=False, quoting=3, escapechar=' ')

In [None]:
spark.stop()