In [None]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
import pyspark.sql.functions as F
from pyspark.sql.functions import expr

In [None]:
# Notebook parameters: the catalog and schema (database) that every table and
# function reference below is built from. Both are read from Databricks widgets.
ml_catalog, ml_search_db = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("ml_catalog", "ml_search_db")
)

### Search Data

In [None]:
# Load every row of ml_search that has a non-null details_search_results
# payload (no filter on `action`, so all search types are included).
# The select list passes the event attributes through unchanged and adds
# year/month/day derived from yesterday's date (current_date() - 1). Those
# three values depend on when the notebook runs, not on each event's own
# timestamp, and are later used as the partition columns of the output table.
df_search = spark.sql(f"""select 
    user_agent,
    uuid1_time,
    _token_associate_id,
    label,
    _token_client_id,
    _token_session_id,
    context,
    athena_tablename,
    technical_mini_app_version,
    technical_mini_app,
    _token_person_id,
    action,
    os,
    browser,
    schema_version,
    _token_mask,
    date_key,
    client_id,
    tile_id,
    category,
    timezone,
    event_id,
    time_stamp,
    request_correlation_id,
    details_search_value,
    details_search_results,
    year(date_sub(current_date(), 1)) as year,
    month(date_sub(current_date(), 1)) as month,
    day(date_sub(current_date(), 1)) as day
from {ml_catalog}.{ml_search_db}.ml_search where details_search_results is not null
""")

In [None]:
# Fields extracted from each search-result JSON object; anything else in the
# object is ignored by from_json.
search_result_schema = (
    StructType()
    .add('_id', StringType(), True)
    .add('finalScore', FloatType(), True)
    .add('resPos', IntegerType(), True)
    .add('traceId', StringType(), True)
    .add('queryId', StringType(), True)
)

# Temporary column holding the output of the catalog function
# literal_eval_search. NOTE(review): the function is defined outside this
# notebook; it presumably returns an array of JSON strings, since the result is
# exploded and each element is handed to from_json -- confirm.
extract_col = "details_search_results_extract_search"
search_extracted = df_search.withColumn(
    extract_col,
    expr(f"{ml_catalog}.{ml_search_db}.literal_eval_search(details_search_results)"),
)

# Every column except the temporary array is carried through to the output.
passthrough_cols = [name for name in search_extracted.columns if name != extract_col]

# One output row per search result: explode the array, parse each element into
# a struct, then flatten the struct's fields into top-level columns.
final_df = (
    search_extracted
    .withColumn("details", F.explode(F.col(extract_col)))
    .withColumn("details", F.from_json(F.col("details"), search_result_schema))
    .select(*passthrough_cols, "details.*")
)

In [None]:
# Preview the current final_df. The name is reused by every section of this
# notebook; when cells run top to bottom this is the exploded search results.
display(final_df)

In [None]:
# Print the column names and types of the current final_df (the exploded search
# results when cells run in order) to check the flattened JSON fields.
final_df.printSchema()

In [None]:
# Session-wide Delta setting, kept as-is because it remains in effect for the
# rest of the Spark session.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Replace ml_search_all with a single atomic Delta overwrite instead of
# DROP TABLE followed by a re-create. Dropping first leaves a window in which
# the table does not exist (and it stays gone if the write fails), and it
# discards the table's history and grants. `overwriteSchema` lets the new
# DataFrame's schema and partitioning replace the old ones, which is what the
# drop was achieving; if the table does not exist yet, saveAsTable creates it.
(
    final_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("year", "month", "day")
    .saveAsTable(f"{ml_catalog}.{ml_search_db}.ml_search_all")
)

### People Data

In [None]:
# Load the rows of ml_search whose action is 'people' and that have a non-null
# details_search_results payload.
# The select list passes the event attributes through unchanged and adds
# year/month/day derived from yesterday's date (current_date() - 1). Those
# three values depend on when the notebook runs, not on each event's own
# timestamp, and are later used as the partition columns of the output table.
df_people = spark.sql(f"""select 
    user_agent,
    uuid1_time,
    _token_associate_id,
    label,
    _token_client_id,
    _token_session_id,
    context,
    athena_tablename,
    technical_mini_app_version,
    technical_mini_app,
    _token_person_id,
    action,
    os,
    browser,
    schema_version,
    _token_mask,
    date_key,
    client_id,
    tile_id,
    category,
    timezone,
    event_id,
    time_stamp,
    request_correlation_id,
    details_search_value,
    details_search_results,
    year(date_sub(current_date(), 1)) as year,
    month(date_sub(current_date(), 1)) as month,
    day(date_sub(current_date(), 1)) as day
from {ml_catalog}.{ml_search_db}.ml_search where action = 'people' and details_search_results is not null
""")

