##### **IMDb data via the OMDb API → Delta table**

In [124]:
# Fetch a handful of movies from the OMDb API by title and store the raw
# payloads as the bronze Delta table `bronze_omdb_api_movies`.

# Import required libraries
import os

import requests
import pandas as pd
from pyspark.sql import SparkSession

# OMDb API configuration
# NOTE(review): an API key should not live in notebook source. The key is read
# from the OMDB_API_KEY environment variable when it is set; the original
# literal is kept only as a fallback so existing runs keep working.
api_key = os.environ.get("OMDB_API_KEY", "22345a37")
omdb_url = "http://www.omdbapi.com/"
movie_titles = ["Inception", "The Matrix", "The Dark Knight", "Interstellar", "Avengers: Endgame"]

# Collect movie data
movie_data = []
for title in movie_titles:
    try:
        # Passing the title through `params` lets requests URL-encode it
        # (spaces, ':' ...); the timeout stops a stalled connection from
        # hanging the cell forever.
        response = requests.get(
            omdb_url,
            params={"t": title, "apikey": api_key},
            timeout=10,
        )
    except requests.RequestException as exc:
        print(f"Failed to fetch data for {title}: {exc}")
        continue

    if response.status_code != 200:
        print(f"Failed to fetch data for {title}")
        continue

    data = response.json()
    # OMDb signals "not found" / bad key inside the JSON body with
    # Response == "False"; such payloads must not be stored as movies.
    if data.get("Response") == "True":
        movie_data.append(data)
    else:
        print(f"Failed to fetch data for {title}: {data.get('Error')}")

# Fail with a clear message instead of an opaque Spark schema-inference error
if not movie_data:
    raise RuntimeError("No movie could be fetched from the OMDb API; nothing to write")

# Convert to DataFrame
df = pd.DataFrame(movie_data)

# Convert pandas to Spark
spark_df = SparkSession.builder.getOrCreate().createDataFrame(df)

# Save to Lakehouse Bronze as Delta Table
spark_df.write.mode("overwrite").format("delta").saveAsTable("bronze_omdb_api_movies")

# Optional: Show some data
display(spark_df.select("Title", "Year", "imdbRating", "Genre"))


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 127, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a4d710d9-6f4c-45a3-9b02-847c32e75835)

In [125]:
# Look up the director of up to 200 movies from `silver_movies_cleaned`
# through the OMDb API and collect the results in the pandas frame `omdb_df`.
import os
import time

import pandas as pd
import requests

df_movies = spark.read.table("silver_movies_cleaned")
df_sample_ids = df_movies.select("ID").dropna().limit(200)

# Collect the sampled IDs to the driver as a plain Python list for the API loop
imdb_ids = [row["ID"] for row in df_sample_ids.collect()]

# NOTE(review): an API key should not live in notebook source. The key is read
# from the OMDB_API_KEY environment variable when it is set; the original
# literal is kept only as a fallback so existing runs keep working.
OMDB_API_KEY = os.environ.get("OMDB_API_KEY", "87f56696")
OMDB_API_URL = "http://www.omdbapi.com/"

fetched_data = []

for imdb_id in imdb_ids:
    params = {"apikey": OMDB_API_KEY, "i": imdb_id}
    try:
        # A timeout plus exception handling keeps one bad request from
        # aborting the loop and discarding everything fetched so far.
        response = requests.get(OMDB_API_URL, params=params, timeout=10)
    except requests.RequestException as exc:
        print(f"Error fetching {imdb_id}: {exc}")
    else:
        if response.status_code == 200:
            data = response.json()
            # OMDb answers unknown ids with HTTP 200 and Response == "False"
            if data.get("Response") == "True":
                fetched_data.append({
                    "imdb_id": imdb_id,
                    "director": data.get("Director"),
                })
            else:
                print(f"Movie not found for {imdb_id}")
        else:
            print(f"Error fetching {imdb_id}: {response.status_code}")

    time.sleep(0.2)  # Be nice to the API

