# PageRank 팀 코드

### **Install Pyspark**   
Run every cell in this section once, in order, before continuing.

In [None]:
# Install a headless Java 8 JDK; -qq and the redirect to /dev/null keep the cell quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz

In [None]:
# Unpack the downloaded Spark archive into the current directory.
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

In [None]:
# Install findspark quietly (-q); it is imported in the PageRank section.
!pip install -q findspark

In [None]:
import os
# Tell this process where the Java and Spark installations live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.2.0-bin-hadoop3.2",
})

### **Upload data**

In [None]:
# Mount Google Drive at /content/drive so the input CSV can be read from it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
cd /content/drive/MyDrive/

In [None]:
import pandas as pd
import numpy as np
import ast

# Name of the input CSV, resolved against the current working directory.
# Change it to match your own file.
filename = 'data_21_22.csv'

df = pd.read_csv(filename)
#df = df[['cino', 'compound_nouns', 'only_weight_dic', 'use_less_dic']]
# Preview the first rows of the loaded data.
df.head()

In [None]:
# Keep only the columns used downstream. The explicit copy makes `df` an
# independent frame, so the column assignments in the following cells do not act
# on a slice of the loaded frame (which raises pandas' SettingWithCopyWarning).
df = df[['cino', 'compound_nouns', 'only_weight_dic', 'use_less_dic']].copy()

#### replace nan and convert data form to list
*If a row's `compound_nouns` list is empty, that row is dropped.*

In [None]:
# Missing values become the string '[]' so that every cell holds a parseable
# literal; each literal is then evaluated into the Python object it describes.
df['compound_nouns'] = [
    ast.literal_eval(cell) for cell in df['compound_nouns'].fillna('[]')
]

In [None]:
# Missing values become the string '[]' so that every cell holds a parseable
# literal; each literal is then evaluated into the Python object it describes.
df['only_weight_dic'] = [
    ast.literal_eval(cell) for cell in df['only_weight_dic'].fillna('[]')
]

In [None]:
# Missing values become the string '[]' so that every cell holds a parseable
# literal; each literal is then evaluated into the Python object it describes.
df['use_less_dic'] = [
    ast.literal_eval(cell) for cell in df['use_less_dic'].fillna('[]')
]

In [None]:
#collect the labels of the rows whose compound_nouns list is empty, report them,
#then drop those rows and renumber the index from 0
lists = [i for i in range(df.shape[0]) if len(df['compound_nouns'][i]) == 0]
for i in lists:
  print(i)
df = df.drop(lists).reset_index(drop=True)

In [None]:
df

### **PageRank**

#### Import the library

In [None]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext

# getOrCreate returns the context that is already running, if any, so executing
# this cell a second time does not fail with "Cannot run multiple SparkContexts
# at once". Master and application name are the same as before.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("pagerank"))

In [None]:
import math
import time
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from collections import defaultdict, Counter

#### Find class type: df['cino_alpha']

In [None]:
# Use the first three characters of 'cino' as the class-type label.
# NOTE(review): assumes the alphabetic prefix of every cino is exactly three characters - confirm.
df['cino_alpha'] = df['cino'].str[0:3]
df['cino_alpha']

#### Compute Tfidf score: df['tfidf_result']

In [None]:
# One space-joined string per row; this is the corpus handed to the TF-IDF vectorizer.
n_lists = [' '.join(nouns) for nouns in df['compound_nouns']]

In [None]:
#compute the tfidf score; sp_matrix has one row per document
vectorizer = TfidfVectorizer()
sp_matrix = vectorizer.fit_transform(n_lists)

#token -> column index of sp_matrix. The fitted vocabulary_ is used directly
#because get_feature_names() no longer exists in scikit-learn >= 1.2.
word2id = vectorizer.vocabulary_

#build [(token, score), ...] per document, keeping order and duplicates.
#A token that is missing from the vocabulary (the vectorizer applies its own
#tokenisation and lowercasing) scores 0.0 instead of borrowing the score of
#column 0, which belongs to an unrelated term.
tfidf_result = []
for i, sent in enumerate(n_lists):
  k = [
      (token, sp_matrix[i, word2id[token]] if token in word2id else 0.0)
      for token in sent.split()
  ]
  tfidf_result.append(k)

