## Install

In [49]:
# Third-party packages used further down: py7zr opens the .7z archives and
# xmltodict parses the invoice XML files.
%pip install -q py7zr
%pip install -q xmltodict

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.3.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.3.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


## Imports

In [2]:
import os
import zipfile
import pandas
import py7zr
import glob
import xmltodict
import json

## Data acquisition

## Pre-processing

In [3]:
from functools import reduce
from unicodedata import normalize
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

# Resources required by the helpers defined below. word_tokenize (used by
# remove_stop_words) needs the Punkt tokenizer data, which is published as
# 'punkt' for older NLTK releases and as 'punkt_tab' for newer ones.
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('punkt')
nltk.download('punkt_tab')

# The cleaning pipeline strips accents *before* stop words are filtered, so
# each stop word is kept both as published and in its accent-free form;
# otherwise accented entries could never match the already-folded tokens.
_raw_stop_words = stopwords.words('portuguese')
stop_words = set(_raw_stop_words) | {
  normalize('NFKD', word).encode('ascii', 'ignore').decode('ascii')
  for word in _raw_stop_words
}
word_net_lemmatizer = WordNetLemmatizer()

def remove_accent_diacritic(text: str):
  """Return ``text`` with accents and other diacritical marks removed.

  The text is decomposed with NFKD so that every accented letter becomes a
  base letter followed by combining marks, and those marks are then dropped.
  Decomposition alone is not enough: it leaves the marks in the string.

  Args:
    text: The string to clean.

  Returns:
    The string without combining marks (for example 'ação' becomes 'acao').
  """
  from unicodedata import combining

  decomposed = normalize('NFKD', text)
  return ''.join(character for character in decomposed if not combining(character))
    
def remove_special_char(text: str):
  """Drop every character that is neither alphanumeric nor whitespace.

  Args:
    text: The string to filter.

  Returns:
    A new string holding only the letters, digits and whitespace of ``text``,
    in their original order.
  """
  kept = [symbol for symbol in text if symbol.isspace() or symbol.isalnum()]
  return ''.join(kept)

def remove_stop_words(text: str):
  """Tokenize ``text`` and discard the tokens listed in ``stop_words``.

  Args:
    text: The string to tokenize.

  Returns:
    A list with the remaining tokens, in their original order.
  """
  return [token for token in word_tokenize(text) if token not in stop_words]

def lemmatize_text(text: list[str]):
  """Lemmatize every token and join the results with single spaces.

  In the ``steps`` pipeline this function receives the token list produced
  by ``remove_stop_words``. A plain string is also accepted and is split on
  whitespace first; iterating it directly would lemmatize it one character
  at a time.

  Args:
    text: The tokens to lemmatize (or a whitespace-separated string).

  Returns:
    A single string with the lemmatized tokens separated by spaces.
  """
  tokens = text.split() if isinstance(text, str) else text
  return ' '.join(word_net_lemmatizer.lemmatize(token) for token in tokens)

# Ordered text-cleaning pipeline used by process_description: every step
# receives the value returned by the step before it.
steps = [
  str.lower,
  remove_accent_diacritic,
  remove_special_char,
  # Returns a list of tokens rather than a string.
  remove_stop_words,
  # Consumes that token list and joins it back into one string.
  lemmatize_text,
]

def process_description(text):
  """Run ``text`` through every function in ``steps``, in order.

  Args:
    text: The raw product description.

  Returns:
    The value produced by the last step of the pipeline.
  """
  result = text
  for step in steps:
    result = step(result)
  return result

def pre_processing(df: pandas.DataFrame):
  """Drop duplicated rows and rows with a fractional quantity, then report.

  The input frame is not modified. A summary of the resulting frame
  (``DataFrame.info``) and the row counts are printed.

  Args:
    df: Frame with at least a numeric ``quantity`` column.

  Returns:
    A new frame holding only the distinct rows whose ``quantity`` is a whole
    number. The original index labels are kept.
  """
  total_rows = len(df)

  # Non in-place de-duplication, so the caller's frame is left untouched.
  deduplicated = df.drop_duplicates()
  # Remove the lines where the quantity purchased has decimal places, which indicates that the product was not manufactured in an industry, but is sold in bulk.
  whole_units = deduplicated[deduplicated['quantity'] % 1 == 0]

  whole_units.info()

  unique_rows = len(whole_units)
  removed_rows = total_rows - unique_rows
  # An empty input has nothing to remove; avoid dividing by zero.
  removed_percentage = round(removed_rows / total_rows * 100, 2) if total_rows else 0.0
  print(f'Total rows: {total_rows}')
  print(f'Unique rows: {unique_rows}')
  print(f'Removed rows: {removed_rows}')
  print(f'Removed rows percentage: {removed_percentage}%')
  return whole_units

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\teichx\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\teichx\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


