In [47]:
import operator
import sys
import pyspark
import numpy as np
from pyspark.sql import SparkSession
from collections import defaultdict

In [48]:
PREFIX = "gs://book-covers-e6893/"
reviews_file = "Books_5.json"
meta_file = "meta_Books.json"

In [4]:
spark = SparkSession.builder \
    .master("local") \
    .appName("covers") \
    .getOrCreate();
sc = spark.sparkContext

In [6]:
reviews = spark.read.json(PREFIX+reviews_file)

In [7]:
metadata = spark.read.json(PREFIX+meta_file)

In [8]:
reviews.printSchema()

root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Package Quantity:: string (nullable = true)
 |    |-- Size:: string (nullable = true)
 |    |-- Style Name:: string (nullable = true)
 |    |-- Style:: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: string (nullable = true)



In [54]:
metadata.printSchema()

root
 |-- also_buy: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- also_view: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- date: string (nullable = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- details: string (nullable = true)
 |-- feature: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- fit: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- main_cat: string (nullable = true)
 |-- price: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- similar_item: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- asin: string (nullable = true)
 |    |    |-- features: struct (nullabl

In [101]:
metadata_small = metadata.select("asin","rank","also_buy")

In [20]:
r = reviews.where(reviews.asin == '0001713353').rdd.map(lambda entry : (entry['asin'], (entry['overall'], 1)))

In [26]:
r.take(100)

[('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (3.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1)),
 ('0001713353', (5.0, 1))]

In [28]:
consolidated = reviews.rdd.map(lambda entry : (entry['asin'], (entry['overall'], 1))).reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))

In [31]:
consolidated.map(lambda x: (x[0], x[1][0]/x[1][1], x[1][0])).take(2)

[('0005377188', 3.764705882352941, 64.0),
 ('0006490018', 4.0754716981132075, 216.0)]

In [69]:
bucket = "book-covers-e6893"    # TODO : replace with your own bucket name
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output/book_metadata'.format(bucket)
# consolidated.map(lambda x: (x[0], x[1][0]/x[1][1], x[1][0])).toDF().write.save(output_directory, format="json")

In [50]:
from google.cloud import bigquery
import subprocess

output_dataset = 'book_metadata' 
output_table = 'book_ratings'
files = output_directory + '/part-*'
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--replace '
    '--autodetect '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=files
    ).split())
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(output_path, True)

True

In [42]:
df = consolidated.map(lambda x: (x[0], x[1][0]/x[1][1], x[1][0])).toDF()

In [46]:
dff = df.withColumnRenamed("_1", "ASIN").withColumnRenamed("_2", "rating").withColumnRenamed("_3", "total_reviews")

In [49]:
dff.write.save(output_directory, format="json", mode="overwrite")

In [53]:
import numpy as np
from sklearn.neighbors import NearestNeighbors

In [102]:
metadata_small.printSchema()

root
 |-- asin: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- also_buy: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [128]:
metadata_small = metadata.select("asin","rank","also_buy")

In [137]:
import re
metadata_small0 = metadata_small.rdd.map(lambda x: (x[0], "0", x[2]) if not x[1] else (x[0], x[1], x[2]))
metadata_small2 = metadata_small0.map(lambda x: (x[0], re.sub("[^0-9]", "", x[1]).replace(",", ''), x[2]))

In [138]:
metadata_small3 = metadata_small2.map(lambda x: (x[0], x[1], "") if x[2] == None else (x[0], x[1], ",".join(x[2])))

In [144]:
metadata_small4 = metadata_small3.map(lambda x: (x[0], 0, x[2]) if not x[1] else (x[0], int(x[1]), x[2]))

In [145]:
metadata_df = metadata_small4.toDF()

In [146]:
metadata_df = metadata_df.withColumnRenamed("_1", "ASIN").withColumnRenamed("_2", "ranking").withColumnRenamed("_3", "also_bought")

In [147]:
metadata_df.show(10)

+----------+--------+--------------------+
|      ASIN| ranking|         also_bought|
+----------+--------+--------------------+
|0000092878| 1349781|0669009075,B000K2...|
|000047715X| 1702625|                    |
|0000004545| 6291012|                    |
|0000013765| 2384057|                    |
|0000000116|11735726|                    |
|0000555010| 2906939|0323056962,012397...|
|0000477141| 2236549|                    |
|0000230022| 2566783|1492630519,007181...|
|0000038504| 2505873|                    |
|0000001589| 4368310|                    |
+----------+--------+--------------------+
only showing top 10 rows



In [148]:
metadata_df.write.save(output_directory, format="json", mode="overwrite")

In [149]:
from google.cloud import bigquery
import subprocess

output_dataset = 'book_metadata' 
output_table = 'book_rankings'
files = output_directory + '/part-*'
subprocess.check_call(
    'bq load --source_format NEWLINE_DELIMITED_JSON '
    '--replace '
    '--autodetect '
    '{dataset}.{table} {files}'.format(
        dataset=output_dataset, table=output_table, files=files
    ).split())
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(output_path, True)

True

