In [1]:
# Warning control
import warnings
warnings.filterwarnings('ignore')

In [2]:
import os, json
import asyncio
import assemblyai as aai
import gradio as gr
from dotenv import load_dotenv
from queue import Queue
from llama_parse import LlamaParse
from llama_index.llms.together import TogetherLLM
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import (
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage
)
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Context,
    step,
    Workflow,
    Event,
    InputRequiredEvent,
    HumanResponseEvent
)
from llama_index.readers.whisper import WhisperReader
from llama_index.utils.workflow import draw_all_possible_flows

In [3]:
import nest_asyncio
# Jupyter already runs an asyncio event loop; patching it allows nested
# event-loop use when the workflow below is driven from notebook cells.
nest_asyncio.apply()

# Load variables from a local .env file into the environment; later cells read
# API keys via os.getenv (e.g. ASSEMBLY_API_KEY). Its return value is the cell output.
load_dotenv()

True

### Voice Feedback Handler

In [4]:
class TranscriptionHandler:
    '''Collect one piece of spoken feedback through a local Gradio microphone UI.

    Recordings made in the UI are transcribed with AssemblyAI and pushed onto a
    thread-safe queue; ``get_transcription`` launches the UI and polls that
    queue from async code until a transcription arrives or it gives up.
    '''

    # These statements run once, when the class is defined, so ASSEMBLY_API_KEY
    # must already be in the environment at that point.
    aai.settings.api_key = os.getenv('ASSEMBLY_API_KEY')
    transcriber = aai.Transcriber()

    def __init__(self):
        # queue.Queue is thread-safe: Gradio callbacks may run on worker threads
        # while get_transcription() reads from the notebook's event loop.
        self.transcription_queue = Queue()
        self.interface = None

    def transcribe_speech(self, file_path):
        '''Transcribe the speech in the given audio file.

        Args:
            file_path: Path to the recorded audio file.

        Returns:
            The transcript text.

        Raises:
            Exception: if AssemblyAI reports an error status or the call fails;
                the underlying error is chained as ``__cause__``.
        '''
        try:
            transcript = self.transcriber.transcribe(file_path)
            # A failed job can come back as a transcript with an error status
            # (and no text) instead of raising.
            if transcript.status == aai.TranscriptStatus.error:
                raise RuntimeError(transcript.error)
            return transcript.text
        except Exception as e:
            raise Exception(f"Error during transcription: {str(e)}") from e

    def store_transcription(self, transcription):
        '''Queue the transcription for get_transcription() and return it for display.'''
        # queue.Queue exposes put(), not append().
        self.transcription_queue.put(transcription)
        return transcription

    def _handle_audio(self, file_path):
        '''Gradio callback: transcribe one recording and queue the resulting text.'''
        if not file_path:
            # Submitted with no recording: nothing to transcribe or queue.
            return ""
        return self.store_transcription(self.transcribe_speech(file_path))

    def create_interface(self):
        '''Build (but do not launch) the microphone transcription UI.'''
        mic_transcribe = gr.Interface(
            fn=self._handle_audio,
            inputs=gr.Audio(sources="microphone", type="filepath"),
            outputs=gr.Textbox(label="Transcription")
        )
        self.interface = gr.Blocks()
        with self.interface:
            gr.TabbedInterface(
                [mic_transcribe],
                ["Transcribe Microphone"]
            )
        return self.interface

    async def get_transcription(self, max_attempts=10, poll_interval=1.5, server_port=8000):
        '''Launch the UI and wait for a single transcription.

        The queue is checked immediately and then after each of up to
        ``max_attempts`` sleeps of ``poll_interval`` seconds (15 s in total
        with the defaults). The UI is closed on every exit path, including
        cancellation of this coroutine.

        Args:
            max_attempts: Number of ``poll_interval`` waits before giving up.
            poll_interval: Seconds to sleep between queue checks.
            server_port: Local port the Gradio UI is served on.

        Returns:
            The transcription text, or ``None`` if nothing arrived in time.
        '''
        self.interface = self.create_interface()
        self.interface.launch(
            share=False,
            server_port=server_port,
            prevent_thread_lock=True
        )
        try:
            for attempt in range(max_attempts + 1):
                if not self.transcription_queue.empty():
                    return self.transcription_queue.get()
                if attempt < max_attempts:
                    await asyncio.sleep(poll_interval)
        finally:
            # Always shut the server down so the port is free for the next handler.
            if self.interface is not None:
                self.interface.close()
        print(f"No transcription received after {max_attempts} attempts. Exiting...")
        return None