# Convert to DataFrame; naming the columns keeps the layout stable even when
# no row could be fetched
omdb_df = pd.DataFrame(fetched_data, columns=["imdb_id", "director"])



StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 128, Finished, Available, Finished)

In [126]:
# Turn the pandas OMDb results into a Spark DataFrame and persist them as the
# bronze Delta table `bronze_omdb_api_ratings` (full overwrite).
spark_omdb_df = spark.createDataFrame(omdb_df)

(
    spark_omdb_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze_omdb_api_ratings")
)

display(spark_omdb_df)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 129, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c535fb8d-b3c0-4fd4-9994-8d3b9a23c781)

In [127]:
import pandas as pd

# Read the movie metadata CSV from the default lakehouse Files area into a
# pandas DataFrame and render it.
df = pd.read_csv(filepath_or_buffer="/lakehouse/default/Files/movie_metadata.csv")
display(df)

StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 130, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8284f4ce-e798-41c1-8f17-43b804358c90)

##### **Simulated SQL-like Table**

In [128]:
from pyspark.sql.functions import col
import random
from datetime import datetime, timedelta
import pandas as pd

# Step 1: pull every (ID, Title) pair of the silver movie table to the driver
df_movies = spark.read.table("silver_movies_cleaned").select("ID", "Title")
movie_list = df_movies.toPandas()

# Step 2: fabricate between one and three reviews for each movie.
# Reviewers are drawn from a fixed pool of 100 synthetic user ids.
user_ids = ["user_" + str(i) for i in range(1, 101)]
ratings_data = []

for imdb_id, title in zip(movie_list["ID"], movie_list["Title"]):
    num_reviews = random.randint(1, 3)  # AT MOST 3 reviews per movie

    for _ in range(num_reviews):
        user = random.choice(user_ids)
        rating = random.randint(1, 10)
        days_ago = random.randint(0, 1000)
        # The review time is stored as an ISO-8601 string
        timestamp = (datetime.now() - timedelta(days=days_ago)).isoformat()

        review = {
            "user_id": user,
            "imdb_id": imdb_id,
            "movie_title": title,
            "rating": rating,
            "timestamp": timestamp
        }
        ratings_data.append(review)

# Step 3: persist the reviews as the Delta table bronze_user_ratings,
# replacing both its data and its schema
ratings_df = pd.DataFrame(ratings_data)
spark_df_ratings = spark.createDataFrame(ratings_df)
(
    spark_df_ratings.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("bronze_user_ratings")
)

# Display result
display(spark_df_ratings)

StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 131, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7c20df62-967b-46e3-9773-19760bbeef00)

In [129]:
import random
from datetime import datetime, timedelta
import pandas as pd

# Pool of 100 synthetic users and the five movie titles they can rate
user_ids = ["user_%d" % i for i in range(1, 101)]
movie_titles = ["Inception", "The Matrix", "The Dark Knight", "Interstellar", "Avengers: Endgame"]

# Build 500 random (user, title, rating, timestamp) records.
# Ratings range from 1 to 10; timestamps fall within the last 1000 days and
# are stored as ISO-8601 strings.
ratings_data = []
for _ in range(500):
    user = random.choice(user_ids)
    movie = random.choice(movie_titles)
    rating = random.randint(1, 10)
    days_ago = random.randint(0, 1000)
    timestamp = (datetime.now() - timedelta(days=days_ago)).isoformat()

    record = {
        "user_id": user,
        "movie_title": movie,
        "rating": rating,
        "timestamp": timestamp
    }
    ratings_data.append(record)

# pandas -> Spark
ratings_df = pd.DataFrame(ratings_data)
spark_df_ratings = spark.createDataFrame(ratings_df)

# Overwrite the Delta table bronze_user_ratings with the simulated ratings
(
    spark_df_ratings.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze_user_ratings")
)

display(spark_df_ratings)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 132, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 938c79c0-e68d-4088-91d8-aa22cad42b7c)

### Loading CSV to Delta table

