<a href="https://colab.research.google.com/github/tony-wade/Reverse-Engineering/blob/main/Extract_logic_analyser's_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Recommendation
A better logic analyzer (higher sampling rate and better software) can make this work much easier.

In [None]:
import os
import re
import csv
import pandas as pd
import openpyxl
# deque suits cases where only the first/last elements are accessed
# from collections import deque

In [None]:
def to_byte(bin_data):
    """
    Convert binary data to hexadecimal format (a trailing group shorter than 8 bits is right-padded with 0s).
    ['0','0'...] or ['0...',..] --> ['0x??',..] -> ['??',..]
    """
    bin_str = ''.join(bin_data)
    bits_list = [bin_str[i:i+8].ljust(8, '0') for i in range(0, len(bin_str), 8)]
    return [hex(int(bits, 2))[2:].zfill(2) for bits in bits_list]


def convert_to_binary(row_data, pkt_len=None, mode=None):
    """
    Convert data to binary form.
    mode = input type: 'SPI' or 'BIN'; if not given, the input is treated as hex.
    """
    if mode == 'SPI':
        # decimal: ['1',...] -> [0b1,...] -> ['00000001',...], MSB first
        int_data = [int(number) for number in row_data if number]
        return [bin(num)[2:].zfill(8) for num in int_data]

    elif mode == 'BIN':
        # Only '3' (bit 1) and '2' (bit 0) carry data; any other value is ignored
        if row_data not in ('3', '2'):
            return None
        if not pkt_len:
            raise ValueError('pkt_len is not given')
        # Extract the leading number (length); re.match returns None if there is none
        search = re.match(r'(\d+)', pkt_len)
        # Filter noise: rows with pkt_len below 200 are discarded
        if search and int(search.group(1)) >= 200:
            return generate_bit(row_data)
        return None

    else:
        # ['0a',..] -> ['0000...',...] -> ['0','0',...]
        int_data = [int(hex_num, 16) for hex_num in row_data if pd.notna(hex_num)]
        binary_list = [bin(num)[2:].zfill(8) for num in int_data]
        return [char for binary_string in binary_list for char in binary_string]

## CSVs to Excel

In [None]:
def read_csv(file_path, mode):
    with open(file_path, 'r', newline='') as file:
        csv_reader = csv.reader(file, delimiter=',')
        next(csv_reader)  # skip the header row
        if mode == 'SPI':
            # Take the decimal data from even-indexed rows, dropping empty or non-numeric strings
            return [
                ([x.strip() for x in row[3:] if x.strip().isdigit()], None)
                for row_idx, row in enumerate(csv_reader)
                if row_idx % 2 == 0
            ]
        elif mode == 'BIN':
            # Extract (Data, pkt_len), dropping rows where either is empty or non-numeric
            return [
                (row[3].strip(), row[4].strip())
                for row in csv_reader
                if len(row) > 4 and row[3].strip().isdigit() and row[4].strip().isdigit()
            ]
        raise ValueError('Invalid mode')


def read_all_csv_files(folder_path, mode):
    all_datas = []
    for file_name in sorted(os.listdir(folder_path)):   # os.listdir order is arbitrary
        if file_name.endswith('.csv'):
            file_path = os.path.join(folder_path, file_name)
            datas = read_csv(file_path, mode)
            all_datas.extend(datas)   # add the elements themselves, not the list
    return all_datas


def sequence_to_set(sequences):
    if not sequences:
        return None, None

    # One set for the sequences and one for their lengths
    length_set = {len(seq) for seq in sequences}
    sequence_set = set(sequences)

    return sequence_set, length_set


def generate_bit(data):
    # Interpret the sample value: '3' -> bit '1', '2' -> bit '0'
    if data == '3':
        return '1'
    elif data == '2':
        return '0'
    else:
        return None


def matching(sub_q, data_set, len_set, worksheet):
    """
    Match the tail of the accumulated bits against a set of delimiter sequences.

    Args:
        sub_q (list): The accumulated bits to match.
        data_set (set): The delimiter sequences to compare with.
        len_set (set): Lengths of the delimiter sequences.
        worksheet (openpyxl worksheet): Sheet that receives each matched row.

    Returns:
        None
    """
    if data_set is None:
        raise ValueError('Sequence is not given')

    elif len(sub_q) >= min(len_set):
        for length in len_set:
            sub_seq = ''.join(sub_q[-length:])
            if sub_seq in data_set:
                # Everything before the delimiter forms one record
                data_q = sub_q[:-length]
                worksheet.append(to_byte(data_q))
                sub_q.clear()
                break   # sub_q is now empty; stop checking the other lengths


def process_data(datas, mode, start_end_seq=None):
    sub_queue = []

    # Use sets for fast lookup
    seq_set, seq_len = sequence_to_set(start_end_seq)

    # Create the Excel workbook
    workbook = openpyxl.Workbook()
    worksheet = workbook.active

    if start_end_seq:
        for data, pkt_len in datas:
            bin_datas = convert_to_binary(data, pkt_len, mode)
            if not bin_datas:        # noise or non-data rows return None
                continue
            for bits in bin_datas:
                sub_queue.extend(bits)
                matching(sub_queue,
                         seq_set,
                         seq_len,
                         worksheet)
            if mode == 'SPI':
                # Each SPI row is a separate packet; BIN rows are single bits and must accumulate
                sub_queue.clear()

    else:
        if mode == 'SPI':
            for data, pkt_len in datas:
                # In SPI mode, data is a list of decimal strings
                hex_datas = [
                    hex(int(dec_data.strip()))[2:].zfill(2)
                    for dec_data in data
                ]
                worksheet.append(hex_datas)
        elif mode == 'BIN':
            for data, pkt_len in datas:
                bin_datas = convert_to_binary(data, pkt_len, mode)
                if bin_datas:
                    worksheet.append(to_byte(bin_datas))
        else:
            raise ValueError('Invalid mode')

    # Save to Excel; bits left in sub_queue without a closing delimiter are not written
    workbook.save('output.xlsx')

In [None]:
# Read the CSVs and write them to Excel
input_folder = '.'
data = read_all_csv_files(input_folder, 'SPI')
process_data(datas=data, mode='SPI')

## Excel extraction

In [None]:
def num_unique_values(col):
    """
    Returns the number of unique non-NaN values in a pd.Series, or per column of a DataFrame.
    """
    return col.nunique()


def extract_binary_row_data(row_data, filter=None, param=None):
    """
    Extract desired sequences from binary row data based on different filters and parameters.

    Parameters:
    row_data (list): List of binary data to be processed.
    filter (str, optional): The type of filter to apply ('target_str', 'excess_bits', 'analyze_protocol'). Default is None.
    param (optional): Parameter for the chosen filter. If filter is 'target_str', it should be a string.
                      If filter is 'excess_bits' or 'analyze_protocol', it should be a tuple (length of sequence, [undesired bit positions in the sequence]).

    Returns:
    list: A list of processed data according to the specified filter and parameter.
    """
    # Concatenate the row (pd.Series, list, or str) into one string, skipping NaN/empty cells
    valid_data = [str(data) for data in row_data if pd.notna(data) and data != '']
    data_str = ''.join(valid_data)

    if filter is None:
        return [to_byte(data_str)]

    elif param is None:
        raise ValueError('param is not given')

    elif filter == 'target_str':
        # Select rows by prefix
        #if data_str.startswith(param):
        #   return [to_byte(data_str)]
        # Select rows that contain the target
        return [to_byte(data_str)] if param in data_str else None
        # Output only the data after the target sequence
        #pattern = re.compile(f'{param}(.*?)(?={param}|$)')
        #data_list = pattern.findall(data_str)  # removing the target leaves '' entries; [] if not found
        #return [to_byte(data) for data in data_list if data]

    elif filter in ('excess_bits', 'analyze_protocol'):
        # Split into chunks of `length` bits and drop the unwanted bit positions in each chunk
        length, remove_positions = param
        data_chunks = [data_str[i:i+length] for i in range(0, len(data_str), length)]
        filtered_chunks = [
            ''.join(char for i, char in enumerate(chunk) if i not in remove_positions)
            for chunk in data_chunks
        ]
        if filter == 'excess_bits':
            # Remove the excess bits but keep the data in a single row
            return [to_byte(''.join(filtered_chunks))]
        # analyze_protocol: one row per chunk, first 4 bits kept as a header, the rest as bytes
        return [[filtered_str[:4]] + to_byte(filtered_str[4:]) for filtered_str in filtered_chunks]

    else:
        raise ValueError('Invalid filter')



def extract_from_xlsx(input_folder, output_file, mode=None, filter=None, param=None):
    """
    Extract row data from every .xlsx file in input_folder into one output Excel file.
    """
    extracted_hex_data = []
    for filename in sorted(os.listdir(input_folder)):
        # Skip non-Excel files and the output file itself
        if not filename.endswith(".xlsx") or filename == os.path.basename(output_file):
            continue
        file_path = os.path.join(input_folder, filename)
        print("Opening file:", file_path)

        # Read the Excel file (hex strings) and convert every row to a list of bits
        df = pd.read_excel(file_path, header=None, dtype=str)
        df = pd.DataFrame([convert_to_binary(row) for _, row in df.iterrows()])

        # Drop columns whose value never changes, if requested
        if mode == 'clear_same_column':
            unique_counts = df.nunique()
            cols_to_keep = unique_counts[unique_counts > 1].index
            df = df[cols_to_keep]
            df.columns = range(df.shape[1])

        # Extract binary data from each row
        for _, row in df.iterrows():
            extracted_bin_data = extract_binary_row_data(row, filter, param)
            if extracted_bin_data:
                extracted_hex_data.extend(extracted_bin_data)

    # Save all extracted rows to one Excel file
    pd.DataFrame(extracted_hex_data).to_excel(output_file, index=False, header=False)


In [None]:
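# Stop-start bit positions to remove from each 41-bit frame of the binary protocol
excess_position = (41,[0,5,40])

# Target sequence used to filter out the data that follows it
target_sequence = '0000000000000000'

# Input folder; defaults to the current directory
input_folder = '.'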

In [None]:
# Filter the Excel data
extract_from_xlsx(input_folder=input_folder,
                  output_file='258_0000.xlsx',
                  #mode='clear_same_column',
                  filter='target_str',
                  param=target_sequence
                  )

Opening file: ./258pure.xlsx


# New features

In [None]:
# Ad hoc: read the export of a new logic analyzer and analyze the protocol
# Returned tuples are (time, scl, sda); in the CSV, column 1 is SDA and column 2 is SCL
def read_csv(file_path):
    with open(file_path, 'r', newline='') as file:
        csv_reader = csv.reader(file, delimiter=',')
        next(csv_reader)  # skip the header row
        next(csv_reader)  # skip the 0 s row
        return [
            (float(row[0].strip()), row[2].strip(), row[1].strip())
            for row in csv_reader
            if len(row) >= 3 and row[2].strip().isdigit() and row[1].strip().isdigit()
        ]

def read_all_csv_files(folder_path):
    all_datas = []
    for file_name in sorted(os.listdir(folder_path)):
        if file_name.endswith('.csv'):
            file_path = os.path.join(folder_path, file_name)
            datas = read_csv(file_path)
            all_datas.extend(datas)   # add the elements themselves, not the list
    return all_datas


# CSV to protocol form
def process_data(datas):
    bin_datas = []
    pre_scl = None
    prev_time = 0
    worksheet = []

    for (time, scl, sda) in datas:
        # The analyzer emits a row whenever any channel changes, so bits are told apart by their
        # period and sampled on the SCL rising edge (at least 0.7 µs after the previous sample)
        if scl == '1' and time - prev_time >= 0.0000007 and pre_scl != '1':
            if time - prev_time > 0.005:
                # A gap longer than 5 ms starts a new transmission
                bin_str = ''.join(bin_datas)
                byte_data = extract_binary_row_data(bin_str, 'analyze_protocol', (41,[0,5,40]))
                worksheet.extend(byte_data)
                bin_datas = []  # reset bin_datas
            bin_datas.append(sda)
            prev_time = time

        pre_scl = scl  # reduces the effect of a stop-start in the middle of a frame to 1 bit

    # Flush the last transmission
    bin_str = ''.join(bin_datas)
    byte_data = extract_binary_row_data(bin_str, 'analyze_protocol', (41,[0,5,40]))
    worksheet.extend(byte_data)
    workbook = pd.DataFrame(worksheet)
    workbook.to_excel('老三258_test.xlsx', index=False, header=False)


input_folder = '.'

process_data(read_all_csv_files(input_folder))

In [None]:
# Ad hoc: classify rows directly by their first cell
def seperate_from_firstline(input_folder, output_file):
    """
    Copy rows whose first cell is not one of the listed 4-bit headers into output_file.
    """
    extracted_hex_data = []
    for filename in sorted(os.listdir(input_folder)):
        if not filename.endswith(".xlsx") or filename == os.path.basename(output_file):
            continue
        file_path = os.path.join(input_folder, filename)
        print("Opening file:", file_path)

        df = pd.read_excel(file_path, header=None, dtype=str)

        for _, row in df.iterrows():
            if row[0] not in ['0111', '0011', '0110']:   # 'ee'...etc
                extracted_hex_data.append([data for data in row if pd.notna(data)])

    # Save the extracted rows to an Excel file
    pd.DataFrame(extracted_hex_data).to_excel(output_file, index=False, header=False)


seperate_from_firstline(input_folder=input_folder,
                        output_file='277_else.xlsx')

Opening file: ./老三277_test.xlsx


In [None]:
# Ad hoc: read the export of another logic analyzer into bits
# Input format is (id, time, data)
def read_csv(file_path):
    with open(file_path, 'r', newline='') as file:
        csv_reader = csv.reader(file, delimiter=',')
        next(csv_reader)  # skip the header row
        next(csv_reader)  # skip the 0 s row
        return [
            float(row[2].strip()) for row in csv_reader
            if len(row) == 3
        ]

input_folder = '.'

process_data(read_all_csv_files(input_folder))

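## A brief note on MCU programming (flashing)
Re-programmable program memory is either Flash or EPROM:


1.   Flash: can be erased and rewritten with electrical signals alone, which is convenient.
2.   EPROM (UV-erasable, not flash): must be erased with strong ultraviolet light before it can be written again. (EEPROM, despite the similar name, is erased electrically.)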

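Writing flash usually requires first sending a pair of hex codes as a "key" that unlocks programming,

    e.g. AA55, A5F1, ...
    (follow the instruction set; AA/55 are used because they are unlikely to be produced by noise on the line, i.e. rare)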
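
Once a capture has been decoded to bytes, the unlock key is a convenient landmark for finding where programming starts. A minimal sketch, assuming the stream is already a list of ints; `find_key` is a hypothetical helper and AA 55 is only the example key from above:

```python
# Locate a programming "key" byte pair (e.g. AA 55) in a decoded byte stream.
# find_key and the default key are illustrative assumptions.

def find_key(byte_stream, key=(0xAA, 0x55)):
    """Return every index where `key` starts in `byte_stream` (a list of ints)."""
    n = len(key)
    return [i for i in range(len(byte_stream) - n + 1)
            if tuple(byte_stream[i:i + n]) == tuple(key)]


# 0xAA and 0x55 are alternating bit patterns
print(format(0xAA, '08b'), format(0x55, '08b'))  # 10101010 01010101

stream = [0x00, 0xFF, 0xAA, 0x55, 0xAC, 0x53, 0xAA, 0x55]
print(find_key(stream))  # [2, 6]
```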

The MCU here is programmed over two wires, clock and data (GND and VCC not counted), and the start condition is clock low / data high.
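
A minimal sketch of turning a two-wire capture into bits. It assumes the analyzer exports one row per level change as `(time, clk, data)` with `'0'`/`'1'` strings, and that data is valid on the clock's rising edge, which is the same assumption the `process_data` cell above makes. `find_start` only looks for the clock-low/data-high state described here. Both helper names are hypothetical.

```python
def find_start(samples):
    """Index of the first sample in the clock-low / data-high start state, or None."""
    for i, (_, clk, data) in enumerate(samples):
        if clk == '0' and data == '1':
            return i
    return None


def sample_bits(samples):
    """Read the data line on every rising clock edge after the start condition."""
    start = find_start(samples)
    if start is None:
        return ''
    bits = []
    prev_clk = samples[start][1]
    for _, clk, data in samples[start + 1:]:
        if prev_clk == '0' and clk == '1':   # rising edge: latch the data line
            bits.append(data)
        prev_clk = clk
    return ''.join(bits)


capture = [
    (0.0, '1', '1'),
    (1.0, '0', '1'),   # start state: clock low, data high
    (2.0, '1', '1'),   # rising edge -> 1
    (3.0, '0', '0'),
    (4.0, '1', '0'),   # rising edge -> 0
    (5.0, '0', '1'),
    (6.0, '1', '1'),   # rising edge -> 1
]
print(sample_bits(capture))  # 101
```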

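UART communicates over independent one-way lines at a pre-agreed baud rate and has no clock line (alternatively, characters such as a, 5, f, 0 can be sent so that the baud rate is detected automatically).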
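
Because UART has no clock, framing and bit timing carry all the information. The sketch below assumes the common 8N1 format: 1 start bit (0), 8 data bits LSB first, and 1 stop bit (1). It also implements the auto-baud idea by taking the shortest interval between edges as one bit time. Real links may instead use parity or 2 stop bits. The helper names are hypothetical.

```python
def encode_8n1(byte):
    """Bit list for one 8N1 frame: start (0), data LSB first, stop (1)."""
    return [0] + [(byte >> i) & 1 for i in range(8)] + [1]


def decode_8n1(bits):
    """Decode one 8N1 frame back to a byte; raise if start/stop bits are wrong."""
    if len(bits) != 10 or bits[0] != 0 or bits[9] != 1:
        raise ValueError('not a valid 8N1 frame')
    return sum(bit << i for i, bit in enumerate(bits[1:9]))


def estimate_baud(edge_times):
    """Estimate the baud rate as 1 / (shortest gap between consecutive edges)."""
    gaps = [b - a for a, b in zip(edge_times, edge_times[1:]) if b > a]
    return 1.0 / min(gaps)   # raises ValueError if fewer than two distinct edges


frame = encode_8n1(0x55)
print(frame)                   # [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]
print(hex(decode_8n1(frame)))  # 0x55
```

The baud estimate works only if the capture contains at least one single-bit pulse. An alternating pattern such as the 0x55 frame above contains many.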

JTAG programming uses 4 wires; SPI uses 3 or 4.

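C code written for an MCU is compiled into files such as .hex, e.g. in Intel HEX format. These describe what is programmed into each location of the MCU, arranged in program order.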

After conversion to .bin, the assembled code is written out in sequence, with some additional data inserted in between.
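
The difference between the two formats can be shown with a short .hex to .bin sketch. An Intel HEX record is `:LLAAAATT<data>CC`, meaning byte count, 16-bit address, record type, data, and a checksum. The checksum makes the sum of all record bytes equal 0 mod 256. A .bin is simply the flat image in address order. This sketch handles only record types 00 (data), 01 (end of file), and 04 (extended linear address). It fills unused addresses with 0xFF, an assumed but common choice because erased flash usually reads as 0xFF. Padding like this is one kind of data that appears in a .bin but not in the .hex. The function names are hypothetical.

```python
def parse_record(line):
    """Split one Intel HEX record into (type, address, data bytes), verifying the checksum."""
    line = line.strip()
    if not line.startswith(':'):
        raise ValueError('record must start with ":"')
    raw = bytes.fromhex(line[1:])
    count, addr, rtype = raw[0], int.from_bytes(raw[1:3], 'big'), raw[3]
    data = raw[4:4 + count]
    if len(data) != count or len(raw) != count + 5:
        raise ValueError('length mismatch')
    if sum(raw) & 0xFF != 0:          # all bytes including the checksum sum to 0 mod 256
        raise ValueError('bad checksum')
    return rtype, addr, data


def hex_to_bin(lines, fill=0xFF):
    """Lay Intel HEX data records out as one contiguous image, filling gaps with `fill`."""
    memory = {}
    base = 0                          # upper address bits set by type-04 records
    for line in lines:
        if not line.strip():
            continue
        rtype, addr, data = parse_record(line)
        if rtype == 0x00:             # data record
            for i, b in enumerate(data):
                memory[base + addr + i] = b
        elif rtype == 0x01:           # end of file
            break
        elif rtype == 0x04:           # extended linear address
            base = int.from_bytes(data, 'big') << 16
        else:
            raise ValueError(f'record type {rtype:02X} not handled in this sketch')
    if not memory:
        return b''
    lo, hi = min(memory), max(memory)
    return bytes(memory.get(a, fill) for a in range(lo, hi + 1))


image = hex_to_bin([':02000000AABB99', ':0100040011EA', ':00000001FF'])
print(image.hex(' '))  # aa bb ff ff 11
```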

**Pay attention to how each logic analyzer represents start/stop conditions in its exported data.**



### Programming guidelines for some 8051-family MCUs

*   Silicon Labs: AN127 - C2 Interface



```
# LSB first
Note: the stop-start waveform between instructions is decoded by the analyzer as 0b01/0b00 (this can be inferred from the datasheet and the analyzer's data),
and a wait is inserted after a data write and while a data read waits for its reply; it appears as 00...01.


0.   CLK-only operations (omitted)
1.   addr write: 0b11 0x02
              (FPCTL register)
2.   data write: 0b01 0b00 0x02    <->    10 00 01000000 (LSB first)
           ''   ''  0x04
           ''   ''  0x01
           (ins.)(len.)(flash write enable)

3.   addr write: 0b11 0xB4
              (FPDAT register; the address varies by model)
4.   data write: 0b01 0b00 0x07
           (ins.)(len.)(PI: block write, block size = code length <= 256, 1 block = several pages)

InBusy:
5.   addr read: 0b10, the MCU returns 8 bits; repeat until the second-to-last bit (bit 1) = 0
OutReady:
6.   addr read: 0b10, the MCU returns 8 bits; repeat until the last bit (bit 0) = 1

7.   data read: 0b00 0b00, returns 1 byte; repeat until it equals 0x0D


8.   data write: 0b01 0b00 0x__ (code address, high byte)
9.   InBusy
10.   data write: 0b01 0b00 0x__ (code address, low byte)
11.   InBusy

12.   data write: 0b01 0b00 0x__ (code length, 00 = 1 byte)
13.   InBusy

14.   data write: 0b01 0b00 0x__ (assembly code)
15.   InBusy
     (repeat steps 14-15 byte by byte until all code is written)

16.   OutReady + step 7.
```
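
The listing above can be cross-checked with a small helper that reproduces its LSB-first bit strings, for example data write 0x02 giving `10 00 01000000`. The helper also interprets the status byte from an address read: InBusy is bit 1 (step 5) and OutReady is bit 0 (step 6). This is a sketch only. The INS codes come from the listing (address write 0b11, address read 0b10, data write 0b01, data read 0b00). Only the INS, length, and data fields are modeled, not START/STOP/WAIT timing. The helper names are hypothetical.

```python
INS = {'data_read': 0b00, 'data_write': 0b01, 'addr_read': 0b10, 'addr_write': 0b11}


def lsb_first(value, width):
    """Bit string of `value`, least significant bit first."""
    return ''.join(str((value >> i) & 1) for i in range(width))


def c2_frame(op, data=None, length=0b00):
    """INS field, then (data ops only) the 2-bit length field, then the data byte, LSB first."""
    fields = [lsb_first(INS[op], 2)]
    if op.startswith('data'):
        fields.append(lsb_first(length, 2))   # 0b00 = 1 byte
    if data is not None:
        fields.append(lsb_first(data, 8))
    return ' '.join(fields)


def in_busy(status):
    return bool(status & 0b10)    # second-to-last bit (bit 1)


def out_ready(status):
    return bool(status & 0b01)    # last bit (bit 0)


print(c2_frame('data_write', 0x02))  # 10 00 01000000
print(c2_frame('addr_write', 0xB4))  # 11 00101101
```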




*   SMBus:

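SMBus is an I2C-like protocol, and Silicon Labs chips can use it for two-wire communication. For flash programming, a specific register is first set to enable writes, and the MOVX instruction then performs the writes; how writes are enabled differs considerably between chips.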
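
A minimal sketch of how an SMBus/I2C-style write maps to bits on the wire, MSB first. The 7-bit slave address is shifted left, and the R/W bit (0 = write, 1 = read) becomes bit 0 of the first byte. Every byte is followed by an ACK slot that the receiver drives low (0) to acknowledge. The function names and the example address 0x50 are illustrative only, and the sketch assumes every byte is ACKed.

```python
def address_byte(addr7, read):
    """First byte on the wire: 7-bit address followed by the R/W bit."""
    if not 0 <= addr7 < 0x80:
        raise ValueError('SMBus/I2C addresses are 7 bits')
    return (addr7 << 1) | (1 if read else 0)


def write_frame(addr7, payload):
    """Bit string for S | addr + W | ACK | data | ACK ... | P ('0' in an ACK slot = ACK)."""
    parts = ['S', format(address_byte(addr7, read=False), '08b'), '0']
    for byte in payload:
        parts += [format(byte, '08b'), '0']
    parts.append('P')
    return ' '.join(parts)


print(hex(address_byte(0x50, read=False)))  # 0xa0
print(write_frame(0x50, [0x05]))            # S 10100000 0 00000101 0 P
```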



```
# MSB first
Ack is sent by the receiving side

R/W : S | Slave addr | R/W | Ack | Data | Ack | P     or
bits: 1 |     7      |  1  |  1  |  8   |  1  | 1

              ... | Data | Ack | Data | Ack | ...
```




*   Atmel (Microchip) AT89LP:

According to the CTO, SPI programming can be reduced to two wires (SDA, SCL) by using a custom reset scheme.



```
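# MSB first
A few models do not need the '55' byte, and they also do not support Write with auto erase

Program enable:                  AA 55 AC 53 '53'
                                 ('53' is returned by the slave)

Write code page:                 AA 55 50 addr(h) addr(l) data_bytes
Write code page with auto erase: AA 55 70 addr(h) addr(l) data_bytes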
```





*   TK18 - I2C

There is no detailed communication datasheet, so the programming signals were compared directly against the disassembled data.




```
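Compared against the moment the program button is pressed: programming starts with 5a a5

The flash register address appears to be 0x40, and the program instruction write is 0x50
(I2C: 7-bit addr. + write = A0 ACK 05 ACK)

It seems to always use page write, writing 1 page (128 bytes) at a time

The signal shows  write[0x40]: 00 F8 0A
                  stop, long gap, start = 0b010
                  write[0x40]: 05 data...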
```

