In [1]:
import pyspark
import delta
from delta.tables import DeltaTable

In [2]:
# Session builder wired for Delta Lake: register Delta's SQL extension and make the
# Delta catalog the session catalog. The last option presumably exists so the schema
# can declare NOT NULL on fields nested inside an array, which Delta accepts but
# cannot enforce.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("nested-delta")
    .config(key="spark.sql.extensions", value="io.delta.sql.DeltaSparkSessionExtension")
    .config(key="spark.sql.catalog.spark_catalog", value="org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config(key="spark.databricks.delta.constraints.allowUnenforcedNotNull.enabled", value=True)
)

In [3]:
# Let delta-spark add its matching Maven package to the builder, then get (or create)
# the session.
spark = (
    delta.configure_spark_with_delta_pip(builder)
    .getOrCreate()
)

24/05/20 09:40:09 WARN Utils: Your hostname, Min-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.10.141 instead (on interface en0)
24/05/20 09:40:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/Users/anders/Library/Caches/pypoetry/virtualenvs/nested-delta-01GGJnWy-py3.12/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/anders/.ivy2/cache
The jars for the packages stored in: /Users/anders/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8a4aab3b-9e88-4342-8c99-34260561cf98;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.1.0 in central
	found io.delta#delta-storage;3.1.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 196ms :: artifacts dl 9ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.1.0 from central in [default]
	io.delta#delta-storage;3.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   

# Data Structure

In [4]:
import pyspark.sql.functions as F
from pyspark.sql import types as t
import uuid
from delta.tables import *
from enum import Enum
from dataclasses import dataclass, field

In [5]:

@dataclass
class SIUnitDataMixin:
    """Data carried by every `SIUnit` member.

    Attributes:
        sign: Short symbol of the unit (e.g. "A", "Ah").
        unit: Human-readable name of the unit (e.g. "ampere").
        dimension: The physical quantity the unit measures (e.g. "electric current").
    """

    sign: str
    unit: str  # the unit's name -- every member stores a string here
    dimension: str


class SIUnit(SIUnitDataMixin, Enum):
    """Units referenced from column metadata; each member's tuple fills the mixin's fields."""

    AMPERE = ("A", "ampere", "electric current")
    AMPERE_HOUR = ("Ah", "ampere-hour", "electric capacity")
    # NOTE(review): "celcius" looks like a misspelling of "celsius" and the conventional
    # symbol is "°C"; left unchanged here -- confirm before relying on these strings.
    CELCIUS = ("c", "celcius", "temperature")
    VOLT = ("V", "volt", "electric potential")
    MILLISECOND = ("ms", "millisecond", "time")

    def to_dict(self) -> dict[str, str]:
        """Return this unit as a plain dict for use in Spark column metadata.

        Returns:
            A dict with keys "sign", "name" (taken from the `unit` field) and "dimension".
        """
        return {
            "sign": self.sign,
            "name": self.unit,
            "dimension": self.dimension
        }

In [6]:
# Inspect the payload a unit contributes to a column's "unit" metadata entry.
SIUnit["AMPERE_HOUR"].to_dict()

{'sign': 'Ah', 'name': 'ampere-hour', 'dimension': 'electric capacity'}

