In [None]:
# Install the retrieval stack (LangChain, Chroma, sentence-transformers, Instructor embeddings) at pinned versions.
# NOTE(review): fastapi and pyspark carry no version pin, so a fresh environment may resolve them differently.
%pip install langchain==0.0.268 chromadb==0.3.26 sentence_transformers==2.2.2 InstructorEmbedding==1.0.1 fastapi pydantic==1.10.13 pyspark

In [None]:
!python3 --version
# Record the Python version this notebook was executed with.

Python 3.10.12


In [None]:
# Execute config.py inside the notebook namespace.
# NOTE(review): its contents are not visible here; presumably it supplies shared settings/imports used below.
%run config.py

In [None]:
# Execute functions.py inside the notebook namespace.
# NOTE(review): get_row_as_text, get_embedding_model, Chroma and F are used below without being defined
# in this notebook, so they presumably come from this file or config.py — confirm.
%run functions.py

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session behind every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName("customer_look_alike_modelling")
    .getOrCreate()
)


In [None]:
# Load the training data: the first line is a header and column types are inferred from the values.
input_df = spark.read.csv("./Train.csv", header=True, inferSchema=True)

In [None]:
# Inspect the column types that schema inference produced.
input_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Ever_Married: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Graduated: string (nullable = true)
 |-- Profession: string (nullable = true)
 |-- Work_Experience: double (nullable = true)
 |-- Spending_Score: string (nullable = true)
 |-- Family_Size: double (nullable = true)
 |-- Var_1: string (nullable = true)
 |-- Segmentation: string (nullable = true)



In [None]:
# Preview the first 20 rows, then count how many rows fall in each target segment.
input_df.show(20)
input_df.groupBy("Segmentation").count().show()

+------+------+------------+---+---------+-------------+---------------+--------------+-----------+-----+------------+
|    ID|Gender|Ever_Married|Age|Graduated|   Profession|Work_Experience|Spending_Score|Family_Size|Var_1|Segmentation|
+------+------+------------+---+---------+-------------+---------------+--------------+-----------+-----+------------+
|462809|  Male|          No| 22|       No|   Healthcare|            1.0|           Low|        4.0|Cat_4|           D|
|462643|Female|         Yes| 38|      Yes|     Engineer|           NULL|       Average|        3.0|Cat_4|           A|
|466315|Female|         Yes| 67|      Yes|     Engineer|            1.0|           Low|        1.0|Cat_6|           B|
|461735|  Male|         Yes| 67|      Yes|       Lawyer|            0.0|          High|        2.0|Cat_6|           B|
|462669|Female|         Yes| 40|      Yes|Entertainment|           NULL|          High|        6.0|Cat_6|           A|
|461319|  Male|         Yes| 56|       No|      

In [None]:
# Feature columns that are serialised into the "column_name: column_value" text of each row:
# every input column except the target ("Segmentation") and the key ("ID").
# TODO: rename to cols_to_convert
rows_to_convert = list(input_df.columns)
for excluded in ("Segmentation", "ID"):
    rows_to_convert.remove(excluded)
rows_to_convert

['Gender',
 'Ever_Married',
 'Age',
 'Graduated',
 'Profession',
 'Work_Experience',
 'Spending_Score',
 'Family_Size',
 'Var_1']

In [None]:
# Add the text rendering of each row; the schema printed below gains a row_as_text column.
train_df = get_row_as_text(input_df, rows_to_convert)
train_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Ever_Married: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Graduated: string (nullable = true)
 |-- Profession: string (nullable = true)
 |-- Work_Experience: double (nullable = true)
 |-- Spending_Score: string (nullable = true)
 |-- Family_Size: double (nullable = true)
 |-- Var_1: string (nullable = true)
 |-- Segmentation: string (nullable = true)
 |-- row_as_text: string (nullable = false)