### Data to classify

In [4]:
def get_class_items(folder_path: str = '../data/raw'):
  """Yield one typed frame of invoice items per ``.zip`` archive in a folder.

  Each archive ``<name>.zip`` must contain ``<name>_NotaFiscalItem.csv``
  (``;``-separated, latin1). The selected columns are renamed to English,
  decimal commas are turned into dots, a ``processed_description`` column
  is computed with ``process_description`` and the dtypes are fixed.

  NOTE: a later cell of this notebook defines another function with this
  same name, which replaces this one once that cell has run.

  Args:
    folder_path: Directory that holds the ``.zip`` archives.

  Yields:
    A ``pandas.DataFrame`` per archive. A progress line is printed after
    each frame has been consumed.
  """
  column_names = {
    'CHAVE DE ACESSO': 'access_key',
    'DATA EMISSÃO': 'emission_date',
    'CPF/CNPJ Emitente': 'emission_owner',
    'NÚMERO PRODUTO': 'product_index',
    'DESCRIÇÃO DO PRODUTO/SERVIÇO': 'description',
    'CÓDIGO NCM/SH': 'ncm',
    'NCM/SH (TIPO DE PRODUTO)': 'ncm_description',
    'CFOP': 'cfop',
    'QUANTIDADE': 'quantity',
    'UNIDADE': 'unit_kind',
    'VALOR UNITÁRIO': 'unitary_value',
    'VALOR TOTAL': 'total_value',
  }
  decimal_columns = ['quantity', 'total_value', 'unitary_value']
  column_types = {
    'access_key': 'str',
    'emission_date': 'datetime64[ms]',
    'emission_owner': 'str',
    'product_index': 'int32',
    'description': 'str',
    'processed_description': 'str',
    'ncm': 'str',
    'ncm_description': 'str',
    'cfop': 'str',
    'quantity': 'float64',
    'unit_kind': 'str',
    'unitary_value': 'float64',
    'total_value': 'float64',
  }

  processed_files = 0
  for file_name in os.listdir(folder_path):
    if not file_name.endswith('.zip'):
      continue
    zip_path = os.path.join(folder_path, file_name)
    # Built outside the f-string: reusing the enclosing quote character
    # inside an f-string expression is a syntax error before Python 3.12.
    archive_stem = file_name.removesuffix('.zip')
    file_items_name = f'{archive_stem}_NotaFiscalItem.csv'

    # The CSV is read completely here, so the archive can be closed before
    # the frame is handed to the consumer.
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
      with zip_ref.open(file_items_name) as file:
        raw_items = pandas.read_csv(file, delimiter=';', encoding='latin1')

    # .copy() gives an independent frame, so the column assignments below
    # never write into a view of raw_items.
    selected_fields = raw_items \
      .rename(columns=column_names) \
      .loc[:, list(column_names.values())] \
      .copy()
    for column in decimal_columns:
      # astype(str) keeps the .str accessor usable when pandas has already
      # parsed the column as numeric (no decimal comma in the whole file).
      selected_fields[column] = selected_fields[column].astype(str).str.replace(',', '.')
    selected_fields['processed_description'] = selected_fields['description'].apply(process_description)

    yield selected_fields.astype(column_types)
    processed_files += 1
    print(f'Processed files {processed_files}')


def get_class_combined():
  """Return the combined, pre-processed invoice items, using a parquet cache.

  When '../data/combined.parquet.br' exists it is read and returned as is.
  Otherwise every frame from ``get_class_items`` is concatenated, passed
  through ``pre_processing``, written to that path with brotli compression
  and returned.
  """
  cache_file = '../data/combined.parquet.br'
  if not os.path.exists(cache_file):
    item_frames = get_class_items()
    merged_items = pandas.concat(item_frames, ignore_index=True)
    print('Start pre-processing')
    cleaned_items = pre_processing(merged_items)
    print('End pre-processing')
    print('Start compression')
    cleaned_items.to_parquet(cache_file, index=False, compression='brotli')
    print('End compression')
    return cleaned_items

  print('Reading cache combined data')
  return pandas.read_parquet(cache_file)

