In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import random

import time

# Create (or reuse) a Spark session that runs locally on all available cores.
spark = (
    SparkSession.builder
    .appName("Phone_Similarity")
    .master("local[*]")
    .getOrCreate()
)

# Limit Spark's own log output to errors.
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# TODO: import the phone data from MongoDB (not implemented yet; the next cell reads products.json instead)


In [None]:
# Read the product catalogue (a multi-line JSON document) and show the inferred schema.
phone_data = spark.read.option("multiline","true").json('products.json')
phone_data.printSchema()

# Keep only the identifier, the descriptive fields and the attributes that
# are turned into features further down.
phone_data = phone_data.select(
    '_id', 'title', 'category', 'color', 'memory',
    'pin', 'ram', 'screenSize', 'status', 'price',
)

In [None]:
from pyspark.ml.feature import VectorAssembler
# Display the names of the columns kept by the selection above.
phone_data.columns

In [None]:
# Peek at the first five rows (returned as a list of Row objects).
phone_data.head(5)

In [None]:
import pyspark.sql.functions as F 
# One-hot encode `category`: one 0/1 indicator column per distinct value,
# named after the value itself and prepended to the existing columns.
categ = [row[0] for row in phone_data.select('category').distinct().collect()]
exprs = [F.when(F.col('category') == cat, 1).otherwise(0).alias(str(cat)) for cat in categ]

# Re-running this cell used to prepend a second, identically named copy of
# every indicator. Any existing column that already carries an indicator's
# name is therefore replaced instead of kept, which makes the cell idempotent.
indicator_names = {str(cat) for cat in categ}
kept_columns = [name for name in phone_data.columns if name not in indicator_names]
phone_data = phone_data.select(exprs + kept_columns)
phone_data.columns

In [None]:
# One-hot encode `color`: one 0/1 indicator column per distinct value,
# named after the value itself and prepended to the existing columns.
categ = [row[0] for row in phone_data.select('color').distinct().collect()]
exprs = [F.when(F.col('color') == cat, 1).otherwise(0).alias(str(cat)) for cat in categ]

# Re-running this cell used to prepend a second, identically named copy of
# every indicator. Any existing column that already carries an indicator's
# name is therefore replaced instead of kept, which makes the cell idempotent.
indicator_names = {str(cat) for cat in categ}
kept_columns = [name for name in phone_data.columns if name not in indicator_names]
phone_data = phone_data.select(exprs + kept_columns)
phone_data.columns

In [None]:
# One-hot encode `status`: one 0/1 indicator column per distinct value,
# named after the value itself and prepended to the existing columns.
categ = [row[0] for row in phone_data.select('status').distinct().collect()]
exprs = [F.when(F.col('status') == cat, 1).otherwise(0).alias(str(cat)) for cat in categ]

# Re-running this cell used to prepend a second, identically named copy of
# every indicator. Any existing column that already carries an indicator's
# name is therefore replaced instead of kept, which makes the cell idempotent.
indicator_names = {str(cat) for cat in categ}
kept_columns = [name for name in phone_data.columns if name not in indicator_names]
phone_data = phone_data.select(exprs + kept_columns)
phone_data.columns

In [None]:
from pyspark.sql.types import DoubleType
# Replace screenSize with a double-typed version of itself; every other
# column is carried over unchanged into the new frame.
changedTypedf = phone_data.withColumn(
    "screenSize",
    phone_data["screenSize"].cast(DoubleType()),
)
changedTypedf.head(5)

In [None]:
# Assemble every model input into a single 'features' vector column.
#
# The inputs are all columns of changedTypedf except the identifier, the
# title and the three categorical source columns (those are represented by
# their 0/1 indicator columns). For the dataset this notebook was written
# against, that yields the same 23 columns that used to be listed by hand
# (2 status + 12 colour + 4 category indicators, then memory, pin, ram,
# screenSize and price). Deriving the list keeps the cell working when the
# set of distinct categories, colours or statuses changes.
non_feature_columns = {'_id', 'title', 'category', 'color', 'status'}
feature_columns = [name for name in changedTypedf.columns if name not in non_feature_columns]

assemble = VectorAssembler(inputCols=feature_columns, outputCol='features')
assembled_data = assemble.transform(changedTypedf)
assembled_data.show(2)

In [None]:
from pyspark.ml.feature import StandardScaler

# Fit a StandardScaler on the assembled 'features' vectors and store the
# scaled vectors in a new 'standardized' column. All scaler options other
# than the input/output column names are left at their defaults.
scale = StandardScaler(
    inputCol='features',
    outputCol='standardized',
)
data_scale = scale.fit(assembled_data)
data_scale_output = data_scale.transform(assembled_data)

data_scale_output.show(2)

In [None]:
# List the column names straight from the Spark schema. Converting the whole
# DataFrame to pandas first would pull every row to the driver just to read
# its header.
data_scale_output.columns

In [None]:
# Keep the descriptive columns plus the scaled feature vector, then collect
# the result into a pandas DataFrame (datf) for the similarity search.
datad = data_scale_output.select(
    '_id', 'title', 'category', 'color', 'memory', 'pin',
    'ram', 'screenSize', 'status', 'price',
    'standardized',
)
datf = datad.toPandas()

