In [0]:
from pyspark.sql import functions as F

# One literal with an explicit +03:00 offset (preferred normalized form),
# parsed once and reused by every DataFrame below.
s = "2025-01-01 12:00:00 +03:00"
parsed_ts = F.to_timestamp(F.lit(s), "yyyy-MM-dd HH:mm:ss XXX")
df = spark.range(1).select(parsed_ts.alias("ts"))

# Show the same parsed timestamp under each session time zone in turn.
for session_tz in ("UTC", "America/Toronto"):
    spark.conf.set("spark.sql.session.timeZone", session_tz)
    df.show(truncate=False)

# Pass the parsed value through to_utc_timestamp with "America/Toronto" as the
# source zone; the session time zone is still America/Toronto at this point.
shifted_ts = F.to_utc_timestamp(parsed_ts, "America/Toronto")
df = spark.range(1).select(shifted_ts.alias("ts"))

df.show(truncate=False)

In [0]:

# Timestamp string without any zone or offset information.
ts_str = "2022-01-01 00:30:00"

# Cast the string to a timestamp, reduce it to a date, and show the result
# once per session time zone.
for session_tz in ("UTC", "America/Toronto"):
    spark.conf.set("spark.sql.session.timeZone", session_tz)
    day_col = F.to_date(F.lit(ts_str).cast("timestamp")).alias("day")
    spark.range(1).select(day_col).show()


In [0]:
from pyspark.sql.types import StructType, IntegerType, StringType, DateType, TimestampNTZType, StructField, TimestampType
from pyspark.sql.window import Window


# Schema of the page-visit sample; every field is nullable and the timestamp
# is carried as a string.
schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("timestamp", StringType(), True),
    StructField("page", StringType(), True),
    StructField("duration_seconds", IntegerType(), True),
])

df = spark.createDataFrame(
    [
        (1, "2022-01-01 12:00:00", "home", 30),
        (2, "2022-01-01 12:05:00", "dashboard", 45),
        (3, "2022-01-01 12:10:00", "profile", 60),
        (1, "2022-01-01 12:15:00", "home", 20),
        (2, "2022-01-01 12:20:00", "profile", 30),
        (3, "2022-01-01 12:25:00", "dashboard", 40),
    ],
    schema=schema,
)

# Ordered window per page; the ordering is what row-wise functions such as
# row_number() need.
w = Window.partitionBy("page").orderBy("timestamp")

# An ordered window defaults to a growing frame (unbounded preceding ->
# current row), which would turn F.sum into a running total. Widen the frame
# to the whole partition so every row carries the full per-page sum.
w_page_total = w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# per-row window sum (same sum repeated on every row in that page)
with_sum = df.withColumn(
    "sum_per_page",
    F.sum("duration_seconds").over(w_page_total),
)  # .withColumn("rn", F.row_number().over(w))

display(with_sum)

In [0]:
# Read the user-engagement CSV from DBFS, taking column names from the header row.
df1 = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("dbfs:/FileStore/csv/user_engagement.csv")
)

display(df1)

In [0]:
# tests/test_user_engagement_dq_string_ts.py
import pyspark.sql.functions as F

# Datetime pattern handed to F.to_timestamp when parsing the string "timestamp" column.
PATTERN = "yyyy-MM-dd HH:mm:ss"
# Inclusive lower and upper bounds for parsed timestamps, checked in
# test_timestamp_in_range.
MIN_DT = "1900-01-01 00:00:00"
MAX_DT = "2999-12-31 23:59:59"

def _sample_df(spark):
    data = [
        (1, "2022-01-01 12:00:00", "home", 30),
        (2, "2022-01-01 12:05:00", "dashboard", 45),
        (3, "2022-01-01 12:10:00", "profile", 60),
        (1, "2022-01-01 12:15:00", "home", 20),
        (2, "2022-01-01 12:20:00", "profile", 30),
        (3, "2022-01-01 12:25:00", "dashboard", 40),
    ]
    return spark.createDataFrame(data, ["user_id","timestamp","page","duration_seconds"])

def test_not_nulls(spark):
    """Fail if any of the four required columns holds a null in the sample frame."""
    required = ("user_id", "timestamp", "page", "duration_seconds")

    # OR the per-column null checks together, left to right.
    any_null = F.col(required[0]).isNull()
    for column_name in required[1:]:
        any_null = any_null | F.col(column_name).isNull()

    offending_rows = _sample_df(spark).filter(any_null).count()
    assert offending_rows == 0, "Nulls found in required columns"

def test_user_id_and_duration_min_values(spark):
    """Fail if user_id or duration_seconds is below -1 in any sample row."""
    floor = -1  # smallest value either column may take
    below_floor = (F.col("user_id") < floor) | (F.col("duration_seconds") < floor)

    offending_rows = _sample_df(spark).filter(below_floor).count()
    assert offending_rows == 0, "Values < -1 detected in user_id/duration_seconds"

def test_timestamp_format_and_parseable(spark):
    """Fail if any timestamp string does not yield a value when parsed with PATTERN."""
    # Parse string -> timestamp with the required pattern; rows whose parsed
    # value is null are counted as violations.
    ts_parsed = F.to_timestamp("timestamp", PATTERN)
    unparsed_rows = (
        _sample_df(spark)
        .withColumn("ts_parsed", ts_parsed)
        .filter(F.col("ts_parsed").isNull())
        .count()
    )
    assert unparsed_rows == 0, f"Timestamp not matching pattern {PATTERN}"

def test_timestamp_in_range(spark):
    """Fail if any parsed timestamp lies outside the [MIN_DT, MAX_DT] range."""
    lower_bound = F.to_timestamp(F.lit(MIN_DT))
    upper_bound = F.to_timestamp(F.lit(MAX_DT))
    ts = F.col("ts_parsed")

    # A row violates the range when it is strictly before the lower bound or
    # strictly after the upper bound.
    out_of_range_rows = (
        _sample_df(spark)
        .withColumn("ts_parsed", F.to_timestamp("timestamp", PATTERN))
        .filter((ts < lower_bound) | (ts > upper_bound))
        .count()
    )
    assert out_of_range_rows == 0, "Timestamps out of allowed range [1900-01-01, 2999-12-31]"


In [0]:
%pip install pytest


In [0]:
import os, shutil, pytest

# Source test module in the workspace and the /tmp location it is staged to.
SRC_TEST = "/Workspace/Users/andrewkravchuk@outlook.com/Gore Mutual/tests/test_basic.py"
DST_DIR = "/tmp/tests"
DST_TEST = f"{DST_DIR}/test_basic.py"

# Copy the test file into the staging directory, creating it if needed.
os.makedirs(DST_DIR, exist_ok=True)
shutil.copy(SRC_TEST, DST_TEST)

# Warning filters forwarded to pytest, one -W flag each (add more as needed).
warning_filters = [
    "ignore:distutils Version classes are deprecated:DeprecationWarning",
    "ignore::DeprecationWarning:pyspark.sql.pandas.utils",
]

pytest_args = [
    DST_TEST,
    "-v",
    "-rP",  # extra summary for Passed tests
    "-o", "cache_dir=/tmp/pytest_cache",
]
for warning_filter in warning_filters:
    pytest_args += ["-W", warning_filter]

exit_code = pytest.main(pytest_args)
