In [1]:
import findspark
findspark.init()
import pyspark
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

from pyspark import SparkContext
sc=SparkContext()

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.window import Window

In [3]:
s_df = sqlContext.read.csv("big_data_log/big_data-2019-02-*.log", sep=';').cache()

In [4]:
s_products = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://10.1.3.244/tm_datasc',
          driver='com.mysql.jdbc.Driver',
          dbtable='products',
          user='datasc',
          password='scdata',
          serverTimezone='GMT',
          zeroDateTimeBehavior='CONVERT_TO_NULL').load()

categories = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://10.1.3.244/tm_datasc',
          driver='com.mysql.jdbc.Driver',
          dbtable='categories',
          user='datasc',
          password='scdata',
          serverTimezone='GMT',
          zeroDateTimeBehavior='CONVERT_TO_NULL').load() 
          
products_categories = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://10.1.3.244/tm_datasc',
          driver='com.mysql.jdbc.Driver',
          dbtable='products_categories',
          user='datasc',
          password='scdata',
          serverTimezone='GMT',
          zeroDateTimeBehavior='CONVERT_TO_NULL').load()
          
categories_content = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://10.1.3.244/tm_datasc',
          driver='com.mysql.jdbc.Driver',
          dbtable='categories_content',
          user='datasc',
          password='scdata',
          serverTimezone='GMT',
          zeroDateTimeBehavior='CONVERT_TO_NULL').load()
          
products_content = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://10.1.3.244/tm_datasc',
          driver='com.mysql.jdbc.Driver',
          dbtable='products_content',
          user='datasc',
          password='scdata',
          serverTimezone='GMT',
          zeroDateTimeBehavior='CONVERT_TO_NULL').load()

orders = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://10.1.3.244/tm_datasc',
          driver='com.mysql.jdbc.Driver',
          dbtable='orders',
          user='datasc',
          password='scdata',
          serverTimezone='GMT',
          zeroDateTimeBehavior='CONVERT_TO_NULL').load()

order_items = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://10.1.3.244/tm_datasc',
          driver='com.mysql.jdbc.Driver',
          dbtable='order_items',
          user='datasc',
          password='scdata',
          serverTimezone='GMT',
          zeroDateTimeBehavior='CONVERT_TO_NULL').load()

In [5]:
s_df = s_df.selectExpr('_c0 as data_id', 
                   '_c1 as data_type', 
                   '_c2 as view_type',
                   '_c3 as customer_id',
                   '_c4 as vote_index',
                   '_c5 as added_qty',
                   '_c6 as token',
                   '_c7 as log_time',
                   '_c8 as session_id',
                   '_c9 as ip_address',
                   '_c10 as device_type',
                   '_c11 as click_success')

s_df = s_df.select(s_df.data_id.cast("integer"),
               s_df.view_type.cast("integer"),
               s_df.data_type.cast("integer"),
               s_df.customer_id.cast("integer"),
               s_df.added_qty.cast("integer"),
               s_df.token.cast("string"),
               s_df.log_time.cast("string"),
               s_df.session_id.cast("string"),
               s_df.ip_address.cast("string"),
               s_df.device_type.cast("integer"),
               s_df.click_success.cast("integer"))


s_df = s_df.fillna({'click_success':'0'})
s_df = s_df.fillna({'added_qty':'0'})
s_df = s_df.fillna({'data_id':'0'})
s_df = s_df.fillna({'view_type':'0'})
s_df = s_df.fillna({'data_type':'0'})

s_df = s_df.filter(s_df.click_success<3)
s_df = s_df.filter(s_df.added_qty<100)
s_df = s_df.filter(s_df.view_type<25)
s_df = s_df.filter(s_df.data_type<25)
s_df = s_df.filter(s_df.device_type<2)
s_df = s_df.filter(s_df.click_success<3)


s_df = s_df.withColumn('log_time', unix_timestamp("log_time", "yyyy-MM-dd HH:mm:ss").cast('timestamp').alias("log_time"))
s_df = s_df.withColumnRenamed('customer_cookie_token', 'token')
s_df = s_df.drop('id')
s_df = s_df.cache()

break_point = '2017-04-29 00:00:00'

s_df = s_df.filter(s_df.log_time>break_point)
s_df = s_df.withColumn('date', s_df['log_time'].cast('date'))
s_df = s_df.withColumn('prod', F.when(s_df.data_type == 1, s_df.data_id).otherwise(0))
s_df = s_df.withColumn('cat', F.when(s_df.data_type == 7, s_df.data_id).otherwise(0))
s_df = s_df.withColumn('class', F.when(s_df.click_success == 1, s_df.click_success).otherwise(0))

