In [None]:
import os
import pandas as pd
from denoicer import Denoicer
from mbti_util import MbtiUtil
from mecab_wakati import MecabWakati
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import CountVectorizer
from wordcloud import WordCloud

In [None]:
# Project helpers used by the cells below:
#   mecab.wakati_sentence, mbti_util.m_types, denoice.normalize_text / stop_words.
mecab = MecabWakati()
mbti_util = MbtiUtil()
denoice = Denoicer()

# Shared renderer for every word cloud written by this notebook.
# NOTE: font_path is an absolute, system-specific path; adjust it if the
# font is installed elsewhere.
word_cloud = WordCloud(
    width=480,
    height=320,
    background_color="white",
    font_path='/usr/share/fonts/truetype/takao-gothic/TakaoGothic.ttf',
)

In [None]:
def read_tweets_with_before_after(isReply=False):
    """Load the tweets TSV and keep either the replies or the non-replies.

    A tweet counts as a reply when its text starts with ``@<name>`` followed
    by a space. Rows whose ``tweet_text`` is missing are dropped in both modes.

    Args:
        isReply: When True return only replies; otherwise only non-replies.

    Returns:
        pandas.DataFrame: the matching rows of
        ``./database/tweets_with_before_or_after.tsv`` (original index kept).
    """
    path = './database/tweets_with_before_or_after.tsv'
    tweets = pd.read_csv(path, sep='\t')
    text = tweets['tweet_text']
    # Raw string avoids the invalid "\w" escape in a normal string literal;
    # na=False keeps the mask strictly boolean when tweet_text has missing
    # values (a NaN in the mask makes boolean indexing raise).
    starts_with_mention = text.str.match(r'@\w+ ', na=False)
    if isReply:
        return tweets[starts_with_mention]
    return tweets[text.notna() & ~starts_with_mention]


# Work on the non-reply tweets from here on.
df = read_tweets_with_before_after(isReply=False)
# Preview the first five rows (the default for head()).
df.head()


In [None]:
# Regex alternation of the keywords used to select tweets for df_covid.
query = "コロナ|武漢|Covid|COVID|ワクチン|パンデミック|マスク|自粛|クラスター|蔓延防止|マンボウ|まん延防止|給付金"
# Keep rows whose text matches any keyword AND that are labelled 'after'.
# na=False treats missing text as "no match" so the mask stays boolean
# (a NaN in the mask would make the indexing raise).
df_covid = df[
    df['tweet_text'].str.contains(query, na=False)
    & (df['before_or_after'] == 'after')
]

In [None]:
# Build Bag-of-Words vectors and write each MBTI type's most frequent words
def create_bag_of_words_vector_and_output(path, top_n=20):
    """Write the most frequent non-stop-words of every MBTI type to ``path``.

    For each type in ``mbti_util.m_types`` the matching ``tweet_text`` values
    of the global ``df_covid`` are normalized with ``denoice.normalize_text``,
    split with ``mecab.wakati_sentence`` and joined into one document. The
    documents are counted with ``CountVectorizer``.

    Output layout (one section per type, then a literal ``EOF`` line)::

        <m_type>
        <TAB><word>: <count>
        ...
        EOF

    Args:
        path: Destination text file; its parent directory is created if needed.
        top_n: Maximum number of words written per type (default 20).
    """
    docs = []
    for m_type in mbti_util.m_types:
        tweets = df_covid[df_covid['m_type_en'] == m_type]['tweet_text'].values
        words = []
        for tweet in tweets:
            words.extend(mecab.wakati_sentence(denoice.normalize_text(tweet)))
        docs.append(' '.join(words).strip())

    vectorizer = CountVectorizer()
    counts = vectorizer.fit_transform(docs).toarray()
    # vocabulary_ maps term -> column index; invert it to look terms up by column.
    index_to_term = {index: term for term, index in vectorizer.vocabulary_.items()}

    outputs = []
    for m_type, row in zip(mbti_util.m_types, counts):
        outputs.append(f'{m_type}\n')
        written = 0
        # Stable descending sort: equal counts keep vocabulary column order.
        for column in sorted(range(len(row)), key=lambda i: row[i], reverse=True):
            if written >= top_n:
                break
            term = index_to_term[column]
            if term not in denoice.stop_words:
                outputs.append(f'\t{term}: {row[column]}\n')
                written += 1
    outputs.append('EOF')

    # Create the directory that actually holds `path`, not a fixed location,
    # so the function works for any destination.
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, 'w', encoding='utf8') as f:
        f.writelines(outputs)
            