# Invoice items to classify: loaded from the parquet cache when it exists,
# otherwise built from the raw archives by get_class_combined.
data_frame = get_class_combined()

Reading cache combined data


#### Analyze dataset

In [5]:
# Per-state summary keyed by the first two characters of the access key
# (that prefix is looked up in state_codes further down, so it is presumably
# the IBGE state code). For every prefix present in the data:
#   'unique'      -> number of distinct access keys (invoices)
#   'total'       -> number of rows with a total_value
#   'total_value' -> sum of total_value, rounded to 2 decimals
# A single groupby pass replaces building one masked copy of the whole frame
# per state, and it copes with any prefix rather than a pre-seeded key list.
state_prefix = data_frame['access_key'].str[0:2]
rows_by_state = data_frame.groupby(state_prefix)
unique_invoices = rows_by_state['access_key'].nunique()
product_rows = rows_by_state['total_value'].count()
total_amount = rows_by_state['total_value'].sum()

table = {
  key: {
    'unique': int(unique_invoices[key]),
    'total': int(product_rows[key]),
    'total_value': round(float(total_amount[key]), 2),
  }
  for key in unique_invoices.index
}

In [6]:
# Acquired from 'https://servicodados.ibge.gov.br/api/v1/localidades/estados' 2024-10-27T10:24:00-03:00
# state_codes maps each state's id, as a string, to its full JSON record.
with open('../data/br_states.json', encoding='utf8') as file:
  states_json = json.load(file)
  state_codes = {str(state['id']): state for state in states_json}

In [7]:
# States ranked by number of product rows, largest first.
meta_table_rows = sorted(
  table.items(),
  key=lambda entry: entry[1]['total'],
  reverse=True,
)
# One output row per state, labelled with the state's abbreviation and name.
meta_data_table = pandas.DataFrame(
  data=[
    {
      'UF': state_codes.get(code)['sigla'],
      'State': state_codes.get(code)['nome'],
      'Unique invoices': stats['unique'],
      'Product rows': stats['total'],
      'Total amount': stats['total_value'],
    }
    for code, stats in meta_table_rows
  ],
)
print(meta_data_table)

    UF                State  Unique invoices  Product rows  Total amount
0   SP            São Paulo           958658       4347276  9.930479e+10
1   RJ       Rio de Janeiro           753427       2428270  4.477481e+10
2   RS    Rio Grande do Sul           402972       1871396  5.318799e+09
3   MG         Minas Gerais           331109       1222405  1.789578e+10
4   DF     Distrito Federal           376974       1036455  1.515608e+10
5   PR               Paraná           257581        920613  8.389014e+09
6   SC       Santa Catarina           213615        553848  5.367721e+09
7   PE           Pernambuco           141205        477757  6.922940e+09
8   MS   Mato Grosso do Sul            88916        414219  9.358505e+08
9   BA                Bahia           107064        413975  1.460084e+09
10  PA                 Pará            97942        391397  9.696880e+08
11  AM             Amazonas            96810        384924  1.309225e+09
12  RN  Rio Grande do Norte            94481       

### Classes data

