WARSTWA ZŁOTA - ARCHITEKTURA MEDALIONU

Autor: Weronika Wąsikowska

Opis: Pełna implementacja warstwy złotej dla danych Stack Exchange w formacie Delta Lake

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, year, month, dayofmonth, dayofweek, col, date_format, hour, minute, second, explode, date_trunc
from functools import reduce
from pyspark.sql import DataFrame
import time

#Inicjalizacja środowiska
spark = SparkSession.builder.appName("GoldLayer").getOrCreate()

# Definicja ścieżek
silver_path = "dbfs:/FileStore/silver/"
gold_path = "dbfs:/FileStore/gold/"
csv_export_path = "dbfs:/FileStore/export/"

# Wczytanie danych z warstwy srebrnej
df_list_silver = {
    "badges": spark.read.format("delta").load(silver_path + "badges"),
    "comments": spark.read.format("delta").load(silver_path + "comments"),
    "post_history": spark.read.format("delta").load(silver_path + "post_history"),
    "post_links": spark.read.format("delta").load(silver_path + "post_links"),
    "posts": spark.read.format("delta").load(silver_path + "posts"),
    "tags": spark.read.format("delta").load(silver_path + "tags"),
    "users": spark.read.format("delta").load(silver_path + "users"),
    "votes": spark.read.format("delta").load(silver_path + "votes"),
}


In [0]:
# Definicja zapisu danych do warstwy złotej i csv
def save_table(df, table_name):
    delta_path = gold_path + table_name
    csv_path = "dbfs:/FileStore/export/" + table_name + ".csv"

    # Zapis do Delta Lake
    dbutils.fs.rm(delta_path, True)
    df.write.format("delta").mode("overwrite").save(delta_path)

    # Zapis do CSV
    dbutils.fs.rm(csv_path, True)
    df.coalesce(1).write.option("header", True).mode("overwrite").csv(csv_path)

In [0]:
# Tworzenie tabel wymiarów

# Wymiar użytkowników
dim_users = df_list_silver["users"].withColumnRenamed("id", "user_id").dropDuplicates(["user_id"])
save_table(dim_users, "dim_users")

# Wymiar tagów
dim_tags = df_list_silver["tags"].withColumnRenamed("id", "tag_id").dropDuplicates(["tag_id"])
save_table(dim_tags, "dim_tags")

# Wymiar czasu

# Zebranie dat z różnych tabel
dates_posts = df_list_silver["posts"].select(col("creation_date").alias("date"))
dates_votes = df_list_silver["votes"].select(col("creation_date").alias("date"))
dates_badges = df_list_silver["badges"].select(col("date").alias("date"))

# Połączenie wszystkich dat
all_dates = reduce(DataFrame.unionAll, [dates_posts, dates_votes, dates_badges])

# Zaokrąglenie do sekundy i usunięcie duplikatów
all_dates = all_dates.withColumn("date_rounded", date_trunc("second", col("date"))).drop("date").dropDuplicates(["date_rounded"])

# Tworzenie wymiaru czasu
dim_time = all_dates \
    .withColumn("event_date", col("date_rounded")) \
    .withColumn("year", year("event_date")) \
    .withColumn("month", month("event_date")) \
    .withColumn("day", dayofmonth("event_date")) \
    .withColumn("hour", hour("event_date")) \
    .withColumn("minute", minute("event_date")) \
    .withColumn("second", second("event_date")) \
    .withColumn("weekday", dayofweek("event_date")) \
    .withColumn("date_id", date_format("event_date", "yyyyMMddHHmmss")) \
    .withColumn("day_id", date_format("event_date", "yyyyMMdd")) \
    .drop("date_rounded")

# Zapis tabeli dim_time
save_table(dim_time, "dim_time")

# Wymiar typów głosów
dim_vote_type = df_list_silver["votes"].select("vote_type_id", "vote_type").dropDuplicates()
save_table(dim_vote_type, "dim_vote_type")

# Wymiar typów postów
dim_post_type = df_list_silver["posts"].select("post_type_id", "post_type").dropDuplicates()
save_table(dim_post_type, "dim_post_type")

# Wymiar klas odznak
dim_badge_class = df_list_silver["badges"].select("class", "class_name").dropDuplicates()
save_table(dim_badge_class, "dim_badge_class")

In [0]:
from pyspark.sql.functions import count

dim_time.groupBy("date_id") \
    .agg(count("*").alias("cnt")) \
    .filter("cnt > 1") \
    .show()


