Aggregate (by averaging) the deep-learning (fc7) image features for each restaurant

In [8]:
from pyspark import SparkContext
from pyspark import SparkConf

from pyspark.sql import SQLContext, Row

from StringIO import StringIO
from PIL import Image
import numpy as np
import csv
import os, tempfile
import boto
import datetime

In [2]:
# AWS S3 credentials:
# Read from the standard AWS environment variables so secrets never have to be
# typed into (and saved with) the notebook; empty strings if unset, as before.
AWS_KEY = os.environ.get('AWS_ACCESS_KEY_ID', '')
AWS_SECRET = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
# Hand the credentials to Hadoop's s3n:// filesystem used by sc.textFile below.
# NOTE(review): `sc` is not created in this notebook; presumably it is the
# SparkContext provided by the PySpark kernel.
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", AWS_KEY)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", AWS_SECRET)

In [3]:
# Load the training-image fc7 feature dump from S3 as raw text lines
# (one line per photo; parsed in the next cell).
features_train = sc.textFile(
    's3n://amlyelp/fc7features/train_image_fc7features/'
)

In [4]:
# Each line is "<photo_id>|<comma-separated fc7 values>"; turn it into a
# (photo_id, float ndarray) pair.
# maxsplit=1 keeps every record a (key, value) pair. An unbounded split on a
# line with a stray '|' would give a longer tuple whose extra fields mapValues
# silently drops; now such a line fails loudly in the float parse instead.
pid_features_train = (features_train
                      .map(lambda x: tuple(x.split('|', 1)))
                      .mapValues(lambda x: np.array(x.split(','), dtype=float)))
pid_features_train.take(2)

[(u'11598', array([ 0.        ,  0.        ,  0.        , ...,  0.        ,
          2.57181787,  0.        ])),
 (u'295391',
  array([ 0.       ,  0.       ,  0.       , ...,  1.5179913,  0.       ,  0.       ]))]

In [22]:
# read training image-restaurant-label map
# After the header line, each row is split on ',' into: x[0] photo id (the join
# key), x[1] restaurant id, x[2:11] nine integer label flags.
train_label = sc.textFile('s3n://amlyelp/pic_label_trainall.csv')
first_line = train_label.take(1)[0]
# Bind the header through a default argument. PySpark serializes the lambda
# lazily (when a job first needs it), so a plain reference to the global
# `first_line` would capture whatever it holds at that moment. It is
# reassigned to the *test* header further down, so without the binding a
# re-run could stop filtering this file's header.
train_label = (train_label
               .filter(lambda x, header=first_line: x != header)
               .map(lambda x: x.split(','))
               .map(lambda x: (x[0], {'restaurant': x[1],
                                      'labels': np.array(x[2:11], dtype=int)})))
train_label.take(1)

[(u'204149',
  {'labels': array([0, 0, 0, 1, 0, 0, 0, 0, 1]), 'restaurant': u'3034'})]

In [14]:
# check if number of images match
# (equal counts alone do not prove that the two sets of photo ids coincide)
print(pid_features_train.count())
print(train_label.count())

234842
234842


In [7]:
# join image features with restaurants and labels
# Result: (photo_id, (label_dict, feature_vector)).
# Inner join: with a left outer join, a photo lacking a feature row would carry
# a None vector into the per-restaurant summation, which then fails on
# None + ndarray. Such photos are dropped here instead.
id_label_feature_train = train_label.join(pid_features_train)
id_label_feature_train.take(2)

[(u'378466',
  ({'labels': array([0, 0, 0, 1, 0, 0, 0, 0, 0]), 'restaurant': u'227'},
   array([ 0.,  0.,  0., ...,  0.,  0.,  0.]))),
 (u'35540',
  ({'labels': array([0, 1, 1, 0, 1, 1, 1, 1, 0]), 'restaurant': u'2611'},
   array([ 0.        ,  0.        ,  0.        , ...,  5.03832006,
           0.        ,  0.        ])))]