In [8]:
def get_class_items(folder_path: str = '../data/tce_rs'):
  """Yield one record per invoice product that carries an EAN code.

  Every ``.xml.7z`` archive in ``folder_path`` is extracted into that same
  folder, then every ``.xml`` file found at or below it is parsed with
  xmltodict. Products whose ``cEAN`` is empty or 'SEM GTIN' are skipped.

  NOTE: this definition replaces the function with the same name defined in
  an earlier cell of this notebook.

  Args:
    folder_path: Directory holding the archives; also the extraction target.

  Yields:
    A dict with the keys access_key, emission_date, emission_owner,
    product_index, description, ean, ncm, cfop, quantity, unit_kind,
    unitary_value, total_value and processed_description.
  """
  processed_files = 0
  for file_name in os.listdir(folder_path):
    if not file_name.endswith('.xml.7z'):
      continue
    zip_path = os.path.join(folder_path, file_name)

    with py7zr.SevenZipFile(zip_path, mode='r') as zip_ref:
      zip_ref.extractall(folder_path)

  # recursive=True is what lets '**' span zero or more directories; without
  # it '**' behaves like '*' and only files exactly one level down match.
  for invoice_xml in glob.glob(f'{folder_path}/**/*.xml', recursive=True):
    processed_files += 1
    with open(invoice_xml, 'r', encoding='utf-8') as file:
      data = xmltodict.parse(file.read())
      # `or []` also covers a `det` key that is present but holds None.
      products = data.get('nfeProc', {}).get('NFe', {}).get('infNFe', {}).get('det') or []
      # A single product is not wrapped in a list, so normalise it to one.
      product_list = products \
        if isinstance(products, list) \
        else [products]

      if not product_list:
        continue

      access_key = data['nfeProc']['protNFe']['infProt']['chNFe']
      cnpj = data['nfeProc']['NFe']['infNFe']['emit']['CNPJ']
      # tz_convert(None) converts to UTC and drops the timezone information.
      emission_date = pandas.to_datetime(data['nfeProc']['NFe']['infNFe']['ide']['dhEmi']) \
        .tz_convert(None)
      for product in product_list:
        ean = product['prod']['cEAN']
        if not ean or ean == 'SEM GTIN':
          continue

        description = product['prod']['xProd']
        yield {
          'access_key': access_key,
          'emission_date': emission_date,
          'emission_owner': cnpj,
          'product_index': int(product['@nItem']),
          'description': description,
          'ean': ean,
          'ncm': product['prod']['NCM'],
          'cfop': product['prod']['CFOP'],
          'quantity': float(product['prod']['qCom']),
          'unit_kind': product['prod']['uCom'],
          'unitary_value': float(product['prod']['vUnCom']),
          'total_value': float(product['prod']['vProd']),
          'processed_description': process_description(description),
        }

    if processed_files % 100 == 0:
      print(f'Processed files: {processed_files}')

def get_class_combined():
  """Return the pre-processed class data, using a parquet cache.

  When '../data/class.parquet.br' exists it is read and returned as is.
  Otherwise the records from ``get_class_items`` are collected into a frame
  with fixed dtypes, passed through ``pre_processing``, written to that path
  with brotli compression and returned.
  """
  class_path = '../data/class.parquet.br'
  if not os.path.exists(class_path):
    column_types = {
      'access_key': 'str',
      'emission_date': 'datetime64[ms]',
      'emission_owner': 'str',
      'product_index': 'int32',
      'description': 'str',
      'ncm': 'str',
      'ean': 'str',
      'cfop': 'str',
      'quantity': 'float64',
      'unit_kind': 'str',
      'unitary_value': 'float64',
      'total_value': 'float64',
    }
    records = get_class_items()
    typed_records = pandas.DataFrame(records).astype(column_types)
    print('Start pre-processing')
    cleaned_records = pre_processing(typed_records)
    print('End pre-processing')
    print('Start compression')
    cleaned_records.to_parquet(class_path, index=False, compression='brotli')
    print('End compression')
    return cleaned_records

  print('Reading cache combined data')
  return pandas.read_parquet(class_path)

# Class data (invoice products that carry an EAN). get_class_combined here is
# the definition from this cell, which shadows the one from the earlier cell.
class_data = get_class_combined()

Reading cache combined data


In [43]:
# Map every processed description to the distinct (ncm, ean) pairs seen with
# it. Duplicates are dropped and the rows sorted up front, so the loop only
# visits distinct combinations and the JSON file comes out identical on every
# run (collecting the pairs in sets would leave their order up to hashing).
product_codes = class_data[['processed_description', 'ncm', 'ean']] \
  .drop_duplicates() \
  .sort_values(['processed_description', 'ncm', 'ean'])

descriptions_dict = dict()
for row in product_codes.itertuples(index=False):
  descriptions_dict.setdefault(row.processed_description, []).append({
    'ncm': row.ncm,
    'ean': row.ean,
  })

# An explicit encoding keeps the file independent of the platform default.
with open('../data/ean_list.json', 'w', encoding='utf-8') as file:
  file.write(json.dumps(descriptions_dict, indent=2))