<a href="https://colab.research.google.com/github/veronezicarlos/Consumo_Api/blob/main/rfid_script_py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
     import time
     import csv
     from collections import Counter
     import random
     import string

     # Arquivo para armazenar dados
     arquivo_csv = 'rfid_dados.csv'

     # Função para validar UID (ex.: deve ter pelo menos 8 caracteres hex)
     def validar_uid(uid_hex):
         return len(uid_hex) >= 8

     # Lista para armazenar UIDs lidos
     uids_lidos = []

     # Função simulada para gerar UIDs fictícios (em vez de ler tags reais)
     def gerar_uid_ficticio():
         # Gera um UID hex aleatório de 8-16 caracteres
         length = random.randint(8, 16)
         return ''.join(random.choices(string.hexdigits.lower(), k=length))

     def simular_leitura():
         uid_hex = gerar_uid_ficticio()
         timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
         valido = validar_uid(uid_hex)

         # Escreve no CSV
         with open(arquivo_csv, mode='a', newline='') as file:
             writer = csv.writer(file)
             if file.tell() == 0:
                 writer.writerow(['Timestamp', 'UID', 'Valido'])
             writer.writerow([timestamp, uid_hex, valido])

         # Adiciona à lista para análise
         uids_lidos.append(uid_hex)

         print(f"Tag simulada: UID={uid_hex}, Válido={valido}")
         return True

     def main():
         print("Simulando leituras RFID... (10 leituras fictícias)")
         for _ in range(10):  # Simula 10 leituras
             simular_leitura()
             time.sleep(0.5)  # Pequena pausa para simular tempo real

         # Análise simples após simulação
         contador = Counter(uids_lidos)
         print("\nAnálise de dados:")
         print(f"Total de leituras: {len(uids_lidos)}")
         print(f"UIDs únicos: {len(contador)}")
         print("Top UIDs:")
         for uid, count in contador.most_common(5):
             print(f"  UID {uid}: {count} vezes")

     if __name__ == '__main__':
         main()


Simulando leituras RFID... (10 leituras fictícias)
Tag simulada: UID=01c1bdcc, Válido=True
Tag simulada: UID=dedad41da740af, Válido=True
Tag simulada: UID=3aa5bafbdc3, Válido=True
Tag simulada: UID=e7a77a738aa2, Válido=True
Tag simulada: UID=64bcd689cb, Válido=True
Tag simulada: UID=f5e51096a, Válido=True
Tag simulada: UID=3c199b0d2bb7, Válido=True
Tag simulada: UID=e54bbc7cf5a2, Válido=True
Tag simulada: UID=bcb1d30f77604b, Válido=True
Tag simulada: UID=71228ce85, Válido=True

Análise de dados:
Total de leituras: 10
UIDs únicos: 10
Top UIDs:
  UID 01c1bdcc: 1 vezes
  UID dedad41da740af: 1 vezes
  UID 3aa5bafbdc3: 1 vezes
  UID e7a77a738aa2: 1 vezes
  UID 64bcd689cb: 1 vezes


In [None]:
import time
import csv
import random
import string
import os
import logging
import argparse
import sys
from collections import Counter

# Tentativa de importar bibliotecas para RFID real (opcional)
try:
    import serial  # Para pyserial (leitores seriais)
    SERIAL_AVAILABLE = True
except ImportError:
    SERIAL_AVAILABLE = False
    print("Aviso: pyserial não instalado. Modo 'real' não funcionará. Instale com: pip install pyserial")

try:
    import nfc  # Para nfcpy (leitores NFC)
    NFC_AVAILABLE = True
except ImportError:
    NFC_AVAILABLE = False
    print("Aviso: nfcpy não instalado. Modo 'nfc' não funcionará. Instale com: pip install nfcpy")

try:
    import pandas as pd  # Para análise avançada
    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False
    print("Aviso: pandas não instalado. Usando Counter como fallback. Instale com: pip install pandas")

try:
    import matplotlib.pyplot as plt  # Para gráficos de análise
    MATPLOTLIB_AVAILABLE = True
except ImportError:
    MATPLOTLIB_AVAILABLE = False
    print("Aviso: matplotlib não instalado. Gráficos não estarão disponíveis. Instale com: pip install matplotlib")

# Configuração de logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Arquivo para armazenar dados
arquivo_csv = 'rfid_dados.csv'

