In [1]:
# Test Data Insert
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType
from datetime import datetime, timezone

# Define schema based on your table
schema = StructType([
    StructField("coin_id", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("name", StringType(), True),
    StructField("current_price", DoubleType(), True),
    StructField("market_cap", LongType(), True),
    StructField("price_change_24h", DoubleType(), True),
    StructField("price_change_percentage_24h", DoubleType(), True),
    StructField("last_updated", StringType(), True),
    StructField("ingestion_timestamp", TimestampType(), True),
])

# Capture ONE timezone-aware UTC instant so both time columns agree.
# datetime.utcnow() is deprecated (Python 3.12+) and returns a *naive* datetime,
# which PySpark interprets in the driver's local time zone when converting to
# TimestampType; an aware datetime is converted to the correct UTC instant.
now_utc = datetime.now(timezone.utc)

# Create a synthetic test row representing a large 24h price spike.
# NOTE(review): price_change_24h (1300.0 on a price of 9999.99) implies ~15%,
# while price_change_percentage_24h is 18.0 — confirm which field the spike
# detection reads.
data = [[
    "testcoin", "TST", "Test Coin", 9999.99, 50000000, 1300.0, 18.0,
    # Keep last_updated in the same naive ISO-8601 format as before (no "+00:00").
    now_utc.replace(tzinfo=None).isoformat(), now_utc
]]

# Create DataFrame and write to Delta table
test_df = spark.createDataFrame(data, schema=schema)
test_df.write.mode("append").format("delta").saveAsTable("coin_price_data")

print("✅ Test spike row inserted into Lakehouse.")


StatementMeta(, a17205c6-9387-4fb3-99f5-3943430fd99f, 3, Finished, Available, Finished)

✅ Test spike row inserted into Lakehouse.


In [5]:
# Load every row of the lakehouse table into `df` and render it in the notebook.
df = spark.table("CryptoLakehouse.dbo.coin_price_data")
display(df)

StatementMeta(, c0159674-d5eb-43e2-a316-0b9122d338ee, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 33f0500c-8ec2-4360-a19d-8356260c45fe)

In [3]:
# Show the table's row count. Use a dedicated variable so the shared `df`
# (read by the schema and clean-up cells) is not replaced by a 1-column frame
# when the notebook is run top to bottom.
row_count_df = spark.sql(
    "SELECT COUNT(*) AS row_count FROM CryptoLakehouse.dbo.coin_price_data"
)
display(row_count_df)

StatementMeta(, c0159674-d5eb-43e2-a316-0b9122d338ee, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 459a0fe2-6e38-467a-a0fa-334c850ed7aa)

In [6]:
# Print the table's schema straight from the catalog instead of via the shared
# `df` variable, whose contents depend on which cell last assigned it.
spark.table("CryptoLakehouse.dbo.coin_price_data").printSchema()

StatementMeta(, 79832564-0f45-46fd-9f44-d81a1b2eb7ce, 8, Finished, Available, Finished)

root
 |-- coin_id: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- name: string (nullable = true)
 |-- current_price: double (nullable = true)
 |-- market_cap: long (nullable = true)
 |-- price_change_24h: double (nullable = true)
 |-- price_change_percentage_24h: double (nullable = true)
 |-- last_updated: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)



In [6]:
# Clean-Up Script: Remove Fictitious Testcoin Rows

# Load the table FIRST so the backup below is a copy of the real table, not of
# whatever `df` happened to hold from an earlier cell (e.g. the row-count frame).
df = spark.table("coin_price_data")

# Backup before cleaning
df.write.mode("overwrite").format("delta").saveAsTable("coin_price_data_backup")

# Identify test rows by coin_id 'testcoin' or symbol 'TST'.
# eqNullSafe returns False (never NULL) when the column is NULL. With plain `==`,
# a NULL coin_id/symbol makes the whole negated predicate NULL, and filter()
# drops the row — silently deleting real data with missing ids.
is_test_row = df.coin_id.eqNullSafe("testcoin") | df.symbol.eqNullSafe("TST")
cleaned_df = df.filter(~is_test_row)

# Overwrite the table with only real data
cleaned_df.write.mode("overwrite").format("delta").saveAsTable("coin_price_data")

print("✅ Test data (testcoin) removed successfully from coin_price_data.")


StatementMeta(, c0159674-d5eb-43e2-a316-0b9122d338ee, 8, Finished, Available, Finished)

✅ Test data (testcoin) removed successfully from coin_price_data.
