# Task 1 Feature Extraction

## How to read the report for Task 1: Feature Extraction

I create two local folders, `train_combined` and `test_combined`, to store the files from the train set and the test set respectively (each includes both positive and negative reviews). The train and test sets are provided at `hdfs://hadoop.cdms.westernsydney.edu.au:9000/users/ugbigdata/Hadoop/imdb/tinyversion`.


I create a remote folder called `train_test`, which stores the files from both the train folder and the test folder of the provided data (`tinyversion`).

I create Hadoop MapReduce jobs to calculate the TF-IDF value of every term in every file in `train_test`. The TF mapper and reducer are two separate files, `TF_mapper.py` and `TF_reducer.py`. Similarly, two more Python files, `IDF_mapper.py` and `IDF_reducer.py`, calculate the IDF value of each word.

Once both the TF and IDF values have been obtained, two more Python files, `onecode_TFIDF_mapper.py` and `onecode_TFIDF_reducer.py`, perform the final TF-IDF calculation.
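
The three stages can be illustrated on a toy corpus held in memory. This is only a sketch, not the MapReduce code itself: it assumes TF is a term's count divided by the number of terms in the document and IDF is `log10(N / df)`, which is what the reducers in this report compute. The function name `tf_idf` and the toy documents are made up for illustration.

```python
import math
from collections import Counter

def tf_idf(docs):
    """docs maps a document name to its list of terms (hypothetical helper)."""
    n_docs = len(docs)
    # Document frequency: the number of documents that contain each term.
    df = Counter(term for terms in docs.values() for term in set(terms))
    idf = {term: math.log10(n_docs / count) for term, count in df.items()}
    scores = {}
    for name, terms in docs.items():
        counts = Counter(terms)
        total = len(terms)
        # TF is the share of the document taken up by the term.
        scores[name] = {t: (c / total) * idf[t] for t, c in counts.items()}
    return scores

# Toy data, not taken from the IMDb files.
toy = {
    "0_9": ["great", "film", "great"],
    "1_2": ["bad", "film"],
}
scores = tf_idf(toy)
print(scores["0_9"]["film"])   # 0.0, because "film" occurs in every document
print(scores["0_9"]["great"])  # positive, because "great" occurs in one document only
```

A term that appears in every document gets an IDF of zero, so it contributes nothing to the feature vector regardless of how often it occurs.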

The last Python file, `onecode_TFIDF_output.py`, returns the word list, the rating score of each file, and the TF-IDF value of each word in each file.

---

Create separate `train_combined` and `test_combined` folders to check the filenames in each folder.

In [None]:
#!/usr/local/hadoop/bin/hdfs dfs -copyToLocal /users/ugbigdata/Hadoop/imdb/tinyversion/train/neg/* /bigdata/users/student/mbhatia/Assignment/Hadoop_ver3/tinyversion/train_combined
#!/usr/local/hadoop/bin/hdfs dfs -copyToLocal /users/ugbigdata/Hadoop/imdb/tinyversion/train/pos/* /bigdata/users/student/mbhatia/Assignment/Hadoop_ver3/tinyversion/train_combined

#!/usr/local/hadoop/bin/hdfs dfs -copyToLocal /users/ugbigdata/Hadoop/imdb/tinyversion/test/neg/* /bigdata/users/student/mbhatia/Assignment/Hadoop_ver3/tinyversion/test_combined
#!/usr/local/hadoop/bin/hdfs dfs -copyToLocal /users/ugbigdata/Hadoop/imdb/tinyversion/test/pos/* /bigdata/users/student/mbhatia/Assignment/Hadoop_ver3/tinyversion/test_combined

In [None]:
import os
import sys

# Each filename has the form "<id>_<rating>"; keep only the id part.
train_folderfile = os.listdir("./tinyversion/train_combined/")
for i in range(len(train_folderfile)):
    file_realname, rating = train_folderfile[i].split("_")
    train_folderfile[i] = file_realname
test_folderfile = os.listdir("./tinyversion/test_combined/")
for i in range(len(test_folderfile)):
    file_realname, rating = test_folderfile[i].split("_")
    test_folderfile[i] = file_realname
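
The filename convention used above can be captured in one small helper. The sketch below assumes names of the form `<id>_<rating>`, optionally followed by an extension such as `.txt`. Whether the files in `tinyversion` carry an extension is not stated here, so the helper accepts both. The name `split_review_name` is hypothetical.

```python
import os

def split_review_name(filename):
    """Split "<id>_<rating>[.ext]" into (id, rating). Hypothetical helper."""
    # Drop an extension if there is one; a name without one is left unchanged.
    stem = os.path.splitext(filename)[0]
    # Split on the last underscore so the rating is always the final field.
    review_id, rating = stem.rsplit("_", 1)
    return review_id, int(rating)

print(split_review_name("0_9.txt"))  # ('0', 9)
print(split_review_name("12_10"))    # ('12', 10)
```

Converting the rating to `int` at this point means a malformed name fails early, at listing time, rather than later in the pipeline.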

Create a combined `train_test` folder that stores the files from both the train and test folders.

In [None]:
#!/usr/local/hadoop/bin/hdfs dfs -rm -r /users/manan/Desktop/Big-Data/Assignment/tinyversion/train_test
#!/usr/local/hadoop/bin/hdfs dfs -mkdir /users/manan/Desktop/Big-Data/Assignment/tinyversion/train_test
#!/usr/local/hadoop/bin/hdfs dfs -cp /users/manan/Desktop/Big-Data/Assignment/tinyversion/train/neg/* /users/manan/Desktop/Big-Data/Assignment/tinyversion/train_test
#!/usr/local/hadoop/bin/hdfs dfs -cp /users/manan/Desktop/Big-Data/Assignment/tinyversion/train/pos/* /users/manan/Desktop/Big-Data/Assignment/tinyversion/train_test
#!/usr/local/hadoop/bin/hdfs dfs -cp /users/manan/Desktop/Big-Data/Assignment/Assignment_1/tinyversion/test/neg/* /users/manan/Desktop/Big-Data/Assignment/tinyversion/train_test
#!/usr/local/hadoop/bin/hdfs dfs -cp /users/manan/Desktop/Big-Data/Assignment/Assignment_1/tinyversion/test/pos/* /users/manan/Desktop/Big-Data/Assignment/tinyversion/train_test

---

### Code for the IDF mapper and reducer, the TF mapper and reducer, and the TF-IDF calculation

In [None]:
#IDF Mapper

import sys
import os
import re
from nltk.corpus import stopwords


mapper_output = dict() #term -> [term count, document-count placeholder, filename]
stop_words = set(stopwords.words('english')) #English stop words to skip
TERM_RE = re.compile(r"[\w']+") #matches runs of word characters and apostrophes
current_filename = "" #name of the text file currently being read

#remove digits from the string
def remove_number(input_str):
    pattern = r'\d+'
    result = re.sub(pattern, '', input_str)
    return result

#read the input line by line; the loop runs over every line of every input file
for line in sys.stdin:
    #Hadoop streaming exposes the path of the current input file in this variable
    filepath = os.environ["mapreduce_map_input_file"]
    #filename = os.getenv("input_file") #for local file
    filename = os.path.split(filepath)[-1]
    if current_filename != filename:
        current_filename = filename
    #remove all unnecessary symbols and numbers
    for term in TERM_RE.findall(line):
        #compare in lower case, because the stop word list is lower case
        if term.lower() not in stop_words:
            term = remove_number(term)
            term = term.strip()
            term = term.strip("'")
            if len(term) > 1: #skip empty and one-character strings
                if term.lower() in mapper_output.keys():
                    mapper_output[term.lower()][0] += 1 #term count
                else:
                    #index 1 is a placeholder; the reducer counts the documents per term
                    mapper_output[term.lower()] = [1, 1, current_filename]
for key in mapper_output.keys():
    #print the term, term count, document-count placeholder, and filename
    print(",".join([key, str(mapper_output[key][0]), str(mapper_output[key][1]), str(mapper_output[key][2])]))

In [None]:
#IDF Reducer

import sys
import math

reducer_output = dict() #term -> [total term count, number of documents containing the term]
doc_list = set() #distinct document list
for line in sys.stdin:
    line = line.strip() #strip whitespace left over from the mapper output
    term, term_count, term_appear_1, doc_name = line.split(",")
    doc_list.add(doc_name)
    #accumulate the total term count and the number of documents the term appears in
    if term not in reducer_output.keys():
        reducer_output[term] = [int(term_count), 1] #the term count is not needed for IDF
    else:
        reducer_output[term][0] += int(term_count)
        reducer_output[term][1] += 1
doc_list_len = len(doc_list)
for key in reducer_output.keys():
    #print the term and its IDF, log10(number of documents / document frequency)
    print(",".join([key, str(math.log10(int(doc_list_len) / int(reducer_output[key][1])))]))

In [None]:
#TF Mapper

import sys
import os
import re
from nltk.corpus import stopwords

TERM_RE = re.compile(r"[\w']+") #matches runs of word characters and apostrophes
mapper_output = dict() #term -> [filename, term count]
stop_words = set(stopwords.words('english'))
current_filename = ""

#remove digits from the string
def remove_number(input_str):
    pattern = r'\d+'
    result = re.sub(pattern, '', input_str)
    return result

#read the input line by line
for line in sys.stdin:
    filepath = os.environ["mapreduce_map_input_file"]
    #filename = os.getenv('input_file')
    filename = os.path.split(filepath)[-1]
    if current_filename != filename:
        current_filename = filename
    #remove all unnecessary symbols and numbers
    for term in TERM_RE.findall(line):
        #compare in lower case, because the stop word list is lower case
        if term.lower() not in stop_words:
            term = remove_number(term)
            term = term.strip()
            term = term.strip("'")
            if len(term) > 1: #skip empty and one-character strings
                #count the occurrences of each term in the document
                if term.lower() in mapper_output.keys():
                    mapper_output[term.lower()][1] += 1 #term count
                else:
                    mapper_output[term.lower()] = [current_filename, 1] #filename, term count
for key in mapper_output.keys():
    print(",".join([str(mapper_output[key][0]), key, str(mapper_output[key][1])])) #output: filename, term, term count

In [None]:
#TF Reducer

import sys

filename_total_count = dict() #filename -> total number of terms in the document
distinct_term = set() #every term seen in any document
test_dict = dict() #filename -> {term: term count}
for line in sys.stdin:
    line = line.strip()
    filename, term, term_count = line.split(",")
    distinct_term.add(term)
    #calculate the total number of terms in a document
    if filename not in filename_total_count.keys():
        filename_total_count[filename] = int(term_count)
    else:
        filename_total_count[filename] += int(term_count)
    #store the per-document term counts in a nested dict
    if filename not in test_dict.keys():
        test_dict[filename] = {term : int(term_count)}
    else:
        if term not in test_dict[filename].keys():
            test_dict[filename][term] = int(term_count)
        else:
            test_dict[filename][term] += int(term_count)
#add a zero count for every term that a document does not contain
for d_term in distinct_term:
    for filename_check in test_dict.keys():
        if d_term not in test_dict[filename_check].keys():
            test_dict[filename_check][d_term] = 0
#sort the terms of each document alphabetically
for filename_check in test_dict.keys():
    test_dict[filename_check] = dict(sorted(test_dict[filename_check].items()))
#print output: filename, term, TF (term count / total terms in the document)
for key in test_dict.keys():
    for term_key in test_dict[key].keys():
        print(",".join([key, term_key, str(int(test_dict[key][term_key]) / int(filename_total_count[key]))]))

TF-IDF Mapper or `onecode_TFIDF_mapper.py`

In [None]:
#TF-IDF Mapper

import sys

TF_dict = dict() #filename -> {term: TF}
IDF_dict = dict() #term -> IDF
for line in sys.stdin:
    line = line.strip()
    if len(line.split(',')) == 3: #three fields: a TF row (filename, term, TF)
        filename, term, term_TF = line.split(",")
        if filename not in TF_dict.keys():
            TF_dict[filename] = {term : term_TF}
        else:
            if term not in TF_dict[filename].keys():
                TF_dict[filename][term] = term_TF
    else: #otherwise an IDF row (term, IDF)
        term, term_IDF = line.split(',')
        if term not in IDF_dict.keys():
            IDF_dict[term] = term_IDF
#float() parses the numeric strings without executing them, unlike eval()
for key in IDF_dict.keys():
    print(','.join([key, str(float(IDF_dict[key]))])) #term, IDF
for key in TF_dict.keys():
    for term_key in TF_dict[key].keys():
        print(','.join([key, term_key, str(float(TF_dict[key][term_key]))])) #filename, term, TF

TF-IDF Reducer or `onecode_TFIDF_reducer.py`

In [None]:
#TF-IDF Reducer

import sys

IDF_dict = dict() #term -> IDF
TF_rows = [] #buffered (filename, term, TF) rows
TFIDF_dict = dict() #filename -> {term: TF-IDF}
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    fields = line.split(',')
    if len(fields) == 3: #TF row: filename, term, TF
        filename, term, TF_values = fields
        TF_rows.append((filename, term, float(TF_values)))
    else: #IDF row: term, IDF
        term, IDF_values = fields
        if term not in IDF_dict.keys():
            IDF_dict[term] = float(IDF_values)
#multiply only after all input has been read: the shuffle sorts the rows,
#so an IDF row is not guaranteed to arrive before the TF rows that need it
for filename, term, TF_values in TF_rows:
    if filename not in TFIDF_dict.keys():
        TFIDF_dict[filename] = dict()
    if term not in TFIDF_dict[filename].keys():
        TFIDF_dict[filename][term] = TF_values * IDF_dict[term]
#print output: filename, term, TF-IDF
for key in TFIDF_dict.keys():
    for term_key in TFIDF_dict[key].keys():
        print(','.join([key, term_key, str(TFIDF_dict[key][term_key])]))
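
Joining TF rows with IDF rows is sensitive to arrival order unless the multiplication is deferred until all input has been read. Hadoop streaming sorts the mapper output by key before the reduce step, so the order in which the mapper printed its rows does not survive. The sketch below is an order-independent join over the same comma-separated row formats used in this report. The function name `join_tf_idf` and the sample rows are made up for illustration.

```python
def join_tf_idf(lines):
    """Join "filename,term,TF" rows with "term,IDF" rows, in any order."""
    idf = {}
    tf_rows = []
    for line in lines:
        fields = line.strip().split(",")
        if len(fields) == 3:
            filename, term, tf = fields
            tf_rows.append((filename, term, float(tf)))
        elif len(fields) == 2:
            term, value = fields
            idf[term] = float(value)
    # Every IDF value is known by now, whatever order the rows arrived in.
    return {(f, t): tf * idf[t] for f, t, tf in tf_rows}

rows = ["good,0.5", "0_9,good,0.25"]
# A plain string sort puts the TF row first, because "0" sorts before "g".
print(sorted(rows))
print(join_tf_idf(rows) == join_tf_idf(sorted(rows)))  # True
```

Buffering the TF rows costs memory proportional to the input, which is acceptable for the tiny dataset but would need a different design (for example, keying both row types by term) at larger scale.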

`onecode_TFIDF_output.py`

In [None]:
import os

TFIDF_dict = dict() #file id -> list of TF-IDF values
filename_and_rating_dict = dict() #file id -> rating score
wordlist = []
with open('train_test_output_local/local_TFIDF_mapreduce/part-00000', 'r') as TFIDF:
    for line in TFIDF:
        line = line.strip()
        filename, term, TFIDF_values = line.split(',')
        file_realname, rating = filename.split('_')
        rating = os.path.splitext(rating)[0] #drop a file extension such as .txt if one is present
        # wordlist
        if term not in wordlist:
            wordlist.append(term)
        # filename and rating dictionary
        if file_realname not in filename_and_rating_dict.keys():
            filename_and_rating_dict[file_realname] = int(rating)
        # filename and the TF-IDF value of each word (float() avoids eval())
        if file_realname not in TFIDF_dict.keys():
            TFIDF_dict[file_realname] = [float(TFIDF_values)]
        else:
            TFIDF_dict[file_realname].append(float(TFIDF_values))
# write the word list
with open('train_test_output_local/wordlist.txt', 'w') as word_list:
    word_list.write(str(wordlist))
# write each file and its rating score
with open('train_test_output_local/rating_score.txt', 'w') as rating_score:
    rating_score.write(str(filename_and_rating_dict))
# write each filename and its TF-IDF values for each word
with open('train_test_output_local/fileTFIDFdict.txt', 'w') as fileTFIDFdict:
    fileTFIDFdict.write(str(TFIDF_dict))


## TF-IDF computation

Compute TF and IDF separately for every file in the `train_test` folder, then combine them into TF-IDF.

In [None]:
#------------------------------------------------------------------IDF MapReduce
!/usr/local/hadoop/bin/hdfs dfs -rm -r /users/manan/Desktop/Big-Data/Assignment/Data_output/train_test_output/IDF_mapreduce/
!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar \
-mapper IDF_mapper.py \
-reducer IDF_reducer.py \
-input /users/manan/Desktop/Big-Data/Assignment/tinyversion/train_test/* \
-output /users/manan/Desktop/Big-Data/Assignment/Data_output/train_test_output/IDF_mapreduce/ \
-file /bigdata/users/student/mbhatia/Assignment/Hadoop_ver3/IDF_mapper.py \
-file /bigdata/users/student/mbhatia/Assignment/Hadoop_ver3/IDF_reducer.py
#-------------------------------------------------------------------TF MapReduce
!/usr/local/hadoop/bin/hdfs dfs -rm -r /users/manan/Desktop/Big-Data/Assignment/Data_output/train_test_output/TF_mapreduce/
!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar \
-mapper TF_mapper.py \
-reducer TF_reducer.py \
-input /users/manan/Desktop/Big-Data/Assignment/tinyversion/train_test/* \
-output /users/manan/Desktop/Big-Data/Assignment/Data_output/train_test_output/TF_mapreduce/ \
-file /bigdata/users/student/mbhatia/Assignment/Hadoop_ver3/TF_mapper.py \
-file /bigdata/users/student/mbhatia/Assignment/Hadoop_ver3/TF_reducer.py
#---------------------------------------------------------------TF-IDF Mapreduce
!/usr/local/hadoop/bin/hdfs dfs -rm -r /users/manan/Desktop/Big-Data/Assignment/Data_output/train_test_output/TFIDF_mapreduce/
!/usr/local/hadoop/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar \
-mapper onecode_TFIDF_mapper.py \
-reducer onecode_TFIDF_reducer.py \
-input /users/manan/Desktop/Big-Data/Assignment/Data_output/train_test_output/IDF_mapreduce/part-00000 \
-input /users/manan/Desktop/Big-Data/Assignment/Data_output/train_test_output/TF_mapreduce/part-00000 \
-output /users/manan/Desktop/Big-Data/Assignment/Data_output/train_test_output/TFIDF_mapreduce/ \
-file /bigdata/users/student/mbhatia/Assignment/Hadoop_ver3/onecode_TFIDF_mapper.py \
-file /bigdata/users/student/mbhatia/Assignment/Hadoop_ver3/onecode_TFIDF_reducer.py
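
Before submitting a job, the map, sort, and reduce flow can be checked on a few lines in plain Python. This is a simplified stand-in for the TF job, not a copy of `TF_mapper.py` or `TF_reducer.py`: it skips stop-word removal and zero-filling, takes the filename as an argument instead of reading `mapreduce_map_input_file`, and models the shuffle as a plain sort. The function names and sample reviews are hypothetical.

```python
from collections import Counter, defaultdict

def tf_map(filename, text):
    """Emit one "filename,term,count" row per distinct term."""
    counts = Counter(text.lower().split())
    return [f"{filename},{term},{n}" for term, n in counts.items()]

def tf_reduce(lines):
    """Turn counts into term frequencies (count / total terms in the file)."""
    totals = defaultdict(int)
    rows = []
    for line in lines:
        filename, term, n = line.split(",")
        totals[filename] += int(n)
        rows.append((filename, term, int(n)))
    return [f"{f},{t},{n / totals[f]}" for f, t, n in rows]

def simulate_streaming(docs):
    """Run map, then sort (standing in for the shuffle), then reduce."""
    mapped = [row for name, text in docs.items() for row in tf_map(name, text)]
    return tf_reduce(sorted(mapped))

# Toy reviews, not taken from the IMDb files.
out = simulate_streaming({"0_9": "good good film", "1_2": "bad film"})
for row in out:
    print(row)
```

Each output row has the same `filename,term,TF` shape that the TF-IDF job expects as input, so the same toy rows can be reused to test the later stage.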


Use the last Python file to produce the final values, which are then loaded into the following variables: `train_test_wordlist` for the word list, `train_test_review_TFIDF` for the TF-IDF results, and `train_test_rating_score` for the filenames and their corresponding rating scores.

In [None]:
!python3 onecode_TFIDF_output.py #this script creates 3 files: wordlist.txt, fileTFIDFdict.txt, rating_score.txt
import ast
#ast.literal_eval only parses Python literals, so it is safer than eval() for reading the files back
#store wordlist.txt in a variable named train_test_wordlist
with open("train_test_output_local/wordlist.txt") as f:
    train_test_wordlist = ast.literal_eval(f.read())
#store the TF-IDF values of each review, as a dict, in a variable named train_test_review_TFIDF
with open("train_test_output_local/fileTFIDFdict.txt") as f:
    train_test_review_TFIDF = ast.literal_eval(f.read())
#rating of each review
with open("train_test_output_local/rating_score.txt") as f:
    train_test_rating_score = ast.literal_eval(f.read()) #dict of file id -> rating score
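
A quick consistency check on the three loaded objects can catch misaligned vectors early. The sketch below assumes that every review has exactly one TF-IDF value per word in the word list, which is what the zero-filling in the TF reducer is meant to guarantee. The function name `to_matrix` and the toy inputs are hypothetical.

```python
def to_matrix(wordlist, review_tfidf, rating_score):
    """Return (ids, rows, ratings) aligned by review id, checking vector lengths."""
    ids = sorted(review_tfidf)
    rows = []
    for review_id in ids:
        vector = review_tfidf[review_id]
        if len(vector) != len(wordlist):
            raise ValueError(
                f"{review_id}: expected {len(wordlist)} values, got {len(vector)}"
            )
        rows.append(vector)
    ratings = [rating_score[review_id] for review_id in ids]
    return ids, rows, ratings

# Toy inputs with the same shapes as the three loaded variables.
toy_words = ["bad", "film", "good"]
toy_tfidf = {"1": [0.15, 0.0, 0.0], "0": [0.0, 0.0, 0.2]}
toy_ratings = {"0": 9, "1": 2}
ids, rows, ratings = to_matrix(toy_words, toy_tfidf, toy_ratings)
print(ids, ratings)  # ['0', '1'] [9, 2]
```

With the real data, the call would be `to_matrix(train_test_wordlist, train_test_review_TFIDF, train_test_rating_score)`.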

---

# Task 2 Classification

###