+-------+---+
|date_id|cnt|
+-------+---+
+-------+---+



In [0]:
# Tworzenie tabel faktów

# Tabela faktów votes
fact_votes = df_list_silver["votes"].select("id", "post_id", "user_id", "vote_type_id", "creation_date", "bounty_amount", "is_potential_duplicate")
save_table(fact_votes, "fact_votes")

# Tabela faktów posts
fact_posts = df_list_silver["posts"].select("id", "post_type_id", col("owner_user_id").alias("user_id"), "creation_date", "score", "view_count", "favorite_count", "answer_count", "comment_count")
save_table(fact_posts, "fact_posts")

# Tabela faktów badges
fact_badges = df_list_silver["badges"].select("id", col("user_id").alias("user_id"), "name", "class", "tag_based", "date")
save_table(fact_badges, "fact_badges")

In [0]:
# Tworzenie tabeli mostowej bridge_post_tags

post_tags = df_list_silver["posts"].select("id", "tags_array") \
    .withColumn("tag", explode("tags_array")) \
    .select(col("id").alias("post_id"), col("tag"))
save_table(post_tags, "bridge_post_tags")

In [0]:
# Walidacja danych 

from pyspark.sql.utils import AnalysisException

# Ścieżka do warstwy złotej
gold_path = "dbfs:/FileStore/gold/"

# Lista zapisanych tabel w warstwie złotej
gold_tables = [
    "fact_posts",
    "fact_votes",
    "fact_badges",
    "dim_users",
    "dim_tags",
    "dim_vote_type",
    "dim_post_type",
    "dim_badge_class",
    "dim_time",
    "post_tags"
]

# Wczytaj każdą tabelę i sprawdź liczbę rekordów
for table in gold_tables:
    try:
        df = spark.read.format("delta").load(gold_path + table)
        count = df.count()
        print(f" Tabela '{table}' istnieje. Liczba rekordów: {count}")
    except AnalysisException:
        print(f" Tabela '{table}' nie została znaleziona.")


 Tabela 'fact_posts' istnieje. Liczba rekordów: 64988
 Tabela 'fact_votes' istnieje. Liczba rekordów: 688874
 Tabela 'fact_badges' istnieje. Liczba rekordów: 159272
 Tabela 'dim_users' istnieje. Liczba rekordów: 75729
 Tabela 'dim_tags' istnieje. Liczba rekordów: 4774
 Tabela 'dim_vote_type' istnieje. Liczba rekordów: 14
 Tabela 'dim_post_type' istnieje. Liczba rekordów: 6
 Tabela 'dim_badge_class' istnieje. Liczba rekordów: 3
 Tabela 'dim_time' istnieje. Liczba rekordów: 199310
 Tabela 'post_tags' istnieje. Liczba rekordów: 89587


In [0]:
display(dbutils.fs.ls("/FileStore/export/"))

path,name,size,modificationTime
dbfs:/FileStore/export/bridge_post_tags.csv/,bridge_post_tags.csv/,0,0
dbfs:/FileStore/export/dim_badge_class.csv/,dim_badge_class.csv/,0,0
dbfs:/FileStore/export/dim_post_type.csv/,dim_post_type.csv/,0,0
dbfs:/FileStore/export/dim_tags.csv/,dim_tags.csv/,0,0
dbfs:/FileStore/export/dim_time.csv/,dim_time.csv/,0,0
dbfs:/FileStore/export/dim_users.csv/,dim_users.csv/,0,0
dbfs:/FileStore/export/dim_vote_type.csv/,dim_vote_type.csv/,0,0
dbfs:/FileStore/export/fact_badges.csv/,fact_badges.csv/,0,0
dbfs:/FileStore/export/fact_posts.csv/,fact_posts.csv/,0,0
dbfs:/FileStore/export/fact_votes.csv/,fact_votes.csv/,0,0


In [0]:
# Pobieranie plików

def export_csv_with_download(df, name):
    # Ścieżki robocze
    temp_path = f"dbfs:/FileStore/export/{name}.csv"
    final_path = f"dbfs:/FileStore/export-ready/{name}.csv"

    # Usunięcie wcześniejszych wersji
    dbutils.fs.rm(temp_path, True)
    dbutils.fs.rm(final_path, True)

    # Zapis do folderu tymczasowego jako pojedynczy plik
    df.coalesce(1).write.option("header", True).mode("overwrite").csv(temp_path)

    # Znalezienie pliku part-*.csv
    files = dbutils.fs.ls(temp_path)
    part_file = [f.path for f in files if f.name.startswith("part-") and f.name.endswith(".csv")][0]

    # Kopiowanie do widocznej lokalizacji
    dbutils.fs.cp(part_file, final_path)

    # Link do pobrania
    download_url = f"https://community.cloud.databricks.com/files/export-ready/{name}.csv"
    displayHTML(f'<a href="{download_url}" target="_blank">📥 Pobierz plik: {name}.csv</a><br>')