In [7]:
# Schema of the `battery_test_data` table: one row per test, with nested structs for
# cycler / device / procedure details and an array of per-datapoint measurements.
# Each field's metadata carries a human-readable "comment" and, where applicable, a
# "unit" entry produced by `SIUnit.to_dict()`.
# `StructType` / `StructField` are referenced through the `t` (pyspark.sql.types) alias
# rather than relying on the bare names leaking in via `from delta.tables import *`.
data_schema = t.StructType([
    t.StructField(
        "test_id",
        t.StringType(),
        nullable=False,
        metadata={"comment": "uuid identifier for the test"},
    ),
    t.StructField(
        name="cycler_info",
        nullable=False,
        metadata={"comment": "Details about the cycler used to run the test"},
        dataType=t.StructType([
            t.StructField(
                "channel_number",
                t.IntegerType(),
                nullable=False,
                metadata={"comment": "Which equipment channel number (or similar identifier) was used to run this test"},
            ),
            t.StructField(
                "cycler_id",
                t.StringType(),
                # NOTE(review): the comment calls this field "optional" but it is declared
                # NOT NULL -- confirm which is intended.
                nullable=False,
                metadata={"comment": "An optional identifying string unique to the cycler across the organization"},
            ),
            t.StructField(
                "server_version",
                t.StringType(),
                nullable=True,
                metadata={"comment": "The version of the server software used to run the test"},
            ),
            t.StructField(
                "client_version",
                t.StringType(),
                nullable=True,
                metadata={"comment": "The version of the client software used to run the test"},
            ),
        ]),
    ),
    t.StructField(
        "device_info",
        t.StructType([
            t.StructField(
                "device_id",
                t.StringType(),
                nullable=False,
                metadata={"comment": "Identifying string unique to the device/cell/pack across the organization"},
            ),
            t.StructField(
                "device_name",
                t.StringType(),
                nullable=False,
                metadata={"comment": "Descriptive name of the test device"},
            ),
            t.StructField(
                "nominal_capacity",
                t.FloatType(),
                nullable=True,
                metadata={
                    "comment": "The nominal capacity of the test device in Ampere Hours",
                    "unit": SIUnit.AMPERE_HOUR.to_dict(),
                },
            ),
            t.StructField(
                "calibration_date",
                t.DateType(),
                nullable=True,
                metadata={"comment": "The date the test device was calibrated"},
            ),
            t.StructField(
                "form_factor",
                t.StringType(),
                nullable=True,
                metadata={"comment": "The form factor of the test device"},
            ),
            t.StructField(
                "cell_formulation_id",
                t.StringType(),
                nullable=True,
                metadata={"comment": "An id that identifies what chemical formulation was used to make the cell."},
            ),
            t.StructField(
                "cathode",
                t.StringType(),
                nullable=True,
                metadata={"comment": "The type of cathode in the device"},
            ),
            t.StructField(
                "anode",
                t.StringType(),
                nullable=True,
                metadata={"comment": "The type of anode in the device"},
            ),
        ]),
        nullable=False,
        metadata={"comment": "Info about the device/cell/pack that was tested"},
    ),
    t.StructField(
        "procedure",
        t.StructType([
            t.StructField(
                "procedure_id",
                t.StringType(),
                nullable=False,
                metadata={"comment": "uuid identifier for the test procedure"},
            ),
            t.StructField(
                "procedure_name",
                t.StringType(),
                nullable=False,
                metadata={"comment": "Descriptive name of the test procedure"},
            ),
        ]),
        nullable=False,
        metadata={"comment": "Info about the test procedure"},
    ),
    t.StructField(
        "project",
        t.StringType(),
        nullable=True,
        metadata={"comment": "The name of the project the device belongs to"},
    ),
    t.StructField(
        name="test_data",
        nullable=False,
        metadata={"comment": "The measurements made by the cycler during the test"},
        dataType=t.StructType([
            t.StructField(
                "start_datetime",
                t.TimestampType(),
                nullable=False,
                metadata={"comment": "The absolute datetime the test started. ISO 8601 format. Must include timezone. If timezone is not specified, UTC is assumed"},
            ),
            t.StructField(
                "measurements",
                # The NOT NULL fields below sit inside an array, which is presumably why the
                # session enables `allowUnenforcedNotNull` -- Delta cannot enforce them there.
                t.ArrayType(
                    t.StructType([
                        t.StructField(
                            "current",
                            t.FloatType(),
                            nullable=False,
                            metadata={
                                "comment": "The current in Ampere. The sign convention is positive for charge current and negative for discharge current.",
                                "unit": SIUnit.AMPERE.to_dict(),
                            },
                        ),
                        t.StructField(
                            "voltage",
                            t.FloatType(),
                            nullable=False,
                            metadata={
                                "comment": "The voltage in Volts",
                                "unit": SIUnit.VOLT.to_dict(),
                            },
                        ),
                        t.StructField(
                            "elapsed_time",
                            # 64-bit: a 32-bit millisecond counter overflows after ~24.8 days,
                            # which long-running cycling tests can exceed.
                            t.LongType(),
                            nullable=False,
                            metadata={
                                "comment": "Elapsed time in milliseconds since `start_datetime`",
                                "unit": SIUnit.MILLISECOND.to_dict(),
                            },
                        ),
                        t.StructField(
                            "measurement_datetime",
                            t.TimestampType(),
                            nullable=False,
                            metadata={"comment": (
                                "The absolute datetime of the measurement for the individual data point. "
                                "ISO 8601 format. Must include timezone. If timezone is not specified, UTC is assumed"
                            )},
                        ),
                        t.StructField(
                            "datapoint_number",
                            t.IntegerType(),
                            nullable=True,
                            metadata={"comment": "The index number of the data point within the test. Starting at 1."},
                        ),
                        t.StructField(
                            "cycle_number",
                            t.LongType(),
                            nullable=True,
                            metadata={"comment": (
                                "The index number of the cycle within the test. Starting at 1. "
                                "Cycles are monotonically increasing and gapless (doesn’t skip any numbers). "
                                "Special Note: if a cycle column is not observed, the default algorithm will look for "
                                "the first charge datapoint after any discharge datapoint as the boundary for a new cycle"
                            )},
                        ),
                        t.StructField(
                            "step_index",
                            t.LongType(),
                            metadata={"comment": "Program step number associated with each control step."},
                        ),
                        t.StructField(
                            "step_time",
                            # NOTE(review): 32-bit float loses millisecond resolution past ~4.6 h
                            # (2**24 ms); consider DoubleType if steps can run that long.
                            t.FloatType(),
                            metadata={
                                "comment": "Elapsed time since the start of the current step.",
                                "unit": SIUnit.MILLISECOND.to_dict(),
                            },
                        ),
                    ])
                ),
            ),
        ]),
    ),
])