In [130]:
# Read "Files/movie_metadata.csv" (first line treated as header) into the
# Spark DataFrame df_movies, preview it, then store it as the Delta table
# bronze_movies_metadata.
df_movies = (
    spark.read
    .option("header", "true")
    .csv("Files/movie_metadata.csv")
)
display(df_movies)
(
    df_movies.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze_movies_metadata")
)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 133, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 7f6beabd-c876-4e3a-8aa6-15add36e956f)

#### Creating Silver Movies Cleaned

In [131]:
# Simulate 500 user ratings for movies taken from `dim_movie` and write them
# to the Delta table `bronze_user_ratings` (data and schema are replaced).
import pandas as pd
import random
from datetime import datetime, timedelta

# Take up to 100 movie ids from dim_movie. Nulls are dropped BEFORE the limit
# so that null rows do not eat into the 100-id sample.
movies = spark.read.table("dim_movie").select("ID").dropna().limit(100)
movie_ids = [row["ID"] for row in movies.collect()]

# random.choice() raises a bare IndexError on an empty list; fail early with
# a message that points at the actual cause.
if not movie_ids:
    raise ValueError("dim_movie contains no non-null ID values; cannot simulate ratings")

# Simulate 500 ratings: 20 possible users, rating 1-10, timestamp within the
# last 365 days
ratings_data = []
for i in range(500):
    ratings_data.append({
        "user_id": f"user_{random.randint(1, 20)}",
        "imdb_id": random.choice(movie_ids),
        "rating": random.randint(1, 10),
        "timestamp": datetime.now() - timedelta(days=random.randint(0, 365))
    })

ratings_df = pd.DataFrame(ratings_data)
spark_df = spark.createDataFrame(ratings_df)

# Write to bronze_user_ratings
spark_df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("bronze_user_ratings")


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 134, Finished, Available, Finished)

In [132]:
# Preview at most 1000 rows of the cleaned silver movie table
preview_query = "SELECT * FROM MovieAnalyticsLakehouse.silver_movies_cleaned LIMIT 1000"
df = spark.sql(preview_query)
display(df)

StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 135, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1f0a36e8-dd6e-41f4-bc85-bb21707e2b44)

#### Silver User ratings

In [133]:
# Build `silver_user_ratings` from `bronze_user_ratings`: enforce column
# types and parse the rating time into a proper timestamp column.

# Imported here because this cell uses both names and the notebook never
# imports `to_timestamp` anywhere else (NameError on a fresh kernel).
from pyspark.sql.functions import col, to_timestamp

df_bronze = spark.read.table("bronze_user_ratings")
df_silver = df_bronze.select(
    col("user_id").cast("string"),
    col("imdb_id").cast("string"),
    col("rating").cast("double"),
    # Exposed downstream under the name rating_timestamp
    to_timestamp("timestamp").alias("rating_timestamp")
)
# Replace both data and schema of the silver table
df_silver.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("silver_user_ratings")


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 136, Finished, Available, Finished)

In [134]:
# Inspect the column layout of the bronze OMDb table
bronze_omdb_table = spark.read.table("bronze_omdb_api_ratings")
bronze_omdb_table.printSchema()


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 137, Finished, Available, Finished)

root
 |-- imdb_id: string (nullable = true)
 |-- director: string (nullable = true)



In [135]:
# Silver OMDb table: keep imdb_id and director from the bronze table and
# discard rows that have no imdb_id.
df_omdb = spark.read.table("bronze_omdb_api_ratings")

silver_omdb = (
    df_omdb
    .select("imdb_id", "director")
    .na.drop(subset=["imdb_id"])
)

(
    silver_omdb.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_omdb_api")
)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 138, Finished, Available, Finished)

In [136]:
# Preview at most 1000 rows of the cleaned silver movie table
preview_query = "SELECT * FROM MovieAnalyticsLakehouse.silver_movies_cleaned LIMIT 1000"
df = spark.sql(preview_query)
display(df)

StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 139, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1d09bd92-df15-4a7c-9f07-035fd2d95368)

In [137]:
# Preview at most 1000 rows of the raw OMDb movie payloads
preview_query = "SELECT * FROM MovieAnalyticsLakehouse.bronze_omdb_api_movies LIMIT 1000"
df = spark.sql(preview_query)
display(df)

StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 140, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 11613a24-1686-48a7-8fcb-082e6475810f)

In [138]:
# Inspect the column layout of the cleaned silver movie table
silver_movies_table = spark.read.table("silver_movies_cleaned")
silver_movies_table.printSchema()


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 141, Finished, Available, Finished)

root
 |-- ID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- Budget: integer (nullable = true)
 |-- Revenue: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- IMDB: double (nullable = true)



In [139]:
# Gold movie dimension: the silver movie columns re-exposed under
# snake_case names, with ID acting as movie_key.
gold_movies_dim_query = """
SELECT 
    ID AS movie_key,
    Title AS title,
    Year AS release_year,
    Language AS language,
    Duration AS duration,
    Budget AS budget,
    Revenue AS revenue,
    IMDB AS imdb_rating
FROM silver_movies_cleaned
"""
movies_dim = spark.sql(gold_movies_dim_query)

# Full overwrite of the Delta table gold_movies_dim
(
    movies_dim.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_movies_dim")
)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 142, Finished, Available, Finished)

In [140]:
# Show the full silver user-ratings table
ratings_query = "SELECT * FROM MovieAnalyticsLakehouse.silver_user_ratings"
df = spark.sql(ratings_query)
display(df)

StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 143, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e0bcbb7f-b25e-43bc-baf4-fc54024307e5)

### `dim_movie` with SCD 2


In [141]:
from pyspark.sql.functions import current_timestamp, lit, monotonically_increasing_id, col

# Step 1: sources - cleaned movies and the OMDb director lookup
df_movies = spark.read.table("silver_movies_cleaned")
df_omdb = spark.read.table("bronze_omdb_api_ratings")

# Step 2: left join so every movie is kept, with OMDb columns attached where
# the movie ID matches an OMDb imdb_id
join_condition = df_movies["ID"] == df_omdb["imdb_id"]
df_joined = df_movies.join(df_omdb, join_condition, how="left")

# Step 3: append the SCD2 bookkeeping columns after the joined columns:
# a generated surrogate key, validity start, an open (null) validity end and
# the current-row flag.
df_dim_movie = df_joined.select(
    "*",
    monotonically_increasing_id().alias("movie_sk"),
    current_timestamp().alias("start_date"),
    lit(None).cast("timestamp").alias("end_date"),
    lit(True).alias("is_current"),
)

# Step 4: overwrite the Delta table dim_movie
(
    df_dim_movie.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_movie")
)

# Optional: Display a preview
preview_columns = ["movie_sk", "Title", "director", "start_date", "is_current"]
display(df_dim_movie.select(*preview_columns))


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 144, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a70d20e7-cb4a-4f1c-bf43-461d7f56d678)

### `dim_user` with SCD 2


In [142]:
# Build the user dimension frame `df_users_meta`: one row per distinct
# user_id of `silver_user_ratings`, enriched with a generated surrogate key
# and synthetic registration date / country attributes.
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id, expr
import random

df_ratings = spark.read.table("silver_user_ratings")
df_users = df_ratings.select("user_id").distinct()

# Countries a synthetic user can be assigned to.
# The values are embedded in SQL as quoted literals, so they must not contain
# quote characters.
countries = ["USA", "UK", "India", "Canada", "Germany"]
countries_sql = ", ".join(f"'{country}'" for country in countries)

# Register as a temporary view to use SQL to add synthetic data
df_users.createOrReplaceTempView("temp_users")

# Random registration date within the last 1000 days (~3 years).
# The country is picked per row inside SQL: rand() is in [0, 1), so
# floor(rand() * N) + 1 is a valid 1-based index for element_at().
# (Picking the country with random.choice() in Python would be evaluated only
# once while building the query text and give every user the same country.)
df_users_meta = spark.sql(f"""
SELECT 
  monotonically_increasing_id() AS user_sk,
  user_id,
  date_sub(current_date(), CAST(rand() * 1000 AS INT)) AS registration_date,
  element_at(array({countries_sql}), CAST(floor(rand() * {len(countries)}) AS INT) + 1) AS country
FROM temp_users
""")


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 145, Finished, Available, Finished)