In [None]:
# Raw (un-standardized) model inputs followed by the descriptive columns,
# collected into pandas.
# 'price' is selected once only: it used to appear a second time at the end
# of the list, which produced two columns with the same label (label-based
# access to a duplicated pandas column returns a DataFrame, not a Series).
# The positions of all remaining columns are unchanged.
datad_wi0_standard = data_scale_output.select(
 '99',
 'New',
 'Shiny Black',
 'Turquoise',
 'Silver',
 'Green',
 'Purple',
 'Blue',
 'White',
 'Gold',
 'Mint Green',
 'Black',
 'Red',
 'Pink',
 '6194877b0327b0eef3a53fe9',
 '61947f86613ccbeacb59e5b8',
 '619487730327b0eef3a53fe4',
 '61947f8e613ccbeacb59e5bd',
 'memory',
 'pin',
 'ram',
 'screenSize',
 'price',
 '_id', 'title', 'category', 'color', 'status')
datf_wi0_standard = datad_wi0_standard.toPandas()

In [None]:
# Preview the first five rows of the pandas frame used for the similarity search.
datf.head(5)

In [None]:
# Standardized feature vector of the first row, converted to a NumPy array.
datf.iloc[0]['standardized'].toArray()

In [None]:
# Standardized feature vector of the second row, for comparison with the first.
datf.iloc[1]['standardized'].toArray()

In [None]:
# Number of features in a standardized vector (length of the first row's vector).
len(datf['standardized'][0].toArray())

In [None]:
# The same first-row vector, shown as the stored vector object (not converted to an array).
datf.iloc[0]['standardized']

In [None]:
# Similarity by root-mean-square (RMSE) distance between standardized feature vectors
import numpy as np, pandas as pd
import matplotlib.pyplot as plt, seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

class PhoneSimilarity():
    """Rank phones by root-mean-square distance between their feature vectors.

    The distance between two phones is ``sqrt(mean((a - b) ** 2))`` over all
    components of their vectors. When every vector has the same length ``n``
    this equals the Euclidean distance divided by ``sqrt(n)``, so it yields
    the same ranking as a plain Euclidean distance.

    NOTE: this notebook defines a second class with the same name in a later
    cell (the Euclidean variant); whichever cell ran last is the one bound to
    ``PhoneSimilarity``.

    Parameters
    ----------
    all_Data : pandas.DataFrame
        One row per phone. Must contain the columns ``_id``, ``title``,
        ``category``, ``color``, ``memory``, ``pin``, ``ram``, ``screenSize``,
        ``status``, ``price`` and the vector column.
    vector_col : str, default 'standardized'
        Name of the column holding the feature vectors. Each cell must be an
        object exposing ``toArray()``.
    """

    # Columns handed back to the caller, in display order.
    _RESULT_COLUMNS = ['_id', 'title', 'category', 'color', 'memory', 'pin',
                       'ram', 'screenSize', 'status', 'price', 'distance']

    def __init__(self, all_Data, vector_col='standardized'):
        self.all_Data_ = all_Data
        self.vector_col_ = vector_col

    def phone_similarity(self, phone_id, amount=1):
        """Return the query phone followed by its ``amount`` nearest phones.

        Parameters
        ----------
        phone_id : value compared against the ``_id`` column.
        amount : int, default 1
            Number of neighbours to return in addition to the query phone.

        Returns
        -------
        pandas.DataFrame
            Up to ``amount + 1`` rows: the query phone first (distance 0),
            then the other phones in ascending order of distance.

        Raises
        ------
        IndexError
            If no row has ``_id == phone_id``.
        """
        is_query = self.all_Data_['_id'] == phone_id
        phone_row = self.all_Data_[is_query].head(1)
        if phone_row.empty:
            # Same exception type the positional lookup used to raise, but
            # with a message that names the actual problem.
            raise IndexError(f"phone_id {phone_id!r} not found in the data")
        res_data = self.all_Data_[~is_query]

        # The vector is looked up by column name and its full length is used,
        # instead of assuming position 10 and exactly 23 components.
        query_vector = np.asarray(
            phone_row[self.vector_col_].iloc[0].toArray(), dtype=float)
        if len(res_data):
            candidates = np.vstack([
                np.asarray(vec.toArray(), dtype=float)
                for vec in res_data[self.vector_col_]
            ])
            # One vectorized pass replaces the per-row Python loop (and its
            # progress bar).
            distances = np.sqrt(
                np.mean(np.square(candidates - query_vector), axis=1))
        else:
            distances = np.empty(0, dtype=float)

        # assign() builds new frames, so nothing is written into a slice of
        # the caller's DataFrame.
        res_data = res_data.assign(distance=distances).sort_values('distance')
        phone_row = phone_row.assign(distance=0.0)
        bigdata = pd.concat([phone_row, res_data], ignore_index=True, sort=False)
        return bigdata[self._RESULT_COLUMNS][:amount + 1]

