In [0]:
%pip install databricks_langchain
%restart_python

In [0]:
import json

import pandas as pd
from databricks_langchain import ChatDatabricks
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate

In [0]:
# Load the vehicles table as a Spark DataFrame.
vehicles_df = spark.read.table("workspace.car_sales.vehicles")

In [0]:
# Keep four manufacturers and draw a ~5% sample of their listings.
# The explicit (arbitrary) seed makes the same rows come back on every
# Restart & Run All; without it each run samples a different subset.
selected_manufacturers = ['ford', 'toyota', 'chevrolet', 'honda']
portion_df = (
    vehicles_df
    .filter(vehicles_df.manufacturer.isin(selected_manufacturers))
    .sample(fraction=0.05, seed=42)
)

In [0]:

# Columns removed from the working DataFrame before extraction.
unused_columns = ("url", "region", "region_url", "VIN", "image_url", "county")
portion_df = portion_df.drop(*unused_columns)

In [0]:
# Keep only rows with a non-zero price and with year, model, odometer and
# price all present.
has_nonzero_price = portion_df["price"] != 0
has_required_fields = (
    portion_df["year"].isNotNull()
    & portion_df["model"].isNotNull()
    & portion_df["odometer"].isNotNull()
    & portion_df["price"].isNotNull()
)
portion_df = portion_df.filter(has_nonzero_price & has_required_fields)

In [0]:
# Draw a ~12% sub-sample to send through the LLM.
# Arbitrary fixed seed so the same rows are selected on every run.
df_portion = portion_df.sample(fraction=0.12, seed=43)

In [0]:
# Display the column names of the sampled DataFrame.
df_portion.columns

## Define Chain

In [0]:
# Chat model client bound to the serving endpoint named below.
LLM_ENDPOINT = "databricks-gpt-oss-120b"
model = ChatDatabricks(endpoint=LLM_ENDPOINT)

In [0]:
# Extraction prompt. `{free_text}` is the only placeholder; the doubled braces
# around the example JSON are literal braces once the template is formatted.
# The example answer lists every requested field (including `condition`) so
# the model sees the complete output shape.
prompt_template_content = """
You will get a free text. You need to extract the following information, if available:
- manufacturer
- model
- year
- price
- odometer in km
- transmission
- fuel
- drive (4wd, fwd, ...)
- size (mid-size, full-size, ...)
- type (SUV, hatchback, sedan)
- paint_color
- condition (like new, good, excellent, ...).

If some fields are not found in the text, return them as null.

Do not add any comment, answer only with a JSON format.

EXAMPLE:

free text: 2019 Ford Focus Sedan 2.0L 4dr Sedan 4WD 2019 Ford Focus Sedan 2.0L

answer:
{{
    "manufacturer": "ford",
    "model": "focus",
    "year": "2019",
    "price": null,
    "odometer": null,
    "transmission": null,
    "fuel": null,
    "drive": "4wd",
    "size": null,
    "type": "sedan",
    "paint_color": null,
    "condition": null
    }}

free text: {free_text}
"""

# Prompt wrapper: `free_text` is the only variable filled in at call time.
prompt = PromptTemplate(
    template=prompt_template_content,
    input_variables=["free_text"],
)

# Turns the chat model's output message into a plain string.
parser = StrOutputParser()

# The pipeline invoked by call_gpt_oss: prompt -> model -> string output.
chain = prompt | model | parser

In [0]:
def call_gpt_oss(message):
    """Run the extraction chain on one message and return the answer text.

    Args:
        message: Mapping passed straight to ``chain.invoke`` (the callers in
            this notebook pass ``{"free_text": <description>}``).

    Returns:
        The model's answer as a string, which callers then parse as JSON.

    Raises:
        json.JSONDecodeError: if the chain output is not valid JSON.
    """
    response = chain.invoke(message)

    # The chain output is expected to be a JSON-encoded list of content blocks
    # whose last block carries the answer under 'text' (presumably preceded by
    # reasoning blocks -- TODO confirm against the endpoint's response format).
    payload = json.loads(response)
    if isinstance(payload, list):
        return payload[-1]['text']

    # Otherwise the output is already the bare JSON answer; hand it back as-is.
    return response

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, LongType, DoubleType

In [0]:
# Schema of the enriched output: a nullable long `id` followed by nullable
# string columns, in the order listed here.
_string_columns = [
    "price",
    "year",
    "manufacturer",
    "model",
    "condition",
    "cylinders",
    "fuel",
    "odometer",
    "title_status",
    "transmission",
    "drive",
    "size",
    "type",
    "paint_color",
    "description",
    "state",
    "lat",   # kept as string for safety
    "long",  # kept as string for safety
    "posting_date",
    "api_error",  # new error column
]