class RFIDReader:
    """
    Classe para gerenciar leituras RFID, validação e análise de dados.
    Suporta modos simulação, serial (pyserial) e NFC (nfcpy).
    """

    def __init__(self, arquivo_csv='rfid_dados.csv'):
        self.arquivo_csv = arquivo_csv
        self.uids_lidos = []
        self._inicializar_csv()

    def _inicializar_csv(self):
        """Inicializa o CSV com cabeçalho se não existir."""
        if not os.path.exists(self.arquivo_csv):
            try:
                with open(self.arquivo_csv, mode='w', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(['Timestamp', 'UID', 'Valido'])
            except IOError as e:
                logging.error(f"Erro ao inicializar CSV: {e}")
                raise

    def validar_uid(self, uid_hex):
        """
        Valida UID: deve ser hex puro e ter comprimento exato (8 ou 12 chars para 4-6 bytes comuns).
        """
        if not all(c in string.hexdigits.lower() for c in uid_hex):
            return False
        return len(uid_hex) in [8, 12]  # Exemplos comuns; ajuste conforme necessário

    def gerar_uid_ficticio(self):
        """Gera um UID fictício para simulação."""
        length = random.choice([8, 12])
        return ''.join(random.choices(string.hexdigits.lower(), k=length))

    def ler_uid_serial(self, porta='/dev/ttyUSB0', baudrate=9600):
        """Lê UID via serial (pyserial)."""
        if not SERIAL_AVAILABLE:
            raise ImportError("pyserial não disponível para leitura serial.")
        try:
            with serial.Serial(porta, baudrate, timeout=1) as ser:
                data = ser.readline().decode('utf-8').strip()
                return data.lower() if data else None
        except serial.SerialException as e:
            logging.error(f"Erro na leitura serial: {e}")
            return None

    def ler_uid_nfc(self):
        """Lê UID via NFC (nfcpy)."""
        if not NFC_AVAILABLE:
            raise ImportError("nfcpy não disponível para leitura NFC.")
        try:
            clf = nfc.ContactlessFrontend('usb')  # Ajuste conforme dispositivo
            tag = clf.connect(rdwr={'on-connect': lambda tag: False})
            uid = tag.identifier.hex() if tag else None
            clf.close()
            return uid.lower() if uid else None
        except Exception as e:
            logging.error(f"Erro na leitura NFC: {e}")
            return None

    def simular_leitura(self, modo='simulacao', porta='/dev/ttyUSB0', baudrate=9600):
        """Simula ou realiza uma leitura RFID."""
        if modo == 'simulacao':
            uid_hex = self.gerar_uid_ficticio()
        elif modo == 'serial':
            uid_hex = self.ler_uid_serial(porta, baudrate)
            if uid_hex is None:
                logging.warning("Nenhuma tag detectada via serial.")
                return False
        elif modo == 'nfc':
            uid_hex = self.ler_uid_nfc()
            if uid_hex is None:
                logging.warning("Nenhuma tag detectada via NFC.")
                return False
        else:
            raise ValueError("Modo inválido: use 'simulacao', 'serial' ou 'nfc'")

        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
        valido = self.validar_uid(uid_hex)

        # Tratamento de erros na escrita do CSV
        try:
            with open(self.arquivo_csv, mode='a', newline='') as file:
                writer = csv.writer(file)
                writer.writerow([timestamp, uid_hex, valido])
        except IOError as e:
            logging.error(f"Erro ao escrever no CSV: {e}")
            return False

        self.uids_lidos.append(uid_hex)
        logging.info(f"Tag lida: UID={uid_hex}, Válido={valido}")
        return True

    def analisar_dados(self, gerar_grafico=False):
        """Analisa os dados lidos e opcionalmente gera um gráfico."""
        if not self.uids_lidos:
            logging.warning("Nenhuma leitura para analisar.")
            return

        if PANDAS_AVAILABLE:
            df = pd.DataFrame({'UID': self.uids_lidos})
            total_leituras = len(df)
            uids_unicos = df['UID'].nunique()
            top_uids = df['UID'].value_counts().head(5).to_dict()
        else:
            contador = Counter(self.uids_lidos)
            total_leituras = len(self.uids_lidos)
            uids_unicos = len(contador)
            top_uids = dict(contador.most_common(5))

        logging.info(f"Análise de dados: Total de leituras: {total_leituras}, UIDs únicos: {uids_unicos}")
        logging.info("Top UIDs:")
        for uid, count in top_uids.items():
            logging.info(f"  UID {uid}: {count} vezes")

        # Gerar gráfico se solicitado e matplotlib disponível
        if gerar_grafico and MATPLOTLIB_AVAILABLE:
            plt.figure(figsize=(8, 5))
            plt.bar(list(top_uids.keys()), list(top_uids.values()))
            plt.xlabel('UID')
            plt.ylabel('Frequência')
            plt.title('Top UIDs Lidos')
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.savefig('rfid_analise_grafico.png')
            logging.info("Gráfico salvo como 'rfid_analise_grafico.png'")
            plt.show()

        # Exportar análise para o CSV
        try:
            with open(self.arquivo_csv, mode='a', newline='') as file:
                writer = csv.writer(file)
                writer.writerow([])  # Linha em branco
                writer.writerow(['Análise'])
                writer.writerow(['Total Leituras', total_leituras])
                writer.writerow(['UIDs Únicos', uids_unicos])
                writer.writerow(['Top UIDs'])
                for uid, count in top_uids.items():
                    writer.writerow([uid, count])
        except IOError as e:
            logging.error(f"Erro ao exportar análise para CSV: {e}")

def main():
    parser = argparse.ArgumentParser(description="Simulador/Leitor de RFID")
    parser.add_argument('--num-leituras', type=int, default=10, help='Número de leituras (padrão: 10)')
    parser.add_argument('--pausa', type=float, default=0.5, help='Pausa entre leituras em segundos (padrão: 0.5)')
    parser.add_argument('--modo', choices=['simulacao', 'serial', 'nfc'], default='simulacao', help='Modo: simulacao, serial ou nfc (padrão: simulacao)')
    parser.add_argument('--porta', type=str, default='/dev/ttyUSB0', help='Porta serial para modo serial (padrão: /dev/ttyUSB0)')
    parser.add_argument('--baudrate', type=int, default=9600, help='Baudrate para modo serial (padrão: 9600)')
    parser.add_argument('--seed', type=int, default=None, help='Seed para random (para reprodutibilidade em simulação)')
    parser.add_argument('--limpar-csv', action='store_true', help='Limpar o CSV antes de iniciar')
    parser.add_argument('--grafico', action='store_true', help='Gerar gráfico de análise (requer matplotlib)')

    # Compatibilidade com Jupyter/Colab
    if 'ipykernel' in sys.modules:
        args = parser.parse_args([])
    else:
        args = parser.parse_args(sys.argv[1:])

    # Inicializar reader
    reader = RFIDReader()

    if args.limpar_csv:
        try:
            os.remove(reader.arquivo_csv)
            reader._inicializar_csv()
            logging.info("CSV limpo e reinicializado.")
        except OSError as e:
            logging.error(f"Erro ao limpar CSV: {e}")

    if args.seed is not None and args.modo == 'simulacao':
        random.seed(args.seed)
        logging.info(f"Seed definida para reprodutibilidade: {args.seed}")

    logging.info(f"Iniciando leituras RFID... ({args.num_leituras} leituras no modo {args.modo})")
    for _ in range(args.num_leituras):
        reader.simular_leitura(args.modo, args.porta, args.baudrate)
        time.sleep(args.pausa)

    reader.analisar_dados(gerar_grafico=args.grafico)

if __name__ == '__main__':
    main()


In [None]:
import time
import csv
import random
import string
import os
import logging
import argparse
import sys
import sqlite3  # Módulo nativo do Python para SQLite
from collections import Counter

# Tentativa de importar bibliotecas para RFID real (opcional)
try:
    import serial  # Para pyserial (leitores seriais)
    SERIAL_AVAILABLE = True
except ImportError:
    SERIAL_AVAILABLE = False
    print("Aviso: pyserial não instalado. Modo 'real' não funcionará. Instale com: pip install pyserial")

try:
    import nfc  # Para nfcpy (leitores NFC)
    NFC_AVAILABLE = True
except ImportError:
    NFC_AVAILABLE = False
    print("Aviso: nfcpy não instalado. Modo 'nfc' não funcionará. Instale com: pip install nfcpy")

try:
    import pandas as pd  # Para análise avançada
    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False
    print("Aviso: pandas não instalado. Usando Counter como fallback. Instale com: pip install pandas")

try:
    import matplotlib.pyplot as plt  # Para gráficos de análise
    MATPLOTLIB_AVAILABLE = True
except ImportError:
    MATPLOTLIB_AVAILABLE = False
    print("Aviso: matplotlib não instalado. Gráficos não estarão disponíveis. Instale com: pip install matplotlib")

# Configuração de logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class DatabaseManager:
    """
    Gerenciador de banco de dados usando sqlite3 nativo.
    """

    def __init__(self, db_path='rfid_dados.db'):
        self.db_path = db_path
        self._criar_tabela()

    def _criar_tabela(self):
        """Cria a tabela leituras_rfid se não existir."""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS leituras_rfid (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    uid TEXT NOT NULL,
                    valido BOOLEAN NOT NULL
                )
            ''')
            conn.commit()
        except sqlite3.Error as e:
            logging.error(f"Erro ao criar tabela: {e}")
            raise
        finally:
            conn.close()

    def inserir_leitura(self, timestamp, uid, valido):
        """Insere uma leitura no BD."""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('INSERT INTO leituras_rfid (timestamp, uid, valido) VALUES (?, ?, ?)', (timestamp, uid, valido))
            conn.commit()
            logging.info(f"Leitura inserida no BD: UID={uid}")
        except sqlite3.Error as e:
            logging.error(f"Erro ao inserir no BD: {e}")
            raise
        finally:
            conn.close()

    def obter_leituras(self):
        """Retorna todas as leituras como lista de dicionários."""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('SELECT timestamp, uid, valido FROM leituras_rfid')
            rows = cursor.fetchall()
            return [{'timestamp': row[0], 'uid': row[1], 'valido': row[2]} for row in rows]
        except sqlite3.Error as e:
            logging.error(f"Erro ao consultar BD: {e}")
            return []
        finally:
            conn.close()

    def analisar_dados_bd(self):
        """Analisa dados diretamente do BD usando SQL."""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            # Total de leituras
            cursor.execute('SELECT COUNT(*) FROM leituras_rfid')
            total_leituras = cursor.fetchone()[0]

            # UIDs únicos
            cursor.execute('SELECT COUNT(DISTINCT uid) FROM leituras_rfid')
            uids_unicos = cursor.fetchone()[0]

            # Top UIDs
            cursor.execute('''
                SELECT uid, COUNT(*) as count
                FROM leituras_rfid
                GROUP BY uid
                ORDER BY count DESC
                LIMIT 5
            ''')
            top_uids = {row[0]: row[1] for row in cursor.fetchall()}

            return total_leituras, uids_unicos, top_uids
        except sqlite3.Error as e:
            logging.error(f"Erro na análise do BD: {e}")
            return 0, 0, {}
        finally:
            conn.close()

class RFIDReader:
    """
    Classe para gerenciar leituras RFID, validação e análise de dados.
    Suporta modos simulação, serial (pyserial) e NFC (nfcpy).
    """

    def __init__(self, arquivo_csv='rfid_dados.csv', db_manager=None):
        self.arquivo_csv = arquivo_csv
        self.db_manager = db_manager
        self.uids_lidos = []  # Mantém para fallback se BD falhar
        if not self.db_manager:
            self._inicializar_csv()

    def _inicializar_csv(self):
        """Inicializa o CSV com cabeçalho se não existir."""
        if not os.path.exists(self.arquivo_csv):
            try:
                with open(self.arquivo_csv, mode='w', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(['Timestamp', 'UID', 'Valido'])
            except IOError as e:
                logging.error(f"Erro ao inicializar CSV: {e}")
                raise

    def validar_uid(self, uid_hex):
        """
        Valida UID: deve ser hex puro e ter comprimento exato (8 ou 12 chars para 4-6 bytes comuns).
        """
        if not all(c in string.hexdigits.lower() for c in uid_hex):
            return False
        return len(uid_hex) in [8, 12]  # Exemplos comuns; ajuste conforme necessário

    def gerar_uid_ficticio(self):
        """Gera um UID fictício para simulação."""
        length = random.choice([8, 12])
        return ''.join(random.choices(string.hexdigits.lower(), k=length))

    def ler_uid_serial(self, porta='/dev/ttyUSB0', baudrate=9600):
        """Lê UID via serial (pyserial)."""
        if not SERIAL_AVAILABLE:
            raise ImportError("pyserial não disponível para leitura serial.")
        try:
            with serial.Serial(porta, baudrate, timeout=1) as ser:
                data = ser.readline().decode('utf-8').strip()
                return data.lower() if data else None
        except serial.SerialException as e:
            logging.error(f"Erro na leitura serial: {e}")
            return None

    def ler_uid_nfc(self):
        """Lê UID via NFC (nfcpy)."""
        if not NFC_AVAILABLE:
            raise ImportError("nfcpy não disponível para leitura NFC.")
        try:
            clf = nfc.ContactlessFrontend('usb')  # Ajuste conforme dispositivo
            tag = clf.connect(rdwr={'on-connect': lambda tag: False})
            uid = tag.identifier.hex() if tag else None
            clf.close()
            return uid.lower() if uid else None
        except Exception as e:
            logging.error(f"Erro na leitura NFC: {e}")
            return None

    def simular_leitura(self, modo='simulacao', porta='/dev/ttyUSB0', baudrate=9600):
        """Simula ou realiza uma leitura RFID."""
        if modo == 'simulacao':
            uid_hex = self.gerar_uid_ficticio()
        elif modo == 'serial':
            uid_hex = self.ler_uid_serial(porta, baudrate)
            if uid_hex is None:
                logging.warning("Nenhuma tag detectada via serial.")
                return False
        elif modo == 'nfc':
            uid_hex = self.ler_uid_nfc()
            if uid_hex is None:
                logging.warning("Nenhuma tag detectada via NFC.")
                return False
        else:
            raise ValueError("Modo inválido: use 'simulacao', 'serial' ou 'nfc'")

        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
        valido = self.validar_uid(uid_hex)

        # Tenta inserir no BD; se falhar, usa CSV como fallback
        if self.db_manager:
            try:
                self.db_manager.inserir_leitura(timestamp, uid_hex, valido)
            except Exception:
                logging.warning("Falha no BD; usando CSV como fallback.")
                self._salvar_csv(timestamp, uid_hex, valido)
        else:
            self._salvar_csv(timestamp, uid_hex, valido)

        self.uids_lidos.append(uid_hex)
        logging.info(f"Tag lida: UID={uid_hex}, Válido={valido}")
        return True

    def _salvar_csv(self, timestamp, uid_hex, valido):
        """Salva no CSV (fallback)."""
        try:
            with open(self.arquivo_csv, mode='a', newline='') as file:
                writer = csv.writer(file)
                writer.writerow([timestamp, uid_hex, valido])
        except IOError as e:
            logging.error(f"Erro ao escrever no CSV: {e}")

    def analisar_dados(self, gerar_grafico=False):
        """Analisa os dados (do BD ou lista)."""
        if self.db_manager:
            total_leituras, uids_unicos, top_uids = self.db_manager.analisar_dados_bd()
        else:
            # Fallback para análise local (como antes)
            if not self.uids_lidos:
                logging.warning("Nenhuma leitura para analisar.")
                return

            if PANDAS_AVAILABLE:
                df = pd.DataFrame({'UID': self.uids_lidos})
                total_leituras = len(df)
                uids_unicos = df['UID'].nunique()
                top_uids = df['UID'].value_counts().head(5).to_dict()
            else:
                contador = Counter(self.uids_lidos)
                total_leituras = len(self.uids_lidos)
                uids_unicos = len(contador)
                top_uids = dict(contador.most_common(5))

        logging.info(f"Análise de dados: Total de leituras: {total_leituras}, UIDs únicos: {uids_unicos}")
        logging.info("Top UIDs:")
        for uid, count in top_uids.items():
            logging.info(f"  UID {uid}: {count} vezes")

        # Gerar gráfico se solicitado
        if gerar_grafico and MATPLOTLIB_AVAILABLE:
            plt.figure(figsize=(8, 5))
            plt.bar(list(top_uids.keys()), list(top_uids.values()))
            plt.xlabel('UID')
            plt.ylabel('Frequência')
            plt.title('Top UIDs Lidos')
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.savefig('rfid_analise_grafico.png')
            logging.info("Gráfico salvo como 'rfid_analise_grafico.png'")
            plt.show()

def main():
    parser = argparse.ArgumentParser(description="Simulador/Leitor de RFID com SQLite")
    parser.add_argument('--num-leituras', type=int, default=10, help='Número de leituras (padrão: 10)')
    parser.add_argument('--pausa', type=float, default=0.5, help='Pausa entre leituras em segundos (padrão: 0.5)')
    parser.add_argument('--modo', choices=['simulacao', 'serial', 'nfc'], default='simulacao', help='Modo: simulacao, serial ou nfc (padrão: simulacao)')
    parser.add_argument('--porta', type=str, default='/dev/ttyUSB0', help='Porta serial para modo serial (padrão: /dev/ttyUSB0)')
    parser.add_argument('--baudrate', type=int, default=9600, help='Baudrate para modo serial (padrão: 9600)')
    parser.add_argument('--seed', type=int, default=None, help='Seed para random (para reprodutibilidade em simulação)')
    parser.add_argument('--db-path', type=str, default='rfid_dados.db', help='Caminho para o arquivo BD SQLite (padrão: rfid_dados.db)')
    parser.add_argument('--usar-bd', action='store_true', help='Usar BD em vez de CSV')
    parser.add_argument('--grafico', action='store_true', help='Gerar gráfico de análise (requer matplotlib)')

    # Compatibilidade com Jupyter/Colab
    if 'ipykernel' in sys.modules:
        args = parser.parse_args([])
    else:
        args = parser.parse_args(sys.argv[1:])

    # Inicializar BD se solicitado
    db_manager = None
    if args.usar_bd:
        try:
            db_manager = DatabaseManager(args.db_path)
            logging.info(f"BD SQLite inicializado em {args.db_path}")
        except Exception as e:
            logging.error(f"Erro ao inicializar BD: {e}. Usando CSV como fallback.")

    # Inicializar reader
    reader = RFIDReader(db_manager=db_manager)

    if args.seed is not None and args.modo == 'simulacao':
        random.seed(args.seed)
        logging.info(f"Seed definida para reprodutibilidade: {args.seed}")

    logging.info(f"Iniciando leituras RFID... ({args.num_leituras} leituras no modo {args.modo})")
    for _ in range(args.num_leituras):
        reader.simular_leitura(args.modo, args.porta, args.baudrate)
        time.sleep(args.pausa)

    reader.analisar_dados(gerar_grafico=args.grafico)

if __name__ == '__main__':
    main()


In [None]:
import time
import csv
import random
import string
import os
import logging
import argparse
import sys
import sqlite3  # Native Python module for SQLite
from collections import Counter

# Attempt to import optional libraries for real RFID reading
try:
    import serial  # For pyserial (serial readers)
    SERIAL_AVAILABLE = True
except ImportError:
    SERIAL_AVAILABLE = False
    print("Warning: pyserial not installed. 'serial' mode will not work. Install with: pip install pyserial")

try:
    import nfc  # For nfcpy (NFC readers)
    NFC_AVAILABLE = True
except ImportError:
    NFC_AVAILABLE = False
    print("Warning: nfcpy not installed. 'nfc' mode will not work. Install with: pip install nfcpy")

try:
    import pandas as pd  # For advanced analysis
    PANDAS_AVAILABLE = True
except ImportError:
    PANDAS_AVAILABLE = False
    print("Warning: pandas not installed. Using Counter as fallback. Install with: pip install pandas")

try:
    import matplotlib.pyplot as plt  # For analysis graphs
    MATPLOTLIB_AVAILABLE = True
except ImportError:
    MATPLOTLIB_AVAILABLE = False
    print("Warning: matplotlib not installed. Graphs will not be available. Install with: pip install matplotlib")

# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class DatabaseManager:
    """
    Database manager using native sqlite3.
    Handles RFID reading data storage and analysis.
    """

    def __init__(self, db_path='rfid_data.db'):
        self.db_path = db_path
        self._create_table()

    def _create_table(self):
        """Creates the rfid_readings table if it doesn't exist, with an index for performance."""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS rfid_readings (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT NOT NULL,
                    uid TEXT NOT NULL,
                    valid BOOLEAN NOT NULL
                )
            ''')
            # Add index for faster UID queries
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_uid ON rfid_readings (uid)')
            conn.commit()
        except sqlite3.Error as e:
            logging.error(f"Error creating table: {e}")
            raise
        finally:
            conn.close()

    def insert_reading(self, timestamp, uid, valid):
        """Inserts a reading into the database."""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('INSERT INTO rfid_readings (timestamp, uid, valid) VALUES (?, ?, ?)', (timestamp, uid, valid))
            conn.commit()
            logging.info(f"Reading inserted into DB: UID={uid}")
        except sqlite3.Error as e:
            logging.error(f"Error inserting into DB: {e}")
            raise
        finally:
            conn.close()

    def get_readings(self):
        """Returns all readings as a list of dictionaries."""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('SELECT timestamp, uid, valid FROM rfid_readings')
            rows = cursor.fetchall()
            return [{'timestamp': row[0], 'uid': row[1], 'valid': row[2]} for row in rows]
        except sqlite3.Error as e:
            logging.error(f"Error querying DB: {e}")
            return []
        finally:
            conn.close()

    def analyze_data_db(self):
        """Analyzes data directly from DB using SQL."""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            # Total readings
            cursor.execute('SELECT COUNT(*) FROM rfid_readings')
            total_readings = cursor.fetchone()[0]

            # Unique UIDs
            cursor.execute('SELECT COUNT(DISTINCT uid) FROM rfid_readings')
            unique_uids = cursor.fetchone()[0]

            # Top UIDs
            cursor.execute('''
                SELECT uid, COUNT(*) as count
                FROM rfid_readings
                GROUP BY uid
                ORDER BY count DESC
                LIMIT 5
            ''')
            top_uids = {row[0]: row[1] for row in cursor.fetchall()}

            return total_readings, unique_uids, top_uids
        except sqlite3.Error as e:
            logging.error(f"Error analyzing DB: {e}")
            return 0, 0, {}
        finally:
            conn.close()

