In [9]:
url = "jdbc:mysql://localhost:3306/flashdeals"
properties = {
    "user": "root",
    "password": ""
}

In [6]:
# read a table from database.

product_clusters_df = spark.read.format("jdbc").options(
url = url,
driver="com.mysql.jdbc.Driver",
dbtable="product_clusters",
user="root",
password=""
).load()

In [7]:
product_clusters_df.show(5)

+---+----------+-------+--------------------+--------------------+
| id|product_id|cluster|          created_at|          updated_at|
+---+----------+-------+--------------------+--------------------+
|  2|         2|      1|2016-11-21 06:05:...|2016-11-21 06:05:...|
|  3|         2|      1|2016-11-21 06:05:...|2016-11-21 06:05:...|
|  4|         2|      1|2016-11-21 06:05:...|2016-11-21 06:05:...|
|  6|      null|      1|2016-11-21 06:05:...|2016-11-21 06:05:...|
|  7|      null|      1|2016-11-21 07:04:...|2016-11-21 07:04:...|
+---+----------+-------+--------------------+--------------------+
only showing top 5 rows



In [2]:
# read a table from database.

products_df = sqlContext.read.format("jdbc").options(
url = url,
driver="com.mysql.jdbc.Driver",
dbtable="products",
user="root",
password=""
).load()

In [11]:
# read a table from database.

product_cluster_df = sqlContext.read.format("jdbc").options(
url = url,
driver="com.mysql.jdbc.Driver",
dbtable="product_clusters",
user="root",
password=""
).load()

In [19]:
products_df.count()

1

In [4]:
products_df.dtypes

[('id', 'int'),
 ('name', 'string'),
 ('description', 'string'),
 ('created_at', 'timestamp'),
 ('updated_at', 'timestamp'),
 ('detail_page_url', 'string'),
 ('manufacturer', 'string'),
 ('list_price', 'int'),
 ('title', 'string'),
 ('small_image_url', 'string'),
 ('medium_image_url', 'string'),
 ('price', 'int'),
 ('amount_saved', 'int'),
 ('percentage_saved', 'int'),
 ('is_supersaver_shipping', 'boolean'),
 ('is_prime', 'boolean'),
 ('ASIN', 'string'),
 ('merchant_id', 'int'),
 ('catalog_id', 'int'),
 ('dealer_id', 'int')]

In [20]:
product_cluster_df.dtypes

[('id', 'int'),
 ('product_id', 'int'),
 ('cluster', 'int'),
 ('created_at', 'timestamp'),
 ('updated_at', 'timestamp')]

In [21]:
product_cluster_df.take(1)

[Row(id=1, product_id=1, cluster=0, created_at=datetime.datetime(2016, 11, 21, 6, 5, 32), updated_at=datetime.datetime(2016, 11, 21, 6, 5, 32))]

In [5]:
# simple operation
categories_df = products_df.select(["id", "catalog_id"])
categories_df.show(10)

+----+----------+
|  id|catalog_id|
+----+----------+
|2245|        19|
|2246|        19|
|2247|        19|
|2248|        19|
|2249|        19|
|2250|        19|
|2251|        19|
|2252|        19|
|2253|        19|
|2264|        19|
+----+----------+
only showing top 10 rows



In [8]:
spark.read.jdbc(url=url, table="product_clusters", properties=properties)

DataFrame[id: int, product_id: int, cluster: int, created_at: timestamp, updated_at: timestamp]

In [6]:
# another way to read. see readme for pyspark options for this to work.
products_df = sqlContext.read.jdbc(url = url, table="products", properties=properties)
products_df.count()

46

In [9]:
# save categories_df as a new table in db
categories_df.select('id').write.jdbc(url=url, table="categories", mode = "append", properties=properties)

In [26]:
# overwrite mode works without error, but it doesn't save anything to the table.\
# if using append mode, id needs to be unique to avoid errors. Still, nothing was saved to the table.
product_cluster_df.select('*').write.jdbc(url=url, table="product_clusters", mode = "append", properties=properties)

In [35]:
# this works. need to satisfy two things at least:
# * id is unique
# * schema match with non-nullable data filled
from pyspark.sql import Row
from datetime import date, datetime
new_cluster_df = sqlContext.createDataFrame([Row(id=9, product_id=2, cluster=1, created_at=datetime(2016, 11, 21, 6, 5, 32), updated_at=datetime(2016, 11, 21, 6, 5, 32))])
new_cluster_df.write.jdbc(url=url, table="product_clusters", mode = "append", properties=properties)

