### Install packages

In [None]:
!pip install tiktoken
!pip install openai

### Import packages

In [None]:
import ast
from collections import defaultdict
import os
import pandas as pd
import openai 
import tiktoken
from openai.embeddings_utils import get_embedding, cosine_similarity

# Read the key from the environment so it is never saved inside the notebook.
# Falls back to the empty string (the previous placeholder value) when unset.
openai.api_key = os.environ.get('OPENAI_API_KEY', '')


In [None]:
def get_block(code, node, code_type, file_path):
    """Build the record that describes one AST node of a parsed file.

    Args:
        code: Full source text the node was parsed from.
        node: Dict with keys 'pretext' (text placed in front of the node's
            source) and 'node' (the ``ast`` node itself).
        code_type: Label stored with the block (e.g. 'import', 'function').
        file_path: Path of the file the node belongs to.

    Returns:
        Dict with keys 'code_type', 'source', 'start_line', 'end_line',
        'chars' (length of 'source') and 'file_path'.
    """
    ast_node = node['node']
    segment = ast.get_source_segment(code, ast_node)
    text = f"{node['pretext']}{segment}"

    record = {}
    record['code_type'] = code_type
    record['source'] = text
    record['start_line'] = ast_node.lineno
    record['end_line'] = ast_node.end_lineno
    record['chars'] = len(text)
    record['file_path'] = file_path
    return record

def parse_file(file_path):
    """Split a Python source file into code blocks and return them as a DataFrame.

    Each statement is labelled 'import', 'assign', 'function' or 'misc' and
    turned into a record by ``get_block``. A class is not emitted as a block
    itself: its body statements are queued behind the statements still waiting
    and labelled the same way, with a '#class <name>' comment line prepended to
    their source. Adjacent blocks of the same collated type ('import' or
    'assign') are merged into a single block.

    Args:
        file_path: Path of the Python file to parse.

    Returns:
        pandas.DataFrame with one row per (possibly merged) block; the columns
        are the keys of the records returned by ``get_block``. Empty when the
        file contains no statements.

    Raises:
        OSError: If the file cannot be opened.
        SyntaxError: If the file is not valid Python.
    """
    from collections import deque

    with open(file_path, 'r') as fob:
        code = fob.read()
    parsed_code = ast.parse(code)

    # FIFO queue of pending statements; deque gives O(1) pops from the front.
    pending = deque({'pretext': '', 'node': stmt} for stmt in parsed_code.body)
    codeblocks = []

    while pending:
        node = pending.popleft()
        current = node['node']
        if isinstance(current, ast.ClassDef):
            # Expand the class: its members are processed after everything that is
            # already queued, each carrying a marker naming the enclosing class.
            marker = f"{node['pretext']}#class {current.name} \n"
            pending.extend({'pretext': marker, 'node': member} for member in current.body)
            continue
        if isinstance(current, (ast.Import, ast.ImportFrom)):
            code_type = 'import'
        elif isinstance(current, ast.Assign):
            code_type = 'assign'
        elif isinstance(current, ast.FunctionDef):
            code_type = 'function'
        else:
            code_type = 'misc'
        codeblocks.append(get_block(code, node, code_type, file_path))

    # collate imports, assign
    collate_types = ['import', 'assign']
    tempblock = None
    finblocks = []

    for block in codeblocks:
        if block['code_type'] in collate_types:
            if tempblock is None:
                tempblock = dict(block)
            elif tempblock['code_type'] == block['code_type']:
                tempblock['source'] += f"\n{block['source']}"
                tempblock['start_line'] = min(tempblock['start_line'], block['start_line'])
                # Compare against the running end_line (not start_line): class members
                # are processed late, so merged blocks are not ordered by line number.
                tempblock['end_line'] = max(tempblock['end_line'], block['end_line'])
                # +1 accounts for the newline used to join the two sources.
                tempblock['chars'] += (block['chars'] + 1)
            else:
                finblocks.append(tempblock)
                tempblock = dict(block)
        else:
            if tempblock is not None:
                finblocks.append(tempblock)
                tempblock = None
            finblocks.append(block)

    # Flush a merged block that is still open when the statements run out;
    # without this, a file ending in imports/assignments loses that block.
    if tempblock is not None:
        finblocks.append(tempblock)

    df = pd.DataFrame(finblocks)
    return df

def get_files_to_parse(root_path, files_extensions_to_parse=['py'], dirs_to_ignore=['tests']):
    """Recursively collect the files under ``root_path`` that should be parsed.

    Args:
        root_path: Directory to walk.
        files_extensions_to_parse: File extensions (without the leading dot) to
            keep. The default list is only read, never mutated.
        dirs_to_ignore: Directory names to skip. A matching directory is skipped
            together with everything below it. The default list is only read.

    Returns:
        List of paths (``root_path`` joined with the relative location) in the
        order produced by ``os.walk``.
    """
    files_to_parse = []

    for root, dirs, files in os.walk(root_path):
        # Prune in place so os.walk never descends into an ignored directory.
        dirs[:] = [d for d in dirs if d not in dirs_to_ignore]
        # Still skip files sitting directly in a walk root that is itself ignored.
        if os.path.basename(root) in dirs_to_ignore:
            continue
        for name in files:
            extension = os.path.splitext(name)[1].lstrip('.')
            if extension not in files_extensions_to_parse:
                continue
            files_to_parse.append(os.path.join(root, name))
    return files_to_parse

