<a href="https://colab.research.google.com/github/nafisenik/DataMinig_pySpark/blob/main/question_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Install Pyspark

In [1]:
! pip install -q pyspark
! pip install hazm



In [2]:
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
! tar xf spark-3.2.1-bin-hadoop3.2.tgz

### Import libraries

In [3]:
import random
import statistics
import time
import numpy as np
from operator import add
import matplotlib.pyplot as plt
import math
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
import re
from hazm import *
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from hazm import sent_tokenize, word_tokenize
from hazm import stopwords_list
from hazm import Stemmer, Lemmatizer

### Start spark session

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .getOrCreate()

sc = spark.sparkContext

### Read our data

In [5]:
! wget -q https://raw.githubusercontent.com/SajjadMb/PySpark-walkthrough/main/data/digikala_comments.csv

In [6]:
df = spark.read.option("header", True).option("multiline",True).csv("digikala_comments.csv")
df.show(50)

+----------+--------------------+--------+-------+-----+--------+-------------------+---------------+--------------------+--------------------+--------------------+--------------------+
|product_id|       product_title|title_en|user_id|likes|dislikes|verification_status|      recommend|               title|             comment|          advantages|       disadvantages|
+----------+--------------------+--------+-------+-----+--------+-------------------+---------------+--------------------+--------------------+--------------------+--------------------+
|      3692|ماوس بی‌سیم لاجیت...|      IT| 989472|    0|       0|           verified|             \N|                null|واقعا عالیه. من ک...|                null|                null|
|     90213|شارژر همراه شیاوم...|      AC|3862150|    4|       1|           verified|    recommended|        واقعاً عالیه|سلام، قبل اینکه ن...|  "[""عمر طولانی\r""|""افت بسیار کم می...|
|     59473|یدک پولیشر میکروف...|      HW| 626843|    1|       0|     

### Clean and normalized our Data

#### convert persian numbers to english numbers

In [7]:
def convert_num(text):
  persian = ['۰', '۱', '۲', '۳', '۴', '۵', '۶', '۷', '۸', '۹']
  convertedPersianNums = text.replace('۰','0').replace('۱','1').replace('۲','2').replace('۳','3').replace('۴','4')\
  .replace('۵','5').replace('۶','6').replace('۷','7').replace('۸','8').replace('۹','9')
  return convertedPersianNums

##### Test our function

In [8]:
x = convert_num('۳  من ۲۲ سال سن دارم و هر روز ')
print(x)

3  من 22 سال سن دارم و هر روز 


#### Remove stop words

In [9]:
stopwords = stopwords_list()+['ها','های','یه','ک','ام','ای','اگه','تر','اس','اینو','ازش','واسه','ی','هاش','برا','تره','اونم','چی','ترین','جی','داره','هست']
def remove_stop_words(text):
  text = re.sub('[^آ-ی]', ' ', text)
  tokenized_text = word_tokenize(text)
  #print(tokenized_text)
  all_tokens = [t for t in tokenized_text if t not in stopwords]
  #print(all_tokens)
  text = " ".join(all_tokens)
  return text

##### Test our functions

In [10]:
x = remove_stop_words('من از شهر به روستا رفتم و در راه علی را دیدم')
print(x)

شهر روستا رفتم علی دیدم


In [11]:
#https://github.com/optimopium/PySpark-walkthrough/blob/main/Spark_Tutorial.ipynb
def clean_normalized(text):
  normalizer = Normalizer()
  try:
    #normalize
    cleaned_text = text.replace('_x000D_\n',' ').splitlines() 
    cleaned_text = ' '.join(cleaned_text)
    cleaned_text = normalizer.normalize(cleaned_text)
    cleaned_text = cleaned_text.replace("،"," ").replace("\u200c"," ").replace(r"\u200"," ").replace(r"\u200b"," ")\
            .replace("."," ").replace("-"," ").replace("_"," ").replace("\n"," ").replace("\r"," ").replace("\\"," ")\
            .replace("("," ").replace(")"," ").replace("["," ").replace("!"," ").replace("]"," ")\
            .replace("r"," ").replace("\""," ")
    cleaned_text = re.sub(r"\s+.\u0648.\s+", ' ', cleaned_text)
    cleaned_text = re.sub(r'[^\w\s]', ' ', cleaned_text)
    cleaned_text = convert_num(cleaned_text)
    cleaned_text = re.sub(r'[0-9]', ' ', cleaned_text)
    cleaned_text = re.sub(' +', ' ',cleaned_text)
    cleaned_text = remove_stop_words(cleaned_text)
    return cleaned_text
  except:
    return None