output_schema = StructType(
    [StructField("id", LongType(), True)]
    + [StructField(column_name, StringType(), True) for column_name in _string_columns]
)

In [0]:
# ---
# 5. Define the UDF to process batches of rows
# ---
def process_api_batch(pdf: pd.DataFrame) -> pd.DataFrame:
    """Enrich one pandas batch with fields extracted from each row's description.

    For every row with a non-blank ``description``, the text is sent through
    ``call_gpt_oss`` and the non-null fields of the JSON answer overwrite the
    matching columns of that row. Rows whose description is missing or blank
    are passed through without an API call. A failure on one row is recorded
    in that row's ``api_error`` value instead of aborting the batch.

    Args:
        pdf: Batch of rows as a pandas DataFrame.

    Returns:
        A DataFrame whose columns are exactly ``output_schema.names``, in that
        order. ``id`` is int64 (missing or non-numeric ids become 0); every
        other column is a string, with missing values rendered as "".
    """
    # IMPORTANT: import required libraries INSIDE the UDF so they are resolved
    # where the function body actually runs.
    import json

    import pandas as pd

    schema_columns = list(output_schema.names)

    def process_row(row):
        """Enrich a single row; errors are captured in 'api_error'."""
        try:
            description = row['description']

            # Nothing to extract from: skip the (slow, billable) API call.
            if pd.isna(description) or not str(description).strip():
                return row

            json_answer = call_gpt_oss({"free_text": description})
            json_info = json.loads(json_answer)

            # Drop fields the model reported as null so they cannot blank out
            # values the row already has.
            new_values = {k: v for k, v in json_info.items() if v is not None}

            # Series.update only touches labels that already exist on the row,
            # so unexpected keys in the answer are ignored.
            row.update(pd.Series(new_values))
        except Exception as e:
            # Don't let one bad row fail the whole batch.
            row['api_error'] = str(e)

        return row

    updated_pdf = pdf.apply(process_row, axis=1)

    # The error column only exists if at least one row failed.
    if 'api_error' not in updated_pdf.columns:
        updated_pdf['api_error'] = None

    # Select/reorder to the schema's columns; columns absent from the batch
    # come back as all-missing.
    final_pdf = updated_pdf.reindex(columns=schema_columns)

    # 'id' stays an integer; missing or non-numeric ids fall back to 0.
    final_pdf['id'] = pd.to_numeric(final_pdf['id'], errors='coerce').fillna(0).astype('int64')

    # Everything else becomes a string. fillna("") first, because astype(str)
    # alone would turn None into the literal "None".
    for col_name in schema_columns:
        if col_name != 'id':
            final_pdf[col_name] = final_pdf[col_name].fillna("").astype(str)

    return final_pdf

In [0]:
# ---
# 6. Execute the Parallel Operation
# ---
print("Starting parallel processing...")


def _process_api_batches(batches):
    """Apply process_api_batch to each pandas batch supplied by mapInPandas."""
    for batch_pdf in batches:
        yield process_api_batch(batch_pdf)


# mapInPandas hands its function an *iterator* of pandas DataFrames and expects
# an iterator of DataFrames back, so the single-batch function is wrapped in a
# generator rather than passed directly.
# The number of batches depends on your data and cluster configuration
updated_spark_df = df_portion.mapInPandas(_process_api_batches, schema=output_schema)

# A .show() or .collect() command triggers the execution
print("Displaying processed data...")
updated_spark_df.show(truncate=False)

# ---
# 7. Write results directly to a Delta Table
# ---
# NOTE(review): this cell only prints the message below; no write call follows.
print("Processing complete. Writing to Delta table...")

In [0]:
# Sequential, driver-side variant of the enrichment.
# df_portion is a Spark DataFrame (no .copy()/.shape/.iloc), so it is collected
# to pandas first. Casting to object lets string answers be stored in columns
# that were numeric in the source data.
df_portion_updated = df_portion.toPandas().astype(object)
ulimit = df_portion_updated.shape[0]
for i in range(ulimit):
    message = {"free_text": df_portion_updated.iloc[i]['description']}
    json_answer = call_gpt_oss(message)
    json_info = json.loads(json_answer)

    # Keep only non-null answers for columns that exist in the frame.
    new_values = {
        k: v for k, v in json_info.items()
        if v is not None and k in df_portion_updated.columns
    }

    # Write through the frame itself: .iloc[i] yields a copy of the row, so
    # updating that copy would leave df_portion_updated unchanged.
    for column_name, value in new_values.items():
        df_portion_updated.iat[i, df_portion_updated.columns.get_loc(column_name)] = value


In [0]:
# Preview the first two enriched rows.
df_portion_updated.head(2)