# Wyzwanie adwentowe - Dane są wszędzie

In [0]:
%run /Users/wojciech.zdziebkowski@gmail.com/Santa_DB_connection_param


## Dzień 1

In [0]:
# importy
from pyspark.sql.functions import col, desc, asc, count, lit, lag, lead, datediff, max as spark_max, when, sum as spark_sum, date_sub, min as spark_min
from pyspark.sql.window import Window

In [0]:
# połączenie


jdbc_url = (
    f"jdbc:sqlserver://{jdbc_hostname}:{jdbc_port};"
    f"database={jdbc_database};"
    "encrypt=true;trustServerCertificate=false;loginTimeout=30;"
)



In [0]:
connection_properties = {
    "user": jdbc_username,
    "password": jdbc_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}


In [0]:
tables_df = (spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "INFORMATION_SCHEMA.TABLES")
    .options(**connection_properties)
    .load())

display(tables_df)


In [0]:

query_counts = """
(
    SELECT 
        t.name AS NazwaTabeli,
        SUM(p.rows) AS LiczbaWierszy
    FROM sys.tables t
    JOIN sys.schemas s ON t.schema_id = s.schema_id
    JOIN sys.partitions p ON t.object_id = p.object_id
    WHERE p.index_id < 2 
    GROUP BY s.name, t.name
) as counts
"""

df_fast_counts = spark.read.jdbc(
    url=jdbc_url, 
    table=query_counts, 
    properties=connection_properties
)

df_sorted = df_fast_counts.orderBy(
    col("LiczbaWierszy").desc()
)

display(df_sorted)

## Dzień 2

In [0]:
query_holidays_poland = """
(
    SELECT 
        *
    FROM edw.HolidayCountry
    WHERE Country = 'Poland'
) as holiday_pl
"""

df_hc_pol = spark.read.jdbc(
    url=jdbc_url, 
    table=query_holidays_poland, 
    properties=connection_properties
)

display(df_hc_pol)

In [0]:
query_holidays_japan = """
(
    SELECT 
        *
    FROM edw.HolidayCountry
    WHERE Country = 'Japan'
) as holiday_jp
"""

df_hc_jap = spark.read.jdbc(
    url=jdbc_url, 
    table=query_holidays_japan, 
    properties=connection_properties
)
 
display(df_hc_jap)

In [0]:
query_date = """
(
    SELECT 
        *
    FROM edw.DimDate
) as date
"""

df_date = spark.read.jdbc(
    url=jdbc_url, 
    table=query_date, 
    properties=connection_properties
)

display(df_date)

In [0]:
df_pol_gotowy = df_hc_pol.withColumnRenamed("Country", "CountryPL")
df_jap_gotowy = df_hc_jap.withColumnRenamed("Country", "CountryJP")

In [0]:
df_date_pol = df_date.join(
    df_pol_gotowy, 
    on="DateKey", 
    how="left"
)

df_date_both = df_date_pol.join(
    df_jap_gotowy, 
    on="DateKey", 
    how="left"
)

In [0]:
df_finalny = (df_date_both
    .withColumn("PL_Holiday", 
        when(col("CountryPL").isNotNull(),
          lit(1)).otherwise(0)
    )
    .withColumn("JP_Holiday", 
        when(col("CountryJP").isNotNull(),
              lit(1)).otherwise(0)
    )
)

display(df_finalny)

In [0]:
count_start = df_date.count()
count_end = df_finalny.count()

print(f"Start: {count_start}, Koniec: {count_end}")

if count_start == count_end:
    print("Nie ma duplikatów")
else:
    print("Są duplikaty.")

print(count_end)

In [0]:
hd_jap_2024 = (df_finalny
               .filter(
                   (col("Year") == 2024) & (col("JP_Holiday") == 1))
               .count()
            )

hd_pol_2026 = (df_finalny
               .filter(
                   (col("Year") == 2026) & (col("PL_Holiday") == 1))
               .count()
            )

result_holidays = hd_jap_2024 - hd_pol_2026
print(result_holidays)

