-
Notifications
You must be signed in to change notification settings - Fork 1
/
pos_tagging.py
164 lines (149 loc) · 7.04 KB
/
pos_tagging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import nltk
from nltk.tokenize import word_tokenize
import os
from os import path as osp
import pandas as pd
from config import TWEET_FOLDER_NAMES
from collections import Counter
# Coarse POS category -> Penn Treebank tags that collapse into it.
_POS_GROUPS = {
    "Noun": ["NN", "NNS", "NNP"],
    "Verb": ["VB", "VBD", "VBP", "VBN", "VBG", "VBZ"],
    "Adverb": ["RB", "RBR", "RBS"],
    "Particle": ["RP"],
    "Pronoun": ["PRP$", "PRP"],
    "Adj": ["JJ", "JJR", "JJS"],
    "Symbol": ["SYM"],
    "Conjunction": ["CC", "IN"],
    "Num": ["CD"],
    "Det": ["DT"],
    "Pre-determiner": ["PDT"],
    "Foreign": ["FW"],
    "na": ["", "''"],
}
# Inverted lookup: fine-grained Treebank tag -> coarse category name.
unify_pos = {tag: category for category, tags in _POS_GROUPS.items() for tag in tags}
def get_unified_pos(pos):
    """Map a Penn Treebank tag to its coarse category; unknown tags pass through unchanged."""
    return unify_pos.get(pos, pos)
def get_pos(sentence, word=""):
    """Return the Penn Treebank POS tag assigned to *word* within *sentence*.

    The sentence is lower-cased before tokenizing, so *word* should be
    lower-case to match. Returns "na" when *sentence* is not a string
    (e.g. a NaN from pandas) or when *word* does not occur in it.
    """
    # isinstance instead of `type(x) != str`: also rejects NaN floats from pandas.
    if not isinstance(sentence, str):
        return "na"
    tagged = nltk.pos_tag(word_tokenize(sentence.lower()))
    # Tag of the first occurrence of `word`; "na" when absent.
    return next((pos for wrd, pos in tagged if wrd == word), "na")
def update_top_pos_df(top_pos_df, word_pos_counter, curr_word, save_path):
    """Append curr_word's dominant POS tag to the running table and persist it.

    Writes the accumulated table to <save_path>/most_common_pos.csv after
    every call, then returns the (mutated) table dict.
    """
    dominant = max(word_pos_counter, key=word_pos_counter.get)
    top_pos_df["word"].append(curr_word)
    top_pos_df["most_common_pos"].append(dominant)
    pd.DataFrame(top_pos_df).to_csv(osp.join(save_path, "most_common_pos.csv"))
    return top_pos_df
def update_pos_df(pos_df, word_pos_counter, curr_word, save_path):
    """Append curr_word's count for every POS column and persist the table.

    Counts come from `word_pos_counter` (a Counter, so absent tags yield 0).
    Writes the accumulated table to <save_path>/all_pos.csv after every call,
    then returns the (mutated) table dict.
    """
    pos_df["word"].append(curr_word)
    for column, values in pos_df.items():
        if column == "word":
            continue
        values.append(word_pos_counter[column])
    pd.DataFrame(pos_df).to_csv(osp.join(save_path, "all_pos.csv"))
    return pos_df
def save_pos_tags(save_path, tweet_path):
    """Tag every tweet file and write per-word POS summaries.

    For each (time, word-type) combination, reads every per-word tweet CSV
    from the folder named by TWEET_FOLDER_NAMES, tags each tweet's text,
    and writes two accumulating CSVs into <save_path>/<time>/<word_type>/:
    most_common_pos.csv (dominant tag per word) and all_pos.csv (full counts).
    """
    for word_type in ["slang", "nonslang"]:  # renamed: `type` shadowed the builtin
        for time in ["old", "new"]:
            top_pos_df = {"word": [],
                          "most_common_pos": [],
                          }
            pos_df = {"word": [], "Noun": [], "Verb": [], "Adverb": [], "Adj": [], "Det": [],
                      "Particle": [], "Num": [], "Symbol": [], "Foreign": [], "Conjunction": [],
                      }
            # BUG FIX: the original reassigned `save_path` here, so every
            # iteration after the first joined onto the previous iteration's
            # already-joined path (e.g. .../old/slang/new/slang). Use a local.
            out_dir = osp.join(save_path, time, word_type)
            os.makedirs(out_dir, exist_ok=True)  # ensure output dir exists
            tweets_folder = osp.join(tweet_path, TWEET_FOLDER_NAMES[word_type][time])
            print("getting tweets from", tweets_folder)
            for tweets_file in sorted(os.listdir(tweets_folder)):
                tweets = pd.read_csv(osp.join(tweets_folder, tweets_file))
                if len(tweets) == 0:
                    continue
                curr_word = tweets.word[0].lower()
                tweets["POS"] = tweets["text"].apply(lambda x: get_pos(x, word=curr_word))
                tweets["POS_unified"] = tweets["POS"].apply(get_unified_pos)
                word_pos_counter = Counter(tweets.POS_unified)
                top_pos_df = update_top_pos_df(top_pos_df=top_pos_df, curr_word=curr_word,
                                               word_pos_counter=word_pos_counter, save_path=out_dir)
                pos_df = update_pos_df(pos_df=pos_df, curr_word=curr_word,
                                       word_pos_counter=word_pos_counter, save_path=out_dir)
