# Using delta tables in data warehouse


We will create a delta table using spark.
And we will schedule to populate the table with random data every minute; imitating data ingest.

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, IntegerType, StringType

# Create a Spark session
spark = SparkSession.builder \
    .appName("CreateEmptyDeltaTable") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .getOrCreate()

# Define the schema
schema = StructType([
    StructField("ID", LongType(), False),
    StructField("TimeIngress", TimestampType(), True),
    StructField("valOfIngress", IntegerType(), True),
    StructField("textIngress", StringType(), True)
])

# Create an empty DataFrame with the specified schema
empty_df = spark.createDataFrame([], schema=schema)



# Write the empty DataFrame as a Delta table
empty_df.write.format("delta").mode("overwrite").save("abfss://1860xxxxxxxxxxx699b92e1@onelake.dfs.fabric.microsoft.com/a574xxxxxxxxxx18c7128f/Tables/SampleIngress")

spark.stop()


StatementMeta(, ea423724-f942-42cb-af3b-e247a854e1d3, 7, Finished, Available)

And now we can create a sample data to be inserted (ingressed) into delta table

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, IntegerType, StringType
from pyspark.sql.functions import col, current_timestamp
import random
import string

# Create a Spark session
spark = SparkSession.builder \
    .appName("CreateDeltaTableWithRandomData2") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .getOrCreate()

# Define the schema
schema = StructType([
    StructField("ID", LongType(), False),
    StructField("TimeIngress", TimestampType(), True),
    StructField("valOfIngress", IntegerType(), True),
    StructField("textIngress", StringType(), True)
])

# Generate and add random rows of data
random_data = [(id, None, random.randint(1, 100), ''.join(random.choices(string.ascii_letters, k=50))) for id in range(11, 21)]
random_df = spark.createDataFrame(random_data, schema=schema)

# Append the random data to the Delta table
random_df.write.format("delta").mode("append").save("abfss://1860bxxxxxxxx699b92e1@onelake.dfs.fabric.microsoft.com/a574d1xxxxxxxxx7128f/Tables/SampleIngress")

# Show the updated Delta table
delta_table = spark.read.format("delta").load("abfss://1860beeexxxxxxxxxx9b92e1@onelake.dfs.fabric.microsoft.com/a574d1xxxxxxxxx7128f/Tables/SampleIngress")
delta_table.show()

spark.stop()


StatementMeta(, 362ebd5c-404c-41d0-99a7-b7ba0435382c, 5, Finished, Available)

+---+-----------+------------+--------------------+
| ID|TimeIngress|valOfIngress|         textIngress|
+---+-----------+------------+--------------------+
| 11|       null|          24|SHqmnJsxKydOTZpKg...|
| 12|       null|           8|wBIkiaCBdQsZKhNfm...|
| 13|       null|          16|cZOFcgQtnsXrfGFtq...|
| 14|       null|          67|EuJryztxHbHVUmkhW...|
| 15|       null|           3|jNdlfQuUUpASGBgss...|
| 16|       null|          42|zDHHlaQlFdhBRjXPA...|
| 17|       null|          44|kXVJNHXEVkUgDYkRo...|
| 18|       null|          41|MHFPhtckfWfuBRRCH...|
| 19|       null|          55|lzOToCPWfWDqAQawq...|
| 20|       null|          69|iZweTDTHdEThrxomd...|
| 11|       null|          11|oyttxDujEVvOamNNP...|
| 12|       null|          36|CdXJbJQkWpVGpZkMs...|
| 13|       null|          86|yQUdgISLrfGPFRNLZ...|
| 14|       null|          52|zSsAqKKvbSDnzXity...|
| 15|       null|          17|KRpsNvegBcCQNgbLY...|
| 16|       null|          41|BeJnloqthCOOCcNgS...|
| 17|       