In [11]:
from skt.ye import (
    get_hdfs_conn,
    get_spark,
    hive_execute,
    hive_to_pandas,
    pandas_to_parquet,
    slack_send,
    get_secrets
)

In [12]:
from skt.gcp import (
    PROJECT_ID,
    bq_insert_overwrite,
    bq_to_df,
    bq_to_pandas,
    get_bigquery_client,
    get_max_part,
    load_query_result_to_table,
    pandas_to_bq,
    pandas_to_bq_table,
    load_bigquery_ipython_magic,
    get_bigquery_client
)

In [13]:
import pyspark.sql.functions as F

In [14]:
import os

In [15]:
#!pip install opensearch-py

In [38]:
# OpenSearch field mappings for the user-profile index.
# NOTE(review): "gender" is mapped as short, yet the sample data prepared later in
# this notebook sets gender to the string 'F' - confirm the intended field type.
_profile_text_field = {"type": "text", "analyzer": "standard", "search_analyzer": "standard"}

_mapping_properties = dict(
    svc_mgmt_num={"type": "keyword"},
    luna_id={"type": "keyword"},
    age={"type": "short"},
    gender={"type": "short"},
    # Free-text profile fields share one analysis spec; each gets its own copy.
    mno_profiles=dict(_profile_text_field),
    adot_profiles=dict(_profile_text_field),
    behavior_profiles=dict(_profile_text_field),
    created_at={"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
    is_adot={"type": "boolean"},
    # k-NN vector field indexed with an HNSW graph (nmslib engine, L2 distance).
    user_embedding={
        "type": "knn_vector",
        "dimension": 2048,
        "method": {
            "name": "hnsw",
            "space_type": "l2",
            "engine": "nmslib",
            "parameters": {"ef_construction": 16, "m": 16},
        },
    },
)

mappings = {"properties": _mapping_properties}

In [44]:
# Index-level settings: shard/replica counts, k-NN enabled, and the built-in
# "standard" analyzer registered as the index's default analyzer.
settings = {
    "index": dict(
        number_of_shards=4,
        number_of_replicas=1,
        knn=True,
        analysis={"analyzer": {"default": {"type": "standard"}}},
    )
}

In [45]:
# Request body for index creation, combining the settings and mappings cells above.
index_body = dict(settings=settings, mappings=mappings)

In [75]:
from pyspark import version as spark_version
from pyspark.sql import SparkSession

# Compare the numeric major version: a plain string comparison is lexicographic,
# so e.g. "10.0.0" >= "3.0.0" would evaluate to False.
is_spark_3 = int(spark_version.__version__.split(".")[0]) >= 3

In [76]:
def get_custom_spark(queue: str = None, jars=None, app_name: str = None, is_spark_3: bool = True, spark_config=None):
    """Create (or reuse) a YARN SparkSession with the BigQuery connector and Hive support.

    Args:
        queue: YARN queue. Defaults to "dmig_eda" when ``JUPYTERHUB_USER`` is set
            in the environment and to "airflow_job" otherwise.
        jars: Extra jar path(s) appended after the BigQuery connector jar in
            ``spark.jars``. Either a single (optionally comma-separated) string
            or an iterable of path strings.
        app_name: Spark application name. Defaults to
            ``skt-<$USER or 'default'>-<uuid4>``.
        is_spark_3: True selects the Scala 2.12 connector jar and the Spark 3
            Arrow config key; False selects the Scala 2.11 jar and Spark 2 key.
        spark_config: Optional mapping of extra Spark config key/values. They
            are applied after the built-in defaults below and override them.

    Returns:
        SparkSession with Arrow-based conversion enabled. ``getOrCreate`` is
        used, so if a session already exists it is returned and static configs
        set here may not take effect.
    """
    import uuid

    if not queue:
        queue = "dmig_eda" if "JUPYTERHUB_USER" in os.environ else "airflow_job"

    if app_name is None:
        app_name = f"skt-{os.environ.get('USER', 'default')}-{uuid.uuid4()}"

    bigquery_jar = (
        "hdfs:///jars/spark-bigquery-with-dependencies_2.12-0.24.2.jar"
        if is_spark_3
        else "hdfs:///jars/spark-bigquery-with-dependencies_2.11-0.17.3.jar"
    )

    # Accept a single string or an iterable of paths; joining the connector jar
    # with a tuple directly would raise TypeError.
    if not jars:
        extra_jars = []
    elif isinstance(jars, str):
        extra_jars = [jars]
    else:
        extra_jars = [str(jar) for jar in jars if jar]
    spark_jars = ",".join([bigquery_jar, *extra_jars])

    arrow_enabled = (
        "spark.sql.execution.arrow.pyspark.enabled" if is_spark_3 else "spark.sql.execution.arrow.enabled"
    )

    # Legacy Arrow IPC format flag: "1" only for Spark 2 (the Spark 2.4 +
    # pyarrow >= 0.15 workaround); set for the driver, executors and YARN AM.
    arrow_pre_ipc_format = "0" if is_spark_3 else "1"
    os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = arrow_pre_ipc_format

    conf = {
        "spark.app.name": app_name,
        "spark.driver.memory": "30g",
        "spark.executor.memory": "30g",
        "spark.executor.instances": "10",
        "spark.driver.maxResultSize": "12g",
        "spark.rpc.message.maxSize": "1024",
        "spark.yarn.queue": queue,
        "spark.ui.enabled": "false",
        "spark.port.maxRetries": "128",
        "spark.pyspark.driver.python": "/usr/bin/python3.8",
        "spark.pyspark.python": "/usr/bin/python3.8",
        "spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT": arrow_pre_ipc_format,
        "spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT": arrow_pre_ipc_format,
        "spark.jars": spark_jars,
    }
    if spark_config:
        # Caller-supplied settings win over the defaults above.
        conf.update(spark_config)

    builder = SparkSession.builder
    for key, value in conf.items():
        builder = builder.config(key, value)
    spark = builder.enableHiveSupport().getOrCreate()

    spark.conf.set(arrow_enabled, "true")
    return spark
    

In [77]:
# Start (or reuse) this notebook's Spark session; is_spark_3 comes from the version check above.
spark = get_custom_spark(
    is_spark_3=is_spark_3,
    app_name="test_jh",
)

In [43]:
from py4j.java_gateway import java_import

# Handle to the Hadoop FileSystem configured for this Spark session, reached via the
# py4j gateway (uses PySpark-private attributes `_gateway` / `_jsc`).
sc = spark.sparkContext
jvm = sc._gateway.jvm
for java_class in ("org.apache.hadoop.fs.Path", "org.apache.hadoop.fs.FileSystem"):
    java_import(jvm, java_class)
fs = jvm.FileSystem.get(sc._jsc.hadoopConfiguration())

In [46]:
# HDFS directory holding the one-model output files (date taken from the directory name).
hdfs_path = "/".join(("", "data", "temp", "one_model_2024-05-11"))

In [47]:
# JVM-side org.apache.hadoop.fs.Path for hdfs_path (the class was imported into the gateway above).
jvm_path_cls = sc._gateway.jvm.Path
save_hdfs_path = jvm_path_cls(hdfs_path)

In [48]:
# List the direct children of save_hdfs_path as Hadoop FileStatus objects.
status = fs.listStatus(save_hdfs_path)


In [49]:
# Absolute HDFS path of every entry listed under hdfs_path.
file_list = [
    os.path.join(hdfs_path, entry.getPath().getName())
    for entry in status
]

In [50]:
# Peek at the first listed entry.
first_entry = file_list[0]
first_entry

'/data/temp/one_model_2024-05-11/input_data'

In [57]:
# A single (hard-coded) embedding-result parquet file inside hdfs_path.
temp_file = os.path.join(hdfs_path, "one_model_v3_result_adot_20240511_0_emb.parquet.gzip")

In [94]:
# Load the embedding parquet file as the working sample DataFrame.
sample_d = spark.read.format("parquet").load(temp_file)

                                                                                

In [60]:
# Number of rows in the loaded sample.
sample_row_count = sample_d.count()
sample_row_count

                                                                                

13132

In [107]:
# Print the column names/types of the (transformed) sample DataFrame.
sample_d.printSchema()

root
 |-- svc_mgmt_num: string (nullable = true)
 |-- mno_profile: string (nullable = true)
 |-- adot_profile: string (nullable = true)
 |-- behavor_profile: string (nullable = true)
 |-- user_vector: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- age: integer (nullable = false)
 |-- gender: string (nullable = false)





In [110]:
# Latest training input (A), left-joined with the latest adot_agr_meta_daily snapshot
# filtered to status NULL or 'NORMAL' (B). is_adot is 'Y' when the user has a luna_id in B.
# The two status filters are disjoint, so a single OR replaces the former UNION ALL.
# NOTE(review): the join key is assumed to be svc_mgmt_num on both tables - confirm
# against the x1112904.adot_agr_meta_daily schema.
query = """
SELECT  A.svc_mgmt_num,
        A.item,
        A.mno_profile_feature,
        A.adot_profile_feature,
        B.luna_id,
        CASE
            WHEN B.luna_id IS NULL THEN 'N'
            ELSE 'Y'
        END AS is_adot
FROM (
    SELECT *
    FROM adot_reco.onemodelV3_train_input_prd
    WHERE dt = (SELECT max(dt) FROM adot_reco.onemodelV3_train_input_prd)
) AS A
LEFT JOIN (
    SELECT *
    FROM x1112904.adot_agr_meta_daily
    WHERE dt = (SELECT max(dt) FROM x1112904.adot_agr_meta_daily)
      AND (status IS NULL OR status = 'NORMAL')
) AS B
ON A.svc_mgmt_num = B.svc_mgmt_num
"""

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:64)
Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:64)


