In [1]:
from pyspark.sql.functions import col, to_timestamp, lit, lag, last, mean, when, floor, concat, regexp_extract, regexp_replace
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Folder whose file names are scanned to discover the bronze tables per symbol.
path = "Files"

# `symbols` is not defined in this cell; it is expected to be a comma-separated
# string provided from outside (presumably a notebook parameter -- confirm).
# Surrounding whitespace is stripped and empty entries are dropped so that an
# input such as "aapl, msft" does not yield a symbol with a leading space.
symbols_array = [entry.strip() for entry in symbols.split(",") if entry.strip()]

# Bronze table flavours that are looked up for every symbol.
data_type = ["historic", "realtime"]

# Window specification: rows of one symbol, ordered by time.
window_spec = Window.partitionBy("symbol").orderBy("timestamp")

# List for combined DataFrames (kept for compatibility; nothing in this cell
# appends to it).
all_dfs = []


def _select_realtime(df, symbol):
    """Project a realtime bronze DataFrame onto the unified silver schema.

    Realtime rows carry no volume or interval, so those columns are filled
    with typed nulls; ``current_price`` doubles as ``close`` and the symbol
    column is set to the upper-cased ``symbol`` argument.
    """
    return df.select(
        to_timestamp(col("timestamp")).alias("timestamp"),
        col("high").cast(DoubleType()).alias("high"),
        col("low").cast(DoubleType()).alias("low"),
        col("current_price").cast(DoubleType()).alias("close"),
        col("open").cast(DoubleType()).alias("open"),
        lit(None).cast(DoubleType()).alias("volume"),
        lit(symbol.upper()).cast(StringType()).alias("symbol"),
        lit(None).cast(StringType()).alias("interval"),
        to_timestamp(col("ingestion_time")).alias("ingestion_time"),
        col("current_price").cast(DoubleType()).alias("current_price"),
        col("change").cast(DoubleType()).alias("change"),
        col("percentage_change").cast(DoubleType()).alias("percentage_change"),
        col("previous_close").cast(DoubleType()).alias("previous_close"),
    )


def _select_historic(df):
    """Project a historic bronze DataFrame onto the unified silver schema.

    Historic rows have no change / percentage_change / previous_close, so
    those columns are filled with typed nulls; ``close`` doubles as
    ``current_price``. The symbol is taken from the table's own column.
    """
    return df.select(
        to_timestamp(col("timestamp")).alias("timestamp"),
        col("high").cast(DoubleType()).alias("high"),
        col("low").cast(DoubleType()).alias("low"),
        col("close").cast(DoubleType()).alias("close"),
        col("open").cast(DoubleType()).alias("open"),
        col("volume").cast(DoubleType()).alias("volume"),
        col("symbol").cast(StringType()).alias("symbol"),
        col("interval").cast(StringType()).alias("interval"),
        to_timestamp(col("ingestion_time")).alias("ingestion_time"),
        col("close").cast(DoubleType()).alias("current_price"),
        lit(None).cast(DoubleType()).alias("change"),
        lit(None).cast(DoubleType()).alias("percentage_change"),
        lit(None).cast(DoubleType()).alias("previous_close"),
    )


def _floor_to_5_decimals(column_name):
    """Return ``floor(column * 100000) / 100000`` for the named column.

    Null values stay null because they propagate through the arithmetic.
    NOTE(review): ``floor`` rounds toward negative infinity, so negative
    values are moved away from zero rather than truncated -- confirm this is
    the intended behaviour for negative changes.
    """
    return floor(col(column_name) * 100000) / 100000


# The directory listing does not depend on the symbol, so fetch it once
# instead of once per loop iteration.
files = mssparkutils.fs.ls(path)