s_df = s_df.dropDuplicates()


s_df = s_df.withColumn('year', year("log_time").alias('year'))
s_df = s_df.withColumn('month', month("log_time").alias('month'))
s_df = s_df.withColumn('wday', date_format('date', 'u'))
s_df = s_df.withColumn('hour', hour("log_time").alias('hour'))
s_df = s_df.withColumn('minute', minute("log_time").alias('minute'))
s_df = s_df.select(s_df.data_id,
               s_df.view_type,
               s_df.data_type,
               s_df.customer_id,
               s_df.added_qty,
               s_df.token,
               s_df.log_time,
               s_df.session_id,
               s_df.ip_address,
               s_df.device_type,
               s_df.click_success,
               s_df.date,
               s_df.prod.cast("integer"),
               s_df.cat.cast("integer"),
               s_df.year.cast("integer"),
               s_df.month.cast("integer"),
               s_df.wday.cast("integer"),
               s_df.hour.cast("integer"),
               s_df.minute.cast("integer"))
s_df = s_df.cache()

s_df = s_df.filter("token is not NULL")
s_df = s_df.filter("session_id is not NULL")
s_df = s_df.filter("log_time is not NULL")
s_df = s_df.filter("data_type is not NULL")
s_df = s_df.filter("view_type is not NULL")
s_df = s_df.filter("added_qty is not NULL")
s_df = s_df.filter("year is not NULL")
s_df = s_df.filter("month is not NULL")
s_df = s_df.filter("wday is not NULL")
s_df = s_df.filter("hour is not NULL")
s_df = s_df.filter("minute is not NULL")
s_df = s_df.filter("prod is not NULL")

In [6]:
categories = categories_content.select('category_id', 'name')
categories = categories.withColumnRenamed("name", "category_name")

products_content = products_content.select('product_id', 'name', 'type')
products_content = products_content.withColumnRenamed("name", "product_name")

s_products = s_products.select('product_id', 'brand', 'cena_niza_redovna', 'status')
s_products = s_products.withColumnRenamed("cena_niza_redovna", "price")
s_products = s_products.filter('status==1')

In [7]:
s_products = s_products.alias('a').join(products_content.alias('b'), s_products['product_id']==products_content['product_id'], 'inner').select('a.product_id', 'b.product_name', 'b.type', 'a.brand', 'a.price')

s_products = s_products.alias('a').join(products_categories.alias('b'), s_products['product_id']==products_categories['product_id'], 'inner').select('a.product_id', 'a.product_name', 'a.type', 'b.category_id', 'a.brand', 'a.price')

s_products = s_products.alias('a').join(categories.alias('b'), s_products['category_id']==categories['category_id'], 'inner').select('a.product_id', 'a.product_name', 'a.type', 'b.category_id', 'b.category_name','a.brand', 'a.price')

In [8]:
s_products.show()

+----------+--------------------+-------------+-----------+----------------+---------+-----+
|product_id|        product_name|         type|category_id|   category_name|    brand|price|
+----------+--------------------+-------------+-----------+----------------+---------+-----+
|     54114|Energizer baterij...|        LAMPA|        474|Baterijske lampe|Energizer|  919|
|     54116|Energizer baterij...|        LAMPA|        474|Baterijske lampe|Energizer|  959|
|     54117|Energizer baterij...|        LAMPA|        474|Baterijske lampe|Energizer|  619|
|     54113|Energizer baterij...|        LAMPA|        474|Baterijske lampe|Energizer|  999|
|     35515|  Reflecta LED lampa|        LAMPA|        474|Baterijske lampe| Reflecta|  199|
|    135011|INTEX Dušek na na...|       Duseci|       1677|          Dušeci|    Intex| 6999|
|    135007|INTEX Krevet na n...|       Duseci|       1677|          Dušeci|    Intex| 4999|
|    135013|INTEX Fotelja 1.2...|       Duseci|       1677|          D

In [8]:
tokens = s_df.select('token').distinct()
tokens = tokens.withColumn("user", monotonically_increasing_id())

tokens = tokens.cache()

s_df = s_df.alias('a').join(tokens.alias('b'), s_df['token'] == tokens['token'], 'left')\
.select('a.data_id', 'a.view_type', 'a.data_type', 'a.customer_id', 'a.added_qty', 'a.token', 'b.user', 'a.log_time', 'a.ip_address', 'a.session_id', 'a.device_type', 'a.click_success', 'a.prod', 'a.cat', 'a.date','a.year', 'a.month', 'a.wday', 'a.hour', 'a.minute')\
.cache()