In [143]:
# Persist the user dimension as the Delta table dim_user (full overwrite)
(
    df_users_meta.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_user")
)

# Optional preview
display(df_users_meta)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 146, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e66551c8-bdfa-4c81-86dd-7d0ea6971c85)

In [144]:
# Tables to inspect, in the order they are printed
tables = [
    "bronze_user_ratings", "bronze_omdb_api_ratings",
    "silver_movies_cleaned", "silver_user_ratings",
    "dim_user", "dim_movie",
    "fact_movie_ratings",
]

# For each table print a banner, its schema and its first three rows
# (untruncated). A table that cannot be read is reported and skipped so the
# remaining tables are still shown.
for table in tables:
    print(f"--- {table} ---")
    try:
        df = spark.read.table(table)
        df.printSchema()
        df.show(n=3, truncate=False)
    except Exception as exc:
        print(f"Could not load {table}: {exc}")
    print("\n\n")


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 147, Finished, Available, Finished)

--- bronze_user_ratings ---
root
 |-- user_id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+-------+---------+------+--------------------------+
|user_id|imdb_id  |rating|timestamp                 |
+-------+---------+------+--------------------------+
|user_10|tt0999913|4     |2024-07-05 20:46:29.750908|
|user_19|tt1068680|1     |2025-03-02 20:46:29.75091 |
|user_19|tt0134084|8     |2025-01-08 20:46:29.750912|
+-------+---------+------+--------------------------+
only showing top 3 rows




--- bronze_omdb_api_ratings ---
root
 |-- imdb_id: string (nullable = true)
 |-- director: string (nullable = true)

+---------+------------------+
|imdb_id  |director          |
+---------+------------------+
|tt0473075|Mike Newell       |
|tt1663662|Guillermo del Toro|
|tt1399103|Michael Bay       |
+---------+------------------+
only showing top 3 rows




--- silver_movies_cleaned ---
root
 |-- 

In [145]:
# Expose the movie key under the name imdb_id instead of ID.
# Note: this rebinds df_movies to the renamed frame.
df_movies = df_movies.withColumnRenamed(existing="ID", new="imdb_id")

StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 148, Finished, Available, Finished)

In [146]:
from pyspark.sql.functions import col

# Inputs: the ratings plus the (natural key, surrogate key) pairs of both
# dimensions
df_ratings = spark.read.table("silver_user_ratings")
df_users = spark.read.table("dim_user").select("user_id", "user_sk")
df_movies = spark.read.table("dim_movie").select("imdb_id", "movie_sk")

# Inner joins: a rating survives only if both its user and its movie are
# found in the dimensions
ratings_with_users = df_ratings.join(df_users, on="user_id", how="inner")
df_fact = ratings_with_users.join(df_movies, on="imdb_id", how="inner")

# Keep the surrogate keys and the measures; rating_timestamp is published as
# rating_time
fact_columns = [
    col("user_sk"),
    col("movie_sk"),
    col("rating"),
    col("rating_timestamp").alias("rating_time"),
]
fact_movie_ratings = df_fact.select(*fact_columns)

# Overwrite the Delta table fact_movie_ratings
(
    fact_movie_ratings.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fact_movie_ratings")
)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 149, Finished, Available, Finished)

In [147]:
from pyspark.sql.functions import col

# Load dim_movie, remove its ID column and write the result back over the
# same table.
# NOTE(review): the write does not set overwriteSchema, so the stored table
# schema may still contain ID afterwards - confirm against the table.
dim_movie = spark.read.table("dim_movie")
dim_movie_cleaned = dim_movie.drop("ID")

(
    dim_movie_cleaned.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_movie")
)




StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 150, Finished, Available, Finished)

In [148]:
# Inspect the column layout of the silver user-ratings table
silver_ratings_table = spark.read.table("silver_user_ratings")
silver_ratings_table.printSchema()


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 151, Finished, Available, Finished)