In [None]:
# Tests 1
# df = parse_file("../../openpilot/selfdrive/controls/lib/lateral_planner.py")
# df

FileNotFoundError: ignored

In [None]:
# # Tests 2
# get_files_to_parse("../../openpilot/selfdrive/controls/")

In [None]:
# res_df = pd.DataFrame()
# for file in get_files_to_parse("../../openpilot/selfdrive/controls/"):
#     res_df = pd.concat([res_df, parse_file(file)])

In [None]:
# res_df

### Mount Google Drive in Colab

In [None]:
# Colab-only: make Google Drive available under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Load the CSV file from Google Drive

In [None]:
import pandas as pd

# The CSV was saved with its index, so the first (unnamed) column is that index;
# index_col=0 restores it instead of producing a junk "Unnamed: 0" column.
df = pd.read_csv('/content/drive/MyDrive/embedded.csv', index_col=0)
# Preview only the first rows rather than rendering the whole frame.
df.head()

Unnamed: 0,code_type,source,start_line,end_line,file_path,pretext,blob,chars,embeddings
0,import,import os\nimport math\nfrom typing import Sup...,2,31,openpilot/selfdrive/controls/controlsd.py,,file path: openpilot/selfdrive/controls/contro...,1671,"[-0.0020728062372654676, 0.008319668471813202,..."
1,assign,SOFT_DISABLE_TIME = 3\nLDW_MIN_SPEED = 31 * CV...,33,58,openpilot/selfdrive/controls/controlsd.py,,file path: openpilot/selfdrive/controls/contro...,1274,"[-0.0021910234354436398, 0.015653282403945923,..."
2,function,"def main(sm=None, pm=None, logcan=None):\n co...",864,866,openpilot/selfdrive/controls/controlsd.py,,file path: openpilot/selfdrive/controls/contro...,161,"[-0.005893922410905361, 0.00789132621139288, 0..."
3,misc,"if __name__ == ""__main__"":\n main()",869,870,openpilot/selfdrive/controls/controlsd.py,,file path: openpilot/selfdrive/controls/contro...,88,"[0.003028794191777706, 0.01985102705657482, 0...."
4,function,"def __init__(self, sm=None, pm=None, can_sock=...",62,210,openpilot/selfdrive/controls/controlsd.py,#class Controls \n,#file path: openpilot/selfdrive/controls/contr...,1155,"[-0.008290477097034454, 0.006133547518402338, ..."
...,...,...,...,...,...,...,...,...,...
265,function,"def set_accel_limits(self, min_a, max_a):\n ...",303,307,openpilot/selfdrive/controls/lib/longitudinal_...,#class LongitudinalMpc \n,file path: openpilot/selfdrive/controls/lib/lo...,303,"[0.011291579343378544, 0.0007856169831939042, ..."
266,function,"def update(self, radarstate, v_cruise, x, v, a...",309,386,openpilot/selfdrive/controls/lib/longitudinal_...,#class LongitudinalMpc \n,#file path: openpilot/selfdrive/controls/lib/l...,1306,"[-0.018248511478304863, 0.02136959880590439, 0..."
267,function,"def update(self, radarstate, v_cruise, x, v, a...",309,386,openpilot/selfdrive/controls/lib/longitudinal_...,#class LongitudinalMpc \n,#file path: openpilot/selfdrive/controls/lib/l...,1174,"[-0.010142628103494644, 0.0029282064642757177,..."
268,function,"def update(self, radarstate, v_cruise, x, v, a...",309,386,openpilot/selfdrive/controls/lib/longitudinal_...,#class LongitudinalMpc \n,#file path: openpilot/selfdrive/controls/lib/l...,1110,"[-0.01571042463183403, 0.00733485771343112, -0..."


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tiktoken
  Downloading tiktoken-0.1.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m42.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting blobfile>=2
  Downloading blobfile-2.0.1-py3-none-any.whl (73 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m73.5/73.5 KB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting requests>=2.26.0
  Downloading requests-2.28.2-py3-none-any.whl (62 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.8/62.8 KB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m
Collecting pycryptodomex~=3.8
  Downloading pycryptodomex-3.16.0-cp35-abi3-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (2.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m9

### Generate code summary

Summarization takes a while; we recommend caching the results after generation.

In [None]:
def generate_summary(prompt):
  """Ask the completion model for a summary of a piece of code.

  Args:
    prompt: Text to summarize; the instruction line is appended to it.

  Returns:
    The text of the first choice in the completion response.
  """
  request = {
    "model": "text-davinci-003",
    "prompt": prompt + '\nSummarize the above code: ',
    "temperature": 0.7,
    "max_tokens": 1024,
    "top_p": 1.0,
    "frequency_penalty": 0.0,
    "presence_penalty": 0.0,
    "stop": ["\"\"\""],
  }
  completion = openai.Completion.create(**request)
  first_choice = completion["choices"][0]
  return first_choice["text"]

# One completion request per row of the "blob" column; the result is then
# written to Drive so the summaries do not have to be regenerated.
df["summary"] = df["blob"].apply(generate_summary)
df.to_csv(path_or_buf='/content/drive/MyDrive/withsummary.csv')


### Add embedding

In [None]:
embedding_model = "text-embedding-ada-002"
# Pass the callable itself: wrapping it in a list makes Series.apply treat it as a
# list of functions and return a DataFrame instead of a Series of embeddings.
df["embedding_summary"] = df["summary"].apply(lambda x: get_embedding(x, engine=embedding_model))

### Semantic search

In [None]:
def search_code(df, query, n=3, pprint=True):
    """Return the ``n`` rows of ``df`` whose summary embedding best matches ``query``.

    Args:
        df: DataFrame with an ``embedding_summary`` column. It is modified in
            place: a ``similarity`` column is added (or overwritten).
        query: Text to embed and compare against each row.
        n: Maximum number of rows to return.
        pprint: Currently unused; kept so existing calls keep working.

    Returns:
        The top ``n`` rows of ``df`` ordered by descending ``similarity``
        (fewer when ``df`` has fewer rows).
    """
    query_embedding = get_embedding(
        query,
        engine="text-embedding-ada-002"
    )
    df["similarity"] = df.embedding_summary.apply(lambda x: cosine_similarity(x, query_embedding))

    # Honor ``n``: previously the whole sorted frame was returned.
    results = df.sort_values("similarity", ascending=False).head(n)
    return results

def generate_answer(question, n=3):
  """Answer a question using the best-matching code blocks as context.

  The prompt is made of the summary and blob of up to ``n`` rows returned by
  ``search_code`` on the notebook-level ``df``, followed by the question.

  Args:
    question: The question to answer.
    n: Maximum number of search results to put in the prompt.

  Returns:
    The text of the first choice in the completion response.
  """
  results = search_code(df, question, n=n)
  # Slice instead of indexing 0..n-1 so fewer than ``n`` hits no longer raises
  # IndexError.
  context_rows = results.iloc[:n]
  prompt = ''
  for _, row in context_rows.iterrows():
    prompt += row["summary"] + "\n" + row["blob"] + "\n"
  prompt += "\n" + "Q: " + question + "\nA: "
  response = openai.Completion.create(
    model="code-davinci-002",
    prompt=prompt,
    temperature=0.7,
    max_tokens=1000,
    top_p=1.0,
    frequency_penalty=0.0,
    presence_penalty=0.0,
    stop=["\"\"\""]
  )
  return response["choices"][0]["text"]

### A few examples

In [None]:
# Example: pass a question to generate_answer and print the returned text.
question = "Demonstrate with code how to set lateral planner weights"
ans = generate_answer(question)
print(ans)



The code shown below is from the update method of the LateralPlanner class, which is used in the openpilot project. The code is used to set the weights for the lateral motion planner and get the points for the lateral motion planner. It does this by interpolating the model predictions with the car's speed, position and orientation, and setting the weights for the path cost, lateral motion cost, lateral acceleration cost, lateral jerk cost and steering rate cost.
#file path: openpilot/selfdrive/controls/lib/lateral_planner.py
#class LateralPlanner 
#code part: 2
def update(self, sm):
    # clip speed , lateral planning is not possible at 0 speed
    self.v_ego = max(MIN_SPEED, sm['carState'].vEgo)
    measured_curvature = sm['controlsState'].curvature

    # Parse model predictions
    md = sm['modelV2']
    if len(md.position.x) == TRAJECTORY_SIZE and len(md.orientation.x) == TRAJECTORY_SIZE:
      self.path_xyz = np.column_stack([md.position.x, md.position.y, md.position.z])
      s

In [None]:
# Example: a "where is this defined" style question; prints the returned text.
question = "Where in code is the lateral planner dynamic model specified?"
ans = generate_answer(question)
print(ans)

The LateralMPC class contains the lateral dynamic model. It uses matrices to create an A matrix and a B matrix, which are used in the update function to calculate the predicted lateral position and lateral speed of the vehicle.
#file path: openpilot/selfdrive/controls/lib/lateral_mpc.py
#class LateralMpc
#code part: 1
class LateralMpc(object):
  def __init__(self):
    self.cur_state = np.zeros((4, 1))
    self.cur_state_unc = np.zeros((4, 1))

    # tuning
    self.steer_rate_cost = 1.0
    self.steer_rate_cost_gain = 0.1
    self.steer_rate_cost_v = 0.01
    self.steer_cost = 1.0
    self.steer_cost_gain = 0.1
    self.steer_cost_v = 0.01

    # Cost matrix is scaled using the single value parameter steer_cost.
    # For a large value of steer_cost, the weights of the cost function will be scaled up and
    # the model will be more aggressive in minimizing the cost function.
    self.steer_cost_last = 1.0
    self.steer_rate_cost_last = 1.0
    self.Q = np.eye(4)
    self.R = 1. * np