In [3]:
from pathlib import Path
from mpflash.vendor.board_database import Database
from mpflash.logger import log

## build boardlist from repo

In [4]:
## iterator to flatten the board database into a list of tuples


def iter_boards(db: Database, version: str = ""):
    version = version.strip()
    for b in db.boards:
        board = db.boards[b]
        yield (
            version,
            board.name,
            board.name,
            board.mcu,
            "",  # no variant
            board.port.name if board.port else "",
            board.path.split("/micropython/", 1)[1],  # TODO - remove hack
            board.description,
            "micropython",  # family
        )
        if board.variants:
            for v in board.variants:
                yield (
                    version,
                    f"{board.name}-{v.name}",
                    board.name,
                    board.mcu,
                    v.name,
                    board.port.name if board.port else "",
                    board.path.split("/micropython/", 1)[1],  # TODO - remove hack
                    v.description,
                    "micropython",  # family
                )

In [5]:
from typing import List
import mpflash.basicgit as git
from mpflash.versions import get_preview_mp_version, get_stable_mp_version, micropython_versions


def boardlist_from_repo(
    versions: List[str],
    mpy_dir: Path,
):
    longlist = []
    if not mpy_dir.is_dir():
        print(f"Directory {mpy_dir} not found")
    for version in versions:
        print("-" * 60)
        build_nr = ""
        if "preview" in version:
            ok = git.checkout_tag("master", mpy_dir)
            if describe := git.get_git_describe(mpy_dir):
                parts = describe.split("-", 3)
                if len(parts) >= 3:
                    build_nr = parts[2]
        else:
            ok = git.checkout_tag(version, mpy_dir)
        if not ok:
            print(f"Failed to checkout {version} in {mpy_dir}")
            continue

        print(f"{git.get_git_describe(mpy_dir)} - {build_nr}")
        # un-cached database
        db = Database(mpy_dir)
        shortlist = list(iter_boards(db, version=version))
        print(f"boards found {len(db.boards.keys())}")
        print(f"boards-variants found {len(shortlist)}")
        longlist.extend(shortlist)
    return longlist

In [6]:
mpy_path = Path("../repos/micropython")
version = "stable"
db = Database(mpy_path)
# print( db.boards)
shortlist = list(iter_boards(db, version=version))
print(f"boards found {len(db.boards.keys())}")
print(db.boards["RPI_PICO2_W"])

boards found 189
Board(name='RPI_PICO2_W', variants=[], url='https://www.raspberrypi.com/products/raspberry-pi-pico-2/', mcu='rp2350', product='Pico 2 W', vendor='Raspberry Pi', images=['rp2-pico2-w.jpg'], deploy=['../deploy.md'], port=Port(name='rp2'), path='../repos/micropython/ports/rp2/boards/RPI_PICO2_W')


In [7]:
# mpy_path = Path("../repos/micropython")
# testlist = boardlist_from_repo(
#     versions = micropython_versions(minver="1.24.1"), # older versions do not have the board.json files
#     mpy_dir = mpy_path,
#     )


In [8]:
do_package = False


mpy_path = Path("../repos/micropython")
all = micropython_versions(minver="1.18")
# all = all[-1:]

if do_package:
    assert mpy_path.exists()
    longlist = boardlist_from_repo(
        versions=all,
        mpy_dir=mpy_path,
    )

    print("=" * 60)
    print(f"Total boards-variants: {len(longlist)}")

------------------------------------------------------------
v1.18 - 
boards found 124
boards-variants found 148
------------------------------------------------------------
v1.19 - 
boards found 141
boards-variants found 165
------------------------------------------------------------
v1.19.1 - 
boards found 141
boards-variants found 165
------------------------------------------------------------
v1.20.0 - 
boards found 160
boards-variants found 184
------------------------------------------------------------
v1.21.0 - 
boards found 158
boards-variants found 190
------------------------------------------------------------
v1.22.0 - 
boards found 163
boards-variants found 195
------------------------------------------------------------
v1.22.1 - 
boards found 163
boards-variants found 195
------------------------------------------------------------
v1.22.2 - 
boards found 163
boards-variants found 195
------------------------------------------------------------
v1.23.0 - 
boards found

## Package the list of boards in a zipped csv for inclusion in the mpflash package 
without compression , or as a database it takes up too much space 



In [9]:
import os
import zipfile
import pandas as pd