#save the tfidf score
df['tfidf_result'] = tfidf_result

#### Sum tfidf scores: df['tfidf_result_sum']

In [None]:
#add up the scores of identical tokens inside each row (one small Spark job per row)
df['tfidf_result_sum'] = [
    sc.parallelize(scored).reduceByKey(lambda a, b: a+b).collect()
    for scored in df['tfidf_result']
]

#### Apply df['only_weight_dic'] and df['use_less_dic']: df['update_result']

In [None]:
#tfidf_sum_list: 'tfidf_result_sum' of one row, a list of (token, score) tuples
#weight_list: (token, weight) tuples built from the row's compound_nouns
#3 kinds of weight: weight_p, weight_m, 1
def update (tfidf_sum_list, weight_list):
    """Multiply every token's score by the weight registered for that token.

    The two lists are matched by token, not by position: tfidf_sum_list holds
    one entry per distinct token in whatever order it was produced, while
    weight_list may repeat tokens and follow a different order.

    Args:
        tfidf_sum_list: sequence of tuples whose first item is the token and
            whose second item is its score; further items are kept unchanged.
        weight_list: sequence of (token, weight) tuples. If a token occurs
            several times, the last weight wins.

    Returns:
        A new list of tuples in the order of tfidf_sum_list with the score
        scaled. Tokens without a registered weight keep their score (weight 1).
    """
    weight_of = {pair[0]: pair[1] for pair in weight_list}

    results = []
    for entry in tfidf_sum_list:
        temp = list(entry)
        temp[1] = temp[1] * weight_of.get(temp[0], 1)
        results.append(tuple(temp))
    return results

In [None]:
#define weight
weight_p = 10
weight_m = 0.0001

#choose a weight for every token of every row, then rescale the row's summed scores:
#weight_p if the token is in only_weight_dic, weight_m if it is in use_less_dic, otherwise 1
update_lists = []
for nouns, boosted, ignored, summed in zip(
    df['compound_nouns'], df['only_weight_dic'], df['use_less_dic'], df['tfidf_result_sum']
):
  w_lists = [
      (token, weight_p if token in boosted else weight_m if token in ignored else 1)
      for token in nouns
  ]
  update_lists.append(update(summed, w_lists))

#save the update score of tfidf
df['update_results'] = update_lists

In [None]:
# Spot check: the weighted scores of row 8, highest first (requires at least nine rows).
sorted(df['update_results'][8], key =lambda t: t[1], reverse=True)

#### Edit tfidf score with threshold: df['tfidf_edit']

In [None]:
#define threshold
threshold = 0.05
zero = 0

#scores below the threshold are replaced by zero; all other tuples are kept as they are
df['tfidf_edit'] = [
    [
        (tok[0], zero) + tuple(tok[2:]) if tok[1] < threshold else tok
        for tok in row
    ]
    for row in df['update_results']
]

#### K means

##### 1. Compute initial cluster with class type dictionary(alpha_dict): df['cino_alpha_index']
##### 2. Find cino index of every row with cino dictionary(cino_dict): df['cino_index']
##### 3. Find token index of every token with token dictionary(token_dict): df['token_index']
##### 4. Make a location point of every token: df['kmeans_point']
('cino_alpha_index', ['cino_index', 'token_index', 'tfidf_edit'])

In [None]:
#make a dictionary of 'cino alpha' (class type -> index, in order of first appearance)
alpha_dict = {alpha : i for i, alpha in enumerate(df.cino_alpha.unique())}

#make a dictionary of 'cino' (cino -> row position; a repeated cino keeps its last position)
cino_dict = {cino : i for i, cino in enumerate(df['cino'])}

#make a dictionary of 'compound_nouns' (token -> index)
#Tokens are gathered with set.update (linear in the number of tokens) and then
#sorted, so every run assigns the same index to the same token; plain set
#iteration order changes between interpreter sessions because string hashing
#is randomised.
unique_tokens = set()
for element in df['compound_nouns']:
  unique_tokens.update(element)