In [80]:
# Run the query (presumably on BigQuery, via skt.gcp) and get the result as a Spark DataFrame.
input_df = bq_to_df(spark_session=spark, query=query)

24/05/22 15:20:27 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.


In [None]:
#input_df.printSchema()

In [95]:
# Left-join the query features onto the sampled users by svc_mgmt_num, keeping every
# sample column plus three feature columns from input_df.
# NOTE(review): if input_df has several rows per svc_mgmt_num, sample rows are duplicated.
join_cond = sample_d["svc_mgmt_num"] == input_df["svc_mgmt_num"]
feature_cols = [input_df[name] for name in ("item", "mno_profile_feature", "adot_profile_feature")]
sample_d = sample_d.join(input_df, join_cond, "left").select(sample_d["*"], *feature_cols)

In [96]:
# Project to the columns the bulk-indexing step reads, renaming the query's feature
# columns to profile names. The "behavor_profile" spelling is kept as-is because
# later cells read that exact column name.
_select_spec = [
    ("svc_mgmt_num", None),
    ("mno_profile_feature", "mno_profile"),
    ("adot_profile_feature", "adot_profile"),
    ("item", "behavor_profile"),
    ("user_vector", None),
]
sample_d = sample_d.select(
    *[F.col(src) if dst is None else F.col(src).alias(dst) for src, dst in _select_spec]
)