class RFIDReader:
    """
    Class to manage RFID readings, validation, and data analysis.
    Supports simulation, serial (pyserial), and NFC (nfcpy) modes.
    """

    def __init__(self, csv_file='rfid_data.csv', db_manager=None):
        self.csv_file = csv_file
        self.db_manager = db_manager
        self.read_uids = []  # Keeps for fallback if DB fails
        if not self.db_manager:
            self._initialize_csv()

    def _initialize_csv(self):
        """Initializes CSV with header if it doesn't exist."""
        try:
            if not os.path.exists(self.csv_file):
                with open(self.csv_file, mode='w', newline='') as file:
                    writer = csv.writer(file)
                    writer.writerow(['Timestamp', 'UID', 'Valid'])
        except IOError as e:
            logging.error(f"Error initializing CSV: {e}")
            raise

    def validate_uid(self, uid_hex):
        """
        Validates UID: must be pure hex and length between 4-20 chars (1-10 bytes, flexible for common tags).
        """
        if not uid_hex or not all(c in string.hexdigits.lower() for c in uid_hex):
            return False
        return 4 <= len(uid_hex) <= 20  # Flexible range; adjust as needed

    def generate_fake_uid(self):
        """Generates a fake UID for simulation."""
        length = random.choice([8, 12, 16])  # Common lengths
        return ''.join(random.choices(string.hexdigits.lower(), k=length))

    def read_uid_serial(self, port='/dev/ttyUSB0', baudrate=9600):
        """Reads UID via serial (pyserial). Assumes simple line-based output."""
        if not SERIAL_AVAILABLE:
            raise ImportError("pyserial not available for serial reading.")
        try:
            with serial.Serial(port, baudrate, timeout=1) as ser:
                data = ser.readline().decode('utf-8').strip()
                return data.lower() if data else None
        except serial.SerialException as e:
            logging.error(f"Error in serial reading: {e}")
            return None

    def read_uid_nfc(self):
        """Reads UID via NFC (nfcpy). Basic connection."""
        if not NFC_AVAILABLE:
            raise ImportError("nfcpy not available for NFC reading.")
        try:
            clf = nfc.ContactlessFrontend('usb')  # Adjust for your device
            tag = clf.connect(rdwr={'on-connect': lambda tag: False})
            uid = tag.identifier.hex() if tag else None
            clf.close()
            return uid.lower() if uid else None
        except Exception as e:
            logging.error(f"Error in NFC reading: {e}")
            return None

    def simulate_reading(self, mode='simulation', port='/dev/ttyUSB0', baudrate=9600):
        """Simulates or performs an RFID reading."""
        if mode == 'simulation':
            uid_hex = self.generate_fake_uid()
        elif mode == 'serial':
            uid_hex = self.read_uid_serial(port, baudrate)
            if uid_hex is None:
                logging.warning("No tag detected via serial.")
                return False
        elif mode == 'nfc':
            uid_hex = self.read_uid_nfc()
            if uid_hex is None:
                logging.warning("No tag detected via NFC.")
                return False
        else:
            raise ValueError("Invalid mode: use 'simulation', 'serial', or 'nfc'")

        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
        valid = self.validate_uid(uid_hex)

        # Try DB; fallback to CSV
        if self.db_manager:
            try:
                self.db_manager.insert_reading(timestamp, uid_hex, valid)
            except Exception:
                logging.warning("DB failure; using CSV as fallback.")
                self._save_to_csv(timestamp, uid_hex, valid)
        else:
            self._save_to_csv(timestamp, uid_hex, valid)

        self.read_uids.append(uid_hex)
        logging.info(f"Tag read: UID={uid_hex}, Valid={valid}")
        return True

    def _save_to_csv(self, timestamp, uid_hex, valid):
        """Saves to CSV (fallback)."""
        try:
            with open(self.csv_file, mode='a', newline='') as file:
                writer = csv.writer(file)
                writer.writerow([timestamp, uid_hex, valid])
        except IOError as e:
            logging.error(f"Error writing to CSV: {e}")

    def analyze_data(self, generate_graph=False, export_json=False):
        """Analyzes data (from DB or list). Optionally exports to JSON."""
        if self.db_manager:
            total_readings, unique_uids, top_uids = self.db_manager.analyze_data_db()
        else:
            if not self.read_uids:
                logging.warning("No readings to analyze.")
                return

            if PANDAS_AVAILABLE:
                df = pd.DataFrame({'UID': self.read_uids})
                total_readings = len(df)
                unique_uids = df['UID'].nunique()
                top_uids = df['UID'].value_counts().head(5).to_dict()
            else:
                counter = Counter(self.read_uids)
                total_readings = len(self.read_uids)
                unique_uids = len(counter)
                top_uids = dict(counter.most_common(5))

        logging.info(f"Data analysis: Total readings: {total_readings}, Unique UIDs: {unique_uids}")
        logging.info("Top UIDs:")
        for uid, count in top_uids.items():
            logging.info(f"  UID {uid}: {count} times")

        # Export to JSON if requested
        if export_json:
            import json
            data = {'total_readings': total_readings, 'unique_uids': unique_uids, 'top_uids': top_uids}
            with open('rfid_analysis.json', 'w') as f:
                json.dump(data, f, indent=4)
            logging.info("Analysis exported to 'rfid_analysis.json'")

        # Generate graph if requested
        if generate_graph and MATPLOTLIB_AVAILABLE and top_uids:
            plt.figure(figsize=(8, 5))
            plt.bar(list(top_uids.keys()), list(top_uids.values()))
            plt.xlabel('UID')
            plt.ylabel('Frequency')
            plt.title('Top UIDs Read')
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.savefig('rfid_analysis_graph.png')
            logging.info("Graph saved as 'rfid_analysis_graph.png'")
            # Only show if not headless
            if plt.get_backend() != 'Agg':
                plt.show()

