In [0]:

%pip install typing-extensions
%pip install openai
%pip install sqlparse==0.5.0
%pip install mlflow>=2.9.0
dbutils.library.restartPython()

### Trying to register the model with MLflow

In [0]:
%pip install mlflow[databricks]
import mlflow

In [0]:
# Import the necessary libraries
import mlflow
from mlflow.pyfunc import PythonModel, ModelSignature
from mlflow.types import DataType, Schema
import mlflow.pyfunc
import mlflow.deployments
import os

def get_table_schema(table_name, spark_object):
    """Describe a table and return its name together with the collected rows.

    Args:
        table_name: Name of the table, interpolated as-is into a ``DESCRIBE``
            statement.
        spark_object: Object exposing ``sql(query)`` whose result exposes
            ``collect()`` (the notebook passes its ``spark`` session).

    Returns:
        A 2-tuple ``(table_name, rows)`` where ``rows`` is whatever
        ``collect()`` returned for ``DESCRIBE <table_name>``.
    """
    describe_rows = spark_object.sql(f"DESCRIBE {table_name}").collect()
    return table_name, describe_rows

In [0]:

# Custom model that converts a natural-language question into a SQL query
class TxtToSQLModel(PythonModel):
    """Translate a natural-language question into SQL for a given table.

    The table is described via ``get_table_schema`` and the result is embedded
    in a chat prompt sent to the ``databricks-meta-llama-3-70b-instruct``
    endpoint through the supplied deployments client.

    NOTE(review): ``predict`` takes ``(client, message, table_name,
    spark_object)`` instead of MLflow's ``(context, model_input)``. That works
    for the direct calls in this notebook, but presumably needs an adapter
    before the model can be served through ``mlflow.pyfunc`` — confirm before
    registering.
    """

    def __init__(self):
        """Nothing to initialise; client and Spark session are passed per call."""
        pass

    def nl_to_sql(self, client, nl_query, schema):
        """Ask the chat endpoint to convert ``nl_query`` into SQL.

        Args:
            client: Object exposing ``predict(endpoint=..., inputs=...)``.
            nl_query: The natural-language question.
            schema: Table schema description interpolated into the prompt.

        Returns:
            The first element of the response's ``choices``.
        """
        chat_response = client.predict(
            endpoint="databricks-meta-llama-3-70b-instruct",
            inputs={
                "messages": [
                    {"role": "system", "content": "You are an AI assistant"},
                    {
                        "role": "user",
                        "content": f"Table schema:\n{schema}\n\nConvert the following natural language query to SQL: {nl_query}\n\nSQL: and in the output give only the SQL query without text",
                    },
                ],
                "temperature": 0.1,
                "max_tokens": 256,
            },
        )

        return chat_response.choices[0]

    def load_context(self, context):
        """No artifacts to load."""
        pass

    # The MLflow-style ``predict(self, model_input)`` stub that used to sit here
    # as a string literal was dead code; the working entry point is below.

    @staticmethod
    def _strip_code_fences(text):
        """Remove Markdown code fences, including an opening ``sql`` language tag.

        Removing only the backticks would turn a fenced answer that opens with
        a ``sql`` tag into text starting with a stray ``sql`` token, which is
        not valid SQL.
        """
        import re  # local import keeps this block self-contained

        return re.sub(r"```(?:sql\b)?", "", text, flags=re.IGNORECASE)

    def predict(self, client, message, table_name, spark_object):
        """Generate a SQL query string answering ``message`` for ``table_name``.

        Args:
            client: Deployments client forwarded to ``nl_to_sql``.
            message: The natural-language question.
            table_name: Table whose schema is given to the LLM.
            spark_object: Spark session used to describe the table.

        Returns:
            The message content of the first choice with Markdown code fences
            (and a leading ``sql`` fence tag, if any) removed. The query is
            returned as text; it is not executed here.
        """
        table_schema = get_table_schema(table_name, spark_object)
        first_choice = self.nl_to_sql(client, message, table_schema)
        raw_sql = first_choice["message"]["content"]
        return self._strip_code_fences(raw_sql)

# Instantiate the text-to-SQL model used by the test cells
txtToSQLModel = TxtToSQLModel()

