In [None]:
import os
import pandas as pd
import sqlite3
import shutil
from IPython.display import display, Markdown
# Use plotly as the backend for DataFrame.plot().
pd.set_option("plotting.backend", "plotly")

# Allow up to 200 characters per column when DataFrames are displayed.
pd.options.display.max_colwidth = 200

# This option only works in Colab. The logged-in Google account needs to be the
# same one that owns your Home Assistant backup. Set it to False if you want to
# copy the data manually.
get_from_gdrive_backup = True

# File name of the SQLite database that the notebook reads.
db_filename = "home-assistant_v2.db"

In [None]:
# A non-empty COLAB_RELEASE_TAG environment variable is taken to mean "running in Colab".
is_colab = bool(os.getenv("COLAB_RELEASE_TAG"))

def print_md(text:str):
    """Render ``text`` as Markdown in the cell output."""
    rendered = Markdown(text)
    display(rendered)

def q(query:str, db_path=None):
    """Run a SQL query against the SQLite database and return the result as a DataFrame.

    Args:
        query: SQL statement to execute.
        db_path: Path of the SQLite file to query. Defaults to the
            notebook-wide ``db_filename``.

    Returns:
        A ``pandas.DataFrame`` holding the result set.

    Raises:
        FileNotFoundError: If the database file does not exist.
    """
    path = db_filename if db_path is None else db_path
    # sqlite3.connect() silently creates an empty file for a missing path. The
    # query would then fail with a confusing "no such table" error and the stub
    # file would make later "does the database exist?" checks pass wrongly.
    if not os.path.exists(path):
        raise FileNotFoundError(f"SQLite database not found: {path}")
    con = sqlite3.connect(path)
    try:
        return pd.read_sql_query(query, con)
    finally:
        # Using a sqlite3 connection as a context manager only commits or rolls
        # back; it never closes the connection, so close it explicitly.
        con.close()

def extract_file(tar_filename:str, filename:str, path="."):
    """Extract a single member from a tar archive.

    Args:
        tar_filename: Path of the tar archive to read.
        filename: Name of the member inside the archive to extract.
        path: Directory the member is extracted into.
    """
    import tarfile

    print(f"Extract from {tar_filename}")
    archive = tarfile.open(name=tar_filename, mode="r")
    try:
        archive.extract(member=filename, path=path)
    finally:
        archive.close()

def read_from_backup(filename:str):
    """Pull the database file out of a backup archive into the working directory.

    The backup is expected to contain an inner ``homeassistant.tar.gz`` archive,
    which in turn holds ``data/home-assistant_v2.db``.

    Args:
        filename: Path of the outer backup tar archive.
    """
    inner_archive = "./homeassistant.tar.gz"
    db_member = "data/home-assistant_v2.db"

    # Step 1: unpack the inner archive from the backup into ./data.
    extract_file(filename, inner_archive, path="data")
    # Step 2: unpack the database from the inner archive; the member name
    # already starts with "data/", so it ends up under ./data as well.
    extract_file(os.path.join("data", inner_archive), db_member)
    # Step 3: move the database into the working directory.
    shutil.move(db_member, "home-assistant_v2.db")

def get_backup_filename_from_gdrive_snapshot(gdrive_root=None):
    """Return the path of the most recent core backup in the Google Drive snapshot folder.

    Args:
        gdrive_root: Mount point of Google Drive. Defaults to the
            notebook-wide ``gdrive_folder``.

    Returns:
        Full path of the newest file (by modification time) whose name starts
        with ``core_`` (case-insensitive) inside
        ``<gdrive_root>/MyDrive/Home Assistant Snapshots``.

    Raises:
        Exception: If the snapshot folder contains no matching backup.
    """
    root = gdrive_folder if gdrive_root is None else gdrive_root
    snapshot_folder = os.path.join(root, "MyDrive", "Home Assistant Snapshots")
    backups_filenames = [
        os.path.join(snapshot_folder, e)
        for e in os.listdir(snapshot_folder)
        if e.lower().startswith("core_")
    ]
    if not backups_filenames:
        raise Exception("Could not find backups in the snapshot folder")
    # os.listdir() returns entries in arbitrary order, so choose the newest
    # backup explicitly; the path breaks ties to keep the choice deterministic.
    return max(backups_filenames, key=lambda p: (os.path.getmtime(p), p))

# Download step: runs only in Colab, only when enabled via
# get_from_gdrive_backup, and only while the database file is still missing.
if is_colab and get_from_gdrive_backup and not os.path.exists(db_filename):
    # Mount point for Google Drive; get_backup_filename_from_gdrive_snapshot()
    # reads this module-level name.
    gdrive_folder = "/gdrive"
    # No second is_colab check needed: the outer condition already guarantees it.
    from google.colab import drive
    drive.mount(gdrive_folder)
    read_from_backup(get_backup_filename_from_gdrive_snapshot())



In [None]:

print_md("# States Usage")
# Number of recorded states per entity and each entity's share of all states.
# The percentage multiplies by 100.0 so SQLite uses floating-point division;
# with integer operands the result is truncated and every entity below 1 %
# would be reported as 0.
states = q('''
SELECT
  COUNT(*) AS cnt,
  ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM states), 2) AS cnt_pct,
  states_meta.entity_id
FROM states
INNER JOIN states_meta ON states.metadata_id=states_meta.metadata_id
GROUP BY states_meta.entity_id
ORDER BY cnt DESC
''')
# Optional filters - uncomment one to narrow down the result:
#states = states[states.entity_id.str.contains("text here")]
#states = states[states.cnt > 5000]
states.head(20)


In [None]:
# Line plot of entity_id over cnt (used as the index), rows sorted by ascending count.
(
    states[["cnt", "entity_id"]]
    .sort_values(by="cnt", ascending=True)
    .set_index("cnt")
    .plot(kind="line")
)

In [None]:
print_md("# Events")
# Number of recorded events per event type and each type's share of all events.
# The percentage multiplies by 100.0 so SQLite uses floating-point division;
# with integer operands the result is truncated and every type below 1 %
# would be reported as 0.
q('''
SELECT
  COUNT(*) as cnt,
  ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM events), 2) AS cnt_pct,
  event_types.event_type
FROM events
INNER JOIN event_types ON events.event_type_id = event_types.event_type_id
GROUP BY event_types.event_type
ORDER BY cnt DESC
''').head(20)