lists = sorted(unique_tokens)
token_dict = {token : i for i, token in enumerate(lists)}

In [None]:
#initial cluster of every row, looked up from its class type
df['cino_alpha_index'] = [alpha_dict[alpha] for alpha in df['cino_alpha']]

#cino index of every row, looked up from the cino dictionary
df['cino_index'] = [cino_dict[cino] for cino in df['cino']]

#token index of every token of every row, looked up from the token dictionary
df['token_index'] = [
    [token_dict[token] for token in nouns] for nouns in df['compound_nouns']
]

In [None]:
#make a location point of every token ('kmeans_point')
#(cluster, [cino_index, token index, 'tfidf_edit' score])
k_lists = []
for i in range(df.shape[0]):
  cluster_id = df['cino_alpha_index'][i]
  doc_id = df['cino_index'][i]
  positional_ids = df['token_index'][i]
  lists = []
  for k, scored in enumerate(df['tfidf_edit'][i]):
    token = scored[0]
    #'tfidf_edit' holds one entry per distinct token in the order Spark returned
    #them, while 'token_index' follows 'compound_nouns' (duplicates included), so
    #the id is looked up by token. A token unknown to token_dict falls back to
    #the id stored at the same position.
    token_id = token_dict[token] if token in token_dict else positional_ids[k]
    lists.append((cluster_id, np.array([doc_id, token_id, scored[1]])))
  k_lists.append(lists)
df['kmeans_point'] = k_lists

##### 5. Define functions
    1) Compute distance between two points(compute_square_distance)   
    2) Assign the points to nearest centroid(assign_to_nearest_centroid)   
    3) Compute the difference between old_centroids and new_centroids(compute_metric)   
    4) Cluster the data with k clusters(Kmeans)

In [None]:
def compute_square_distance(a, b):
    """Return the sum of the squared element-wise differences between a and b."""
    delta = a - b
    return np.sum(np.square(delta))

In [None]:
def assign_to_nearest_centroid(pair, centroids):
    """Find the centroid closest to the point stored in pair[1].

    Args:
        pair: tuple whose second item is the point; the first item is ignored.
        centroids: sequence of tuples whose second item is a centroid position.

    Returns:
        (position of the closest centroid inside `centroids`, the point).
    """
    point = pair[1]
    squared_distances = [
        compute_square_distance(point, centroid[1]) for centroid in centroids
    ]
    return np.argmin(squared_distances), point

In [None]:
#difference between old_centroids and new_centroids
def compute_metric(old_centroids, new_centroids):
    """Sum the Euclidean distances between paired old and new centroids.

    Both sequences hold (id, position) tuples. Each is sorted by id and the two
    sorted sequences are paired element by element.
    """
    by_id = lambda item: item[0]
    paired = zip(sorted(old_centroids, key=by_id), sorted(new_centroids, key=by_id))

    sum_of_distances = 0
    for old, new in paired:
        sum_of_distances += np.sqrt(compute_square_distance(old[1], new[1]))

    return sum_of_distances

In [None]:
def Kmeans(datalist):
    """Cluster (label, point) pairs with k-means on Spark.

    Uses the global SparkContext `sc` and reads the module-level MAX_ITER and
    THRESHOLD when it is called.

    Initial centroids: the mean of the points of every label, scaled once by
    1/3 and once by 2/3, which gives two centroids per distinct label. A
    centroid is identified by its position in that list from the start; this
    is the id assign_to_nearest_centroid returns, so compute_metric always
    compares a centroid with its own successor, including in the first
    iteration.

    Args:
        datalist: list of (label, numpy array) pairs.

    Returns:
        RDD of (centroid id, point) pairs built in the last executed iteration.

    Raises:
        ValueError: if MAX_ITER is smaller than 1, because no assignment would
            be produced.
    """
    if MAX_ITER < 1:
        raise ValueError("MAX_ITER must be at least 1")

    k_rdd = sc.parallelize(datalist)
    label_means = k_rdd.groupByKey().mapValues(lambda value: np.mean(list(value), axis=0)).collect()
    seeds = [mean*(1/3) for _, mean in label_means] + [mean*(2/3) for _, mean in label_means]
    centroids = list(enumerate(seeds))

    for i in range(MAX_ITER):
        print(i)
        centroids_bc = sc.broadcast(centroids)

        new_rdd = k_rdd.map(lambda pair: assign_to_nearest_centroid(pair, centroids_bc.value))
        # Collected exactly once, so the per-iteration result is not cached.
        new_centroids = new_rdd.groupByKey().mapValues(lambda value: np.mean(list(value), axis=0)).collect()

        # A centroid that attracted no point keeps its previous position.
        update_centroids = dict(centroids)
        for centroid_id, mean in new_centroids:
            update_centroids[centroid_id] = mean
        update_centroids = list(update_centroids.items())

        metric = compute_metric(centroids, update_centroids)
        print(metric)
        if metric < THRESHOLD:
            break

        centroids = update_centroids

    return new_rdd

