In [None]:

import json
import glob
import re
import nltk
import emoji
import demoji
import os
from nltk.tokenize import word_tokenize
from nltk.stem.wordnet import WordNetLemmatizer
from pattern.text.en import singularize
import pandas as pd
import urllib.request
import numpy as np
from PIL import Image
from wordcloud import WordCloud, ImageColorGenerator
import matplotlib.pyplot as plt
from nltk.corpus import wordnet
from collections import Counter
from transformers import AutoProcessor, AutoModelForCausalLM
from PIL import Image

In [None]:
class Preprocessing:
    def __init__(self, curse_words_path, contractions_path, user_info_path) -> None:
        self.__contractions = self.__load_contractions(contractions_path)
        self.__curse_words = self.__load_curse_words(curse_words_path)
        self.__stop_words = self.__load_stopwords()
        self.__word_dictionary = self.__load_word_dict()
        self.__user_info = self.__load_user_info(user_info_path)

        pass

    def __load_contractions(self, path):
       return json.loads(open(path, "r").read())

    def __load_curse_words(self, path):
      curse_words = open(path, 'r')
      return curse_words.read()

    def __load_stopwords(self):
       return nltk.corpus.stopwords.words("english")

    def __load_word_dict(self):
      return set(nltk.corpus.words.words())

    def __load_user_info(self, path):
      return pd.read_csv(path)

    def __get_wordnet_pos(self, word):
        """Map POS tag to first character lemmatize() accepts"""
        tag = nltk.pos_tag([word])[0][1][0].upper()
        tag_dict = {
            "J": wordnet.ADJ,
            "N": wordnet.NOUN,
            "V": wordnet.VERB,
            "R": wordnet.ADV
        }
        # Noun is the default if the tag is not found
        return tag_dict.get(tag, wordnet.NOUN)
    
    def __plot_word_frequency(self, texts):
        combined_text = ' '.join(texts)
        words = combined_text.split()
        word_freq = Counter(words)
        most_common_words = word_freq.most_common(20)
        words = [word for word, _ in most_common_words]
        frequencies = [freq for _, freq in most_common_words]
        colors = plt.cm.viridis(np.linspace(0, 1, len(words)))

        # Plot the histogram
        plt.figure(figsize=(10, 6))
        plt.bar(words, frequencies, color=colors)
        plt.xlabel('Words')
        plt.ylabel('Frequency')
        plt.title('Word Frequency Histogram (Top 20)')
        plt.xticks(rotation=90)
        plt.show()


    def __clean_tweet(self, tweet, label):
        text = re.sub(r"[^\w\s']|_", "", tweet['text'])
        text = re.sub(r"(?<!\w)'|'(?!\w)", "", text)

        # Remove links
        text = re.sub(r'http\S+|www\S+|\S+\.com\S+', '', text)
        # Remove user mentions with @
        text = re.sub(r'@\w+', '', text)
        text = re.sub(r'^[Rr][Tt]\s+', '', text)

        for word in text.split():
          if word.lower() in self.__contractions:
            text = text.replace(word, self.__contractions[word.lower()])

        # tokenizing tweet
        tokenized_text = word_tokenize(text)
        i = 0
        while i < len(tokenized_text):
          tokenized_text[i] = tokenized_text[i].lower()
          # changing verbs to its base form
          tokenized_text[i] = WordNetLemmatizer().lemmatize(tokenized_text[i], self.__get_wordnet_pos(tokenized_text[i]))
          # changing plural nouns to their singular forms
          tokenized_text[i] = singularize(tokenized_text[i])

          # removing all single chars from tweets except "I"
          if len(tokenized_text[i]) == 1 and \
                  tokenized_text[i] != 'i':
            del tokenized_text[i]
          elif tokenized_text[i].lower() == 'thi':
            del tokenized_text[i]
          # removing stopwords
          elif tokenized_text[i] in self.__stop_words:
            del tokenized_text[i]
          # removing all meaningless texts
          elif tokenized_text[i].lower() not in self.__word_dictionary and\
                  tokenized_text[i].lower() not in self.__curse_words:
            del tokenized_text[i]
          else:
            i += 1
        return [label, ' '.join(tokenized_text)]

    def __check_folder_exists(self, folder_name):
      if not os.path.isdir(folder_name):
          os.makedirs(folder_name)

    def __save_images(self, url, file_name, label, data_set):
      try:
          if label == 0:
            folder_name = str(label) + data_set
          elif label == 1:
            folder_name = str(label) + data_set
          elif label == None:
            folder_name = str(label) + data_set
          else:
            print('invalide label!')
            return

          self.__check_folder_exists(folder_name)
          file_name = file_name + '.jpg'
          full_filename = os.path.join(folder_name, file_name)
          # downloading image from the link
          urllib.request.urlretrieve(url, full_filename)
          return True
      except BaseException as err:
          return False

    def download_shen_images(self, path, label):
        data_set = ""
        if(label == 1):
          data_set = "shen_postive"
        else:
          data_set = "shen_negative"
        files = glob.glob(path, recursive=True)
        for file in files:
            with open(file) as f:
                json_file = json.load(f)
                if 'media' in json_file['entities']:
                    for media in json_file['entities']['media']:
                        self.__save_images(media['media_url_https'],
                                          media['id_str'], label, data_set=data_set)

    def download_clpsych_images(self, path):
      example_usernames = [x.split('.')[0] for x in os.listdir(path)]

      def tweet_processor(x):
          return json.loads(x)
      counter = 0
      for username in example_usernames:
          IN = open(path + username + '.tweets')
          tweets = filter(None, map(tweet_processor, IN))
          for tweet in tweets:
              label = label = self.__user_info.loc[self.__user_info['anonymized_screen_name']
                                                  == tweet['user']['screen_name']]
              # print(label)
              screen_name = label.iloc[0][0]
              label = label.iloc[0][4]
              data_set = ""
              if(label == 1):
                data_set = "clp_postive"
              else:
                data_set = "clp_negative"
                
              if 'media' in tweet['entities'].keys():
                  self.__save_images(tweet['entities']['media'][0]
                                    ['media_url_https'], screen_name + str(counter), label, data_set=data_set)
                  counter = counter + 1

    def extract_emojis(self, tweet):
        if tweet['lang'] == 'en':
          all_emojis = ''.join(
              c for c in tweet['text'] if c in emoji.distinct_emoji_list(tweet['text']))
          emojis_meaning = demoji.findall(all_emojis)
          return emojis_meaning

    def __combine_tweets(self, p_tweets, n_tweets):
        all_tweets = pd.concat([p_tweets, n_tweets])
        all_tweets = all_tweets.sample(frac=1)
        return all_tweets

    # visualizing tweets

    def __visualise_tweets(self, tweet_list):
        tweet_list = ' '.join(tweet_list)
        Mask = np.array(Image.open('./Twitter-PNG-Image.png'))
        image_colors = ImageColorGenerator(Mask)
        wc = WordCloud(background_color='black', height=1500,
                      width=4000, mask=Mask).generate(tweet_list)
        plt.figure(figsize=(10, 10))
        plt.imshow(wc.recolor(color_func=image_colors),
                  interpolation="hamming")
        plt.axis('off')
        plt.show()

    def __load(self, path, label, dataset):
      non_cleaned_tweets = []
      tweets_list = []
      tweets_emojis = []
      if dataset == 'shen':
        # loading file from given path
        files = glob.glob(path, recursive=True)
        # print(len(files))
        for file in files:
          with open(file) as f:
            tweet = json.load(f)
            if tweet['lang'] == 'en':
              non_cleaned_tweets.append(tweet['text'])
              tweet_text = self.__clean_tweet(tweet, label)
              if len(tweet_text) > 1:
                tweets_list.append(tweet_text)
              tweet_emojis = self.extract_emojis(tweet)
              if len(tweet_emojis) > 0:
                temp = [label, ' '.join(list(tweet_emojis.values()))]
                tweets_emojis.append(temp)

      if dataset == 'clpsych':
        example_usernames = [x.split('.')[0] for x in os.listdir(path)]

        def tweet_processor(x):
            return json.loads(x)
        j = 0
        print('Loading tweets...')
        for username in example_usernames:
            j = j + 1
            # print(username + ' and umber is ',j)
            IN = open(path + username + '.tweets')
            tweets = filter(None, map(tweet_processor, IN))
            tweet_counter = 0
            for tweet in tweets:
                if tweet_counter == 100:
                  break
                tweet_counter += 1
                if tweet['lang'] == 'en':
                  label = self.__user_info.loc[self.__user_info['anonymized_screen_name']
                                              == tweet['user']['screen_name']]
                  label = label.iloc[0][4]
                  if label == 'ptsd' or 'condition':
                    continue
                  # print(label)
                  if label == 'control':
                    label = 0
                  if label == 'depression':
                    label = 1
                  print(tweet)
                  tweet_text = self.__clean_tweet(tweet, label)
                  if len(tweet_text[1].split(' ')) > 1:
                    tweets_list.append(tweet_text)
                    # print(tweet_text)
                  tweet_emojis = self.extract_emojis(tweet)
                  if len(tweet_emojis) > 0:
                    temp = [label, ' '.join(list(tweet_emojis.values()))]
                    # print(temp)
                    tweets_emojis.append(temp)

      print('Before pre-processing')              
      self.__visualise_tweets(non_cleaned_tweets)
      print('Before pre-processing')
      self.__plot_word_frequency(non_cleaned_tweets)
      tweets_list = pd.DataFrame(tweets_list, columns=['Label', 'Tweet'])
      print('After pre-processing')
      self.__visualise_tweets(list(tweets_list['Tweet']))
      print('After pre-processing')
      self.__plot_word_frequency(list(tweets_list['Tweet']))
      tweets_emojis = pd.DataFrame(tweets_emojis, columns=[
                                  'Label', 'EmojisMeaning'])
      print(tweets_list)
      return tweets_list, tweets_emojis

    def load_tweets_shen(self, p_labeled_path, n_labeled_path):
      print('Depressed users tweets')
      p_tweets, p_emojis = self.__load(p_labeled_path, 0, 'shen')
      print('Non-depressed users tweets')
      n_tweets, n_emojis = self.__load(n_labeled_path, 1, 'shen')
      all_tweets = self.__combine_tweets(p_tweets, n_tweets)
      all_emojis = self.__combine_tweets(p_emojis, n_emojis)
      all_tweets.to_csv('shen_all_tweets.csv')
      all_emojis.to_csv('shen_all_emojis.csv')
      return all_tweets, all_emojis

    def load_tweets_clpsych(self, path):
        all_tweets, all_emojis = self.__load(path, None, 'clpsych')
        all_tweets.to_csv('clpsych_all_tweets')
        all_emojis.to_csv('clpsych_all_emojis')
        return all_tweets, all_emojis

