In [1]:
import pyspark as ps
from pyspark import SparkConf, SparkContext
from __future__ import unicode_literals
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import json
import gzip
import spacy
%matplotlib inline
np.random.seed(32113)
import pickle
from sklearn.decomposition import NMF
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.feature_extraction.stop_words import ENGLISH_STOP_WORDS
from sklearn.decomposition import NMF
from nltk.corpus import stopwords
from spacy.en import English
from sklearn.grid_search import GridSearchCV
import string
import data_prep_new as dp 
parser = English()
from pyspark.sql.types import *
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import col,isnan,count,when
import pyspark.sql.functions as pys_fun
 
import math

# Build the Spark SQL entry points (Hive-enabled and plain) on top of `sc`.
# NOTE(review): `sc` is never created in this notebook (SparkConf/SparkContext are
# imported above but not used here) — presumably the pyspark kernel/shell injects it; confirm.
hc = ps.HiveContext(sc)
sql = ps.SQLContext(sc)



## Importing data as a pandas dataframe

In [2]:
# Gzipped JSON inputs, given relative to the notebook's working directory:
# the review file and the product metadata file.
path = './reviews_Video_Games.json.gz'
meta_path = './meta_Video_Games.json.gz'

In [18]:
# Load the review file through the project helper dp.getDF.
# NOTE(review): the section heading says this yields a pandas DataFrame; the helper's source is not shown here.
dataframe = dp.getDF(path)

In [19]:
# Load the product metadata file through the same project helper.
meta = dp.getDF(meta_path)

## Setting HiveContext and SQLContext

In [52]:
# Same two contexts that the import cell already creates; running this cell rebinds hc and sql.
# `sc` must already exist in the kernel (it is not defined anywhere in this notebook).
hc = ps.HiveContext(sc)
sql = ps.SQLContext(sc)

## Preparing review data

For this notebook, I am running PySpark locally on my laptop.  
Unfortunately, the Pandas dataframe that stores all video game reviews is too big for my computer.  
Therefore, I will begin my data preparation by picking video game reviews with more than 50 user votes from this Pandas dataframe.  
I am also dropping features that are not used in my prediction model.  

In [60]:
# Keep only the reviews that received more than 50 votes in total.
# `helpful` holds [helpful_votes, total_votes], so element 1 is the total.
# A boolean mask replaces the per-row `dataframe.helpful[i]` label lookup, which is
# slow on a large frame and silently assumes the index is exactly 0..n-1.
has_enough_votes = dataframe.helpful.apply(lambda votes: votes[1] > 50).astype(bool)
# `index` stays a plain list of row labels, as before.
index = dataframe.index[has_enough_votes.values].tolist()
dataframe2 = dataframe.loc[index]
# Drop the features that are not used by the prediction model.
d2 = dataframe2.drop(['unixReviewTime','reviewTime','reviewerName','reviewerID','summary'],axis = 1)

In [7]:
d2.head(10)

Unnamed: 0,asin,helpful,reviewText,overall
81,0700026657,"[27, 52]","If you buy this game, you will regret it. If ...",1.0
272,3866811659,"[89, 91]",Fritz is arguably the most commercially-succes...,4.0
1277,9941113300,"[55, 63]",Markus Pearson Doesn't Have a Amazon account n...,1.0
1759,B000009QCV,"[11, 55]",I bought this game thinking it would be great ...,2.0
2018,B00000DMAD,"[4, 74]","the game's okay, although i don't think making...",1.0
2258,B00000DMAQ,"[2, 61]",When i rented this i returned it after about a...,1.0
2572,B00000DMAT,"[60, 66]",This is the one game that I would play day in ...,5.0
2584,B00000DMAT,"[7, 103]",I should have checked this game out more thoro...,1.0
2621,B00000DMAT,"[4, 79]",Man this game is so bad. IF you dont agree rig...,1.0
2632,B00000DMAT,"[3, 73]",This game is overated. I got this game I got t...,1.0


In [8]:
#Display d2 columns
d2.columns

Index([u'asin', u'helpful', u'reviewText', u'overall'], dtype='object')

### Preparing Schema
Here, I am making a schema for the d2 Pandas dataframe.  
A schema in Spark describes the structure of a Spark dataframe: its column names and their data types.  
Sometimes you can convert a Pandas dataframe to a Spark dataframe without a schema.  
However, if your Pandas dataframe has complex structures,   
such as a column that holds lists or dictionaries, the conversion may not work smoothly without a schema.  
  
By the way,  
the DataType for a python list = ArrayType( element datatype() )   
the DataType for a python dictionary = MapType(key datatype() , value datatype() )

In [61]:
# Schema for d2: one nullable field per column, listed as (name, Spark type).
_review_fields = (
    ('asin', StringType()),
    ('helpful', ArrayType(IntegerType())),
    ('reviewText', StringType()),
    ('overall', FloatType()),
)
schema = StructType(list(
    StructField(field_name, field_type, True)
    for field_name, field_type in _review_fields))

In the next step, I am converting my Pandas Dataframe to Spark Dataframe.

In [62]:
# Convert the pandas frame d2 into the Spark DataFrame `df`, using the explicit schema.
df = hc.createDataFrame(d2, schema=schema)

In [11]:
df.show()

+----------+---------+--------------------+-------+
|      asin|  helpful|          reviewText|overall|
+----------+---------+--------------------+-------+
|0700026657| [27, 52]|If you buy this g...|    1.0|
|3866811659| [89, 91]|Fritz is arguably...|    4.0|
|9941113300| [55, 63]|Markus Pearson Do...|    1.0|
|B000009QCV| [11, 55]|I bought this gam...|    2.0|
|B00000DMAD|  [4, 74]|the game's okay, ...|    1.0|
|B00000DMAQ|  [2, 61]|When i rented thi...|    1.0|
|B00000DMAT| [60, 66]|This is the one g...|    5.0|
|B00000DMAT| [7, 103]|I should have che...|    1.0|
|B00000DMAT|  [4, 79]|Man this game is ...|    1.0|
|B00000DMAT|  [3, 73]|This game is over...|    1.0|
|B00000DMB3| [5, 149]|...  I purchased ...|    1.0|
|B00000DMB3| [8, 177]|Seriously, this r...|    1.0|
|B00000DMB3|[16, 249]|Seriously, it's o...|    1.0|
|B00000DMB3| [5, 149]|If anyone knows h...|    1.0|
|B00000DMB3|  [4, 98]|Generic troll com...|    1.0|
|B00000DMB3|  [1, 65]|There's many that...|    1.0|
|B00000F1GM|

### extracting elements from a list feature
I want to extract elements from the helpful feature (list) and  
store these elements as "helpful_total_votes" and "num_of_helpful_votes"  

In [63]:
# Split the two-element `helpful` array into scalar columns:
# element 1 -> helpful_total_votes, element 0 -> num_of_helpful_votes.
helpful_column = df["helpful"]
df = (df
      .withColumn("helpful_total_votes", helpful_column.getItem(1))
      .withColumn("num_of_helpful_votes", helpful_column.getItem(0)))

In [13]:
df.show()

+----------+---------+--------------------+-------+-------------------+--------------------+
|      asin|  helpful|          reviewText|overall|helpful_total_votes|num_of_helpful_votes|
+----------+---------+--------------------+-------+-------------------+--------------------+
|0700026657| [27, 52]|If you buy this g...|    1.0|                 52|                  27|
|3866811659| [89, 91]|Fritz is arguably...|    4.0|                 91|                  89|
|9941113300| [55, 63]|Markus Pearson Do...|    1.0|                 63|                  55|
|B000009QCV| [11, 55]|I bought this gam...|    2.0|                 55|                  11|
|B00000DMAD|  [4, 74]|the game's okay, ...|    1.0|                 74|                   4|
|B00000DMAQ|  [2, 61]|When i rented thi...|    1.0|                 61|                   2|
|B00000DMAT| [60, 66]|This is the one g...|    5.0|                 66|                  60|
|B00000DMAT| [7, 103]|I should have che...|    1.0|                103

In [64]:
# Both elements of `helpful` now live in their own columns, so the array column can go.
df = df.drop("helpful")

In [15]:
df.show()

+----------+--------------------+-------+-------------------+--------------------+
|      asin|          reviewText|overall|helpful_total_votes|num_of_helpful_votes|
+----------+--------------------+-------+-------------------+--------------------+
|0700026657|If you buy this g...|    1.0|                 52|                  27|
|3866811659|Fritz is arguably...|    4.0|                 91|                  89|
|9941113300|Markus Pearson Do...|    1.0|                 63|                  55|
|B000009QCV|I bought this gam...|    2.0|                 55|                  11|
|B00000DMAD|the game's okay, ...|    1.0|                 74|                   4|
|B00000DMAQ|When i rented thi...|    1.0|                 61|                   2|
|B00000DMAT|This is the one g...|    5.0|                 66|                  60|
|B00000DMAT|I should have che...|    1.0|                103|                   7|
|B00000DMAT|Man this game is ...|    1.0|                 79|                   4|
|B00

### Running a SQL query to engineer features
Let's make new features called helpful_percentage and Text_length using a SQL query.  
helpful_percentage is calculated as num_of_helpful_votes / helpful_total_votes, rounded to 2 decimal places (a fraction such as 0.52, not a 0-100 value).  
Text_length is the length of reviewText.  

In [65]:
# Register df as a temp table named 'review' so that it can be referenced from SQL queries.
df.registerTempTable('review')

In [66]:
# Keep every review column and add two engineered features:
#   helpful_percentage = num_of_helpful_votes / helpful_total_votes, rounded to 2 decimals
#   Text_length        = LENGTH(reviewText)
result = hc.sql('''
SELECT R.*,
ROUND(R.num_of_helpful_votes/R.helpful_total_votes,2) as helpful_percentage,
LENGTH(R.reviewText) as Text_length FROM review as R''')

In [90]:
result.show()

+----------+--------------------+-------+-------------------+--------------------+------------------+-----------+
|      asin|          reviewText|overall|helpful_total_votes|num_of_helpful_votes|helpful_percentage|Text_length|
+----------+--------------------+-------+-------------------+--------------------+------------------+-----------+
|0700026657|If you buy this g...|    1.0|                 52|                  27|              0.52|        485|
|3866811659|Fritz is arguably...|    4.0|                 91|                  89|              0.98|       5950|
|9941113300|Markus Pearson Do...|    1.0|                 63|                  55|              0.87|        263|
|B000009QCV|I bought this gam...|    2.0|                 55|                  11|               0.2|       1353|
|B00000DMAD|the game's okay, ...|    1.0|                 74|                   4|              0.05|        344|
|B00000DMAQ|When i rented thi...|    1.0|                 61|                   2|      

#### If you want to do the same code in Pandas dataframe....

In [None]:
# NumPy/Pandas way of doing the last 2 steps, kept as a non-executed string for comparison.
# Fixes to the snippet so that it would actually run if pasted into a cell:
#   * work on a copy of d2, so the extra columns do not leak into d2
#     (d2 is converted to Spark with a 4-field schema),
#   * create new_df from game_df (it was used before being defined),
#   * close the list bracket in the drop() call.
'''
game_df = d2.copy()
helpful = np.array(game_df['helpful'])
helpful = helpful.reshape(len(game_df),1)
helpful_num1 = np.zeros((len(helpful),1))
helpful_num2 = np.zeros((len(helpful),1))
for i in range(len(helpful)):
    helpful_num1[i] = helpful[i][0][0]
    helpful_num2[i] = helpful[i][0][1]
game_df['helpful_total_votes'] = helpful_num2
game_df['num_of_helpful_votes'] = helpful_num1
new_df = game_df.drop(['helpful'],axis = 1)
new_df['helpful_percent'] = (new_df['num_of_helpful_votes']/ \
                                 new_df['helpful_total_votes']).round(2)
new_df.index = range(len(new_df))
length = np.zeros((len(new_df),1))
for i in new_df.index:
     length[i] = int(len(new_df['reviewText'][i]))
new_df['text_length']=length
'''

# Longer code, and a bit harder to follow than the Spark SQL version.

## Preparing meta data

The metadata pandas dataframe contains information regarding products.  
This dataframe contains lots of useful information for my prediction model,  
but it needs to be cleaned up before merging it to review dataframe that we just created above.  

In [19]:
meta.head(5)

Unnamed: 0,asin,description,price,imUrl,related,salesRank,categories,title,brand
0,0078764343,Brand new sealed!,37.98,http://ecx.images-amazon.com/images/I/513h6dPb...,"{u'also_bought': [u'B000TI836G', u'B003Q53VZC'...",{u'Video Games': 28655},"[[Video Games, Xbox 360, Games]]",,
1,043933702X,In Stock NOW. Eligible for FREE Super Saving ...,23.5,http://ecx.images-amazon.com/images/I/61KKRndV...,"{u'also_viewed': [u'B000067NP1', u'0439573947'...",{u'Video Games': 44080},"[[Video Games, PC, Games]]",,
2,0439339987,Grandma Groupers kelp seeds are missing and wi...,8.95,http://ecx.images-amazon.com/images/I/416QZg89...,"{u'also_bought': [u'B000314VVU', u'B000PXUOTE'...",{u'Video Games': 49836},"[[Video Games, PC, Games]]",,
3,0439342260,This software is BRAND NEW. Packaging may diff...,,http://ecx.images-amazon.com/images/I/61Wvu-Uj...,{u'also_viewed': [u'043934302X']},{u'Video Games': 49156},"[[Video Games, PC, Games]]",,
4,0439339960,a scholastic clubs fairs cd rom game,,http://ecx.images-amazon.com/images/I/51k3oRCF...,{u'also_viewed': [u'B00028D7TG']},{u'Video Games': 52262},"[[Video Games, PC, Games]]",,


In [20]:
# Metadata columns the model does not use.
unused_meta_columns = ['imUrl','related','title','brand','description']
meta2 = meta.drop(unused_meta_columns, axis=1)
# Show which columns remain.
print(meta2.columns)

Index([u'asin', u'price', u'salesRank', u'categories'], dtype='object')


### Metadata cleaning Part 1.
  
So let me start my first metadata cleaning.  
I need to fill null values and change non-dict items into a dict in the salesRank feature.  
I also want to extract the keys and values of salesRank as features.  
I made a Python user-defined function that does the following:
1. Assign {"Video Games": max(salesRank.values)+1} to the rows that are missing salesRank values.  
2. Create rank_keys and rank_values columns with withColumn function.  

In [21]:
# Project helper from data_prep_new: returns the updated frame plus a rank value.
# NOTE(review): per the text above it fills missing salesRank with {"Video Games": max+1}
# and adds rank_keys / rank_values; the helper's source is not shown here — confirm.
# Rebinding meta2 makes this cell non-idempotent: a re-run feeds the processed frame back in.
meta2, max_of_rank_val = dp._add_rankings(meta2,'Video Games')

In [22]:
meta2.head(5)

Unnamed: 0,asin,price,salesRank,categories,rank_keys,rank_values
0,0078764343,37.98,{u'Video Games': 28655},"[[Video Games, Xbox 360, Games]]",Video Games,28655
1,043933702X,23.5,{u'Video Games': 44080},"[[Video Games, PC, Games]]",Video Games,44080
2,0439339987,8.95,{u'Video Games': 49836},"[[Video Games, PC, Games]]",Video Games,49836
3,0439342260,,{u'Video Games': 49156},"[[Video Games, PC, Games]]",Video Games,49156
4,0439339960,,{u'Video Games': 52262},"[[Video Games, PC, Games]]",Video Games,52262


In [16]:
max_of_rank_val

131442

In [17]:
# Show the product(s) holding the largest real sales rank.
# max_of_rank_val (131442 in the run shown) is presumably max(real rank) + 1, the fill
# value for missing ranks, so the largest real rank is one below it. Deriving it avoids
# hard-coding 131441, which would go stale when the data changes.
meta2.loc[meta2.rank_values == max_of_rank_val - 1]

Unnamed: 0,asin,price,salesRank,categories,rank_keys,rank_values
50820,B00K6EKD9I,,{u'Video Games': 131441},"[[Video Games, PC, Accessories, Controllers, G...",Video Games,131441


In [23]:
# salesRank is now redundant: its key and value live in rank_keys and rank_values.
meta3 = meta2.drop('salesRank', axis=1)
print(meta3.columns)

Index([u'asin', u'price', u'categories', u'rank_keys', u'rank_values'], dtype='object')


### Preparing a Schema for metadata and converting Pandas DF to SparkDF

In [24]:
# Schema for meta3: one nullable field per column, listed as (name, Spark type).
# (With perfect salesRank data it could have been kept as
#  StructField('salesRank', MapType(StringType(),FloatType()), True).)
_meta_fields = (
    ('asin', StringType()),
    ('price', FloatType()),
    ('categories', ArrayType(ArrayType(StringType()))),
    ('rank_keys', StringType()),
    ('rank_values', IntegerType()),
)
schema2 = StructType(list(
    StructField(field_name, field_type, True)
    for field_name, field_type in _meta_fields))

In [25]:
# Convert the pandas frame meta3 into the Spark DataFrame `df_meta`, using schema2.
df_meta = hc.createDataFrame(meta3, schema=schema2)

In [39]:
df_meta.show()

+----------+-----+--------------------+-----------+-----------+
|      asin|price|          categories|  rank_keys|rank_values|
+----------+-----+--------------------+-----------+-----------+
|0078764343|37.98|[WrappedArray(Vid...|Video Games|      28655|
|043933702X| 23.5|[WrappedArray(Vid...|Video Games|      44080|
|0439339987| 8.95|[WrappedArray(Vid...|Video Games|      49836|
|0439342260|  NaN|[WrappedArray(Vid...|Video Games|      49156|
|0439339960|  NaN|[WrappedArray(Vid...|Video Games|      52262|
|0439374391| 20.0|[WrappedArray(Vid...|Video Games|      59834|
|0439394422|12.96|[WrappedArray(Vid...|Video Games|      36531|
|043940133X| 30.0|[WrappedArray(Vid...|Video Games|      17910|
|0439573947|10.79|[WrappedArray(Vid...|Video Games|      42130|
|0439591295|  NaN|[WrappedArray(Vid...|Video Games|      12108|
|0439591368|19.95|[WrappedArray(Vid...|Video Games|       7367|
|0439591538| 8.99|[WrappedArray(Vid...|Video Games|      35302|
|0439671418|89.99|[WrappedArray(Vid...|V

### Metadata Feature engineering part1.
The categories feature stores each product's categories as a list of lists (see the schema above), which is awkward to work with.  
So, in the next few steps, I will take all the information from this feature and store it in new features.   
  
First, I want to make a feature called num_category.  
This feature contains the total number of category lists for each product.

In [26]:
# One entry per product: how many category lists its `categories` value holds.
number_of_category = (df_meta.select('categories')
                      .rdd
                      .map(lambda row: len(row[0]))
                      .collect())
# Preview the first 20 counts.
print(number_of_category[:20])

[1, 1, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]


Now, I want to make this list as a new feature in my dataframe.  
I really don't know the best way to convert a list to a dataframe column.  
So What I will do is create a new dataframe called nc that only contains 1 feature (number_of_category).

In [27]:
# Single-column Spark DataFrame (column name `value`) holding the per-product counts.
nc = hc.createDataFrame(number_of_category, schema=IntegerType())

In [53]:
nc.show()

+-----+
|value|
+-----+
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    2|
|    2|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
|    1|
+-----+
only showing top 20 rows



#### Adding Index column
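I am adding an index column ("id") to both df_meta and nc.  
The ids should match between the two dataframes because they have the same length and row order; note that monotonically_increasing_id only guarantees unique, increasing ids (not consecutive ones), so this relies on both dataframes being partitioned the same way.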

In [28]:
# Already imported in the first cell; this repeat is redundant but harmless.
from pyspark.sql.functions import monotonically_increasing_id
# Tag every row of both frames with an `id` column to join on.
# NOTE(review): monotonically_increasing_id() yields unique, increasing ids but not
# consecutive ones — later output in this notebook jumps from 20 to 8589934592.
# The ids of df_meta and nc therefore only line up if both frames are partitioned
# identically; presumably true here because both were built from local data of equal
# length — confirm, or join on a real key instead.
df_meta = df_meta.withColumn("id", monotonically_increasing_id())
nc = nc.withColumn("id", monotonically_increasing_id())

In [25]:
df_meta.show()

+----------+-----+--------------------+-----------+-----------+---+
|      asin|price|          categories|  rank_keys|rank_values| id|
+----------+-----+--------------------+-----------+-----------+---+
|0078764343|37.98|[WrappedArray(Vid...|Video Games|      28655|  0|
|043933702X| 23.5|[WrappedArray(Vid...|Video Games|      44080|  1|
|0439339987| 8.95|[WrappedArray(Vid...|Video Games|      49836|  2|
|0439342260|  NaN|[WrappedArray(Vid...|Video Games|      49156|  3|
|0439339960|  NaN|[WrappedArray(Vid...|Video Games|      52262|  4|
|0439374391| 20.0|[WrappedArray(Vid...|Video Games|      59834|  5|
|0439394422|12.96|[WrappedArray(Vid...|Video Games|      36531|  6|
|043940133X| 30.0|[WrappedArray(Vid...|Video Games|      17910|  7|
|0439573947|10.79|[WrappedArray(Vid...|Video Games|      42130|  8|
|0439591295|  NaN|[WrappedArray(Vid...|Video Games|      12108|  9|
|0439591368|19.95|[WrappedArray(Vid...|Video Games|       7367| 10|
|0439591538| 8.99|[WrappedArray(Vid...|Video Gam

In [26]:
nc.show()

+-----+---+
|value| id|
+-----+---+
|    1|  0|
|    1|  1|
|    1|  2|
|    1|  3|
|    1|  4|
|    1|  5|
|    2|  6|
|    2|  7|
|    1|  8|
|    1|  9|
|    1| 10|
|    1| 11|
|    1| 12|
|    1| 13|
|    1| 14|
|    1| 15|
|    1| 16|
|    1| 17|
|    1| 18|
|    1| 19|
+-----+---+
only showing top 20 rows



#### merging two different Spark dataframes using SQL
To run a SQL query, I need to register both dataframes as temp tables.  
You always register a dataframe as a temp table before running a SQL query against it with Spark!  
After registering, you can run a simple SQL query to join the two dataframes.  
Note: you want to "SELECT m.*, nc.value" instead of "SELECT *";  
otherwise you end up with two duplicate "id" columns.  

In [29]:
# Expose both frames to SQL under the table names 'meta' and 'nc'.
df_meta.registerTempTable('meta')
nc.registerTempTable('nc')

In [30]:
# Attach nc.value to each meta row as num_category by joining on the generated id.
# Selecting m.* plus nc.value (rather than *) avoids a duplicate id column.
df_meta = hc.sql('SELECT m.*, nc.value AS num_category FROM meta AS m JOIN nc ON m.id == nc.id ORDER BY id')

In [29]:
df_meta.show()

+----------+-----+--------------------+-----------+-----------+---+------------+
|      asin|price|          categories|  rank_keys|rank_values| id|num_category|
+----------+-----+--------------------+-----------+-----------+---+------------+
|0078764343|37.98|[WrappedArray(Vid...|Video Games|      28655|  0|           1|
|043933702X| 23.5|[WrappedArray(Vid...|Video Games|      44080|  1|           1|
|0439339987| 8.95|[WrappedArray(Vid...|Video Games|      49836|  2|           1|
|0439342260|  NaN|[WrappedArray(Vid...|Video Games|      49156|  3|           1|
|0439339960|  NaN|[WrappedArray(Vid...|Video Games|      52262|  4|           1|
|0439374391| 20.0|[WrappedArray(Vid...|Video Games|      59834|  5|           1|
|0439394422|12.96|[WrappedArray(Vid...|Video Games|      36531|  6|           2|
|043940133X| 30.0|[WrappedArray(Vid...|Video Games|      17910|  7|           2|
|0439573947|10.79|[WrappedArray(Vid...|Video Games|      42130|  8|           1|
|0439591295|  NaN|[WrappedAr

### Metadata Feature engineering part2.
Now the crazy part.  
1. Extract the unique set of category words that appear in the categories feature lists.  
2. Create a new dataframe ("category dataframe") that has the step 1 results as column names.
3. Fill it as an indicator matrix where each row represents a product and each column represents a category that the product belongs to.  
4. Join the category dataframe with the existing df_meta. 

For steps 2 and 3, I was not able to code it in Spark, so I am using Python code that I wrote previously.

#### 1. unique sets of category words
In order to make a list of category words, I flatMap the categories feature 3 times. 3 times!!!  
In the very last flatMap, I use a list comprehension to extract the words.  
Why? So that it extracts category words only from the lists whose first element is "Video Games".  
I just want to get category names that are related to video game products.    
After running the flatMaps and collect(), I have a list of category words.  
There are a bunch of duplicate words in this list, so I turn it into a Python set and convert it back to a list.  

In [31]:
def _video_game_words(category_path):
    """Return all words of one category list if its first entry is 'Video Games', else nothing."""
    words = list(category_path)
    if words and words[0] == 'Video Games':
        return words
    return []

# Row -> its list of category lists -> the words of every 'Video Games' list.
category_lists = df_meta.select('categories').rdd.flatMap(lambda row: row[0])
category_word = category_lists.flatMap(_video_game_words).collect()
# De-duplicate through a set, then go back to a list.
word_list = list(set(category_word))
print(word_list)

[u'Screen Protectors', u'PlayStation 4', u'PlayStation 3', u'PlayStation 2', u'Game Boy Advance', u'Joysticks', u'GameCube', u'Commodore Amiga', u'3DO', u'Cases & Protectors', u'More Systems', u'Sony PSP', u'LIVE', u'Sega Master System', u'Fitness Accessories', u'Subscription Cards', u'Points & Currency Cards', u'Atari 2600', u'Controllers', u'Light Guns', u'Nintendo 64', u'Nintendo 3DS', u'Linux Games', u'Accessories', u'Skins', u'Steering Wheels', u'Racing Wheels', u'PlayStation', u'Networking', u'Atari 7800', u'Xbox 360', u'Chargers', u'Kids & Family', u'ColecoVision', u'Gamepads', u'Digital Games & DLC', u'PC', u'Consoles', u'Game Boy Color', u'Nintendo NES', u'Drums', u'MMO & Free-to-Play Games', u'Xbox', u'Sega CD', u'Faceplates', u'Game Boy', u'Adapters', u'Batteries', u'Fire TV', u'Sega Genesis', u'Speakers', u'Batteries & Chargers', u'Mac Game Downloads', u'Sega Game Gear', u'PDAs', u'Sega Dreamcast', u'Remotes', u'Xbox One', u'Dance Mats', u'Atari 5200', u'Keyboards', u'Downl

#### 2. & 3. Create a new dataframe ("category dataframe")  and storing category feature list element in the new dataframe.
I have to acknowledge that I was defeated here.  
I was not able to find a way to run this section and next section in spark.   
The whole purpose of these sections is to create a dataframe where columns are category names.  
This dataframe is a binary matrix that stores 1 if product(row) belongs to that category.     
  
For instance, if the category feature for a row is [[A,B,D],[A,E,H]],  
my new dataframe would look like {A:1,B:1,C:0,D:1,E:1,F:0,G:0,H:1} for that row.   
  
Instead of using Spark code, I'm using my python codes that I previously made.  
It still runs ok. It's not uncomfortably slow.  
I might need to attack this problem later.  

In [32]:
# All-zero matrix: one row per product in meta3, one column per category word.
np_cat_case = np.zeros(shape=(len(meta3), len(word_list)))
# Wrap it in a frame labelled with the category words and let the project helper fill it.
# NOTE(review): dp._category_fill presumably sets 1 where a product belongs to a
# category, per the text above — its source is not shown here.
df_category = dp._category_fill(
    meta3['categories'],
    pd.DataFrame(np_cat_case, columns=word_list),
    'Video Games')

In [63]:
df_category.head(10)

Unnamed: 0,Screen Protectors,PlayStation 4,PlayStation 3,PlayStation 2,Game Boy Advance,Joysticks,GameCube,Commodore Amiga,3DO,Cases & Protectors,...,Flight Controls,Currency Cards,Games,Super Nintendo,Wii,Hardware,Gaming Mice,Sega Saturn,Cables,Gaming Keyboards
0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
5,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
6,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
7,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
8,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
9,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


#### 4. join the category dataframe with the existing df_meta
I will perform same steps as the meta_data feature engineering pt1.  
Insert ID column in category dataframe and join these dataframes using SQL query.

In [33]:
# Move the pandas indicator matrix into Spark and tag each row with a generated id.
# NOTE(review): as with nc, this id only matches df_meta.id if cat_df ends up partitioned
# the same way as the frame df_meta.id was generated on — confirm.
cat_df= hc.createDataFrame(df_category)
cat_df = cat_df.withColumn("id", monotonically_increasing_id())

# Register the current df_meta under 'meta' and the category frame under 'cat_df'.
df_meta.registerTempTable('meta')
cat_df.registerTempTable('cat_df')

# Join on id; cd.* brings in every category column plus cat_df's id column.
df_meta2 = hc.sql('''
SELECT m.asin,m.price, m.rank_keys, m.rank_values, m.num_category, cd.* 
FROM meta AS m JOIN cat_df AS cd ON m.id == cd.id ORDER BY cd.id''')

In [65]:
df_meta2.show()

+----------+-----+-----------+-----------+------------+-----------------+-------------+-------------+-------------+----------------+---------+--------+---------------+---+------------------+------------+--------+----+------------------+-------------------+------------------+-----------------------+----------+-----------+----------+-----------+------------+-----------+-----------+-----+---------------+-------------+-----------+----------+----------+--------+--------+-------------+------------+--------+-------------------+---+--------+--------------+------------+-----+------------------------+----+-------+----------+--------+--------+---------+-------+------------+--------+--------------------+------------------+--------------+----+--------------+-------+--------+----------+----------+---------+--------------------+-----------------+-------+------------+-----------+-----+--------+----------+--------------+------------+--------------------------+------+-------------+-------------+--------

In [92]:
# Number of columns in the joined metadata frame.
print(len(df_meta2.columns))

100


## Merging review data and Meta data
Again, we will use a SQL query to join the two dataframes on the asin feature.  
After joining, I want to filter the joined data by rank_keys.  
rank_keys holds the category in which each product is ranked.  
I do not want to include products that are ranked in a category unrelated to video games.  
So before I merge the dataframes, let me look at rank_keys with the following SQL query.  

In [72]:
# Register df_meta2 as 'meta' and result as 'review'
# (table names previously registered for df_meta and df).
df_meta2.registerTempTable('meta')
result.registerTempTable('review')

In [73]:
# Number of review rows before merging with the metadata.
print(result.count())

13792


In [42]:
# Count products per rank_keys value, most frequent first.
cat = hc.sql('SELECT rank_keys, COUNT(rank_keys) AS count FROM meta GROUP BY meta.rank_keys ORDER BY count DESC')
# GROUP BY already returns one row per rank_keys, so distinct() is unnecessary; it would
# add a shuffle that is not guaranteed to keep the ORDER BY ordering.
cat.show()

+--------------------+-----+
|           rank_keys|count|
+--------------------+-----+
|         Video Games|47604|
|            Software| 2236|
|         Electronics|  757|
|        Toys & Games|  148|
|            Clothing|   41|
|Sports &amp; Outd...|   36|
|  Home &amp; Kitchen|   26|
|         Movies & TV|   22|
|Cell Phones & Acc...|   17|
|  Camera &amp; Photo|   14|
|               Music|   12|
|Computers & Acces...|   10|
| Musical Instruments|    9|
|Health & Personal...|    4|
|Arts, Crafts & Se...|    4|
|Industrial & Scie...|    2|
|Patio, Lawn & Garden|    2|
|    Kitchen & Dining|    2|
|               Shoes|    2|
|              Beauty|    2|
+--------------------+-----+
only showing top 20 rows



Now you see why we want to filter by rank_keys.   
I am working with video game product reviews, so  
I can safely take out reviews for products that are ranked in the Shoes or Kitchen & Dining categories.    
I want to work with categories that are closely related to video games!  
If we keep only the rank_keys whose total number of products is > 50....  

In [35]:
# Keep only the rank_keys values that cover more than 50 products.
cat2 = hc.sql('SELECT rank_keys, COUNT(rank_keys) AS count FROM meta GROUP BY meta.rank_keys Having COUNT(rank_keys)>50')
# GROUP BY already yields one row per rank_keys, so the extra distinct() pass is dropped.
cat2.show()

+------------+-----+
|   rank_keys|count|
+------------+-----+
|Toys & Games|  148|
| Video Games|47604|
| Electronics|  757|
|    Software| 2236|
+------------+-----+



Let's just use products that are ranked in these 4 categories.  
I set up another SQL query with a window function that filters the data by the count of rank_keys.  
Rows where COUNT(rank_keys) OVER (PARTITION BY rank_keys) > 100 are kept and saved as meta_filtered (the same 4 categories pass this threshold as the > 50 threshold above).

In [48]:
# Window count: every row gets count_cat = number of rows sharing its rank_keys;
# only rows whose rank_keys group has more than 100 products are kept.
meta_filtered = hc.sql('SELECT sub.* FROM (SELECT meta.*, COUNT(rank_keys) OVER(PARTITION BY rank_keys) AS count_cat FROM meta) sub WHERE count_cat >100')
# count_cat was only needed for the filter.
meta_filtered = meta_filtered.drop('count_cat')

In [49]:
# Double-check the filter: list the rank_keys values that survived.
# distinct() de-duplicates on the cluster, so only a handful of rows reach the driver
# instead of all ~50k; the resulting set of Rows is the same.
set(meta_filtered.select('rank_keys').distinct().collect())

{Row(rank_keys=u'Electronics'),
 Row(rank_keys=u'Software'),
 Row(rank_keys=u'Toys & Games'),
 Row(rank_keys=u'Video Games')}

In [53]:
# Number of metadata rows left after the rank_keys filter.
print(meta_filtered.count())

50745


It works!

Now that we know which rank_keys to keep, let's add one more filter while merging.  
I am keeping only the reviews that received more than 100 votes (helpful_total_votes > 100).  

In [82]:
# Join reviews to metadata on asin, keeping only
#   * reviews with more than 100 total votes, and
#   * products whose rank_keys group has more than 100 products (count_cat window count).
# meta_2.* brings in every metadata column, including id and count_cat.
query ='''
SELECT R.reviewText,R.overall, R.helpful_total_votes, R.num_of_helpful_votes, 
R.helpful_percentage, R.Text_length, meta_2.*
FROM review as R JOIN (SELECT meta.*, COUNT(rank_keys) OVER(PARTITION BY rank_keys) AS count_cat FROM meta) meta_2 
ON R.asin = meta_2.asin 
WHERE R.helpful_total_votes>100 AND meta_2.count_cat >100''' 
result2 = hc.sql(query)

In [84]:
# The metadata id no longer identifies rows once metadata is merged with reviews
# (one product can match many reviews), so drop it together with the helper column
# count_cat and generate a fresh id per merged row.
result2 = (result2
           .drop("id","count_cat")
           .withColumn("id", monotonically_increasing_id()))

In [93]:
# (number of rows, number of columns) of the merged frame.
print((result2.count(), len(result2.columns)))

(4739, 106)


In [95]:
# rank_keys values present after the merge; distinct() de-duplicates on the cluster
# so only the unique rows are collected to the driver.
set(result2.select('rank_keys').distinct().collect())

{Row(rank_keys=u'Electronics'),
 Row(rank_keys=u'Software'),
 Row(rank_keys=u'Video Games')}

#### 5. Making dummy variables for rank_keys
I know that the purpose of this project is to run my previous project code in Spark.  
But I now realize that some functions and code are better kept in plain Python rather than PySpark.  
This is a good example. Pandas has a built-in function for dummy variables called get_dummies.  
In PySpark... you need to build your own dummy variable function.  
Yes, it's not hard to code that. But is it more efficient than the pandas built-in function...?  
For the sake of cleanliness, I'm just going to use pandas for this step.  

In [98]:
# Convert the Spark dataframe to a pandas dataframe.
# toPandas() brings the whole frame to the driver; acceptable here (4739 rows x 106 columns above).
panda_result=result2.toPandas()

In [99]:
# Dummy (one-hot) columns for rank_keys; drop_first=True leaves out the first level.
dum = pd.get_dummies(panda_result['rank_keys'], drop_first=True)
# Full frame with the dummy columns appended.
panda_result2 = pd.concat([panda_result, dum], axis=1)
# Slim frame: just the row id plus the dummy columns.
dum2 = pd.concat([panda_result['id'], dum], axis=1)

In [100]:
# Compare the rank_keys levels with the dummy columns that were generated.
print(set(panda_result.rank_keys))
print(dum.columns)

set([u'Electronics', u'Video Games', u'Software'])
Index([u'Software', u'Video Games'], dtype='object')


In [82]:
# Try to turn the full pandas frame back into a Spark dataframe.
# NOTE(review): in the recorded run this raised
# `ValueError: name already used as a name or title`. Presumably panda_result2 has
# duplicate column names after the dummy columns were appended — check with
# panda_result2.columns[panda_result2.columns.duplicated()] to confirm.
merged_df_sp = hc.createDataFrame(panda_result2)

ValueError: name already used as a name or title

No bueno :(  
Since setting up a schema for over 100 features would take too much time,  
I will instead convert only the small "dum2" pandas dataframe (the id column plus the 2 dummy features)  
and merge the two using Spark SQL.

In [106]:
# Schema for the id + rank_keys dummy table.
# "id" is produced by monotonically_increasing_id(), which yields 64-bit values
# (e.g. 8589934592), so it needs LongType; a 32-bit IntegerType cannot hold them.
dum_schema = StructType([
        StructField('id', LongType(), True),
        StructField(dum.columns[0], IntegerType(), True),
        StructField(dum.columns[1], IntegerType(), True)])

In [107]:
# Build the Spark dataframe from dum2 (id + dummy columns).
# No schema is passed here, so the column types are inferred.
dum_spark = hc.createDataFrame(data=dum2)

In [108]:
# Preview the first 20 rows of the dummy table
dum_spark.show(n=20)

+---+--------+-----------+
| id|Software|Video Games|
+---+--------+-----------+
|  0|       0|          1|
|  1|       0|          0|
|  2|       0|          1|
|  3|       0|          1|
|  4|       0|          1|
|  5|       0|          1|
|  6|       0|          1|
|  7|       0|          1|
|  8|       0|          1|
|  9|       0|          1|
| 10|       0|          1|
| 11|       0|          1|
| 12|       0|          1|
| 13|       0|          1|
| 14|       0|          1|
| 15|       0|          1|
| 16|       0|          1|
| 17|       0|          1|
| 18|       0|          1|
| 19|       0|          1|
+---+--------+-----------+
only showing top 20 rows



In [87]:
# Collect every id to inspect the values produced by monotonically_increasing_id()
dum_spark.select(dum_spark["id"]).collect()

[Row(id=0),
 Row(id=1),
 Row(id=2),
 Row(id=3),
 Row(id=4),
 Row(id=5),
 Row(id=6),
 Row(id=7),
 Row(id=8),
 Row(id=9),
 Row(id=10),
 Row(id=11),
 Row(id=12),
 Row(id=13),
 Row(id=14),
 Row(id=15),
 Row(id=16),
 Row(id=17),
 Row(id=18),
 Row(id=19),
 Row(id=20),
 Row(id=8589934592),
 Row(id=8589934593),
 Row(id=8589934594),
 Row(id=8589934595),
 Row(id=8589934596),
 Row(id=8589934597),
 Row(id=8589934598),
 Row(id=8589934599),
 Row(id=8589934600),
 Row(id=8589934601),
 Row(id=8589934602),
 Row(id=8589934603),
 Row(id=8589934604),
 Row(id=8589934605),
 Row(id=8589934606),
 Row(id=8589934607),
 Row(id=8589934608),
 Row(id=8589934609),
 Row(id=17179869184),
 Row(id=17179869185),
 Row(id=17179869186),
 Row(id=17179869187),
 Row(id=17179869188),
 Row(id=17179869189),
 Row(id=17179869190),
 Row(id=17179869191),
 Row(id=17179869192),
 Row(id=17179869193),
 Row(id=17179869194),
 Row(id=17179869195),
 Row(id=17179869196),
 Row(id=17179869197),
 Row(id=17179869198),
 Row(id=17179869199),
 Row(id

In [114]:
# Rename "Video Games" to "Video_Games" so the column name contains no space.
# withColumnRenamed keeps the column order and is a no-op when the column has
# already been renamed, so this cell can be re-run safely. It also needs no
# extra import in the middle of the notebook.
dum_spark = dum_spark.withColumnRenamed("Video Games", "Video_Games")
dum_spark.show()

+---+--------+-----------+
| id|Software|Video_Games|
+---+--------+-----------+
|  0|       0|          1|
|  1|       0|          0|
|  2|       0|          1|
|  3|       0|          1|
|  4|       0|          1|
|  5|       0|          1|
|  6|       0|          1|
|  7|       0|          1|
|  8|       0|          1|
|  9|       0|          1|
| 10|       0|          1|
| 11|       0|          1|
| 12|       0|          1|
| 13|       0|          1|
| 14|       0|          1|
| 15|       0|          1|
| 16|       0|          1|
| 17|       0|          1|
| 18|       0|          1|
| 19|       0|          1|
+---+--------+-----------+
only showing top 20 rows



In [115]:
# (number of rows, number of columns) of the dummy table
dum_spark_shape = (dum_spark.count(), len(dum_spark.columns))
print(dum_spark_shape)

(4739, 3)


In [94]:
# Row and column count of result2, to compare against the dummy table
n_rows = result2.count()
n_cols = len(result2.columns)
print((n_rows, n_cols))

(4739, 106)


In [95]:
# Preview the first 20 ids of result2
result2.select(result2["id"]).show(n=20)

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
only showing top 20 rows



In [57]:
# (number of rows, number of columns) of testdf.
# NOTE(review): testdf is not created in any of the surrounding cells --
# confirm where it comes from, or remove this check.
testdf_shape = (testdf.count(), len(testdf.columns))
print(testdf_shape)

(4739, 105)


In [117]:
# Expose both dataframes to Spark SQL under temporary table names
result2.registerTempTable('merged_df')
dum_spark.registerTempTable('dummy')

# Join on id: every column of the merged table plus the two dummy columns
rank_join_query = '''
SELECT m.*, dum.software, dum.Video_Games FROM merged_df AS m JOIN dummy AS dum ON m.id=dum.id'''
df_merged = hc.sql(rank_join_query)

In [118]:
# Preview the first 20 rows of the joined table
df_merged.show(n=20)

+--------------------+-------+-------------------+--------------------+------------------+-----------+----------+------+-----------+-----------+------------+-----------------+-------------+-------------+-------------+----------------+---------+--------+---------------+---+------------------+------------+--------+----+------------------+-------------------+------------------+-----------------------+----------+-----------+----------+-----------+------------+-----------+-----------+-----+---------------+-------------+-----------+----------+----------+--------+--------+-------------+------------+--------+-------------------+---+--------+--------------+------------+-----+------------------------+----+-------+----------+--------+--------+---------+-------+------------+--------+--------------------+------------------+--------------+----+--------------+-------+--------+----------+----------+---------+--------------------+-----------------+-------+------------+-----------+-----+--------+-------

In [119]:
# (number of rows, number of columns) after adding the rank_keys dummies
df_merged_shape = (df_merged.count(), len(df_merged.columns))
print(df_merged_shape)

(4739, 108)


## saving and loading my dataframe

In [120]:
# Save the joined table as CSV with a header row, replacing any earlier copy
df_merged.write.mode("overwrite").option("header", True).csv('test.csv')

In [55]:
# Load the CSV back, using the header row for names and letting Spark infer the types
df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv('test.csv'))

In [3]:
# (number of rows, number of columns) of the dataframe read back from CSV
df_shape = (df.count(), len(df.columns))
print(df_shape)

(4739, 108)


In [129]:
# Same round trip through JSON: overwrite the output, then read it back
df_merged.write.mode("overwrite").json('test.json')
df_json = spark.read.json(path='test.json')

## Price

Here, I want to deal with the price column.  
When I worked on the price column previously, I replaced null values with the average price of the products.  
I figure this may introduce a problem.  
  
Previously, I replaced null values before splitting the training and test datasets.  
However, that introduces data leakage, because the average is then computed using test-set prices as well.  
  
Therefore, instead of filling null values with the average value, I decided to turn the price column into a categorical feature.  
The categories are:
1. below20  
2. below50  
3. below100  
4. below300  
5. above300  
and   
6. price_unknown (null values)  


In [56]:
# Column expression (for use with withColumn) that maps a price to a bucket label.
# Missing prices are detected explicitly: a NULL price would otherwise fail
# every comparison and end up labelled 'above300', and isnan() avoids relying
# on an implicit cast of the string "NaN".
# Bucket upper bounds are inclusive (e.g. a price of exactly 300 is 'below300').
df_2 = pys_fun.expr(
    """CASE
         WHEN price IS NULL OR isnan(price) THEN 'price_unknown'
         WHEN price <= 20 THEN 'below20'
         WHEN price <= 50 THEN 'below50'
         WHEN price <= 100 THEN 'below100'
         WHEN price <= 300 THEN 'below300'
         ELSE 'above300'
       END""")

In [57]:
# Attach the price bucket to every row as a new column called "new_price"
df = df.withColumn(colName="new_price", col=df_2)

In [58]:
# Check each raw price against the bucket it was assigned
df.select(["price", "new_price"]).collect()

[Row(price=14.99, new_price=u'below20'),
 Row(price=90.54, new_price=u'below100'),
 Row(price=28.12, new_price=u'below50'),
 Row(price=48.87, new_price=u'below50'),
 Row(price=32.89, new_price=u'below50'),
 Row(price=59.99, new_price=u'below100'),
 Row(price=nan, new_price=u'price_unknown'),
 Row(price=129.99, new_price=u'below300'),
 Row(price=99.99, new_price=u'below100'),
 Row(price=102.23, new_price=u'below300'),
 Row(price=19.99, new_price=u'below20'),
 Row(price=14.79, new_price=u'below20'),
 Row(price=132.73, new_price=u'below300'),
 Row(price=19.99, new_price=u'below20'),
 Row(price=24.82, new_price=u'below50'),
 Row(price=19.99, new_price=u'below20'),
 Row(price=499.99, new_price=u'above300'),
 Row(price=39.99, new_price=u'below50'),
 Row(price=19.99, new_price=u'below20'),
 Row(price=nan, new_price=u'price_unknown'),
 Row(price=32.76, new_price=u'below50'),
 Row(price=300.0, new_price=u'below300'),
 Row(price=39.75, new_price=u'below50'),
 Row(price=159.99, new_price=u'below3

In [59]:
# One-hot encode the price buckets with pandas; drop_first=True leaves out
# the first bucket.
panda_df = df.toPandas()
dum = pd.get_dummies(panda_df["new_price"], drop_first=True)
# Keep the row id next to the dummies so they can be joined back in Spark.
dum2 = pd.concat([panda_df["id"], dum], axis=1)

In [60]:
# First ten rows of the id + price dummy table
dum2.iloc[:10]

Unnamed: 0,id,below100,below20,below300,below50,price_unknown
0,25769803779,0,1,0,0,0
1,154618822659,1,0,0,0,0
2,274877907016,0,0,0,1,0
3,292057776131,0,0,0,1,0
4,326417514596,0,0,0,1,0
5,446676598802,1,0,0,0,0
6,455266533491,0,0,0,0,1
7,472446402575,0,0,1,0,0
8,481036337164,1,0,0,0,0
9,592705486860,0,0,1,0,0


In [61]:
# Convert the id + price dummy table to a Spark dataframe (types are inferred)
price_dum = hc.createDataFrame(data=dum2)

In [62]:
# Preview the first 20 rows of the price dummy table
price_dum.show(n=20)

+-------------+--------+-------+--------+-------+-------------+
|           id|below100|below20|below300|below50|price_unknown|
+-------------+--------+-------+--------+-------+-------------+
|  25769803779|       0|      1|       0|      0|            0|
| 154618822659|       1|      0|       0|      0|            0|
| 274877907016|       0|      0|       0|      1|            0|
| 292057776131|       0|      0|       0|      1|            0|
| 326417514596|       0|      0|       0|      1|            0|
| 446676598802|       1|      0|       0|      0|            0|
| 455266533491|       0|      0|       0|      0|            1|
| 472446402575|       0|      0|       1|      0|            0|
| 481036337164|       1|      0|       0|      0|            0|
| 592705486860|       0|      0|       1|      0|            0|
| 627065225225|       0|      1|       0|      0|            0|
| 730144440325|       0|      1|       0|      0|            0|
| 738734374913|       0|      0|       1

In [63]:
# Register the current dataframe and the price dummies for Spark SQL.
# This rebinds the temp table name 'merged_df' to df.
df.registerTempTable('merged_df')
price_dum.registerTempTable('dummy2')

# Join on id: all existing columns plus the five price dummy columns
price_join_query = '''
SELECT m.*, dum.below20, dum.below50,dum.below100,dum.below300,dum.price_unknown FROM merged_df AS m JOIN dummy2 AS dum ON m.id=dum.id'''
df_2 = hc.sql(price_join_query)

In [65]:
# Preview the first 20 rows after adding the price dummies
df_2.show(n=20)

+--------------------+-------+-------------------+--------------------+------------------+-----------+----------+------+-----------+-----------+------------+-----------------+-------------+-------------+-------------+----------------+---------+--------+---------------+---+------------------+------------+--------+----+------------------+-------------------+------------------+-----------------------+----------+-----------+----------+-----------+------------+-----------+-----------+-----+---------------+-------------+-----------+----------+----------+--------+--------+-------------+------------+--------+-------------------+---+--------+--------------+------------+-----+------------------------+----+-------+----------+--------+--------+---------+-------+------------+--------+--------------------+------------------+--------------+----+--------------+-------+--------+----------+----------+---------+--------------------+-----------------+-------+------------+-----------+-----+--------+-------

In [66]:
# (number of rows, number of columns) after adding the price dummies
df_2_shape = (df_2.count(), len(df_2.columns))
print(df_2_shape)

(4739, 114)


In [None]:
# Remove samples where the price is unknown AND rank_values equals
# max_of_rank_val (which, per the original note, marks an unknown rank).
# The raw price / new_price / rank_keys columns are dropped here as well, so
# the result has the same columns whether or not the later drop cells have
# already run (DataFrame.drop ignores column names that are absent).
df_2_1 = (df_2
          .drop('price', 'new_price', 'rank_keys')
          .where(~((df_2.price_unknown == 1) & (df_2.rank_values == max_of_rank_val))))

In [68]:
# price and new_price are now represented by the dummy columns, so remove them
df_2 = df_2.drop(*['price', 'new_price'])

In [71]:
# rank_keys is likewise covered by its dummy columns
df_2 = df_2.drop("rank_keys")

In [34]:
# Row and column count of df_2 after dropping the raw columns
n_rows = df_2.count()
n_cols = len(df_2.columns)
print((n_rows, n_cols))

(4739, 111)


In [35]:
# (number of rows, number of columns) of the filtered dataframe
df_2_1_shape = (df_2_1.count(), len(df_2_1.columns))
print(df_2_1_shape)

(4234, 111)


**DO NOT USE write.csv to save data. It causes loss of data samples.**  
Instead, use a Parquet file.

In [36]:
# Column names are cleaned up before saving because the Parquet format does
# not accept spaces in them. 'Video Games' is dropped first -- presumably
# because replacing its space would clash with the existing Video_Games dummy
# column (confirm) -- and the remaining spaces become underscores.
df_2_1 = df_2_1.drop('Video Games')
df_2_1 = df_2_1.toDF(*(c.replace(' ', '_') for c in df_2_1.columns))
# Overwrite any earlier output: without a mode, save() raises an error when the
# path already exists, which breaks re-runs and any later cell that writes the
# same path.
df_2_1.write.save("TFIDF_ready.parquet", mode="overwrite")

### Codes that run everything we did so far

In [3]:
path = './reviews_Video_Games.json.gz'
meta_path = './meta_Video_Games.json.gz'
start_time = time.time()

# --- review data preparation ---
# Load both raw files through the project's data_prep_new helper and set up
# the Spark SQL entry points.
dataframe = dp.getDF(path)
meta = dp.getDF(meta_path)
hc = ps.HiveContext(sc)
sql = ps.SQLContext(sc)

# Keep the reviews whose second "helpful" entry (the total vote count, see the
# column names assigned below) is above 100, then drop the unused columns.
helpful_col = dataframe.helpful
index = [i for i in xrange(len(helpful_col)) if helpful_col[i][1] > 100]
dataframe2 = dataframe.loc[index]
d2 = dataframe2.drop(
    ['unixReviewTime', 'reviewTime', 'reviewerName', 'reviewerID', 'summary'],
    axis=1)

schema = StructType([
    StructField('asin', StringType(), True),
    StructField('helpful', ArrayType(IntegerType()), True),
    StructField('reviewText', StringType(), True),
    StructField('overall', FloatType(), True),
])
df = hc.createDataFrame(d2, schema)

# Split the two-element "helpful" array into separate columns, then remove it.
df = (df
      .withColumn("helpful_total_votes", df["helpful"].getItem(1))
      .withColumn("num_of_helpful_votes", df["helpful"].getItem(0)))
df = df.drop(df["helpful"])
df.registerTempTable('review')

# Add the helpful-vote ratio (2 decimals) and the review text length.
result = hc.sql('''
SELECT R.*,
ROUND(R.num_of_helpful_votes/R.helpful_total_votes,2) as helpful_percentage,
LENGTH(R.reviewText) as Text_length FROM review as R''')

#preparing meta data
# Drop the meta columns that are not used further on.
meta2 = meta.drop(['imUrl','related','title','brand','description'],axis = 1)
# NOTE(review): dp._add_rankings is defined in data_prep_new; presumably it
# derives rank_keys / rank_values from salesRank and returns the value used for
# an unknown rank as max_of_rank_val -- confirm against the helper.
meta2, max_of_rank_val = dp._add_rankings(meta2,'Video Games')
meta3 = meta2.drop(['salesRank'],axis = 1)
schema2 = StructType([
        StructField('asin', StringType(), True),
        StructField('price',FloatType(),True),
        StructField('categories', ArrayType(ArrayType(StringType())), True),
        StructField('rank_keys', StringType(), True),
        StructField('rank_values', IntegerType(), True)])
df_meta = hc.createDataFrame(meta3,schema2)
# Length of each row's outer "categories" list, collected to the driver.
number_of_category = df_meta.select('categories').rdd.flatMap(lambda x: x).map(lambda x: len(x)).collect()
nc= hc.createDataFrame(number_of_category,IntegerType())
# NOTE(review): the joins below match rows of separately built dataframes on
# monotonically_increasing_id(). Those ids depend on how each dataframe is
# partitioned, so the join assumes both sides get identical ids row by row --
# verify that this holds.
df_meta = df_meta.withColumn("id", monotonically_increasing_id())
nc = nc.withColumn("id", monotonically_increasing_id())
df_meta.registerTempTable('meta')
nc.registerTempTable('nc')
df_meta = hc.sql('SELECT m.*, nc.value AS num_category FROM meta AS m JOIN nc ON m.id == nc.id ORDER BY id')
# Flatten the nested category lists and keep every word from the inner lists
# whose first element is 'Video Games'.
category_word = df_meta.select('categories').rdd.flatMap(lambda x: x)\
                    .flatMap(lambda x: x).flatMap(lambda x3: [word for word in x3 if x3[0] == 'Video Games']).collect()
# One zero-initialised column per distinct category word, one row per meta record.
word_list = list(set(category_word))
np_cat_case = np.zeros((len(meta3),len(word_list)))
df_category= pd.DataFrame(np_cat_case)
df_category.columns = word_list
# NOTE(review): dp._category_fill is defined in data_prep_new; presumably it
# marks the category words present for each row -- confirm against the helper.
df_category = dp._category_fill(meta3['categories'],df_category, 'Video Games')
cat_df= hc.createDataFrame(df_category)
cat_df = cat_df.withColumn("id", monotonically_increasing_id())

# Re-register 'meta' so the name now refers to the dataframe with num_category.
df_meta.registerTempTable('meta')
cat_df.registerTempTable('cat_df')

# Attach the category indicator columns to the selected meta columns by id.
df_meta2 = hc.sql('''
SELECT m.asin,m.price, m.rank_keys, m.rank_values, m.num_category, cd.* 
FROM meta AS m JOIN cat_df AS cd ON m.id == cd.id ORDER BY cd.id''')

# --- merging review and meta ---
# Rebind the temp table names so the SQL below sees the prepared dataframes.
result.registerTempTable('review')
df_meta2.registerTempTable('meta')

# count_cat is the number of meta rows sharing the same rank_keys; only
# products whose rank_keys group has more than 100 rows are kept.
query ='''
SELECT R.reviewText,R.overall, R.helpful_total_votes, R.num_of_helpful_votes, 
R.helpful_percentage, R.Text_length, meta_2.*
FROM review as R JOIN (SELECT meta.*, COUNT(rank_keys) OVER(PARTITION BY rank_keys) AS count_cat FROM meta) meta_2 
ON R.asin = meta_2.asin 
WHERE meta_2.count_cat >100'''

# Replace the id carried over from the meta table with a fresh one and drop
# the helper count column.
result2 = (hc.sql(query)
           .drop("id", "count_cat")
           .withColumn("id", monotonically_increasing_id()))

# One-hot encode rank_keys in pandas; the column name is made space-free.
panda_result = result2.toPandas()
dum = (pd.get_dummies(panda_result["rank_keys"], drop_first=True)
       .rename(columns={'Video Games': 'Video_Games'}))
dum2 = pd.concat([panda_result["id"], dum], axis=1)
dum_spark = hc.createDataFrame(dum2)
result2.registerTempTable('merged_df')
dum_spark.registerTempTable('dummy')

# Join the dummy columns back onto the merged table by id.
df_merged = hc.sql('''
SELECT m.*, dum.software, dum.Video_Games FROM merged_df AS m JOIN dummy AS dum ON m.id=dum.id''')

# --- price ---
# Map each price to a bucket label. Missing prices are detected explicitly:
# a NULL price would otherwise fail every comparison and end up labelled
# 'above300', and isnan() avoids relying on an implicit cast of the string
# "NaN". Bucket upper bounds are inclusive.
ifquery = pys_fun.expr(
    """CASE
         WHEN price IS NULL OR isnan(price) THEN 'price_unknown'
         WHEN price <= 20 THEN 'below20'
         WHEN price <= 50 THEN 'below50'
         WHEN price <= 100 THEN 'below100'
         WHEN price <= 300 THEN 'below300'
         ELSE 'above300'
       END""")
df_merged = df_merged.withColumn("new_price", ifquery)

# One-hot encode the buckets in pandas (drop_first=True leaves out the first
# bucket) and keep the row id so the dummies can be joined back in Spark.
panda_df = df_merged.toPandas()
dum3 = pd.get_dummies(panda_df.new_price, drop_first=True)
dum4 = pd.concat([panda_df.id, dum3], axis=1)
price_dum = hc.createDataFrame(dum4)
price_dum.registerTempTable('dummy2')
df_merged.registerTempTable('merged_df')

df_2 = hc.sql('''
SELECT m.*, dum.below20, dum.below50,dum.below100,dum.below300,dum.price_unknown FROM merged_df AS m JOIN dummy2 AS dum ON m.id=dum.id''')
# The raw columns are now represented by dummy columns.
df_2 = df_2.drop('price', 'new_price', 'rank_keys')
# Remove samples where the price is unknown and rank_values equals max_of_rank_val.
df_2_1 = df_2.where(~((df_2.price_unknown == 1) & (df_2.rank_values == max_of_rank_val)))
# Changing names of columns: the Parquet file format does not allow spaces and
# special characters. 'Video Games' is dropped first -- presumably because
# replacing its space would clash with the Video_Games dummy column (confirm).
df_2_1 = df_2_1.drop('Video Games')
df_2_1 = df_2_1.toDF(*(c.replace(' ', '_') for c in df_2_1.columns))
# Overwrite any earlier output: without a mode, save() raises an error when
# the path already exists, so this cell could not be re-run.
df_2_1.write.save("TFIDF_ready.parquet", mode="overwrite")
print("--- %s seconds ---" % (time.time() - start_time))

--- 516.07486105 seconds ---


### checking Output files

In [4]:
# Read the saved output back to check it
test = spark.read.load(path="TFIDF_ready.parquet")

In [5]:
# Size of the saved dataset: rows, then columns
n_datapoints = test.count()
n_features = len(test.columns)
print("total number of datapoints: {}".format(n_datapoints))
print("total number of features: {}".format(n_features))

total number of datapoints: 4234
total number of features: 110


In [6]:
# For every column, count the rows whose value is NaN or null
missing_counts = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in test.columns
]
test.select(missing_counts).show()

+----------+-------+-------------------+--------------------+------------------+-----------+----+-----------+------------+-----------------+-------------+-------------+-------------+----------------+---------+--------+---------------+---+------------------+------------+--------+----+------------------+-------------------+------------------+-----------------------+----------+-----------+----------+-----------+------------+-----------+-----------+-----+---------------+-------------+-----------+----------+----------+--------+--------+-------------+------------+--------+-------------------+---+--------+--------------+------------+-----+------------------------+----+-------+----------+--------+--------+---------+-------+------------+--------+--------------------+------------------+--------------+----+--------------+-------+--------+----------+----------+---------+--------------------+-----------------+-------+------------+-----------+-----+--------+----------+--------------+------------+---