In [9]:
s_df.count()

1670512

In [10]:
s_products.columns

['product_id',
 'product_name',
 'type',
 'category_id',
 'category_name',
 'brand',
 'price']

In [11]:
orders = orders.select('order_id', 'customer_id', 'total_value')

In [12]:
order_items = order_items.select('order_item_id', 'order_id', 'product_id', 'name', 'price')

In [13]:
order_items = order_items.join(s_products, ['product_id'], 'inner').select('order_item_id', 'order_id', 'product_id', 'product_name', s_products['price'], 'category_id', 'category_name')

In [14]:
order_items.show()

+-------------+--------+----------+--------------------+-----+-----------+--------------------+
|order_item_id|order_id|product_id|        product_name|price|category_id|       category_name|
+-------------+--------+----------+--------------------+-----+-----------+--------------------+
|       577708|  431766|     40634|Samsung mikrotala...| 9888|       1024|Samostalne mikrot...|
|       879493|  682939|     40634|Samsung mikrotala...| 9888|       1024|Samostalne mikrot...|
|       909992|  707726|     40634|Samsung mikrotala...| 9888|       1024|Samostalne mikrot...|
|       924841|  719593|     40634|Samsung mikrotala...| 9888|       1024|Samostalne mikrot...|
|       970279|  755479|     40634|Samsung mikrotala...| 9888|       1024|Samostalne mikrot...|
|       975925|  760189|     40634|Samsung mikrotala...| 9888|       1024|Samostalne mikrot...|
|       976611|  760701|     40634|Samsung mikrotala...| 9888|       1024|Samostalne mikrot...|
|      1000628|  780110|     40634|Samsu

In [15]:
s_df = s_df.select('data_id', 'view_type', "data_type", 'customer_id', 'token', 'log_time', 'session_id', 'device_type', 'prod', 'cat', 'date')

In [16]:
s_df = s_df.filter("data_type==11")

In [18]:
s_df = s_df.join(orders, orders['order_id']==s_df['data_id'], "inner").select(
'data_id', orders['customer_id'], 'token','total_value', 'date').join(order_items, s_df['data_id']==order_items['order_id'], 'inner').select(
    'data_id', 'customer_id', 'token', 'date', 'order_id', 'total_value', 'product_id', 'product_name','category_id', 'category_name', 'price')

In [19]:
s_df.show()

+-------+-----------+--------------------+----------+--------+-----------+----------+--------------------+-----------+--------------------+------+
|data_id|customer_id|               token|      date|order_id|total_value|product_id|        product_name|category_id|       category_name| price|
+-------+-----------+--------------------+----------+--------+-----------+----------+--------------------+-----------+--------------------+------+
| 996315|     545433|v7cov2kbu0d4f0t9b...|2019-02-04|  996315|     199990|    134124|LG SMART Televizo...|        578|          Televizori|199990|
| 998409|          0|mlg5roj6q9pcol9qn...|2019-02-06|  998409|       5780|    125257|BOSCH ketler TWK ...|       1134|      Kuvala za vodu|  3990|
| 998409|          0|mlg5roj6q9pcol9qn...|2019-02-06|  998409|       5780|    125424|VOX aparat za sen...|       1090| Tosteri za sendviče|  1790|
|1000502|          0|6bcltfrbr171ckbei...|2019-02-08| 1000502|       2499|    132410|GORENJE &Scaron;t...|        168|

In [20]:
window = Window.partitionBy(s_df['token']).orderBy(s_df['date'].asc())

first_transaction = s_df.select(col('token'), col('date'), row_number().over(window).alias('row_number')) \
  .where(col('row_number') <= 1).drop('row_number')
first_transaction = first_transaction.withColumnRenamed('date', 'first_transaction')

In [21]:
window = Window.partitionBy(s_df['token']).orderBy(s_df['date'].desc())

last_transaction = s_df.select(col('token'), col('date'), row_number().over(window).alias('row_number')) \
  .where(col('row_number') <= 1).drop('row_number')
last_transaction = last_transaction.withColumnRenamed('date', 'last_transaction')

In [22]:
#shopping_frequency (num of orders per customer_id)
frequency = s_df.groupBy("token").count().withColumnRenamed("count", "frequency")