In [9]:
# Drop the photo id and re-key by restaurant: (restaurant_id, feature_vector),
# ready for per-restaurant averaging.
res_feature_train = id_label_feature_train.values().map(
    lambda label_and_feat: (label_and_feat[0]['restaurant'], label_and_feat[1]))
res_feature_train.take(2)

[(u'396', array([ 0.,  0.,  0., ...,  0.,  0.,  0.])),
 (u'3407', array([ 0.,  0.,  0., ...,  0.,  0.,  0.]))]

In [10]:
# average image features for each restaurant
# combineByKey keeps a (running_sum, photo_count) pair per restaurant. `+` on
# ndarrays returns a new array, so no input vector is mutated.
sumCount_train = res_feature_train.combineByKey(
    lambda value: (value, 1),                         # first photo seen for a key
    lambda acc, value: (acc[0] + value, acc[1] + 1),  # add a photo within a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1]))          # merge per-partition results

# Element-wise mean = sum / count. mapValues replaces the Python-2-only
# tuple-parameter lambda `lambda (k, (s, n)): ...` (removed by PEP 3113) and
# preserves the key partitioning.
averageByKey_train = sumCount_train.mapValues(lambda sum_count: sum_count[0] / sum_count[1])

In [11]:
# Preview two (restaurant_id, averaged feature vector) pairs.
averageByKey_train.take(2)

[(u'2543', array([ 0.17495501,  0.26124298,  0.16659264, ...,  0.05740633,
          0.15361293,  0.07678483])),
 (u'3580', array([ 0.76246166,  0.07764588,  0.09794557, ...,  0.02959634,
          0.01696139,  0.04301102]))]

In [None]:
# Alternative: take the element-wise maximum of the features instead of averaging
# maxByKey_train = res_feature_train.reduceByKey(lambda x1, x2: np.maximum(x1, x2))
# maxByKey_train.take(2)

In [1]:
# convert restaurant and features to string, prepare for writing to file
# Each output line is "<restaurant_id>,<f_0>,<f_1>,...".
# Use the builtin `str` dtype: `np.str` was only a deprecated alias of it and
# was removed in NumPy 1.24, where it raises AttributeError.
feature_avg_train = averageByKey_train.map(
    lambda x: ','.join([x[0], ','.join(x[1].astype(str))]))
# feature_avg_train.take(2)

In [15]:
# Bring every restaurant's CSV line to the driver; they are joined and uploaded
# to S3 as a single object further down.
feature_avg_train_list = feature_avg_train.collect()

In [16]:
# Number of restaurants in the training set (one averaged row each).
len(feature_avg_train_list)

2000

In [19]:
# Test set: read the per-photo fc7 features and parse them like the training
# features ("<photo_id>|<comma-separated floats>" -> (photo_id, ndarray)).
features_test = sc.textFile('s3n://amlyelp/fc7features/test_image_fc7features/')
# maxsplit=1 keeps every record a (photo_id, values) pair. An unbounded split
# on a line with a stray '|' would give a longer tuple whose extra fields
# mapValues silently drops.
pid_features_test = (features_test
                     .map(lambda x: tuple(x.split('|', 1)))
                     .mapValues(lambda x: np.array(x.split(','), dtype=float)))
pid_features_test.take(2)

[(u'306310', array([ 0.,  0.,  0., ...,  0.,  0.,  0.])),
 (u'414079',
  array([ 1.7991451,  0.       ,  0.       , ...,  0.       ,  0.       ,  0.       ]))]

In [34]:
# Test photo -> business map (no labels). After the header line each row is
# split on ',' into a (photo_id, business_id) pair, keyed for the join with the
# photo features.
test_label = sc.textFile('s3n://amlyelp/test_photo_to_biz.csv')
first_line = test_label.take(1)[0]
# Bind the header through a default argument. PySpark serializes the lambda
# lazily, so a plain reference to the global `first_line` would pick up any
# later reassignment of that name.
test_label = (test_label
              .filter(lambda x, header=first_line: x != header)
              .map(lambda x: tuple(x.split(','))))
test_label.take(2)

[(u'317818', u'003sg'), (u'30679', u'003sg')]

In [23]:
# Compare row counts. The photo->business map has more rows than the feature
# set, presumably because a photo can belong to several businesses (TODO confirm).
print(test_label.count())
print(pid_features_test.count())

1190226
237152


In [37]:
# Attach each photo's feature vector to every (photo, business) row:
# (photo_id, (business_id, feature_vector_or_None)). The left outer join keeps
# photos without features (vector None); that case is checked a few cells below.
pid_res_feature_test = test_label.leftOuterJoin(pid_features_test)
# pid_res_feature_test.take(2)

In [38]:
# Discard the photo id, leaving (business_id, feature_vector) pairs to average.
res_feature_test = pid_res_feature_test.values()
# res_feature_test.take(2)

In [39]:
# Sanity check: list (business, features) rows whose photo had no feature row
# in the left outer join. An empty list means every photo matched.
res_feature_test.filter(lambda biz_and_feat: biz_and_feat[1] is None).collect()

[]

In [40]:
# Average image features for each test business. combineByKey keeps a
# (running_sum, photo_count) pair per business; `+` on ndarrays returns a new
# array, so no input vector is mutated.
sumCount_test = res_feature_test.combineByKey(
    lambda value: (value, 1),                         # first photo seen for a key
    lambda acc, value: (acc[0] + value, acc[1] + 1),  # add a photo within a partition
    lambda a, b: (a[0] + b[0], a[1] + b[1]))          # merge per-partition results

# Element-wise mean = sum / count. mapValues replaces the Python-2-only
# tuple-parameter lambda (removed by PEP 3113) and preserves partitioning.
averageByKey_test = sumCount_test.mapValues(lambda sum_count: sum_count[0] / sum_count[1])

In [27]:
# Preview two (business_id, averaged feature vector) pairs.
averageByKey_test.take(2)

[(u'8i7dh', array([ 0.07007898,  0.04918911,  0.14299838, ...,  0.38499734,
          0.12277267,  0.43560198])),
 (u'blxg3', array([ 0.16781516,  0.25677318,  0.18571631, ...,  0.27989609,
          0.08610374,  0.00900712]))]

In [2]:
# Serialize each test business as "<business_id>,<f_0>,<f_1>,...".
# Use the builtin `str` dtype: `np.str` was a deprecated alias removed in
# NumPy 1.24.
feature_avg_test = averageByKey_test.map(
    lambda x: ','.join([x[0], ','.join(x[1].astype(str))]))
# feature_avg_test.take(2)

In [42]:
# Bring every test business's CSV line to the driver for the S3 upload below.
feature_avg_test_list = feature_avg_test.collect()

In [43]:
# Number of test businesses (one averaged row each).
len(feature_avg_test_list)

10000

In [17]:
from boto.s3.connection import S3Connection
from boto.s3.key import Key

# Read credentials from the environment instead of hard-coding (or re-blanking)
# them in the notebook; falls back to "" when unset, as before.
AWS_KEY = os.environ.get('AWS_ACCESS_KEY_ID', '')
AWS_SECRET = os.environ.get('AWS_SECRET_ACCESS_KEY', '')

conn = S3Connection(AWS_KEY, AWS_SECRET, host='s3.amazonaws.com')
pb = conn.get_bucket('amlyelp')

# A single Key object is reused for the uploads below; its name is set per file.
k = Key(pb)

In [18]:
# save averaged features
# Upload all training rows as one newline-separated object; the call returns
# the number of bytes written.
file_name_to_use_in_s3 = 'fc7features/feature_avg_train.csv'
tmpf = '\n'.join(feature_avg_train_list)
k.name = file_name_to_use_in_s3
k.set_contents_from_string(tmpf)

119542321

In [44]:
# Upload all test rows as one newline-separated object via the shared Key;
# the call returns the number of bytes written.
file_name_to_use_in_s3 = 'fc7features/feature_avg_test.csv'
tmpf = '\n'.join(feature_avg_test_list)
k.name = file_name_to_use_in_s3
k.set_contents_from_string(tmpf)

600077602