zip_file = "micropython_boards.zip"
csv_filename = "micropython_boards.csv"

if do_package:
    columns = ["version", "board_id", "board_name", "mcu", "variant", "port", "path", "description", "family"]
    df = pd.DataFrame(longlist, columns=columns)

    # Create the ZIP file and add the CSV data directly without creating an intermediate file
    with zipfile.ZipFile(zip_file, "w", zipfile.ZIP_DEFLATED) as zipf:
        # Create a temporary in-memory CSV string
        csv_data = df.to_csv(index=False)
        # Write the CSV data directly to the zip file
        zipf.writestr(csv_filename, csv_data)

    # # Get file sizes to show compression ratio
    # csv_size = os.path.getsize(csv_filename)
    zip_size = os.path.getsize(zip_file)
    # compression_ratio = (1 - (zip_size / csv_size)) * 100

    print(f"ZIP file created: {zip_file} ({zip_size:,} bytes)")
    # print(f"CSV file created: {csv_filename} ({csv_size:,} bytes)")
    # print(f"Compression ratio: {compression_ratio:.2f}%")

ZIP file created: micropython_boards.zip (16,634 bytes)


## Create database from the zipped / boardlist.csv file

### Define views

In [10]:
# Views

import sqlite3


def create_views(conn: sqlite3.Connection):
    """
    Create views for the SQLite database.

    Args:
        conn (sqlite3.Connection): SQLite connection object
    """
    log.debug("Creating database views")
    cursor = conn.cursor()

    views = {
        "latest_boards": """
            SELECT b.*, d.version, d.filename, d.source
            FROM boards b
            LEFT JOIN downloads d ON b.board_id = d.board_id AND b.version = d.version
        """,
        "board_downloaded": """
            SELECT 
                b.board_id,
                b.board_name ,
                UPPER(b.variant) as variant,
                b.description,
                LOWER(b.mcu) as mcu,
                b.version as version,
                b.path,
                b.port as port,
                b.family as family,
                d.version as download_version,
                d.build,
                d.filename

            FROM
                boards b
            left JOIN 
                downloads d 
            ON 
                b.board_id = d.board_id
                AND d.version LIKE b.version || '%'
            ORDER BY
                d.version DESC,
                d.build DESC,
                d.board_id;
        """,
        "board_variants_versions": """
        SELECT 
            UPPER(board_name) as board_name,
            json_group_array (DISTINCT UPPER(variant)) AS variants,
            json_group_array (DISTINCT (version)) AS versions
        FROM boards
        GROUP BY UPPER(board_name)
        ORDER BY UPPER(board_name)
        """,
        "board_id_versions": """
        SELECT 
            UPPER(board_id) as board_id,
            json_group_array (DISTINCT (version)) AS versions
        FROM boards
        GROUP BY UPPER(board_id)
        ORDER BY UPPER(board_id)
        """,
    }

    # Drop existing views if they exist
    for view_name in views:
        cursor.execute(f"DROP VIEW IF EXISTS {view_name}")

    # Create new views
    for view_name, query in views.items():
        cursor.execute(f"CREATE VIEW {view_name} AS {query}")

    conn.commit()

In [11]:
# create basic schema
import sqlite3


def create_schema(conn: sqlite3.Connection):
    log.debug("Creating database tables")
    with conn:
        # Create metadata table if it doesn't exist
        conn.execute("""
        CREATE TABLE IF NOT EXISTS metadata (
            key TEXT PRIMARY KEY,
            value TEXT
        )
        """)

        # Create the same table schema
        conn.execute("""
        CREATE TABLE IF NOT EXISTS boards (
            "version" TEXT NOT NULL,
            "board_id" TEXT NOT NULL,
            "board_name" TEXT,
            "mcu" TEXT,
            "variant" TEXT,
            "path" TEXT,
            "description" TEXT,
            "text" TEXT,
            "port" TEXT DEFAULT "",
            "familiy" TEXT DEFAULT "micropython",
            PRIMARY KEY(version, board_id)
        )
        """)
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS downloads (
                port TEXT,
                board TEXT,
                filename TEXT PRIMARY KEY,
                source TEXT,
                board_id TEXT,
                version TEXT,
                build TEXT,
                ext TEXT,
                family TEXT,
                custom TEXT,
                description TEXT
            )
            """
        )

        conn.commit()

        create_views(conn)

In [12]:
# metadata


from mpflash.config import config
import sqlite3


def get_database_version(conn: sqlite3.Connection):
    # Connect to the SQLite database and fetch the version

    cursor = conn.cursor()

    # Query for the 'version' key
    try:
        cursor.execute("SELECT value FROM metadata WHERE key = ?", ("version",))
    except sqlite3.OperationalError as e:
        return None
    # Result will be None if not found, otherwise will contain the value
    value = value[0] if (value := cursor.fetchone()) else None
    return value


def set_database_version(conn: sqlite3.Connection, version: str):
    # Connect to the SQLite database and set the version
    with sqlite3.connect(config.db_path) as conn:
        cursor = conn.cursor()
        # Create metadata table if it doesn't exist
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS metadata (
            key TEXT PRIMARY KEY,
            value TEXT
        )
        """)
        conn.commit()
        # Insert or replace the version value
        cursor.execute("INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)", ("version", version))
        conn.commit()