# Model signature to attach when registering with MLflow:
# two string inputs (question, table name) and one string output.
from mlflow.types import DataType, Schema, ColSpec
input_schema = Schema([ColSpec("string", column) for column in ("message", "table_name")])
output_schema = Schema([ColSpec("string", "sql_query")])
model_signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# # Log the model with MLflow
# mlflow.pyfunc.log_model(
#     "my_custom_model",
#     python_model=txtToSQLModel,
#     artifacts={},
#     signature=model_signature,
#     # registered_model_name="workspace.default.txt_to_sql_llama3",
# )

In [0]:

# Custom model that asks the LLM which simple, natural-language-like queries would answer a question
class DefineTxtQuery(PythonModel):
    """Rephrase a question into simple query-like statements for a table.

    The table is described via ``get_table_schema`` and the result is embedded
    in a chat prompt sent to the ``databricks-meta-llama-3-70b-instruct``
    endpoint through the supplied deployments client.
    """

    def __init__(self):
        """Nothing to initialise."""
        pass

    def load_context(self, context):
        """No artifacts to load."""
        pass

    def nl_to_sql(self, client, nl_query, schema):
        """Send the decomposition prompt and return the first response choice.

        Despite the method name, the prompt asks for natural-language-like
        statements rather than SQL.
        """
        user_prompt = f"From the table having schema :\n{schema}\n\nWhat would you look at to answer this question : {nl_query}\n\n Give an answer that contains simple statements looking like natural language queries"
        conversation = [
            {"role": "system", "content": "You are an AI assistant"},
            {"role": "user", "content": user_prompt},
        ]
        payload = {
            "messages": conversation,
            "temperature": 0.1,
            "max_tokens": 256,
        }
        completion = client.predict(
            endpoint="databricks-meta-llama-3-70b-instruct",
            inputs=payload,
        )
        return completion.choices[0]

    # The MLflow-style ``predict(self, model_input)`` stub is intentionally not
    # defined; the entry point below takes the client and Spark session directly.

    def predict(self, client, message, table_name, spark_object):
        """Return the LLM's answer text for ``message`` with triple backticks removed.

        Args:
            client: Deployments client forwarded to ``nl_to_sql``.
            message: The question to break down.
            table_name: Table whose schema is given to the LLM.
            spark_object: Spark session used to describe the table.
        """
        described_table = get_table_schema(table_name, spark_object)
        top_choice = self.nl_to_sql(client, message, described_table)
        answer_text = top_choice["message"]["content"]
        return answer_text.replace("```", "")

# Instantiate the question-decomposition model used by the test cells
defineTxtQueryModel = DefineTxtQuery()

# Model signature to attach when registering with MLflow:
# two string inputs (question, table name) and one string output.
from mlflow.types import DataType, Schema, ColSpec
input_schema = Schema([ColSpec("string", column) for column in ("message", "table_name")])
output_schema = Schema([ColSpec("string", "sql_query")])
model_signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# # Log the model with MLflow
# mlflow.pyfunc.log_model(
#     "my_custom_model",
#     python_model=defineTxtQueryModel,
#     artifacts={},
#     signature=model_signature,
#     registered_model_name="workspace.default.define_txt_query_llama",
# )

### Testing the Model 


In [0]:
# Deployments client for the "databricks" target; it is passed to the models'
# predict() calls, which use it to query the chat endpoint.
client = mlflow.deployments.get_deploy_client("databricks")

## 1st step: ask a generic question; an agent defines how to answer it with the data we have at hand

In [0]:
# Question 1: cheapest listing built after 2023 with more than 2 bathrooms
query1 = defineTxtQueryModel.predict(
    client,
    "What is the cheapest place to buy a house past the date timestamp year 2023 for a familiy having more than 2 bathrooms? Always Keep the Latitude and Longitude columns.",
    "workspace.default.us_listings_with_risk_scores",
    spark,
)

print(query1)

# Question 2: average sold price per city on the latest day in the dataset
query2 = defineTxtQueryModel.predict(
    client,
    "What are the average sold house prices of the max day in the dataset for each city? Always Keep the Latitude and Longitude columns.",
    "workspace.default.us_listings_with_risk_scores",
    spark,
)

print(query2)



## 2nd step: get your answer transformed into SQL

In [0]:

# Turn the first decomposed question into a SQL query string
result1 = txtToSQLModel.predict(
    client,
    query1,
    "workspace.default.us_listings_with_risk_scores",
    spark,
)

print(result1)

# Turn the second decomposed question into a SQL query string
result2 = txtToSQLModel.predict(
    client,
    query2,
    "workspace.default.us_listings_with_risk_scores",
    spark,
)

print(result2)


In [0]:
%sql 
-- Lowest-priced row that passes the YEARBUILT and BATHROOMS filters.
-- NOTE(review): both filters compare against double-quoted string literals; if these
-- columns are stored as strings the comparison is lexicographic (e.g. "10" sorts
-- before "2") — confirm the column types.
SELECT * FROM workspace.default.us_listings_with_risk_scores 
WHERE YEARBUILT > "2023" AND BATHROOMS > "2" 
ORDER BY PRICE ASC 
LIMIT 1;

## 3rd step: get the query executed

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
import plotly.express as px

# Run the LLM-generated SQL for the first question.
# NOTE(review): final_df is not used further in this cell; the map below is built
# from a separate hard-coded query against a different table.
final_df = spark.sql(result1)

# Per-city averages of coordinates and price, with PRICE cast to integer
final_display = spark.sql(
    """
  SELECT
  avg(try_cast(LATITUDE AS DOUBLE)) as LATITUDE,
  avg(try_cast(LONGITUDE AS DOUBLE)) as LONGITUDE,
  avg(PRICE) as PRICE
FROM
  bright_data_real_estate_listings.datasets.zillow_properties
WHERE YEARBUILT > "2023" AND BATHROOMS > "2" 
GROUP BY
  City
  """
).withColumn("PRICE", col("PRICE").cast("integer"))

# Bring the aggregated rows into pandas for Plotly
df_plot = final_display.toPandas()

# One marker per city, coloured and sized by average price
fig = px.scatter_mapbox(
    df_plot,
    lat="LATITUDE",
    lon="LONGITUDE",
    color="PRICE",
    size="PRICE",
    color_continuous_scale=px.colors.cyclical.IceFire,
    size_max=20,
    zoom=2.5,
    mapbox_style="carto-positron",
)

# Marker appearance and trace display name
fig.update_traces(marker=dict(symbol='circle', opacity=0.8), name="Average Price")

# Title, dark theme colours and fixed figure size in a single layout update
fig.update_layout(
    title="Real Estate Prices",
    title_x=0.5,  # Center title
    title_font_size=24,
    title_font_color="white",
    paper_bgcolor="black",
    legend_font_color="white",
    font_color="white",
    autosize=False,
    width=800,
    height=600,
)

fig.show()


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
import plotly.express as px

# Run the LLM-generated SQL for the second question
final_df_2 = spark.sql(result2)

# Drop rows whose LATITUDE contains the "__TYPE__" marker, then average
# coordinates and price per city
final_display_2 = (
    final_df_2
    .where(~col("LATITUDE").contains("__TYPE__"))
    .groupBy("CITY_NAME")
    .agg(
        F.mean("LATITUDE").cast("DOUBLE").alias("LATITUDE"),
        F.mean("LONGITUDE").cast("DOUBLE").alias("LONGITUDE"),
        F.mean("avg_max_price").cast("DOUBLE").alias("PRICE"),
    )
)

# Bring the aggregated rows into pandas for Plotly
df_plot = final_display_2.toPandas()

# One marker per city, coloured and sized by average price
fig = px.scatter_mapbox(
    df_plot,
    lat="LATITUDE",
    lon="LONGITUDE",
    color="PRICE",
    size="PRICE",
    color_continuous_scale=px.colors.cyclical.IceFire,
    size_max=20,
    zoom=2.5,
    mapbox_style="carto-positron",
)

# Marker appearance and trace display name
fig.update_traces(marker=dict(symbol='circle', opacity=0.8), name="Average Price")

# Title, dark theme colours and fixed figure size in a single layout update
fig.update_layout(
    title="Real Estate Prices",
    title_x=0.5,  # Center title
    title_font_size=24,
    title_font_color="white",
    paper_bgcolor="black",
    legend_font_color="white",
    font_color="white",
    autosize=False,
    width=800,
    height=600,
)

fig.show()