def count_tweets_per_word(tweet_path):
    """
    Count how many tweets there are per word
    """
    counts = {}
    for period in ["old", "new"]:
        counts[period] = Counter()
        # Slang and nonslang words share one counter per time period.
        for word_type in ["slang", "nonslang"]:
            folder = osp.join(tweet_path, TWEET_FOLDER_NAMES[word_type][period])
            print("getting tweets from", folder)
            for file_name in sorted(os.listdir(folder)):
                frame = pd.read_csv(osp.join(folder, file_name))
                if len(frame) == 0:
                    continue
                counts[period][frame.word[0].lower()] = len(frame["text"])
    return counts
def analyse_pos_tags(save_path, causal_data_path):
    """Print, for each (type, time) pair, the distribution of dominant POS tags
    over the words listed in the causal-analysis CSV."""
    causal_data = pd.read_csv(causal_data_path)
    pos_accross_types = {}
    for word_type in ["slang", "nonslang"]:
        words = causal_data.word[causal_data.type == word_type]
        print(len(words), word_type, "words")
        for period in ["old", "new"]:
            df = pd.read_csv(osp.join(save_path, period, word_type, "most_common_pos.csv"))
            tally = Counter(df.most_common_pos[df.word.isin(words)])
            pos_accross_types[word_type + "_" + period] = tally
    print(pos_accross_types)
def sum_pos(x, min=10):
    """
    Sum the number of times a pos tag was given in 2010, encoded by x[0]
    with the number of times it was given in 2020, encoded by x[1]
    Only consider the tag if it appeared more than min times in both periods
    If it hadn't appeared at least min times in either period, return 0
    """
    first_period, second_period = x[0], x[1]
    if first_period > min and second_period > min:
        return first_period + second_period
    return 0
def pos_for_causal(tweets_per_word, save_path, combine=True, minimum=10, percent=False):
    """Build one DataFrame of per-word POS counts for the causal analysis.

    Reads the old/new all_pos.csv tables for slang and nonslang words from
    save_path, merges each word's old and new counts, and returns a single
    DataFrame with, per word: combined counts for Noun/Verb/Adverb/Adj,
    the dominant tag ("most_common"), total tweet count ("num_tweets"),
    and a <pos>_binary column per tag.

    combine=True sums old+new counts and thresholds the binary columns at
    `minimum`; combine=False keeps only tags seen > `minimum` times in BOTH
    periods (via sum_pos) and thresholds at 0. percent=True instead marks a
    tag when its count exceeds (threshold/100) of the word's tweet total.
    """
    POS = ["Noun", "Verb", "Adverb", "Adj"]
    # pos_tags[time][word_type] -> all_pos table for that slice.
    pos_tags = {"old": {}, "new": {}}
    for word_type in ["slang", "nonslang"]:
        for time in ["old", "new"]:
            df_file_path = osp.join(save_path, time, word_type, "all_pos.csv")
            pos_tags[time][word_type] = pd.read_csv(df_file_path)
    # The slang and nonslang slices go through identical merge/aggregate logic.
    df_slang = _merge_time_periods(pos_tags["new"]["slang"], pos_tags["old"]["slang"],
                                   POS, combine, minimum)
    df_nonslang = _merge_time_periods(pos_tags["new"]["nonslang"], pos_tags["old"]["nonslang"],
                                      POS, combine, minimum)
    df_all = pd.concat([df_slang, df_nonslang])
    df_all["num_tweets"] = df_all["word"].apply(
        lambda wrd: tweets_per_word["old"][wrd] + tweets_per_word["new"][wrd])
    # combine=False already zeroed out sub-threshold tags via sum_pos.
    MIN = minimum if combine else 0
    for pos in POS:
        if percent:
            pc = MIN / 100
            df_all[pos + "_binary"] = df_all[[pos, "num_tweets"]].apply(
                lambda x: int(x[0] > pc * x[1]), axis=1)
        else:
            df_all[pos + "_binary"] = df_all[pos].apply(lambda x: x > MIN)
    return df_all


def _merge_time_periods(df_new, df_old, pos_list, combine, minimum):
    """Merge new/old per-word POS tables on 'word', aggregate each tag's
    counts across the two periods, and add the dominant tag as 'most_common'.

    After pd.merge, "_x" columns are the new period and "_y" the old.
    """
    merged = pd.merge(df_new, df_old, on="word")
    for pos in pos_list:
        if combine:
            merged[pos] = merged[pos + "_x"] + merged[pos + "_y"]
        else:
            # Keep the tag only if it cleared `minimum` in both periods.
            merged[pos] = merged[[pos + "_x", pos + "_y"]].apply(
                lambda x: sum_pos(x, minimum), axis=1)
    merged["most_common"] = merged[pos_list].idxmax(axis=1)
    return merged