In [13]:
# update boardlist from zip
import zipfile
import io
import sqlite3
import pandas as pd
from pathlib import Path


def update_boardlist_schema(conn: sqlite3.Connection):
    conn.row_factory = sqlite3.Row  # return rows as dicts

    # Create indices for faster searching
    conn.execute("CREATE INDEX IF NOT EXISTS idx_version ON boards (version)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_id_version ON boards (board_id,version)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_descr ON boards (description)")

    conn.commit()


def load_data_from_zip(conn: sqlite3.Connection, zip_file: Path):
    csv_filename = "micropython_boards.csv"  # name of the .csv inside the .zip

    # Check if the zip file exists
    if not zip_file.exists() or not zip_file.is_file():
        print(f"Zip file {zip_file} not found.")
        return
    conn.row_factory = sqlite3.Row  # return rows as dicts

    # Load data directly from the zip file
    with zipfile.ZipFile(zip_file, "r") as zipf:
        # Read the CSV file from the zip
        with zipf.open(csv_filename) as csv_file:
            # Use pandas to read the CSV data
            df_boardlist = pd.read_csv(io.TextIOWrapper(csv_file, "utf-8"))
            # Replace NaN values with empty strings to avoid NULL values in the database
            df_boardlist = df_boardlist.fillna("")
            # Insert data into the new SQLite database
            df_boardlist.to_sql("boards", conn, if_exists="replace", index=False)

    # Create indices for faster searching
    conn.execute("CREATE INDEX IF NOT EXISTS idx_version ON boards (version)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_id_version ON boards (board_id,version)")
    # conn.execute('CREATE INDEX IF NOT EXISTS idx_board_id ON boards (board_id)')
    # conn.execute('CREATE INDEX IF NOT EXISTS idx_board_name ON boards (board_name)')
    conn.execute("CREATE INDEX IF NOT EXISTS idx_descr ON boards (description)")

    conn.commit()

## create database 
- check version  of the data 
- update the data 
- 

## Migrate json data

In [14]:
from xmlrpc.client import boolean
import pandas as pd
import sqlite3
import os
from pathlib import Path


def load_jsonl_to_sqlite(
    jsonl_path: Path,
    conn: sqlite3.Connection,
    table_name="downloads",
):
    """
    Load a JSONL file into a SQLite database using pandas.

    Args:
        jsonl_path (str or Path): Path to the JSONL file
        db_path (str or Path): Path to the SQLite database

    Returns:
        int: Number of records imported
    """

    # Ensure file exists
    if not jsonl_path.exists():
        raise FileNotFoundError(f"JSONL file not found: {jsonl_path}")

    # Read JSONL file into pandas DataFrame
    print("Reading JSONL file into DataFrame...")
    df = pd.read_json(jsonl_path, lines=True)
    record_count = len(df)

    if record_count == 0:
        print("JSONL file is empty")
        return 0
    # clean up the column names and data
    # Replace NaN values with empty strings to avoid NULL values in the database
    df = df.fillna("")
    # remove the url column
    if "url" in df.columns:
        df = df.drop(columns=["url"])
    # rename the variant column to board_id
    if "variant" in df.columns:
        df = df.rename(columns={"variant": "board_id"})
    if "firmware" in df.columns:
        df = df.rename(columns={"firmware": "source"})

    # # change the preview and custom columns to boolean
    # for col in ['custom', 'preview']:
    #     if col in df.columns:
    #         df[col] = df[col].astype(bool)
    # Convert filename paths to POSIX format
    if "filename" in df.columns:
        df["filename"] = df["filename"].apply(lambda x: Path(x).as_posix() if x else "")

    # append '-preview' to the version column if preview is True
    if "preview" in df.columns:
        df["version"] = df.apply(lambda row: f"{row['version']}-preview" if row["preview"] else row["version"], axis=1)
        df = df.drop(columns=["preview"])

    # first remove all rows from the table
    conn.execute(f"DELETE FROM {table_name}")
    conn.commit()

    # Write DataFrame to SQLite
    print(f"Writing {record_count} records to database...")
    df.to_sql(table_name, conn, if_exists="append", index=False)

    # Create indices for faster searching
    cursor = conn.cursor()
    for col in df.columns:
        if col.lower() in ["board_id", "filename", "version"]:
            cursor.execute(f'CREATE INDEX IF NOT EXISTS idx_dl_{col} ON {table_name} ("{col}")')

    conn.commit()

    print(f"Successfully imported {record_count} records")
    return record_count