## Dzień 3

In [0]:
df_dim_dummy_elf = spark.read.jdbc(
    url=jdbc_url, 
    table="edw.DimElf_wDummyRow", 
    properties=connection_properties
)

display(df_dim_dummy_elf)

In [0]:
df_dummy_hiredate = (df_dim_dummy_elf
                     .filter(col("ElfKey") == -1)
                     .select("HireDateKey")
                    )

display(df_dummy_hiredate)

In [0]:
df_elf_time = spark.read.jdbc(
    url=jdbc_url, 
    table="dbo.FactElfTime", 
    properties=connection_properties
)

In [0]:
df_elf_dummy_count = df_elf_time.filter(col("ElfKey") == -1).count()

display(df_elf_dummy_count)

In [0]:
df_elf_dummy_time = (df_elf_time
                     .filter(col("ElfKey") == -1)
                     .agg(spark_sum("WorkedHours")
                          .alias("sum_WorkedHours"))
                    )

display(df_elf_dummy_time)

## Dzień 4 - wykonany w Power BI

## Dzień 5

In [0]:
df_wish_list = spark.read.jdbc(
    url=jdbc_url, 
    table="dbo.FactWish", 
    properties=connection_properties
)

df_dim_toy = spark.read.jdbc(
    url=jdbc_url, 
    table="dbo.DimToy", 
    properties=connection_properties
)

df_dim_child = spark.read.jdbc(
    url=jdbc_url, 
    table="dbo.DimChild", 
    properties=connection_properties
)

df_dim_wish_channel = spark.read.jdbc(
    url=jdbc_url, 
    table="dbo.DimWishChannel", 
    properties=connection_properties
)

df_dim_region = spark.read.jdbc(
    url=jdbc_url, 
    table="dbo.DimRegion", 
    properties=connection_properties
)

In [0]:
df_rejected = df_wish_list.filter(
    col("Status") == "Rejected"
)

df_toy_w_wish = (
    df_dim_toy.join(
        df_wish_list,
        on="ToyKey",
        how="inner"
    )
    .groupBy("ToyCategory")
    .agg(
        count("*").alias("wish_count")
    )
    .orderBy(
        col("wish_count").desc()
    )
)

display(df_toy_w_wish.limit(1))

In [0]:
df_child_w_wish = (
    df_wish_list.join(
        df_dim_child, 
        on="ChildKey", 
        how="inner"
    )
    .groupBy("ChildKey")
    .agg(
        count("*").alias("wish_count")
    )
    .orderBy(col("wish_count").asc()
    )
)

display(df_child_w_wish.limit(1))

In [0]:
total_wish = df_wish_list.count()
total_new_wish = df_wish_list.filter(col("WishStatus") == "New").count()

new_wish_percentage = (total_new_wish / total_wish) * 100
print(new_wish_percentage)

In [0]:
df_wish_poland = (
    df_wish_list.join(
        df_dim_child, 
        on = "ChildKey", 
        how = "inner")
    .join(
        df_dim_region, 
        on = "RegionKey", 
        how = "inner"
    )
    .filter(col("Country") == 'Poland')
    .join(
        df_dim_wish_channel, 
        on = "WishChannelKey", 
        how = "inner"
    )
    .groupBy("WishChannelName")
    .agg(
        count("*").alias("wish_count")
    )
    .orderBy(col("wish_count").desc())
)

display(df_wish_poland.limit(1))

In [0]:
df_wish_with_date = (
    df_wish_list.join(
        df_date,
        df_wish_list["WishDateKey"] == df_date["DateKey"],
        how = "inner"
    )
)

window = Window.partitionBy("ChildKey").orderBy(col("Date"))

df_calculated = (
    df_wish_with_date
        .withColumn(
            "NextWishDate", 
            lead(col("Date"), 1)
            .over(window)
            )
        .withColumn(
            "DaysDiff",
            datediff(col("NextWishDate"), 
                     col("Date"))
            )
        )

max_diff_row = df_calculated.orderBy(col("DaysDiff").desc()).first()