##### 6. Transform the data to implement Kmeans
    : make a list of df['kmeans_point']

In [None]:
#make a dataset for kmeans
#p_lists: the token points of all rows concatenated into one flat list
p_lists = [point for row_points in df['kmeans_point'] for point in row_points]

k_rdd = sc.parallelize(p_lists)

##### 7. Run Kmeans with suitable THRESHOLD and MAX_ITER values

In [None]:
#run kmeans with a maximum number of iterations (MAX_ITER) and a stopping threshold (THRESHOLD)
#Kmeans reads both names as globals, so they have to be set before the call
THRESHOLD = 0.1
MAX_ITER = 1000
result_rdd = Kmeans(p_lists)

##### 8. Confirm the cluster of every row: df['final_clu']
    : the confirmed cluster is the mode of the clusters assigned to the row's tokens
##### 9. Save how often each cluster occurs among the tokens of every row: df['tokens_clu']

In [None]:
#group the clusters assigned to the token points by cino_index
result_list = result_rdd.map(lambda t: (int(t[1][0]), t[0])).groupByKey().mapValues(list).collect()

#collect() does not promise that the groups come back in row order, so every
#row looks up its own group through 'cino_index' instead of taking the groups
#in the order Spark returned them
clusters_by_cino = dict(result_list)

#count_lists: (cluster, frequency) pairs of every row, most frequent first
#mode_lists: most frequent cluster of every row
count_lists = []
mode_lists = []
for cino_index in df['cino_index']:
  temp = Counter(clusters_by_cino[cino_index]).most_common()
  count_lists.append(temp)
  mode_lists.append(temp[0][0])

df['final_clu'] = mode_lists
df['tokens_clu'] = count_lists

#### PageRank

##### 1. Define functions
    1) Compute LinkStructure by blocks(make_link_set)   
    2) Compute PageRank by blocks(pr_map)

In [None]:
def make_link_set(dest_set, block_cnt, block_size):
  """Split the destinations of one source into consecutive blocks.

  Destination ids are 0-based, so block b holds the ids
  b*block_size .. (b+1)*block_size - 1 and id 0 belongs to block 0.

  Args:
    dest_set: (source, destinations) pair.
    block_cnt: number of blocks.
    block_size: number of destination ids per block.

  Yields:
    (block id, (source, destinations in that block)) for every block that
    received at least one destination, in increasing block id order.

  Raises:
    IndexError: if a destination id is not smaller than block_cnt*block_size.
  """
  source = dest_set[0]
  block_based_dest_set = [(source, []) for i in range(block_cnt)]

  for dest in dest_set[1]:
    block_based_dest_set[dest // block_size][1].append(dest)

  for i in range(block_cnt):
    if len(block_based_dest_set[i][1]) != 0:
      yield (i, block_based_dest_set[i])

In [None]:
def pr_map(block_data, block_size, N):
  """Compute the new rank values of one block.

  Reads the current ranks from the module-level broadcast variable `pr`.

  Args:
    block_data: (block id, link sets) where every link set is a
      (source, destinations) pair.
    block_size: number of slots in the block.
    N: length of the whole rank vector; every slot starts at 1.5/N.

  Yields:
    (block id, list of block_size rank values).
  """
  block_id, link_sets = block_data
  new_sr = np.zeros(block_size) + 1.5/N

  for link_set in link_sets:
    source, dest_set = link_set
    # NOTE(review): the share is divided by the number of destinations in this
    # link set only - confirm whether the source's total out-degree is intended.
    val = pr.value[source] / len(dest_set)

    for dest in dest_set:
      # slot of a 0-based destination id inside its block of block_size ids
      new_sr[dest % block_size] += val

  yield (block_id, new_sr.tolist())

##### 2. Transform the data to compute PageRank
    1) Make a dataframe listing the members of every cluster: df_clu
    2) Define function that extract specific cluster's rows: extract_proper_row