#### Use udf to create our function

In [12]:
clean_comment = udf(lambda z: clean_normalized(z))
stop_words_deletion = udf(lambda z: remove_stop_words(z))

##### Our data befor cleaning

In [13]:
df.select('product_id', 'comment').show(truncate=False)
df_res = df.select(col("product_id"),clean_comment(col("comment")).alias("comment") ) 

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

##### Cleaned data

In [14]:
df_res.show(truncate=False)

+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|product_id|comment                                                                                                                                                                                                  

### Count number of words for each comment

<div dir=rtl>
 در این سوال فرض کردم از انجا که ما تمام ردیف هایی که حتی یک پارامتر آن ها null باشد را دور می ریزیم پس تعداد کامنت ها به اندازه تعداد ردیف های دیتاست ما می باشد و از طرفی تعداد کلمات در هر کامنت را نیز به دست آوردم و بعد مجموع را حساب کردم که تعداد کلمات کل را می دهد. 
</div>

In [15]:
df_word_count = df_res.withColumn('wordCount', F.size(F.split(F.col('comment'), ' ')))
df_word_count.select('product_id','wordCount').show(100)

+----------+---------+
|product_id|wordCount|
+----------+---------+
|      3692|        3|
|     90213|       93|
|     59473|       30|
|    120499|      129|
|     67200|       38|
|    133722|      127|
|    148509|       20|
|     56871|        9|
|     49738|       22|
|    161548|        1|
|      5107|       41|
|    156041|       10|
|    134367|       44|
|    114006|       15|
|    151402|       46|
|    146879|       12|
|     81254|       10|
|     26368|        4|
|     90894|       28|
|     21340|        4|
|    144139|       42|
|    138040|       36|
|    105719|       19|
|    147165|       24|
|    116387|       24|
|     73064|       42|
|    110040|       45|
|    139131|       27|
|     80854|        3|
|    163519|        6|
|    102068|      224|
|    129988|       18|
|    130386|       47|
|    160008|       15|
|     93850|       65|
|    154624|       32|
|    106471|        2|
|    112714|       29|
|    104427|       32|
|    133709|       38|
|     55495

#### Number of comments

In [16]:
comment_num = df_res.dropna().distinct().count()
print(f'Number of Comment ----> {comment_num}')

Number of Comment ----> 8287


#### Number of words

In [17]:
words_num = df_word_count.agg({'wordCount': 'sum'}).show()

+--------------+
|sum(wordCount)|
+--------------+
|        147479|
+--------------+



#### Calculate like and dislike difference 

In [18]:
selected_df = df.select("product_id", "likes", "dislikes", "comment")
popularity = selected_df.dropna().withColumn("popularity",selected_df["likes"] - selected_df["dislikes"])
popularity.orderBy("popularity", ascending=False).select('product_id','popularity').show(10)

+----------+----------+
|product_id|popularity|
+----------+----------+
|    517207|     184.0|
|    597475|     124.0|
|    503608|     115.0|
|    701679|     114.0|
|    133901|     113.0|
|    517207|     109.0|
|    443966|     108.0|
|    226249|     102.0|
|    341816|      91.0|
|    313420|      80.0|
+----------+----------+
only showing top 10 rows



#### 100 most frequent words

