In [1]:
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import GaussianMixture

In [2]:
spark = SparkSession.builder.appName('Project').getOrCreate()

In [3]:
store_sales=spark.read.csv("store_sales.tbl", inferSchema = True, header = True, sep = '|')

In [4]:
items=spark.read.csv("items.tbl", inferSchema = True, header = True, sep = '|')

In [5]:
store_sales.show()

+-----------------+--------------+-----------+----------+----------+-------------------+
|ss_transaction_id|ss_customer_id|ss_store_id|ss_item_id|ss_quality|              ss_ts|
+-----------------+--------------+-----------+----------+----------+-------------------+
|             2582|             0|        153|       745|         2|2013-07-21 01:17:01|
|             2582|             0|        153|       651|         2|2013-07-21 01:17:01|
|             2582|             0|        153|       967|         1|2013-07-21 01:17:01|
|             2582|             0|        153|        10|         3|2013-07-21 01:17:01|
|             2582|             0|        153|       301|         3|2013-07-21 01:17:01|
|             2582|             0|        153|       176|         3|2013-07-21 01:17:01|
|             2582|             0|        153|       967|         2|2013-07-21 01:17:01|
|             2582|             0|        153|       990|         2|2013-07-21 01:17:01|
|             2582|  

In [6]:
items.show()

+---------+---------+-------------+---------------+-------+------------+----------+
|i_item_id|   i_name|i_category_id|i_category_name|i_price|i_comp_price|i_class_id|
+---------+---------+-------------+---------------+-------+------------+----------+
|        0|item#0000|            0|         cat#00|  53.09|       28.33|         2|
|        1|item#0001|            2|         cat#02|  28.89|       35.29|         8|
|        2|item#0002|            5|         cat#05|    4.7|        8.98|        15|
|        3|item#0003|            7|         cat#07|   79.5|       87.26|         1|
|        4|item#0004|           10|         cat#10|  55.31|       98.76|        10|
|        5|item#0005|           13|         cat#13|  31.11|       30.29|         5|
|        6|item#0006|           15|         cat#15|   6.92|        11.5|        19|
|        7|item#0007|           18|         cat#18|  81.72|       69.42|        19|
|        8|item#0008|            1|         cat#01|  57.53|       88.45|    

In [29]:
items.show()

+---------+---------+-------------+---------------+-------+------------+----------+
|i_item_id|   i_name|i_category_id|i_category_name|i_price|i_comp_price|i_class_id|
+---------+---------+-------------+---------------+-------+------------+----------+
|        0|item#0000|            0|         cat#00|  53.09|       28.33|         2|
|        1|item#0001|            2|         cat#02|  28.89|       35.29|         8|
|        2|item#0002|            5|         cat#05|    4.7|        8.98|        15|
|        3|item#0003|            7|         cat#07|   79.5|       87.26|         1|
|        4|item#0004|           10|         cat#10|  55.31|       98.76|        10|
|        5|item#0005|           13|         cat#13|  31.11|       30.29|         5|
|        6|item#0006|           15|         cat#15|   6.92|        11.5|        19|
|        7|item#0007|           18|         cat#18|  81.72|       69.42|        19|
|        8|item#0008|            1|         cat#01|  57.53|       88.45|    

In [7]:
store_sales.createTempView('store_sales')

In [8]:
items.createTempView('items')

In [9]:
q="SELECT ss.ss_customer_id AS cid, count(CASE WHEN i.i_class_id=1  THEN 1 ELSE NULL END) AS id1,count(CASE WHEN i.i_class_id=3  THEN 1 ELSE NULL END) AS id3,count(CASE WHEN i.i_class_id=5  THEN 1 ELSE NULL END) AS id5,count(CASE WHEN i.i_class_id=7  THEN 1 ELSE NULL END) AS id7, count(CASE WHEN i.i_class_id=9  THEN 1 ELSE NULL END) AS id9,count(CASE WHEN i.i_class_id=11 THEN 1 ELSE NULL END) AS id11,count(CASE WHEN i.i_class_id=13 THEN 1 ELSE NULL END) AS id13,count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS id15,count(CASE WHEN i.i_class_id=2  THEN 1 ELSE NULL END) AS id2,count(CASE WHEN i.i_class_id=4  THEN 1 ELSE NULL END) AS id4,count(CASE WHEN i.i_class_id=6  THEN 1 ELSE NULL END) AS id6,count(CASE WHEN i.i_class_id=12 THEN 1 ELSE NULL END) AS id12, count(CASE WHEN i.i_class_id=8  THEN 1 ELSE NULL END) AS id8,count(CASE WHEN i.i_class_id=10 THEN 1 ELSE NULL END) AS id10,count(CASE WHEN i.i_class_id=14 THEN 1 ELSE NULL END) AS id14,count(CASE WHEN i.i_class_id=16 THEN 1 ELSE NULL END) AS id16 FROM store_sales ss INNER JOIN items i ON ss.ss_item_id = i.i_item_id WHERE i.i_category_name IN ('cat#01','cat#02','cat#03','cat#04','cat#05','cat#06','cat#07','cat#08','cat#09','cat#10','cat#11','cat#12','cat#013','cat#14','cat#15') AND ss.ss_customer_id IS NOT NULL GROUP BY ss.ss_customer_id HAVING count(ss.ss_item_id) > 3"

