<h1 align="center">MDSC-302 Assignment</h1>
<h3 align="right">Registration number: 20233</h3>

# Sustainable Development Goals

In [1]:
# Import required libraries
import numpy as np
import pandas as pd
import PyPDF2
import re
import json
import nltk
from nltk.corpus import stopwords 
from nltk.tokenize import word_tokenize 
from typing import Any, Iterable, List, Tuple, Union, NoReturn
import ipywidgets as widgets
from IPython.display import display, clear_output

In [2]:
# Download the NLTK "stopwords" corpus and the "punkt" tokenizer data.
nltk.download("stopwords")
nltk.download("punkt")
# Prints an empty line. NOTE(review): presumably kept as the last statement so the
# cell does not echo the return value of the final download call — confirm.
print()




[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\gunta\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\gunta\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [3]:
# Enabling extensions
#!jupyter nbextension enable --py widgetsnbextension --sys-prefix
#!jupyter serverextension enable voila --sys-prefix

In [4]:
# N-gram matcher (taken from the OSDG GitHub repository)
class NgramMatcher:
    """
    Matches a fixed vocabulary of ngrams against documents and counts the hits.

    The vocabulary is indexed by position: match results refer to ngrams by
    their index in the sequence that was passed to the constructor.
    """

    def __init__(self,
                 ngrams: Iterable[str],
                 lowercase: bool = True,
                 token_pattern: str = r'(?u)\b\w+\b',
                 ngram_size: Union[int, Tuple[int, int]] = (1, 1)):
        r"""
        Ngram matcher and counter.

        Parameters
        ----------
        ngrams : Iterable[str]
            Sized collection of unique ngrams to match (e.g. list or array).
        lowercase : bool, by default True
            Lowercases all characters of both the ngrams and the documents.
        token_pattern : str, by default r'(?u)\b\w+\b'
            Regex expression that designates a token. At most one capture group.
        ngram_size : Union[int, Tuple[int, int]], by default (1, 1)
            Minimum and maximum boundaries for token amount per ngram.
            If integer is provided, it represents both minimum and maximum boundaries.

        Raises
        ------
        ValueError
            If ngrams is a str / not iterable / empty / contains duplicates,
            if token_pattern has more than one group, or if ngram_size is
            neither an int nor a tuple of length 2.
        TypeError
            If ngrams contains non str values.
        """
        self.__validate_ngrams(ngrams)
        self.ngrams = np.array(ngrams)

        self.lowercase = lowercase
        # Lookup table: normalised ngram text -> position in the input sequence.
        if lowercase:
            self.ngram_index_map = {ngram.strip().lower(): idx for idx, ngram in enumerate(ngrams)}
        else:
            self.ngram_index_map = {ngram.strip(): idx for idx, ngram in enumerate(ngrams)}

        self.token_pattern = token_pattern
        self.__token_pattern = re.compile(token_pattern)
        # With more than one group, findall() would return tuples instead of strings.
        if self.__token_pattern.groups > 1:
            raise ValueError(f'Too many groups in ngram pattern : {self.__token_pattern.groups}')

        if isinstance(ngram_size, int):
            self.ngram_size = (ngram_size, ngram_size)
        elif isinstance(ngram_size, tuple) and len(ngram_size) == 2:
            self.ngram_size = ngram_size
        else:
            raise ValueError(f'Expected int or tuple of length 2 for argument ngram_size. Got {type(ngram_size)}.')


    @staticmethod
    def __validate_ngrams(ngrams) -> None:
        """Raise if ngrams is not a non-empty, duplicate-free collection of str."""
        if isinstance(ngrams, str) or not hasattr(ngrams, '__iter__'):
            raise ValueError('Terms must be iterable.')
        if len(ngrams) == 0:
            raise ValueError('Empty terms passed.')
        if len(ngrams) != len(set(ngrams)):
            raise ValueError('Terms contain duplicate entries.')
        try:
            # str.join raises TypeError as soon as it meets a non-str element.
            _ = ''.join(ngrams)
        except TypeError:
            raise TypeError('Terms contain non str type values.')


    @staticmethod
    def __validate_documents(documents: Any) -> None:
        """Raise TypeError unless documents is a non-str iterable of str."""
        if isinstance(documents, str) or not hasattr(documents, '__iter__'):
            raise TypeError('Iterable of strings is expected.')
        if any(not isinstance(doc, str) for doc in documents):
            raise TypeError('Documents contain non str values.')


    def _generate_ngrams(self, document: str) -> List[str]:
        """
        Extracts ngrams from text.

        Parameters
        ----------
        document : str
            Text

        Returns
        -------
        List[str]
            List of generated ngrams: all ngrams of the smallest size first,
            then the next size, and so on; tokens are joined by a single space.
        """
        if self.lowercase:
            document = document.lower()
        tokens = self.__token_pattern.findall(document)

        min_n, max_n = self.ngram_size
        if max_n == 1:
            return tokens

        if min_n == 1:
            # Unigrams are the tokens themselves; no joining needed.
            ngrams = list(tokens)
            min_n += 1
        else:
            ngrams = []

        n_tokens = len(tokens)
        # Sizes larger than the token count cannot produce any ngram.
        for size in range(min_n, min(max_n, n_tokens) + 1):
            for start in range(n_tokens - size + 1):
                ngrams.append(' '.join(tokens[start:start + size]))

        return ngrams


    def _match_ngrams(self, documents: Iterable[str]) -> List[Tuple[List[int], List[int]]]:
        """
        Matches ngrams to texts.
        For each document:
          1. Converts document into tokens
          2. Generates ngrams of size defined in ngram_size
          3. Crossreferences ngrams with matching ngrams
          > Counts each ngram frequency

        Parameters
        ----------
        documents : Iterable[str]
            Texts to scan. One-shot iterables (generators, map objects) are accepted.

        Returns
        -------
        List[Tuple[List[int], List[int]]]
            Each element is a tuple.
              - index 0 contains ngram indices List[int] (in order of first occurrence)
              - index 1 contains ngram frequencies List[int]

        Raises
        ------
        TypeError
            If documents is a str, is not iterable, or contains non str values.
        """
        # Validation iterates over the documents once; materialise them first so a
        # generator is not exhausted before the matching loop below runs.
        if not isinstance(documents, str) and hasattr(documents, '__iter__'):
            documents = list(documents)
        self.__validate_documents(documents)

        ngram_index_map = self.ngram_index_map
        matches = []
        for document in documents:
            ngram_counts = dict()
            for ngram in self._generate_ngrams(document):
                idx = ngram_index_map.get(ngram)
                if idx is None:
                    # Not part of the vocabulary.
                    continue
                ngram_counts[idx] = ngram_counts.get(idx, 0) + 1

            matches.append((list(ngram_counts.keys()),
                            list(ngram_counts.values())))

        return matches


    def match(self, documents: Iterable[str]) -> List[Tuple[List[int], List[int]]]:
        """
        Matches ngrams to texts.

        Parameters
        ----------
        documents : Iterable[str]
            List of texts.

        Returns
        -------
        List[Tuple[List[int], List[int]]]
            Each element is a tuple.
              - index 0 contains ngram indices List[int]
              - index 1 contains ngram frequencies List[int]
        """
        return self._match_ngrams(documents)


In [5]:
# Load the fos id array and the fos name array from disk.
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects, which can
# execute code; only load these .npy files from a trusted source.
fos_ids = np.load(file='fos_ids.npy', allow_pickle=True)
fos_names = np.load(file='fos_names.npy', allow_pickle=True)

# Matcher over the fos names: case-insensitive, word tokens, ngrams of 1 to 4 tokens.
ngram_matcher = NgramMatcher(
    ngrams=fos_names,
    lowercase=True,
    token_pattern=r'(?u)\b\w+\b',
    ngram_size=(1, 4),
)

In [6]:
# function for mapping fos_names to fos_ids
def mapping_ids(text):
    """
    Map a text to the fos ids whose names occur in it, with adjusted counts.

    Steps:
      1. Match the text against ``ngram_matcher`` and pair every hit with its
         entry in ``fos_ids`` / ``fos_names``, longest name first.
      2. De-score: from each hit subtract the counts of the hits listed before
         it whose name contains this hit's name as a substring; keep only hits
         with a positive remainder.
      3. Merge: add to each remaining hit the counts of the hits listed after
         it whose name is a substring of this hit's name, and drop those
         absorbed hits.

    Parameters
    ----------
    text : str
        Text to scan.

    Returns
    -------
    dict
        fos id -> resulting frequency.
    """
    matched_idxs, matched_counts = ngram_matcher.match([text])[0]

    # Longest names first, so containing names always precede contained ones.
    hits = list(zip(fos_ids[matched_idxs], fos_names[matched_idxs], matched_counts))
    hits.sort(key=lambda hit: len(hit[1]), reverse=True)

    # Step 2: remove occurrences already accounted for by an earlier (longer) name.
    descored = []
    for pos, (fos_id, name, count) in enumerate(hits):
        covered = sum(
            earlier_count
            for _, earlier_name, earlier_count in hits[:pos]
            if name in earlier_name
        )
        remaining = count - covered
        if remaining > 0:
            descored.append([fos_id, name, remaining])

    # Step 3: fold contained names into the name that contains them.
    merged = []
    absorbed_ids = set()
    for pos, (fos_id, name, count) in enumerate(descored):
        for later_id, later_name, later_count in descored[pos + 1:]:
            if later_name in name:
                count += later_count
                absorbed_ids.add(later_id)
        merged.append([fos_id, name, count])

    return {
        fos_id: count
        for fos_id, _, count in merged
        if fos_id not in absorbed_ids
    }

In [7]:
# Load the SDG -> fos id mapping as a list of (sdg, set of fos ids) pairs.
# The encoding is given explicitly: without it open() falls back to the platform
# default (e.g. cp1252 on Windows), which can mis-decode non-ASCII JSON text.
with open('OSDG-mapping.json', 'r', encoding='utf-8') as file_:
    mapping = [(sdg, set(sdg_fos_ids)) for sdg, sdg_fos_ids in json.load(file_).items()]

# Load the fos map; it is indexed by fos id when building the 'keywords' of a result.
with open('OSDG-fosmap.json', 'r', encoding='utf-8') as file_:
    fosmap = json.load(file_)

In [8]:
# function to preprocess the text
def preprocessText(text):
    """
    Normalise raw text before keyword matching.

    The text is lowercased, runs of digits are removed, every character that
    is neither a word character nor whitespace is removed, and surrounding
    whitespace is stripped. The result is tokenized with ``word_tokenize`` and
    tokens found in the NLTK English stopword list are dropped.

    Parameters
    ----------
    text : str
        Raw input text.

    Returns
    -------
    str
        Remaining tokens joined by single spaces.
    """
    lowered = text.lower()
    without_digits = re.sub(r'\d+', '', lowered)
    without_punctuation = re.sub(r'[^\w\s]', '', without_digits)
    cleaned = without_punctuation.strip()

    english_stopwords = set(stopwords.words('english'))
    kept_tokens = [
        token
        for token in word_tokenize(cleaned)
        if token not in english_stopwords
    ]
    return ' '.join(kept_tokens)

In [9]:
# function for mapping fos_ids to sdg's
def sdgs_imp(fos):
    """
    Score every SDG by the fos ids it shares with the given matches.

    Parameters
    ----------
    fos : dict
        fos id -> frequency, as returned by ``mapping_ids``.

    Returns
    -------
    list of dict
        One entry per SDG in ``mapping`` that shares at least one fos id with
        ``fos``, sorted by descending importance. Each entry has the keys
        'SDG', 'Importance' (sum of the shared ids' frequencies, as float) and
        'keywords' (the ``fosmap`` values of the shared ids).
    """
    matched_ids = fos.keys()
    scored = []

    for sdg_label, sdg_members in mapping:
        shared = sdg_members.intersection(matched_ids)
        if not shared:
            continue

        scored.append({
            'SDG': sdg_label,
            'Importance': float(sum(fos[member] for member in shared)),
            'keywords': [fosmap[member] for member in shared],
        })

    scored.sort(key=lambda entry: entry['Importance'], reverse=True)
    return scored

In [10]:
# function for getting all sdg's
def allsdg(sdgs):
    """
    Print every SDG entry on its own line.

    Parameters
    ----------
    sdgs : list
        Entries to print; when it is falsy (e.g. an empty list) a
        "No SDG's Found" message is printed instead.
    """
    if not sdgs:
        print("No SDG's Found")
        return

    for entry in sdgs:
        print(entry)

In [11]:
def pdf_text(path):
    """
    Extract the text of every page of a PDF file.

    Parameters
    ----------
    path : str
        Path of the PDF file to read.

    Returns
    -------
    str
        Concatenated text of all pages. If the file cannot be opened or
        parsed, the literal string "File not exsists" is returned instead
        (kept unchanged for backward compatibility with existing callers).
    """
    try:
        # The context manager closes the file even when parsing fails.
        with open(path, 'rb') as pdf_stream:
            if hasattr(PyPDF2, 'PdfReader'):
                # Current PyPDF2 API; the legacy names used below raise in PyPDF2 3.x.
                reader = PyPDF2.PdfReader(pdf_stream)
                return "".join(page.extract_text() or "" for page in reader.pages)

            # Fallback for old PyPDF2 releases that only ship the legacy API.
            reader = PyPDF2.PdfFileReader(pdf_stream)
            return "".join(
                reader.getPage(page_no).extractText()
                for page_no in range(reader.numPages)
            )
    except Exception:
        # Best effort: any read/parse failure is reported through the sentinel
        # string, but KeyboardInterrupt / SystemExit are no longer swallowed.
        return "File not exsists"

In [12]:
# Single-file upload control for the text input.
uploader = widgets.FileUpload(
    description='Upload text file',
    multiple=False,
)

# Text box whose value is used as the path of the PDF to analyse.
pdf_file = widgets.Text(
    style={'description_width': 'initial'},
)

## Upload a text file 

In [13]:
# Render the file-upload widget so a text file can be selected.
display(uploader)

FileUpload(value={}, description='Upload text file')

## Enter the PDF file path

In [14]:
# Render the text box whose value is read as the PDF path when Submit is clicked.
display(pdf_file)

Text(value='', style=DescriptionStyle(description_width='initial'))

In [15]:
# Submit button that triggers the SDG analysis.
button_send = widgets.Button(
    description='Submit',
    tooltip='Send',
    style={'description_width': 'initial'},
)

# Output area that captures everything printed by the click handler.
output = widgets.Output()

# function to take action for widget button1
def _uploaded_text(upload_value):
    """
    Decode the content of the file(s) held by a FileUpload widget value.

    Handles both value layouts: ipywidgets 7 uses a dict mapping the file name
    to a record with a 'content' bytes entry, ipywidgets 8 uses a sequence of
    such records (with 'content' as a memoryview).

    Returns the UTF-8 decoded contents, joined by newlines; undecodable bytes
    are replaced rather than raising.
    """
    if isinstance(upload_value, dict):
        file_records = upload_value.values()
    else:
        file_records = upload_value
    return '\n'.join(
        bytes(record['content']).decode('utf-8', errors='replace')
        for record in file_records
    )


def on_button_clicked(event):
    """
    Click handler of the Submit button.

    Clears the output area, then prints the SDGs found in the uploaded text
    file and in the PDF whose path was typed in, or a "not found" message for
    each input that was not provided.

    Parameters
    ----------
    event
        Object passed by the button's on_click callback; not used.
    """
    with output:
        clear_output()

        # Truthiness covers both the empty dict (ipywidgets 7) and the empty
        # tuple (ipywidgets 8) that FileUpload reports before any upload.
        if uploader.value:
            print("SDG's from Text:")
            # Analyse the decoded file content, not the repr of the widget value
            # (which would also contain file names, metadata and byte escapes).
            uploaded = _uploaded_text(uploader.value)
            allsdg(sdgs_imp(mapping_ids(preprocessText(uploaded))))
        else:
            print("Text file not found\n")

        if pdf_file.value != '':
            print("SDG's from Pdf:")
            allsdg(sdgs_imp(mapping_ids(preprocessText(pdf_text(pdf_file.value)))))
        else:
            print("\nPDF not found")
# Run the handler whenever the Submit button is clicked.
button_send.on_click(on_button_clicked)

# Show the button followed by the output area.
display(button_send, output)

Button(description='Submit', style=ButtonStyle(), tooltip='Send')

Output()