### Backup function
]

In [15]:
import sqlite3
from pathlib import Path


def backup_db(conn: sqlite3.Connection, backup_path: Path):
    """
    Backup the SQLite database to a specified path.

    Args:
        conn (sqlite3.Connection): SQLite connection object
        backup_path (str or Path): Path to save the backup file
    """
    # Ensure the backup directory exists
    backup_path.parent.mkdir(parents=True, exist_ok=True)

    # Perform the backup
    with open(backup_path, "wb") as f:
        for line in conn.iterdump():
            f.write(f"{line}\n".encode("utf-8"))

    print(f"Backup created at {backup_path}")

## Combine all 


In [16]:
# combine all the above

import sqlite3
from packaging.version import Version

with sqlite3.connect(config.db_path) as conn:
    if not get_database_version(conn):
        create_schema(conn)
        set_database_version(conn, "0.1")
    current = get_database_version(conn)
    if not current or Version(current) < Version("1.24.1"):
        update_boardlist_schema(conn)
        zip_file = Path("micropython_boards.zip")
        load_data_from_zip(conn, zip_file)

        # Create/update views
        create_views(conn)

        # set_database_version(conn, "1.24.1")

    # Test retrieving some data
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM boards")
    record_count = cursor.fetchone()[0]
    print(f"Total records stored in database from zip: {record_count}")

