<a href="https://colab.research.google.com/github/sugarforever/LangChain-Tutorials/blob/main/LangChain_Spark_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Connecting OpenAI with Apache Spark

Introduction of [pyspark-ai](https://github.com/databrickslabs/pyspark-ai)

Pyspark-AI takes English instructions and compile them into PySpark objects like DataFrames, to make Spark more user-friendly and accessible, allowing you to focus on extracting insights from your data.

In [39]:
!pip install --quiet --upgrade langchain openai pyspark-ai pyspark

In [40]:
import os
os.environ['OPENAI_API_KEY'] = 'your openai api key'

1. Initialize the Spark AI instance

In [41]:
from langchain.chat_models import ChatOpenAI
from pyspark_ai import SparkAI

# If 'gpt-4' is unavailable, use 'gpt-3.5-turbo' (might lower output quality)
llm = ChatOpenAI(model_name='gpt-4', temperature=0)

spark_ai = SparkAI(llm=llm, verbose=True)

# Activate partial functions for Spark DataFrame
spark_ai.activate()

2. Create a dataframe via a HTTP URL

In this case, we are fetching the share holders of Apple, one of the best performing stock in US market.

In [42]:
holders_dataframe = spark_ai.create_df("https://finance.yahoo.com/quote/AAPL/holders?p=AAPL")

[92mINFO: [0mParsing URL: https://finance.yahoo.com/quote/AAPL/holders?p=AAPL



INFO:spark_ai:Parsing URL: https://finance.yahoo.com/quote/AAPL/holders?p=AAPL



[92mINFO: [0mSQL query for the ingestion:
[34mCREATE[39;49;00m[37m [39;49;00m[34mOR[39;49;00m[37m [39;49;00m[34mREPLACE[39;49;00m[37m [39;49;00mTEMP[37m [39;49;00m[34mVIEW[39;49;00m[37m [39;49;00mapple_stock_holders[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00m
[34mSELECT[39;49;00m[37m [39;49;00m*[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00m[34mVALUES[39;49;00m[37m[39;49;00m
([33m'Vanguard Group, Inc. (The)'[39;49;00m,[37m [39;49;00m[34m1309785362[39;49;00m,[37m [39;49;00m[33m'Mar 30, 2023'[39;49;00m,[37m [39;49;00m[34m8[39;49;00m.[34m33[39;49;00m,[37m [39;49;00m[34m254059068265[39;49;00m),[37m[39;49;00m
([33m'Blackrock Inc.'[39;49;00m,[37m [39;49;00m[34m1035008939[39;49;00m,[37m [39;49;00m[33m'Mar 30, 2023'[39;49;00m,[37m [39;49;00m[34m6[39;49;00m.[34m58[39;49;00m,[37m [39;49;00m[34m200760685161[39;49;00m),[37m[39;49;00m
([33m'Berkshire Hathaway, Inc'[39;49;00m,[37m [39;49;00m[34m915560382[39;

INFO:spark_ai:SQL query for the ingestion:
[34mCREATE[39;49;00m[37m [39;49;00m[34mOR[39;49;00m[37m [39;49;00m[34mREPLACE[39;49;00m[37m [39;49;00mTEMP[37m [39;49;00m[34mVIEW[39;49;00m[37m [39;49;00mapple_stock_holders[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00m
[34mSELECT[39;49;00m[37m [39;49;00m*[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00m[34mVALUES[39;49;00m[37m[39;49;00m
([33m'Vanguard Group, Inc. (The)'[39;49;00m,[37m [39;49;00m[34m1309785362[39;49;00m,[37m [39;49;00m[33m'Mar 30, 2023'[39;49;00m,[37m [39;49;00m[34m8[39;49;00m.[34m33[39;49;00m,[37m [39;49;00m[34m254059068265[39;49;00m),[37m[39;49;00m
([33m'Blackrock Inc.'[39;49;00m,[37m [39;49;00m[34m1035008939[39;49;00m,[37m [39;49;00m[33m'Mar 30, 2023'[39;49;00m,[37m [39;49;00m[34m6[39;49;00m.[34m58[39;49;00m,[37m [39;49;00m[34m200760685161[39;49;00m),[37m[39;49;00m
([33m'Berkshire Hathaway, Inc'[39;49;00m,[37m [39;49;00m[34m915560382[39;4

[92mINFO: [0mStoring data into temp view: apple_stock_holders



INFO:spark_ai:Storing data into temp view: apple_stock_holders



In [43]:
holders_dataframe.show(n=5)

+--------------------+----------+-------------+-----------+------------+
|         holder_name|    shares|date_reported|percent_out|       value|
+--------------------+----------+-------------+-----------+------------+
|Vanguard Group, I...|1309785362| Mar 30, 2023|       8.33|254059068265|
|      Blackrock Inc.|1035008939| Mar 30, 2023|       6.58|200760685161|
|Berkshire Hathawa...| 915560382| Mar 30, 2023|       5.82|177591248414|
|State Street Corp...| 576281774| Mar 30, 2023|       3.66|111781376406|
|            FMR, LLC| 311437576| Mar 30, 2023|       1.98| 60409546996|
+--------------------+----------+-------------+-----------+------------+
only showing top 5 rows



3. Plot

In [44]:
holders_dataframe.ai.plot()

[92mINFO: [0mHere is a Python code snippet that uses Plotly to visualize the data in the PySpark DataFrame `df`. This code assumes that you want to create a bar chart with `holder_name` on the x-axis and `shares` on the y-axis. 


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mgraph_objects[39;49;00m [34mas[39;49;00m [04m[36mgo[39;49;00m[37m[39;49;00m
[34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col[37m[39;49;00m
[37m[39;49;00m
[37m# Convert the Spark DataFrame to a Pandas DataFrame[39;49;00m[37m[39;49;00m
pandas_df = df.select([33m"[39;49;00m[33m*[39;49;00m[33m"[39;49;00m).toPandas()[37m[39;49;00m
[37m[39;49;00m
[37m# Create a bar chart[39;49;00m[37m[39;49;00m
fig = go.Figure(data=go.Bar(x=pandas_df[[33m'[39;49;00m[33mholder_name[39;49;00m[33m'[39;49;00m], y=pandas_df[[33m'[39;49;00m

INFO:spark_ai:Here is a Python code snippet that uses Plotly to visualize the data in the PySpark DataFrame `df`. This code assumes that you want to create a bar chart with `holder_name` on the x-axis and `shares` on the y-axis. 


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mgraph_objects[39;49;00m [34mas[39;49;00m [04m[36mgo[39;49;00m[37m[39;49;00m
[34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col[37m[39;49;00m
[37m[39;49;00m
[37m# Convert the Spark DataFrame to a Pandas DataFrame[39;49;00m[37m[39;49;00m
pandas_df = df.select([33m"[39;49;00m[33m*[39;49;00m[33m"[39;49;00m).toPandas()[37m[39;49;00m
[37m[39;49;00m
[37m# Create a bar chart[39;49;00m[37m[39;49;00m
fig = go.Figure(data=go.Bar(x=pandas_df[[33m'[39;49;00m[33mholder_name[39;49;00m[33m'[39;49;00m], y=pandas_df[[33m'[39;49;00m

In [45]:
holders_dataframe.ai.plot("Pie chart for Apple's top holders, show their name and share percentages")

[92mINFO: [0mHere is a Python code snippet that uses PySpark and Plotly to visualize the result of `df` as a pie chart. This code assumes that the `percent_out` column represents the share percentages of each holder.


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mgraph_objects[39;49;00m [34mas[39;49;00m [04m[36mgo[39;49;00m[37m[39;49;00m
[34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m [34mimport[39;49;00m SparkSession[37m[39;49;00m
[37m[39;49;00m
[37m# Assuming that SparkSession is already initialized[39;49;00m[37m[39;49;00m
spark = SparkSession.builder.getOrCreate()[37m[39;49;00m
[37m[39;49;00m
[37m# Convert the Spark DataFrame to a Pandas DataFrame[39;49;00m[37m[39;49;00m
pandas_df = df.toPandas()[37m[39;49;00m
[37m[39;49;00m
[37m# Create a pie chart with Plotly[39;49;00m[37m[39;49;00m
fig = go.Figure(data=[go.Pie(labels=pandas_df[[33m'[39;49;00m[33mholder_n

INFO:spark_ai:Here is a Python code snippet that uses PySpark and Plotly to visualize the result of `df` as a pie chart. This code assumes that the `percent_out` column represents the share percentages of each holder.


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mgraph_objects[39;49;00m [34mas[39;49;00m [04m[36mgo[39;49;00m[37m[39;49;00m
[34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m [34mimport[39;49;00m SparkSession[37m[39;49;00m
[37m[39;49;00m
[37m# Assuming that SparkSession is already initialized[39;49;00m[37m[39;49;00m
spark = SparkSession.builder.getOrCreate()[37m[39;49;00m
[37m[39;49;00m
[37m# Convert the Spark DataFrame to a Pandas DataFrame[39;49;00m[37m[39;49;00m
pandas_df = df.toPandas()[37m[39;49;00m
[37m[39;49;00m
[37m# Create a pie chart with Plotly[39;49;00m[37m[39;49;00m
fig = go.Figure(data=[go.Pie(labels=pandas_df[[33m'[39;49;00m[33mholder_na

In [46]:
top_holder_dataframe = holders_dataframe.ai.transform("name with the highest percentage, and its percentage")
top_holder_dataframe.show()

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00mholder_name,[37m [39;49;00m[34mMAX[39;49;00m(percent_out)[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00mmax_percent[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mGROUP[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00mholder_name[37m[39;49;00m
[34mORDER[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00mmax_percent[37m [39;49;00m[34mDESC[39;49;00m[37m[39;49;00m
[34mLIMIT[39;49;00m[37m [39;49;00m[34m1[39;49;00m[37m[39;49;00m



INFO:spark_ai:SQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00mholder_name,[37m [39;49;00m[34mMAX[39;49;00m(percent_out)[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00mmax_percent[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mGROUP[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00mholder_name[37m[39;49;00m
[34mORDER[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00mmax_percent[37m [39;49;00m[34mDESC[39;49;00m[37m[39;49;00m
[34mLIMIT[39;49;00m[37m [39;49;00m[34m1[39;49;00m[37m[39;49;00m



+--------------------+-----------+
|         holder_name|max_percent|
+--------------------+-----------+
|Vanguard Group, I...|       8.33|
+--------------------+-----------+



4. Explain what the AI did

In [47]:
top_holder_dataframe.ai.explain()

'In summary, this dataframe is retrieving the holder name with the highest percentage of Apple stocks out of all holders. It presents the results sorted by the percentage of stocks in descending order and limits the result to the top holder.'

5. Verify the dataframe attributes by giving the expectatoin in natural language

In [48]:
holders_dataframe.ai.verify("expect Apple's top holders have no more than 10% of shares")

[92mINFO: [0mGenerated code:
[34mdef[39;49;00m [32mcheck_apple_top_holders[39;49;00m(df) -> [36mbool[39;49;00m:[37m[39;49;00m
    [34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col[37m[39;49;00m
[37m[39;49;00m
    [37m# Filter the DataFrame for rows where the holder_name is 'Apple'[39;49;00m[37m[39;49;00m
    apple_df = df.filter(col([33m'[39;49;00m[33mholder_name[39;49;00m[33m'[39;49;00m) == [33m'[39;49;00m[33mApple[39;49;00m[33m'[39;49;00m)[37m[39;49;00m
[37m[39;49;00m
    [37m# Check if any row in the filtered DataFrame has a percent_out greater than 10[39;49;00m[37m[39;49;00m
    [34mif[39;49;00m apple_df.filter(col([33m'[39;49;00m[33mpercent_out[39;49;00m[33m'[39;49;00m) > [34m10[39;49;00m).count() > [34m0[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [34mFalse[39;49;00m[37m[39;49;00m
    [34m

INFO:spark_ai:Generated code:
[34mdef[39;49;00m [32mcheck_apple_top_holders[39;49;00m(df) -> [36mbool[39;49;00m:[37m[39;49;00m
    [34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col[37m[39;49;00m
[37m[39;49;00m
    [37m# Filter the DataFrame for rows where the holder_name is 'Apple'[39;49;00m[37m[39;49;00m
    apple_df = df.filter(col([33m'[39;49;00m[33mholder_name[39;49;00m[33m'[39;49;00m) == [33m'[39;49;00m[33mApple[39;49;00m[33m'[39;49;00m)[37m[39;49;00m
[37m[39;49;00m
    [37m# Check if any row in the filtered DataFrame has a percent_out greater than 10[39;49;00m[37m[39;49;00m
    [34mif[39;49;00m apple_df.filter(col([33m'[39;49;00m[33mpercent_out[39;49;00m[33m'[39;49;00m) > [34m10[39;49;00m).count() > [34m0[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [34mFalse[39;49;00m[37m[39;49;00m
    [34me

[92mINFO: [0m
Result: True


INFO:spark_ai:
Result: True