In [None]:
#one row per final cluster with the list of 'cino' values that belong to it ('clu_elements')
df_clu = df.groupby('final_clu')['cino'].apply(list).reset_index(name='clu_elements')

In [None]:
#extract the rows of df that belong to one cluster
#return the particular rows in df
def extract_proper_row(clu_elements, df):
  """Return the rows of df whose 'cino' is listed in clu_elements.

  The rows follow the order of clu_elements and keep their original index
  labels. The frames are concatenated once, and the result no longer depends
  on a placeholder cino value being absent from the data.

  Args:
    clu_elements: iterable of 'cino' values.
    df: DataFrame with a 'cino' column.

  Returns:
    A new DataFrame; empty, with the columns of df, if clu_elements is empty.
  """
  frames = [df[df['cino'] == element] for element in clu_elements]
  if not frames:
    return df.iloc[0:0].copy()
  return pd.concat(frames)

##### 3. Implement PageRank of every cluster with proper block_cnt and maxIter
    1) Make a dataframe with specific cluster's rows: df_row
    2) Make the (source, dest) pairs
        source: temp index of cino (df['temp_index'])   
        dest: token_index
    3) Build a link structure in blocks
    4) Compute PageRank in blocks



In [None]:
#define parameters for implementing PageRank
#N: number of distinct tokens; used as the length of the rank vector
N = len(token_dict)

#block counts to try, in this order
block = [16, 8, 4, 2, 1]
#number of PageRank iterations per block count
maxIter = 20
#receives one list of timings per cluster from the PageRank cell
execution_time = []

In [None]:
pr_ranks = []
for cluster in range(df_clu.shape[0]):
  times = []

  #get proper dataset
  df_row = extract_proper_row(df_clu['clu_elements'][cluster], df[['cino', 'token_index', 'final_clu']])

  #temp_index: position of the row inside this cluster
  df_row['temp_index'] = list(range(df_row.shape[0]))

  #(source, dest list) pairs: source is temp_index, dest list is the row's token_index
  #NOTE(review): pr_map indexes one rank vector with both the source position
  #and the token ids - confirm that this is the intended graph.
  data_lists = [
      (source, list(tokens))
      for source, tokens in zip(df_row['temp_index'], df_row['token_index'])
  ]

  #implement pagerank with various block sizes and maxIter
  d_rdd = sc.parallelize(data_lists)
  for block_cnt in block:
    start = time.time()

    block_size = math.ceil(N/block_cnt)
    #N is rounded up to a multiple of block_cnt so that every block has
    #block_size slots; the rounded value is kept for the following runs
    N = block_cnt*block_size
    pr = sc.broadcast([1./float(N) for _ in range(N)])
    block_based_set = d_rdd.flatMap(lambda s: make_link_set(s, block_cnt, block_size)).groupByKey().mapValues(list)

    for _ in range(maxIter):
      #pr_map yields one (block id, values) pair per block that has links.
      #The pairs are gathered by block id, so the rank vector is rebuilt in
      #block order regardless of the order Spark returns them in; a block
      #without links contributes block_size zeros, which keeps the vector at
      #length N.
      block_ranks = dict(block_based_set.flatMap(lambda b: pr_map(b, block_size, N)).collect())

      pr_lists = []
      for block_id in range(block_cnt):
        pr_lists.extend(block_ranks.get(block_id, [0] * block_size))

      pr = sc.broadcast(pr_lists)

    end = time.time()
    times.append(end-start)

  execution_time.append(times)
  #ranks of the last block count in `block`
  pr_ranks.append(pr.value)