### Events

In [5]:
class ParseFormEvent(Event):
    '''Hand-off from setup to parse_form once the resume index is ready.'''
    application_form: str  # application form file, as passed to the workflow run

class GenerateQuestionEvent(Event):
    '''Signals that the form's field list is stored and questions can be generated.'''

class QueryEvent(Event):
    '''One question to ask of the resume, tied to a single form field.'''
    field: str  # form field this question is for
    query: str  # question text sent to the resume query engine

class ResponseEvent(Event):
    '''The resume query engine's answer for a single form field.'''
    field: str  # form field the answer belongs to
    response: str  # answer text

class FeedbackEvent(Event):
    '''Human feedback that triggers another round of question generation.'''
    feedback: str  # raw feedback text from the user

### Workflow

In [6]:
class JobPilotWorkflow(Workflow):
    '''Fill in a job application form from a resume, with a human feedback loop.

    Steps: setup (index the resume) -> parse_form (extract the field list) ->
    generate_questions (one QueryEvent per field) -> ask_question (RAG over the
    resume) -> fill_application (collect all answers, draft the form and ask a
    human) -> get_feedback (StopEvent, or FeedbackEvent which re-runs
    generate_questions with the feedback appended to every question).
    '''

    # NOTE: the cached index is not keyed by resume path; delete this directory
    # after switching resumes or the previously indexed one is reused.
    storage_dir = '../storage'
    llm: TogetherLLM
    # Holds the engine returned by index.as_query_engine(), not the index itself.
    query_engine: VectorStoreIndex

    @staticmethod
    def _llama_cloud_api_key():
        '''Return the LlamaCloud API key, preferring the standard variable name.'''
        # LLAMA_CLOUD_API_KEY is the documented name; the misspelled
        # LLAMA_CLOUDE_API_KEY is still honoured for existing .env files.
        return os.getenv('LLAMA_CLOUD_API_KEY') or os.getenv('LLAMA_CLOUDE_API_KEY')

    @staticmethod
    def _extract_fields(raw_text):
        '''Parse an LLM reply of the form {"fields": [...]}, tolerating a ``` code fence.'''
        text = raw_text.strip()
        if text.startswith("```"):
            # Drop the opening fence line (``` or ```json) and the closing fence.
            text = text.split("\n", 1)[1] if "\n" in text else ""
            text = text.rsplit("```", 1)[0]
        return json.loads(text)["fields"]

    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> ParseFormEvent:
        '''Validate inputs, create the LLM, and load or build the resume index.

        Raises:
            ValueError: if ``resume`` or ``application_form`` was not passed to run().
        '''
        # ev.get() yields None for a missing kwarg instead of raising AttributeError.
        resume = ev.get("resume")
        application_form = ev.get("application_form")
        if not resume:
            raise ValueError("No resume file provided.")
        if not application_form:
            raise ValueError("No application form provided")

        self.llm = TogetherLLM(model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8")
        embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-base-en-v1.5")
        if os.path.exists(self.storage_dir):
            # read parsed resume from disk
            storage_context = StorageContext.from_defaults(persist_dir=self.storage_dir)
            index = load_index_from_storage(storage_context, embed_model=embed_model)
        else:
            # parse, embed and index the resume, then persist it for later runs
            documents = LlamaParse(
                api_key=self._llama_cloud_api_key(),
                base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
                result_type="markdown",
                system_prompt="This is a resume/cv, gather related facts together and format it as bullet points with headers"
            ).load_data(resume)
            index = VectorStoreIndex.from_documents(documents, embed_model=embed_model)
            index.storage_context.persist(persist_dir=self.storage_dir)

        self.query_engine = index.as_query_engine(llm=self.llm, similarity_top_k=5)
        return ParseFormEvent(application_form=application_form)

    @step
    async def parse_form(self, ctx: Context, ev: ParseFormEvent) -> GenerateQuestionEvent:
        '''Parse the application form and store its list of fields under "fields".

        Raises:
            ValueError: if LlamaParse returns no documents for the form.
        '''
        documents = LlamaParse(
            api_key=self._llama_cloud_api_key(),
            base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
            result_type="markdown",
            system_prompt="This is a job application form. Create a list of all the fields that need to be filled in.",
            system_prompt_append="Return a bulleted list of the fields ONLY."
        ).load_data(ev.application_form)
        if not documents:
            raise ValueError("Could not parse the application form.")
        # LlamaParse can return several documents (e.g. one per page); use them all.
        form_text = "\n\n".join(doc.text for doc in documents)
        raw_json = await self.llm.acomplete(
            f"This is a parsed form. Convert it into a JSON object containing only the list of fields to be filled in, in the form {{ fields: [...] }}. <form>{form_text}</form>. Return JSON ONLY, no markdown.")
        fields = self._extract_fields(raw_json.text)

        await ctx.set('fields', fields)
        return GenerateQuestionEvent()

    @step
    async def generate_questions(self, ctx: Context, ev: GenerateQuestionEvent | FeedbackEvent) -> QueryEvent:
        '''Emit one QueryEvent per form field, appending human feedback when present.'''
        fields = await ctx.get('fields')
        # Store the expected answer count BEFORE emitting any QueryEvent, so
        # fill_application can never look it up before it exists.
        await ctx.set('total_fields', len(fields))

        feedback_note = ""
        if isinstance(ev, FeedbackEvent):
            feedback_note = f"""
                    \nWe previously got feedback about how we answered the questions.
                    It might not be relevant to this particular field, but here it is:
                    <feedback>{ev.feedback}</feedback>
                """

        for field in fields:
            question = f"How would you answer this question about the candidate? <field>{field}</field>" + feedback_note
            ctx.send_event(QueryEvent(field=field, query=question))

        return None

    @step
    async def ask_question(self, cxt: Context, ev: QueryEvent) -> ResponseEvent:
        '''Answer one field's question with retrieval over the indexed resume.'''
        # aquery keeps the event loop free while the LLM call is in flight.
        response = await self.query_engine.aquery(f"This is a question about the specific resume/cv we have in our database: {ev.query}")
        return ResponseEvent(field=ev.field, response=response.response)

    @step
    async def fill_application(self, ctx: Context, ev: ResponseEvent) -> InputRequiredEvent | StopEvent:
        '''Wait for every field's answer, then draft the filled form for human review.'''
        total_fields = await ctx.get('total_fields')
        # collect_events buffers ResponseEvents and returns None until all have arrived.
        responses = ctx.collect_events(ev, [ResponseEvent] * total_fields)
        if not responses:
            return None

        responses_list = '\n'.join(f'Field: {r.field} \nResponse: {r.response}' for r in responses)
        result = await self.llm.acomplete(f"""
            You are given a list of fields in an application form and responses to
            questions about those fields from a resume. Combine the two into a list of
            fields and succinct, factual answers to fill in those fields.

            <responses>
            {responses_list}
            </responses>
        """)

        filled_form = str(result)
        await ctx.set('filled_form', filled_form)

        # Hand the draft to a human; their reply arrives as a HumanResponseEvent.
        return InputRequiredEvent(
            prefix="How does this look? Give me any feedback you have on any of the answers.",
            result=filled_form
        )

    @step
    async def get_feedback(self, ctx: Context, ev: HumanResponseEvent) -> FeedbackEvent | StopEvent:
        '''Let the LLM classify the human reply: finish, or loop back with feedback.'''
        result = await self.llm.acomplete(f"""
            You have received some human feedback on the form-filling task you've done.
            Does everything look good, or is there more work to be done?
            <feedback>
            {ev.response}
            </feedback>
            If everything is fine, respond with just the word 'OKAY'.
            If there's any other feedback, respond with just the word 'FEEDBACK'.
        """)

        verdict = result.text.strip()

        print(f"LLM says the verdict was {verdict}")
        # Tolerate case and stray punctuation/quotes, e.g. "Okay." or "'OKAY'".
        if verdict.strip(".!'\"` ").upper() == "OKAY":
            return StopEvent(result=await ctx.get("filled_form"))
        return FeedbackEvent(feedback=ev.response)


In [None]:
jobpilot = JobPilotWorkflow(timeout=600, verbose=False)
handler = jobpilot.run(
    resume='../data/fake_resume.pdf',
    application_form='../data/fake_application_form.pdf'
)

async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        print("We've filled in your form! Here are the results:\n")
        print(event.result)
        # Get transcription
        transcription_handler = TranscriptionHandler()
        feedback = await transcription_handler.get_transcription()
        if not feedback:
            feedback = input(event.prefix)
        handler.ctx.send_event(
            HumanResponseEvent(
                response=feedback
            )
        )

response = await handler
print(response)

Started parsing the file under job_id 5a1bb358-8a16-475b-ba5e-cdc831fee2eb
We've filled in your form! Here are the results:

Here is the list of fields with succinct, factual answers:

1. **First Name**: Sarah
2. **Last Name**: Chen
3. **Email**: sarah.chen@email.com
4. **Phone**: Not listed
5. **Linkedin**: linkedin.com/in/sarahchen
6. **Project Portfolio**: The candidate has showcased two significant projects: EcoTrack (a full-stack carbon footprint tracking application) and ChatFlow (a real-time chat application with end-to-end encryption).
7. **Degree**: Bachelor of Science in Computer Science with a minor in User Experience Design from the University of California, Berkeley
8. **Graduation Date**: 2017
9. **Current Job Title**: Senior Full Stack Developer
10. **Current Employer**: TechFlow Solutions
11. **Technical Skills**: Proficient in React.js, Redux, Next.js, TypeScript, Node.js, Express.js, GraphQL, PostgreSQL, MongoDB, Docker, Kubernetes, AWS, and more.
12. **Describe why y

### Draw Workflow Diagram

In [None]:
from IPython.display import display, HTML, DisplayHandle

def extract_html_content(filename, height="800px"):
    '''Read an HTML file and wrap it in a full-width, overflow-hidden <div>.

    Args:
        filename: Path of the HTML file to read (decoded as UTF-8).
        height: CSS height of the wrapping div; defaults to "800px".

    Returns:
        The file's contents wrapped in the <div>.

    Raises:
        Exception: if the file cannot be read; the original error is chained
            as ``__cause__``.
    '''
    try:
        # Explicit encoding: the platform default (e.g. cp1252 on Windows) can
        # fail on non-ASCII HTML.
        with open(filename, 'r', encoding='utf-8') as file:
            html_content = file.read()
    except Exception as e:
        raise Exception(f"Error reading file: {str(e)}") from e
    return f""" <div style="width: 100%; height: {height}; overflow: hidden;"> {html_content} </div>"""
    
# Render every possible step/event path of JobPilotWorkflow to an HTML file,
# which the display cell below embeds in the notebook.
draw_all_possible_flows(JobPilotWorkflow, filename='./jobpilot-workflow.html')

./jobpilot-workflow.html


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


In [None]:
html_content = extract_html_content('./jobpilot-workflow.html')
# isolated=True asks the notebook frontend to render this HTML in its own iframe.
display(HTML(html_content), metadata={'isolated': True})