# Enrich Data and Build Star Schema

In [5]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, dayofmonth, month, year, weekofyear, dayofweek, monotonically_increasing_id, when, date_format, rand
from pyspark.sql.types import DateType
from datetime import datetime, timedelta
import pandas as pd
from openai import AzureOpenAI

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 9, Finished, Available)

In [6]:
#Azure AI Services
AI_SERVICE_ENDPOINT = "https://fabrichackathonopenai2.openai.azure.com/"
#AI_SERVICE_KEY = <keyvault>
AI_SERVICE_LOCATION = "swedencentral"
AI_SERVICE_DEPLOYMENT = "gpt4"
AI_SERVICE_API = "2023-12-01-preview"

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 10, Finished, Available)

In [7]:
df_enriched_groceries = spark.sql("SELECT * FROM lh_silver.enriched_groceries")
display(df_enriched_groceries)

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 11, Finished, Available)

SynapseWidget(Synapse.DataFrame, 847717c6-3b51-4403-bb60-0437ef6ec503)

## Dim Date

In [8]:
# Define the date range
start_date = datetime(2023, 1, 1)
end_date = datetime(2024, 12, 31)
date_list = [(start_date + timedelta(days=x)).date() for x in range((end_date - start_date).days + 1)]

# Create DataFrame with the date range
df_date = spark.createDataFrame(date_list, DateType()).toDF("date")

# Extract date parts
df_date = df_date.withColumn("year", year(col("date")))\
       .withColumn("month", month(col("date")))\
       .withColumn("day", dayofmonth(col("date")))\
       .withColumn("week_of_year", weekofyear(col("date")))\
       .withColumn("day_of_week", dayofweek(col("date")))
       # Add more columns as needed

# Write DataFrame into lakehouse
df_date.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("Tables/dim_date")

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 12, Finished, Available)

## Dim Time of Day

In [9]:
# Generate all times of the day with Pandas
times = pd.date_range("00:00:00", "23:59:59", freq="S").time

# Create a DataFrame with all components
data = [(t.strftime("%H%M%S"), t.hour, t.minute, t.second) for t in times]
columns = ["time_key", "hour", "minute", "second"]

df_time_of_day = spark.createDataFrame(data, columns)

# Write DataFrame into lakehouse
df_time_of_day.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("Tables/dim_time_of_day")

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 13, Finished, Available)

## Dim Product

In [10]:
# Extract distinct products 
df_product = df_enriched_groceries \
    .select(col('product_name'), col('brand_name')) \
    .distinct()

# Generate key column
df_product = df_product.select(monotonically_increasing_id().alias("product_key"),"*")

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 14, Finished, Available)

In [11]:
# Create a list of distinct products names for GPT-4
product_df = df_product.select("product_name").toPandas()
product_list = product_df['product_name'].tolist()
product_list = list(set(product_list))
product_list_string = ', '.join(product_list)

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 15, Finished, Available)

In [12]:
type(product_list_string)

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 16, Finished, Available)

str

In [13]:
# Ask GPT-4 to suggest categories

client = AzureOpenAI(
    api_key=AI_SERVICE_KEY,  
    api_version=AI_SERVICE_API,
    base_url=f"{AI_SERVICE_ENDPOINT}openai/deployments/{AI_SERVICE_DEPLOYMENT}",
)

response = client.chat.completions.create(
  model="gpt-4",
  messages=[
    {
      "role": "user",
      "content": [
        {"type": "text", "text": 
          f"""
          We are creating a dimension table for a Power BI report. The dimensions table includes until now only product names.
          Based on the product names it is your task to build five categories and give one category to each product.
          Furthermore, you should estimate the country of origin of the product.
          The list of products can be seen here: 

          {product_list_string}

          Return the result in a JSON format with product_name and category. The format should be [{{"product_name": "xxx", "category": "yyy", "origin": "zzz"}}, {{"product_name": "xxx", "category": "yyy", "origin": "zzz"}}]
          As the data has to be processed later as a JSON, please do not add any comments.

          """},
      ],
    }
  ],
  max_tokens=5000,
)

products_with_categories = response.choices[0].message.content

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 17, Finished, Available)

In [14]:
# Enrich product data with a category
df_pd_category = pd.read_json(products_with_categories)
df_category = spark.createDataFrame(df_pd_category)
df_product = df_product.alias("product") \
    .join(df_category.alias("category"), col("product.product_name") == col("category.product_name"), "left") \
    .select( \
        col("product.*") \
        ,col("category.category").alias("category_name") \
        ,col("category.origin").alias("product_origin") \
        )

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 18, Finished, Available)

In [15]:
df_product.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("Tables/dim_product")

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 19, Finished, Available)

## Dim User

In [16]:
# Data to be included in the DataFrame
data = [Row(used_key=1, user_name="Daniel"),
        Row(used_key=2, user_name="Luca"),
        Row(used_key=3, user_name="Robin")]

# Creating DataFrame from the data
df_user = spark.createDataFrame(data)

# Write DataFrame into lakehouse
df_user.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("Tables/dim_user")

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 20, Finished, Available)

## Fact Groceries

In [17]:
# Extract distinct products 
df_groceries = df_enriched_groceries.alias("groceries") \
    .join(df_product.alias("product"), df_enriched_groceries.product_name == df_product.product_name, "left") \
    .join(df_date.alias("date"), df_enriched_groceries.timestamp == df_date.date, "left") \
    .join(df_time_of_day.alias("time"), date_format(df_enriched_groceries.timestamp, "HHmmss") == df_time_of_day.time_key, "left") \
    .select(
        col("date.date").alias("date_key") \
        ,col("time.time_key") \
        ,"product.product_key" \
        ,(col("groceries.kg_co2_per_metric_value")).cast("decimal(20,2)") \
        ,col("groceries.metric_value") \
        ,col("groceries.image_url")
        )

df_groceries = df_groceries.withColumn("user_key", (rand() * 3).cast("int") + 1)

# Write DataFrame into lakehouse
df_groceries.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("Tables/fact_groceries")

StatementMeta(, 3bbf3926-ffdf-4c55-bb96-e7e797fac607, 21, Finished, Available)