##### 4. Apply pagerank score to df['tfidf_edit']: df['final_score']
    1) Define a function that updates the df['tfidf_edit'] with the PageRank score
    2) Update the df['final_clu'] with new index from clu_dict
    3) Make a list of (token, pr_edit_score)tuples dictionary for applying PageRank score to data

In [None]:
def pr_update (tfidf_sum_list, weight_dict):
    """Multiply every token's score by the PageRank weight of that token.

    Scores and weights are matched by token, not by position: tfidf_sum_list
    holds one entry per distinct token, while the weights may repeat tokens
    and follow a different order.

    Args:
        tfidf_sum_list: sequence of tuples whose first item is the token and
            whose second item is its score; further items are kept unchanged.
        weight_dict: either a dict mapping token to weight or a sequence of
            (token, weight) tuples (the last weight wins for repeated tokens).

    Returns:
        A new list of tuples in the order of tfidf_sum_list with the score
        scaled. Tokens without a weight keep their score (weight 1).
    """
    if isinstance(weight_dict, dict):
        weight_of = weight_dict
    else:
        weight_of = {pair[0]: pair[1] for pair in weight_dict}

    results = []
    for entry in tfidf_sum_list:
        temp = list(entry)
        temp[1] = temp[1] * weight_of.get(temp[0], 1)
        results.append(tuple(temp))
    return results

In [None]:
#re-number the final clusters 0, 1, 2, ... following the row order of df_clu
clu_dict = {old_clu:i for i, old_clu in enumerate(df_clu['final_clu'])}

df['final_clu'] = [clu_dict[old_clu] for old_clu in df['final_clu']]

In [None]:
#make one {token: pr_edit_score} dictionary per cluster
#pr_edit_score: pr_score*1000 + 1

pr_dict = {v:k for k,v in token_dict.items()}
df_pr = pd.DataFrame(pr_ranks)

#clu_prs and lists only exist for looking at the raw pagerank scores
clu_prs = []
edit_update = []
for element in df_pr.to_numpy():
  #the row minimum is the same for every token, so it is computed once per row
  lowest = min(element)
  lists = []
  edit_lists = []
  for i in range(len(pr_dict)):
    #only tokens that score above the row minimum are kept
    if element[i] > lowest:
      lists.append((pr_dict[i], element[i]))
      edit_lists.append((pr_dict[i], element[i]*1000 + 1))
  clu_prs.append(lists)
  edit_update.append(dict(edit_lists))

In [None]:
#rescale the thresholded scores of every row with the PageRank weights of its cluster;
#tokens without a PageRank weight get weight 1
final_score = []
for nouns, clu, edited in zip(df['compound_nouns'], df['final_clu'], df['tfidf_edit']):
  w_lists = [(token, edit_update[clu].get(token, 1)) for token in nouns]
  final_score.append(pr_update(edited, w_lists))

df['final_score'] = final_score

#### Output

In [None]:
#order the (token, score) tuples of every row by score, highest first
df['sorted_token'] = [
    sorted(scored, key =lambda t: t[1], reverse=True) for scored in df['final_score']
]

In [None]:
#lists: the (at most) ten best tokens of every row, flattened
#c_lists: the cino that belongs to each entry of lists
lists = []
c_lists = []
for cino, ranked in zip(df['cino'], df['sorted_token']):
    for entry in ranked[:10]:
        c_lists.append(cino)
        lists.append(entry[0])

In [None]:
#one (cino, keyword) row per selected token. Building the frame from a dict
#names both columns directly, which also works when no keyword was selected.
df_result = pd.DataFrame({'cino': c_lists, 'keyword': lists})

df_result.to_csv('final_keyword.csv')

In [None]:
df_result

In [None]:
df

In [None]:
#save the working dataframe with all columns added by this notebook
df.to_csv('test_pr_result.csv')