# Process every requested symbol independently.
for symbol in symbols_array:

    # Latest successfully loaded projection per flavour (None = not found).
    df_transformed_h = None
    df_transformed_r = None

    # Files whose name contains the symbol; compared case-insensitively on
    # both sides so that upper-case symbols match as well.
    symbol_key = symbol.lower()
    relevant_files = [file.path for file in files if symbol_key in file.name.lower()]

    # `kind` instead of `type`: the original loop variable shadowed the
    # builtin `type` for the rest of the notebook session.
    for kind in data_type:
        for file_path in relevant_files:
            # The table prefix is the part of the file name before the
            # first underscore, lower-cased.
            index = file_path.split("/")[-1].split("_")[0].lower()
            table_name = f"finance_lakehouse.dbo.{index}_{kind}_data_bronze"

            # Best effort: a missing table or a missing column is reported
            # and skipped. If several files match, the last table that
            # loads successfully wins.
            try:
                df = spark.read.table(table_name)
                if kind == "realtime":
                    df_transformed_r = _select_realtime(df, symbol)
                else:
                    df_transformed_h = _select_historic(df)
            except Exception as e:
                print(f"Fehler beim Laden der Tabelle {table_name}: {str(e)}")

    # Combine whatever was loaded; skip the symbol if nothing was found.
    if df_transformed_h is None and df_transformed_r is None:
        continue
    if df_transformed_h is None:
        unified_df = df_transformed_r
    elif df_transformed_r is None:
        unified_df = df_transformed_h
    else:
        # Both projections expose the same column names, so merge by name
        # rather than relying on column position.
        unified_df = df_transformed_r.unionByName(df_transformed_h)

    # One row per (timestamp, symbol).
    # NOTE(review): when a realtime and a historic row share a key, which
    # one survives is not controlled here -- confirm that is acceptable.
    unified_df = unified_df.dropDuplicates(["timestamp", "symbol"])
    # NOTE(review): the window functions below repartition and sort by
    # window_spec, so this global sort likely has no effect on the result --
    # confirm before removing.
    unified_df = unified_df.orderBy("timestamp")

    # Fill missing previous_close with the previous row's close (per symbol).
    unified_df = unified_df.withColumn(
        "previous_close",
        when(col("previous_close").isNull(), lag(col("close"), 1).over(window_spec))
        .otherwise(col("previous_close"))
    )

    # Fill missing change / percentage_change from close and previous_close.
    # percentage_change uses the already filled `change` column.
    unified_df = unified_df.withColumn(
        "change",
        when(col("change").isNull(), col("close") - col("previous_close"))
        .otherwise(col("change"))
    ).withColumn(
        "percentage_change",
        when(col("percentage_change").isNull(), (col("change") / col("previous_close")) * 100)
        .otherwise(col("percentage_change"))
    )

    # Fill missing volume with the most recent non-null volume of the same
    # symbol (ordered window => frame ends at the current row).
    unified_df = unified_df.withColumn(
        "volume",
        when(col("volume").isNull(), last(col("volume"), ignorenulls=True).over(window_spec))
        .otherwise(col("volume"))
    )

    # Rows without an interval (the realtime rows) get "5min".
    unified_df = unified_df.withColumn(
        "interval",
        when(col("interval").isNull(), lit("5min")).otherwise(col("interval"))
    )

    # Limit change and percentage_change to 5 decimal places.
    unified_df = unified_df.withColumn("change", _floor_to_5_decimals("change"))
    unified_df = unified_df.withColumn(
        "percentage_change", _floor_to_5_decimals("percentage_change")
    )

    # Persist the combined DataFrame as the symbol's silver table.
    file_name = f"{symbol}_data_silver"
    unified_df.write.mode("overwrite").saveAsTable(file_name)

StatementMeta(, 0735ed4b-1e7e-4e27-8e82-2818fbeaa3a1, 3, Finished, Available, Finished)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, regexp_replace, when, length, lit
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Load the raw news articles from the bronze layer.
df = spark.read.table("finance_lakehouse.dbo.news_data_bronze")

# (bronze column, silver column) pairs, in output order.
string_columns = [
    ("name", "source"),
    ("url", "url"),
    ("urlToImage", "image"),
    ("content", "content"),
    ("title", "title"),
]
timestamp_columns = [
    ("PublishDate", "publish_date"),
    ("ingestion_time", "ingestion_time"),
]

# Build the projection: string casts first, then the parsed timestamps.
projection = [
    col(bronze_name).cast(StringType()).alias(silver_name)
    for bronze_name, silver_name in string_columns
]
projection.extend(
    to_timestamp(col(bronze_name)).alias(silver_name)
    for bronze_name, silver_name in timestamp_columns
)
df_news = df.select(*projection)

# Replace empty strings with NULL in the free-text columns.
for text_column in ("content", "title"):
    is_empty = col(text_column) == ""
    df_news = df_news.withColumn(
        text_column,
        when(is_empty, lit(None)).otherwise(col(text_column)),
    )

# Add the content length.
# NOTE(review): the length is stored as a string, not an integer -- confirm
# that downstream consumers expect this before changing the type.
content_length = length(col("content")).cast(StringType())
df_news = df_news.withColumn("content_length", content_length)

# Keep one row per URL, then sort by publish date and ingestion time.
df_news = df_news.dropDuplicates(["url"])
df_news = df_news.orderBy("publish_date", "ingestion_time")

# Write the result to the silver layer, replacing any existing table.
news_writer = df_news.write.mode("overwrite")
news_writer.saveAsTable("news_data_silver")

StatementMeta(, 0735ed4b-1e7e-4e27-8e82-2818fbeaa3a1, 4, Finished, Available, Finished)