# path = 'covid-19_feature_words/bag-of-words/feature_words_top_20.txt'
# create_bag_of_words_vector_and_output(path=path)

In [None]:
# Render one WordCloud per MBTI type from the Bag-of-Words feature-word file
def create_wordcloud_bag_of_words(path, save_path_folder):
    """Create ``<m_type>.svg`` word clouds from a feature-word text file.

    The file consists of header lines (no tab) naming a section, each followed
    by ``<TAB><word>: <value>`` lines. Every non-empty section is rendered with
    the global ``word_cloud`` and saved into ``save_path_folder``. Sections
    without words (for example a trailing ``EOF`` marker) are skipped.

    Args:
        path: Feature-word file to read (UTF-8).
        save_path_folder: Output directory; created if it does not exist.

    Raises:
        ValueError: If a word line appears before any section header.
    """
    frequencies_by_type = _read_bow_feature_words(path)
    os.makedirs(save_path_folder, exist_ok=True)
    for m_type, frequencies in frequencies_by_type.items():
        if not frequencies:
            continue
        result = word_cloud.generate_from_frequencies(frequencies)
        with open(f'{save_path_folder}/{m_type}.svg', 'w', encoding='utf-8') as svg:
            svg.write(result.to_svg())
        print(f'{m_type} completed.')


def _read_bow_feature_words(path):
    """Parse a feature-word file into ``{section: {word: float(value)}}``."""
    frequencies_by_type = {}
    current = None
    with open(path, 'r', encoding='utf8') as f:
        for line in f:
            if not line.strip():
                continue  # a blank line is neither a header nor a word line
            if '\t' not in line:
                # Header line: start a fresh section.
                current = {}
                frequencies_by_type[line.strip()] = current
                continue
            if current is None:
                raise ValueError(f'word line found before any section header: {line!r}')
            compact = line.strip().replace('\t', '').replace(' ', '')
            # Split on the last colon so the value is always the final field.
            word, _, value = compact.rpartition(':')
            # Words are stored as they are read, so the final section is kept
            # even when the file has no trailing sentinel line.
            current[word] = float(value)
    return frequencies_by_type

# path = 'covid-19_feature_words/bag-of-words/feature_words_top_20.txt'
# create_wordcloud_bag_of_words(path=path, save_path_folder='covid-19_feature_words/bag-of-words/word_cloud')

In [None]:
# Build TF-IDF vectors and write each MBTI type's characteristic words
def create_tf_idf_vector_and_output(path, max_df=0.9, top_n=20):
    """Write the highest-weighted TF-IDF terms of every MBTI type to ``path``.

    For each type in ``mbti_util.m_types`` the matching ``tweet_text`` values
    of the global ``df_covid`` are normalized with ``denoice.normalize_text``,
    split with ``mecab.wakati_sentence`` and joined into one document. The
    documents are weighted with ``TfidfVectorizer``.

    Output layout (one section per type; no terminating marker line)::

        <m_type>
        <TAB><word>: <tf-idf weight, 6 decimals>
        ...

    Args:
        path: Destination text file; its parent directory is created if needed.
        max_df: Passed to ``TfidfVectorizer``; terms whose document frequency
            exceeds this threshold are ignored (default 0.9).
        top_n: Number of terms written per type (default 20).
    """
    docs = []
    for m_type in mbti_util.m_types:
        tweets = df_covid[df_covid['m_type_en'] == m_type]['tweet_text'].values
        words = []
        for tweet in tweets:
            words.extend(mecab.wakati_sentence(denoice.normalize_text(tweet)))
        docs.append(' '.join(words).strip())

    # Ignore terms that appear in more than `max_df` of the documents.
    vectorizer = TfidfVectorizer(max_df=max_df)
    weights = vectorizer.fit_transform(docs).toarray()
    # get_feature_names() was removed in scikit-learn 1.2; use its replacement
    # when available and fall back for older installations.
    if hasattr(vectorizer, 'get_feature_names_out'):
        feature_names = vectorizer.get_feature_names_out()
    else:
        feature_names = vectorizer.get_feature_names()

    outputs = []
    for m_type, row in zip(mbti_util.m_types, weights):
        outputs.append(f'{m_type}\n')
        # Stable descending sort: equal weights keep vocabulary column order.
        ranked = sorted(enumerate(row), key=lambda pair: pair[1], reverse=True)[:top_n]
        for term_id, tfidf in ranked:
            outputs.append('\t{0:s}: {1:f}\n'.format(str(feature_names[term_id]), tfidf))

    # Create the directory that actually holds `path`, not a fixed location,
    # so the function works for any destination.
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, 'w', encoding='utf8') as f:
        f.writelines(outputs)

