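Use Synapse Link for Cosmos DB to load data from Cosmos DB (an operational system) into a Spark DataFrame for in-memory analysis with the Spark engine. The `cosmos.olap` format reads from the container's analytical store rather than from the transactional store.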

In [1]:
df_products = spark.read \
    .format("cosmos.olap") \
    .option("spark.synapse.linkedService", "RetailSalesDemoDB") \
    .option("spark.cosmos.container", "Products") \
    .load()

StatementMeta(livedemo, 12, 1, Finished, Available)



In [2]:
df_retailsales = spark.read \
    .format("cosmos.olap") \
    .option("spark.synapse.linkedService", "RetailSalesDemoDB") \
    .option("spark.cosmos.container", "RetailSales") \
    .load()

StatementMeta(livedemo, 12, 2, Finished, Available)



In the Cosmos DB database, the `Products` container holds very few items; it is a lookup table. We therefore use the `broadcast()` function to replicate its DataFrame to every node in the cluster, so each executor can join its partitions of `RetailSales` locally instead of shuffling the larger DataFrame across the network. `broadcast()` is an effective optimization whenever one side of a join is small enough to fit in each executor's memory.

Apart from the `broadcast()` hint, the Apache Spark engine performs a standard left join on the `productCode` column.

In [3]:
from pyspark.sql.functions import broadcast

df = df_retailsales.join(broadcast(df_products), on=['productCode'], how='left')

StatementMeta(livedemo, 12, 3, Finished, Available)
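
To make the idea of a local join concrete, here is a minimal conceptual sketch in plain Python, not Spark code. It assumes the small table fits in memory as a dictionary; the sample rows and the `broadcast_left_join` helper are hypothetical and only illustrate the technique of a broadcast hash left join.

```python
# Conceptual sketch only: plain Python lists stand in for Spark partitions.
# The sample rows and the helper name are hypothetical.

def broadcast_left_join(partitions, small_rows, key):
    """Left-join every partition against a small table copied to each worker."""
    # Build the lookup once; in Spark, a copy is shipped to every executor.
    lookup = {row[key]: row for row in small_rows}
    right_columns = {col for row in small_rows for col in row if col != key}
    joined = []
    for partition in partitions:  # each partition is processed independently
        for row in partition:
            match = lookup.get(row[key])
            # Left join: keep the left row and fill None when nothing matches.
            extra = {col: (match.get(col) if match else None) for col in right_columns}
            joined.append({**row, **extra})
    return joined


products = [
    {"productCode": "A", "basePrice": 100.0},
    {"productCode": "B", "basePrice": 250.0},
]
sales_partitions = [
    [{"productCode": "A", "quantity": 3}],
    [{"productCode": "B", "quantity": 1}, {"productCode": "C", "quantity": 2}],
]

result = broadcast_left_join(sales_partitions, products, "productCode")
for row in result:
    print(row)
```

No partition needs rows from any other partition, which is why the large side of the join does not have to be shuffled.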



Because we are working with DataFrames, the standard manipulations are available, such as dropping the Cosmos DB system-generated properties that are irrelevant to the data engineering task.

In [5]:
# Prior to data cleansing
display(df.limit(10))

StatementMeta(livedemo, 12, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, 071ff296-1013-4627-b576-d79574c5df6b)

In [None]:
df = df.drop("_rid", "_ts", "_etag", "id")

# Post-data cleansing
display(df.limit(10))

Besides dropping unnecessary DataFrame columns, data engineers also make sure that each column has the proper data type.

In [6]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import to_date

# Cast the numeric columns to float and parse weekStarting into a date.
# In Spark datetime patterns, MM is month-of-year; lowercase mm is minute-of-hour.
clean_df = df.withColumn("basePrice", df["basePrice"].cast(FloatType())) \
    .withColumn("quantity", df["quantity"].cast(FloatType())) \
    .withColumn("price", df["price"].cast(FloatType())) \
    .withColumn("logQuantity", df["logQuantity"].cast(FloatType())) \
    .withColumn("wholeSaleCost", df["wholeSaleCost"].cast(FloatType())) \
    .withColumn("weekStarting", to_date(df["weekStarting"], 'MM/dd/yyyy'))

StatementMeta(livedemo, 12, 6, Finished, Available)
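
Date patterns are case-sensitive: in Spark, `MM` means month and `mm` means minute. Python's `strptime` makes the same distinction with `%m` and `%M`, so the short sketch below uses it as a stand-in to show what goes wrong when the two are confused. The sample value is hypothetical, and Spark's exact behavior with a wrong pattern (a shifted date, a null, or an error) may depend on the Spark version.

```python
from datetime import datetime

sample = "06/14/2014"  # hypothetical weekStarting value in month/day/year form

# %m reads the first field as the month (analogous to MM in a Spark pattern).
as_month = datetime.strptime(sample, "%m/%d/%Y").date()

# %M reads the first field as minutes (analogous to mm in a Spark pattern),
# so no month is parsed and it silently falls back to January.
as_minute = datetime.strptime(sample, "%M/%d/%Y").date()

print(as_month)
print(as_minute)
```

Only the first result keeps June as the month; the second still looks like a valid date, which is what makes this kind of mistake easy to miss.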



It is also possible to create derived columns, which are computed from existing data in the DataFrame. In this case, the `sales` column is the product of the `price` and `quantity` columns.

In [7]:
feature_df = clean_df.withColumn("sales", (clean_df.price * clean_df.quantity))

StatementMeta(livedemo, 12, 7, Finished, Available)



In [8]:
feature_df.printSchema()

StatementMeta(livedemo, 12, 8, Finished, Available)

root
 |-- productCode: string (nullable = true)
 |-- logQuantity: float (nullable = true)
 |-- quantity: float (nullable = true)
 |-- price: float (nullable = true)
 |-- advertising: long (nullable = true)
 |-- storeId: long (nullable = true)
 |-- weekStarting: date (nullable = true)
 |-- wholeSaleCost: float (nullable = true)
 |-- basePrice: float (nullable = true)
 |-- sales: float (nullable = true)

Note that you can use the built-in visualization tool of Synapse Notebooks to visualize result sets. To generate your own line chart, use the following parameters:

- **Chart type**: Line chart
- **Key**: `weekStarting`
- **Values**: `quantity`
- **Series Group**: `productCode`
- **Aggregation**: Sum

In [9]:
display(feature_df)

StatementMeta(livedemo, 12, 9, Finished, Available)

SynapseWidget(Synapse.DataFrame, 41b4ee03-f19b-4070-a943-a2c6e003b414)

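As shown previously, you can write Apache Spark DataFrames as tables that can be queried through the SQL magic. In this case, the `SurfaceSales_FeatureDF` table is created in the `default` database. Spark SQL table names are case-insensitive by default, which is why the query refers to it as `surfacesales_featuredf`.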

In [10]:
feature_df.write.mode("overwrite").saveAsTable("SurfaceSales_FeatureDF")

StatementMeta(livedemo, 12, 10, Finished, Available)



In [11]:
%%sql

SELECT productCode, SUM(sales) AS totalSales, weekStarting
FROM surfacesales_featuredf
GROUP BY productCode, weekStarting

StatementMeta(livedemo, 12, 11, Finished, Available)

<Spark SQL result set with 261 rows and 3 fields>
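
To see what this aggregation returns on a handful of rows, the following sketch runs the same `GROUP BY` with Python's built-in `sqlite3` module as a stand-in for Spark SQL. The sample rows, the `totalSales` alias, and the added `ORDER BY` (used only to make the output order deterministic) are assumptions for illustration.

```python
import sqlite3

# In-memory SQLite database standing in for the Spark table (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE surfacesales_featuredf "
    "(productCode TEXT, weekStarting TEXT, sales REAL)"
)
conn.executemany(
    "INSERT INTO surfacesales_featuredf VALUES (?, ?, ?)",
    [
        ("A", "2014-06-14", 10.0),  # hypothetical rows
        ("A", "2014-06-14", 5.0),
        ("A", "2014-06-21", 7.0),
        ("B", "2014-06-14", 2.5),
    ],
)

# One output row per (productCode, weekStarting) pair, with sales summed.
totals = conn.execute(
    """
    SELECT productCode, SUM(sales) AS totalSales, weekStarting
    FROM surfacesales_featuredf
    GROUP BY productCode, weekStarting
    ORDER BY productCode, weekStarting
    """
).fetchall()
conn.close()

for row in totals:
    print(row)
```

The two rows for product `A` in the week of `2014-06-14` collapse into a single row, which is the weekly per-product total that the notebook query computes.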

With Synapse Link, you do not need to export data from the operational store to a file system before querying it with Apache Spark. The integration is available natively in Apache Spark pools.