In [9]:
# Create (or replace, if it already exists) the Delta table for the fake test data,
# defining its columns from `data_schema` and storing it at a local relative path.
# Using createOrReplace keeps this cell re-runnable on a fresh or an existing kernel.
battery_test_data = (
    DeltaTable
    .createOrReplace(spark)
    .tableName("battery_test_data")
    .property("description", "battery test data")
    .addColumns(data_schema)
    .location("./temp/battery_test_data")
    .execute()
)

24/05/20 09:41:50 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [24]:
# create a timezone-aware timestamp with the datetime standard library from "2023-05-01T10:00:00+00:00"
import datetime

datetime.datetime.fromisoformat("2023-05-01T10:00:00+00:00")

datetime.datetime(2023, 5, 1, 10, 0, tzinfo=datetime.timezone.utc)

In [25]:
# generate fake Data based on data_schema and insert into delta table
from datetime import date, datetime

# Rows are plain tuples matched to `data_schema` by position, so every tuple must
# follow the schema's field order exactly.
fake_data = [
    (
        "test_id_1",
        (1, "cycler_001", "1.2.3", "2.4.5"),
        # calibration_date is a DateType column, so pass a `date` rather than a midnight `datetime`.
        ("device_001", "Device A", 2.5, date.fromisoformat("2023-06-15"), "cylindrical", "ABC123", "NMC", "Graphite"),
        ("proc_001", "Procedure X"),
        "Project Alpha",
        (
            datetime.fromisoformat("2023-05-01T10:00:00+00:00"),
            [
                # (current, voltage, elapsed_time, measurement_datetime,
                #  datapoint_number, cycle_number, step_index, step_time)
                (0.5, 3.7, 0, datetime.fromisoformat("2023-05-01T10:00:00+00:00"), 1, 1, 1, 0.0),
                (-1.0, 3.2, 3600000, datetime.fromisoformat("2023-05-01T11:00:00+00:00"), 2, 1, 2, 3600000.0),
            ],
        ),
    )
]

# Create a DataFrame from the sample data
df = spark.createDataFrame(fake_data, data_schema)

# Show the DataFrame
df.show(truncate=False)

# insertInto resolves columns by position against the target table.
# NOTE: mode "append" means re-running this cell inserts the same row again.
df.write.mode("append").insertInto("battery_test_data")

                                                                                

+---------+-----------------------------+---------------------------------------------------------------------------+-----------------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------+
|test_id  |cycler_info                  |device_info                                                                |procedure              |project      |test_data                                                                                                                               |
+---------+-----------------------------+---------------------------------------------------------------------------+-----------------------+-------------+----------------------------------------------------------------------------------------------------------------------------------------+
|test_id_1|{1, cycler_001, 1.2.3, 2.4.5}|{device_001, Device A, 2.5, 2023-06-15, cylindrical, ABC123, NMC, Graphite}|{pro

                                                                                

In [26]:
# Load every column of the catalog table `battery_test_data` as a DataFrame.
selection = spark.table("battery_test_data")

In [27]:
# Pull the `current` field out of every struct in the nested `measurements` array;
# the result is one array of currents per test row.
selection.select(F.col("test_data.measurements.current")).show()

+-----------+
|    current|
+-----------+
|[0.5, -1.0]|
+-----------+