In [None]:
# Fields extracted from each people-result JSON object; anything else in the
# object is ignored by from_json.
people_result_schema = (
    StructType()
    .add('_id', StringType(), True)
    .add('legalName', StringType(), True)
    .add('displayName', StringType(), True)
    .add('eID', StringType(), True)
    .add('location', StringType(), True)
    .add('position', StringType(), True)
    .add('finalScore', FloatType(), True)
    .add('resPos', IntegerType(), True)
    .add('traceId', StringType(), True)
    .add('queryId', StringType(), True)
)

# Temporary column holding the output of the catalog function
# literal_eval_people. NOTE(review): the function is defined outside this
# notebook; it presumably returns an array of JSON strings, since the result is
# exploded and each element is handed to from_json -- confirm.
extract_col = "details_search_results_extract_people"
people_extracted = df_people.withColumn(
    extract_col,
    expr(f"{ml_catalog}.{ml_search_db}.literal_eval_people(details_search_results)"),
)

# Every column except the temporary array is carried through to the output.
passthrough_cols = [name for name in people_extracted.columns if name != extract_col]

# One output row per people result: explode the array, parse each element into
# a struct, then flatten the struct's fields into top-level columns.
final_df = (
    people_extracted
    .withColumn("details", F.explode(F.col(extract_col)))
    .withColumn("details", F.from_json(F.col("details"), people_result_schema))
    .select(*passthrough_cols, "details.*")
)

In [None]:
# Preview the current final_df. The name is reused by every section of this
# notebook; when cells run top to bottom this is the exploded people results.
display(final_df)

In [None]:
# Print the column names and types of the current final_df (the exploded people
# results when cells run in order) to check the flattened JSON fields.
final_df.printSchema()

In [None]:
# Session-wide Delta setting, kept as-is because it remains in effect for the
# rest of the Spark session.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Replace ml_search_people with a single atomic Delta overwrite instead of
# DROP TABLE followed by a re-create. Dropping first leaves a window in which
# the table does not exist (and it stays gone if the write fails), and it
# discards the table's history and grants. `overwriteSchema` lets the new
# DataFrame's schema and partitioning replace the old ones, which is what the
# drop was achieving; if the table does not exist yet, saveAsTable creates it.
(
    final_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("year", "month", "day")
    .saveAsTable(f"{ml_catalog}.{ml_search_db}.ml_search_people")
)

### Action Data

In [None]:
# Load the rows of ml_search whose action is 'actions' and that have a non-null
# details_search_results payload.
# The select list passes the event attributes through unchanged and adds
# year/month/day derived from yesterday's date (current_date() - 1). Those
# three values depend on when the notebook runs, not on each event's own
# timestamp, and are later used as the partition columns of the output table.
df_action = spark.sql(f"""select 
    user_agent,
    uuid1_time,
    _token_associate_id,
    label,
    _token_client_id,
    _token_session_id,
    context,
    athena_tablename,
    technical_mini_app_version,
    technical_mini_app,
    _token_person_id,
    action,
    os,
    browser,
    schema_version,
    _token_mask,
    date_key,
    client_id,
    tile_id,
    category,
    timezone,
    event_id,
    time_stamp,
    request_correlation_id,
    details_search_value,
    details_search_results,
    year(date_sub(current_date(), 1)) as year,
    month(date_sub(current_date(), 1)) as month,
    day(date_sub(current_date(), 1)) as day
from {ml_catalog}.{ml_search_db}.ml_search where action = 'actions' and details_search_results is not null
""")

In [None]:
# Fields extracted from each action-result JSON object; anything else in the
# object is ignored by from_json.
action_result_schema = (
    StructType()
    .add('_id', StringType(), True)
    .add('caption', StringType(), True)
    .add('subtitle', StringType(), True)
    .add('solrScore', FloatType(), True)
    .add('finalScore', FloatType(), True)
    .add('scoreDistributionCaption', FloatType(), True)
    .add('scoreDistributionDescription', FloatType(), True)
    .add('scoreDistributionKeywords', FloatType(), True)
    .add('resPos', IntegerType(), True)
    .add('traceId', StringType(), True)
    .add('queryId', StringType(), True)
)

# Temporary column holding the output of the catalog function
# literal_eval_action. NOTE(review): the function is defined outside this
# notebook; it presumably returns an array of JSON strings, since the result is
# exploded and each element is handed to from_json -- confirm.
extract_col = "details_search_results_extract_action"
action_extracted = df_action.withColumn(
    extract_col,
    expr(f"{ml_catalog}.{ml_search_db}.literal_eval_action(details_search_results)"),
)

# Every column except the temporary array is carried through to the output.
passthrough_cols = [name for name in action_extracted.columns if name != extract_col]