def main():
    parser = argparse.ArgumentParser(description="RFID Reader/Simulator with SQLite")
    parser.add_argument('--num-readings', type=int, default=10, help='Number of readings (default: 10)')
    parser.add_argument('--pause', type=float, default=0.5, help='Pause between readings in seconds (default: 0.5)')
    parser.add_argument('--mode', choices=['simulation', 'serial', 'nfc'], default='simulation', help='Mode: simulation, serial, or nfc (default: simulation)')
    parser.add_argument('--port', type=str, default='/dev/ttyUSB0', help='Serial port for serial mode (default: /dev/ttyUSB0)')
    parser.add_argument('--baudrate', type=int, default=9600, help='Baudrate for serial mode (default: 9600)')
    parser.add_argument('--seed', type=int, default=None, help='Seed for random (for reproducibility in simulation)')
    parser.add_argument('--db-path', type=str, default='rfid_data.db', help='Path to SQLite DB file (default: rfid_data.db)')
    parser.add_argument('--use-db', action='store_true', help='Use DB instead of CSV')
    parser.add_argument('--graph', action='store_true', help='Generate analysis graph (requires matplotlib)')
    parser.add_argument('--export-json', action='store_true', help='Export analysis to JSON')

    # Jupyter/Colab compatibility
    if 'ipykernel' in sys.modules or 'google.colab' in sys.modules:
        args = parser.parse_args([])  # Use defaults in notebooks
    else:
        args = parser.parse_args()

    # Initialize DB if requested
    db_manager = None
    if args.use_db:
        try:
            db_manager = DatabaseManager(args.db_path)
            logging.info(f"SQLite DB initialized at {args.db_path}")
        except Exception as e:
            logging.error(f"Error initializing DB: {e}. Using CSV as fallback.")

    # Initialize reader
    reader = RFIDReader(db_manager=db_manager)

    if args.seed is not None and args.mode == 'simulation':
        random.seed(args.seed)
        logging.info(f"Seed set for reproducibility: {args.seed}")

    logging.info(f"Starting RFID readings... ({args.num_readings} readings in {args.mode} mode)")
    for _ in range(args.num_readings):
        reader.simulate_reading(args.mode, args.port, args.baudrate)
        time.sleep(args.pause)

    reader.analyze_data(generate_graph=args.graph, export_json=args.export_json)

