# Data Mining Course Spark Exercise
## Sharif University of Technology

In this notebook we are going to analyze farsi news.
Outline of the exercise:
* Dataset preparation
* Preprocessing
* Exploration
* Word Collections
  
## Warning: RDD api only
You **can not** use Dataframe, Dataset, mllib, ml, ... apis of spark in this exercise. You should only use the [RDD api](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD).

# Please enter your name below:
# Name: Mohammad Mehdi Zare
# Student Number: 

# Section 1: Dataset prepartition
This section you need to download [dataset](https://drive.google.com/file/d/1bRxHQDzPr6wDimbM7b89H47kH8O3YV8Y/view?usp=sharing) in a directory you work. After that run the below cell to untar the datase.

**Note 1: Don't change the below command.**

**Note 2: If you use Windows OS, unzip the dataset manually.**

## Install Pypark & Initialization
Uncomment this section if you use google colab or local pc

In [1]:
!pip install python_bidi
!pip install hazm
!pip install pyspark



In [2]:
!pip install findspark



In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("HW2") \
    .master("local[*]") \
    .getOrCreate()

sc=spark.sparkContext

In [5]:
# unzip data
# !tar -xzvf "/content/drive/MyDrive/big data/data.tar.gz" -C "/content/drive/MyDrive/big data/"

# Reading the data

In [6]:
news_rdd = sc.textFile("news_data2.json")

# news_rdd=sc.parallelize(news_rdd.takeSample(False, 200000, 12))

# Section 2: Preprocessing
This section we try to normalize news and remove useless characters (for example /n and /u200c and ...). Also find and remove stop words.

In [7]:
# import neccesary packages here
import json
import string
from math import log10
from hazm.utils import stopwords_list
import numpy as np
import matplotlib.pyplot as plt

In [8]:
news_rdd1= news_rdd
json_rdd = news_rdd1.map(lambda x : json.loads(x))

In [9]:
json_rdd.take(1)

[{'body': 'سرمربی ملوان درباره تصاویر وایرالی دیدار با نساجی یک پست جالب منتشر کرد.\n\nبه گزارش "ورزش سه"، مهدی تارتار با خوشحالی عجیب و غریب دقیقه ۹۰+۳ یکی از چهرههای جالب این هفته لیگ برتر را به خود اختصاص داد که موردتوجه هواداران فوتبال قرار گرفت.\n\nتارتار حالا با انتشار یک پست اینستاگرامی از تصاویر جشن و خوشحالی خود نوشته که این جادوی طرفداران انزلی است که او را از خود بی خود کرده است.\n\nسرمربی ملوان همچنین وعده داده که تیمش در ادامه فصل نیز با تمام توان خواهد جنگید تا رضایت هواداران را به خود جلب کند.\n\nنوشته او را در ادامه میخوانید:\n\nحالا که به عکسها نگاه میکنم، متحیر میشوم. یادم نمیآید هرگز اینطور شوری گرفته باشم. این جادوی شماست، شما که روی سکو آنچنان حس و انرژی به ما میدهید که لحظه پیروزی برایمان وصفناشدنی باشد. از روی سکوها تا لب خط، از پای تلویزیونها تا در مستطیل سبز؛ ما همدل و یکصدا پیش میرویم. باهم میجنگیم و باهم میخندیم و هرروز تلاش میکنیم تا این خنده را روی صورت شما نگه داریم. این پیروزی برای شماست، شما مردان نیک روی چمن و شما مردمان نیک روی سکو \U0001fa75🤍',
  'ima

In [10]:
rows=json_rdd.count()
print("number of news : "+ str(rows))

number of news : 5000


In [11]:
# I think some news are duplicated, so, I decided to test and remove extra news. 
json_rdd = json_rdd.map(lambda x : (x['uid'],x))\
.reduceByKey(lambda x,y : x)\
.map(lambda x: x[1])

In [12]:
extra=rows-json_rdd.count()
print("number of extra news : "+ str(extra))

number of extra news : 0


In [13]:

def remove_punc(x):
    punks = '!"#$%&()*+,-./:;<=>?@[\]^_`{|}~1234567890۱۲۳۴۵۶۷۸۹۰،؛'
    no_punc = x
    for ch in punks:
      no_punc = no_punc.replace(ch , " ")
    return no_punc.replace("\n" , "").replace("\u200c" , " ")

def remove_stopwords(rdd):
  stopwords = stopwords_list()
  rdd=rdd.replace("های", "").replace("میشود", "")
  rdd = rdd.split()
  result = [word for word in rdd if not word in stopwords]
  return ' '.join(result)

def clean_rdd_util(T , function):
  T['body'] = function(T['body'])
  T['title'] = T['title'].replace("\u200c", " ")
  if "keywords" in T:
    T['keywords']= [function(x) for x in T['keywords']]
  else:
      T['keywords']=[]
  return T


def cleaning(rdd):
  rdd = rdd.map(lambda x: clean_rdd_util(x , remove_punc))
  rdd = rdd.map(lambda x: clean_rdd_util(x, remove_stopwords))
  return rdd;

In [14]:
cleaned= cleaning(json_rdd)

In [15]:
cleaned.take(15)

[{'body': 'سرمربی ملوان تصاویر وایرالی دیدار نساجی پست جالب منتشر گزارش ورزش مهدی تارتار خوشحالی عجیب غریب دقیقه چهره جالب هفته لیگ برتر اختصاص موردتوجه هواداران فوتبال قرار تارتار انتشار پست اینستاگرامی تصاویر جشن خوشحالی نوشته جادوی طرفداران انزلی سرمربی ملوان وعده تیمش ادامه فصل توان خواهد جنگید رضایت هواداران جلب نوشته ادامه میخوانید عکسها نگاه میکنم متحیر میشوم یادم نمیآید هرگز اینطور شوری باشم جادوی شماست سکو آنچنان حس انرژی میدهید لحظه پیروزی برایمان وصفناشدنی سکوها لب خط پای تلویزیونها مستطیل سبز همدل یکصدا میرویم باهم میجنگیم باهم میخندیم هرروز تلاش میکنیم خنده صورت نگه پیروزی شماست مردان نیک چمن مردمان نیک سکو \U0001fa75🤍',
  'image_title_url': 'https://news-cdn.varzesh3.com/pictures/2023/08/09/A/a5l2b23f.jpg',
  'language': 'fa',
  'source': 'ورزش سه',
  'title': 'نوشته خاص تارتار برای لحظه جنجالی (عکس)',
  'date_published': 1696909606,
  'uid': '68feae4bbbedc2d54adbb2369',
  'url': 'https://www.varzesh3.com/news/1971626/%D9%86%D9%88%D8%B4%D8%AA%D9%87-%D8%AE%D8%A7%D8%B5-%D8%

In [16]:
# according to desmose curve on "https://www.desmos.com/calculator/hwtd8h8hdx"  I think, these parameters are good choices.
r=8
b=40
m=r*b
s=.6
shi=7

# STEP1 : SHINGLING

In [17]:
def shingling(text, k): 
    shingles = set()
    for i in range(0, len(text)-k+1 ):
        shingles.add(text[i:i+k])
    return shingles

In [18]:
body = cleaned.take(1)[0]['body']
body

'سرمربی ملوان تصاویر وایرالی دیدار نساجی پست جالب منتشر گزارش ورزش مهدی تارتار خوشحالی عجیب غریب دقیقه چهره جالب هفته لیگ برتر اختصاص موردتوجه هواداران فوتبال قرار تارتار انتشار پست اینستاگرامی تصاویر جشن خوشحالی نوشته جادوی طرفداران انزلی سرمربی ملوان وعده تیمش ادامه فصل توان خواهد جنگید رضایت هواداران جلب نوشته ادامه میخوانید عکسها نگاه میکنم متحیر میشوم یادم نمیآید هرگز اینطور شوری باشم جادوی شماست سکو آنچنان حس انرژی میدهید لحظه پیروزی برایمان وصفناشدنی سکوها لب خط پای تلویزیونها مستطیل سبز همدل یکصدا میرویم باهم میجنگیم باهم میخندیم هرروز تلاش میکنیم خنده صورت نگه پیروزی شماست مردان نیک چمن مردمان نیک سکو \U0001fa75🤍'

In [19]:
news_data_shingled= cleaned.map(lambda x : (x['uid'],x['body']))\
.map(lambda x: (x[0], shingling(x[1],shi)))

In [20]:
news_data_shingled.take(1)

[('68feae4bbbedc2d54adbb2369',
  {' آنچنان',
   ' اختصاص',
   ' ادامه ',
   ' انتشار',
   ' انرژی ',
   ' انزلی ',
   ' اینستا',
   ' اینطور',
   ' باشم ج',
   ' باهم م',
   ' برایما',
   ' برتر ا',
   ' تارتار',
   ' تصاویر',
   ' تلاش م',
   ' تلویزی',
   ' توان خ',
   ' تیمش ا',
   ' جادوی ',
   ' جالب م',
   ' جالب ه',
   ' جشن خو',
   ' جلب نو',
   ' جنگید ',
   ' حس انر',
   ' خط پای',
   ' خنده ص',
   ' خواهد ',
   ' خوشحال',
   ' دقیقه ',
   ' دیدار ',
   ' رضایت ',
   ' سبز هم',
   ' سرمربی',
   ' سکو آن',
   ' سکو \U0001fa75🤍',
   ' سکوها ',
   ' شماست ',
   ' شوری ب',
   ' صورت ن',
   ' طرفدار',
   ' عجیب غ',
   ' عکسها ',
   ' غریب د',
   ' فصل تو',
   ' فوتبال',
   ' قرار ت',
   ' لب خط ',
   ' لحظه پ',
   ' لیگ بر',
   ' متحیر ',
   ' مردان ',
   ' مردمان',
   ' مستطیل',
   ' ملوان ',
   ' منتشر ',
   ' مهدی ت',
   ' موردتو',
   ' میجنگی',
   ' میخندی',
   ' میخوان',
   ' میدهید',
   ' میرویم',
   ' میشوم ',
   ' میکنم ',
   ' میکنیم',
   ' نساجی ',
   ' نمیآید',
   ' نوش

In [21]:
print ("number of shingles for each sample : "+str(len(news_data_shingled.take(1)[0][1])))

number of shingles for each sample : 582


In [22]:
all_shigle= news_data_shingled.map(lambda x: x[1])\
.flatMap(lambda x : x)\
.distinct()\
.collect()
len_unique = len(all_shigle)

In [23]:
len_unique

1372986

# STEP 2: Minhash

In [24]:
indexes = list(range(1,len_unique+1))
pairs=zip(all_shigle,indexes)
dic_shingles = dict(pairs)
dic_shingles

{'ش مهدی ': 1,
 ' عکسها ': 2,
 'اص مورد': 3,
 'ه چهره ': 4,
 'آنچنان ': 5,
 'ن نیک س': 6,
 'دامه فص': 7,
 'ورزش مه': 8,
 'ان حس ا': 9,
 'میآید ه': 10,
 'لاش میک': 11,
 'مان وصف': 12,
 ' تلاش م': 13,
 'دمان نی': 14,
 'فناشدنی': 15,
 'اران فو': 16,
 'جشن خوش': 17,
 'خواهد ج': 18,
 'یونها م': 19,
 'ران جلب': 20,
 'ان تصاو': 21,
 ' جادوی ': 22,
 'ور شوری': 23,
 'ی دیدار': 24,
 'د لحظه ': 25,
 'لب نوشت': 26,
 'م باهم ': 27,
 'تیمش اد': 28,
 'جالب هف': 29,
 'گه پیرو': 30,
 'ی میدهی': 31,
 'ست سکو ': 32,
 'فته لیگ': 33,
 'عده تیم': 34,
 ' نمیآید': 35,
 'ش ادامه': 36,
 'تارتار ': 37,
 'سرمربی ': 38,
 'جلب نوش': 39,
 'جیب غری': 40,
 ' یادم ن': 41,
 'ب دقیقه': 42,
 'ار نساج': 43,
 'رایمان ': 44,
 'ردمان ن': 45,
 'لوان وع': 46,
 'شته ادا': 47,
 'شته جاد': 48,
 'اشدنی س': 49,
 'م یادم ': 50,
 'یر وایر': 51,
 'نیک سکو': 52,
 ' سکو آن': 53,
 'ب نوشته': 54,
 ' خواهد ': 55,
 'صاویر ج': 56,
 'ژی میده': 57,
 ' شماست ': 58,
 ' ورزش م': 59,
 ' غریب د': 60,
 'شوری با': 61,
 'ه لیگ ب': 62,
 'وان خوا': 63,
 

In [25]:
all_docs_uid=news_data_shingled.map(lambda x : x[0]).collect()
pairs=zip(all_docs_uid,indexes)
dic_docs =dict(pairs)
dic_docs

{'68feae4bbbedc2d54adbb2369': 1,
 '2ad5a78866237762a3f742c51': 2,
 '0097b5fec5d970750b90fa046': 3,
 'f136d860f0dcbd6c98318e580': 4,
 '7b85bc41da526e89274f20958': 5,
 '1c79364ff8bf8a8fa096246a2': 6,
 '5110de54dae55a6f8f7c3ed0a': 7,
 '87b218ce3b0ac72d379358268': 8,
 '837d37a4906f6fb14f43cc81f': 9,
 '20df813955ff47aa322783bfb': 10,
 '5497ccaecdd45e198003e79a2': 11,
 '9337d28000d899408bb3b6722': 12,
 '29cf32170839299276be24a6c': 13,
 '78a1b366a87f861afa30ae8da': 14,
 'c4eaa6025745b6a1e67c4b13d': 15,
 '4f1a05b11ee33ff18bdb443ca': 16,
 'a4d54448e8a57c0890acaf932': 17,
 'e13ce3d66402beff728480ff8': 18,
 '3fa6a7614e698338fd2cfa469': 19,
 '299c48f3cdd7b211af6e87ab2': 20,
 'ea093fec1ce5c4c0c5966f97f': 21,
 '5a701cef1c33f70b07853c6d5': 22,
 'a8db5d2a130f448c618d36506': 23,
 '1abba23838648c3969429590e': 24,
 '66a0d3eaadb9f636786cca5a8': 25,
 'e6e46f0847e854336c7fca326': 26,
 'dfb3b676e24a7158259fe97c1': 27,
 '262e6d64699ba9ee3ad527045': 28,
 'af50f5ddae7645e7330f8c3ea': 29,
 'b9e6c9894752c80bb6ef7

In [26]:
# the family of hash functions, in this case, I use universal hash functions
import random
import math
random.seed(10)


# prime generator
p=2**31-1
class hashFamily:
    def __init__(self, i):
        self.hash_index=i
        self.a=random.randint(1,100)
        self.b=random.randint(1,1000)
    def get_hash_value(self, el_to_hash):
        return  (((self.a*el_to_hash)+self.b)%p)%len_unique



In [27]:
class minhashSigner:
    def __init__(self, sig_size):
        self.sig_size=sig_size
        self.hash_functions = [hashFamily(i) for i in range(0,sig_size)]

    def compute_set_signature(self, set_):
        set_sig = []
        for h_funct in self.hash_functions:
            min_hash = math.inf
            for el in set_:
                ell=dic_shingles[el]
                h = h_funct.get_hash_value(ell)
                if h < min_hash:
                    min_hash = h
            set_sig.append(min_hash)
        return set_sig


In [28]:
minhash = minhashSigner(m)
sig=news_data_shingled.map(lambda x:(x[0], minhash.compute_set_signature(list(x[1]))))

In [29]:
sig.take(1)

[('68feae4bbbedc2d54adbb2369',
  [108,
   550,
   90,
   501,
   907,
   706,
   57,
   569,
   120,
   1008,
   466,
   437,
   636,
   437,
   345,
   356,
   238,
   399,
   457,
   486,
   980,
   1024,
   686,
   95,
   80,
   169,
   989,
   589,
   838,
   353,
   649,
   505,
   128,
   683,
   907,
   226,
   1023,
   298,
   38,
   373,
   752,
   557,
   165,
   631,
   983,
   231,
   1002,
   892,
   466,
   849,
   996,
   489,
   511,
   730,
   183,
   615,
   909,
   290,
   369,
   699,
   1050,
   820,
   515,
   455,
   488,
   35,
   800,
   346,
   290,
   280,
   473,
   238,
   993,
   754,
   785,
   1001,
   568,
   679,
   129,
   524,
   560,
   192,
   609,
   433,
   467,
   782,
   595,
   159,
   903,
   898,
   451,
   136,
   366,
   168,
   637,
   913,
   206,
   942,
   850,
   1010,
   410,
   359,
   560,
   392,
   617,
   225,
   339,
   676,
   709,
   685,
   666,
   572,
   831,
   1001,
   484,
   768,
   949,
   60,
   1013,
   857,
   851,

In [30]:
sss=sig.collect()
qq=dict(sss)

# STEP 3:  LSH


In [31]:
class LSH:
    def __init__(self, r,b, uid):
        self.uid=uid
        self.r=r
        self.b=b
        self.sig=qq[uid]
    def candidate_sig(self, uid):
        temp_Sig=qq[uid]
        flag=False
        for i in range(0,self.b):
            flag =flag or self.sig[r*i:r*i+r-1]==temp_Sig[r*i:r*i+r-1]
            if(flag):
                return True
        return(False)

In [32]:
# in this part I check is there any related news??
# do not run it again.
# for sample_uid  in all_docs_uid:
#     lsh=LSH(r,b,sample_uid)
#     ans=sig.filter(lambda x: lsh.candidate_sig(x[0])).map(lambda x: x[0]).collect()
#     # ans
#     final_uids=[]
#     final_uids.append(sample_uid)
#     ans.remove(sample_uid)
#     for u in ans:
#         set_sig_2 = set(qq[u])
#         set_sig_1 = set(qq[sample_uid])
#         jaccard_similarity_sig = len(set_sig_1.intersection(set_sig_2))/len(set_sig_1.union(set_sig_2))
#         if(jaccard_similarity_sig>s):
#             final_uids.append(u)
#     if(len(final_uids)>1):
#         print(final_uids)

['68feae4bbbedc2d54adbb2369', 'a1fc8e369af3bb152173e01d8', '9179a70694ff478b8eb927e00', '8dc63b7fab7c218e25a3ab18e', 'c8c795b004c95a896b981c7d7', '53529513b997ca790c5fba898']
['0097b5fec5d970750b90fa046', 'aff311ec6247dc1da6696786f', '3b1d68aee6f11a2e81eb97b85', '73051ae16f5aca7afea5ab324', '31e2523b9b04829c2d8169f2f', '41585d069acc7677422c7aa34', 'a2c35c0be7d234b543bc2f3e0', '416d2daa79b8c4882f9d01efc', '8e09b7ae7ac94406b7333dea1', 'ed800395102f1a5ae8e0a61d3', '53ae8f879226a6b1e49257bf7', '3b7a9d48567489a42ce367491', '70476a21d663ec0ad5cb531a7', '48c852a5d47e22062e954a6ea', '49dde776179747c6a58fa0251', '74ad771ebdbb1f77bdcf45165', '1e1ac679c6ba0d3f44fa10417', 'dc605b69d6e8133971f3f8e81', '20f0ceb848db24b577cf50026', 'a3bb9bd94d160cac9b6bb5cdb', '0e3f165bba118c83af1f6d557', '7690321f836d92ae083a19898', 'f547ff972ffb13b6b0c5df797', '0184a2f6b9df84f2203d79f19', '64b45a051e5eb4a52a32f6ca6', '5900a8830f25a2ac0b1bd24ae', '1792e2a01cac0229db82a609f', '6171e56109d25250f66a20e8e', '81bb431e991

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\spark\spark-3.5.0-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\spark\spark-3.5.0-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\Mehdi\anaconda3\lib\socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [38]:
sample_uid= 'e13ce3d66402beff728480ff8'
lsh=LSH(r,b,sample_uid)
ans=sig.filter(lambda x: lsh.candidate_sig(x[0])).map(lambda x: x[0]).collect()
final_uids=[]
final_uids.append(sample_uid)
ans.remove(sample_uid)
for u in ans:
    set_sig_2 = set(qq[u])
    set_sig_1 = set(qq[sample_uid])
    jaccard_similarity_sig = len(set_sig_1.intersection(set_sig_2))/len(set_sig_1.union(set_sig_2))
    if(jaccard_similarity_sig>s):
         final_uids.append(u)
if(len(final_uids)>1):
    print(final_uids)

['e13ce3d66402beff728480ff8', '0c121ea5f3c89668a74620d8a']


In [39]:
print(ans)

['0c121ea5f3c89668a74620d8a']


In [40]:
# above algorithm find ans list as a pair candidate
json_rdd.filter(lambda x : x['uid'] in final_uids).map(lambda x: (x['body'], x['keywords'])).take(10)

[('صادرات خودروهای تولید شده در ترکیه به کشورهای اتحادیه اروپا در نه ماه سال جاری (2023) نسبت به مدت مشابه سال قبل 22 درصد افزایش یافته است.\n\nبر اساس دادههای انجمن صادرکنندگان صنعت خودروی اولوداغ و مجمع صادرکنندگان ترکیه، این بخش محصولات خود را در بازه زمانی ژانویه تا سپتامبر به بیش از 190 کشور و همچنین مناطق مستقل و آزاد فروخته است.\n\nصادرات خودرو و صنایع مرتبط با آن در 9 امسال با افزایش 15 درصدی نسبت به دوره مشابه سال گذشته به 25 میلیارد و 619 میلیون و 838 هزار دلار رسید.\n\nهمچنین طبق این دادهها، سهم بخش خودرو از کل صادرات 15.7 درصد بوده است.\n\nاتحادیه اروپا با سهم 68.6 درصدی در صادرات خودروی ترکیه، در رده اول قرار دارد.\n\nهمچنین کشورهای آلمان و فرانسه به ترتیب در رده اول و دوم مقاصد صادراتی خودروهای تولید شده در ترکیه قرار دارند.',
  ['انجمن صادرکنندگان صنعت خودروی اولوداغ',
   'مجمع صادرکنندگان ترکیه',
   'اتحادیه اروپا',
   'صادرات خودرو']),
 ('صادرات خودروهای تولید شده در ترکیه به کشورهای اتحادیه اروپا در نه ماه امسال 22 درصد افزایش یافت.\n\nصادرات خودروهای تولید شده در ترک

In [43]:
# check sig similarity for sample and next element
ss1=final_uids[0]
ss2=final_uids[1]
set_sig_1 = set(qq[ss1])
set_sig_2 = set(qq[ss2])
jaccard_similarity_sig = len(set_sig_1.intersection(set_sig_2))/len(set_sig_1.union(set_sig_2))
print("jaccard similarity for signature is : "+ str(jaccard_similarity_sig))

jaccard similarity for signature is : 0.8357348703170029


In [44]:
# check thier shingle similarity for sample and next element

sad=news_data_shingled.filter(lambda x: x[0] in [ss1,ss2]).collect()
sad2=dict(sad)
jaccard_similarity_shingle_set = len(set(sad2[ss1].intersection(sad2[ss2])))/len(set(sad2[ss1].union(sad2[ss2])))
print("jaccard similarity for shingle set is : "+ str(jaccard_similarity_shingle_set))

jaccard similarity for shingle set is : 0.967391304347826