In [23]:
#recency (difference between last transaction and today)
recency = last_transaction.withColumn("current_date", F.date_format(F.current_date(), "y-M-d")).withColumn("date", F.date_format(last_transaction.last_transaction, "y-M-d"))
recency = recency.withColumn("days_between", F.datediff(recency.current_date, recency.last_transaction))
recency = recency.withColumnRenamed('days_between','recency').drop('last_transaction', 'current_date')

In [24]:
length = first_transaction.join(last_transaction, ['token'], 'inner')
length = length.withColumn("length", F.datediff(length.last_transaction, length.first_transaction))

In [35]:
s_df.show()

+-------+-----------+--------------------+----------+--------+-----------+----------+--------------------+-----------+--------------------+------+
|data_id|customer_id|               token|      date|order_id|total_value|product_id|        product_name|category_id|       category_name| price|
+-------+-----------+--------------------+----------+--------+-----------+----------+--------------------+-----------+--------------------+------+
| 996315|     545433|v7cov2kbu0d4f0t9b...|2019-02-04|  996315|     199990|    134124|LG SMART Televizo...|        578|          Televizori|199990|
| 998409|          0|mlg5roj6q9pcol9qn...|2019-02-06|  998409|       5780|    125257|BOSCH ketler TWK ...|       1134|      Kuvala za vodu|  3990|
| 998409|          0|mlg5roj6q9pcol9qn...|2019-02-06|  998409|       5780|    125424|VOX aparat za sen...|       1090| Tosteri za sendviče|  1790|
|1000502|          0|6bcltfrbr171ckbei...|2019-02-08| 1000502|       2499|    132410|GORENJE &Scaron;t...|        168|

In [37]:
monetary = s_df.select('token', 'order_id', 'total_value')
monetary = monetary.groupBy("token").sum()
monetary = monetary.drop("sum(token)").withColumnRenamed("sum(total_value)", "monetary")

In [38]:
print("Monetary: ",monetary.count())
print("Frequency: ",frequency.count())
print("Length: ",length.count())
print("Recency: ",recency.count())

Monetary:  1532
Frequency:  1532
Length:  1532
Recency:  1532


In [39]:
lrfm = frequency.join(length, ['token'], 'inner').join(recency, ['token'], 'inner').join(monetary, ['token'], 'left').fillna(0)
lrfm = lrfm.drop("first_transaction", "last_transaction", "log_date")

In [40]:
lrfm.show()

+--------------------+---------+------+---------+-------+-------------+--------+
|               token|frequency|length|     date|recency|sum(order_id)|monetary|
+--------------------+---------+------+---------+-------+-------------+--------+
|5de71ve7vd3mu6oov...|        1|     0|2019-1-31|     13|       994003|    5499|
|5dkii9k1m4etjr5if...|        2|     0| 2019-2-9|      4|      2002314|   84960|
|5kq1dic520hi5edsk...|        1|     0|2019-1-31|     13|       993614|   15102|
|aho59b24j96b619sr...|        1|     0| 2019-2-2|     11|       995055|   48990|
|jd3qu36a4bm4dnpre...|        1|     0|2019-2-10|      3|      1001839|    6392|
|tiu8886h4l0ie6och...|        1|     0| 2019-2-6|      7|       998095|    5999|
|f35d8e418c9c67df3...|        1|     0| 2019-2-6|      7|       998197|   30990|
|h583qa2nq7i33ta9m...|        1|     0| 2019-2-6|      7|       997455|    3190|
|j491831ms9d83brsl...|        1|     0| 2019-2-6|      7|       997591|    8988|
|kr3ml8339f4sgtofi...|      

In [41]:
pseudo_products = sqlContext.read.csv("cbr.csv", sep=',', header = True).cache()

In [43]:
s_df.columns

['data_id',
 'customer_id',
 'token',
 'date',
 'order_id',
 'total_value',
 'product_id',
 'product_name',
 'category_id',
 'category_name',
 'price']

In [46]:
customer_category = s_df.select('token', 'product_id', 'category_name')

In [47]:
grouped = customer_category.groupBy("token", "category_name").count()
pivoted = grouped.groupBy("token").pivot("category_name").agg(F.sum(F.col("count")))
pivoted = pivoted.na.fill(0)

In [48]:
pivoted = pivoted.join(lrfm, ['token'], 'inner')

In [49]:
pivoted.columns

