In [1]:
# %pip install opensearch-py pyspark matplotlib scikit-learn seaborn pandas

In [2]:
import os
import sys
rootpath = os.path.abspath('/home/robertc/Git/pfun-cma-model')
if rootpath not in sys.path:
    sys.path.insert(0, rootpath)
from pfun_cma_model.embed import EmbedClient
os = EmbedClient(require_ssh_tunnel=False).opensearch_client

In [3]:
res = os.search(index="embeddings", body={"size": 5000, "_source": "embedding"}, scroll='1m')
scroll_id = res['_scroll_id']
scroll_size = res['hits']['total']['value']

In [4]:
embeddings = [(d['_source']['embedding'][0]['embedding'],)
              for d in res['hits']['hits']]

In [5]:
from pyspark.sql import SparkSession

# Kubernetes API endpoint that schedules executors, and the address executors
# use to call back to this driver process.
k8s_master_url = "k8s://https://192.168.1.64:16443"
driver_address = "192.168.1.64"
# elasticsearch-hadoop connector jar, which provides the
# "org.elasticsearch.spark.sql" data source used for reading from OpenSearch.
es_hadoop_jar = os.path.join(rootpath, "embed/embed/pyspark_jars/elasticsearch-hadoop-8.10.2.jar")

spark = (
    SparkSession.builder
    .master(k8s_master_url)
    .config("spark.driver.host", driver_address)
    # TODO(review): ":latest" is unpinned; executor image should match the local pyspark version.
    .config("spark.kubernetes.container.image", "apache/spark:latest")
    .config("spark.executor.instances", "4")
    .config("spark.jars", es_hadoop_jar)
    .appName("pfun-cma-model-embed")
    .getOrCreate()
)

23/09/23 20:05:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
def getDataFromOSWithSpark(
    index: str = "embeddings",
    sample_fraction: float = 0.1,
    nodes: str = "192.168.1.64",
    port: int = 9201,
    use_ssl: bool = True,
    doc_type: "str | None" = "doc_type",
    seed: "int | None" = None,
):
    """Read an OpenSearch index into Spark and return a random sample of its rows.

    Uses the elasticsearch-hadoop connector (``org.elasticsearch.spark.sql``) on
    the notebook's global ``spark`` session.

    Args:
        index: Name of the index to read.
        sample_fraction: Approximate fraction of rows to keep, sampled without
            replacement (Spark does not guarantee an exact count).
        nodes: Host(s) of the cluster, passed as ``es.nodes``.
        port: REST port, passed as ``es.port``.
        use_ssl: Whether to connect over TLS (``es.net.ssl``).
        doc_type: Appended to the resource path as ``"<index>/<doc_type>"``;
            pass ``None`` or ``""`` to read the bare index.
            NOTE(review): mapping types are a legacy concept in current
            OpenSearch/Elasticsearch; the default is kept only for backward
            compatibility — confirm whether it is still needed.
        seed: Optional seed for ``DataFrame.sample`` so the sample is reproducible.

    Returns:
        A (lazily evaluated) sampled Spark DataFrame.
    """
    resource = f"{index}/{doc_type}" if doc_type else index
    df = (
        spark.read.format("org.elasticsearch.spark.sql")
        # Connect only to the listed node(s) rather than discovering the cluster.
        .option("es.nodes.wan.only", "true")
        .option("es.port", str(port))
        .option("es.net.ssl", "true" if use_ssl else "false")
        .option("es.nodes", nodes)
        .load(resource)
    )

    # Random sample of roughly `sample_fraction` of the rows.
    return df.sample(withReplacement=False, fraction=sample_fraction, seed=seed)

In [6]:
from pyspark.sql.types import ArrayType, DoubleType, StructType, StructField

# Single array<double> column holding each raw embedding.
schema = StructType([StructField("list_features", ArrayType(DoubleType()))])
df = spark.createDataFrame(embeddings, schema=schema)

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.functions import array_to_vector

# Convert array<double> -> ml DenseVector with the built-in JVM function
# (Spark >= 3.1) instead of a Python UDF, so rows are not round-tripped through
# Python workers on the executors and null arrays stay null.
df = df.withColumn("features", array_to_vector("list_features"))

In [7]:
# Shuffle parallelism: 2.5 partitions per core, assuming 16 cores in total
# (TODO confirm against spark.executor.instances x executor cores).
SHUFFLE_PARTITIONS = int(16 * 2.5)

# Set before any shuffle is planned below.
spark.conf.set('spark.sql.shuffle.partitions', SHUFFLE_PARTITIONS)
# spark.default.parallelism is intentionally not set here: it is a SparkContext
# setting read when the context starts, so changing it at runtime has no effect
# (newer Spark versions may reject the call). Pass it to
# SparkSession.builder.config(...) before getOrCreate() if it needs tuning.

# Round-robin into a fixed number of evenly sized partitions (hashing on a dense
# vector column buys no useful co-location), then cache because KMeans passes
# over the data on every iteration.
df = df.repartition(SHUFFLE_PARTITIONS)
df.persist()

In [8]:
from pyspark.ml.clustering import KMeans

# Cluster the vector column into 8 groups; the fixed seed keeps the
# randomized initialisation reproducible between runs.
kmeans = KMeans(featuresCol="features", k=8, seed=23)
model = kmeans.fit(df)

23/09/23 20:06:17 WARN TaskSetManager: Stage 0 contains a task of very large size (13841 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 3) / 3]

In [None]:
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
import seaborn as sns
import pandas as pd
import numpy as np

# Project the FULL embedding vectors (not just their first two coordinates,
# which would make PCA a mere 2-D rotation) onto the top two principal components.
scored_rows = model.transform(df).select("features", "prediction").collect()
feature_matrix = np.array([row.features.toArray() for row in scored_rows])

# random_state pins the result if sklearn chooses its randomized SVD solver.
pca = PCA(n_components=2, random_state=23)
projected = pca.fit_transform(feature_matrix)

df_pandas = pd.DataFrame({
    "x": projected[:, 0],
    "y": projected[:, 1],
    # Categorical so seaborn assigns one discrete colour per cluster id.
    "cluster": pd.Categorical([int(row.prediction) for row in scored_rows]),
})

fig, ax = plt.subplots(figsize=(10, 8))
sns.scatterplot(x='x', y='y', hue='cluster', data=df_pandas, ax=ax)
ax.set(
    title=(
        "K-means clusters of embeddings "
        f"(PCA, {pca.explained_variance_ratio_.sum():.1%} of variance shown)"
    ),
    xlabel="PC 1",
    ylabel="PC 2",
);