# Code Transcribe 
* Langchain
* LangSmith
* Semantic Kernel
* Async Calls
* Build APIs
* Integrate Airflow, DB, Logging

Models:
* Mistral Dolphin: 
* 

1. Take user input. User can either paste or upload PIG code. The uploaded or pasted file will be saved to a local directory (or a database) 
2. (optional) user uploads a sample data to run against the PIG code. THe output from the code will be used to validate PySpark code by ensuring the two outputs are identical. 
3. If no sample data is uploaded, LLM generates a sample data given the PIG code. 
3.1. LLM generate Python code that build and save sample file to local directory and database
3.2. PIG executor runs the PIG code to 1) ensure the sample data is runnable and 2) save the output(s) for future comparison with results from PySpark code 
4. LLM generates PySpark that is equivalent to PIG code. 
5. PySpark executor runs the PySpark code. 
5.1. If error is occured, the error is fed back to LLM to fix the PySPark code 
5.2. If no error occured 
5.2.1. If output data is different from the one resulting from PIG code, the difference is fed back to LLM to fix the PySpark code 
5.2.2. If output data is identical to the one resulting from PIG code, then we can tell that the correct PYSpark code that is identical to PIG code is written> 
6. PySpark code is returned to user. 

In [20]:
# TODO 1: Use LangGraph 
# TODO 2: Use Ollama to run LLM models locally

---

## 1. Install and Load Dependencies

In [1]:
# this will be handled in requirements.txt

In [2]:
from langchain_community.llms.huggingface_pipeline import HuggingFacePipeline
from langchain_core.prompts.prompt import PromptTemplate
import regex as re
import subprocess
import os
import pandas as pd
import glob
import pyspark as spark

---

## Load LLM 

In [3]:
class LanguageModelService:
    def __init__(self, model_id="mistralai/Mistral-7B-Instruct-v0.2", # LLM pre-trained model 
                 device=0, # Use 1st GPU 
                 max_new_tokens=1000 # output token count 
                ):
        """
        Initializes the model with the specified parameters.
        """
        self.hf = HuggingFacePipeline.from_model_id(
            model_id=model_id,
            task="text-generation",
            device=device,
            pipeline_kwargs={"max_new_tokens": max_new_tokens},
        )
        
    def query(self, input_text):
        """
        Sends a custom query to the model and returns the output.
        """
        result = self.hf(input_text)
        return result.text if hasattr(result, 'text') else result

# Usage
%time  lm_service = LanguageModelService()  # Initialize once

  from .autonotebook import tqdm as notebook_tqdm
Downloading shards: 100%|██████████| 3/3 [05:59<00:00, 119.84s/it]
Loading checkpoint shards: 100%|██████████| 3/3 [00:05<00:00,  1.82s/it]


CPU times: user 1min 20s, sys: 1min 57s, total: 3min 18s
Wall time: 6min 40s


---

## 2. Load PIG Code (optionsal: Upload sample data)

In [4]:
# temporarily work with pre-written code
pig_script_dir = './scripts/pig1.pig'

with open(pig_script_dir, 'r') as file:
    pig_code = file.read()

print(pig_code)

-- Load the data from a CSV file
transactions = LOAD 'data/sample1.csv' USING PigStorage(',') 
    AS (depStore:chararray, date:chararray, amount:int);

-- Filter transactions to include only those where the amount is greater than 200
high_value_transactions = FILTER transactions BY amount > 200;

-- Group the transactions by store
grouped_by_store = GROUP high_value_transactions BY depStore;

-- Calculate total and average sales per depStore
sales_summary = FOREACH grouped_by_store GENERATE 
    group AS depStore,
    SUM(high_value_transactions.amount) AS total_sales,
    AVG(high_value_transactions.amount) AS average_sales;

-- Store the summary in a CSV file
STORE sales_summary INTO 'output/sales_summary' USING PigStorage(',');

-- Optional: Just for demonstration, store filtered data to another directory
STORE high_value_transactions INTO 'output/high_value_transactions' USING PigStorage(',');