# One output row per action result: explode the array, parse each element into
# a struct, then flatten the struct's fields into top-level columns.
final_df = (
    action_extracted
    .withColumn("details", F.explode(F.col(extract_col)))
    .withColumn("details", F.from_json(F.col("details"), action_result_schema))
    .select(*passthrough_cols, "details.*")
)

In [None]:
# Preview the current final_df. The name is reused by every section of this
# notebook; when cells run top to bottom this is the exploded action results.
display(final_df)

In [None]:
# Print the column names and types of the current final_df (the exploded action
# results when cells run in order) to check the flattened JSON fields.
final_df.printSchema()

In [None]:
# Session-wide Delta setting, kept as-is because it remains in effect for the
# rest of the Spark session.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Replace ml_search_action with a single atomic Delta overwrite instead of
# DROP TABLE followed by a re-create. Dropping first leaves a window in which
# the table does not exist (and it stays gone if the write fails), and it
# discards the table's history and grants. `overwriteSchema` lets the new
# DataFrame's schema and partitioning replace the old ones, which is what the
# drop was achieving; if the table does not exist yet, saveAsTable creates it.
(
    final_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("year", "month", "day")
    .saveAsTable(f"{ml_catalog}.{ml_search_db}.ml_search_action")
)

### Merge Searches and Clicks

In [None]:
# Label every row of ml_search_all with whether that result was clicked.
#
#   click        : click events that carry a usable request_correlation_id
#                  (non-null and not the literal string 'nan').
#   search_click : search results matched to clicks on the same request and
#                  object (_id = object_id), numbered by click time within each
#                  (request_correlation_id, resPos, traceId) group.
#   final select : all search rows, left-joined to the earliest click of their
#                  group; `click` is 1 when a click was found, else 0.
#
# Two details keep the output at exactly one row per search row:
#   * row_number() is used rather than rank(): rank() assigns 1 to every click
#     that ties on time_stamp, so tied or duplicated click events would fan the
#     search row out into several output rows. Ties are broken arbitrarily.
#   * the outer join matches on the same three columns the window partitions
#     by. Joining on traceId/resPos alone would attach one request's click to
#     rows of another request that shares those values, and would duplicate
#     rows when several requests each contribute a first click.
final_df = spark.sql(f"""
with click as (
    select request_correlation_id,
        _token_session_id,
        _token_associate_id,
        object_id,
        time_stamp,
        label,
        client_id,
        category,
        details_caption
    from {ml_catalog}.{ml_search_db}.ml_search_click
    where request_correlation_id is not null and lower(request_correlation_id) != 'nan'
),

search_click as (
    select search.request_correlation_id,
        search.resPos,
        search.traceId,
        row_number() over (
            partition by search.request_correlation_id, search.resPos, search.traceId
            order by click.time_stamp
        ) as click_rank,
        click._token_session_id as click_session_id,
        click._token_associate_id as click_associate_id,
        click.object_id as click_object_id,
        click.time_stamp as click_time_stamp,
        click.label as click_label,
        click.client_id as click_client_id,
        click.category as click_category,
        click.details_caption as click_details_caption
    from {ml_catalog}.{ml_search_db}.ml_search_all search
    inner join click
    on search.request_correlation_id = click.request_correlation_id
    and search._id = click.object_id
)

select search.*,
        search_click.click_session_id,
        search_click.click_associate_id,
        search_click.click_object_id,
        search_click.click_time_stamp,
        search_click.click_label,
        search_click.click_client_id,
        search_click.click_category,
        search_click.click_details_caption,
        if(search_click.traceId is null, 0, 1) as click
from {ml_catalog}.{ml_search_db}.ml_search_all search
left join (
    select *
    from search_click
    where click_rank = 1
) search_click
on search.request_correlation_id = search_click.request_correlation_id
    and search.traceId = search_click.traceId
    and search.resPos = search_click.resPos
""")

In [None]:
# Preview the current final_df. The name is reused by every section of this
# notebook; when cells run top to bottom this is the search results joined to
# their click labels.
display(final_df)

In [None]:
# Session-wide Delta setting, kept as-is because it remains in effect for the
# rest of the Spark session.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Replace ml_search_with_click with a single atomic Delta overwrite instead of
# DROP TABLE followed by a re-create. Dropping first leaves a window in which
# the table does not exist (and it stays gone if the write fails), and it
# discards the table's history and grants. `overwriteSchema` lets the new
# DataFrame's schema and partitioning replace the old ones, which is what the
# drop was achieving; if the table does not exist yet, saveAsTable creates it.
(
    final_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("year", "month", "day")
    .saveAsTable(f"{ml_catalog}.{ml_search_db}.ml_search_with_click")
)