# path = 'covid-19_feature_words/tf-idf/feature_words_top_20.txt'
# create_tf_idf_vector_and_output(path=path)

In [None]:
# Render one WordCloud per MBTI type from the TF-IDF feature-word file
def create_wordcloud_tf_idf(path, save_path_folder):
    """Create ``<m_type>.svg`` word clouds from a TF-IDF feature-word file.

    The file consists of header lines (no tab) naming a section, each followed
    by ``<TAB><word>: <weight>`` lines. Within a section every weight is
    divided by the section's last listed weight and truncated to an integer;
    words whose result is 0 are left out. The resulting integer frequencies
    are rendered with the global ``word_cloud``. Sections that end up with no
    words are skipped.

    Args:
        path: Feature-word file to read (UTF-8).
        save_path_folder: Output directory; created if it does not exist.

    Raises:
        ValueError: If a word line appears before any section header.
    """
    sections = _read_tf_idf_sections(path)
    os.makedirs(save_path_folder, exist_ok=True)
    for m_type, entries in sections.items():
        frequencies = _scale_tf_idf_weights(entries)
        if not frequencies:
            continue
        result = word_cloud.generate_from_frequencies(frequencies)
        with open(f'{save_path_folder}/{m_type}.svg', 'w', encoding='utf-8') as svg:
            svg.write(result.to_svg())
        print(f'{m_type} completed.')


def _read_tf_idf_sections(path):
    """Parse the file into ``{section: [(word, weight), ...]}`` in file order."""
    sections = {}
    current = None
    with open(path, 'r', encoding='utf8') as f:
        for line in f:
            if not line.strip():
                continue  # a blank line is neither a header nor a word line
            if '\t' not in line:
                # Header line: start a fresh section.
                current = []
                sections[line.strip()] = current
                continue
            if current is None:
                raise ValueError(f'word line found before any section header: {line!r}')
            compact = line.strip().replace('\t', '').replace(' ', '')
            # Split on the last colon so the weight is always the final field.
            word, _, value = compact.rpartition(':')
            # Entries are stored as they are read, so the final section is
            # kept even though the file has no trailing sentinel line.
            current.append((word, float(value)))
    return sections


def _scale_tf_idf_weights(entries):
    """Turn ``(word, weight)`` pairs into integer frequencies relative to the last weight."""
    if not entries:
        return {}
    baseline = entries[-1][1]
    if baseline <= 0:
        # The last listed weight can be 0.000000; dividing by it would raise.
        # Fall back to the smallest positive weight of the section instead.
        positives = [weight for _, weight in entries if weight > 0]
        if not positives:
            return {}
        baseline = min(positives)
    frequencies = {}
    for word, weight in entries:
        repeat = int(weight / baseline)
        if repeat > 0:
            frequencies[word] = frequencies.get(word, 0) + repeat
    return frequencies

# Render the TF-IDF word clouds. Expects the feature-word file to exist already.
path = 'covid-19_feature_words/tf-idf/feature_words_top_20.txt'
create_wordcloud_tf_idf(
    path=path,
    save_path_folder='covid-19_feature_words/tf-idf/word_cloud',
)