root
 |-- user_id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- rating_timestamp: timestamp (nullable = true)



In [149]:
from pyspark.sql import functions as F

# Load the silver_user_ratings table
df_ratings = spark.read.table("silver_user_ratings")

# Per-movie statistics: mean rating and number of ratings for each imdb_id
rating_aggregates = [
    F.avg("rating").alias("avg_rating"),
    F.count("rating").alias("rating_count"),
]
gold_rating_stats = df_ratings.groupBy("imdb_id").agg(*rating_aggregates)

# Save to the Gold layer (full overwrite of gold_movie_rating_stats)
(
    gold_rating_stats.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_movie_rating_stats")
)

# Show a preview
display(gold_rating_stats)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 152, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d04c89d3-8dda-4746-88e8-0c1c0f8aa554)

In [150]:
from pyspark.sql import functions as F

# Load the statistics produced for gold_movie_rating_stats
df_stats = spark.read.table("gold_movie_rating_stats")

# Keep movies with at least 20 ratings, rank them by average rating
# (highest first) and retain the top 50
has_enough_votes = F.col("rating_count") >= 20
gold_top_movies = (
    df_stats
    .where(has_enough_votes)
    .orderBy(F.desc("avg_rating"))
    .limit(50)
)

# Write to the Gold layer (full overwrite of gold_top_movies)
(
    gold_top_movies.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_top_movies")
)

# Preview
display(gold_top_movies)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 153, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 46923e81-9ca7-44ba-b258-0c71e81fa8a8)

In [151]:
from pyspark.sql import functions as F

# Load the ratings
df_ratings = spark.read.table("silver_user_ratings")

# Per-user activity: number of ratings and time of the most recent one
activity_aggregates = [
    F.count("*").alias("total_ratings"),
    F.max("rating_timestamp").alias("last_activity"),
]
gold_user_activity = df_ratings.groupBy("user_id").agg(*activity_aggregates)

# Save to Gold (full overwrite of gold_user_activity)
(
    gold_user_activity.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("gold_user_activity")
)

# Preview
display(gold_user_activity)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 154, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c9d0c4cc-bfab-4990-9916-f5868b4fadaa)

In [152]:
from pyspark.sql.functions import monotonically_increasing_id

# Read the distinct users that appear in the ratings
df_ratings = spark.read.table("silver_user_ratings")
distinct_users = df_ratings.select("user_id").distinct()

# Attach a generated surrogate key (user_sk) after user_id
dim_user = distinct_users.select(
    "user_id",
    monotonically_increasing_id().alias("user_sk"),
)

# Save: overwrite the Delta table dim_user, then show the result
(
    dim_user.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_user")
)
display(dim_user)

StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 155, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5a1e1c23-5743-4bd7-807c-0cb58f5f8fc7)

In [153]:
# Read the distinct, non-null movie IDs of the silver movie table
silver_movie_ids = spark.read.table("silver_movies_cleaned").select("ID")
df_movies = silver_movie_ids.dropna().distinct()

# Attach a generated technical key (movie_sk) after ID
dim_movie = df_movies.select(
    "ID",
    monotonically_increasing_id().alias("movie_sk"),
)

# Save: overwrite the Delta table dim_movie, then show the result
(
    dim_movie.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("dim_movie")
)
display(dim_movie)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 156, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 477c4766-eec2-446b-a3f5-8db4a181bd1e)

In [154]:
# Print the first rows of the fact table with show()'s default settings
fact_table = spark.read.table("fact_movie_ratings")
fact_table.show()


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 157, Finished, Available, Finished)