In [10]:
df = spark.sql(q)

In [15]:
assembler = VectorAssembler(inputCols=["cid", "id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8", "id9", "id10"],outputCol ="features")

In [16]:
vd = assembler.transform(df)

In [17]:
vd.show()

+----+---+---+---+---+---+----+----+----+---+---+---+----+---+----+----+----+--------------------+
| cid|id1|id3|id5|id7|id9|id11|id13|id15|id2|id4|id6|id12|id8|id10|id14|id16|            features|
+----+---+---+---+---+---+----+----+----+---+---+---+----+---+----+----+----+--------------------+
| 833|  2|  2|  4|  2|  2|   2|   1|   1|  1|  1|  0|   2|  3|   2|   2|   2|[833.0,2.0,1.0,2....|
|1088|  1|  2|  1|  3|  1|   4|   2|   1|  3|  0|  1|   0|  3|   3|   2|   3|[1088.0,1.0,3.0,2...|
|1591|  0|  3|  1|  1|  1|   0|   2|   0|  0|  2|  1|   4|  0|   2|   2|   0|[1591.0,0.0,0.0,3...|
|1959|  0|  1|  0|  0|  1|   0|   0|   0|  0|  2|  0|   0|  0|   0|   0|   0|(11,[0,3,4,9],[19...|
|2866|  0|  3|  3|  4|  1|   1|   2|   1|  3|  2|  2|   2|  2|   3|   1|   0|[2866.0,0.0,3.0,3...|
|3175|  1|  1|  4|  1|  2|   0|   2|   0|  1|  2|  0|   2|  2|   2|   1|   0|[3175.0,1.0,1.0,1...|
|3794|  0|  8|  0|  2|  0|   0|   0|   3|  3|  0|  0|   4|  0|   1|   3|   1|(11,[0,2,3,7,10],...|
|3918|  0|

In [43]:
gmm = GaussianMixture().setK(2).setFeaturesCol('features').setSeed(538009335)

In [25]:
gmm.write().save("/home/liuba/Downloads/mlflow-master/examples/myGaussianModel")

In [44]:
model = gmm.fit(vd)

In [34]:
model.hasSummary

True

In [35]:
weights = model.weights

In [38]:
print(weights)

[0.5330573639100111, 0.46694263608998887]


In [42]:
mu = model.gaussiansDF.show()
print(mu)

NameError: name 'mean' is not defined

In [37]:
model.gaussiansDF.show()

+--------------------+--------------------+
|                mean|                 cov|
+--------------------+--------------------+
|[5515.22187832696...|1.001682665018740...|
|[5364.68276690749...|9670878.146828335...|
+--------------------+--------------------+



In [55]:
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[7.12346294e+03 7.80419580e-01 2.50349650e+00 2.33426573e+00
 9.11888112e-01 2.23916084e+00 1.06433566e+00 2.23776224e+00
 1.43356643e+00 1.33286713e+00 1.62797203e+00]
[584.45744681   0.7358156    2.59397163   2.27659574   0.92021277
   2.39539007   0.98404255   2.42553191   1.38475177   1.35638298
   1.79078014]
[1.01384828e+04 7.04827586e-01 2.60551724e+00 2.31586207e+00
 9.33793103e-01 2.27034483e+00 1.05517241e+00 2.44000000e+00
 1.47586207e+00 1.37517241e+00 1.69931034e+00]
[3.01176529e+03 7.71900826e-01 2.55371901e+00 2.46115702e+00
 9.14049587e-01 2.26942149e+00 1.12561983e+00 2.33719008e+00
 1.44132231e+00 1.39834711e+00 1.82644628e+00]
[5.66789359e+03 8.46938776e-01 2.62099125e+00 2.37755102e+00
 9.91253644e-01 2.36151603e+00 1.06997085e+00 2.43148688e+00
 1.53644315e+00 1.41107872e+00 1.79446064e+00]
[4.29899845e+03 7.88253478e-01 2.62287481e+00 2.41731066e+00
 1.00927357e+00 2.45285935e+00 1.04945904e+00 2.51004637e+00
 1.44976816e+00 1.38176198e+00 1.7465

In [57]:
mlflow.log_metric('center', center)

NameError: name 'mlflow' is not defined