In [None]:
# Similarity by Euclidean distance between standardized feature vectors
import numpy as np, pandas as pd
import matplotlib.pyplot as plt, seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

class PhoneSimilarity():
    """Rank phones by Euclidean distance between their feature vectors.

    NOTE: this notebook also defines a class with the same name in an earlier
    cell (the RMSE variant); whichever cell ran last is the one bound to
    ``PhoneSimilarity``.

    Parameters
    ----------
    all_Data : pandas.DataFrame
        One row per phone. Must contain the columns ``_id``, ``title``,
        ``category``, ``color``, ``memory``, ``pin``, ``ram``, ``screenSize``,
        ``status``, ``price`` and the vector column.
    vector_col : str, default 'standardized'
        Name of the column holding the feature vectors. Each cell must be an
        object exposing ``toArray()``.
    """

    # Columns handed back to the caller, in display order.
    _RESULT_COLUMNS = ['_id', 'title', 'category', 'color', 'memory', 'pin',
                       'ram', 'screenSize', 'status', 'price', 'distance']

    def __init__(self, all_Data, vector_col='standardized'):
        self.all_Data_ = all_Data
        self.vector_col_ = vector_col

    def phone_similarity(self, phone_id, amount=1):
        """Return the query phone followed by its ``amount`` nearest phones.

        Parameters
        ----------
        phone_id : value compared against the ``_id`` column.
        amount : int, default 1
            Number of neighbours to return in addition to the query phone.

        Returns
        -------
        pandas.DataFrame
            Up to ``amount + 1`` rows: the query phone first (distance 0),
            then the other phones in ascending order of distance.

        Raises
        ------
        IndexError
            If no row has ``_id == phone_id``.
        """
        is_query = self.all_Data_['_id'] == phone_id
        phone_row = self.all_Data_[is_query].head(1)
        if phone_row.empty:
            # Same exception type the positional lookup used to raise, but
            # with a message that names the actual problem.
            raise IndexError(f"phone_id {phone_id!r} not found in the data")
        res_data = self.all_Data_[~is_query]

        # The vector is looked up by column name instead of assuming it is
        # the 11th column of the frame.
        query_vector = np.asarray(
            phone_row[self.vector_col_].iloc[0].toArray(), dtype=float)
        if len(res_data):
            candidates = np.vstack([
                np.asarray(vec.toArray(), dtype=float)
                for vec in res_data[self.vector_col_]
            ])
            # One vectorized pass replaces the per-row Python loop (and its
            # progress bar).
            distances = np.linalg.norm(candidates - query_vector, axis=1)
        else:
            distances = np.empty(0, dtype=float)

        # assign() builds new frames, so nothing is written into a slice of
        # the caller's DataFrame.
        res_data = res_data.assign(distance=distances).sort_values('distance')
        phone_row = phone_row.assign(distance=0.0)
        bigdata = pd.concat([phone_row, res_data], ignore_index=True, sort=False)
        return bigdata[self._RESULT_COLUMNS][:amount + 1]

In [None]:
# Build the similarity model on the standardized pandas frame and look up the
# ten phones closest to one sample phone id.
similarity = PhoneSimilarity(datf)
x = '61948b652d9fa1d9e7da2d3a'
similarity_phones = similarity.phone_similarity(phone_id=x, amount=10)

In [None]:
# Show the ranked result: the query phone first, then its nearest neighbours.
print(similarity_phones)

In [None]:
# Prints the same `similarity_phones` table as the previous cell (nothing reassigns it in between).
print(similarity_phones)

In [None]:
# Kafka connection settings: the topic to subscribe to and the broker address.
kafka_topic_name = "clickcount"
kafka_bootstrap_servers = 'localhost:9092'

# Streaming DataFrame subscribed to that topic, starting from the "latest"
# offsets rather than replaying the topic from the beginning.
flower_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "latest")
    .load()
)

In [None]:
def process_row(row):
    """Print the ten most similar phones for the phone id carried by one Kafka record.

    The record's ``value`` is decoded as UTF-8 and split on commas; the first
    field is used as the phone id and looked up with the module-level
    ``similarity`` object.

    A record that cannot be served is reported and skipped instead of
    raising, so a single bad message does not terminate the streaming query:
    a null value, an empty id, or an id for which ``phone_similarity`` raises
    ``IndexError`` (which is what it raises for an id that is not in the data).

    Parameters
    ----------
    row : mapping-like record whose ``'value'`` entry is a bytes-like payload
        or ``None``.
    """
    payload = row['value']
    if payload is None:
        print("Skipping record without a value")
        return

    # Surrounding whitespace (e.g. a trailing newline) would otherwise stop an
    # otherwise valid id from matching.
    phone_id = payload.decode("utf-8").split(',')[0].strip()
    if not phone_id:
        print("Skipping record with an empty phone id")
        return

    try:
        similarity_phones = similarity.phone_similarity(phone_id, 10)
    except IndexError as exc:
        print(f"Skipping phone id {phone_id!r}: {exc}")
        return
    print(similarity_phones)
# Start the streaming query, handing each record read from Kafka to process_row.
query = (
    flower_df.writeStream
    .foreach(process_row)
    .start()
)