In [None]:
class GIT_Caption:
    """Generate image captions with the ``microsoft/git-base-coco`` checkpoint."""

    def __init__(self) -> None:
        """Load the processor and the causal language model of the checkpoint."""
        checkpoint = "microsoft/git-base-coco"
        self.processor = AutoProcessor.from_pretrained(checkpoint)
        self.model = AutoModelForCausalLM.from_pretrained(checkpoint)

    def __extract_caption(self, imgs_path):
        """Return one generated caption per image path, in input order.

        Args:
            imgs_path: iterable of image file paths.
        """
        generated_captions = []
        for img in imgs_path:
            # Close the file right after decoding, and normalise to RGB so
            # grayscale, palette or RGBA files reach the processor in a
            # single, consistent mode.
            with Image.open(img) as image:
                rgb_image = image.convert("RGB")

            pixel_values = self.processor(
                images=rgb_image, return_tensors="pt").pixel_values

            generated_ids = self.model.generate(
                pixel_values=pixel_values, max_length=50)
            generated_captions.append(self.processor.batch_decode(
                generated_ids, skip_special_tokens=True)[0])

        return generated_captions

    def generate_captions(self, imgs_path, label):
        """Caption every image selected by ``imgs_path``.

        Args:
            imgs_path: a recursive glob pattern, or a directory, in which case
                every file directly inside it is used.
            label: value repeated in the ``Label`` column for every image.

        Returns:
            DataFrame with columns ``Label`` and ``Captions``, one row per
            image, ordered by file path.
        """
        # A bare directory would glob to the directory itself, which cannot
        # be opened as an image; expand it to its contents instead.
        if os.path.isdir(imgs_path):
            pattern = os.path.join(imgs_path, '*')
        else:
            pattern = imgs_path
        imgs = sorted(candidate for candidate in glob.glob(pattern, recursive=True)
                      if os.path.isfile(candidate))
        captions = self.__extract_caption(imgs)
        labels = np.full(len(imgs), label)
        data = {'Label': labels, 'Captions': captions}
        captions_df = pd.DataFrame(data)
        return captions_df