if __name__ == '__main__':
    main()


In [6]:
# === 1. INSTALAÇÕES ===
!pip install fastapi uvicorn pyngrok pandas matplotlib -q

import nest_asyncio
nest_asyncio.apply()

import sqlite3
import time
import random
import string
import threading
import logging
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
import pandas as pd
import matplotlib.pyplot as plt
from pyngrok import ngrok
import uvicorn

# === 2. SEU TOKEN DO NGROK (OBRIGATÓRIO) ===
NGROK_AUTH_TOKEN = "35FnG9cEOwAt4uLKoA1YaqL5uNc_4DpvfN7ksFxekMsnGGfwW"  # MUDE AQUI!
ngrok.set_auth_token(NGROK_AUTH_TOKEN)

# === 3. CONFIGURAÇÃO ===
app = FastAPI(title="RFID Dashboard Pro")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

DB_PATH = 'rfid_data.db'

# === 4. GERENCIADOR DO BANCO ===
class DatabaseManager:
    def __init__(self, db_path=DB_PATH):
        self.db_path = db_path
        self._create_table()

    def _create_table(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS rfid_readings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                uid TEXT NOT NULL,
                valid BOOLEAN NOT NULL
            )
        ''')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_uid ON rfid_readings (uid)')
        conn.commit()
        conn.close()

    def insert(self, uid, valid):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
        cursor.execute('INSERT INTO rfid_readings (timestamp, uid, valid) VALUES (?, ?, ?)',
                      (timestamp, uid, valid))
        conn.commit()
        conn.close()

    def get_recent(self, limit=20):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            SELECT timestamp, uid, valid FROM rfid_readings
            ORDER BY id DESC LIMIT ?
        ''', (limit,))
        rows = cursor.fetchall()
        conn.close()
        return [{'timestamp': r[0], 'uid': r[1], 'valid': bool(r[2])} for r in rows]

    def stats(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('SELECT COUNT(*), COUNT(DISTINCT uid), SUM(valid) FROM rfid_readings')
        total, unicos, validos = cursor.fetchone()
        cursor.execute('''
            SELECT uid, COUNT(*) FROM rfid_readings
            GROUP BY uid ORDER BY COUNT(*) DESC LIMIT 5
        ''')
        top5 = cursor.fetchall()
        conn.close()
        return {
            "total": total or 0,
            "unicos": unicos or 0,
            "validos": validos or 0,
            "top5": [{"uid": u[:10] + ('...' if len(u) > 10 else ''), "count": c} for u, c in top5]
        }

db = DatabaseManager()

# === 5. SIMULAÇÃO RFID ===
def validate_uid(uid):
    return 4 <= len(uid) <= 20 and all(c in string.hexdigits.lower() for c in uid)

def generate_uid():
    length = random.choice([8, 12, 16])
    return ''.join(random.choices(string.hexdigits.lower(), k=length))

def simulation_loop():
    while True:
        uid = generate_uid()
        valid = validate_uid(uid)
        db.insert(uid, valid)
        print(f"RFID: {uid} | Válido: {valid}")
        time.sleep(2)

# === 6. ROTAS DA API ===
@app.get("/", response_class=HTMLResponse)
def home():
    return """
    <div style="font-family: Arial; text-align: center; padding: 2rem;">
        <h1>RFID Dashboard Pro</h1>
        <p><a href="/docs">API Docs</a> | <a href="/tags">Tags</a> | <a href="/stats">Stats</a> | <a href="/graph">Graph</a> | <a href="/download">DB</a></p>
    </div>
    """

@app.get("/tags")
def tags():
    return db.get_recent(20)

@app.get("/stats")
def stats():
    return db.stats()

@app.get("/graph")
def graph():
    stats = db.stats()
    top = stats["top5"]
    if not top:
        return HTMLResponse("<p style='text-align:center; padding:2rem;'>Sem dados para gráfico</p>")
    uids = [t["uid"] for t in top]
    counts = [t["count"] for t in top]
    plt.figure(figsize=(10,6))
    bars = plt.bar(uids, counts, color=['#4CAF50', '#2196F3', '#FF9800', '#F44336', '#9C27B0'])
    plt.title("Top 5 UIDs Mais Lidos", fontsize=16, fontweight='bold')
    plt.ylabel("Frequência")
    plt.xlabel("UID")
    plt.xticks(rotation=45)
    for bar in bars:
        height = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2., height + 0.1,
                f'{int(height)}', ha='center', va='bottom')
    plt.tight_layout()
    plt.savefig("graph.png")
    plt.close()
    return FileResponse("graph.png", media_type="image/png")