# Lista tabel do eksportu
csv_tables = {
    "fact_posts": fact_posts,
    "fact_votes": fact_votes,
    "fact_badges": fact_badges,
    "dim_users": dim_users,
    "dim_tags": dim_tags,
    "dim_vote_type": dim_vote_type,
    "dim_badge_class": dim_badge_class,
    "dim_time": dim_time,
    "dim_post_type": dim_post_type,
    "bridge_post_tags": post_tags
}

# Eksport i generowanie linków
for name, df in csv_tables.items():
    export_csv_with_download(df, name)


In [0]:
display(dbutils.fs.ls("/FileStore/export-ready/"))

path,name,size,modificationTime
dbfs:/FileStore/export-ready/bridge_post_tags.csv,bridge_post_tags.csv,1577135,1743767202000
dbfs:/FileStore/export-ready/dim_badge_class.csv,dim_badge_class.csv,42,1743767192000
dbfs:/FileStore/export-ready/dim_post_type.csv,dim_post_type.csv,115,1743767200000
dbfs:/FileStore/export-ready/dim_tags.csv,dim_tags.csv,144278,1743767187000
dbfs:/FileStore/export-ready/dim_time.csv,dim_time.csv,13791264,1743767198000
dbfs:/FileStore/export-ready/dim_users.csv,dim_users.csv,17035241,1743767185000
dbfs:/FileStore/export-ready/dim_vote_type.csv,dim_vote_type.csv,210,1743767190000
dbfs:/FileStore/export-ready/fact_badges.csv,fact_badges.csv,9054814,1743767179000
dbfs:/FileStore/export-ready/fact_posts.csv,fact_posts.csv,3191921,1743767167000
dbfs:/FileStore/export-ready/fact_votes.csv,fact_votes.csv,34757947,1743767175000


In [0]:
#Zapis dim_users.csv ponownie (osobno) z powodu błędów w odczycie 
from pyspark.sql.functions import col
import re

# Utworzenie wymiaru użytkowników
df_dim_users = df_list_silver["users"] \
    .withColumnRenamed("id", "user_id") \
    .dropDuplicates(["user_id"])

# Ścieżka do zapisu
export_path = "dbfs:/FileStore/export-ready/dim_users.csv"

# Usunięcie poprzednich plików (jeśli istnieją)
dbutils.fs.rm(export_path, True)

# Eksport CSV z obsługą znaków specjalnych
df_dim_users.coalesce(1).write \
    .option("header", True) \
    .option("quote", '"') \
    .option("escape", '"') \
    .option("multiLine", True) \
    .mode("overwrite") \
    .csv(export_path)

# Automatyczne wykrycie nazwy pliku
files = dbutils.fs.ls(export_path)
csv_file = [f.path for f in files if f.path.endswith(".csv")][0]

# Wyciągnięcie nazwy pliku (ostatni człon ścieżki)
file_name = csv_file.split("/")[-1]

# Wygenerowanie linku do pobrania
download_url = f"https://community.cloud.databricks.com/files/export-ready/dim_users.csv/{file_name}"
displayHTML(f'<a href="{download_url}" target="_blank">📥 Pobierz plik dim_users.csv</a>')


In [0]:
display(dbutils.fs.ls("/FileStore/export/dim_users.csv/"))

path,name,size,modificationTime
dbfs:/FileStore/export/dim_users.csv/_SUCCESS,_SUCCESS,0,1743767183000
dbfs:/FileStore/export/dim_users.csv/_committed_5297017968719088424,_committed_5297017968719088424,113,1743767183000
dbfs:/FileStore/export/dim_users.csv/_started_5297017968719088424,_started_5297017968719088424,0,1743767181000
dbfs:/FileStore/export/dim_users.csv/part-00000-tid-5297017968719088424-dbc95bf3-dda5-4497-8623-72df51e411fc-728-1-c000.csv,part-00000-tid-5297017968719088424-dbc95bf3-dda5-4497-8623-72df51e411fc-728-1-c000.csv,17035241,1743767183000
