## Extracting CSI data from .pcap files

The code below is used to convert .pcap file in .csv file

In [1]:
print('Olá mundo!')

Olá mundo!


In [2]:
import numpy as np
import pandas as pd
from csiread import Nexmon

# ========= CONFIGURAÇÕES =========
pcap_file = "/media/gioliadmin/DataBase/Programas_ML_BD/SBSeg2025/Rasp/csi_test.pcap"
csv_file  = "csi_output2.csv"
chipset   = "43455c0"   # Raspberry Pi 4 (BCM43455c0)
bw        = 40          # mesma usada na captura

# ========= LEITURA =========
print(f"Lendo arquivo: {pcap_file} ...")
reader = Nexmon(pcap_file, chip=chipset, bw=bw)
reader.read()

csi = reader.csi   # shape esperado: (n_pacotes, n_subcarriers)

print(f"Shape do CSI: {csi.shape}")

# ========= PROCESSAMENTO =========
amplitude = np.abs(csi)
phase = np.angle(csi)

# cada linha = 1 pacote, colunas = subportadoras
df_amp = pd.DataFrame(amplitude)
df_phase = pd.DataFrame(phase)

# renomear colunas
df_amp.columns   = [f"subcarrier_{i}"       for i in range(df_amp.shape[1])]
df_phase.columns = [f"subcarrier_{i}_phase" for i in range(df_phase.shape[1])]

# Concatenar amplitude + fase
df_out = pd.concat([df_amp, df_phase], axis=1)

# ========= EXPORTAÇÃO =========
df_out.to_csv(csv_file, index=False)
print(f"Arquivo CSV gerado: {csv_file}")


Lendo arquivo: /media/gioliadmin/DataBase/Programas_ML_BD/SBSeg2025/Rasp/csi_test.pcap ...
Shape do CSI: (16543, 128)
16543 packets parsed
Arquivo CSV gerado: csi_output2.csv


### Script usando `csiread` com `pandas` (com leitura em blocos para não explodir memória)

In [2]:

import numpy as np
import pandas as pd
from csiread import Nexmon

# ========= CONFIGURAÇÕES =========
pcap_file = "/media/gioliadmin/DataBase/Programas_ML_BD/SBSeg2025/Rasp/cap1_rpi1.pcap"   # entrada
csv_file  = "csi_output_csiread.csv"  # saída
chipset   = "43455c0"            # Raspberry Pi 4 (BCM43455c0)
bw        = 40                   # largura de banda (20/40 MHz)

# ========= LEITURA =========
print(f"[csiread] Lendo arquivo: {pcap_file}")
reader = Nexmon(pcap_file, chip=chipset, bw=bw)

# Primeiro, leia todo o arquivo para obter o número total de pacotes
reader.read()
total_packets = len(reader.csi)
print(f"[csiread] Total de pacotes: {total_packets}")

# Para evitar explodir a RAM, leia em blocos
batch_size = 1000
all_batches = []

for i in range(0, total_packets, batch_size):
    end_idx = min(i + batch_size, total_packets)
    print(f"[csiread] Processando pacotes {i} a {end_idx-1}")
    
    # Extrair dados do lote atual
    csi_batch = reader.csi[i:end_idx]
    
    # Verificar se há dados válidos
    if csi_batch.size == 0:
        continue
        
    # Processar amplitude e fase
    if len(csi_batch.shape) == 3:  # (pacotes, subcarriers, antenas)
        amplitude = np.abs(csi_batch[:, :, 0])   # antena 0
        phase = np.angle(csi_batch[:, :, 0])
    else:  # (pacotes, subcarriers)
        amplitude = np.abs(csi_batch)
        phase = np.angle(csi_batch)

    # Criar DataFrames para o lote
    df_amp = pd.DataFrame(amplitude, columns=[f"subcarrier_{j}" for j in range(amplitude.shape[1])])
    df_phase = pd.DataFrame(phase, columns=[f"subcarrier_{j}_phase" for j in range(phase.shape[1])])

    all_batches.append(pd.concat([df_amp, df_phase], axis=1))

# Concatenar todos os lotes
if all_batches:
    df_out = pd.concat(all_batches, ignore_index=True)
    print(f"[csiread] Shape final: {df_out.shape}")
    
    # ========= EXPORTAÇÃO =========
    df_out.to_csv(csv_file, index=False)
    print(f"[csiread] CSV gerado: {csv_file}")
