In [1]:
import pyspark as ps
from pyspark.sql.types import *
from pyspark.ml.clustering import LDA, KMeans
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [2]:
# Local Spark session with 3 worker threads; getOrCreate() hands back an
# already-running session instead of starting a second one.
spark = ps.sql.SparkSession.builder.appName("capstone").master("local[3]").getOrCreate()
sc = spark.sparkContext

In [3]:
import pandas as pd

# Art-only product catalogue, read with pandas straight from the s3a:// URL.
df = pd.read_csv(filepath_or_buffer='s3a://capstone-3/data/products_art_only.csv')

In [4]:
# 'Unnamed: 0' is the name pandas gives an unlabelled leading CSV column
# (presumably a saved index). Rebind instead of mutating in place, and ignore
# a missing column, so re-running this cell does not raise KeyError.
df = df.drop(columns='Unnamed: 0', errors='ignore')
spark_df = spark.createDataFrame(df)

In [5]:
# Show the column names and types Spark inferred from the pandas frame.
spark_df.printSchema()

root
 |-- vendor_variant_id: long (nullable = true)
 |-- vendor_id: long (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_description: string (nullable = true)
 |-- vendor_name: string (nullable = true)
 |-- taxonomy_name: string (nullable = true)
 |-- taxonomy_id: double (nullable = true)
 |-- weblink: string (nullable = true)
 |-- color: string (nullable = true)
 |-- material: string (nullable = true)
 |-- pattern: string (nullable = true)
 |-- is_returnable: boolean (nullable = true)
 |-- ship_surcharge: double (nullable = true)
 |-- is_assembly_required: boolean (nullable = true)
 |-- is_feed: long (nullable = true)
 |-- commission_tier: string (nullable = true)
 |-- inventory_type: string (nullable = true)
 |-- division: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- combo: string (nullable = true)



In [6]:
# Compare each title with the 'combo' text column that feeds the TF-IDF tokenizer.
spark_df.select(col('product_title'), col('combo')).show(n=20, truncate=True)

+--------------------+--------------------+
|       product_title|               combo|
+--------------------+--------------------+
|Framed Canvas Pri...|Framed Canvas Pri...|
|Framed Print, Abs...|Framed Print, Abs...|
|Janel Foo Glasswo...|Janel Foo Glasswo...|
|The Arts Capsule ...|The Arts Capsule ...|
|Native Maps - Aus...|Native Maps - Aus...|
|Framed Print, Tur...|Framed Print, Tur...|
|Sarah Campbell Wa...|Sarah Campbell Wa...|
|Framed Print, Ope...|Framed Print, Ope...|
|Framed Print, Mid...|Framed Print, Mid...|
|The Arts Capsule ...|The Arts Capsule ...|
|Canvas Print - Ab...|Canvas Print - Ab...|
|Ashley Mary Art L...|Ashley Mary Art L...|
|The Arts Capsule ...|The Arts Capsule ...|
|Ashley Mary Balan...|Ashley Mary Balan...|
|The Arts Capsule ...|The Arts Capsule ...|
|The Arts Capsule ...|The Arts Capsule ...|
|Felt Wall Art, Bl...|Felt Wall Art, Bl...|
|Minted for west e...|Minted for west e...|
|Erik Barthels Pri...|Erik Barthels Pri...|
|The Arts Capsule ...|The Arts C

In [7]:
def tfidf_pipeline(input_col="combo", num_features=20):
    """Build an unfitted Tokenizer -> HashingTF -> IDF pipeline.

    Stage columns: ``input_col`` -> ``words`` -> ``rawFeatures`` -> ``features``.

    Parameters
    ----------
    input_col : str
        Text column to tokenize (default ``"combo"``).
    num_features : int
        Number of hash buckets for HashingTF. The default of 20 preserves the
        original behaviour, but it is very small for free text: many distinct
        words will collide into the same bucket. Spark's own HashingTF default
        is 2**18.

    Returns
    -------
    pyspark.ml.Pipeline
        The pipeline; call ``.fit(df)`` before ``.transform(df)``.
    """
    tokenizer = Tokenizer(inputCol=input_col, outputCol="words")
    hashing_tf = HashingTF(inputCol="words", outputCol="rawFeatures",
                           numFeatures=num_features)
    idf = IDF(inputCol="rawFeatures", outputCol="features")
    return Pipeline(stages=[tokenizer, hashing_tf, idf])

In [8]:
# Fit the TF-IDF stages on the whole catalogue, then append their output columns.
pipeline = tfidf_pipeline()
features_df = (
    pipeline
    .fit(dataset=spark_df)
    .transform(dataset=spark_df)
)

In [9]:
def get_kmeans_rec(dataset, item_id, num_recs=3, k=10, seed=None):
    """Cluster ``dataset`` with k-means and show items in ``item_id``'s cluster.

    Parameters
    ----------
    dataset : pyspark.sql.DataFrame
        Must contain the vector column KMeans reads (its default featuresCol,
        ``'features'``), plus ``vendor_variant_id`` and ``product_title``.
    item_id : int or str
        ``vendor_variant_id`` of the query item; converted with ``int()``.
    num_recs : int
        Maximum number of recommended titles to print (default 3). Row order
        within the cluster is not specified.
    k : int
        Number of clusters (default 10, as before).
    seed : int, optional
        KMeans random seed; when None, KMeans' own default seed is used.

    Returns
    -------
    pyspark.sql.DataFrame
        ``dataset`` with the model's ``prediction`` (cluster index) column added.

    Raises
    ------
    ValueError
        If no row has ``vendor_variant_id == item_id``.
    """
    # Only pass seed when given: an explicit seed=None would be stored as a param value.
    kmeans = KMeans(k=k) if seed is None else KMeans(k=k, seed=seed)
    model = kmeans.fit(dataset)
    result = model.transform(dataset)

    # vendor_variant_id is a bigint column, so compare against an int.
    item_key = int(item_id)
    item_rows = (result
                 .filter(col('vendor_variant_id') == item_key)
                 .select('prediction')
                 .take(1))
    if not item_rows:
        raise ValueError("item_id %r not found in dataset" % (item_id,))
    item_cluster_label = item_rows[0][0]

    # Exclude the query item so it is not recommended back to itself; keep rows
    # with a null id, because a plain != would evaluate to null and drop them.
    not_query_item = (col('vendor_variant_id').isNull()
                      | (col('vendor_variant_id') != item_key))
    cluster_members = result.filter(
        (col('prediction') == item_cluster_label) & not_query_item)
    cluster_members.select('product_title').show(num_recs)
    return result

def get_centers(model, dataset=None):
    """Print clustering diagnostics for a fitted k-means model.

    Parameters
    ----------
    model : KMeansModel
        A fitted model exposing ``clusterCenters()`` and, when ``dataset`` is
        given, ``computeCost(dataset)``.
    dataset : pyspark.sql.DataFrame, optional
        Data on which to report the Within Set Sum of Squared Errors (WSSSE).
        It must contain the model's features column. When omitted, the cost
        line is skipped. (The original body read an undefined global
        ``dataset`` and raised NameError.)

    Returns
    -------
    list
        The cluster centers, as returned by ``model.clusterCenters()``.
    """
    if dataset is not None:
        # NOTE(review): KMeansModel.computeCost is deprecated in Spark 2.4 and
        # removed in 3.0. On Spark 3 use model.summary.trainingCost or
        # pyspark.ml.evaluation.ClusteringEvaluator instead.
        wssse = model.computeCost(dataset)
        print("Within Set Sum of Squared Errors = " + str(wssse))

    centers = model.clusterCenters()
    print("Cluster Centers: ")
    for center in centers:
        print(center)
    return centers


In [10]:
# Show up to 3 titles sharing a cluster with item 7432702; keep the clustered frame.
result = get_kmeans_rec(dataset=features_df, item_id=7432702, num_recs=3)

+--------------------+
|       product_title|
+--------------------+
|Native Maps - Aus...|
|Sarah Campbell Wa...|
|Framed Canvas Pri...|
+--------------------+
only showing top 3 rows



In [11]:
item_id = 7432702
# Look up the query item's product link. Only 'weblink' is needed, and the id
# is compared as an int to match the bigint vendor_variant_id column.
result.filter(col('vendor_variant_id') == item_id).select('weblink').first()['weblink']

'http://www.ballarddesigns.com/antique-aviary-giclee-prints/wall-decor/all-art/225658'

In [20]:
# Sale-price window with exclusive bounds. The bounds are named MIN_/MAX_PRICE,
# not min/max, so Python's builtins are not shadowed for the rest of the session.
MIN_PRICE = 100.00
MAX_PRICE = 500.00
restricted = spark_df.where(
    (col('sale_price') > MIN_PRICE) & (col('sale_price') < MAX_PRICE)
)

In [21]:
# Display the price-filtered DataFrame (the repr shows the schema only; no rows are computed).
restricted

DataFrame[vendor_variant_id: bigint, vendor_id: bigint, product_title: string, product_description: string, vendor_name: string, taxonomy_name: string, taxonomy_id: double, weblink: string, color: string, material: string, pattern: string, is_returnable: boolean, ship_surcharge: double, is_assembly_required: boolean, is_feed: bigint, commission_tier: string, inventory_type: string, division: string, category: string, price: double, sale_price: double, combo: string]

In [22]:
# The query item's full row from the unfiltered catalogue. Compare as an int,
# because vendor_variant_id is a bigint column; this avoids relying on an
# implicit string-to-number cast.
item_row = spark_df.filter(col('vendor_variant_id') == int(item_id))

In [23]:
# Display the single-item DataFrame (the repr shows the schema only).
item_row

DataFrame[vendor_variant_id: bigint, vendor_id: bigint, product_title: string, product_description: string, vendor_name: string, taxonomy_name: string, taxonomy_id: double, weblink: string, color: string, material: string, pattern: string, is_returnable: boolean, ship_surcharge: double, is_assembly_required: boolean, is_feed: bigint, commission_tier: string, inventory_type: string, division: string, category: string, price: double, sale_price: double, combo: string]

In [24]:
# Number of products strictly inside the sale-price window (triggers a Spark job).
restricted.count()

105697

In [25]:
# Make sure the query item is present even if its sale_price lies outside the
# window. `union` is positional (both frames come from spark_df, so the columns
# line up) and keeps duplicates, so first drop any copy of the item already in
# `restricted`. Null ids are kept explicitly, because a plain != would evaluate
# to null and filter them out.
new_restricted = restricted.filter(
    col('vendor_variant_id').isNull() | (col('vendor_variant_id') != int(item_id))
).union(item_row)

In [26]:
# Row count after adding the query item back (triggers a Spark job).
new_restricted.count()

105698

In [27]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()