In [None]:

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, length, lit, concat, to_timestamp
from snowflake.snowpark.types import (
    StructType, StructField, IntegerType, DoubleType, StringType, 
    TimestampType, DateType, LongType, FloatType
)
from snowflake.snowpark import functions as F
from snowflake.snowpark.window import Window


from snowflake.snowpark.context import get_active_session
# Reuse the Snowpark session that is already active in this environment;
# no connection parameters are supplied anywhere in this notebook.
session = get_active_session()


In [None]:

# Select the warehouse for the queries issued through this session.
session.use_warehouse("COMPUTE_WH")

# Stage location of the raw customers extract.
customers_csv_file = "@MY_GCS_STAGE/customers.csv"

# First pass: read the file without an explicit schema, only to inspect
# the column layout the reader reports.
df = (
    session.read
    .options(
        {
            "FIELD_OPTIONALLY_ENCLOSED_BY": '"',
            "FIELD_DELIMITER": ',',
            "SKIP_HEADER": 1,
        }
    )
    .csv(customers_csv_file)
)
df.printSchema()

Data Type Transformations

In [None]:
# Explicit schema for the customers CSV. Every field is declared nullable
# (third StructField argument is True); the pairs below are listed in the
# order the schema declares them.
schema = StructType(
    [
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("ssn", StringType()),
            ("cc_num", StringType()),
            ("first", StringType()),
            ("last", StringType()),
            ("gender", StringType()),
            ("street", StringType()),
            ("city", StringType()),
            ("state", StringType()),
            ("zip", StringType()),
            ("lat", FloatType()),
            ("long", FloatType()),
            ("city_pop", LongType()),
            ("job", StringType()),
            ("dob", DateType()),
            ("acct_num", StringType()),
            ("profile", StringType()),
        )
    ]
)


# Read the staged CSV again, this time applying the explicit `schema`.
df = (
    session.read
    .options(
        {
            "FIELD_OPTIONALLY_ENCLOSED_BY": '"',
            "SKIP_HEADER": 1,
            "FIELD_DELIMITER": ',',
        }
    )
    .schema(schema)
    .csv(customers_csv_file)
)

# Print the resulting column types, then preview the first five rows.
df.printSchema()
df.show(5)


In [None]:
# Total number of rows in `df`; left as the last expression so the
# notebook displays the value.
df.count()

Duplicate Check

In [None]:
# Partition on every column, so each partition holds rows that are
# identical across the whole record.
partition = Window.partition_by(*df.columns)

# Attach the size of its partition to every row and keep only the rows
# that occur more than once.
duplicates = (
    df.select(
        "*",
        F.count("*").over(partition).alias("duplicate_records"),
    )
    .filter(col("duplicate_records") > 1)
)

duplicates.show()

Completeness Check

In [None]:
# Completeness: for every column, the share of rows whose value is NULL.
total_rows = df.count()

# `when(...)` without an otherwise-branch yields NULL for non-matching rows,
# and COUNT skips NULLs, so the numerator is the number of NULL values.
# The divisor is clamped to at least 1: dividing by a row count of 0 would
# make the query fail with a division-by-zero error on an empty frame.
# With the clamp, an empty frame reports 0 for every ratio instead.
completeness_check = df.select(
    *[
        (F.count(F.when(F.col(c).isNull(), 1)) / max(total_rows, 1)).alias(f"{c}_null_ratio")
        for c in df.columns
    ]
)

completeness_check.show()

Length Checks

In [None]:
def length_check(df, col_name, required_length=None, min_length=None, max_length=None):
    """Keep only the rows whose `col_name` value has an acceptable length.

    Args:
        df: DataFrame to filter; it must expose a `filter` method.
        col_name: Name of the column whose value length is checked.
        required_length: Exact length the value must have. When given, it
            takes precedence over `min_length` / `max_length`.
        min_length: Inclusive lower bound on the length.
        max_length: Inclusive upper bound on the length.

    Returns:
        The filtered DataFrame, or `df` itself when no constraint is given.
    """
    value_length = length(col(col_name))

    # Compare against None rather than relying on truthiness, so that a
    # required length of 0 is honoured instead of being silently ignored.
    if required_length is not None:
        return df.filter(value_length == required_length)

    if min_length is not None and max_length is not None:
        return df.filter(value_length.between(min_length, max_length))

    # One-sided bounds: previously a lone min or max was dropped and the
    # frame came back unfiltered.
    if min_length is not None:
        return df.filter(value_length >= min_length)
    if max_length is not None:
        return df.filter(value_length <= max_length)

    return df

# Apply the length rules one after another; a row survives only if it
# satisfies all of them.
df_new = df
for column_name, length_rule in (
    ("cc_num", {"required_length": 16}),
    ("ssn", {"required_length": 11}),
    ("acct_num", {"min_length": 8, "max_length": 12}),
):
    df_new = length_check(df_new, column_name, **length_rule)

df_new.show(5)


In [None]:
# Number of rows left in `df_new`; left as the last expression so the
# notebook displays the value.
df_new.count()

In [None]:

# Persist the rows in `df_new` to the CUSTOMERS_INTERNAL table.
# NOTE(review): the write mode is "append", so every re-run of this cell adds
# the same rows to the table again — confirm this is intended.
df_new.write.mode("append").save_as_table("CUSTOMERS_INTERNAL")