else:
    print("[csiread] Erro: Nenhum dado foi processado")


[csiread] Lendo arquivo: /media/gioliadmin/DataBase/Programas_ML_BD/SBSeg2025/Rasp/cap1_rpi1.pcap
5549 packets parsed
[csiread] Total de pacotes: 5549
[csiread] Processando pacotes 0 a 999
[csiread] Processando pacotes 1000 a 1999
[csiread] Processando pacotes 2000 a 2999
[csiread] Processando pacotes 3000 a 3999
[csiread] Processando pacotes 4000 a 4999
[csiread] Processando pacotes 5000 a 5548
[csiread] Shape final: (5549, 256)
[csiread] CSV gerado: csi_output_csiread.csv


### Script usando `csiread` com `numpy`

In [3]:
import numpy as np
from csiread import Nexmon
import csv

# ========= CONFIGURAÇÕES =========
pcap_file = "/media/gioliadmin/DataBase/Programas_ML_BD/SBSeg2025/Rasp/cap1_rpi1.pcap"   # entrada
csv_file  = "csi_output_csiread2.csv"  # saída
chipset   = "43455c0"            # Raspberry Pi 4 (BCM43455c0)
bw        = 40                   # largura de banda (20/40 MHz)

# ========= LEITURA =========
print(f"[csiread] Lendo arquivo: {pcap_file}")
reader = Nexmon(pcap_file, chip=chipset, bw=bw)

# Primeiro, leia todo o arquivo para obter o número total de pacotes
reader.read()
total_packets = len(reader.csi)
print(f"[csiread] Total de pacotes: {total_packets}")

# Para evitar explodir a RAM, leia em blocos
batch_size = 1000
all_data = []

for i in range(0, total_packets, batch_size):
    end_idx = min(i + batch_size, total_packets)
    print(f"[csiread] Processando pacotes {i} a {end_idx-1}")
    
    # Extrair dados do lote atual
    csi_batch = reader.csi[i:end_idx]
    
    # Verificar se há dados válidos
    if csi_batch.size == 0:
        continue
        
    # Processar amplitude e fase usando apenas numpy
    if len(csi_batch.shape) == 3:  # (pacotes, subcarriers, antenas)
        amplitude = np.abs(csi_batch[:, :, 0])   # antena 0
        phase = np.angle(csi_batch[:, :, 0])
    else:  # (pacotes, subcarriers)
        amplitude = np.abs(csi_batch)
        phase = np.angle(csi_batch)

    # Concatenar amplitude + fase horizontalmente usando numpy
    batch_data = np.hstack([amplitude, phase])
    all_data.append(batch_data)

# Concatenar todos os lotes verticalmente usando numpy
if all_data:
    final_data = np.vstack(all_data)
    print(f"[csiread] Shape final: {final_data.shape}")
    
    # ========= EXPORTAÇÃO =========
    # Criar nomes das colunas
    n_subcarriers = final_data.shape[1] // 2
    columns = [f"subcarrier_{i}" for i in range(n_subcarriers)] + [f"subcarrier_{i}_phase" for i in range(n_subcarriers)]
    
    # Exportar para CSV usando csv nativo (mais rápido que pandas)
    with open(csv_file, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(columns)  # Cabeçalho
        writer.writerows(final_data)  # Dados
    
    print(f"[csiread] CSV gerado: {csv_file}")
    print(f"[csiread] Dados salvos: {final_data.shape[0]} pacotes x {final_data.shape[1]} colunas")
else:
    print("[csiread] Erro: Nenhum dado foi processado")


[csiread] Lendo arquivo: /media/gioliadmin/DataBase/Programas_ML_BD/SBSeg2025/Rasp/cap1_rpi1.pcap
[csiread] Total de pacotes: 5549
[csiread] Processando pacotes 0 a 999
5549 packets parsed
[csiread] Processando pacotes 1000 a 1999
[csiread] Processando pacotes 2000 a 2999
[csiread] Processando pacotes 3000 a 3999
[csiread] Processando pacotes 4000 a 4999
[csiread] Processando pacotes 5000 a 5548
[csiread] Shape final: (5549, 256)
[csiread] CSV gerado: csi_output_csiread2.csv
[csiread] Dados salvos: 5549 pacotes x 256 colunas
