# IBM Advanced Data Science Capstone Project
## Sentiment Analysis of Amazon Customer Reviews
### Harsh V Singh, Apr 2021

## Extract, Transform, Load (ETL)

This notebook walks through the step-by-step process of preparing the raw data for the project. The data is available as two CSV files (train.csv and test.csv). We will read these files into memory and then store them as parquet files (trainRaw.parquet and testRaw.parquet). *The Spark csv reader is not able to handle commas within the quoted text of the reviews. Hence, we will first read the files into Pandas dataframes and then export them into parquet files*.

## Importing required Python libraries and initializing Apache Spark environment

In [2]:
from ibm_botocore.client import Config
import ibm_boto3

import pandas as pd
import csv
import time
from pathlib import Path
import shutil

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType, ArrayType
from pyspark.sql.functions import udf, rand, col, concat, coalesce
from pyspark.ml.feature import HashingTF, IDF, Word2Vec, Word2VecModel

conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession \
    .builder \
    .getOrCreate()

In [11]:
# @hidden_cell
# The following code contains the credentials for a file in your IBM Cloud Object Storage.
# You might want to remove those credentials before you share your notebook.
creds = {
    'IAM_SERVICE_ID': 'iam-ServiceId-a1e6ae17-a480-4a92-b3b8-b5927994ec39',
    'IBM_API_KEY_ID': '0bgYhpiB1he87QNeaFI-4_WD04bUy0JZZPTKR86Tyf-Z',
    'ENDPOINT': 'https://s3.eu.cloud-object-storage.appdomain.cloud',
    'IBM_AUTH_ENDPOINT': 'https://iam.cloud.ibm.com/oidc/token',
    'BUCKET': 'ibmadvanceddatasciencecapstonepro-donotdelete-pr-nswuywrsyrm5si'
}


cos = ibm_boto3.client(service_name='s3',
    ibm_api_key_id=creds['IBM_API_KEY_ID'],
    ibm_service_instance_id=creds['IAM_SERVICE_ID'],
    ibm_auth_endpoint=creds['IBM_AUTH_ENDPOINT'],
    config=Config(signature_version='oauth'),
    endpoint_url=creds['ENDPOINT'])

## Reading data from CSV and storing local copies

The data for this project is available as two CSV files (train.csv and test.csv). We will read these files into memory and then store them as parquet files (trainRaw.parquet and testRaw.parquet).

We will write a function called **readParquetToSparkDF**, which downloads a parquet archive from cloud storage and reads it into memory as a Spark dataframe. A second function, **saveCSVToParquet**, reads an original CSV file into a Pandas dataframe, saves it as a **parquet** file and uploads it. It is called only when the parquet file is not already present in the bucket.

*The reason why we need to read the CSV files into a Pandas dataframe is because the Spark csv reader function is not able to handle commas within the quoted text of the reviews. To solve that, we will use the Pandas csv reader to process the data initially and then export it into parquet files*.
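
As a minimal sketch of the Pandas side of this step, the snippet below parses a small in-memory CSV whose quoted review text contains commas. The two sample rows are made up for illustration and only mimic the layout of train.csv (rating, heading, text, no header row); the column names are the ones used in `rawSchema`.

```python
import io
import pandas as pd

# Hypothetical two-row sample in the same layout as train.csv:
# rating, review heading, review text, with no header row.
sample_csv = io.StringIO(
    '3,"more like funchuck","Gave this to my dad, who did not like it"\n'
    '5,"Inspiring","Great story, great music, great price"\n'
)

# header=None because the files have no header row; names mirror rawSchema.
sample_df = pd.read_csv(
    sample_csv,
    header=None,
    names=["rating", "review_heading", "review_text"],
)

print(sample_df.shape)
print(sample_df.loc[0, "review_text"])
```

Each row still yields exactly three columns, because the Pandas reader treats commas inside double quotes as part of the field.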


In [4]:
# Function to print time taken by a particular process, given the start and end times
def printElapsedTime(startTime, endTime):
    elapsedTime = endTime - startTime
    print("-- Process time = %.2f seconds --"%(elapsedTime))

In [5]:
# Schema that defines the columns and datatypes of the data in the csv files
rawSchema = StructType([
    StructField("rating", IntegerType(), True),
    StructField("review_heading", StringType(), True),
    StructField("review_text", StringType(), True)
    ])

## Download raw CSV files and upload converted parquet files

We will first check whether the IBM Cloud Object Storage bucket already contains the converted parquet files. If not, we will download the CSV data, convert it to parquet using Spark, and upload the resulting parquet files (packed as tar archives) to the cloud storage.
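
Spark writes a parquet "file" as a directory of part files, so it has to be packed into a single file before it can be uploaded as one object. The sketch below shows that pack and unpack round trip with `shutil` on a stand-in directory. The helper names `pack_directory` and `unpack_directory` and the placeholder file contents are assumptions for illustration, not part of the notebook.

```python
import shutil
import tempfile
from pathlib import Path

def pack_directory(dir_path):
    """Archive a directory as <dir_path>.tar and return the archive path."""
    return shutil.make_archive(str(dir_path), "tar", root_dir=str(dir_path))

def unpack_directory(tar_path, target_dir):
    """Extract a tar archive into target_dir."""
    shutil.unpack_archive(str(tar_path), str(target_dir), "tar")

# Stand-in for a Spark parquet output directory (file names are illustrative).
work_dir = Path(tempfile.mkdtemp())
parquet_dir = work_dir / "trainRaw.parquet"
parquet_dir.mkdir()
(parquet_dir / "_SUCCESS").write_text("")
(parquet_dir / "part-00000.parquet").write_bytes(b"placeholder bytes")

# Pack the directory, then restore it somewhere else and list its contents.
tar_path = pack_directory(parquet_dir)
restored_dir = work_dir / "restored.parquet"
unpack_directory(tar_path, restored_dir)
restored_names = sorted(p.name for p in restored_dir.iterdir())
print(restored_names)
```

The archive is created next to the directory rather than inside it, so it does not end up containing itself.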

In [6]:
# Function to download a csv file from COS, convert it to parquet and upload the result
def saveCSVToParquet(creds, cos, csvFile, parqPath, rawSchema, printTime=False):
    startTime = time.time()
    # Download the csv from COS and read it into a pandas dataframe
    cos.download_file(Bucket=creds["BUCKET"], Key=csvFile, Filename=csvFile)
    pandasDF = pd.read_csv(csvFile, header=None)
    pandasDF.columns = rawSchema.names
    # Convert pandas to spark dataframe and write it as a parquet directory
    parquetDF = spark.createDataFrame(pandasDF, schema=rawSchema)
    parquetDF.write.mode("overwrite").parquet(parqPath)
    # Spark writes parquet as a directory, so pack it into a single tar file
    shutil.make_archive(parqPath, 'tar', parqPath)

    # Upload the tar archive to COS under the parquet name
    cos.upload_file(Filename=parqPath+".tar", Bucket=creds["BUCKET"], Key=parqPath)
    endTime = time.time()
    if printTime:
        printElapsedTime(startTime=startTime, endTime=endTime)

In [8]:
# Fetch existing files from the COS bucket ("Contents" may be absent if the bucket is empty)
cosBucketContent = cos.list_objects(Bucket=creds["BUCKET"]).get("Contents", [])
cosFileNames = [x["Key"] for x in cosBucketContent]

# Convert CSV to parquet and upload to COS if files don't exist
if "trainRaw.parquet" not in cosFileNames:
    saveCSVToParquet(cos=cos, creds=creds, csvFile="train.csv", parqPath="trainRaw.parquet", rawSchema=rawSchema, printTime=True)
if "testRaw.parquet" not in cosFileNames:
    saveCSVToParquet(cos=cos, creds=creds, csvFile="test.csv", parqPath="testRaw.parquet", rawSchema=rawSchema, printTime=True)

## Load local data for sanity check

We will load the train and test sets and print a few samples as well as the size of the datasets.
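
A sanity check of this kind usually looks at row counts, column names and missing values. The snippet below is a small Pandas stand-in for that idea; on the Spark dataframes the corresponding calls would be `count()` and `show()`. The helper `summarize_reviews` and the toy rows are assumptions made for illustration and are not taken from the dataset.

```python
import pandas as pd

def summarize_reviews(df):
    """Return row count, column names, and per-column null counts."""
    return {
        "rows": len(df),
        "columns": list(df.columns),
        "nulls": df.isna().sum().to_dict(),
    }

# Made-up rows for illustration only; not taken from the dataset.
toy_df = pd.DataFrame(
    {
        "rating": [3, 5, 4],
        "review_heading": ["ok", None, "good"],
        "review_text": ["fine, I guess", "loved it", "solid"],
    }
)
summary = summarize_reviews(toy_df)
print(summary)
```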

In [7]:
# Function to download a parquet archive from COS and read it into a Spark dataframe
# The archive must already exist in the bucket (it is created by saveCSVToParquet)
def readParquetToSparkDF(creds, cos, parqPath, rawSchema):
    cos.download_file(Bucket=creds["BUCKET"], Key=parqPath, Filename=parqPath+".tar")
    shutil.unpack_archive(parqPath+".tar", parqPath, "tar")
    parquetDF = spark.read.schema(rawSchema).parquet(parqPath)
    return parquetDF

In [15]:
# Load train and test parquet dataframes from IBM Cloud Object Storage
trainRaw = readParquetToSparkDF(cos=cos, creds=creds, parqPath="trainRaw.parquet", rawSchema=rawSchema)
testRaw = readParquetToSparkDF(cos=cos, creds=creds, parqPath="testRaw.parquet", rawSchema=rawSchema)

trainRaw.show(5)

+------+--------------------+--------------------+
|rating|      review_heading|         review_text|
+------+--------------------+--------------------+
|     3|  more like funchuck|Gave this to my d...|
|     5|           Inspiring|I hope a lot of p...|
|     5|The best soundtra...|I'm reading a lot...|
|     4|    Chrono Cross OST|The music of Yasu...|
|     5| Too good to be true|Probably the grea...|
+------+--------------------+--------------------+
only showing top 5 rows

