**Ingest data with Spark and Microsoft Fabric notebooks**

**Load external data**

In [1]:
# Storage account, container and folder of the source data in Azure Blob Storage
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"

# Assemble the WASBS URI: wasbs://<container>@<account>.blob.core.windows.net/<path>
wasbs_path = (
    f"wasbs://{blob_container_name}@{blob_account_name}"
    f".blob.core.windows.net/{blob_relative_path}"
)
print(wasbs_path)

# Read the Parquet data at that URI into a Spark DataFrame
blob_df = spark.read.parquet(wasbs_path)

StatementMeta(, 7ad2ff89-e455-4495-9818-e631223bca89, 3, Finished, Available)

wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow


In [2]:
# Name of the output folder created under Files/RawData
file_name = "yellow_taxi"

# Identifiers embedded in the OneLake destination URI.
# NOTE(review): these look like a workspace ID and a lakehouse ID -- replace both
# with the values for your own environment.
workspace_id = "84f2dcc0-2b43-4893-bbd8-51b76625626f"
lakehouse_id = "6e6651de-6c40-415a-83bb-02353b552127"

# Maximum number of rows copied from the source DataFrame
row_limit = 1000

# Construct destination path:
# abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<lakehouse_id>/Files/RawData/<file_name>
output_parquet_path = (
    f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/"
    f"{lakehouse_id}/Files/RawData/{file_name}"
)
print(output_parquet_path)

# Write at most row_limit rows as Parquet; "overwrite" replaces any existing
# output at the destination, so this cell can be re-run safely
blob_df.limit(row_limit).write.mode("overwrite").parquet(output_parquet_path)

StatementMeta(, 7ad2ff89-e455-4495-9818-e631223bca89, 4, Finished, Available)

abfss://84f2dcc0-2b43-4893-bbd8-51b76625626f@onelake.dfs.fabric.microsoft.com/6e6651de-6c40-415a-83bb-02353b552127/Files/RawData/yellow_taxi


**Transform and load data to a Delta table**

In [3]:
from pyspark.sql.functions import col, to_timestamp, current_timestamp, year, month

# Read the parquet data from the specified path
raw_df = spark.read.parquet(output_parquet_path)

# Add a dataload_datetime column holding the current timestamp, then drop the ROWS
# whose storeAndFwdFlag is NULL. The column is referenced by name through col() so
# the predicate is resolved against the DataFrame being filtered instead of
# reaching back into the parent raw_df.
filtered_df = (
    raw_df
    .withColumn("dataload_datetime", current_timestamp())
    .filter(col("storeAndFwdFlag").isNotNull())
)

# Load the filtered data into a Delta table.
# NOTE: mode("append") adds these rows on every execution, so re-running this
# cell inserts the same rows into the table again.
table_name = "yellow_taxi"  # Replace with your desired table name
filtered_df.write.format("delta").mode("append").saveAsTable(table_name)

# Display results
display(filtered_df.limit(1))

StatementMeta(, 7ad2ff89-e455-4495-9818-e631223bca89, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, cf4a208e-2c77-4e62-a9d1-3c90df91fc7b)

**Optimize Delta table writes**

In [4]:
from pyspark.sql.functions import col, to_timestamp, current_timestamp, year, month

# Reload the Parquet data from the destination path built earlier
raw_df = spark.read.parquet(output_parquet_path)

# Stamp every row with the load time, then keep only the rows that have a
# non-NULL storeAndFwdFlag
opt_df = raw_df.withColumn("dataload_datetime", current_timestamp())
opt_df = opt_df.where(opt_df["storeAndFwdFlag"].isNotNull())

# Switch on V-Order first, then automatic Delta optimized write, for this session
for conf_key in (
    "spark.sql.parquet.vorder.enabled",
    "spark.microsoft.delta.optimizeWrite.enabled",
):
    spark.conf.set(conf_key, "true")

# Append the filtered rows to a separate Delta table
table_name = "yellow_taxi_opt"  # New table name
opt_df.write.format("delta").mode("append").saveAsTable(table_name)

# Show a single row of the result
display(opt_df.limit(1))

StatementMeta(, 7ad2ff89-e455-4495-9818-e631223bca89, 6, Finished, Available)

SynapseWidget(Synapse.DataFrame, 042042c2-b0c5-45d9-ad84-b7acd24d896b)

**Analyze Delta table data with SQL queries**

In [7]:
# Read the Delta table into a DataFrame
delta_table_name = "yellow_taxi"
table_df = spark.read.format("delta").table(delta_table_name)

# Register the DataFrame as a temporary view so it can be queried with SQL
table_df.createOrReplaceTempView("yellow_taxi_temp")

# Run the SQL query against the view; table_df now refers to the query result
sql_query = "SELECT * FROM yellow_taxi_temp"
table_df = spark.sql(sql_query)

# Show up to 10 rows of the result
display(table_df.limit(10))

StatementMeta(, 7ad2ff89-e455-4495-9818-e631223bca89, 9, Finished, Available)

SynapseWidget(Synapse.DataFrame, 3264c7fa-9ebf-4fe5-8293-78220a9ebed1)

In [8]:
# Load the optimized Delta table into a DataFrame
delta_table_name = "yellow_taxi_opt"
opttable_df = spark.read.format("delta").table(delta_table_name)

# Create temp SQL view under its own "_temp" name. Registering the view as
# "yellow_taxi_opt" would give it the same name as the Delta table, and an
# unqualified reference to that name would then resolve to the session's temp
# view instead of the table (including when this cell is re-run).
opttable_df.createOrReplaceTempView("yellow_taxi_opt_temp")

# SQL query to confirm the table contents via the temp view
opttable_df = spark.sql('SELECT * FROM yellow_taxi_opt_temp')

# Display up to 10 rows of the result
display(opttable_df.limit(10))

StatementMeta(, 7ad2ff89-e455-4495-9818-e631223bca89, 10, Finished, Available)

SynapseWidget(Synapse.DataFrame, ea3f9bba-7f33-46a5-bba7-59857ddb7f69)