In [66]:
product_cluster_df.agg({"id": "max"}).collect()[0]['max(id)']

7

In [12]:
# this works
from pyspark.sql import Row
from datetime import date, datetime
new_id = product_cluster_df.agg({"id": "max"}).collect()[0]['max(id)'] + 1
new_cluster_df = sqlContext.createDataFrame([Row(id=new_id, product_id=5, cluster=1, created_at=datetime.now(), updated_at=datetime.now())])
new_cluster_df.write.jdbc(url=url, table="product_clusters", mode = "append", properties=properties)

### Pipeline

* Tokenizer of lower case
* StopWordsRemover
* HashingTF
* IDF
* Normalizer
* kmean

### Tokenize the title

In [12]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="title", outputCol="title_words")
words_df = tokenizer.transform(products_df)
words_df.select(["title", "title_words"]).show(1)

+--------------------+--------------------+
|               title|         title_words|
+--------------------+--------------------+
|InnoGear® 100ml A...|[innogear®, 100ml...|
+--------------------+--------------------+
only showing top 1 row



### Remove stop words

In [13]:
from pyspark.ml.feature import StopWordsRemover
stopwords = StopWordsRemover(inputCol="title_words", outputCol="title_words_filtered")
words_filter_df = stopwords.transform(words_df)
words_filter_df.select(["title", "title_words", "title_words_filtered"]).show(1, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title                                                                                                                                                                                              |title_words                                                                                                                                                                  

### HashingTF and IDF

In [14]:
from pyspark.ml.feature import HashingTF, IDF, CountVectorizer

tf = HashingTF(inputCol="title_words_filtered", outputCol="title_tf", numFeatures=16)
words_filter_tf_df = tf.transform(words_filter_df)
words_filter_tf_df.count()

# Can also use CountVectorizer which allows easier inspection of the bag of words.

idf = IDF(inputCol="title_tf", outputCol="title_tf_idf")
words_filter_tf_idf_df = idf.fit(words_filter_tf_df).transform(words_filter_tf_df)
words_filter_tf_idf_df.select(["title_tf", "title_tf_idf"]).show(3, truncate = False)

+----------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title_tf                                                                    |title_tf_idf                                                                                                                                                                                                                                    |
+----------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(16,[0,2,3,4,5,6,7,8,9,14,15],[2.0,1.0,

### Normalize

In [15]:
from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol="title_tf_idf", outputCol="title_tf_idf_norm")
words_filter_tf_idf_normalize_df = normalizer.transform(words_filter_tf_idf_df)
words_filter_tf_idf_normalize_df.select(["title_tf_idf", "title_tf_idf_norm"]).show(2, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title_tf_idf                                                                                                                                                                                                                                    |title_tf_idf_norm                                                                                                                                                                                                                                    |
+---------------------

### kmean

In [16]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(featuresCol="title_tf_idf_norm", k=2, seed=20)
model = kmeans.fit(words_filter_tf_idf_normalize_df)
words_kmeans_df = model.transform(words_filter_tf_idf_normalize_df)
words_kmeans_df.groupBy('catalog_id').avg().select("avg(prediction)").show(46)

+------------------+
|   avg(prediction)|
+------------------+
|0.3076923076923077|
|              0.65|
+------------------+



In [17]:
model.save('kmeans_model_only')

In [20]:
load_kmeans = KMeansModel.load('kmeans_model_only')

NameError: name 'KMeansModel' is not defined

In [80]:
words_kmeans_df.select(["catalog_id", "prediction"]).show(46)

+----------+----------+
|catalog_id|prediction|
+----------+----------+
|        19|         0|
|        19|         1|
|        19|         1|
|        19|         0|
|        19|         1|
|        19|         1|
|        19|         0|
|        19|         0|
|        19|         0|
|        21|         0|
|        21|         1|
|        21|         0|
|        21|         0|
|        21|         1|
|        21|         1|
|        21|         1|
|        21|         0|
|        21|         1|
|        21|         0|
|        19|         0|
|        21|         1|
|        19|         0|
|        19|         0|
|        21|         1|
|        21|         0|
|        19|         1|
|        21|         0|
|        19|         1|
|        21|         1|
|        19|         0|
|        19|         1|
|        19|         0|
|        19|         0|
|        19|         0|
|        19|         0|
|        21|         1|
|        21|         1|
|        21|         1|
|        21|    

### Write the whole process in a pipeline

In [75]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import Normalizer
from pyspark.ml.clustering import KMeans


step = 0 

step += 1
tokenizer = Tokenizer(inputCol="title", outputCol=str(step) + "_tokenizer")

step += 1
stopwords = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol=str(step) + "_stopwords")

step += 1
tf = HashingTF(inputCol=stopwords.getOutputCol(), outputCol=str(step) + "_tf", numFeatures=160)

step += 1
idf = IDF(inputCol=tf.getOutputCol(), outputCol=str(step) + "_idf")

step += 1
normalizer = Normalizer(inputCol=idf.getOutputCol(), outputCol=str(step) + "_normalizer")

step += 1
kmeans = KMeans(featuresCol=normalizer.getOutputCol(), predictionCol = str(step) + "_kmeans", k=2, seed=20)

kmeans_pipeline = Pipeline(stages=[tokenizer, stopwords, tf, idf, normalizer, kmeans])

model = kmeans_pipeline.fit(products_df)
words_prediction = model.transform(products_df)

In [77]:
words_prediction.groupBy('catalog_id').avg().select("avg(" + stages[-1].getPredictionCol() + ")").show()

+-------------------+
|      avg(6_KMeans)|
+-------------------+
|0.19230769230769232|
|               0.05|
+-------------------+



In [78]:
new_df.

['__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__format__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_jcols',
 '_jdf',
 '_jmap',
 '_jseq',
 '_lazy_rdd',
 '_sc',
 '_schema',
 '_sort_cols',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'coalesce',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createOrReplaceTempView',
 'createTempView',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'dropna',
 'dtypes',
 'explain',
 'fillna',
 'filter',
 'first',
 'foreach',
 'foreachPartition',
 'freqItems',
 'groupBy',
 'groupby',
 'head',
 'intersect',
 'isLocal',
 'isStreaming',
 'is_cached',
 'join',
 'limit',
 'na',
 'orderBy',
 'persist',
 'printSchema',
 'randomSplit',
 'rdd',
 'registerTempTable',
 'repartition',
 'replace',
 'rollup',
 '

#### Another way to write the pipeline

In [6]:
feature = "title"
stages=[Tokenizer(), StopWordsRemover(), HashingTF(numFeatures=32), IDF(),
                                   Normalizer(), KMeans(k = 2, seed = 20)]
stages[0].setInputCol(feature)
stages[0].setOutputCol("1_"+stages[0].__class__.__name__)

for (i, s) in enumerate(stages[1:-1]):
    i = i + 1
    s.setInputCol(stages[i-1].getOutputCol())
    s.setOutputCol(str(i+1)+"_"+s.__class__.__name__)

stages[-1].setFeaturesCol(stages[-2].getOutputCol())
stages[-1].setPredictionCol(str(len(stages)) + "_" + stages[-1].__class__.__name__)

pipeline = Pipeline(stages=stages)
model = pipeline.fit(products_df)
prediction = model.transform(products_df)

In [29]:
loaded_pipeline = pyspark.ml.PipelineModel.load('kmeans')

In [36]:
loaded_pipeline.transform(products_df).select("6_kmeans").show(5) # test loaded pipeline

+--------+
|6_kmeans|
+--------+
|       0|
|       1|
|       1|
|       0|
|       1|
+--------+
only showing top 5 rows



In [43]:
loaded_pipeline.transform(products_df.filter(products_df.catalog_id == 21)).select("6_kmeans").show(5) # fit only part of the products_df

+--------+
|6_kmeans|
+--------+
|       0|
|       1|
|       0|
|       0|
|       1|
+--------+
only showing top 5 rows



In [57]:
# fit with any input with a title field
test_df = sqlContext.createDataFrame([{'title': 'a b c'}])
loaded_pipeline.transform(test_df).select("6_kmeans").show(5) 

+--------+
|6_kmeans|
+--------+
|       1|
+--------+



In [56]:
test_df.collect()

[Row(title=u'a b c')]

In [120]:
prediction.groupBy('catalog_id').avg().select("avg(" + stages[-1].getPredictionCol() + ")").show()

+------------------+
|     avg(6_KMeans)|
+------------------+
|0.3076923076923077|
|              0.65|
+------------------+



In [69]:
def parse(lp):
    uid = lp[lp.find('(') + 1: lp.find(')')]
    title = lp[lp.find('[') + 1: lp.find(']')]
    return uid, title 

In [73]:
id_title = parse('(333) [a b c]')

In [74]:
id_title[1]

'a b c'