[32m2025-04-16 21:26:31.618[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mcreate_views[0m:[36m11[0m - [34m[1mCreating database views[0m


Total records stored in database from zip: 2515


In [17]:
with sqlite3.connect(config.db_path) as conn:
    # Execute the function
    jsonl_path = config.firmware_folder / "firmware.jsonl"
    if jsonl_path.exists():
        print(f"Loading JSONL file {jsonl_path} into database...")
        record_count = load_jsonl_to_sqlite(jsonl_path, conn)

Loading JSONL file C:\Users\josverl\Downloads\firmware\firmware.jsonl into database...
Reading JSONL file into DataFrame...
Writing 108 records to database...
Successfully imported 108 records


### Operational Create a backup

In [18]:
from mpflash.config import config

if 0:
    with sqlite3.connect(config.db_path, isolation_level="EXCLUSIVE") as conn:
        backup_db(conn, config.db_path.with_suffix(".bak"))

In [19]:
from os import path
from pydantic import BaseModel
from typing import List
import json
import sqlite3


class BoardVersion(BaseModel):
    board_id: str
    variant: str
    description: str
    versions: List[str]
    path: str = ""

    @classmethod
    def from_db_row(cls, row):
        return cls(
            board_id=row["board_id"],
            variant=row["variant"],
            description=row["description"],
            versions=json.loads(row["versions"]),
            path=row["path"],
        )


def get_board_versions(cursor, search_desc: str, search_variant: str) -> List[BoardVersion]:
    query = """
    SELECT DISTINCT
        UPPER(board_id) as board_id,
        UPPER(variant) as variant,
        description,
        json_group_array(version) as versions,
        path
    FROM boards
    WHERE description LIKE ?
    AND variant like ?
    GROUP BY UPPER(board_id) , UPPER(variant), description;
    """

    cursor.execute(query, (search_desc, search_variant))
    rows = cursor.fetchall()
    # conn.close()

    return [BoardVersion.from_db_row(row) for row in rows]

In [20]:
# # db_path = r"d:\mypython\mpflash\scripts\micropython_boards_from_zip.db"

# conn2.row_factory = sqlite3.Row  # return rows as dicts
# cursor = conn2.cursor()
# description = "Pimoroni Pico LiPo"#  16MB with RP2040"
# variant = "FLASH_16M"

# description = "PYBv1.1"
# variant = "DP"

# descr = description.rsplit(" with ",1)[0].strip()
# print(f"Searching for description: {descr} and variant: {variant}")
# results = get_board_versions(cursor, f"{descr}%", variant)


# for board in results:
#     print(board)

In [21]:
from mpflash.common import PORT_FWTYPES, FWInfo


def add_download_row(conn: sqlite3.Connection, board: FWInfo):
    """
    Adds / updates a row to the downloaded firmware table in the database.

      - downloads.board_id <-- FWInfo.variant
      - downloads.source   <-- FWInfo.firmware

        The unique key is downloads.filename
    Args:
        conn : The database connection to use.
        board : The firmware information to add to the database.

    """
    with conn:
        conn.execute(
            """
            INSERT INTO downloads 
                (port, board, filename, source, board_id, version, build, ext, family, custom, description) 
            VALUES 
                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(filename) DO UPDATE SET
                port=excluded.port,
                board=excluded.board,
                source=excluded.source,
                board_id=excluded.board_id,
                version=excluded.version,
                build=excluded.build,
                ext=excluded.ext,
                family=excluded.family,
                custom=excluded.custom,
                description=excluded.description
            """,
            (
                board.port,
                board.board,
                board.filename,
                board.firmware,
                board.variant,
                board.version,
                board.build,
                board.ext,
                board.family,
                board.custom,
                board.description,
            ),
        )
        conn.commit()

In [22]:
fw_test = FWInfo(
    port="JOVER",
    board="ESP32-WROVER-IE",
    variant="FLASH_4TB",
    firmware="esp32-20231001-v1.24.1.hex",
    version="v1.24.1",
    build="20231001",
    ext=".bin",
    family="ESP32",
    custom=False,
    description="ESP32-WROVER-IE 16MB with ESP32",
    filename="esp32-20231001-v1.24.1.bin",
)

with sqlite3.connect(config.db_path) as conn:
    add_download_row(conn, fw_test)

In [23]:
def downloaded_firmwares_from_db() -> List[FWInfo]:
    """Load a list of locally downloaded firmwares from the database"""

    with sqlite3.connect(config.db_path) as conn:
        firmwares: List[FWInfo] = []
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM downloads")
            rows = cursor.fetchall()
            for row in rows:
                fw_info = FWInfo.from_dict(
                    {
                        "filename": row["filename"],
                        "version": row["version"],
                        "board": row["board"],
                        "variant": row["board_id"],
                        "port": row["port"],
                        "firmware": row["source"],
                        "build": row["build"],
                        "preview": 1 if int(row["build"]) > 0 else 0,
                    }
                )
                firmwares.append(fw_info)
        except sqlite3.Error as e:
            log.error(f"Database error: {e}")

    # sort by filename
    firmwares.sort(key=lambda x: x.filename)
    return firmwares

In [24]:
downloaded_firmwares_from_db()

[FWInfo(port='JOVER', board='ESP32-WROVER-IE', filename='esp32-20231001-v1.24.1.bin', firmware='esp32-20231001-v1.24.1.hex', variant='FLASH_4TB', preview=1, version='v1.24.1', url='', build='20231001', ext='.hex', family='micropython', custom=False, description=''),
 FWInfo(port='esp32', board='ESP32_GENERIC', filename='esp32/ESP32_GENERIC-D2WD-v1.23.0.bin', firmware='https://micropython.org/resources/firmware/ESP32_GENERIC-D2WD-20240602-v1.23.0.bin', variant='ESP32_GENERIC-D2WD', preview=0, version='v1.23.0', url='', build='0', ext='.bin', family='micropython', custom=False, description=''),
 FWInfo(port='esp32', board='ESP32_GENERIC', filename='esp32/ESP32_GENERIC-D2WD-v1.24.0-preview.109.bin', firmware='https://micropython.org/resources/firmware/ESP32_GENERIC-D2WD-20240718-v1.24.0-preview.109.gfce3cea24.bin', variant='ESP32_GENERIC-D2WD', preview=1, version='v1.24.0-preview', url='', build='109', ext='.bin', family='micropython', custom=False, description=''),
 FWInfo(port='esp32', 