In [97]:
def convert_to_float_array(array):
    """Convert every element of ``array`` to a Python float.

    This function is wrapped as a Spark UDF, so it must tolerate SQL NULLs: a
    null array maps to None and null elements stay None instead of raising
    TypeError (which would fail the Spark task).

    Args:
        array: Iterable of numbers / numeric strings, or None.

    Returns:
        list | None: The converted values, or None when ``array`` is None.
    """
    if array is None:
        return None
    return [None if x is None else float(x) for x in array]

In [98]:
from pyspark.sql.types import ArrayType, FloatType

# Spark UDF wrapper around convert_to_float_array producing array<float>.
_float_array_type = ArrayType(FloatType())
convert_to_float_array_udf = F.udf(convert_to_float_array, returnType=_float_array_type)

In [99]:
# Placeholder demographics for this test load (constant age 31 / gender 'F').
# user_vector is cast natively to array<float>. This avoids a Python UDF round-trip
# per row and keeps null arrays/elements as null instead of failing the task.
sample_d = (
    sample_d.withColumn("age", F.lit(31))
    .withColumn("gender", F.lit("F"))
    .withColumn("user_vector", F.col("user_vector").cast(ArrayType(FloatType())))
)

In [102]:
#sample_d.show(1,False)

In [103]:
# Re-declaration of the index mappings (same content as the earlier mappings cell).
# NOTE: index_body was built from the earlier `mappings` object. Rebinding the name
# here does not change index_body, so edits made only here will not reach the index.
_text_spec = {"type": "text", "analyzer": "standard", "search_analyzer": "standard"}
mappings = {
    "properties": {
        "svc_mgmt_num": {"type": "keyword"},
        "luna_id": {"type": "keyword"},
        "age": {"type": "short"},
        "gender": {"type": "short"},
        **{field: dict(_text_spec) for field in ("mno_profiles", "adot_profiles", "behavior_profiles")},
        "created_at": {"type": "date", "format": "yyyy-MM-dd HH:mm:ss"},
        "is_adot": {"type": "boolean"},
        "user_embedding": {
            "type": "knn_vector",
            "dimension": 2048,
            "method": {
                "name": "hnsw",
                "space_type": "l2",
                "engine": "nmslib",
                "parameters": {"ef_construction": 16, "m": 16},
            },
        },
    }
}