+-------+--------+------+--------------------+
|user_sk|movie_sk|rating|         rating_time|
+-------+--------+------+--------------------+
|     16|       3|   3.0|2024-12-20 20:46:...|
|     10|       3|   9.0|2025-01-03 20:46:...|
|     15|      34|   4.0|2024-07-30 20:46:...|
|      3|    4679|   9.0|2025-03-04 20:46:...|
|      3|      37|   9.0|2025-03-04 20:46:...|
|      6|     186|   6.0|2025-03-09 20:46:...|
|     11|     156|   6.0|2024-05-25 20:46:...|
|      7|    4679|   6.0|2024-07-01 20:46:...|
|      7|      37|   6.0|2024-07-01 20:46:...|
|      9|      88|   9.0|2024-12-10 20:46:...|
|     17|    4679|   7.0|2024-09-06 20:46:...|
|     17|      37|   7.0|2024-09-06 20:46:...|
|     19|      34|   3.0|2024-06-18 20:46:...|
|      4|      88|   7.0|2024-10-23 20:46:...|
|      6|     186|   7.0|2024-08-17 20:46:...|
|      1|     186|   4.0|2025-03-18 20:46:...|
|      9|      88|  10.0|2025-04-28 20:46:...|
|      2|     156|   9.0|2024-11-13 20:46:...|
|     15|    

In [155]:
# Row-count diagnostics for the fact build. Relies on df_ratings, df_users
# and df_fact already being defined by previously executed cells.

# Rows in the ratings source
ratings_row_count = df_ratings.count()
print("Total lignes dans silver_user_ratings :", ratings_row_count)

# Rows left after joining ratings to the user dimension on user_id
user_join_row_count = df_ratings.join(df_users, "user_id").count()
print("Total lignes après join avec dim_user :", user_join_row_count)

# Rows left after the complete join (users and movies)
fact_row_count = df_fact.count()
print("Total lignes après join complet :", fact_row_count)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 158, Finished, Available, Finished)

Total lignes dans silver_user_ratings : 500
Total lignes après join avec dim_user : 500
Total lignes après join complet : 0


In [156]:
# Show ten distinct movie keys from each side of the fact join so they can
# be compared by eye: imdb_id from the ratings, ID from the movie dimension.
rating_movie_ids = spark.read.table("silver_user_ratings").select("imdb_id").distinct()
rating_movie_ids.show(10)

dim_movie_ids = spark.read.table("dim_movie").select("ID").distinct()
dim_movie_ids.show(10)

StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 159, Finished, Available, Finished)

+---------+
|  imdb_id|
+---------+
|tt1031969|
|tt1637706|
|tt0383028|
|tt1499658|
|tt0121766|
|tt0107840|
|tt0244000|
|tt0810922|
|tt0046949|
|tt4707756|
+---------+
only showing top 10 rows

+---------+
|       ID|
+---------+
|tt0121766|
|tt1386588|
|tt0449010|
|tt0133240|
|tt0119528|
|tt0363988|
|tt1499658|
|tt0244000|
|tt0241025|
|tt3531824|
+---------+
only showing top 10 rows



In [157]:
from pyspark.sql.functions import col

# Read the tables. The movie dimension's ID column is exposed as imdb_id so
# it lines up with the ratings' join key.
df_ratings = spark.read.table("silver_user_ratings")
df_users = spark.read.table("dim_user").select("user_id", "user_sk")
movie_key_columns = [col("ID").alias("imdb_id"), col("movie_sk")]
df_movies = spark.read.table("dim_movie").select(*movie_key_columns)

# Join with the dimensions (inner joins: a rating is kept only when both its
# user and its movie are found)
ratings_with_users = df_ratings.join(df_users, on="user_id", how="inner")
df_fact = ratings_with_users.join(df_movies, on="imdb_id", how="inner")

# Select the final columns: surrogate keys, the rating as double and the
# rating time published as rating_time
fact_columns = [
    col("user_sk"),
    col("movie_sk"),
    col("rating").cast("double").alias("rating"),
    col("rating_timestamp").alias("rating_time"),
]
fact_movie_ratings = df_fact.select(*fact_columns)

# Write: overwrite the Delta table fact_movie_ratings, then show the result
(
    fact_movie_ratings.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("fact_movie_ratings")
)
display(fact_movie_ratings)


StatementMeta(, 09b18325-c428-4bb6-bee5-c7c85d58b277, 160, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8a30cdbf-3990-4d44-ae94-c182c6fe475a)