# Load data and build dataset

# 1 - Imports

In [1]:
import pandas as pd
import numpy as np
import wget
import time
import tarfile
import os
from pathlib import Path

# 2 - Set the necessary general variables

In [2]:
url = 'http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz'

In [3]:
root_path = os.path.dirname(os.getcwd())
data_path = root_path + '/data/'
raw_data_path = data_path + "01_raw/"
init_dataset_path = data_path + "02_intermediate/"
download_output = raw_data_path + "/aclImdb_v1.tar.gz"

In [4]:
train_path = "file://" + init_dataset_path + "train.csv"
test_path = "file://" + init_dataset_path + "test.csv"

# 3 - Dowload and store data

In [5]:
wget.download(url, download_output)

'/mnt/batch/tasks/shared/LS_root/mounts/clusters/seb-compute-poc/code/GIT_REPOS/demo-sentiment-analysis/data/01_raw//aclImdb_v1.tar.gz'

In [6]:
def decompress_dataset(data_file, output_folder):
    tic = time.time()
    tar = tarfile.open(data_file)
    tar.extractall(path=output_folder)
    tar.close()
    toc = time.time()
    print(toc - tic)

In [7]:
decompress_dataset(download_output, raw_data_path)

9933.241953372955


# 4 - Process raw data to build dataset

In [8]:
def get_texts(path, CLASSES):
    texts, labels = [], []
    for idx, label in enumerate(CLASSES):
        for fname in (path / label).glob('*.*'):
            texts.append(fname.open('r', encoding='utf-8').read())
            labels.append(idx)
    return np.array(texts), np.array(labels)

In [9]:
# define function to load process and store dataset as train and test.csv files
def extract_transform_load_dataset(raw_data_path, output_path, timeit=True):
    tic = time.time()

    BOS = 'xbos'
    FLD = 'xfld'
    CLASSES = ['neg', 'pos', 'unsup']
    col_names = ['sentiment', 'review']
    np.random.seed(42)

    PATH = Path(raw_data_path + 'aclImdb/')
    CLAS_PATH = Path(output_path)

    print(PATH)
    trn_texts, trn_labels = get_texts(PATH / 'train', CLASSES)
    print(len(trn_texts))
    print(len(trn_labels))
    df_trn = pd.DataFrame({'review': trn_texts, 'sentiment': trn_labels}, columns=col_names)
    df_trn[df_trn['sentiment'] !=2].to_csv(CLAS_PATH / 'train.csv', index=False)

    val_texts, val_labels = get_texts(PATH / 'test', CLASSES)
    df_val = pd.DataFrame({'review': val_texts, 'sentiment': val_labels}, columns=col_names)
    df_val.to_csv(CLAS_PATH / 'test.csv', index=False) 

    (CLAS_PATH / 'classes.txt').open('w', encoding='utf-8').writelines(f'{o}\n' for o in CLASSES)

    toc = time.time()
    print(np.round(toc - tic, 2), 'sec')  

In [10]:
extract_transform_load_dataset(raw_data_path, init_dataset_path, timeit=True)

/mnt/batch/tasks/shared/LS_root/mounts/clusters/seb-compute-poc/code/GIT_REPOS/demo-sentiment-analysis/data/01_raw/aclImdb
75000
75000
4081.66 sec


# 5 - Load dataset

In [7]:
def load_dataset(train_path, test_path, sample=5000):
    tic = time.time()

    train_dataset = pd.read_csv(train_path).sample(n=sample)

    print(train_dataset.head())

    train_reviews = np.array(train_dataset['review'])
    train_sentiments = np.array(train_dataset['sentiment'])

    test_dataset = pd.read_csv(test_path).sample(n=sample)
    test_reviews = np.array(test_dataset['review'])
    test_sentiments = np.array(test_dataset['sentiment'])

    return train_dataset, test_dataset, train_sentiments, test_sentiments, train_reviews, test_reviews

In [8]:
train_dataset, test_dataset, train_sentiments, test_sentiments, train_reviews, test_reviews = load_dataset(train_path, test_path)

       sentiment                                             review
18295          1  This isn't among Jimmy Stewart's best films--I...
19067          1  I can only echo the praise of the other review...
375            0  This movie is just plain dumb.<br /><br />From...
14903          1  I found the film quite expressive , the way th...
18436          1  Regardless of what personal opinion one may ha...


In [9]:
print(test_reviews)

["I tend to fall in and out of love with anime, as the more you watch the more you notice a lot of shows are just poor copies of the few gems or rehashes of old formulas. But every once in a while one of the true gems comes along and it's originality just blows you away. Haruhi is truly one of those shows. Many anime series are originally manga and sometimes the translation into an animated show is rather poorly done and doesn't utilize the benefits animation has over static drawings. Haruhi is actually based on a series of light novels and fires on all cylinders, beautiful animation, great voice acting, great music and a complete and well paced story. Watch it you won't be disappointed, and I'd suggest watching it in broadcast order it works so much better that way."
 "A low-rent, cheaply made police thriller that's kept bearable by some fair humorous bits, the nice chemistry between the two leads and, especially, by James Remar's satisfying turn as a narcissistic, psychopathic (and, 

# 6 - Store dataset in HDFS

In [None]:
import pyspark
from pyspark.sql.typs import *
from pyspark.sql.context import SQLContext

def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo - equivalent_type(format_type)
    except: typo = StringType()
    return StrucField(string, typo)

def pandas_to_spark(pandas_df):
    column = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return spark.createDataFrame(pandas_df, p_schema)

def save_csv(pandas_df, path):
    df = pandas_to_spark(pandas_df)
    df.coalesce(1).write.csv(path = path, header="true", mode="overwrite", sep=",")
    