In [None]:
import os

import detective.core as detective
import detective.functions as functions
import pandas as pd
from databricks import sql

# Create the database handle that the following cells query.
# NOTE(review): presumably the connection details come from the Home Assistant
# configuration, as the helper's name suggests — confirm against detective's docs.
db = detective.db_from_hass_config()

In [None]:
# Show the `entities` attribute of the database handle as this cell's output.
db.entities

In [8]:
from datetime import datetime

# Timestamp (local time, second resolution) used to give each export file a unique name.
export_time = f"{datetime.now():%Y-%m-%d_%H-%M-%S}"

In [9]:
# Fetch the sensor states into a dataframe. The row limit is deliberately far
# above the number of rows actually returned, so it does not truncate the export.
SENSOR_ROW_LIMIT = 1_000_000_000
hadf = db.fetch_all_sensor_data(limit=SENSOR_ROW_LIMIT)


            SELECT states.state, states.last_updated_ts, states_meta.entity_id
            FROM states
            JOIN states_meta
            ON states.metadata_id = states_meta.metadata_id
            WHERE
                states_meta.entity_id  LIKE '%sensor%'
            AND
                states.state NOT IN ('unknown', 'unavailable')
            ORDER BY last_updated_ts DESC
        LIMIT 1000000000
The returned Pandas dataframe has 2796940 rows of data.


In [10]:
# Summarize the fetched dataframe: row count, columns, dtypes and memory usage.
hadf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2796940 entries, 0 to 2796939
Data columns (total 3 columns):
 #   Column           Dtype  
---  ------           -----  
 0   state            object 
 1   last_updated_ts  float64
 2   entity_id        object 
dtypes: float64(1), object(2)
memory usage: 64.0+ MB


In [None]:
# Preview the first rows of the fetched sensor data.
hadf.head()

In [13]:
# Local staging directory and the names of the export files for this run.
staging_path = "/tmp/"
filename = f'upload_{export_time}.csv'

# Keep everything before the first "." and attach the parquet extension.
stem = filename.partition(".")[0]
filename_parquet = stem + ".parquet"
print(filename_parquet)

upload_2024-11-15_22-58-12.parquet


In [14]:
# Write the sensor dataframe as a parquet file into the staging directory.
parquet_target = f"{staging_path}{filename_parquet}"
hadf.to_parquet(parquet_target)

In [15]:
%%sh
# List the staging directory to confirm the parquet file was written.
ls -la /tmp

total 27636
drwxrwxrwt 1 root root     4096 Nov 15 22:59 .
drwxr-xr-x 1 root root     4096 Nov 15 22:48 ..
drwxr-xr-x 2 root root     4096 Nov 15 22:48 .bashio
-rw-r--r-- 1 root root 28285435 Nov 15 22:59 upload_2024-11-15_22-58-12.parquet


In [None]:
# Utility commands to create schema and table (one-time setup).
#
# The cell opens its own connection and cursor so it can run on a fresh kernel;
# a cursor created inside a later cell's `with` block is not available here and
# is already closed once that block has finished.
# NOTE(review): CATALOG and SCHEMA are not defined in the visible cells —
# they must be set (e.g. in a configuration cell) before this cell is run.
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:

    # IF NOT EXISTS keeps the cell safe to re-run once the objects exist.
    cursor.execute(f"CREATE SCHEMA IF NOT EXISTS `{CATALOG}`.{SCHEMA}")
    cursor.execute(f"CREATE TABLE IF NOT EXISTS `{CATALOG}`.{SCHEMA}.sensor_data (state FLOAT, last_updated_ts TIMESTAMP, entity_id STRING);")

In [None]:
import os
from databricks import sql


# Upload the exported parquet file from the local staging directory into the
# Databricks volume, using connection settings taken from environment variables.
# NOTE(review): DBX_VOLUMES_PATH is not defined in the visible cells —
# it must be set before this cell is run.
with sql.connect(server_hostname            = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path                  = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token               = os.getenv("DATABRICKS_TOKEN"),
                 # Reuse staging_path so the allowed upload directory always
                 # matches the directory the parquet file was written to.
                 staging_allowed_local_path = staging_path) as connection:

  with connection.cursor() as cursor:

    # Write a local file to the specified path in a volume.
    # Specify OVERWRITE to overwrite any existing file in that path.
    cursor.execute(
      f"PUT '{staging_path}{filename_parquet}' INTO '{DBX_VOLUMES_PATH}/{filename_parquet}' OVERWRITE"
    )

In [None]:
# Alternative SQL snippet for the initial load: COPY INTO the sensor_data table
# from the uploaded parquet file, casting `state` to FLOAT and
# `last_updated_ts` to TIMESTAMP on the way in.
# This cell only builds the statement text (shown as the cell output);
# nothing in this cell executes it.
f"""
COPY INTO `{CATALOG}`.{SCHEMA}.sensor_data
  FROM (
    SELECT 
      TRY_CAST(state AS FLOAT) AS state, 
      TRY_CAST(last_updated_ts AS TIMESTAMP) AS last_updated_ts, 
      entity_id 
    FROM '{DBX_VOLUMES_PATH}/{filename_parquet}'
  )
  FILEFORMAT = parquet
  COPY_OPTIONS ('mergeSchema' = 'true');
"""

In [None]:
# Upsert the uploaded parquet file into the sensor_data table.
# Connection settings come from environment variables.
with sql.connect(server_hostname            = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path                  = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token               = os.getenv("DATABRICKS_TOKEN"),
                 staging_allowed_local_path = "/tmp/") as connection:

  with connection.cursor() as cursor:

    # Upsert data from the uploaded file into the table.
    # Rows are matched on (entity_id, last_updated_ts). A matched row only needs
    # its state refreshed: the key columns are equal by the ON condition, so
    # re-assigning last_updated_ts would be a no-op.
    cursor.execute(
      f"""
      MERGE INTO `{CATALOG}`.{SCHEMA}.sensor_data AS target
        USING (
          SELECT 
            TRY_CAST(state AS FLOAT) AS state, 
            TRY_CAST(last_updated_ts AS TIMESTAMP) AS last_updated_ts, 
            entity_id
          FROM read_files('{DBX_VOLUMES_PATH}/{filename_parquet}')
        ) AS source
        ON target.entity_id = source.entity_id AND target.last_updated_ts = source.last_updated_ts
        WHEN MATCHED THEN
          UPDATE SET 
            target.state = source.state
        WHEN NOT MATCHED THEN
          INSERT (state, last_updated_ts, entity_id)
          VALUES (source.state, source.last_updated_ts, source.entity_id);
      """
    )

    # Print whatever rows the MERGE statement returns.
    print(cursor.fetchall())