---

## 3. Create sample data 

Note Batch GPU can work: https://python.langchain.com/docs/integrations/llms/huggingface_pipelines/

### 3.1. Create sample test data using LLM (if none provided by user)

In [5]:
%%time 

query_create_sample_data = f"""
    Given the following PIG code, generate sample CSV data that will produce consistent 
    results when processed by this PIG code. The PIG code is intended to perform operations 
    such as filtering, grouping, and aggregation. Ensure that the sample data is diverse enough 
    to test all parts of the code effectively.\n\n
    PIG Code:\n{pig_code}\n\n
    Write a Python code that will save the sample CSV file to ./data/sample1.csv with a header column. Encapsulate the code between ```. 
    Make sure there is only one code chunk (```).:
"""

response = lm_service.query(query_create_sample_data)
print("Response from model:\n", response)

  warn_deprecated(


Response from model:
 
```python
import csv

data = [
    ['StoreA', '2022-01-01', 250],
    ['StoreB', '2022-01-02', 300],
    ['StoreA', '2022-01-03', 220],
    ['StoreC', '2022-01-04', 280],
    ['StoreB', '2022-01-05', 210],
    ['StoreA', '2022-01-06', 350],
    ['StoreC', '2022-01-07', 290],
    ['StoreB', '2022-01-08', 260],
    ['StoreA', '2022-01-09', 270],
    ['StoreC', '2022-01-10', 310],
]

with open('data/sample1.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile, delimiter=',')
    writer.writerow(["depStore", "date", "amount"])
    writer.writerows(data)
```

This Python code creates a list of transactions and writes it to a CSV file named 'data/sample1.csv' with a header row. The data is diverse enough to test the filtering, grouping, and aggregation operations in the PIG code effectively.
CPU times: user 7min 12s, sys: 47.2 s, total: 8min
Wall time: 8min


In [6]:
def normalize_indentation(code):
    lines = code.split('\n')
    # Find the first non-empty line to determine the base indentation level
    base_indent = None
    for line in lines:
        stripped_line = line.lstrip()
        if stripped_line:
            base_indent = len(line) - len(stripped_line)
            break

    if base_indent is None:
        return code  # Return original code if it's all empty lines or no base indent found

    # Normalize each line by removing the base indentation
    normalized_lines = []
    for line in lines:
        stripped_line = line.lstrip()
        if len(line) > base_indent:
            normalized_lines.append(line[base_indent:])
        else:
            normalized_lines.append(stripped_line)

    return '\n'.join(normalized_lines)

def parse_python_code_from_text(text):
    normalized_text = normalize_indentation(text)
    
    # Define the pattern to extract code between ```python and ```
    pattern = r'```python\s*(.*?)\s*```'
    match = re.search(pattern, normalized_text, re.DOTALL)
    
    if match:
        code_to_execute = match.group(1)
        print(code_to_execute)
        return code_to_execute
    else:
        print("No Python code block found.")

def run_pyspark_code(code):
    """
    Executes PySpark code, returns either error message or result DataFrame.
    Assumes PySpark session and context are already set.
    """
    try:
        exec(code)
        return None, globals().get('sales_summary')  # Assuming 'sales_summary' is the result DataFrame
    except Exception as e:
        return str(e), None


In [7]:
### 3.2. LLM may not output working test data. Repeat until correct data is outputtedz

### 3.3. Save sample data and output 

#### 3.3.1. Save Sample Data

In [8]:
code = parse_python_code_from_text(response)
run_pyspark_code(code)

import csv

data = [
    ['StoreA', '2022-01-01', 250],
    ['StoreB', '2022-01-02', 300],
    ['StoreA', '2022-01-03', 220],
    ['StoreC', '2022-01-04', 280],
    ['StoreB', '2022-01-05', 210],
    ['StoreA', '2022-01-06', 350],
    ['StoreC', '2022-01-07', 290],
    ['StoreB', '2022-01-08', 260],
    ['StoreA', '2022-01-09', 270],
    ['StoreC', '2022-01-10', 310],
]

with open('data/sample1.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile, delimiter=',')
    writer.writerow(["depStore", "date", "amount"])
    writer.writerows(data)


(None, None)

#### 3.3.2. Execute PIG code and save output 

In [9]:
def run_pig_script(script_path, data_path):
    # Set up the environment variable to point to the directory containing the data file
    os.environ['PIG_DATA_PATH'] = data_path

    # Execute the Pig script using subprocess, assuming Pig is installed and configured to run in local mode
    result = subprocess.run(['pig', '-x', 'local', '-f', script_path], capture_output=True, text=True)
    
    # Check the results of the Pig script execution
    if result.returncode != 0:
        print("Error occurred during Pig script execution:")
        print(result.stderr)
    else:
        print("Pig script executed successfully. Output:")
        print(result.stdout)

# Define the path to the Pig script and the directory containing the data file
pig_script = './scripts/pig1.pig'
csv_data = './data/'

# Execute the Pig script
run_pig_script(pig_script, csv_data)

Error occurred during Pig script execution:
2024-05-06 03:03:50,063 [main] INFO  org.apache.pig.Main - Apache Pig version 0.17.0 (r1797386) compiled Jun 02 2017, 15:41:58
2024-05-06 03:03:50,063 [main] INFO  org.apache.pig.Main - Logging error messages to: /workspace/pig_1714964630057.log
2024-05-06 03:03:50,079 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - user.name is deprecated. Instead, use mapreduce.job.user.name
2024-05-06 03:03:50,301 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /root/.pigbootup not found
2024-05-06 03:03:50,366 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2024-05-06 03:03:50,367 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
2024-05-06 03:03:50,382 [main] INFO  org.apache.pig.PigServer - Pig Script ID for the session: PIG-pig1.pig-56c51554-a370-4bef-8174

In [12]:

def get_latest_file(directory, pattern):
    """
    Returns the path to the latest file in the given directory that matches the pattern.

    Args:
    directory (str): The directory to search in.
    pattern (str): The file name pattern to match.

    Returns:
    str: The path to the latest file matching the pattern or None if no file matches.
    """
    # Create the full path pattern
    search_pattern = os.path.join(directory, pattern)
    
    # List all files matching the pattern
    files = glob.glob(search_pattern)
    
    if not files:
        return None

    # Find the latest file based on last modification time
    latest_file = max(files, key=os.path.getmtime)
    
    return latest_file

# Example usage
directory = './output/high_value_transactions'
file_pattern = 'part-*'
latest_file = get_latest_file(directory, file_pattern)
print(f"The latest file is: {latest_file}")


The latest file is: ./output/high_value_transactions/part-00000-e24ceab2-7948-433d-b84d-e7078314d13f-c000.csv


In [13]:
# Path to the output file created by Pig
output_file = latest_file  # Adjust path as needed

# Read the CSV file into a DataFrame
df = pd.read_csv(output_file, names=['depStore', 'total_sales'])
df.head()

Unnamed: 0,depStore,total_sales
depStore,date,amount
StoreA,2022-01-01,250
StoreB,2022-01-02,300
StoreA,2022-01-03,220
StoreC,2022-01-04,280


---

In [14]:
# A.1. LLM transcriber PIG2PySpark
# Prompt: 
# You are an experienced Software Engineer and Machine Learning Engineer fluent in PIG and PySpark coding languages. 
# Rewrite the following PIG code into PySpark so that they can perform identical tasks and output identical results given same input data. 
# PIG Code: {pig_code}
# A.2. Code Parser: Parse and save PySpark code 

# B. Sample Data Builder 
# C. PIG Interpreter 
# D. PySpark Interpreter 

# If sample data NOT available (run B.)
  # Run PIG code and generate output 
  # 1. Generate PySpark code from PIG (run A.1) and parse/save the PySpark Code (run A.2)
# 2. Run against sample data in a separate module.
# 3. Check 

---

## Building LLM Layer - Pig2PySpark 
* https://medium.com/@yash9439/unleashing-the-power-of-falcon-code-a-comparative-analysis-of-implementation-approaches-803048ce65dc
* ref: https://medium.com/@ajay_khanna/leveraging-llama2-0-for-question-answering-on-your-own-data-using-cpu-aa6f75868d2d
* ref: https://medium.com/@murtuza753/using-llama-2-0-faiss-and-langchain-for-question-answering-on-your-own-data-682241488476
* https://wellsr.com/python/fine-tuning-llama2-for-question-answering-tasks/
* https://www.kaggle.com/code/gpreda/rag-using-llama-2-langchain-and-chromadb

In [15]:
prompt_pig2pyspark = f"""
You are an experienced software and machine learning engineer fluent in both PIG and PySpark. 
Re-write the following PIG code into PySpark code. 
Ensure PySpark code is logically identical and output identical results as the provided PIG code. 
Make sure to only share PySpark in a single code block (inside ```). 
PIG code: {pig_code}
"""

%time response = lm_service.query(prompt_pig2pyspark)
print("Response from model:\n", response)


  warn_deprecated(


CPU times: user 3min 3s, sys: 21.3 s, total: 3min 24s
Wall time: 3min 24s
Response from model:
 ```

```python
from pyspark.sql import SparkSession, functions as F

# Load the data from a CSV file
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()
transactions = spark.read.option("header", "true").option("inferSchema", "true").csv("data/sample1.csv")

# Filter transactions to include only those where the amount is greater than 200
high_value_transactions = transactions.filter(F.col("amount") > 200)

# Group the transactions by store
grouped_by_store = high_value_transactions.groupBy("depStore")

# Calculate total and average sales per depStore
sales_summary = grouped_by_store.agg(F.sum("amount").alias("total_sales"), F.avg("amount").alias("average_sales"))

# Store the summary in a CSV file
sales_summary.write.option("header", "true").csv("output/sales_summary", mode="overwrite")

# Optional: Just for demonstration, store filtered data to another directory
high_value_t

In [17]:
error_message = run_pyspark_code(response)
print(error_message)

('invalid syntax (<string>, line 1)', None)


In [18]:
def build_prompt_fix_error(pyspark_code, error_message, input_data):
    """
    Generate prompt to fix error in PySpark code.
    Returns a string with the prompt to fix errors.
    """
    return f"""There was an error in your PySpark code: {error_message}. Please fix and re-share the full PySpark code with relevant updates inside a single code cell.
    
    Below is the PySpark code that returned an error: 
    {pyspark_code}. 

    Below is the input data: 
    {input_data}
    """

In [19]:
prompt_fix_pyspark_code = build_prompt_fix_error(response, error_message, df)

response = lm_service.query(prompt_fix_pyspark_code)
print("Response from model:\n", response)

  warn_deprecated(


Response from model:
 
    Below is the expected output: 
                depStore  total_sales  average_sales
StoreA            3150         315.0
StoreB            1120         112.0
StoreC            1180         118.0
```

To fix the error, you need to remove the extra parentheses in the `agg` function. Here's the corrected code:

```python
from pyspark.sql import SparkSession, functions as F

# Load the data from a CSV file
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()
transactions = spark.read.option("header", "true").option("inferSchema", "true").csv("data/sample1.csv")

# Filter transactions to include only those where the amount is greater than 200
high_value_transactions = transactions.filter(F.col("amount") > 200)

# Group the transactions by store
grouped_by_store = high_value_transactions.groupBy("depStore")

# Calculate total and average sales per depStore
sales_summary = grouped_by_store.agg(F.sum("amount").alias("total_sales"), F.avg("amount").alia

In [21]:
print(response)


    Below is the expected output: 
                depStore  total_sales  average_sales
StoreA            3150         315.0
StoreB            1120         112.0
StoreC            1180         118.0
```

To fix the error, you need to remove the extra parentheses in the `agg` function. Here's the corrected code:

```python
from pyspark.sql import SparkSession, functions as F

# Load the data from a CSV file
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()
transactions = spark.read.option("header", "true").option("inferSchema", "true").csv("data/sample1.csv")

# Filter transactions to include only those where the amount is greater than 200
high_value_transactions = transactions.filter(F.col("amount") > 200)

# Group the transactions by store
grouped_by_store = high_value_transactions.groupBy("depStore")

# Calculate total and average sales per depStore
sales_summary = grouped_by_store.agg(F.sum("amount").alias("total_sales"), F.avg("amount").alias("average_sales"))

#

In [24]:
code = parse_python_code_from_text(response) # this did not run 
run_pyspark_code("""from pyspark.sql import SparkSession, functions as F

# Load the data from a CSV file
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()
transactions = spark.read.option("header", "true").option("inferSchema", "true").csv("data/sample1.csv")

# Filter transactions to include only those where the amount is greater than 200
high_value_transactions = transactions.filter(F.col("amount") > 200)

# Group the transactions by store
grouped_by_store = high_value_transactions.groupBy("depStore")

# Calculate total and average sales per depStore
sales_summary = grouped_by_store.agg(F.sum("amount").alias("total_sales"), F.avg("amount").alias("average_sales"))

# Store the summary in a CSV file
sales_summary.write.option("header", "true").csv("output/sales_summary", mode="overwrite")

# Optional: Just for demonstration, store filtered data to another directory
high_value_transactions.write.option("header", "true").csv("output/high_value_transactions", mode="overwrite")""")

No Python code block found.


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/06 03:27:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


(None, None)

In [None]:
import subprocess
import os

def run_pig_script(script_path, data_path):
    # Set up the environment variable to point to the directory containing the data file
    os.environ['PIG_DATA_PATH'] = data_path

    # Execute the Pig script using subprocess, assuming Pig is installed and configured to run in local mode
    result = subprocess.run(['pig', '-x', 'local', '-f', script_path], capture_output=True, text=True)
    
    # Check the results of the Pig script execution
    if result.returncode != 0:
        print("Error occurred during Pig script execution:")
        print(result.stderr)
    else:
        print("Pig script executed successfully. Output:")
        print(result.stdout)

# Define the path to the Pig script and the directory containing the data file
pig_script = './scripts/pig1.pig'
csv_data = './data/'

# Execute the Pig script
run_pig_script(pig_script, csv_data)


In [None]:
execute_python_code_from_text

In [None]:
def run_pyspark_code(code):
    """
    Executes PySpark code, returns either error message or result DataFrame.
    Assumes PySpark session and context are already set.
    """
    try:
        exec(code)
        return None, globals().get('sales_summary')  # Assuming 'sales_summary' is the result DataFrame
    except Exception as e:
        return str(e), None


In [None]:
# ===============================================================================================================
# parse Python code inside the code cell (TODO: How to ensure code is consistently inside the triple back ticks?) 
# How to loop the code so that it runs until correct code is written? 
# How does Langchain come into play? 
# When debugging: 1) provide loaded data head 2) code 3) error message or output if run was successful --> output updated code
# ===============================================================================================================

---
---
---

In [None]:
# # Load the model and tokenizer from the cache for use
# tokenizer = AutoTokenizer.from_pretrained('./model_cache/Meta-Llama-3-8B-Instruct')
# model = AutoModelForCausalLM.from_pretrained('./model_cache/Meta-Llama-3-8B-Instruct')

# # Setup the pipeline with local model and tokenizer
# text_generation = pipeline(
#     "text-generation",
#     model=model,
#     tokenizer=tokenizer,
#     device=0  # Assuming using the first GPU
# )

# # Generate text
# generated_text = text_generation("Sample prompt text goes here", max_length=50)
# print(generated_text)

In [None]:
# question = f"Re-write the following PIG code into PySpark code. Following is the PIG code: \n {pig_script_code}"
# template = f"""
# You are an intelligent software engineer and machine learning engineer. Re-write the following PIG code into PySpark code. Make sure to only share PySpark code so it's easy to copy and paste. 
# PIG code: {question}
# --------------------------------------------------------------------
# PySpark code:"""

# sequences = pipeline(
#     template,
#     max_length=5000,
#     do_sample=True,
#     top_k=10,
#     num_return_sequences=1,
#     eos_token_id=tokenizer.eos_token_id,
# )

# for seq in sequences:
#     print(f"Result: {seq['generated_text']}")

---
## Test PySpark Code

In [None]:
data.head()

In [None]:
def build_prompt_starter(): 
    """
    Generate prompt to start transcribing PIG to PySpark.
    """

def build_prompt_fix_error(): 
    """
    Generate prompt to fix error in PySpark code.
    """

def build_prompt_fix_output(): 
    """
    Generate prompt to fix code output mismatch (between result from PIG and PySpark). 
    """
    

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, avg


def run_pyspark_code(code_dir, code_name, sample_data_dir): 
    # start PySpark server
    # run code against test data

    if error: 
        return error_message
    else: 
        return result_df

def check_resutls(pig_result_df, pyspark_result_df): 
    # return True if the results are identical 
    return is_same 


# loop until both pyspark code runs fine and output data from PIG and PYSpark are the same
    




In [None]:
from pyspark.sql import SparkSession
from tabulate import tabulate

def build_prompt_starter(pig_code):
    """
    Generate prompt to start transcribing PIG to PySpark.
    Returns a string with the starter prompt.
    """
    return f"""
    I need to convert the following Apache PIG script into Apache PySpark code. 
    The PySpark code should perform the same tasks and produce identical outputs as the PIG code. 
    Please ensure that the PySpark code uses DataFrame operations wherever possible and include comments explaining any complex parts or transformations.
    
    PIG Code: 
    {pig_code}
    """






pig_output = tabulate(pig_output, headers='keys', tablefmt='psql', showindex="never")
pyspark_output = tabulate(pyspark_output, headers='keys', tablefmt='psql', showindex="never")


def build_prompt_fix_output(pig_output, pyspark_output):
    """
    Generate prompt to fix code output mismatch between results from PIG and PySpark.
    Returns a string with the prompt for fixing output mismatches.
    """
    return f"""
    The outputs between PIG and PySpark do not match. Please adjust the PySpark code so that the output from PySpark code is identical to that from PIG code. 
    Below are the two outputs with mismatch:

    Pig Output (ground truth): 
    {pig_output}

    
    PySpark Output: 
    {pyspark_output}
    """

def check_results(pig_result_df, pyspark_result_df):
    """
    Compare PIG and PySpark DataFrames to check if the results are identical.
    """
    return pig_result_df.subtract(pyspark_result_df).count() == 0 and pyspark_result_df.subtract(pig_result_df).count() == 0


# Set up PySpark
spark = SparkSession.builder.appName("Sales Summary").getOrCreate()

# Initial PySpark code generation using an LLM (not shown here)
pyspark_code = """
# Assume pyspark_code is filled with the initially generated code
"""
pig_result_df = spark.createDataFrame(...)  # Assume this is setup elsewhere

# Run and refine PySpark code
error_message, pyspark_result_df = run_pyspark_code(pyspark_code)
while error_message or not check_results(pig_result_df, pyspark_result_df):
    if error_message:
        prompt = build_prompt_fix_error(error_message)
    else:
        prompt = build_prompt_fix_output()
    # Here you would update `pyspark_code` based on the LLM's output (not shown here)
    error_message, pyspark_result_df = run_pyspark_code(pyspark_code)

# Results are now fine
print("PySpark code executed successfully and results match PIG.")


In [None]:
# subprocess to test out sample codes: 
# link: https://www.google.com/search?q=Python+application+which+run+PIG+code&rlz=1C1OPNX_enUS1108US1108&oq=Python+application+which+run+PIG+code+&gs_lcrp=EgZjaHJvbWUyBggAEEUYOTIHCAEQIRigATIHCAIQIRigATIHCAMQIRigAdIBCTEwNTIzajBqN6gCALACAA&sourceid=chrome&ie=UTF-8

# langchain 

# langsmith 

# streamlit 

# airflow 

# 