In [None]:
from pyspark.sql import SparkSession
from delta import *
import pandas as pd
import numpy as np

# Session builder with the Delta Lake SQL extension and catalog configured.
builder = (
    SparkSession.builder
    .appName("flight-fusion-test")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# Pass the builder through delta's pip helper, then create (or reuse) the session.
delta_builder = configure_spark_with_delta_pip(builder)
spark = delta_builder.getOrCreate()

In [None]:
import os
from pathlib import Path
from subprocess import STDOUT, check_output

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.fs import LocalFileSystem

# Directory name used as a path component for the Delta tables this notebook writes.
DATA_ROOT = "_ff_data"

def file_relative_path(dunderfile, relative_path):
    """Resolve ``relative_path`` against the directory that contains ``dunderfile``.

    Args:
        dunderfile: Path of a file, typically a module's ``__file__``.
        relative_path: Path to join onto that file's directory.

    Returns:
        The joined path as a string (not normalized or made absolute).
    """
    parent_dir, _ = os.path.split(dunderfile)
    return os.path.join(parent_dir, relative_path)


def workspace_root() -> Path:
    """Return the root directory of the enclosing Cargo workspace.

    Runs ``cargo metadata`` and reads the ``workspace_root`` field from the
    JSON document it prints on stdout.

    Returns:
        The workspace root as a ``Path``.

    Raises:
        subprocess.CalledProcessError: if ``cargo metadata`` exits non-zero.
        json.JSONDecodeError: if stdout is not valid JSON.
        RuntimeError: if the JSON document has no ``workspace_root`` field.
    """
    import json  # local import so this block does not depend on other cells

    # --format-version pins the output schema; --no-deps skips dependency
    # resolution, which is not needed to learn the workspace root.
    # stderr is intentionally NOT merged into stdout so that cargo diagnostics
    # cannot corrupt the JSON being parsed.
    output = check_output(
        ["cargo", "metadata", "--format-version", "1", "--no-deps"]
    )  # nosec

    # Parsing (instead of substring search) correctly un-escapes the path,
    # e.g. doubled backslashes in Windows paths.
    metadata = json.loads(output)
    try:
        root = metadata["workspace_root"]
    except (KeyError, TypeError):
        raise RuntimeError(
            "'cargo metadata' output does not contain a 'workspace_root' field"
        ) from None

    return Path(root)

# Base directory for generated test data: <workspace root>/test/db.
data_path = workspace_root().joinpath("test", "db")

In [None]:
# Seeded generator so regenerating the fixtures yields identical data each run.
rng = np.random.default_rng(42)

# Timestamps every 3 hours from 2020-01-01 to 2020-01-04 (both ends inclusive).
ts = pd.date_range(
    start=pd.to_datetime("2020-01-01 00:00:00"),
    end=pd.to_datetime("2020-01-04 00:00:00"),
    freq="3h",
)

table = pa.Table.from_pydict(
    {
        "timestamp": ts,
        # Calendar date of each timestamp; used below as a partition column.
        "date": pd.to_datetime(pd.Series(ts)).dt.date,
        # Low-cardinality string column; used below as a partition column.
        "string": rng.choice(["a", "b", "c"], len(ts)),
        "double": rng.standard_normal(len(ts)),
        "real": rng.standard_normal(len(ts)),
        "float": rng.standard_normal(len(ts)),
    }
)

# Collapse to a single Spark partition before writing.
sdf = spark.createDataFrame(table.to_pandas()).repartition(1)

delta_root = data_path / "delta"

# Unpartitioned table.
sdf.write.format("delta").save(str(delta_root / DATA_ROOT / "simple"), mode="overwrite")

# One partitioned table per partition column; the column name doubles as the
# table's directory name.
for partition_column in ("date", "string"):
    sdf.write.format("delta").partitionBy(partition_column).save(
        str(delta_root / "partitioned" / DATA_ROOT / partition_column),
        mode="overwrite",
    )


In [None]:
# Create the "people10m" Delta table at tmp/people unless it already exists.
(
    DeltaTable.createIfNotExists(spark)
    .tableName("people10m")
    .addColumn("id", "INT")
    .addColumn("firstName", "STRING")
    .addColumn("middleName", "STRING")
    .addColumn("lastName", "STRING", comment="surname")
    .addColumn("gender", "STRING")
    .addColumn("birthDate", "TIMESTAMP")
    .addColumn("ssn", "STRING")
    .addColumn("salary", "INT")
    .location("tmp/people")
    .execute()
)