# Pig to PySpark
* **Tools:** Ollama, LangChain, Autogen
* **TODO:**
  * Build a user interface where PIG code can be copied/pasted or uploaded.
  * Use Autogen to create CSV input data.
  * Rewrite as Python files (.py).

---
* **Next Steps:**
  * (05-08-2024) A popular opinion seems to be that ["LangGraph is better than AutoGen"](https://www.reddit.com/r/LangChain/comments/1b7q44y/autogen_vs_langgraph/?onetap_auto=true&one_tap=true). --> Replace Autogen with LangGraph.

---

## 0. Initial Setup 

* Generate a LangSmith API key.
* TODO: How to safely save and load API keys
* https://docs.smith.langchain.com/
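For the API-key TODO above, one common pattern is to keep keys out of the notebook entirely: store them in a `.env` file that is listed in `.gitignore` (this is the file `load_dotenv()` reads in the setup cell) and read them from environment variables at runtime. The helper below is a hypothetical, standard-library-only sketch; `get_secret` is not part of any library. If a key is missing it falls back to a `getpass` prompt, so the key is never typed into a cell or echoed.

```python
import os
from getpass import getpass


def get_secret(name: str, prompt_if_missing: bool = True) -> str:
    """Return a secret from the environment, prompting without echo if it is missing.

    Hypothetical helper: assumes secrets are provided as environment variables
    (for example loaded from a git-ignored .env file by python-dotenv).
    """
    value = os.environ.get(name)
    if value:
        return value
    if not prompt_if_missing:
        raise KeyError(f"Environment variable {name!r} is not set")
    # getpass reads from the terminal without echoing the typed characters
    value = getpass(f"Enter {name}: ")
    # Cache in this process only; nothing is written to disk
    os.environ[name] = value
    return value
```

Usage: `os.environ['LANGCHAIN_API_KEY'] = get_secret('LANGCHAIN_API_KEY')` after `load_dotenv()`.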

In [1]:
import os 
from dotenv import load_dotenv
import re
import pandas as pd 

# LangChain
from langchain_community.document_loaders import WebBaseLoader # loads web pages as LangChain Documents
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
# from langchain_mistralai.chat_models import ChatMistralAI
from langchain_core.output_parsers import JsonOutputParser
from datetime import datetime

# Autogen
from autogen import AssistantAgent, UserProxyAgent

# from langchain_mistralai import MistralAIEmbeddings
load_dotenv()

os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'
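# load_dotenv() has already loaded any LANGCHAIN_API_KEY defined in .env into os.environ;
# fail early with a clear message instead of a TypeError when it is missing.
if not os.getenv('LANGCHAIN_API_KEY'):
    raise EnvironmentError("LANGCHAIN_API_KEY is not set; add it to your .env file.")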

In [2]:
# configure 
run_local = "Yes"

# select llm model 
# local_llm = "mistral" # mistral: https://github.com/langchain-ai/langgraph/blob/main/examples/rag/langgraph_crag_local.ipynb
local_llm = "mixtral"  # mixtral: https://scalastic.io/en/mixtral-ollama-llamaindex-llm/
# local_llm = "llama3" # llama3: https://python.langchain.com/docs/integrations/chat/ollama/
# local_llm = "codellama"

**IMPORTANT! Make sure to run the commands below in a terminal to start Ollama and download the LLM model.**  
`ollama serve`  
`ollama pull {model_name}`

TODO: Automate the above (e.g., add to Dockerfile).
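Until the Ollama setup is automated, a pre-flight check can at least fail early with a clear message. This is a sketch under stated assumptions: it uses Ollama's REST endpoint `GET /api/tags`, which lists locally pulled models, on the default port 11434 (the same host and port used in the Autogen `config_list` later in this notebook). The helper names `model_in_tags` and `ollama_has_model` are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict


def model_in_tags(tags: Dict[str, Any], model: str) -> bool:
    """True if `model` (e.g. "mixtral" or "mixtral:latest") appears in an /api/tags response."""
    for entry in tags.get("models", []):
        name = entry.get("name", "")
        # Ollama reports names with a tag suffix such as ":latest"
        if name == model or name.split(":", 1)[0] == model:
            return True
    return False


def ollama_has_model(model: str, base_url: str = "http://localhost:11434", timeout: float = 3.0) -> bool:
    """Return False if the Ollama server is unreachable or the model has not been pulled."""
    try:
        with urllib.request.urlopen(f"{base_url}/api/tags", timeout=timeout) as resp:
            tags = json.load(resp)
    except (OSError, ValueError):
        # OSError covers connection refused and timeouts; ValueError covers invalid JSON
        return False
    return model_in_tags(tags, model)
```

For example, `ollama_has_model(local_llm)` returns `False` when `ollama serve` is not running or `ollama pull mixtral` has not been run yet.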

---

## 1. RAG (Indexing) - Upload Supporting Documents 
This is not strictly needed for this project; the vector DB is added as a placeholder.   
* reference: https://github.com/langchain-ai/langgraph/blob/main/examples/rag/langgraph_crag.ipynb?ref=blog.langchain.dev

In [3]:
# Load
url = "https://github.com/palantir/pyspark-style-guide"
loader = WebBaseLoader(url)
docs = loader.load()

# Split
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=500, chunk_overlap=100
)
all_splits = text_splitter.split_documents(docs)

# Embed and index
if run_local == "Yes":
    embedding = GPT4AllEmbeddings()
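else:
    # embedding = MistralAIEmbeddings(mistral_api_key=mistral_api_key)
    # Without this, `embedding` would be undefined and Chroma.from_documents would raise a NameError
    raise NotImplementedError("Only local embeddings (run_local = 'Yes') are supported for now.")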

# Index
vectorstore = Chroma.from_documents(
    documents=all_splits,
    collection_name="rag-chroma",
    embedding=embedding,
)

Downloading: 100%|██████████| 45.9M/45.9M [00:01<00:00, 38.4MiB/s]
Verifying: 100%|██████████| 45.9M/45.9M [00:00<00:00, 765MiB/s]
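With `chunk_size=500, chunk_overlap=100`, each chunk holds at most 500 tokens (counted with tiktoken, because of `from_tiktoken_encoder`), and neighboring chunks share some text so that context cut at a boundary is partly repeated in the next chunk. The real `RecursiveCharacterTextSplitter` first splits on separators (paragraphs, lines, words) and then merges pieces up to the size limit, so the fixed sliding window below is only a simplified, hypothetical illustration of what the overlap parameter does, not LangChain's algorithm.

```python
from typing import List, Sequence


def sliding_chunks(tokens: Sequence[str], chunk_size: int, chunk_overlap: int) -> List[List[str]]:
    """Split tokens into windows of chunk_size; consecutive windows share chunk_overlap tokens."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap  # how far each new window advances
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(list(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            break  # the last window already reaches the end
    return chunks
```

For example, `sliding_chunks(list("abcdefghij"), 4, 1)` gives windows `abcd`, `defg`, `ghij`: each window starts with the last token of the previous one.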


In [4]:
# create a retriever
retriever = vectorstore.as_retriever()

The retriever is not yet used in this demo.

---

## 2. LLMs

We build two sets of LLM chains: 
1. PIG code --> create benchmark input data (if none is provided by the user)
2. PIG code --> PySpark code

**references:** 
* https://python.langchain.com/docs/integrations/chat/ollama/
* JsonOutputParser: https://api.python.langchain.com/en/latest/output_parsers/langchain_core.output_parsers.json.JsonOutputParser.html
* OutputParser: https://medium.com/@larry_nguyen/langchain-101-lesson-3-output-parser-406591b094d7

### 2.1. Generate input data (CSV) given Pig code

If the user does not provide benchmark input data for testing the PIG code, we use an LLM to generate a Python script. This script creates sample data and saves it to the specified folder.

We achieve this in the following steps: 
1. Create an LLM prompt template that outputs a Python script designed to generate and save CSV data based on the given PIG code.
2. Execute the LLM to produce the Python script.
3. Parse the generated Python script.
4. Execute the Python script, which results in the CSV file being saved.
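Steps 2 to 4, combined with the regeneration prompt (`prompt_data_regen`) defined further down, amount to a generate, run, and fix loop. Below is a minimal sketch, assuming the chains return a dict with the `data_gen_code` key (as the prompts request) and the runner returns a `(message, error)` tuple like `run_python_code`. The function name `generate_and_run` and the `max_attempts` limit are hypothetical; the chains and runner are passed in as plain callables so the loop can be exercised without an LLM.

```python
from typing import Callable, Optional, Tuple

RunResult = Tuple[Optional[str], Optional[str]]  # (success message, error message)


def generate_and_run(
    generate: Callable[[dict], dict],
    fix: Callable[[dict], dict],
    run: Callable[[str], RunResult],
    pig_code: str,
    sample_input_path: str,
    max_attempts: int = 3,
) -> Tuple[str, str]:
    """Generate a data-generation script, run it, and ask the LLM to fix it on failure.

    Returns (script, success_message); raises RuntimeError if every attempt fails.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    inputs = {"pig_code": pig_code, "sample_input_path": sample_input_path}
    script = generate(inputs)["data_gen_code"]
    error = None
    for attempt in range(1, max_attempts + 1):
        result, error = run(script)
        if error is None:
            return script, result
        if attempt < max_attempts:
            # Feed the failing script and its error message back to the "fix" chain
            script = fix({**inputs, "pycode_error": script,
                          "pycode_error_message": error})["data_gen_code"]
    raise RuntimeError(f"Script still failing after {max_attempts} attempts: {error}")
```

In this notebook it would be called as `generate_and_run(sample_input_code_generator.invoke, fix_sample_input_code_generator.invoke, run_python_code, sample_pig_code, data_output_dir)`.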

In [5]:
# we use locally hosted llm models 
llm = ChatOllama(model=local_llm, format="json", temperature=0.2)

## Create two templates: 
# 1. pig code --> benchmark input data
prompt_data_gen = PromptTemplate(
    template="""
    You are an expert data scientist fluent in PIG and Python coding languages.
    Generate Python code that does the following:
    1. Generate 20 or more lines of CSV data that can be used to test the PIG code.
       Ensure column names are consistent with the names in the PIG code.
    2. Save this CSV data to the directory provided.
        
    Here is the PIG code: \n\n {pig_code} \n\n
    Here is the directory in which to save the CSV file: \n\n {sample_input_path} \n\n

    Give a string of Python code with correct indentation that can be run to create and save the CSV file to the correct path.
    Avoid using generic column names (e.g., amount, date) to avoid conflicts with reserved words.
    Provide this as a JSON with a single key 'data_gen_code' and no preamble or explanation.""",
    input_variables=["pig_code", "sample_input_path"],
)
sample_input_code_generator = prompt_data_gen | llm | JsonOutputParser()

_**The code below is a work in progress.**_

In [6]:
# If there was an error in the outputted Python script for generating CSV file, 
#  add that error back to LLM and re-generate an updated Python script. 
prompt_data_regen = PromptTemplate(
    template="""
    You are an expert data scientist fluent in PIG and Python coding languages.
    Generate Python code that does the following:
    * Debug and share updated Python code that generates 100 or more lines of CSV data that can be used to test the PIG code.
    * Use the error message and the code that resulted in the error as a reference to fix the Python code.
        
    Here is the PIG code: \n\n {pig_code} \n\n
    Here is the Python code with error: \n\n {pycode_error} \n\n
    Here is the Python code error message: \n\n {pycode_error_message} \n\n
    Here is the directory in which to save the CSV file: \n\n {sample_input_path} \n\n

    Give a string of Python code with correct indentation that can be run to create and save a CSV file with more than 100 records to the correct path.
    Provide this as a JSON with a single key 'data_gen_code' and no preamble or explanation.""",
    input_variables=["pig_code", "pycode_error", "pycode_error_message", "sample_input_path"],
)
fix_sample_input_code_generator = prompt_data_regen | llm | JsonOutputParser()


# 2. pig code to pyspark code 
prompt_pig2pyspark = PromptTemplate(
    template="""
    You are an expert data scientist fluent in PIG and PySpark coding languages.
    Generate PySpark code that does the following:
    * Implements the same logic and methods as the provided PIG code.
    * When run against sample input data, outputs results identical to the PIG code.
        
    Here is the PIG code: \n\n {pig_code} \n\n

    Give a string of PySpark code with correct indentation.
    Provide this as a JSON with a single key 'pyspark_code' and no preamble or explanation.""",
    input_variables=["pig_code"],
)
pig_to_pyspark_converter = prompt_pig2pyspark | llm | JsonOutputParser()

prompt_pig2pyspark_regen = PromptTemplate(
    template="""
    You are an expert data scientist fluent in PIG and PySpark coding languages.
    Generate PySpark code that does the following:
    * Implements the same logic and methods as the provided PIG code.
    * Uses the PySpark code that returned an error message to update the PySpark code.
    * Uses the PySpark code error message to update the PySpark code.
    * When run against sample input data, outputs results identical to the PIG code.
        
    Here is the PIG code: \n\n {pig_code} \n\n
    Here is the PySpark code with error: \n\n {pycode_error} \n\n
    Here is the PySpark code error message: \n\n {pycode_error_message} \n\n

    Give a string of PySpark code with correct indentation.
    Provide this as a JSON with a single key 'pyspark_code' and no preamble or explanation.""",
    input_variables=["pig_code", "pycode_error", "pycode_error_message"],
)
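# Use the regeneration prompt so the erroneous code and error message are actually passed to the LLM
fix_pig_to_pyspark_converter = prompt_pig2pyspark_regen | llm | JsonOutputParser()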

In [8]:
## test with sample PIG code 
pig_script_dir = './scripts/pig1.pig'

with open(pig_script_dir, 'r') as file:
    sample_pig_code = file.read()

print('*'*88)
print("Pig Code\n")
print(sample_pig_code)
print('*'*88)

data_output_dir = './data'

datagen_code = sample_input_code_generator.invoke({"pig_code": sample_pig_code, 
                                                   "sample_input_path": data_output_dir})
print('*'*88)
print("Python Code to generate sample data:\n")

print(datagen_code['data_gen_code'])
print('*'*88)

****************************************************************************************
Pig Code

-- Load input data into a relation named 'sales_data'
sales_data = LOAD 'data/sales_data.csv' USING PigStorage(',')
    AS (transaction_id: int, product_name: chararray, category: chararray, 
        quantity_sold: int, price: double, transaction_date: chararray, store_location: chararray);

-- Filter the data to include only the 'Electronics' category
electronics_sales = FILTER sales_data BY category == 'Electronics';

-- Group the data by product_name within the Electronics category
grouped_electronics = GROUP electronics_sales BY product_name;

-- Calculate the total sales and total quantity for each product
aggregated_electronics = FOREACH grouped_electronics GENERATE
    group AS product_name,
    SUM(electronics_sales.price * electronics_sales.quantity_sold) AS total_sales,
    SUM(electronics_sales.quantity_sold) AS total_units_sold;

-- Order the results by total sales in descendi

**TODO:** 
* Add unit tests to ensure generated data is useful.
* The number of generated records is incorrect.
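For the unit-test TODO, a lightweight check of the generated file could verify that it has the columns from the Pig `AS (...)` schema, enough rows, and at least one row that survives the `category == 'Electronics'` filter (otherwise both the Pig and PySpark outputs would be empty and trivially "identical"). This is a hypothetical, standard-library-only sketch; `check_sample_csv` and its parameters are illustrative names. It returns a list of problems, where an empty list means the file passed.

```python
import csv
from typing import Dict, List, Optional, Sequence


def check_sample_csv(
    path: str,
    expected_columns: Sequence[str],
    min_rows: int = 20,
    required_values: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Return human-readable problems with a generated CSV file (empty list = OK)."""
    problems = []
    with open(path, newline="") as f:
        reader = csv.DictReader(f)  # assumes the first line is a header row
        header = reader.fieldnames or []
        rows = list(reader)
    missing = [c for c in expected_columns if c not in header]
    if missing:
        problems.append(f"missing columns: {missing}")
    if len(rows) < min_rows:
        problems.append(f"expected at least {min_rows} rows, found {len(rows)}")
    # Each required (column, value) pair must appear in at least one row
    for column, value in (required_values or {}).items():
        if not any(row.get(column) == value for row in rows):
            problems.append(f"no row with {column} == {value!r}")
    return problems
```

Usage: `check_sample_csv('./data/sales_data.csv', ['transaction_id', 'product_name', 'category', 'quantity_sold', 'price', 'transaction_date', 'store_location'], required_values={'category': 'Electronics'})`. Note that `PigStorage` does not skip header lines, so on the Pig side the header row is loaded as a data record; in this script it is dropped by the Electronics filter.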

### Parse and Run Python Code

In [9]:
def normalize_indentation(code):
    """
    Remove the indentation of the first non-empty line from every line of a code block.
    This is useful for processing code blocks that may have variable indentation.

    Parameters:
    - code (str): A string of code with potentially inconsistent indentation.

    Returns:
    - str: The normalized code with the base indentation removed.
    """
    lines = code.split('\n')
    # Find the first non-empty line to determine the base indentation level
    base_indent = None
    for line in lines:
        stripped_line = line.lstrip()
        if stripped_line:
            base_indent = len(line) - len(stripped_line)
            break

    if base_indent is None:
        return code  # Return original code if it's all empty lines

    # Normalize each line by removing up to base_indent characters of leading whitespace
    normalized_lines = []
    for line in lines:
        if line[:base_indent].strip() == '':
            # The first base_indent characters are whitespace: drop exactly those
            normalized_lines.append(line[base_indent:])
        else:
            # Line is less indented than the base: strip only its own indentation
            # (slicing here would cut off real code)
            normalized_lines.append(line.lstrip())

    return '\n'.join(normalized_lines)


def parse_python_code_from_text(text):
    """
    Extracts the first Python code block found between triple backticks (markdown code block).

    Parameters:
    - text (str): A string that may contain a markdown-formatted Python code block.

    Returns:
    - str: The first Python code block extracted from the text, or None if no block is found.
    """
    normalized_text = normalize_indentation(text)
    
    # Define the pattern to extract code between ```python and ```
    pattern = r'```python\s*(.*?)\s*```'
    match = re.search(pattern, normalized_text, re.DOTALL)
    
    if match:
        code_to_execute = match.group(1)
        print(code_to_execute)
        return code_to_execute
    else:
        print("No Python code block found.")
        return None


def run_python_code(code):
    """
    Executes Python code provided as a string and returns a message indicating the success of the execution,
    or returns an error message if the execution fails.
    
    Parameters:
    - code (str): A string containing Python code to be executed.
    
    Returns:
    - tuple: A tuple where the first element is a message indicating the results or success of the code execution,
             and the second element is an error message if an exception occurs, or None if the execution is successful.
    """
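    try:
        # Use a single namespace for globals and locals: with separate dicts, functions
        # defined in the generated script cannot see the script's own top-level imports.
        namespace = {"__name__": "__main__"}
        exec(code, namespace)

        # Report the variables the script defined (skipping dunder entries such as __builtins__)
        local_vars = {k: v for k, v in namespace.items() if not k.startswith("__")}
        success_message = "Code executed successfully. Variables: " + str(local_vars)

        # Return the success message and None for the error message
        return success_message, None
    except Exception as e:
        # Return None for the result and the error message
        return None, str(e)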
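`run_python_code` executes LLM-generated code with `exec` inside the notebook process, so a script that hangs or crashes takes the kernel with it. An alternative sketch (not what this notebook currently does) runs the code in a separate Python interpreter with a timeout and returns the same `(message, error)` tuple. It provides process isolation only, not a security sandbox; the name `run_python_code_subprocess` is hypothetical.

```python
import subprocess
import sys
from typing import Optional, Tuple


def run_python_code_subprocess(
    code: str, timeout: float = 60.0, cwd: Optional[str] = None
) -> Tuple[Optional[str], Optional[str]]:
    """Run code in a child Python process; return (stdout, None) or (None, error text)."""
    try:
        proc = subprocess.run(
            [sys.executable, "-c", code],  # same interpreter as the notebook kernel
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=cwd,  # e.g. the project root, so relative paths like ./data resolve
        )
    except subprocess.TimeoutExpired:
        return None, f"Timed out after {timeout} seconds"
    if proc.returncode != 0:
        # The traceback is on stderr; it can be fed back to the "fix" prompt
        return None, proc.stderr.strip() or f"exit code {proc.returncode}"
    return proc.stdout, None
```

The error text returned on failure is the full traceback, which gives the regeneration prompt more to work with than `str(e)` alone.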


In [10]:
run_python_code(datagen_code['data_gen_code'])

("Code executed successfully. Variables: {'csv': <module 'csv' from '/usr/lib/python3.10/csv.py'>, 'sales_file': <_io.TextIOWrapper name='./data/sales_data.csv' mode='w' encoding='UTF-8'>, 'fieldnames': ['transaction_id', 'product_name', 'category', 'quantity_sold', 'price', 'transaction_date', 'store_location'], 'writer': <csv.DictWriter object at 0x7f03726af5b0>, 'i': 19, 'data': {'transaction_id': 19, 'product_name': 'Product_19', 'category': 'Electronics', 'quantity_sold': 29, 'price': 195.0, 'transaction_date': '2022-01-02', 'store_location': 'Location_3'}}",
 None)

In [12]:
# check outputted data
sample_df = pd.read_csv('./data/sales_data.csv')
sample_df

|   | transaction_id | product_name | category | quantity_sold | price | transaction_date | store_location |
|---|---|---|---|---|---|---|---|
| 0 | 0 | Product_0 | Electronics | 10 | 100.0 | 2022-01-01 | Location_0 |
| 1 | 1 | Product_1 | Electronics | 11 | 105.0 | 2022-01-02 | Location_1 |
| 2 | 2 | Product_2 | Electronics | 12 | 110.0 | 2022-01-03 | Location_2 |
| 3 | 3 | Product_3 | Electronics | 13 | 115.0 | 2022-01-01 | Location_3 |
| 4 | 4 | Product_4 | Electronics | 14 | 120.0 | 2022-01-02 | Location_0 |
| 5 | 5 | Product_5 | Electronics | 15 | 125.0 | 2022-01-03 | Location_1 |
| 6 | 6 | Product_6 | Electronics | 16 | 130.0 | 2022-01-01 | Location_2 |
| 7 | 7 | Product_7 | Electronics | 17 | 135.0 | 2022-01-02 | Location_3 |
| 8 | 8 | Product_8 | Electronics | 18 | 140.0 | 2022-01-03 | Location_0 |
| 9 | 9 | Product_9 | Electronics | 19 | 145.0 | 2022-01-01 | Location_1 |


**TODO:** Load latest data dynamically.

---

### 2.2. Generate PySpark code given Pig code

We use Autogen's multi-agent system to produce PySpark code.

We achieve this in the following steps: 
1. Create an LLM prompt template that outputs a PySpark script that performs the same task as the given PIG code.
2. Execute the LLM to produce the PySpark script.
3. Parse and execute the PySpark script.  
   3.1. If error is returned, Autogen will fix and re-generate PySpark code.  
   3.2. If execution is successful, PySpark code is returned.

**reference:** 
* https://github.com/microsoft/autogen/blob/main/notebook/agentchat_auto_feedback_from_code_execution.ipynb
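Step 3.2 only checks that the generated script runs, not that it reproduces the Pig output. A plain-Python reference of the Pig logic shown earlier can serve as the expected result. This is a sketch assuming the CSV has a header row with the column names from the Pig schema (as the generated file does); it covers the FILTER, GROUP, `SUM` aggregations, and the descending order on `total_sales` described by the script's (truncated) last comment, and does not reproduce anything after that point. `pig_reference` is a hypothetical name.

```python
import csv
from collections import defaultdict
from typing import List, Tuple


def pig_reference(path: str) -> List[Tuple[str, float, int]]:
    """Plain-Python version of the Pig script: filter, group, aggregate, order by total_sales desc."""
    total_sales = defaultdict(float)  # product_name -> SUM(price * quantity_sold)
    total_units = defaultdict(int)    # product_name -> SUM(quantity_sold)
    with open(path, newline="") as f:
        for row in csv.DictReader(f):
            # FILTER sales_data BY category == 'Electronics'
            if row["category"] != "Electronics":
                continue
            quantity = int(row["quantity_sold"])
            price = float(row["price"])
            # GROUP electronics_sales BY product_name, then SUM(...) per group
            total_sales[row["product_name"]] += price * quantity
            total_units[row["product_name"]] += quantity
    result = [(name, total_sales[name], total_units[name]) for name in total_sales]
    # Order by total_sales descending (ties keep first-seen order here)
    result.sort(key=lambda r: r[1], reverse=True)
    return result
```

Running it on `./data/sales_data.csv` gives `(product_name, total_sales, total_units_sold)` tuples to compare with the PySpark result.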

In [13]:
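config_list = [
  {
    "model": local_llm,
    # Ollama's OpenAI-compatible endpoint
    "base_url": "http://localhost:11434/v1",
    # Ollama ignores the API key, but the OpenAI client requires a value;
    # don't send the LangSmith key to a different service.
    "api_key": "ollama",
  }
]

assistant = AssistantAgent("assistant", llm_config={"config_list": config_list})

# use_docker=False: code blocks proposed by the assistant run directly on this machine in ./coding
user_proxy = UserProxyAgent("user_proxy", code_execution_config={"work_dir": "coding", "use_docker": False})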

# prompt 
user_proxy.initiate_chat(assistant, message=f"""
Convert the following PIG code to PySpark code:

Apache Pig code: {sample_pig_code}

""")

[33muser_proxy[0m (to assistant):


Convert the following PIG code to PySpark code:

Apache Pig code: -- Load input data into a relation named 'sales_data'
sales_data = LOAD 'data/sales_data.csv' USING PigStorage(',')
    AS (transaction_id: int, product_name: chararray, category: chararray, 
        quantity_sold: int, price: double, transaction_date: chararray, store_location: chararray);

-- Filter the data to include only the 'Electronics' category
electronics_sales = FILTER sales_data BY category == 'Electronics';

-- Group the data by product_name within the Electronics category
grouped_electronics = GROUP electronics_sales BY product_name;

-- Calculate the total sales and total quantity for each product
aggregated_electronics = FOREACH grouped_electronics GENERATE
    group AS product_name,
    SUM(electronics_sales.price * electronics_sales.quantity_sold) AS total_sales,
    SUM(electronics_sales.quantity_sold) AS total_units_sold;

-- Order the results by total sales in des

Provide feedback to assistant. Press enter to skip and use auto-reply, or type 'exit' to end the conversation:  input data is located at /workspace/data/ absolute path 


[33muser_proxy[0m (to assistant):

input data is located at /workspace/data/ absolute path 

--------------------------------------------------------------------------------
[33massistant[0m (to user_proxy):

 Given the updated information about the absolute path of the input data, here's the modified PySpark script:
```python
# filename: pyspark_script.py
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
import os

os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"

spark = SparkSession.builder.appName('PigToPySpark').getOrCreate()

# Load input data into a DataFrame named 'sales_data'
input_path = "/workspace/data/"
sales_data = spark.read.option("header", "true").csv(input_path + "sales_data.csv") \
    .selectExpr("cast(_c0 as int) transaction_id", "_c1 product_name", "_c2 category", 
                "cast(_c3 as int) quantity_sold", "cast(_c4 as double) price", "_c5 transaction_date", 
                "_c6 store_l

Provide feedback to assistant. Press enter to skip and use auto-reply, or type 'exit' to end the conversation:  


>>>>>>>> NO HUMAN INPUT RECEIVED.
>>>>>>>> USING AUTO REPLY...
>>>>>>>> EXECUTING CODE BLOCK 0 (inferred language is python)...
user_proxy (to assistant):

exitcode: 1 (execution failed)
Code output: 
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/09 05:00:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "pyspark_script.py", line 14, in <module>
    .selectExpr("cast(_c0 as int) transaction_id", "_c1 product_name", "_c2 category", 
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/dataframe.py", line 3267, in selectExpr
    jdf = self._jdf.selectExpr(self._jseq(expr))
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.10/d

Provide feedback to assistant. Press enter to skip and use auto-reply, or type 'exit' to end the conversation:  


>>>>>>>> NO HUMAN INPUT RECEIVED.
>>>>>>>> USING AUTO REPLY...
>>>>>>>> EXECUTING CODE BLOCK 0 (inferred language is python)...
user_proxy (to assistant):

exitcode: 1 (execution failed)
Code output: 
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/09 05:01:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "pyspark_script.py", line 13, in <module>
    sales_data = spark.read.option("header", "true").csv(input_path + "input/pig_files/*")
  File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/readwriter.py", line 740, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  F

Provide feedback to assistant. Press enter to skip and use auto-reply, or type 'exit' to end the conversation:  sales data (sales_data.csv) is located at /workspace/data/sales_data.csv absolute path


[33muser_proxy[0m (to assistant):

sales data (sales_data.csv) is located at /workspace/data/sales_data.csv absolute path

--------------------------------------------------------------------------------
[33massistant[0m (to user_proxy):

 I will modify the PySpark script to load 'sales\_data.csv' directly instead of searching for a directory called 'pig\_files'.
```python
# filename: pyspark_script.py
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
import os

os.environ["PYSPARK_PYTHON"] = "python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python3"

spark = SparkSession.builder.appName('PigToPySpark').getOrCreate()

# Load input data into a DataFrame named 'sales_data'
input_path = "/workspace/data/"
sales_data_csv = os.path.join(input_path, "sales_data.csv")

sales_data = spark.read.option("header", "true").csv(sales_data_csv)

# ... Continue with the rest of your PySpark code.
```
Replace '/workspace/data/' in 'sales\_data\_csv' variable with your actua

Provide feedback to assistant. Press enter to skip and use auto-reply, or type 'exit' to end the conversation:  


>>>>>>>> NO HUMAN INPUT RECEIVED.
>>>>>>>> USING AUTO REPLY...
>>>>>>>> EXECUTING CODE BLOCK 0 (inferred language is python)...
>>>>>>>> EXECUTING CODE BLOCK 1 (inferred language is bash)...
user_proxy (to assistant):

exitcode: 0 (execution succeeded)
Code output: 



--------------------------------------------------------------------------------
[33massistant[0m (to user_proxy):

 As there is no output provided, I'll assume that the script executed successfully without any errors or issues. If you have questions or need further assistance regarding PySpark or any other topic, feel free to ask! TERMINATE

--------------------------------------------------------------------------------


Provide feedback to assistant. Press enter to skip and use auto-reply, or type 'exit' to end the conversation:  exit


ChatResult(chat_id=None, chat_history=[{'content': "\nConvert the following PIG code to PySpark code:\n\nApache Pig code: -- Load input data into a relation named 'sales_data'\nsales_data = LOAD 'data/sales_data.csv' USING PigStorage(',')\n    AS (transaction_id: int, product_name: chararray, category: chararray, \n        quantity_sold: int, price: double, transaction_date: chararray, store_location: chararray);\n\n-- Filter the data to include only the 'Electronics' category\nelectronics_sales = FILTER sales_data BY category == 'Electronics';\n\n-- Group the data by product_name within the Electronics category\ngrouped_electronics = GROUP electronics_sales BY product_name;\n\n-- Calculate the total sales and total quantity for each product\naggregated_electronics = FOREACH grouped_electronics GENERATE\n    group AS product_name,\n    SUM(electronics_sales.price * electronics_sales.quantity_sold) AS total_sales,\n    SUM(electronics_sales.quantity_sold) AS total_units_sold;\n\n-- Orde

---
## Work In Progress
* Add data check functionality. Potentially, we could use LangGraph.
* Replace AutoGen with LangGraph.
* Productionize the Jupyter Notebook
* User Interface
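The last exchange in the chat log ends with exit code 0 and empty output, and the assistant simply assumes success, even though the Python block it ran only loads the CSV. For the data-check item above, one option is to compare the converted script's result with an expected result. A minimal sketch, assuming both sides have been collected into `(product_name, total_sales, total_units_sold)` rows (for PySpark, `DataFrame.collect()` returns `Row` objects, which are tuples); `results_match` is a hypothetical name. It compares rows as a set, so it does not check the final ordering, and it uses a relative tolerance because floating-point sums can differ slightly depending on summation order.

```python
import math
from typing import Iterable, Sequence


def results_match(expected: Iterable[Sequence], actual: Iterable[Sequence], rel_tol: float = 1e-6) -> bool:
    """Order-insensitive comparison of (product_name, total_sales, total_units_sold) rows."""
    exp = sorted((str(r[0]), float(r[1]), int(r[2])) for r in expected)
    act = sorted((str(r[0]), float(r[1]), int(r[2])) for r in actual)
    if len(exp) != len(act):
        return False
    # Names and unit counts must match exactly; sales only within rel_tol
    return all(
        e[0] == a[0] and e[2] == a[2] and math.isclose(e[1], a[1], rel_tol=rel_tol)
        for e, a in zip(exp, act)
    )
```

A conversion would then count as verified only when the script exits successfully and `results_match` returns `True`.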