In [19]:
df_res.withColumn('word', F.explode(F.split(F.col('comment'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)\
    .show(100)

+-------+-----+
|   word|count|
+-------+-----+
|استفاده| 1783|
|  کیفیت| 1594|
|   میشه| 1389|
|  خریدم| 1300|
|   گوشی| 1174|
|   خوبه| 1140|
|  عالیه| 1140|
|  واقعا| 1076|
|   اصلا|  981|
|  میکنم|  933|
|  نداره|  884|
|   قیمت|  884|
|  میکنه|  883|
|   خرید|  858|
|    کار|  837|
|   دیجی|  808|
|   راضی|  801|
|   شارژ|  792|
|پیشنهاد|  769|
|   کالا|  707|
|   باشه|  642|
|   دیگه|  635|
|  محصول|  629|
|   سلام|  615|
|  قیمتش|  565|
| دستگاه|  492|
|  گرفتم|  491|
|   دارم|  489|
|   ساعت|  464|
|   حتما|  464|
| دوستان|  455|
|    ماه|  454|
|    مدل|  453|
|    رنگ|  444|
|   الان|  440|
|  راضیم|  428|
|   ارزش|  413|
|    جنس|  408|
|   میده|  381|
|  نمیشه|  381|
|   بازی|  378|
|    کنه|  374|
|   هستش|  367|
|    سال|  366|
|   هستم|  352|
|    دست|  346|
|   مشکل|  330|
|  بخرید|  323|
|  خوبیه|  323|
|   دستم|  322|
|   راحت|  315|
|    نمی|  313|
|    بشه|  312|
|    بگم|  303|
|   توجه|  293|
| دوربین|  291|
|   شگفت|  290|
| العاده|  283|
|   صفحه|  280|
|   صدای

#### Tokenize our comment for FPgrowth

In [20]:
def tokenize(input_text):
  try:
    token_text = word_tokenize(input_text)
    token_text_final = list(set(token_text))
    return ','.join(token_text_final)
  except:
    return None

##### Test our function

In [21]:
tokenize('سلام من خوبم تو خوبی؟')

'سلام,؟,خوبم,خوبی,من,تو'

In [22]:
order_product = popularity.orderBy("popularity", ascending=False)
my_product = order_product.take(50)
products = spark.createDataFrame(my_product)
products.show()

+----------+-----+--------+--------------------+----------+
|product_id|likes|dislikes|             comment|popularity|
+----------+-----+--------+--------------------+----------+
|    517207|  206|      22|خلاصه بگم ماشین ب...|     184.0|
|    597475|  129|       5|با سلام._x000D_\n...|     124.0|
|    503608|  125|      10|    هیچ تاثیری نداشت|     115.0|
|    701679|  145|      31|من این مداد رنگی ...|     114.0|
|    133901|  122|       9|این دوربین قطعا ا...|     113.0|
|    517207|  136|      27|خیلی وسیله ی بی ک...|     109.0|
|    443966|  110|       2|اینا به این درد م...|     108.0|
|    226249|  105|       3|سلام دوستان_x000D...|     102.0|
|    341816|   96|       5|ببینید اینکه یکی ...|      91.0|
|    313420|  100|      20|    بهقیمتش نمی عرضه|      80.0|
|    253710|   84|       8|دوستان دیر شارژ ش...|      76.0|
|    203451|   67|       3|من تقریبا یک سال ...|      64.0|
|    334514|   72|       9|این برندحرف نداره...|      63.0|
|    700304|   60|       2|در یک کلام ، 

In [23]:
filter_products = products.select('product_id','comment')
filter_products.show(truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [24]:
products_sel = filter_products.select(col("product_id"),stop_words_deletion(col("comment")).alias("comment") ) 
products_sel.show(truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [25]:
comment_tokenization = udf(lambda z: tokenize(z))
df_tokenize = products_sel.select(col("product_id"),comment_tokenization(col("comment")).alias("comment") ) 
df_tokenize.show(20)

+----------+--------------------+
|product_id|             comment|
+----------+--------------------+
|    517207|ماشین,حرکت,بگم,نخ...|
|    597475|ساخته,ساده,تشخیص,...|
|    503608|        نداشت,تاثیری|
|    701679|نقاش,رنگی,دیدم,کا...|
|    133901|دی,حسرت,قطعا,هاست...|
|    517207|ماشین,پراید,ایمن,...|
|    443966|خورشد,طلایی,کنه,خ...|
|    226249|ترجیحا,خدا,اصلی,ب...|
|    341816|قابلیت,کنسول,برعک...|
|    313420|    عرضه,نمی,بهقیمتش|
|    253710|نیس,ساخته,راحت,زم...|
|    203451|هوشمند,سرگرمی,زما...|
|    334514|امریکاییه,هرچندچی...|
|    700304|   انتظار,کلام,داشتم|
|    137928|تقلب,کشیدم,ساخته,...|
|    388255|میشه,برسه,پاره,ذر...|
|    189875|بیسیم,راحت,عرض,کر...|
|    700304|راضی,بودنش,واقعا,...|
|    135021|ماشالله,رفتن,سریع...|
|    619216|      لطفا,کامنت,پاک|
+----------+--------------------+
only showing top 20 rows



In [26]:
from pyspark.sql.functions import split, col
df2 = df_tokenize.select(split(col("comment"),",").alias("commentArray")) \
    .drop("comment")
df2.printSchema()
df2.show()

root
 |-- commentArray: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+
|        commentArray|
+--------------------+
|[ماشین, حرکت, بگم...|
|[ساخته, ساده, تشخ...|
|     [نداشت, تاثیری]|
|[نقاش, رنگی, دیدم...|
|[دی, حسرت, قطعا, ...|
|[ماشین, پراید, ای...|
|[خورشد, طلایی, کن...|
|[ترجیحا, خدا, اصل...|
|[قابلیت, کنسول, ب...|
|[عرضه, نمی, بهقیمتش]|
|[نیس, ساخته, راحت...|
|[هوشمند, سرگرمی, ...|
|[امریکاییه, هرچند...|
|[انتظار, کلام, دا...|
|[تقلب, کشیدم, ساخ...|
|[میشه, برسه, پاره...|
|[بیسیم, راحت, عرض...|
|[راضی, بودنش, واق...|
|[ماشالله, رفتن, س...|
|  [لطفا, کامنت, پاک]|
+--------------------+
only showing top 20 rows



In [27]:
df_tokenize.printSchema

<bound method DataFrame.printSchema of DataFrame[product_id: string, comment: string]>

In [28]:

from pyspark.ml.fpm import FPGrowth
fpGrowth = FPGrowth(itemsCol="commentArray", minSupport=0.1, minConfidence=0.1)
model = fpGrowth.fit(df2)

# Display frequent itemsets.
model.freqItemsets.show()

# Display generated association rules.
model.associationRules.show()

# transform examines the input items against all the association rules and summarize the
# consequents as prediction
model.transform(df2).show()




+--------------------+----+
|               items|freq|
+--------------------+----+
|               [بشه]|   6|
|      [بشه, استفاده]|   5|
|              [مثلا]|   5|
|               [بگم]|   8|
|        [بگم, کیفیت]|   6|
|[بگم, کیفیت, استف...|   5|
|      [بگم, استفاده]|   6|
|              [دیجی]|   6|
|              [دیگه]|   6|
|        [دیگه, میشه]|   5|
|         [دیگه, کار]|   5|
|     [دیگه, استفاده]|   5|
|              [قیمت]|   7|
|              [میده]|   5|
|              [ساعت]|   6|
|              [سلام]|  10|
|     [سلام, استفاده]|   5|
|               [کنه]|   9|
|              [حتما]|   7|
|     [حتما, استفاده]|   6|
+--------------------+----+
only showing top 20 rows

+----------------+----------+------------------+------------------+-------+
|      antecedent|consequent|        confidence|              lift|support|
+----------------+----------+------------------+------------------+-------+
|          [سلام]| [استفاده]|               0.5|2.0833333333333335|    0.1