In [104]:
from datetime import datetime

In [126]:
# Step 2: Prepare data for bulk indexing.
# The index name must match the one passed to create_index.
INDEX_NAME = "onemodel_v3_inital_test_20240512"

# One timestamp for the whole batch (previously re-formatted per row, so documents
# from a single load could get different created_at values).
created_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# NOTE(review): luna_id ("temp") and is_adot (True) are placeholders. The query
# computes luna_id / is_adot, but the join cell does not carry them into sample_d.
# gender is not indexed; the mapping types it as short while sample_d holds 'F'.
actions = [
    {
        "_op_type": "index",
        "_index": INDEX_NAME,
        "_id": row["svc_mgmt_num"],
        "_source": {
            "svc_mgmt_num": row["svc_mgmt_num"],
            "luna_id": "temp",
            "user_embedding": row["user_vector"],
            "age": row["age"],
            "mno_profiles": row["mno_profile"],
            "adot_profiles": row["adot_profile"],
            "behavior_profiles": row["behavor_profile"],
            "created_at": created_at,
            "is_adot": True,
        },
    }
    for row in sample_d.collect()
]

                                                                                

In [121]:
#actions[1]

In [128]:
# Bulk-index the prepared actions.
# NOTE: `client` is created in the "Create OpenSearch client" cells further down this
# notebook; run those first. `helpers` is imported here so this cell does not depend
# on that later import cell.
# request_timeout overrides the client's 2 s default for these large vector payloads
# (same value update_data uses). With helpers.bulk's default raise_on_error=True,
# document errors raise BulkIndexError, so `failed` is empty when this returns.
from opensearchpy import helpers

success, failed = helpers.bulk(client, actions, chunk_size=100, request_timeout=300)

# Create OpenSearch client

In [20]:
# Staging OpenSearch VPC-endpoint host, combined with PORT when building the client.
stg_vpce_opensearch = (
    "aos-ef399bbbead7-qs5jilysrng2oxiiayoxwtklki"
    ".ap-northeast-2.es.amazonaws.com"
)

In [21]:
# HTTPS port of the OpenSearch endpoint (the client below connects with use_ssl=True).
PORT = 443

In [22]:
from opensearchpy import helpers, OpenSearch

In [23]:
# Credentials come from the environment (or an interactive prompt) instead of being
# hard-coded in the notebook. The master credentials previously committed in this
# cell should be treated as leaked and rotated.
# NOTE(review): skt.ye.get_secrets (imported above) may be the preferred source - confirm.
import getpass

opensearch_user = os.environ.get("OPENSEARCH_USER") or input("OpenSearch user: ")
opensearch_password = os.environ.get("OPENSEARCH_PASSWORD") or getpass.getpass("OpenSearch password: ")

client = OpenSearch(
    hosts=[{"host": stg_vpce_opensearch, "port": PORT}],
    http_compress=True,
    use_ssl=True,
    verify_certs=True,
    timeout=2,  # default per-request timeout; bulk calls pass a larger request_timeout
    pool_maxsize=40,
    http_auth=(opensearch_user, opensearch_password),
)

