# Swahili Phrases Counting
In this assignment, instead of counting single words as we did before, we count phrases.
In **Natural Language Processing (NLP)**, when we combine 2 or more words to make phrases, they are called ```n-grams```. For example, a 2-gram is a two-word phrase.
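
As a rough illustration of the idea, the sketch below builds 1-, 2- and 3-grams from one sentence with a plain Python sliding window. The helper name ```sliding_ngrams``` and the sample sentence are made up for this example; the assignment itself relies on NLTK for this step.

```python
def sliding_ngrams(tokens, n):
    """Return every run of n consecutive tokens as a space-joined string."""
    # A sentence with len(tokens) words has len(tokens) - n + 1 n-grams
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


# Sample sentence, invented for illustration only
tokens = "habari za asubuhi rafiki yangu".split()

unigrams = sliding_ngrams(tokens, 1)
bigrams = sliding_ngrams(tokens, 2)
trigrams = sliding_ngrams(tokens, 3)

print(bigrams)
```

A sentence shorter than n words simply produces no n-grams, because the range in the list comprehension is empty.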

## About the dataset
We will use a Swahili news articles dataset from [Zindi](https://zindi.africa/competitions/swahili-news-classification/data). We use both the training and test datasets, since we are only interested in the article content. I have already combined the train and test datasets, and you can download the combined file from our [course dataset directory](https://drive.google.com/file/d/1iamzfWtojlYmbL3aMEFu9vOanZkHWfZN/view?usp=sharing).

## Task description
In classical word-counting tasks, we generate word frequencies. In this task, however, we will write a function that generates frequencies
of ```n-grams``` where ```n = 1, 2, 3, ..., k```.
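
To make the target concrete, here is a minimal single-machine sketch of n-gram frequencies for every n from 1 to k, using ```collections.Counter```. The function name ```ngram_counts``` and the sample text are assumptions for illustration; the assignment computes the same kind of counts with Spark RDD transformations instead.

```python
from collections import Counter


def ngram_counts(text, k):
    """Count n-grams for every n from 1 to k and return a dict {n: Counter}."""
    tokens = text.lower().split()
    counts = {}
    for n in range(1, k + 1):
        # zip over n staggered copies of the token list yields the n-grams
        grams = zip(*(tokens[i:] for i in range(n)))
        counts[n] = Counter(" ".join(gram) for gram in grams)
    return counts


# Sample text, invented for illustration only
sample = "habari za leo habari za jioni"
counts = ngram_counts(sample, 3)

print(counts[1].most_common(2))
print(counts[2]["habari za"])
```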

## Tools
For Spark, refer to the documentation for Spark RDD and the ```introducing-spark-APIs``` notebook. We will also use the following packages:
- [NLTK](https://www.nltk.org). A Python NLP library. Please install it.
- [googletrans](https://pypi.org/project/googletrans/). A Python package for Google Translate.

## Python setup
Import all the Python packages we need below

In [1]:
# import all the Python packages we need below
from IPython.display import Image
import pandas as pd
import numpy as np
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import findspark
import nltk
from googletrans import Translator
import time
import re

In [2]:
findspark.init()

## Inputs setup
Let's provide paths to the input files we will use.
It's good practice to define these as global variables. Also, use the ```Path``` class from pathlib to manage file paths. If you prefer not to use ```Path```, you can use absolute paths to files and folders.
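
For readers new to ```pathlib```, the short sketch below shows the basic pattern. The folder name ```data``` and the file name ```articles.txt``` are hypothetical placeholders, not the course files.

```python
from pathlib import Path

# Hypothetical folder and file names, used only for illustration
DATA_DIR_EXAMPLE = Path.cwd() / "data"
NEWS_FILE_EXAMPLE = DATA_DIR_EXAMPLE / "articles.txt"

print(NEWS_FILE_EXAMPLE.name)      # file name without the folders
print(NEWS_FILE_EXAMPLE.suffix)    # file extension
print(NEWS_FILE_EXAMPLE.exists())  # check that the file is there before loading it

# Convert to a plain string when an API expects a str path
path_as_text = str(NEWS_FILE_EXAMPLE)
```

The ```/``` operator joins path parts in the same way as ```joinpath()``` and works on any operating system.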

In [3]:
# Alternatively, you can put the full path to where your data is located, like below
# DATA_DIR = Path(full-path-folder-where-you-are-keeping-data)
DATA_DIR = Path().cwd().parents[0].joinpath("rD7R72ki9_Assignment_2_BDA")

# Swahili news
SW_NEWS = DATA_DIR.joinpath('sw_short_articles.txt')

In [4]:
SW_NEWS

WindowsPath('C:/Users/GLC/Desktop/AIMS/REVIEW Course/BLOCK 4/Big Data Analysis/rD7R72ki9_Assignment_2_BDA/sw_short_articles.txt')

## Define utility functions

In [5]:
def tokenize_into_ngrams_phrase(text, n=3):
    """
    Takes a text/article, splits it into sentences and then creates n-gram phrases for each sentence
    Args:
    text -- the text being processed
    n -- number of words per phrase (1, 2, 3 or more)

    returns: a list with one string per sentence; each string holds
    the n-grams of that sentence separated by commas
    """
    # Remove leading and trailing spaces from the text
    text = text.strip()

    # Split the text into sentences using the NLTK function
    # sent_tokenize(), after converting the text to lower case
    sentences = nltk.sent_tokenize(text.lower())

    # Create a list to hold the phrases from each sentence
    phrases = []

    # Loop through all sentences and do the following
    # 1. Use the NLTK ngrams() function to split each sentence into
    # n-grams
    # 2. Join the words of each n-gram into a string
    # where words are separated by a space
    # 3. Add the strings for this sentence to the list above
    for sentence in sentences:
        # Replace commas with spaces so that the comma separator
        # used below cannot split a phrase in the middle
        words = sentence.replace(',', ' ').split()
        phrases.append([' '.join(gram) for gram in nltk.ngrams(words, n)])

    # Return one comma-separated string of phrases per sentence;
    # the caller splits these strings on "," to get individual phrases
    return [','.join(sentence_phrases) for sentence_phrases in phrases]

In [6]:
def quick_clean_up(phrase, phrase_min_len=3):
    """
    Decide whether to keep a phrase. A phrase is dropped when it is too short
    or contains no letters (for example, just numbers or special characters).
    Args:
    phrase -- Target phrase
    phrase_min_len --  Keep only phrases with this length or longer
    
    returns: True to keep the phrase, False to drop it
    """
    # isalnum() would also reject every single clean word, so we test for letters instead
    if len(phrase) < phrase_min_len or not any(ch.isalpha() for ch in phrase):
        return False
    return True

In [7]:
def filter_rows(line):
    """
    Used to drop rows in the file we don't need. For example, the first row is the header, so we skip it.
    We use this function with the Spark RDD filter function. As such, we need to return 
    True or False based on things we need.
    Args:
    line -- a single line of text in the target RDD
    """
    # 1. Split line using the delimiter used
    # 2. Retrieve the text which contains the article
    # 3. Skip the header 
    # 4. Skip very short texts (those with length < 100)
    # The try-except block lets the job keep running
    # when a line does not have the expected columns
    try:
        col_items = line.split("\t")
        text = col_items[2]
        return text != "content" and len(text) >= 100
    except IndexError:
        # The line has fewer than 3 columns
        return False

In [8]:
def split_line(line):
    """
    Split the line, which is a single string,
    into columns and return the column
    that holds the article text
    """
    try:
        # We need only the column with the text (the third column)
        return str(line.split("\t")[2])
    except IndexError:
        # The line has fewer than 3 columns
        return 'NaN'

### Creating a phrase count function
In the function ```run_phrase_count_on_rdd()``` below, 
the key part is the code that applies several transformations to the RDD to generate the phrase counts. The main steps are the following (not necessarily in this order):
- **filter**. You need to filter the RDD so that you skip the header and other bad rows
- **map**. You will need more than one ```map``` operation to apply functions to each line of the RDD, to split it 
into columns and convert each line of text into n-grams using NLTK functions. 
- **flatMap**. At some point, you will need to do flatMap to flatten the RDD in preparation for reduce. Please refer to ```introducing-spark-APIs``` notebook.
- **reduce or reduceByKey**. You decide whether to use ```reduce``` or ```reduceByKey```. Please refer to Spark RDD API documentation and ```introducing-spark-APIs``` notebook for details.
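
To show how these four steps fit together, here is a plain-Python analogue that mimics them on an ordinary list, so it runs without Spark. It is only a sketch of the data flow, not the RDD code itself. The sample rows, the ```id``` and ```category``` column names, and the helper ```bigrams``` are invented for illustration.

```python
from collections import defaultdict
from itertools import chain


def bigrams(text):
    """Return the 2-grams of a text as space-joined strings."""
    words = text.split()
    return [" ".join(pair) for pair in zip(words, words[1:])]


# Invented tab-separated rows; only the third column matters here
lines = [
    "id\tcategory\tcontent",
    "1\ta\thabari za leo",
    "2\tb\thabari za jioni",
]

# filter: keep only rows that are not the header
rows = [ln for ln in lines if ln.split("\t")[2] != "content"]

# map: exactly one output per input (here, the article text)
texts = [ln.split("\t")[2] for ln in rows]

# map again: each text becomes a list of phrases, giving a nested list
nested = [bigrams(text) for text in texts]

# flatMap: the same function, but the per-row lists are flattened into one list
flat = list(chain.from_iterable(nested))

# map to (phrase, 1) pairs, then reduceByKey: add up the values per key
phrase_counts = defaultdict(int)
for phrase, one in ((p, 1) for p in flat):
    phrase_counts[phrase] += one

print(dict(phrase_counts))
```

The nested list produced by the second map is why a flattening step is needed before counting: the reduce step needs one record per phrase, not one record per article.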

#### Hints
1. Before you run this function, make sure you have tested your phrase-count code outside the function.
2. When computing phrase counts, although you can chain the operators together, it's recommended to compute one transformation at a time and verify that it's working before chaining many operations together.
3. Note that you will pass the function ```tokenize_into_ngrams_phrase(text, n=3)``` above to one of the ```map``` or ```flatMap``` operations explained above. Please test and ensure that ```tokenize_into_ngrams_phrase(text, n=3)``` is working before you use it in map.
4. In order to explore the data and test things, feel free to load the data into a pandas DataFrame.
5. The expected number of columns in the output CSV file is 3: ```phrase```, ```count``` and ```phrase_en```. The grading process will check this.

#### Converting texts into n-grams
Again, an n-gram is just a phrase with n words. In this exercise, I recommend we use NLTK to generate n-grams without much effort. For this, you will need to import the ```ngrams``` function from NLTK and the ```sent_tokenize``` function from the 
```nltk.tokenize``` module. Please explore the documentation to understand how to use these functions. 

#### Cleaning up phrases 
During tokenization, we do not perform any of the preprocessing usually required in NLP. As such, we end up with nonsensical and noisy phrases such as ```0000``` or blank strings. We need to remove these from our results. Please feel free to use any approach to remove them. 
However, I have added a function called ```quick_clean_up()``` which you can use or complete, using phrase length to do this clean-up.
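
One possible clean-up rule, sketched below, keeps a phrase only when it is long enough and contains at least one letter. The name ```keep_phrase``` and the candidate phrases are assumptions for this example; other rules may suit the data better.

```python
import re


def keep_phrase(phrase, min_len=3):
    """Return True when the phrase is long enough and contains a letter."""
    # Drop punctuation first so that strings like "--" become empty
    cleaned = re.sub(r"[^\w\s]", "", phrase).strip()
    if len(cleaned) < min_len:
        return False
    # Reject phrases made only of digits, underscores or spaces
    return any(ch.isalpha() for ch in cleaned)


# Candidate phrases, invented for illustration only
candidates = ["habari za leo", "0000", "", "2021 2022", "--", "na"]
kept = [p for p in candidates if keep_phrase(p)]

print(kept)
```

A function with this shape returns a boolean, so it can be passed directly to a ```filter``` operation.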

#### Translating to English
The phrases are in Swahili. However, your output will be a CSV file containing only the phrases that we translate to English. We will use ```Google Translate``` for this. Please feel free to use another translation package if it works better. Google Translate is quite slow, so instead of translating all phrases, we translate only the ```top-k``` most frequent phrases (after clean-up). For example, translating 500 phrases takes around 2-3 minutes for me. So, when testing, set k to a small number. Please learn how to use this package from its documentation page.
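
The top-k idea can be sketched independently of any translation package by passing the translator in as a function. In the sketch below, ```translate_top_k``` and ```fake_translate``` are hypothetical names, and ```fake_translate``` is a stand-in that only tags the text; in the assignment, a real translation call would take its place.

```python
def translate_top_k(phrase_counts, k, translate_fn):
    """
    Translate only the k most frequent phrases.
    phrase_counts -- list of (phrase, count) pairs
    translate_fn -- any callable that maps a string to its translation
    returns: list of (phrase, count, translation) tuples
    """
    # Sort by count, highest first, and keep the first k pairs
    top = sorted(phrase_counts, key=lambda pair: pair[1], reverse=True)[:k]
    return [(phrase, count, translate_fn(phrase)) for phrase, count in top]


calls = []


def fake_translate(text):
    """Stand-in translator: records the call and tags the text."""
    calls.append(text)
    return "[en] " + text


# Phrase counts, invented for illustration only
counts = [("za leo", 3), ("habari za", 7), ("za jioni", 1)]
rows = translate_top_k(counts, k=2, translate_fn=fake_translate)

print(rows)
print(len(calls))  # number of translation calls actually made
```

Because the slow call runs only on the k selected phrases, the running time depends on k rather than on the total number of distinct phrases.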

In [9]:
def run_phrase_count_on_rdd(phrase_length, out_csv, num_to_translate=100, phrase_min_len = 12):
    """
    Generate phrase counts from the RDD and return a dataframe with 
    phrase counts and their English translations
    Args:
    phrase_length --  number of words per phrase, e.g. 3 for 3-grams, 2 for 2-grams or 1 for 1-grams
    out_csv --  path to the CSV file where results are saved
    num_to_translate -- because translation takes too long, we translate only the top-k phrases given by this parameter
    phrase_min_len -- when cleaning up phrases, we use the length of a phrase to decide which phrases to throw out
    
    """
    
    # =======================
    # LOAD DATA INTO RDD
    # ======================
    spark = SparkSession.builder.appName("intro").master("local[*]").getOrCreate()
    sc = spark.sparkContext
    text_rdd = sc.textFile(str(SW_NEWS))
    
    # =======================
    # RUN PHRASE COUNT
    # ======================
    # Please refer to the explanation above on how to compute phrase counts
    rdd_phrase_cnt=(text_rdd.filter(filter_rows)
                    .map(split_line)
                    # pass phrase_length so that the requested n-gram size is used
                    .flatMap(lambda text: tokenize_into_ngrams_phrase(text, n=phrase_length))
                    .flatMap(lambda x: x.split(','))
                    .map(lambda line: re.sub(r'[^\w\s]', '', line) )
                    .map(lambda line: line.strip())
                    # pass phrase_min_len so that the caller's threshold is used
                    .filter(lambda line: quick_clean_up(line, phrase_min_len) )
                    .map(lambda x: (x,1))
                    .reduceByKey(lambda x, y: x + y))
    
    # =======================
    # CONVERT INTO DATAFRAME
    # ======================
    # Use collect() to get a list of phrases and their counts
    # and then create a Pandas DataFrame
    # please call column names 'phrase' and 'count'
    # ~ 2 lines of code
    res = rdd_phrase_cnt.collect()
    df = pd.DataFrame(res, columns=['phrase','count'])
    
    # =======================
    # QUICK CLEAN-UP
    # ======================
    # Noisy phrases such as "" (blank) and '000' numbers were already
    # removed in the RDD pipeline above with quick_clean_up().
    # Here we sort by count and keep only the top-k phrases
    df = df.sort_values(by="count", ascending=False).reset_index(drop=True)
    df = df.head(num_to_translate).copy()
    
    # ============================================================
    # TRANSLATE TOP K-PHRASES AND SAVE THOSE PHRASES TO CSV FILE
    # ============================================================
    # Translate to English only the top-k phrases and save the resulting
    # dataframe to CSV with the English version of the phrase in
    # a column named: 'phrase_en'
    translator = Translator()
    df['phrase_en'] = [translator.translate(phrase, dest='en').text for phrase in df['phrase']]
    df.to_csv(out_csv, index=False)
    
    print('Successfully saved CSV file for top-{} phrases in the Swahili news corpus'.format( num_to_translate))
    return df

## Put everything together

In [11]:
# Initialize spark and create spark-context
spark = SparkSession.builder.appName("intro").master("local[*]").getOrCreate()
sc = spark.sparkContext

# Set up parameters for phrase count;
# you can use the time variables
# to profile your code

# Record start time using time.time()
start = time.time()

# phrase length, your script should work with 
# any phrase length up to 5
n = 5

# How many phrases to translate to english
# you can start with 100
translate_to_en = 100

# When cleaning up, what's the shortest phrase you
# think is reasonable? For example, it can be n-gram * 4
min_phrase_len = 12

# path to CSV to save top-k translated phrases
output_path = 'translated_phrases.csv'

# Finally, run the run_phrase_count_on_rdd() function
run_phrase_count_on_rdd(n, output_path, translate_to_en, min_phrase_len)
end = time.time()

# Minutes taken to run the whole thing
time_taken = (end-start)/60
print('Translating {} phrases took {} minutes'.format(translate_to_en , int(time_taken)))

Successfully saved CSV file for top-100 phrases in the Swahili news corpus
Translating 100 phrases took 1 minutes
