In [0]:
from langchain_experimental.agents.agent_toolkits import create_spark_dataframe_agent
from langchain_openai import ChatOpenAI
from pyspark.sql import SparkSession

In [0]:
spark=SparkSession.builder.getOrCreate()

In [0]:
df=spark.sql("select * from samples.nyctaxi.trips")
df.show()

+--------------------+---------------------+-------------+-----------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip|
+--------------------+---------------------+-------------+-----------+----------+-----------+
| 2016-02-14 16:52:13|  2016-02-14 17:16:04|         4.94|       19.0|     10282|      10171|
| 2016-02-04 18:44:19|  2016-02-04 18:46:00|         0.28|        3.5|     10110|      10110|
| 2016-02-17 17:13:57|  2016-02-17 17:17:55|          0.7|        5.0|     10103|      10023|
| 2016-02-18 10:36:07|  2016-02-18 10:41:45|          0.8|        6.0|     10022|      10017|
| 2016-02-22 14:14:41|  2016-02-22 14:31:52|         4.51|       17.0|     10110|      10282|
| 2016-02-05 06:45:02|  2016-02-05 06:50:26|          1.8|        7.0|     10009|      10065|
| 2016-02-15 15:03:28|  2016-02-15 15:18:45|         2.58|       12.0|     10153|      10199|
| 2016-02-25 19:09:26|  2016-02-25 19:24:50|          1.4|  

In [0]:
df.count()

21932

In [0]:
llm=ChatOpenAI(model="gpt-4",temperature=0)
agent=create_spark_dataframe_agent(llm=llm,df=df,verbose=True)

In [0]:
agent.run("what are the columns available?")



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: To get the columns of a dataframe in PySpark, we can use the `columns` property of the dataframe.
Action: python_repl_ast
Action Input: df.columns[0m
Observation: [36;1m[1;3m['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount', 'pickup_zip', 'dropoff_zip'][0m
Thought:[32;1m[1;3mI now know the columns available in the dataframe.
Final Answer: The columns available are 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount', 'pickup_zip', 'dropoff_zip'.[0m

[1m> Finished chain.[0m


"The columns available are 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'trip_distance', 'fare_amount', 'pickup_zip', 'dropoff_zip'."

In [0]:
agent.run("what is maximum trip distance")



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: To find the maximum trip distance, I need to use the `max()` function on the `trip_distance` column of the dataframe.
Action: python_repl_ast
Action Input: df.agg({"trip_distance": "max"}).collect()[0][0][0m
Observation: [36;1m[1;3m30.6[0m
Thought:[32;1m[1;3mThe maximum trip distance in the dataframe is 30.6 miles.
Final Answer: 30.6[0m

[1m> Finished chain.[0m


'30.6'

In [0]:
agent.run("which zip code has maxiumum number of pick up")



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: To find the zip code with the maximum number of pickups, we need to group the dataframe by the 'pickup_zip' column and count the number of rows in each group. Then, we need to order the groups by the count in descending order and select the first row.
Action: python_repl_ast
Action Input: df.groupBy('pickup_zip').count().orderBy('count', ascending=False).first()[0m
Observation: [36;1m[1;3mRow(pickup_zip=10001, count=1227)[0m
Thought:[32;1m[1;3mThe zip code with the maximum number of pickups is 10001.
Final Answer: 10001[0m

[1m> Finished chain.[0m


'10001'

In [0]:
agent.run("which zip code has maxiumum number of drop")



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: To find the zip code with the maximum number of drops, we need to group the dataframe by the 'dropoff_zip' column and count the number of rows in each group. Then, we can sort the result in descending order and select the first row.
Action: python_repl_ast
Action Input: df.groupBy('dropoff_zip').count().orderBy('count', ascending=False).first()[0m
Observation: [36;1m[1;3mRow(dropoff_zip=10001, count=1069)[0m
Thought:[32;1m[1;3mThe zip code with the maximum number of drops is 10001.
Final Answer: 10001[0m

[1m> Finished chain.[0m


'10001'

In [0]:
agent.run("which date has maximum number of drop")



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: To find the date with the maximum number of drops, we need to group the data by the dropoff date and then count the number of rows for each date. The date with the maximum count is the date with the maximum number of drops.
Action: python_repl_ast
Action Input: df.groupBy(df.tpep_dropoff_datetime).count().orderBy('count', ascending=False).first()[0m
Observation: [36;1m[1;3mRow(tpep_dropoff_datetime=datetime.datetime(2016, 2, 14, 18, 25, 41), count=2)[0m
Thought:[32;1m[1;3mThe date with the maximum number of drops is 2016-02-14 18:25:41 with 2 drops.
Final Answer: The date with the maximum number of drops is 2016-02-14 18:25:41.[0m

[1m> Finished chain.[0m


'The date with the maximum number of drops is 2016-02-14 18:25:41.'

In [0]:
agent.run("what is the maximum fare considering all trips")



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: To find the maximum fare, I need to use the `max()` function on the `fare_amount` column of the dataframe.
Action: python_repl_ast
Action Input: df.agg({"fare_amount": "max"}).collect()[0m
Observation: [36;1m[1;3m[Row(max(fare_amount)=275.0)][0m
Thought:[32;1m[1;3mThe maximum fare amount for all trips is 275.0
Final Answer: 275.0[0m

[1m> Finished chain.[0m


'275.0'