if max_diff_row:
    print(f"Najdłuższa różnica w dniach: {max_diff_row['DaysDiff']}")
    print(f"Dziecko ID: {max_diff_row['ChildKey']}")
    print(f"Data życzenia: {max_diff_row['Date']}, Data kolejnego: {max_diff_row['NextWishDate']}")
else:
    print("Brak danych do analizy.")

In [0]:
df_reinder_master = spark.read.jdbc(
    url=jdbc_url, 
    table="ReindeerMaster.ReindeerChanges", 
    properties=connection_properties
)

In [0]:
reindeer_window = Window.partitionBy("ReindeerID").orderBy("ChangeDate")

next_date_col = lead("ChangeDate", 1).over(reindeer_window)

df_reindeer_scd2 = (
    df_reinder_master
        .withColumn("NextDate", next_date_col)
        .withColumn("ValidTo", date_sub(col("NextDate"), 1))
        .withColumn("IsCurrent", when(col("NextDate").isNull(), lit(1)).otherwise(lit(0)))
        .withColumnRenamed("ChangeDate", "ValidFrom")
        .drop("NextDate")
)

display(df_reindeer_scd2)

## Dzien 10

In [0]:

query_cs = """
(
    SELECT 
        count (distinct AttributeName) as cnt
    FROM ReindeerMaster.CaseSensitivityXmasTest 
) as casesens
"""

df_cs = spark.read.jdbc(
    url=jdbc_url, 
    table=query_cs, 
    properties=connection_properties
)


display(df_cs)

## Dzień 11

In [0]:
df_factless = spark.read.jdbc(
    url=jdbc_url, 
    table="dbo.FactElfTrainingAttendance", 
    properties=connection_properties
)

df_dim_date = spark.read.jdbc(
    url=jdbc_url, 
    table="edw.DimDate", 
    properties=connection_properties
)

df_dim_traning = spark.read.jdbc(
    url=jdbc_url, 
    table="dbo.DimTraining", 
    properties=connection_properties
)

df_dim_elf = spark.read.jdbc(
    url=jdbc_url, 
    table="edw.DimElf", 
    properties=connection_properties
)

In [0]:
df_q1d11 = (
    df_factless
    .join(df_date, "DateKey")
    .groupBy(df_date.DayOfWeekName)
    .count()
    .orderBy(desc("count"))
)

display(df_q1d11)

In [0]:
display(df_dim_date)

In [0]:
display(df_dim_traning)

In [0]:
df_q2d11 = (
    df_factless
    .join(df_dim_date, "DateKey")
    .join(df_dim_traning, "TrainingKey")
    .filter((col("Month")== 12) & (col("DayOfWeekName") == "Monday"))
    .agg(spark_sum("DurationHours").alias("TotalHoursDecMondays"))
)

display(df_q2d11)

In [0]:
display(df_dim_elf)

In [0]:
missing_elves = df_factless.join(df_dim_elf, "ElfKey", "leftanti").count()

missing_trainings = df_factless.join(df_dim_traning, "TrainingKey", "left_anti").count()

print(f"Liczba rekordów bez pasującego Elfa: {missing_elves}")
print(f"Liczba rekordów bez pasującego Szkolenia: {missing_trainings}")

In [0]:
df_q4d11 = (
    df_factless
    .join(
        df_dim_traning,
        on="TrainingKey"
    )
    .join(df_dim_elf, on="ElfKey")
    .groupBy(
        df_dim_elf["ElfName"]
    )
    .agg(
        spark_sum("DurationHours").alias("TotalHours")
    )
    .orderBy(
        desc("TotalHours")
    )
)

display(df_q4d11)

In [0]:
training_stats = (
    df_factless
    .join(df_dim_traning, "TrainingKey")
    .groupBy("TrainingName")
    .agg(
        spark_sum("DurationHours").alias("TotalHours"))
)

df_q5d11 = training_stats.agg(
    (spark_max("TotalHours") - spark_min("TotalHours")).alias("HourDifference")
)

display(df_q5d11)