In [None]:
# Show two rendered rows untruncated. In the output the second row has no Work_Experience entry while
# its input value is NULL, so null fields appear to be left out of the text.
train_df.select("row_as_text").show(2, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|row_as_text                                                                                                                                              |
+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|Gender: Male; Ever_Married: No; Age: 22; Graduated: No; Profession: Healthcare; Work_Experience: 1.0; Spending_Score: Low; Family_Size: 4.0; Var_1: Cat_4|
|Gender: Female; Ever_Married: Yes; Age: 38; Graduated: Yes; Profession: Engineer; Spending_Score: Average; Family_Size: 3.0; Var_1: Cat_4                |
+---------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 2 rows



In [None]:

import os

# Directory (relative to the working directory) where the Chroma vector store is persisted.
db_dir = "resources/embeddings_01"

# Create it, parents included; exist_ok=True makes re-running this cell a no-op.
os.makedirs(db_dir, exist_ok=True)

In [None]:

# step: rows written to the vector store per batch; k: documents requested from the retriever per query.
step, k = 500, 2000
hf_embeddings = get_embedding_model()

# Bring every training row to the driver as a list of Row objects and peek at the first one.
texts_list = train_df.collect()
texts_list[0]

Row(ID=462809, Gender='Male', Ever_Married='No', Age=22, Graduated='No', Profession='Healthcare', Work_Experience=1.0, Spending_Score='Low', Family_Size=4.0, Var_1='Cat_4', Segmentation='D', row_as_text='Gender: Male; Ever_Married: No; Age: 22; Graduated: No; Profession: Healthcare; Work_Experience: 1.0; Spending_Score: Low; Family_Size: 4.0; Var_1: Cat_4')

In [None]:
# Open (or create) the Chroma collection persisted in db_dir.
vdb = Chroma(persist_directory=db_dir, embedding_function=hf_embeddings)

# Row fields stored as metadata next to each embedded text (every field except row_as_text).
metadata_fields = [
    "ID", "Gender", "Ever_Married", "Age", "Graduated", "Profession",
    "Work_Experience", "Spending_Score", "Family_Size", "Var_1", "Segmentation",
]

# The collection lives on disk, so running this cell again (including after a kernel restart)
# would append a second copy of every row and inflate all retrieval counts.
# Populate it only while it is still empty; delete db_dir to force a rebuild.
if vdb._collection.count() == 0:
    for i in range(0, len(texts_list), step):
        batch = texts_list[i:i + step]
        texts = [x.row_as_text for x in batch]
        # Every metadata value is stored as a string.
        metadata = [{field: str(x[field]) for field in metadata_fields} for x in batch]
        vdb.add_texts(texts, metadata)
    vdb.persist()

In [None]:
# Re-open the persisted collection and wrap it in a retriever that is asked for k documents per query.
vdb = Chroma(persist_directory=db_dir, embedding_function=hf_embeddings,)
retriever = vdb.as_retriever(search_kwargs={"k": k})

In [None]:
# Sanity check: number of stored embeddings, read through Chroma's private _collection handle.
print(f"\n\nNumber of embeddings in chromadb: {vdb._collection.count()}")



Number of embeddings in chromadb: 8068


In [None]:
# Draw a ~20% validation sample from each segment.
# A fixed seed keeps the samples identical across re-runs; without one, randomSplit picks a
# different subset every time and the retrieval counts below cannot be reproduced.
# NOTE(review): these rows come from train_df, whose rows were also embedded into the vector
# store, so every validation row can retrieve itself.
split_seed = 42
val_A, _ = train_df.filter(F.col("Segmentation") == "A").randomSplit([0.20, 0.80], seed=split_seed)
val_B, _ = train_df.filter(F.col("Segmentation") == "B").randomSplit([0.20, 0.80], seed=split_seed)
val_C, _ = train_df.filter(F.col("Segmentation") == "C").randomSplit([0.20, 0.80], seed=split_seed)
val_D, _ = train_df.filter(F.col("Segmentation") == "D").randomSplit([0.20, 0.80], seed=split_seed)

In [None]:
# Keep 10 rows of each validation sample and build their row text.
val_A_row2txt, val_B_row2txt, val_C_row2txt, val_D_row2txt = (
    get_row_as_text(segment_sample.limit(10), rows_to_convert)
    for segment_sample in (val_A, val_B, val_C, val_D)
)

In [None]:
def get_retrived_df(input_df, val_df, retriever):
  """Retrieve the stored neighbours of every row in ``val_df`` and attach their true segment.

  Args:
    input_df: Spark DataFrame providing the ``ID`` and ``Segmentation`` columns that are
      joined onto the retrieved rows.
    val_df: Spark DataFrame with a ``row_as_text`` column; each row is used as one query.
    retriever: object exposing ``get_relevant_documents(text)``. Each returned document must
      have ``page_content`` in ``"name: value; name: value"`` form and ``Segmentation`` and
      ``ID`` entries in ``metadata``.

  Returns:
    Spark DataFrame with one row per distinct retrieved document (its parsed fields plus
    ``Segmentation_predicted`` and ``ID``), left-joined with ``Segmentation`` from ``input_df``.
  """
  input_rows = val_df.rdd.map(lambda x: x.row_as_text).collect()

  relevant_rows = []
  for input_row in input_rows:
      # Query with the row text itself. "\n".join() applied to a single string puts a newline
      # between every character, which garbles the query sent to the retriever.
      for relevant_row in retriever.get_relevant_documents(input_row):
          relevant_rows.append(relevant_row.page_content + f"; Segmentation_predicted: {relevant_row.metadata['Segmentation']}; ID: {relevant_row.metadata['ID']}")

  # Split each pair on the first ": " only, so a value that itself contains ": " cannot break dict().
  converted_rows = [dict(pair.split(": ", 1) for pair in row.split("; ")) for row in relevant_rows]
  return spark.createDataFrame(converted_rows).distinct().join(input_df.select("ID", "Segmentation"), how="left", on=["ID"])


In [None]:
# Exploratory retrieval for the segment-A sample: the row texts are sent to the retriever in chunks
# of up to 500, each chunk joined into one newline-separated query string.
input_rows = [row.row_as_text for row in val_A_row2txt.collect()]

retrieved_docs = []
for start in range(0, len(input_rows), 500):
    chunk_query = "\n".join(input_rows[start:start + 500])
    retrieved_docs.extend(retriever.get_relevant_documents(chunk_query))

# Text and stored segment label of every retrieved document, in retrieval order.
relevant_rows = [doc.page_content for doc in retrieved_docs]
retrieved_labels = [doc.metadata["Segmentation"] for doc in retrieved_docs]

# Parse "name: value; name: value" back into one dict per document.
converted_rows = [dict(pair.split(": ") for pair in row.split("; ")) for row in relevant_rows]
retrieved_df = spark.createDataFrame(converted_rows)

In [None]:
# Retrieve neighbours for the segment-A sample and count the true segments of the retrieved rows.
result_df_A = get_retrived_df(input_df, val_A_row2txt, retriever)
result_df_A.distinct().groupBy("Segmentation").count().show()

+------------+-----+
|Segmentation|count|
+------------+-----+
|           B|  887|
|           D| 1378|
|           C| 1007|
|           A|  958|
+------------+-----+



In [None]:
# Same check for the segment-B sample: true segments of the rows it retrieves.
get_retrived_df(input_df, val_B_row2txt, retriever).groupBy("Segmentation").count().show()

+------------+-----+
|Segmentation|count|
+------------+-----+
|           B|  803|
|           D| 1428|
|           C|  892|
|           A|  893|
+------------+-----+



In [None]:
# Same check for the segment-C sample: true segments of the rows it retrieves.
get_retrived_df(input_df, val_C_row2txt, retriever).groupBy("Segmentation").count().show()

+------------+-----+
|Segmentation|count|
+------------+-----+
|           B|  600|
|           D| 1219|
|           C|  700|
|           A|  668|
+------------+-----+



In [None]:
# Same check for the segment-D sample: true segments of the rows it retrieves.
get_retrived_df(input_df, val_D_row2txt, retriever).groupBy("Segmentation").count().show()

+------------+-----+
|Segmentation|count|
+------------+-----+
|           B|  749|
|           D| 1356|
|           C|  840|
|           A|  828|
+------------+-----+



In [None]:
# Load the test file, add the row text, and keep 10 rows for a quick trial run.
test_df = get_row_as_text(
    spark.read.csv("./Test.csv", header=True, inferSchema=True),
    rows_to_convert,
).limit(10)


In [None]:
from collections import Counter


def get_retrieved_rows(input_row, retriever):
  """Predict a segment for one row text by majority vote over its retrieved neighbours.

  As a side effect, prints (via ``show()``) how many distinct retrieved rows fall in each
  true segment, using the notebook-level ``spark`` session and ``input_df``.

  Args:
    input_row: query text in the same ``"name: value; ..."`` form as the stored rows.
    retriever: object exposing ``get_relevant_documents(text)``. Each returned document must
      have ``page_content`` plus ``Segmentation`` and ``ID`` entries in ``metadata``.

  Returns:
    The most frequent ``Segmentation`` metadata value among the retrieved documents, or
    ``None`` when the retriever returns nothing.
  """
  documents = retriever.get_relevant_documents(input_row)
  if not documents:
      # Nothing to vote on: Spark cannot infer a schema from an empty list and
      # most_common(1)[0] would raise, so report "no prediction" instead.
      return None

  relevant_rows = [
      doc.page_content + f"; Segmentation_predicted: {doc.metadata['Segmentation']}; ID: {doc.metadata['ID']}"
      for doc in documents
  ]
  labels = [doc.metadata['Segmentation'] for doc in documents]

  # Split each pair on the first ": " only, so a value that itself contains ": " cannot break dict().
  converted_rows = [dict(pair.split(": ", 1) for pair in row.split("; ")) for row in relevant_rows]
  spark.createDataFrame(converted_rows).distinct().join(input_df.select("ID", "Segmentation"), how="left", on=["ID"]).groupBy("Segmentation").count().show()

  # most common label
  most_common_label, _ = Counter(labels).most_common(1)[0]
  return most_common_label

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def _predict_segment(row_text):
  """Predict the segment for one row text with the notebook-level retriever."""
  return get_retrieved_rows(row_text, retriever)


# Spark UDF wrapper that returns the predicted segment as a string.
# NOTE(review): get_retrieved_rows uses the driver-side `spark` session, which is normally not
# available inside a UDF; the notebook predicts through pandas instead — confirm this is still needed.
get_retrieved_rows_udf = udf(_predict_segment, StringType())


In [None]:
import pandas as pd
# Predict on the driver: pull the sample into pandas and classify each row text in turn.
test_pdf = test_df.toPandas()
test_pdf["Segmentation_predicted"] = [
    get_retrieved_rows(row_text, retriever) for row_text in test_pdf["row_as_text"]
]

+------------+-----+
|Segmentation|count|
+------------+-----+
|           B|  430|
|           D|  609|
|           C|  336|
|           A|  625|
+------------+-----+

+------------+-----+
|Segmentation|count|
+------------+-----+
|           B|  537|
|           D|  425|
|           C|  608|
|           A|  430|
+------------+-----+

+------------+-----+
|Segmentation|count|
+------------+-----+
|           B|  436|
|           D|  594|
|           C|  406|
|           A|  564|
+------------+-----+

+------------+-----+
|Segmentation|count|
+------------+-----+
|           B|  499|
|           D|  495|
|           C|  456|
|           A|  550|
+------------+-----+

+------------+-----+
|Segmentation|count|
+------------+-----+
|           B|  202|
|           D| 1175|
|           C|  206|
|           A|  417|
+------------+-----+

+------------+-----+
|Segmentation|count|
+------------+-----+
|           B|  410|
|           D|  638|
|           C|  400|
|           A|  552|
+-------

In [None]:
# Put the predictions next to the reference labels from results.csv, matched on ID.
actual_results_df = spark.read.csv("./results.csv", header=True, inferSchema=True)
result_df = spark.createDataFrame(test_pdf)
predictions_df = result_df.select("ID", "Segmentation_predicted")
predictions_df.join(actual_results_df, on=["ID"], how="left").show(200)

+------+----------------------+------------+
|    ID|Segmentation_predicted|Segmentation|
+------+----------------------+------------+
|458989|                     A|           B|
|458994|                     C|           C|
|458996|                     D|           A|
|459000|                     A|           C|
|459001|                     D|           D|
|459003|                     D|           B|
|459005|                     D|           A|
|459008|                     C|           C|
|459013|                     C|           C|
|459014|                     D|           D|
+------+----------------------+------------+



In [None]:
# from helper_utils import load_chroma, word_wrap, project_embeddings
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
import numpy as np

In [None]:
from sentence_transformers import CrossEncoder
# Load the pretrained cross-encoder; below it scores (query text, retrieved row text) pairs for re-ranking.
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

config.json:   0%|          | 0.00/794 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/316 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

In [None]:
from collections import Counter


def get_retrieved_rows_with_crossenc(input_row, retriever, k):
    """Predict a segment for one row: retrieve neighbours, re-rank them with the cross-encoder, vote.

    The retrieved documents are scored against ``input_row`` with the notebook-level
    ``cross_encoder``; the top 10% of ``k`` (at least one) by score are kept and the most
    frequent true ``Segmentation`` (joined from the notebook-level ``input_df``) is returned.

    Args:
        input_row: query text in the same ``"name: value; ..."`` form as the stored rows.
        retriever: object exposing ``get_relevant_documents(text)``. Each returned document
            must have ``page_content`` plus ``Segmentation`` and ``ID`` entries in ``metadata``.
        k: number of neighbours the retriever is configured to return; sizes the voting set.

    Returns:
        The winning ``Segmentation`` value, or ``None`` when the retriever returns nothing.
    """
    documents = retriever.get_relevant_documents(input_row)
    if not documents:
        # Spark cannot infer a schema from an empty list; report "no prediction" instead.
        return None

    # Score all (query, candidate) pairs in one batched call instead of one call per candidate.
    scores = cross_encoder.predict([[input_row, doc.page_content] for doc in documents])

    converted_rows = []
    for doc, score in zip(documents, scores):
        # Split each pair on the first ": " only, so a value containing ": " cannot break dict().
        row = dict(pair.split(": ", 1) for pair in doc.page_content.split("; "))
        row["Segmentation_predicted"] = str(doc.metadata['Segmentation'])
        row["ID"] = str(doc.metadata['ID'])
        # Keep the score numeric. Formatting it into text and parsing it back produced a string
        # column, which orderBy() ranks lexicographically ("9.1" above "10.2"), not by value.
        row["Score"] = float(score)
        converted_rows.append(row)

    df = spark.createDataFrame(converted_rows).distinct().join(input_df.select("ID", "Segmentation"), how="left", on=["ID"])
    # df.groupBy("Segmentation").count().show()

    # For k < 10, int(k * 0.1) is 0, which would leave nothing to vote on.
    top_n = max(1, int(k * 0.1))
    winner = (
        df.orderBy(F.col("Score").desc())
        .limit(top_n)
        .groupBy("Segmentation")
        .count()
        .orderBy(F.col("count").desc())
        .first()
    )
    most_common_label = winner["Segmentation"] if winner is not None else None

    return most_common_label


In [None]:
# Use a smaller candidate set for the cross-encoder experiment: 100 documents per query.
# NOTE: this rebinds the notebook-level k and retriever (k was 2000 earlier), so any cell
# executed after this one sees the new values.
k = 100
retriever = vdb.as_retriever(search_kwargs={"k": k})

In [None]:
# Cross-encoder run on 300 test rows.
import pandas as pd

test_df = get_row_as_text(
    spark.read.csv("./Test.csv", header=True, inferSchema=True),
    rows_to_convert,
).limit(300)

# Classify on the driver, one row text at a time.
test_pdf = test_df.toPandas()
test_pdf["Segmentation_predicted"] = [
    get_retrieved_rows_with_crossenc(row_text, retriever, k)
    for row_text in test_pdf["row_as_text"]
]

In [None]:
# Join the predictions with the reference labels on ID; df feeds every metric cell below.
actual_results_df = spark.read.csv("./results.csv", header=True, inferSchema=True)
result_df = spark.createDataFrame(test_pdf)
df = result_df.select("ID", "Segmentation_predicted").join(actual_results_df, on=["ID"], how="left")
df.show()

+------+----------------------+------------+
|    ID|Segmentation_predicted|Segmentation|
+------+----------------------+------------+
|458989|                     A|           B|
|458994|                     A|           C|
|458996|                     B|           A|
|459000|                     C|           C|
|459001|                     D|           D|
|459003|                     A|           B|
|459005|                     A|           A|
|459008|                     C|           C|
|459013|                     C|           C|
|459014|                     D|           D|
|459015|                     D|           D|
|459016|                     D|           D|
|459024|                     C|           C|
|459026|                     C|           D|
|459032|                     D|           D|
|459033|                     C|           B|
|459036|                     A|           A|
|459039|                     C|           C|
|459041|                     B|           B|
|459045|  

In [None]:
from pyspark.sql.functions import col

# Count the rows whose predicted segment equals, respectively differs from, the reference label.
is_match = col("Segmentation") == col("Segmentation_predicted")
is_mismatch = col("Segmentation") != col("Segmentation_predicted")
matched = df.where(is_match).count()
unmatched = df.where(is_mismatch).count()

print(matched)
print(unmatched)

214
86


In [None]:
# Express both counts as a share of all compared rows.
total = matched + unmatched

percentage_matched = (matched / total) * 100
percentage_unmatched = (unmatched / total) * 100

for label, percentage in (
    ("Percentage Matched:", percentage_matched),
    ("Percentage Unmatched:", percentage_unmatched),
):
    print(label, percentage, "%")


Percentage Matched: 71.33333333333334 %
Percentage Unmatched: 28.666666666666668 %


In [None]:

# Rows per label in the predictions and in the reference labels, each sorted by label.
prediction_counts = df.groupBy("Segmentation_predicted").count().orderBy("Segmentation_predicted")
test_counts = df.groupBy("Segmentation").count().orderBy("Segmentation")

# Show counts
for heading, counts in (("Prediction Counts:", prediction_counts), ("Test Counts:", test_counts)):
    print(heading)
    counts.show()




Prediction Counts:
+----------------------+-----+
|Segmentation_predicted|count|
+----------------------+-----+
|                     A|   67|
|                     B|   61|
|                     C|   95|
|                     D|   77|
+----------------------+-----+

Test Counts:
+------------+-----+
|Segmentation|count|
+------------+-----+
|           A|   67|
|           B|   60|
|           C|   91|
|           D|   82|
+------------+-----+



In [None]:
from pyspark.sql.functions import col

# Reference-label column; the prediction column has the same name plus "_predicted".
target_col = "Segmentation"
predicted_col = target_col + "_predicted"

classes = ['A', 'B', 'C', 'D']


def _count_where(condition):
    """Return the number of rows of df that satisfy the given column condition."""
    return df.filter(condition).count()


# One-vs-rest counts per class, stored as (true positives, false positives, false negatives).
metrics = {
    cls: (
        _count_where((col(target_col) == cls) & (col(predicted_col) == cls)),
        _count_where((col(target_col) != cls) & (col(predicted_col) == cls)),
        _count_where((col(target_col) == cls) & (col(predicted_col) != cls)),
    )
    for cls in classes
}

# Precision, recall and F1 per class; a zero denominator yields 0 instead of raising.
for cls, (tp, fp, fn) in metrics.items():
    predicted_total = tp + fp
    actual_total = tp + fn
    precision = tp / predicted_total if predicted_total != 0 else 0
    recall = tp / actual_total if actual_total != 0 else 0
    precision_plus_recall = precision + recall
    f1_score = 2 * (precision * recall) / precision_plus_recall if precision_plus_recall != 0 else 0
    print(f"Class {cls}: Precision = {precision:.2f}, Recall = {recall:.2f}, F1-score = {f1_score:.2f}")


Class A: Precision = 0.63, Recall = 0.63, F1-score = 0.63
Class B: Precision = 0.61, Recall = 0.62, F1-score = 0.61
Class C: Precision = 0.73, Recall = 0.76, F1-score = 0.74
Class D: Precision = 0.86, Recall = 0.80, F1-score = 0.83


In [None]:
from pyspark.sql.functions import col

target_col = "Segmentation"
predicted_col = target_col + "_predicted"

classes = ['A', 'B', 'C', 'D']

# confusion_matrix[(true label, predicted label)] = number of rows with that pair of labels.
confusion_matrix = {
    (true_cls, pred_cls): df.filter(
        (col(target_col) == true_cls) & (col(predicted_col) == pred_cls)
    ).count()
    for true_cls in classes
    for pred_cls in classes
}

# Print the confusion matrix: one line per true label, columns in the order of `classes`.
print("Confusion Matrix:")
for true_cls in classes:
    counts_for_true = [confusion_matrix[(true_cls, pred_cls)] for pred_cls in classes]
    print(f"True {true_cls}: {counts_for_true}")

Confusion Matrix:
True A: [42, 12, 6, 7]
True B: [6, 37, 17, 0]
True C: [8, 10, 69, 4]
True D: [11, 2, 3, 66]
