In [1]:
from pyspark.sql.functions import udf, col, lower, regexp_replace, concat_ws, trim
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.types import ArrayType, StringType, IntegerType

# Where the raw articles live and how they are stored
file_location = "dbfs:///FileStore/tables/all-news/*.csv"
file_type = "csv"

# Reader settings: no schema inference, first row holds the column names, comma separated
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# Load every matching file, keep only the three columns used below and drop
# any row where one of them is null.
# NOTE(review): no multiLine/escape options are set - confirm the article bodies
# never contain embedded line breaks or quotes, otherwise rows may be split.
df_file = (
  spark.read.format(file_type)
  .option("inferSchema", infer_schema)
  .option("header", first_row_is_header)
  .option("sep", delimiter)
  .load(file_location)
  .select('id', 'title', 'content')
  .na.drop()
)

# Normalise 'title' and 'content' in three passes; 'id' is carried through untouched.

# Pass 1: replace everything that is not an ASCII letter or whitespace with a space, then lowercase
df_cleaned = df_file.select(
  'id',
  *[lower(regexp_replace(name, "[^a-zA-Z\\s]", " ")).alias(name) for name in ('title', 'content')]
)

# Pass 2: replace isolated single-character words with a space
df_cleaned = df_cleaned.select(
  'id',
  *[regexp_replace(name, "[!-~]?\\b[\\w]\\b[!-~]?", " ").alias(name) for name in ('title', 'content')]
)

# Pass 3: trim both ends and squeeze runs of spaces into one space
df_cleaned = df_cleaned.select(
  'id',
  *[regexp_replace(trim(col(name)), " +", " ").alias(name) for name in ('title', 'content')]
)

# Split each title into tokens
title_tokenizer = Tokenizer(inputCol='title', outputCol='tokenized_title')
df_tokenized_title = (
  title_tokenizer
  .transform(df_cleaned)
  .select('id', 'tokenized_title', 'content')
)

# Drop stop words from the title tokens
stopwords_title_remover = StopWordsRemover(inputCol='tokenized_title', outputCol='cleaned_title')
df_title_removed_stopwords = (
  stopwords_title_remover
  .transform(df_tokenized_title)
  .select('id', 'cleaned_title', 'content')
)

# Keep only tokens longer than one character (this also discards empty tokens)
filter_length_udf = udf(
  lambda tokens: list(filter(lambda token: len(token) > 1, tokens)),
  ArrayType(StringType()),
)
df_final_title = df_title_removed_stopwords.withColumn(
  'cleaned_title', filter_length_udf(col('cleaned_title'))
)

# Split each article body into tokens
content_tokenizer = Tokenizer(inputCol='content', outputCol='tokenized_content')
df_tokenized_content = (
  content_tokenizer
  .transform(df_final_title)
  .select('id', 'cleaned_title', 'tokenized_content')
)

# Drop stop words from the body tokens
stopwords_remover = StopWordsRemover(inputCol='tokenized_content', outputCol='cleaned_content')
df_removed_stopwords = (
  stopwords_remover
  .transform(df_tokenized_content)
  .select('id', 'cleaned_title', 'cleaned_content')
)

# Apply the same minimum-length filter that was used for the title tokens
df_final = df_removed_stopwords.withColumn(
  'cleaned_content', filter_length_udf(col('cleaned_content'))
)

# Final shape: integer id plus the token arrays joined back into
# space-separated strings named 'title' and 'content'
df_final = df_final.select(
  col('id').cast(IntegerType()).alias('id'),
  concat_ws(" ", 'cleaned_title').alias('title'),
  concat_ws(" ", 'cleaned_content').alias('content'),
)
           
#display(df_final)

In [2]:
# One (id, title, content) tuple per cleaned article
articles_rdd = df_final.rdd.map(
  lambda article: (article['id'], article['title'], article['content'])
)
#print(articles_rdd.take(5))

In [3]:
# Inverted index

import itertools
import operator

def accumulate(l):
  """Sum the second element of ``(key, amount)`` pairs per key.

  Pairs sharing a key are merged even when they are not adjacent in ``l``.
  (``itertools.groupby`` only merges consecutive runs, so unsorted input used
  to produce the same key several times, each with a partial sum.)

  l: iterable of indexable pairs; ``item[0]`` is a hashable key and
     ``item[1]`` the amount to add.
  Yields: one ``(key, total)`` tuple per distinct key, in first-seen order.
  """
  totals = {}
  for item in l:
    key = item[0]
    totals[key] = totals.get(key, 0) + item[1]
  yield from totals.items()


# Inverted index: word -> [(article_id, occurrences), ...], article with most occurrences first.
# split() without an argument is used on purpose: split(" ") returns empty strings
# when the title or the content is empty, and "" would then be indexed as a word.
inverted_index_rdd = articles_rdd.flatMap(lambda line: [(word, (line[0], 1)) for word in (line[1] + " " + line[2]).split()]) \
                                 .groupByKey() \
                                 .mapValues(lambda postings: sorted(accumulate(postings), key=lambda x: -x[1])) \
                                 .cache()


#inverted_index_rdd.take(5)

In [4]:
# Online