@app.get("/download")
def download():
    return FileResponse(DB_PATH, filename="rfid_data.db")

# === 7. INICIAR TUDO ===
threading.Thread(target=simulation_loop, daemon=True).start()

def run_server():
    uvicorn.run(app, host="0.0.0.0", port=8000)

threading.Thread(target=run_server, daemon=True).start()

# === 8. NGROK ===
ngrok.kill()
tunnel = ngrok.connect(8000, "http")
print(f"\nDASHBOARD RFID PROFISSIONAL RODANDO!")
print(f"Link: {tunnel}")
print(f"Teste: {tunnel}/tags")
print(f"Gráfico: {tunnel}/graph\n")

INFO:     Started server process [518]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
ERROR:    [Errno 98] error while attempting to bind on address ('0.0.0.0', 8000): [errno 98] address already in use
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.


RFID: 4aeeaecbbffc4ba4 | Válido: True

DASHBOARD RFID PROFISSIONAL RODANDO!
Link: NgrokTunnel: "https://ungouged-damien-scarabaeoid.ngrok-free.dev" -> "http://localhost:8000"
Teste: NgrokTunnel: "https://ungouged-damien-scarabaeoid.ngrok-free.dev" -> "http://localhost:8000"/tags
Gráfico: NgrokTunnel: "https://ungouged-damien-scarabaeoid.ngrok-free.dev" -> "http://localhost:8000"/graph