['token',
 'Adapteri za auto',
 'Akumulatori',
 'Alati za baštu',
 'Aparati za brijanje',
 'Aparati za espresso',
 'Aparati za galete',
 'Aparati za kokice',
 'Aparati za kuvanje na pari',
 'Aparati za pravljenje jogurta',
 'Aparati za pritisak',
 'Aparati za vertikalno peglanje',
 'Audio zvučnici',
 'Auto punjači za mobilne telefone',
 'Auto radio',
 'Auto zvučnici',
 'Balansirajući skuter',
 'Bežični telefoni',
 'Bežični zvučnici',
 'Bluetooth uređaji',
 'Bokali za filtriranje vode',
 'Bušilice i odvrtači',
 'CD',
 'Cediljke i citrusete',
 'DSLR fotoaparati',
 'DVD plejeri',
 'Daljinski upravljači',
 'Daske i oprema za pegle',
 'Dehidratori',
 'Depilatori',
 'Desktop računari',
 'Diktafoni',
 'Dodaci za blendere',
 'Dodatna oprema za aparate za kafu',
 'Dodatna oprema za računare',
 'Dodatna oprema za tablete',
 'Dodatna oprema za usisivače',
 'Dronovi',
 'Držači mobilnih telefona za auto',
 'Dušeci',
 'Džojstik',
 'Eksterni hard diskovi',
 'Električne džezve',
 'Električne četkice z

In [50]:
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import StandardScaler

In [55]:
vecAssembler = VectorAssembler(inputCols=['Adapteri za auto',
 'Akumulatori',
 'Alati za baštu',
 'Aparati za brijanje',
 'Aparati za espresso',
 'Aparati za galete',
 'Aparati za kokice',
 'Aparati za kuvanje na pari',
 'Aparati za pravljenje jogurta',
 'Aparati za pritisak',
 'Aparati za vertikalno peglanje',
 'Audio zvučnici',
 'Auto punjači za mobilne telefone',
 'Auto radio',
 'Auto zvučnici',
 'Balansirajući skuter',
 'Bežični telefoni',
 'Bežični zvučnici',
 'Bluetooth uređaji',
 'Bokali za filtriranje vode',
 'Bušilice i odvrtači',
 'CD',
 'Cediljke i citrusete',
 'DSLR fotoaparati',
 'DVD plejeri',
 'Daljinski upravljači',
 'Daske i oprema za pegle',
 'Dehidratori',
 'Depilatori',
 'Desktop računari',
 'Diktafoni',
 'Dodaci za blendere',
 'Dodatna oprema za aparate za kafu',
 'Dodatna oprema za računare',
 'Dodatna oprema za tablete',
 'Dodatna oprema za usisivače',
 'Dronovi',
 'Držači mobilnih telefona za auto',
 'Dušeci',
 'Džojstik',
 'Eksterni hard diskovi',
 'Električne džezve',
 'Električne četkice za zube',
 'Električni šporeti',
 'Epilatori',
 'FM transmiteri',
 'Fenovi za kosu',
 'Filteri za bokale za vodu',
 'Fitnes oprema',
 'Foto papir',
 'Friteze',
 'Frižideri sa jednim vratima',
 'Grejalice',
 'Grejni vikleri',
 'Handsfree uređaji',
 'Horizontalni zamrzivači',
 'Igre',
 'Inverter klime',
 'Jonizatori',
 'Kablovi za TV / AV',
 'Kablovi, adapteri i baterije',
 'Kafa',
 'Kertridži i toneri',
 'Kese za vakumiranje',
 'Kombinovani frižideri',
 'Kombinovani šporeti',
 'Kompaktni fotoaparati',
 'Konzole',
 'Kuhinjske vage',
 'Kuhinjski pribor',
 'Kuhinjski roboti',
 'Kuvala za vodu',
 'Laptop računari',
 'Masažeri',
 'Masažeri za stopala',
 'Mašine za meso',
 'Mašine za pranje i sušenje veša',
 'Mašine za pranje veša',
 'Mašine za sušenje veša',
 'Memorijske kartice',
 'Meteorološke stanice',
 'Mikrofoni',
 'Mikseri sa posudom',
 'Mini barovi',
 'Mini linije',
 'Mini šporeti',
 'Miševi',
 'Mlinovi za kafu',
 'Monitori',
 'Multi trimeri',
 'Multicookeri',
 'Multipraktici',
 'Mutilice za nes kafu',
 'Nosači i police',
 'Nutri blenderi',
 'Obavezna auto oprema',
 'Odvlaživači',
 'Oprema za frižidere',
 'Oprema za igrice',
 'Oprema za šivaće mašine',
 'Ostala oprema za telefone',
 'Ovlaživači',
 'Pametni satovi',
 'Parne stanice',
 'Paročistači',
 'Pekare',
 'Pekači i tepsije',
 'Podloge za miš',
 'Postolja za laptopove',
 'Posude za vakumiranje',
 'Power Bank',
 'Prekrivači',
 'Prenosna pojačala',
 'Prese za kosu',
 'Procesori',
 'Programi i softveri',
 'Radijatori',
 'Radio satovi',
 'Risiveri',
 'Robotski usisivači',
 'Roštilji i grilovi',
 'Ruteri',
 'Ručni mikseri',
 'Ručni usisivači',
 'Salamoreznice',
 'Samostalne mašine za pranje sudova',
 'Samostalne mikrotalasne rerne',
 'Samostalni aspiratori',
 'Set-top box tjuneri i risiveri',
 'Setovi posuđa',
 'Setovi ručnog alata',
 'Setovi za pedikir',
 'Sijalice',
 'Slušalice',
 'Smart telefoni',
 'Smoothie blenderi',
 'Sobne antene',
 'Sokovnici za hladno cedjenje',
 'Standardne klime',
 'Standardne pegle',
 'Standardne seckalice',
 'Standardni blenderi',
 'Standardni bojleri',
 'Standardni rešoi',
 'Standardni sokovnici',
 'Standardni telefoni',
 'Standardni tosteri',
 'Stone sudomašine',
 'Styleri',
 'Sušilice za veš',
 'Tableti',
 'Tastature',
 'Televizori',
 'Testere',
 'Tiganji',
 'Torbe i futrole',
 'Tosteri za sendviče',
 'Tranzistori i radio uređaji',
 'Trimeri za bradu',
 'Trimeri za kosu',
 'UPS i prenaponske zaštite',
 'USB flashevi',
 'Ugradne mašine za sudove',
 'Ugradne mikrotalasne rerne',
 'Ugradne ploče',
 'Ugradne rerne',
 'Ugradni aspiratori',
 'Ugradni frižideri',
 'Ugradni setovi',
 'Usisivači sa kesom za prašinu',
 'Usisivači sa posudom za prašinu',
 'Usisivači sa vodenim filterom',
 'Vage za merenje kofera',
 'Vage za merenje telesne težine',
 'Vakum aparati',
 'Vertikalni zamrzivači',
 'Wi-Fi, Bluetooth i USB adapteri',
 'Zvučnici',
 'Zvučnici za kompjuter',
 'iPod i MP3 plejeri',
 'Čitači kartica',
 'Šerpe i lonci',
 'Šivaće mašine',
 'Šporeti na čvrsto gorivo',
 'Štampači',
 'Štapni mikseri',
 'Štapni usisivači',
 'Žični telefoni',
 'frequency',
 'length',
 'recency',
 'monetary'], outputCol='features')

In [56]:
new_df = vecAssembler.transform(pivoted)

scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures')
scaler_model = scaler.fit(new_df)
scaled_data = scaler_model.transform(new_df)
scaled_data.select('features','scaledFeatures').show(truncate=False)

+----------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+
|features                                            |scaledFeatures                                                                                                             |
+----------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------+
|(192,[28,188,190,191],[1.0,1.0,13.0,5499.0])        |(192,[28,188,190,191],[10.473033799344616,1.1996917553612978,3.8822274689198806,0.09099423272331615])                      |
|(192,[65,127,188,190,191],[1.0,1.0,2.0,4.0,84960.0])|(192,[65,127,188,190,191],[13.870097086028382,11.339778224261645,2.3993835107225956,1.1945315288984248,1.4058683419117912])|
|(192,[152,188,190,191],[1.0,1.0,13.0,15102.0])      |(192,[152,188,190,191],[4.764881484343154,1.1996917

In [57]:
from pyspark.ml.clustering import BisectingKMeans

# Trains a bisecting k-means model.
bkm = BisectingKMeans().setK(10).setSeed(1)
model = bkm.fit(scaled_data)

transformed = model.transform(scaled_data)

In [58]:
transformed.groupBy('prediction').count().sort(col('prediction').asc()).show(15)

+----------+-----+
|prediction|count|
+----------+-----+
|         0|  734|
|         1|  260|
|         2|  293|
|         3|  137|
|         4|   66|
|         5|   24|
|         6|    9|
|         7|    4|
|         8|    3|
|         9|    2|
+----------+-----+