In [None]:
# Resource files consumed by the Preprocessing constructor.
CURSE_WORDS_FILE = './curse_words.txt'
CONTRACTIONS_FILE = './contractions.json'
USER_INFO_FILE = './anonymized_user_info_by_chunk.csv'
# Directory holding the CLPsych tweet files to process.
CLPSYCH_TWEETS_DIR = './0/'

pr = Preprocessing(CURSE_WORDS_FILE, CONTRACTIONS_FILE, USER_INFO_FILE)
pr.load_tweets_clpsych(CLPSYCH_TWEETS_DIR)
print('done')

In [None]:
cap_generator = GIT_Caption()

# generate_captions(imgs_path, label) requires the label, and a pattern that
# matches the image files themselves rather than their folder.
# Label 1 = positive, 0 = negative.
# NOTE(review): the download helpers in this notebook write to folders named
# str(label) + data_set (e.g. "1shen_postive"); confirm these folder names
# match what is on disk.
CAPTION_SOURCES = [
    ("./shen_positive/*", 1),
    ("./shen_negative/*", 0),
    ("./clp_positive/*", 1),
    ("./clp_negative/*", 0),
]

# Keep the results instead of discarding them.
captions_df = pd.concat(
    [cap_generator.generate_captions(pattern, label)
     for pattern, label in CAPTION_SOURCES],
    ignore_index=True,
)
captions_df.head()