In [49]:
from google.cloud import bigquery
import subprocess
client = bigquery.Client()

In [50]:
sql = """
SELECT *
FROM
    `eecs-e6893-book-cover.book_cover_data.image_data`
"""
df = client.query(sql).to_dataframe()

In [51]:
sdf = spark.createDataFrame(df)

In [52]:
sdf.printSchema()

root
 |-- file_number: string (nullable = true)
 |-- top_color_R: double (nullable = true)
 |-- top_color_G: double (nullable = true)
 |-- top_color_B: double (nullable = true)
 |-- brightness: double (nullable = true)
 |-- colorfullness: double (nullable = true)



In [53]:
data = sdf.rdd.map(lambda x : (x[0], (x[1], x[2], x[3], x[4], x[5]))).collect()

In [54]:
print(len(data))
data[0]

57000


('0557080398', (0.56, 0.56, 0.56, 8.35, 0.0))

In [55]:
test = sdf.rdd.map(lambda x : (x[0], (x[1], x[2], x[3], x[4], x[5]))).toDF()

In [56]:
from pyspark.sql import Window
from pyspark.sql import functions as F
window = Window.orderBy(F.col('_1'))
tmp = test.withColumn('index', F.row_number().over(window)).withColumnRenamed("_1", "ASIN").withColumnRenamed("_2", "features").select("index", "ASIN", "features").rdd.map(lambda x : (x[0], x[1], (x[2][0], x[2][1], x[2][2], x[2][3], x[2][4]))).collect()

In [57]:
test2 = sdf.rdd.map(lambda x : (x[1], x[2], x[3], x[4], x[5])).collect()
test2[3]

(251.97, 251.97, 251.97, 197.87, 0.0)

In [58]:
test2[103]

(1.25, 1.25, 1.25, 7.27, 0.0)

In [59]:
from sklearn.neighbors import NearestNeighbors

In [60]:
knn = NearestNeighbors()
knn.fit(test2)

NearestNeighbors(algorithm='auto', leaf_size=30, metric='minkowski',
         metric_params=None, n_jobs=1, n_neighbors=5, p=2, radius=1.0)

In [61]:
d,i = knn.kneighbors([[201.26, 135.58, 135.58, 132.64, 40.42]])
d

array([[ 0.        ,  7.70909203,  9.21531877, 10.16878557, 10.47136094]])

In [62]:
np.array(data)[i[0]]

array([['0670885517', (201.26, 135.58, 135.58, 132.64, 40.42)],
       ['1570035121', (201.61, 132.15, 132.15, 132.27, 34.45)],
       ['157344331X', (199.21, 138.32, 138.32, 139.02, 35.42)],
       ['1449466338', (197.91, 142.18, 142.18, 130.4, 40.21)],
       ['1565232577', (194.35, 140.26, 140.26, 129.5, 37.55)]],
      dtype=object)

In [63]:
i

array([[55159, 34556, 56260, 27511, 24685]])

In [64]:
np.shape(np.array(data))

(57000, 2)

In [65]:
import pickle 
knnPickle = open('knn-model', 'wb') 
pickle.dump(knn, knnPickle)

In [66]:
import cloudstorage as gcs

In [73]:
from sklearn.externals import joblib 
knnPickle = open('knn-model.jlib', 'wb') 
joblib.dump(knn, knnPickle)
knnPickle.close()
!ls

auth.json  dev	   initrd.img	   lib64       opt   sbin  usr
bin	   etc	   initrd.img.old  lost+found  proc  srv   var
boot	   hadoop  knn-model.jlib  media       root  sys   vmlinuz
copyright  home    lib		   mnt	       run   tmp   vmlinuz.old


In [71]:
output_path = sc._jvm.org.apache.hadoop.fs.Path("gs://book-covers-e6893/")
fs = output_path.getFileSystem(sc._jsc.hadoopConfiguration())
fs.moveFromLocalFile(sc._jvm.org.apache.hadoop.fs.Path("knn-model.jlib"), output_path)

In [74]:
import pandas as pd 
pd.DataFrame(data).to_csv("indexed_books.csv")

In [70]:
pd.DataFrame(data)

Unnamed: 0,0,1
0,0557080398,"(0.56, 0.56, 0.56, 8.35, 0.0)"
1,2951246005,"(254.82, 254.82, 254.82, 254.3, 0.0)"
2,094020813X,"(248.25, 248.25, 248.25, 176.2, 0.0)"
3,1495972666,"(251.97, 251.97, 251.97, 197.87, 0.0)"
4,1570272670,"(12.7, 12.7, 12.7, 58.66, 0.0)"
...,...,...
56995,3037781831,"(246.34, 245.51, 245.51, 237.41, 1.56)"
56996,0692489428,"(250.16, 250.59, 250.59, 209.13, 1.56)"
56997,0300213964,"(132.93, 132.62, 132.62, 108.1, 1.81)"
56998,0002182718,"(105.76, 53.52, 53.52, 150.01412667233552, 43...."


In [72]:
fs.moveFromLocalFile(sc._jvm.org.apache.hadoop.fs.Path("indexed_books.csv"), output_path)