In [24]:
#client.ping()

In [None]:
def gen_data(index_name, basetime, rows=None):
    """Yield OpenSearch bulk "index" actions for ``index_name``.

    Each row becomes one document with ``_id`` = ``svc_mgmt_num``, using the same
    field layout as the notebook's bulk-indexing cell (placeholder ``luna_id`` and
    ``is_adot`` included).

    Args:
        index_name: Target index.
        basetime: Value written to ``created_at``. A ``datetime`` is formatted as
            "%Y-%m-%d %H:%M:%S" (the mapping's date format); any other value is
            converted with ``str``.
            NOTE(review): treating basetime as the batch timestamp is an
            assumption - confirm with the caller (update_data).
        rows: Iterable of row-like objects supporting ``row["column"]``. When None,
            rows are streamed from the global Spark DataFrame ``sample_d``.

    Yields:
        dict: one bulk action per row.
    """
    from datetime import datetime

    if isinstance(basetime, datetime):
        created_at = basetime.strftime("%Y-%m-%d %H:%M:%S")
    else:
        created_at = str(basetime)

    if rows is None:
        # Stream partitions to the driver instead of collecting everything at once.
        rows = sample_d.toLocalIterator()

    for row in rows:
        yield {
            "_op_type": "index",
            "_index": index_name,
            "_id": row["svc_mgmt_num"],
            "_source": {
                "svc_mgmt_num": row["svc_mgmt_num"],
                "luna_id": "temp",
                "user_embedding": row["user_vector"],
                "age": row["age"],
                "mno_profiles": row["mno_profile"],
                "adot_profiles": row["adot_profile"],
                "behavior_profiles": row["behavor_profile"],
                "created_at": created_at,
                "is_adot": True,
            },
        }

In [29]:
def create_index(client, index_name, index_body):
    """Create a new index named ``index_name`` unless it already exists.

    Args:
        client: OpenSearch client.
        index_name: Name of the index to create.
        index_body: Index-creation request body (``settings`` / ``mappings``).

    Returns:
        bool: True if the index was created by this call; False if it already
        existed. An existing index is left untouched even if its settings or
        mappings differ from ``index_body``.
    """
    # Keyword form, consistent with the create() call below.
    if client.indices.exists(index=index_name):
        return False
    client.indices.create(index=index_name, body=index_body)
    print(index_name)
    return True

In [7]:
def remove_index(client, index_name):
    """Delete the index ``index_name`` if it exists.

    Args:
        client: OpenSearch client.
        index_name: Name of the index to delete.

    Returns:
        bool: True if the index existed and was deleted; False if there was
        nothing to delete.
    """
    # Keyword form, consistent with the delete() call below.
    if not client.indices.exists(index=index_name):
        return False
    client.indices.delete(index=index_name)
    return True

In [None]:
def update_data(opensearch, index_name, basetime, number_of_shards):
    """Bulk-insert the documents produced by ``gen_data(index_name, basetime)``.

    Args:
        opensearch: OpenSearch client.
        index_name: Target index.
        basetime: Passed through unchanged to ``gen_data``.
        number_of_shards: Currently unused; kept so existing call sites still work.

    Returns:
        tuple: ``(resp, elapsed_seconds)``. ``resp`` is the return value of
        ``helpers.bulk``; ``elapsed_seconds`` is the wall-clock duration of the
        bulk call.
    """
    import time  # local import: no module-level `import time` precedes this cell

    start = time.perf_counter()
    resp = helpers.bulk(
        opensearch, gen_data(index_name, basetime), chunk_size=4000, request_timeout=300
    )
    elapsed_seconds = time.perf_counter() - start
    return resp, elapsed_seconds

In [41]:
# Create the test index from the settings/mappings assembled above (skipped if it already exists).
create_index(
    index_name="onemodel_v3_inital_test_20240512",
    index_body=index_body,
    client=client,
);

onemodel_v3_inital_test_20240512


In [25]:
# Drop the earlier (undated) test index, if it is still present.
remove_index(
    index_name="onemodel_v3_inital_test",
    client=client,
);