# Input

In [1]:
# we are using the google api to fetch us data
from langchain.chat_models import ChatOpenAI
from pyspark_ai import SparkAI

# If 'gpt-4' is unavailable, use 'gpt-3.5-turbo' (might lower output quality)
llm = ChatOpenAI(model_name='gpt-4', temperature=0)

# Initialize SparkAI with the ChatOpenAI model
spark_ai = SparkAI(llm=llm, verbose=True)

# Activate partial functions for Spark DataFrame
spark_ai.activate()

23/07/05 12:23:05 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.1.10 instead (on interface wlp4s0)
23/07/05 12:23:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/05 12:23:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Search and ingest content from url into a DataFrame
tech_comp_df = spark_ai.create_df("https://companiesmarketcap.com/tech/largest-tech-companies-by-market-cap/", 
                                  ["company_name", "market_cap_trillion", "country"])
tech_comp_df.show()

[92mINFO: [0mParsing URL: https://companiesmarketcap.com/tech/largest-tech-companies-by-market-cap/

[92mINFO: [0mSQL query for the ingestion:
[34mCREATE[39;49;00m[37m [39;49;00m[34mOR[39;49;00m[37m [39;49;00m[34mREPLACE[39;49;00m[37m [39;49;00mTEMP[37m [39;49;00m[34mVIEW[39;49;00m[37m [39;49;00mtech_companies[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00m
[34mSELECT[39;49;00m[37m [39;49;00m*[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00m[34mVALUES[39;49;00m[37m[39;49;00m
([33m'Apple'[39;49;00m,[37m [39;49;00m[34m3[39;49;00m.[34m027[39;49;00m,[37m [39;49;00m[33m'USA'[39;49;00m),[37m[39;49;00m
([33m'Microsoft'[39;49;00m,[37m [39;49;00m[34m2[39;49;00m.[34m513[39;49;00m,[37m [39;49;00m[33m'USA'[39;49;00m),[37m[39;49;00m
([33m'Alphabet (Google)'[39;49;00m,[37m [39;49;00m[34m1[39;49;00m.[34m525[39;49;00m,[37m [39;49;00m[33m'USA'[39;49;00m),[37m[39;49;00m
([33m'Amazon'[39;49;00m,[37m [39;49;00m[34m1[39;

In [21]:
# other ways to create a DataFrame
# By default it will create a dataframe with significant columns. We can specify the column names as well.
# tech_comp_df = spark_ai.create_df("https://companiesmarketcap.com/tech/largest-tech-companies-by-market-cap/", 
#                                   ["company_name", "market_cap_trillion", "country"])

# We can also use google search query to get the data. We need GOOGLE_API_KEY and GOOGLE_CSE_ID for that.
tech_comp_df = spark_ai.create_df("largest tech companies by market cap", 
                                ["company_name", "market_cap_trillion", "country"])

[92mINFO: [0mParsing URL: https://companiesmarketcap.com/tech/largest-tech-companies-by-market-cap/

[92mINFO: [0mSQL query for the ingestion:
[34mCREATE[39;49;00m[37m [39;49;00m[34mOR[39;49;00m[37m [39;49;00m[34mREPLACE[39;49;00m[37m [39;49;00mTEMP[37m [39;49;00m[34mVIEW[39;49;00m[37m [39;49;00mtech_companies[37m [39;49;00m[34mAS[39;49;00m[37m [39;49;00m
[34mSELECT[39;49;00m[37m [39;49;00m*[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00m[34mVALUES[39;49;00m[37m[39;49;00m
([33m'Apple'[39;49;00m,[37m [39;49;00m[34m3[39;49;00m.[34m027[39;49;00m,[37m [39;49;00m[33m'USA'[39;49;00m),[37m[39;49;00m
([33m'Microsoft'[39;49;00m,[37m [39;49;00m[34m2[39;49;00m.[34m513[39;49;00m,[37m [39;49;00m[33m'USA'[39;49;00m),[37m[39;49;00m
([33m'Alphabet (Google)'[39;49;00m,[37m [39;49;00m[34m1[39;49;00m.[34m525[39;49;00m,[37m [39;49;00m[33m'USA'[39;49;00m),[37m[39;49;00m
([33m'Amazon'[39;49;00m,[37m [39;49;00m[34m1[39;

In [3]:
# we also have a local csv file that we can use 
# (https://github.com/HariVM/Analytics/blob/master/1000%20Sales%20Records.csv)
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("tech_comp").getOrCreate()

manual_df = spark.read.csv("1000 Sales Records.csv", header=True, inferSchema=True)
manual_df.show()

23/07/05 12:24:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+--------------------+----------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|   Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+----------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Middle East and N...|     Libya|      Cosmetics|      Offline|             M|10/18/2014|686800706|10/31/2014|      8446|     437.2|   263.33|    3692591.2|2224085.18|  1468506.02|
|       North America|    Canada|     Vegetables|       Online|             M| 11/7/2011|185941302| 12/8/2011|      3018|    154.06|    90.93|    464953.08| 274426.74|   190526.34|
|Middle East and N...|     Libya|      Baby Food|      Offline|             C|10/31/2016|246222

# Plotting

In [4]:
tech_comp_df.ai.plot()

[92mINFO: [0mHere is a Python code snippet that uses Plotly to visualize the data in the `df` DataFrame. This code assumes that you want to create a bar chart where the x-axis represents the `company_name` and the y-axis represents the `market_cap_trillion`. The bars are colored based on the `country`.


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mexpress[39;49;00m [34mas[39;49;00m [04m[36mpx[39;49;00m[37m[39;49;00m
[34mimport[39;49;00m [04m[36mpandas[39;49;00m [34mas[39;49;00m [04m[36mpd[39;49;00m[37m[39;49;00m
[37m[39;49;00m
[37m# Convert Spark DataFrame to Pandas DataFrame[39;49;00m[37m[39;49;00m
pandas_df = df.toPandas()[37m[39;49;00m
[37m[39;49;00m
[37m# Create a bar chart[39;49;00m[37m[39;49;00m
fig = px.bar(pandas_df, x=[33m'[39;49;00m[33mcompany_name[39;49;00m[33m'[39;49;00m, y=[33m'[39;49;00m[33mmarket_cap_trillion[39;49;00m[33m'[39;49;00m, color=[33m'[39;49;00m[33mcountry[39;49;00m[

In [5]:
tech_comp_df.ai.plot("company with market cap greater than 1 trillion")

[92mINFO: [0mHere is a Python code snippet that uses Plotly to visualize the result of `df`. This code assumes that `df` is a PySpark DataFrame. It first filters the DataFrame to include only companies with a market cap greater than 1 trillion, then converts the DataFrame to a Pandas DataFrame, and finally creates a bar plot with company names on the x-axis and market cap on the y-axis.


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mexpress[39;49;00m [34mas[39;49;00m [04m[36mpx[39;49;00m[37m[39;49;00m
[34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col[37m[39;49;00m
[37m[39;49;00m
[37m# Filter the DataFrame to include only companies with a market cap greater than 1 trillion[39;49;00m[37m[39;49;00m
df_filtered = df.filter(col([33m"[39;49;00m[33mmarket_cap_trillion[39;49;00m[33m"[39;49;00m) > [34m1[39

In [6]:
# using manually created dataframe
manual_df.ai.plot("Sales count by Region")

[92mINFO: [0mHere is a Python code snippet that uses PySpark and Plotly to visualize the sales count by region. This code assumes that the PySpark DataFrame `df` is already defined and populated.


```
[34mimport[39;49;00m [04m[36mplotly[39;49;00m[04m[36m.[39;49;00m[04m[36mexpress[39;49;00m [34mas[39;49;00m [04m[36mpx[39;49;00m[37m[39;49;00m
[34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m [34mimport[39;49;00m functions [34mas[39;49;00m F[37m[39;49;00m
[37m[39;49;00m
[37m# First, we need to aggregate the data by region and count the sales[39;49;00m[37m[39;49;00m
df_agg = df.groupBy([33m"[39;49;00m[33mRegion[39;49;00m[33m"[39;49;00m).agg(F.count([33m"[39;49;00m[33mUnits Sold[39;49;00m[33m"[39;49;00m).alias([33m"[39;49;00m[33mSales Count[39;49;00m[33m"[39;49;00m))[37m[39;49;00m
[37m[39;49;00m
[37m# Convert the aggregated Spark DataFrame to a Pandas DataFrame[39;49;00m[37m[39;49;00m
df

# Transformations

In [7]:
tech_comp_df.ai.transform("Name of the country with max companies and how much is the total market cap for those companies").show()

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00mcountry,[37m [39;49;00m[34mSUM[39;49;00m(market_cap_trillion)[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00mtotal_market_cap[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mGROUP[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00mcountry[37m[39;49;00m
[34mORDER[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00m[34mCOUNT[39;49;00m(company_name)[37m [39;49;00m[34mDESC[39;49;00m,[37m [39;49;00mtotal_market_cap[37m [39;49;00m[34mDESC[39;49;00m[37m[39;49;00m
[34mLIMIT[39;49;00m[37m [39;49;00m[34m1[39;49;00m[37m[39;49;00m

+-------+----------------+
|country|total_market_cap|
+-------+----------------+
|    USA|        11.06787|
+-------+----------------+



In [8]:
tech_comp_df.ai.transform("list all the countries").show()

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m[34mDISTINCT[39;49;00m[37m [39;49;00mcountry[37m [39;49;00m[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m

+--------+
| country|
+--------+
|     USA|
|  Taiwan|
|   China|
|S. Korea|
+--------+



In [9]:
manual_df.ai.transform("Sales sum for units sold by Region").show()

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00mRegion,[37m [39;49;00m[34mSUM[39;49;00m(`Units[37m [39;49;00mSold`)[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00mSales_Sum[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mGROUP[39;49;00m[37m [39;49;00m[34mBY[39;49;00m[37m [39;49;00mRegion[37m[39;49;00m

+--------------------+---------+
|              Region|Sales_Sum|
+--------------------+---------+
|Middle East and N...|   682363|
|Australia and Oce...|   417298|
|              Europe|  1285808|
|  Sub-Saharan Africa|  1386894|
|Central America a...|   503362|
|       North America|   100739|
|                Asia|   677524|
+--------------------+---------+



In [10]:
manual_df.ai.transform("Sum of all the orders served in North America").show()

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m[34mSUM[39;49;00m(`Units[37m [39;49;00mSold`)[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00mTotal_Orders_Served[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00mtemp_view_for_transform[37m[39;49;00m
[34mWHERE[39;49;00m[37m [39;49;00mRegion[37m [39;49;00m=[37m [39;49;00m[33m'North America'[39;49;00m[37m[39;49;00m

+-------------------+
|Total_Orders_Served|
+-------------------+
|             100739|
+-------------------+



# Explain

In [11]:
tech_comp_df.ai.explain()

'In summary, this dataframe is creating a view named `tech_companies` from a local relation `v1`. The view contains three columns: `company_name`, `market_cap_trillion`, and `country`. The `company_name` and `country` columns are cast to string type, and the `market_cap_trillion` column is cast to decimal type with precision 6 and scale 5.'

In [12]:
manual_df.ai.explain()

'In summary, this dataframe is retrieving all records from the CSV file. The columns include Region, Country, Item Type, Sales Channel, Order Priority, Order Date, Order ID, Ship Date, Units Sold, Unit Price, Unit Cost, Total Revenue, Total Cost, and Total Profit.'

# Verify

In [13]:
tech_comp_df.ai.verify("name of companies should be unique")

[92mINFO: [0mGenerated code:
[34mdef[39;49;00m [32mhas_unique_company_names[39;49;00m(df) -> [36mbool[39;49;00m:[37m[39;49;00m
    [34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m [34mimport[39;49;00m functions [34mas[39;49;00m F[37m[39;49;00m
[37m[39;49;00m
    [37m# Count the number of unique company names[39;49;00m[37m[39;49;00m
    unique_company_names = df.select(F.countDistinct([33m"[39;49;00m[33mcompany_name[39;49;00m[33m"[39;49;00m)).collect()[[34m0[39;49;00m][[34m0[39;49;00m][37m[39;49;00m
[37m[39;49;00m
    [37m# Check if the number of unique company names is equal to the total number of rows[39;49;00m[37m[39;49;00m
    [34mif[39;49;00m unique_company_names == df.count():[37m[39;49;00m
        [34mreturn[39;49;00m [34mTrue[39;49;00m[37m[39;49;00m
    [34melse[39;49;00m:[37m[39;49;00m
        [34mreturn[39;49;00m [34mFalse[39;49;00m[37m[39;49;00m
[37m[39;49;00m
result =

In [4]:
manual_df.ai.verify("Sales count by Region should be greater than 1000")

[92mINFO: [0mGenerated code:
[34mdef[39;49;00m [32mcheck_sales_count[39;49;00m(df) -> [36mbool[39;49;00m:[37m[39;49;00m
    [34mfrom[39;49;00m [04m[36mpyspark[39;49;00m[04m[36m.[39;49;00m[04m[36msql[39;49;00m[04m[36m.[39;49;00m[04m[36mfunctions[39;49;00m [34mimport[39;49;00m col, [36msum[39;49;00m[37m[39;49;00m
[37m[39;49;00m
    [37m# Group by Region and sum the Units Sold[39;49;00m[37m[39;49;00m
    df_grouped = df.groupBy([33m"[39;49;00m[33mRegion[39;49;00m[33m"[39;49;00m).agg([36msum[39;49;00m([33m"[39;49;00m[33mUnits Sold[39;49;00m[33m"[39;49;00m).alias([33m"[39;49;00m[33mTotal Sales[39;49;00m[33m"[39;49;00m))[37m[39;49;00m
[37m[39;49;00m
    [37m# Check if the minimum Total Sales is greater than 1000[39;49;00m[37m[39;49;00m
    min_sales = df_grouped.agg({[33m"[39;49;00m[33mTotal Sales[39;49;00m[33m"[39;49;00m: [33m"[39;49;00m[33mmin[39;49;00m[33m"[39;49;00m}).collect()[[34m0[39;49;00m][[34m0[39;49

# UDFs

In [15]:
# one of the examples
@spark_ai.udf
def convert_grades(grade_percent: float) -> str:
    """Convert the grade percent to a letter grade using standard cutoffs"""

[92mINFO: [0mCreating following Python UDF:
[34mdef[39;49;00m [32mconvert_grades[39;49;00m(grade_percent) -> [36mstr[39;49;00m:[37m[39;49;00m
    [34mif[39;49;00m grade_percent [35mis[39;49;00m [35mnot[39;49;00m [34mNone[39;49;00m:[37m[39;49;00m
        [34mif[39;49;00m grade_percent >= [34m90.0[39;49;00m:[37m[39;49;00m
            [34mreturn[39;49;00m [33m'[39;49;00m[33mA[39;49;00m[33m'[39;49;00m[37m[39;49;00m
        [34melif[39;49;00m grade_percent >= [34m80.0[39;49;00m:[37m[39;49;00m
            [34mreturn[39;49;00m [33m'[39;49;00m[33mB[39;49;00m[33m'[39;49;00m[37m[39;49;00m
        [34melif[39;49;00m grade_percent >= [34m70.0[39;49;00m:[37m[39;49;00m
            [34mreturn[39;49;00m [33m'[39;49;00m[33mC[39;49;00m[33m'[39;49;00m[37m[39;49;00m
        [34melif[39;49;00m grade_percent >= [34m60.0[39;49;00m:[37m[39;49;00m
            [34mreturn[39;49;00m [33m'[39;49;00m[33mD[39;49;00m[33m'[39;49;00m[37m

In [16]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.udf.register("convert_grades", convert_grades)
percentGrades = [(1, 97.8), (2, 72.3), (3, 81.2)]
df = spark.createDataFrame(percentGrades, ["student_id", "grade_percent"])
df.selectExpr("student_id", "convert_grades(grade_percent)").show()

                                                                                

+----------+-----------------------------+
|student_id|convert_grades(grade_percent)|
+----------+-----------------------------+
|         1|                            A|
|         2|                            C|
|         3|                            B|
+----------+-----------------------------+



In [17]:
@spark_ai.udf
def diff_bw_ship_date_and_order_date(ship_date, order_date) -> int:
    """Order date and ship date are date strings formatted as MM/DD/YYYY, calculate the difference in days"""

[92mINFO: [0mCreating following Python UDF:
[34mdef[39;49;00m [32mdiff_bw_ship_date_and_order_date[39;49;00m(ship_date, order_date):[37m[39;49;00m
    [34mfrom[39;49;00m [04m[36mdatetime[39;49;00m [34mimport[39;49;00m datetime[37m[39;49;00m
    [34mif[39;49;00m ship_date [35mis[39;49;00m [35mnot[39;49;00m [34mNone[39;49;00m [35mand[39;49;00m order_date [35mis[39;49;00m [35mnot[39;49;00m [34mNone[39;49;00m:[37m[39;49;00m
        ship_date = datetime.strptime(ship_date, [33m'[39;49;00m[33m%[39;49;00m[33mm/[39;49;00m[33m%d[39;49;00m[33m/[39;49;00m[33m%[39;49;00m[33mY[39;49;00m[33m'[39;49;00m)[37m[39;49;00m
        order_date = datetime.strptime(order_date, [33m'[39;49;00m[33m%[39;49;00m[33mm/[39;49;00m[33m%d[39;49;00m[33m/[39;49;00m[33m%[39;49;00m[33mY[39;49;00m[33m'[39;49;00m)[37m[39;49;00m
        [34mreturn[39;49;00m (ship_date - order_date).days[37m[39;49;00m



In [18]:
spark.udf.register("diff_bw_ship_date_and_order_date", diff_bw_ship_date_and_order_date)
manual_df.createOrReplaceTempView("manualDF")

In [19]:
manual_df.ai.transform("difference between ship date and order date using udf diff_bw_ship_date_and_order_date ").show()

[92mINFO: [0mSQL query for the transform:
[34mSELECT[39;49;00m[37m [39;49;00m
[37m    [39;49;00m`[34mOrder[39;49;00m[37m [39;49;00mID`,[37m [39;49;00m
[37m    [39;49;00m`[34mOrder[39;49;00m[37m [39;49;00m[36mDate[39;49;00m`,[37m [39;49;00m
[37m    [39;49;00m`Ship[37m [39;49;00m[36mDate[39;49;00m`,[37m [39;49;00m
[37m    [39;49;00mdiff_bw_ship_date_and_order_date(`[34mOrder[39;49;00m[37m [39;49;00m[36mDate[39;49;00m`,[37m [39;49;00m`Ship[37m [39;49;00m[36mDate[39;49;00m`)[37m [39;49;00m[34mas[39;49;00m[37m [39;49;00mDate_Difference[37m[39;49;00m
[34mFROM[39;49;00m[37m [39;49;00m
[37m    [39;49;00mtemp_view_for_transform[37m[39;49;00m

+---------+----------+----------+---------------+
| Order ID|Order Date| Ship Date|Date_Difference|
+---------+----------+----------+---------------+
|686800706|10/18/2014|10/31/2014|            -13|
|185941302| 11/7/2011| 12/8/2011|            -31|
|246222341|10/31/2016| 12/9/2016|            -

# Output

In [20]:
spark_ai.commit()