# Raw (uncleaned) articles with the id converted to an integer
new_df = df_file.select(
  col('id').cast(IntegerType()).alias('id'),
  'title',
  'content',
)

# Driver-side dictionary id -> original title, used to display search results
file_rdd = new_df.rdd.map(lambda article: (article['id'], article['title']))
file_map = file_rdd.collectAsMap()

In [5]:
# dbutils.widgets.text takes (name, defaultValue, label): the prompt belongs in the
# label. Passing it as the second argument made the prompt text the widget's value.
dbutils.widgets.text("word", "", "Please enter word to search")
dbutils.widgets.text("search", "", "Please enter id to search")

In [6]:
# Normalise the query the same way the indexed text was normalised: the index only
# contains lowercase words, so an unnormalised "Word" could never match.
toSearch = str(dbutils.widgets.get("word")).strip().lower()

# Postings [(article_id, occurrences), ...] of the requested word
final_result = inverted_index_rdd.filter(lambda x, toSearch=toSearch: x[0] == toSearch) \
                                 .flatMap(lambda result: result[1])

# A blank query cannot name a word, so skip the Spark job and return no hits
final_result_list = final_result.collect() if toSearch else []
#print(final_result_list)

In [7]:
def printing_result():
  """Yield at most five search hits as ``(occurrences, [(article_id, title)])``.

  Reads the module-level ``final_result_list`` (pairs of article id and
  occurrence count) and ``file_map`` (article id -> title). Hits whose id is
  None are skipped and do not count towards the limit. The inner list is empty
  when the id has no entry in ``file_map``.
  """
  cont = 0
  maximum = 5
  for doc_id, occurrences in final_result_list:
    if cont == maximum:
      break
    if doc_id is None:
      continue
    cont += 1
    # Direct dictionary lookup instead of scanning every entry of file_map per hit
    matches = [(doc_id, file_map[doc_id])] if doc_id in file_map else []
    yield occurrences, matches

# Materialise the generator and show the hits for the current search word
print([*printing_result()])

In [8]:
from collections import defaultdict

def accumulate2(l):
  """Count how often each key occurs in an iterable of ``(key, ...)`` records.

  Only the first element of every record is looked at; the remaining elements
  are ignored, so they do not need to be numeric.

  l: iterable of non-empty sequences whose first element is a hashable key.
  Yields: one ``(key, occurrences)`` tuple per distinct key, in first-seen order.
  """
  d = defaultdict(int)
  for k, *_ in l:
    # A plain counter is enough: no per-key list has to be kept just to measure it
    d[k] += 1
  yield from d.items()

# Per-article vocabulary: article_id -> [(word, occurrences), ...], most frequent word first.
# Articles without an id are dropped before the shuffle rather than after it, and
# split() without an argument keeps empty strings from being counted as words.
news_rdd = articles_rdd.filter(lambda line: line[0] is not None) \
                       .flatMap(lambda line: [(line[0], (word, 1)) for word in (line[1] + " " + line[2]).split()]) \
                       .groupByKey() \
                       .mapValues(lambda words: sorted(accumulate2(words), key=lambda x: -x[1])) \
                       .cache()

#news_rdd.count()

In [9]:
from functools import reduce

# Parse the article id typed into the "search" widget. The widget holds free text,
# so anything that is not an integer is treated as "no such article".
try:
  id_search = int(dbutils.widgets.get("search"))
except (TypeError, ValueError):
  id_search = None

# Start from empty results so that nothing computed for a previous search is reused
new_title = None
sim_news = spark.sparkContext.emptyRDD()

if id_search is None or id_search not in file_map:
  print("Not found")
else:
  new_title = file_map[id_search]
  in_new_rdd = news_rdd.filter(lambda x, id_search=id_search: x[0] == id_search)
  in_new_list = in_new_rdd.collect()

  if not in_new_list:
    # The id has a title but no entry in news_rdd, so there is nothing to compare
    print("Not found")
  else:
    # word -> occurrences for the searched article, built once on the driver
    reference_counts = dict(in_new_list[0][1])

    def news_similarity2(rdd_other_news, reference_counts=reference_counts):
      """Score one ``(article_id, [(word, occurrences), ...])`` record against the searched article.

      Returns ``[article_id, similarity, shared_words]``: ``shared_words`` is the
      number of distinct non-empty words both articles contain, ``similarity`` the
      sum of those words' occurrences over both articles (0 when nothing is shared).
      """
      other_counts = dict(rdd_other_news[1])
      # Dictionary lookups replace the nested loops over both word lists
      shared = [word for word in reference_counts if word != "" and word in other_counts]
      similarity = sum(reference_counts[word] + other_counts[word] for word in shared)
      return [rdd_other_news[0], similarity, len(shared)]

    # Every other article, most similar first
    other_news = news_rdd.filter(lambda x, id_search=id_search: x[0] != id_search)
    sim_news = other_news.map(news_similarity2) \
                         .sortBy(lambda x: -x[1])

    sim_news_df = sim_news.toDF(["id","similarity","words"])
    #display(sim_news_df)

In [10]:
# First five rows of the similarity ranking; only their article ids are reported
aux = sim_news.take(5)
news_result_final = [entry[0] for entry in aux]
print(id_search, new_title, news_result_final)