# [PySpark AI](https://github.com/databrickslabs/pyspark-ai)

```shell
mamba install -c plotly plotly
```

or

```
pip install pyspark-ai plotly-express
```

Clean data rows using:

```
tr -d '\r' < all_perth_310121.csv > all_perth_310121_new.csv
```

In [2]:
# import geofunctions as S
import pyspark.sql.functions as F
from langchain.chat_models import ChatOpenAI
from pyspark_ai import SparkAI
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("SparkAI").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/03 07:22:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [1]:
# S.st_register_functions()

In [3]:
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
spark_ai = SparkAI(llm, verbose=True)
spark_ai.activate()

In [4]:
schema = ",".join(
    [
        "`ADDRESS` string",
        "`SUBURB` string",
        "`PRICE` double",
        "`BEDROOMS` integer",
        "`BATHROOMS` integer",
        "`GARAGE` integer",
        "`LAND_AREA` double",
        "`FLOOR_AREA` double",
        "`BUILD_YEAR` int",
        "`CBD_DIST` double",
        "`NEAREST_STN` string",
        "`NEAREST_STN_DIST` string",
        "`DATE_SOLD` string",
        "`POSTCODE` string",
        "`LATITUDE` double",
        "`LONGITUDE` double",
        "`NEAREST_SCH` string",
        "`NEAREST_SCH_DIST` double",
        "`NEAREST_SCH_RANK` int",
    ]
)

In [5]:
df = spark.read.csv("data/all_perth_310121_new.csv", header=True, schema=schema).cache()

In [6]:
df.count()

                                                                                

33656

In [7]:
spark_ai.plot_df(
    df, "show distribution of NEAREST_SCH_DIST less than 8 miles in 32 bins"
)

[92mINFO: [0mimport plotly.express as px
import pandas as pd

# Convert Spark DataFrame to Pandas DataFrame
df_pandas = df.toPandas()

# Filter the data where NEAREST_SCH_DIST is less than 8 miles
filtered_df = df_pandas[df_pandas['NEAREST_SCH_DIST'] < 8]

# Create the histogram plot
fig = px.histogram(filtered_df, x='NEAREST_SCH_DIST', nbins=32)

# Display the plot
fig.show()


In [8]:
df2 = spark_ai.transform_df(df, "with 3 bedrooms and 2 garages")

[92mINFO: [0mCreating temp view for the transform:
df.createOrReplaceTempView([33m"[39;49;00m[33mspark_ai_temp_view_4cab64[39;49;00m[33m"[39;49;00m)[37m[39;49;00m

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m*[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00mspark_ai_temp_view_4cab64[37m [39;49;00m[34mWHERE[39;49;00m[37m [39;49;00mBEDROOMS[37m [39;49;00m=[37m [39;49;00m[34m3[39;49;00m[37m [39;49;00m[34mAND[39;49;00m[37m [39;49;00mGARAGE[37m [39;49;00m=[37m [39;49;00m[34m2[39;49;00m[37m[39;49;00m


In [9]:
spark_ai.plot_df(
    df2,
    "show map of all locations using LONGITUDE and LATITUDE with only address column",
)

[92mINFO: [0mHere is the Python code to visualize the result of `df` using plotly:


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mexpress[39;49;00m [34mas[39;49;00m [04m[36mpx[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mpandas[39;49;00m [34mas[39;49;00m [04m[36mpd[39;49;00m[37m[39;49;00m
[37m[39;49;00m
[37m# Convert Spark DataFrame to Pandas DataFrame[39;49;00m[37m[39;49;00m
df_pandas = df.toPandas()[37m[39;49;00m
[37m[39;49;00m
[37m# Create a scatter plot using longitude and latitude columns[39;49;00m[37m[39;49;00m
fig = px.scatter_mapbox(df_pandas, lat=[33m"[39;49;00m[33mLATITUDE[39;49;00m[33m"[39;49;00m, lon=[33m"[39;49;00m[33mLONGITUDE[39;49;00m[33m"[39;49;00m, hover_name=[33m"[39;49;00m[33mADDRESS[39;49;00m[33m"[39;49;00m)[37m[39;49;00m
[37m[39;49;00m
[37m# Set the mapbox access token (replace "YOUR_MAPBOX_ACCESS_TOKEN" with your own token)[39;49;00m[37m[39;49;00m
fig.up

In [None]:
# spark_ai.transform_df(
#     df,
#     "Create regression model to calculate the price of the house using BEDROOMS,BATHROOMS and LAND_AREA",
# )

### Point Near Point Analysis.

In [None]:
lhs = (
    spark.range(20)
    .withColumnRenamed("id", "id")
    .withColumn("lon", F.rand() * 360 - 180)
    .withColumn("lat", F.rand() * 180 - 90)
)

rhs = (
    spark.range(20)
    .withColumnRenamed("id", "id")
    .withColumn("lon", F.rand() * 360 - 180)
    .withColumn("lat", F.rand() * 180 - 90)
)

rhs.createOrReplaceTempView("rhs")

In [None]:
res = spark_ai.transform_df(lhs, "find nearest rhs element to each row")

In [None]:
# res.explain(extended=False)