# English SDK for Apache Spark

## Initialization

In [2]:
from pyspark_ai import SparkAI

spark_ai=SparkAI(verbose=True)
spark_ai.activate() # active partial functions for Spark DataFrame

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/24 11:04:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Example 1: Auto sales by brand in US 2022

In [3]:
# Search and ingest web content into a DataFrame
# If you have set up google-api-python-client, you can just run
# auto_df = spark_ai.create_df("2022 USA national auto sales by brand")
auto_df = spark_ai.create_df("https://www.carpro.com/blog/full-year-2022-national-auto-sales-by-brand")
auto_df.show()

[92mINFO: [0mParsing URL: https://www.carpro.com/blog/full-year-2022-national-auto-sales-by-brand

[92mINFO: [0mSQL query for the ingestion:
[34mCREATE[39;49;00m[37m [39;49;00m[34mOR[39;49;00m[37m [39;49;00m[34mREPLACE[39;49;00m[37m [39;49;00mTEMP[37m [39;49;00m[34mVIEW[39;49;00m[37m [39;49;00mauto_sales_2022[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00m[34mSELECT[39;49;00m[37m [39;49;00m*[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00m[34mVALUES[39;49;00m[37m[39;49;00m
([33m'Toyota'[39;49;00m,[37m [39;49;00m[34m1849751[39;49;00m,[37m [39;49;00m-[34m9[39;49;00m),[37m[39;49;00m
([33m'Ford'[39;49;00m,[37m [39;49;00m[34m1767439[39;49;00m,[37m [39;49;00m-[34m2[39;49;00m),[37m[39;49;00m
([33m'Chevrolet'[39;49;00m,[37m [39;49;00m[34m1502389[39;49;00m,[37m [39;49;00m[34m6[39;49;00m),[37m[39;49;00m
([33m'Honda'[39;49;00m,[37m [39;49;00m[34m881201[39;49;00m,[37m [39;49;00m-[34m33[39;49;00m),[37m[39;49;00m


In [4]:
auto_df.ai.verify("expect sales change percentage to be between -100 to 100")

[92mINFO: [0mGenerated code:
[34mdef[39;49;00m [32mcheck_sales_change_percentage[39;49;00m(df) -> [36mbool[39;49;00m:
    [34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col
    
    [37m# Check if the sales_change_percentage column is between -100 and 100[39;49;00m
    valid_percentage = df.filter((col([33m"[39;49;00m[33msales_change_percentage[39;49;00m[33m"[39;49;00m) >= -[34m100[39;49;00m) & (col([33m"[39;49;00m[33msales_change_percentage[39;49;00m[33m"[39;49;00m) <= [34m100[39;49;00m))
    
    [37m# Compare the count of valid_percentage rows with the total count of rows in the DataFrame[39;49;00m
    [34mif[39;49;00m valid_percentage.count() == df.count():
        [34mreturn[39;49;00m [34mTrue[39;49;00m
    [34melse[39;49;00m:
        [34mreturn[39;49;00m [34mFalse[39;49;00m

result = check_sales_change_percentage(df)



In [None]:
auto_df.ai.plot()

[92mINFO: [0mTo visualize the result of `df` using Plotly, you can follow these steps:

1. Convert the PySpark DataFrame to a Pandas DataFrame.
2. Create a Plotly figure using the Pandas DataFrame.
3. Display the plot directly.

Here's the code to achieve this:


```
[34mimport[39;49;00m [04m[36mpandas[39;49;00m [34mas[39;49;00m [04m[36mpd[39;49;00m
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mexpress[39;49;00m [34mas[39;49;00m [04m[36mpx[39;49;00m

[37m# Convert the PySpark DataFrame to a Pandas DataFrame[39;49;00m
pdf = df.toPandas()

[37m# Create a Plotly figure using the Pandas DataFrame[39;49;00m
fig = px.bar(pdf, x=[33m'[39;49;00m[33mbrand[39;49;00m[33m'[39;49;00m, y=[33m'[39;49;00m[33mus_sales[39;49;00m[33m'[39;49;00m, text=[33m'[39;49;00m[33msales_change_percentage[39;49;00m[33m'[39;49;00m,
             labels={[33m'[39;49;00m[33mus_sales[39;49;00m[33m'[39;49;00m: [33m'[39;49;00m[33mUS Sales[

In [7]:
auto_df.ai.plot("pie chart for US sales market shares, show the top 5 brands and the sum of others")

[92mINFO: [0mTo visualize the result of `df` using Plotly, you can follow these steps:

1. Import the necessary libraries
2. Convert the PySpark DataFrame to a Pandas DataFrame
3. Calculate the top 5 brands and the sum of others
4. Create a pie chart using Plotly
5. Display the plot

Here's the code:


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mgraph_objs[39;49;00m [34mas[39;49;00m [04m[36mgo[39;49;00m
[34mimport[39;49;00m [04m[36mpandas[39;49;00m [34mas[39;49;00m [04m[36mpd[39;49;00m

[37m# Convert the PySpark DataFrame to a Pandas DataFrame[39;49;00m
pdf = df.toPandas()

[37m# Calculate the top 5 brands and the sum of others[39;49;00m
top_5_brands = pdf.nlargest([34m5[39;49;00m, [33m'[39;49;00m[33mus_sales[39;49;00m[33m'[39;49;00m)
other_brands_sum = pdf.loc[~pdf.index.isin(top_5_brands.index), [33m'[39;49;00m[33mus_sales[39;49;00m[33m'[39;49;00m].sum()

[37m# Add the sum of others to the top 5 brands Dat

In [8]:
# Apply transforms to a Dataframe
auto_top_growth_df=auto_df.ai.transform("brand with the highest growth")
auto_top_growth_df.show()

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00mbrand,[37m [39;49;00msales_change_percentage[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mORDER[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00msales_change_percentage[37m [39;49;00m[34mDESC[39;49;00m[37m[39;49;00m
[34mLIMIT[39;49;00m[37m [39;49;00m[34m1[39;49;00m[37m[39;49;00m

+-------+-----------------------+
|  brand|sales_change_percentage|
+-------+-----------------------+
|Genesis|                     14|
+-------+-----------------------+



In [9]:
# Explain what a DataFrame is retrieving.
auto_top_growth_df.ai.explain()

'In summary, this dataframe is retrieving the brand with the highest sales change percentage from the `temp_view_for_transform` view. The result will show the brand and its corresponding sales change percentage.'

## Example 2: USA Presidents

In [8]:
# You can also specify the expected columns for the ingestion.
df=spark_ai.create_df("USA presidents", ["president", "vice_president"])
df.show()

[92mINFO: [0mParsing URL: https://www.loc.gov/rr/print/list/057_chron.html

[92mINFO: [0mSQL query for the ingestion:
[34mCREATE[39;49;00m[37m [39;49;00m[34mOR[39;49;00m[37m [39;49;00m[34mREPLACE[39;49;00m[37m [39;49;00mTEMP[37m [39;49;00m[34mVIEW[39;49;00m[37m [39;49;00musa_presidents[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00m[34mSELECT[39;49;00m[37m [39;49;00m*[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00m[34mVALUES[39;49;00m[37m[39;49;00m
([33m'George Washington'[39;49;00m,[37m [39;49;00m[33m'John Adams'[39;49;00m),[37m[39;49;00m
([33m'John Adams'[39;49;00m,[37m [39;49;00m[33m'Thomas Jefferson'[39;49;00m),[37m[39;49;00m
([33m'Thomas Jefferson'[39;49;00m,[37m [39;49;00m[33m'Aaron Burr'[39;49;00m),[37m[39;49;00m
([33m'Thomas Jefferson'[39;49;00m,[37m [39;49;00m[33m'George Clinton'[39;49;00m),[37m[39;49;00m
([33m'James Madison'[39;49;00m,[37m [39;49;00m[33m'George Clinton'[39;49;00m),[37m[39;49;00m
(

In [9]:
presidents_who_were_vp = df.ai.transform("presidents who were also vice presidents")
presidents_who_were_vp.show()

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m[34mDISTINCT[39;49;00m[37m [39;49;00mpresident[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mWHERE[39;49;00m[37m [39;49;00mpresident[37m [39;49;00m[34mIN[39;49;00m[37m [39;49;00m([34mSELECT[39;49;00m[37m [39;49;00mvice_president[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform)[37m[39;49;00m

+------------------+
|         president|
+------------------+
|        John Adams|
|  Thomas Jefferson|
|  Martin Van Buren|
|  Millard Fillmore|
|        John Tyler|
|    Andrew Johnson|
| Chester A. Arthur|
|Theodore Roosevelt|
|   Calvin Coolidge|
|   Harry S. Truman|
|    Gerald R. Ford|
| Lyndon B. Johnson|
|  Richard M. Nixon|
|       George Bush|
|   Joseph R. Biden|
+------------------+



In [10]:
presidents_who_were_vp.ai.explain()

'In summary, this dataframe is retrieving the list of presidents who have never been vice-presidents from the `usa_presidents` table.'

In [11]:
presidents_who_were_vp.ai.verify("expect no NULL values")

[92mINFO: [0mGenerated code:
[34mdef[39;49;00m [32mno_null_values[39;49;00m(df) -> [36mbool[39;49;00m:[37m[39;49;00m
    [34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col, count, when[37m[39;49;00m
[37m[39;49;00m
    [37m# Count the number of NULL values in each column[39;49;00m[37m[39;49;00m
    null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) [34mfor[39;49;00m c [35min[39;49;00m df.columns])[37m[39;49;00m
[37m[39;49;00m
    [37m# Check if there are any NULL values in any column[39;49;00m[37m[39;49;00m
    [34mfor[39;49;00m c [35min[39;49;00m null_counts.columns:[37m[39;49;00m
        [34mif[39;49;00m null_counts.first()[c] > [34m0[39;49;00m:[37m[39;49;00m
            [34mreturn[39;49;00m [34mFalse[39;49;00m[37m[39;49;00m
[37m[39;49;00m
    [34mreturn[39;49;00m [34mTrue[39;49;00m[37m[39;49;

# Example 3: Top 10 tech companies

In [12]:
# Search and ingest web content into a DataFrame
company_df=spark_ai.create_df("Top 10 tech companies by market cap", ['company', 'cap', 'country'])
company_df.show()

[92mINFO: [0mParsing URL: https://www.statista.com/statistics/1350976/leading-tech-companies-worldwide-by-market-cap/

[92mINFO: [0mSQL query for the ingestion:
[34mCREATE[39;49;00m[37m [39;49;00m[34mOR[39;49;00m[37m [39;49;00m[34mREPLACE[39;49;00m[37m [39;49;00mTEMP[37m [39;49;00m[34mVIEW[39;49;00m[37m [39;49;00mtop_tech_companies[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00m[34mSELECT[39;49;00m[37m [39;49;00m*[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00m[34mVALUES[39;49;00m[37m[39;49;00m
([33m'Apple'[39;49;00m,[37m [39;49;00m[34m2242[39;49;00m,[37m [39;49;00m[33m'USA'[39;49;00m),[37m[39;49;00m
([33m'Microsoft'[39;49;00m,[37m [39;49;00m[34m1821[39;49;00m,[37m [39;49;00m[33m'USA'[39;49;00m),[37m[39;49;00m
([33m'Alphabet (Google)'[39;49;00m,[37m [39;49;00m[34m1229[39;49;00m,[37m [39;49;00m[33m'USA'[39;49;00m),[37m[39;49;00m
([33m'Amazon'[39;49;00m,[37m [39;49;00m[34m902[39;49;00m.[34m4[39;49;00m,[3

In [13]:
us_company_df=company_df.ai.transform("companies in United States")
us_company_df.show()

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m*[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m [39;49;00m[34mWHERE[39;49;00m[37m [39;49;00mcountry[37m [39;49;00m=[37m [39;49;00m[33m'United States'[39;49;00m[37m[39;49;00m

+-------+---+-------+
|company|cap|country|
+-------+---+-------+
+-------+---+-------+



In [14]:
us_company_df.ai.explain()

'In summary, this dataframe is retrieving the company, market capitalization (cap), and country for all the top tech companies that are located in the United States.'

In [15]:
us_company_df.ai.verify("expect all company names to be unique")

[92mINFO: [0mGenerated code:
[34mdef[39;49;00m [32mhas_unique_company_names[39;49;00m(df) -> [36mbool[39;49;00m:[37m[39;49;00m
    [34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m countDistinct[37m[39;49;00m
[37m[39;49;00m
    [37m# Get the number of unique company names in the DataFrame[39;49;00m[37m[39;49;00m
    unique_company_names = df.select(countDistinct([33m"[39;49;00m[33mcompany[39;49;00m[33m"[39;49;00m)).collect()[[34m0[39;49;00m][[34m0[39;49;00m][37m[39;49;00m
[37m[39;49;00m
    [37m# Get the total number of rows in the DataFrame[39;49;00m[37m[39;49;00m
    total_rows = df.count()[37m[39;49;00m
[37m[39;49;00m
    [37m# Check if the number of unique company names is equal to the total number of rows[39;49;00m[37m[39;49;00m
    [34mif[39;49;00m unique_company_names == total_rows:[37m[39;49;00m
        [34mret

## Example 4: Ingestion from a URL
Instead of searching for the web page, you can also ask the SparkAI to ingest from a URL.

In [16]:
best_albums_df = spark_ai.create_df('https://time.com/6235186/best-albums-2022/', ["album", "artist", "year"])
best_albums_df.show()

[92mINFO: [0mParsing URL: https://time.com/6235186/best-albums-2022/

[92mINFO: [0mSQL query for the ingestion:
[34mCREATE[39;49;00m[37m [39;49;00m[34mOR[39;49;00m[37m [39;49;00m[34mREPLACE[39;49;00m[37m [39;49;00mTEMP[37m [39;49;00m[34mVIEW[39;49;00m[37m [39;49;00mbest_albums_2022[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00m[34mSELECT[39;49;00m[37m [39;49;00m*[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00m[34mVALUES[39;49;00m[37m[39;49;00m
([33m'Motomami'[39;49;00m,[37m [39;49;00m[33m'Rosalía'[39;49;00m,[37m [39;49;00m[34m2022[39;49;00m),[37m[39;49;00m
([33m'You Can’t Kill Me'[39;49;00m,[37m [39;49;00m[33m'070 Shake'[39;49;00m,[37m [39;49;00m[34m2022[39;49;00m),[37m[39;49;00m
([33m'Mr. Morale & The Big Steppers'[39;49;00m,[37m [39;49;00m[33m'Kendrick Lamar'[39;49;00m,[37m [39;49;00m[34m2022[39;49;00m),[37m[39;49;00m
([33m'Big Time'[39;49;00m,[37m [39;49;00m[33m'Angel Olsen'[39;49;00m,[37m [39;49;00

In [17]:
best_albums_df.ai.verify("expect each year to be 2022")

[92mINFO: [0mGenerated code:
[34mdef[39;49;00m [32mhas_year_2022[39;49;00m(df) -> [36mbool[39;49;00m:[37m[39;49;00m
    [34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col[37m[39;49;00m
[37m[39;49;00m
    [37m# Filter the DataFrame to only include rows where the year is 2022[39;49;00m[37m[39;49;00m
    df_filtered = df.filter(col([33m"[39;49;00m[33myear[39;49;00m[33m"[39;49;00m) == [34m2022[39;49;00m)[37m[39;49;00m
[37m[39;49;00m
    [37m# Check if the number of rows in the filtered DataFrame is equal to the number of rows in the original DataFrame[39;49;00m[37m[39;49;00m
    [34mif[39;49;00m df_filtered.count() == df.count():[37m[39;49;00m
        [34mreturn[39;49;00m [34mTrue[39;49;00m[37m[39;49;00m
    [34melse[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [34mFalse[39;49;00m[37m[39;49;00m
[37m[39;4

## Example 5: UDF Generation

You can also ask the SparkAI to generate code for a Spark UDF by providing.

In [18]:
@spark_ai.udf
def convert_grades(grade_percent: float) -> str:
    """Convert the grade percent to a letter grade using standard cutoffs"""
    ...

[92mINFO: [0mCreating following Python UDF:
[34mdef[39;49;00m [32mconvert_grades[39;49;00m(grade_percent: [36mfloat[39;49;00m) -> [36mstr[39;49;00m:[37m[39;49;00m
    [34mif[39;49;00m grade_percent [35mis[39;49;00m [35mnot[39;49;00m [34mNone[39;49;00m:[37m[39;49;00m
        [34mif[39;49;00m grade_percent >= [34m90[39;49;00m:[37m[39;49;00m
            [34mreturn[39;49;00m [33m"[39;49;00m[33mA[39;49;00m[33m"[39;49;00m[37m[39;49;00m
        [34melif[39;49;00m grade_percent >= [34m80[39;49;00m:[37m[39;49;00m
            [34mreturn[39;49;00m [33m"[39;49;00m[33mB[39;49;00m[33m"[39;49;00m[37m[39;49;00m
        [34melif[39;49;00m grade_percent >= [34m70[39;49;00m:[37m[39;49;00m
            [34mreturn[39;49;00m [33m"[39;49;00m[33mC[39;49;00m[33m"[39;49;00m[37m[39;49;00m
        [34melif[39;49;00m grade_percent >= [34m60[39;49;00m:[37m[39;49;00m
            [34mreturn[39;49;00m [33m"[39;49;00m[33mD[39;49;00m[33m"

In [19]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.udf.register("convert_grades", convert_grades)
percentGrades = [(1, 97.8), (2, 72.3), (3, 81.2)]
df = spark.createDataFrame(percentGrades, ["student_id", "grade_percent"])
df.selectExpr("student_id", "convert_grades(grade_percent)").show()

                                                                                

+----------+-----------------------------+
|student_id|convert_grades(grade_percent)|
+----------+-----------------------------+
|         1|                            A|
|         2|                            C|
|         3|                            B|
+----------+-----------------------------+



# Cache
The SparkAI supports a simple in-memory and persistent cache system. It keeps an in-memory staging cache, which gets updated for LLM and web search results. The staging cache can be persisted through the commit() method. Cache lookup is always performed on both in-memory staging cache and persistent cache.

In [10]:
spark_ai.commit()