In [0]:
first_full_load=False # True only if first historical full load, otherwise leave as False for incremental load

etl_version='V0.3'

In [0]:
landing_path='/FileStore/mnt/test_bucket/landing/'
archive_path='/FileStore/mnt/test_bucket/archive/'
silver_path='/FileStore/mnt/test_bucket/silver/'
gold_path='/FileStore/mnt/test_bucket/gold/'

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os
from datetime import datetime

In [0]:
run_time=datetime.now()
start_run=run_time
run_time

Out[46]: datetime.datetime(2023, 11, 30, 19, 21, 3, 878580)

In [0]:
# https://copdips.com/2022/07/databricks-job-context.html
notebook_name=dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get().split('/')[-1]
print(notebook_name)
user_name=dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
print(user_name)

003 updated etl
juancitopepe965@gmail.com


# Process products

In [0]:
# load and process files

# PRODUCTS TABLE
if len(dbutils.fs.ls(landing_path + 'products')) > 0:
    # process files
    table_name = 'products'
    table_schema=StructType([StructField('product_code', StringType(), False), StructField('product_name', StringType(), True), StructField('category', StringType(), True), StructField('description', StringType(), True), StructField('gender', StringType(), True), StructField('color', StringType(), True), StructField('size', StringType(), True)])

    df=(spark.read.csv(landing_path + table_name, header=True, schema=table_schema).select('*', '_metadata.*').drop('file_name'))

    # Remove any possible duplicates
    #df=df.dropDuplicates(subset=['product_code', 'product_name', 'category', 'description', 'gender', 'color', 'size'])
    df=df.dropDuplicates(subset=['product_code'])

    if first_full_load:
        # add columns for scd 2
        df=df.withColumn("active_status", lit('Y')).withColumn("valid_from", lit("2023-01-01")).withColumn("valid_to", lit("9999-12-31"))
        # create temp view
        df.createOrReplaceTempView('df')
        # insert values
        spark.sql('''insert into silver.products_silver (product_code, product_name, category, description, gender, color, size, file_path, file_size, file_modification_time, active_status, valid_from, valid_to)
            select * from df''')        

        print('*** Full historical load for Products table completed. ***')

    else:
        spark.sql('create database if not exists silver')
        spark.sql('''create table if not exists silver.products_silver using delta
            location "/FileStore/mnt/test_bucket/silver/products_silver"''')
        
        join_columns=["product_code"]
        main_columns_source=["product_name", "category", "description", "gender", "color", "size"]
        main_columns_target=[f"target_{x}" for x in main_columns_source]
        all_columns=["ProductId", "product_code", "product_name", "category", "description", "gender", "color", "size", "file_path", "file_size", "file_modification_time", "active_status", "valid_from", "valid_to"]

        targetDF=spark.read.format('delta').table('silver.products_silver')
        sourceDF=df

        # Step 1 - left join to new data the main table
        joinDF=(
            sourceDF.join(
                targetDF.where(col("active_status")=='Y'),
                *join_columns,
                'leftouter'
            )
            .select(
                sourceDF['*'],
                targetDF["ProductId"].alias("target_ProductId"),
                targetDF["product_code"].alias("target_product_code"), targetDF["product_name"].alias("target_product_name"), targetDF["category"].alias("target_category"), targetDF["description"].alias("target_description"), targetDF["gender"].alias("target_gender"), targetDF["color"].alias("target_color"), targetDF["size"].alias("target_size")
            )
        )

        # Step 2 - Filter out only changed records
        filterDF=(joinDF.filter(
                xxhash64(col("product_name"), col("category"), col("description"), col("gender"), col("color"), col("size"))
                != xxhash64(col("target_product_name"), col("target_category"), col("target_description"), col("target_gender"), col("target_color"), col("target_size"))
            ))

        # Step 3 - create merge key
        mergeDF=filterDF.withColumn('MERGEKEY', concat(*join_columns))

        # Step 4 - create null merge key for matching records
        dummyDF=filterDF.filter('target_ProductId is not null').withColumn('MERGEKEY', lit(None))

        # Step 5 - combine step 3 and 4
        scdDF=mergeDF.union(dummyDF)
        scdDF.createOrReplaceTempView('scdDF')

        # Step 6 - apply merge statement
        spark.sql('''
            merge into silver.products_silver as target
            --merge into test_merge2 as target

            using scdDF as source
            on  target.product_code = source.MERGEKEY and target.active_status="Y"

            when matched then update set
                active_status='N',
                valid_to=date_sub(CAST(current_date() as DATE), 1)

            when not matched then insert (product_code, product_name, category, description, gender, color, size, file_path, file_size, file_modification_time, active_status, valid_from, valid_to)
                values(
                source.product_code, 
                source.product_name, 
                source.category, 
                source.description, 
                source.gender, 
                source.color, 
                source.size, 
                source.file_path, 
                source.file_size, 
                source.file_modification_time, 
                'Y', 
                current_date(),
                '9999-12-31')'''
        )

        print('Daily load for Products table completed.')
      

    # send file/s to archive folder
    for f in dbutils.fs.ls(f'dbfs:{landing_path}{table_name}'):
        dbutils.fs.mv(f.path, os.path.join(f'dbfs:{archive_path}{table_name}', os.path.basename(f.path)))

else:
    print('No files to process for "Products"')




Daily load for Products table completed.


# Process brands

In [0]:
if len(dbutils.fs.ls(landing_path + 'brands')) > 0:
    # process files
    table_name = 'brands'
    table_schema=StructType([StructField('brand_code', StringType(), True), StructField('name', StringType(), True), StructField('contact', StringType(), True), StructField('country', StringType(), True)])

    df=(spark.read.csv(landing_path + table_name, header=True, schema=table_schema).select('*', '_metadata.*').drop('file_name'))

    # Remove any possible duplicates
    df=df.dropDuplicates(subset=['brand_code'])

    if first_full_load:
        # add columns for scd 2
        df=df.withColumn("active_status", lit('Y')).withColumn("valid_from", lit("2023-01-01")).withColumn("valid_to", lit("9999-12-31"))
        # create temp view
        df.createOrReplaceTempView('df')
        # insert values
        spark.sql('''insert into silver.brands_silver (brand_code, name, contact, country, file_path, file_size, file_modification_time, active_status, valid_from, valid_to)
            select * from df''')        

        print('*** Full historical load for Brands table completed. ***')

    else:
        spark.sql('create database if not exists silver')
        spark.sql('''create table if not exists silver.brands_silver using delta
            location "/FileStore/mnt/test_bucket/silver/brands_silver"''')
        
        join_columns=["brand_code"]
        main_columns_source=["name", "contact", "country"]
        main_columns_target=[f"target_{x}" for x in main_columns_source]
        all_columns=["BrandId", "brand_code", "name", "contact", "country", "file_path", "file_size", "file_modification_time", "active_status", "valid_from", "valid_to"]

        targetDF=spark.read.format('delta').table('silver.brands_silver')
        sourceDF=df

        # Step 1 - left join to new data the main table
        joinDF=(
            sourceDF.join(
                targetDF.where(col("active_status")=='Y'),
                *join_columns,
                'leftouter'
            )
            .select(
                sourceDF['*'],
                targetDF["BrandId"].alias("target_BrandId"),
                targetDF["brand_code"].alias("target_brand_code"), targetDF["name"].alias("target_name"), targetDF["contact"].alias("target_contact"), targetDF["country"].alias("target_country")
            )
        )

        # Step 2 - Filter out only changed records
        filterDF=(joinDF.filter(
                xxhash64(col("name"), col("contact"), col("country"))
                != xxhash64(col("target_name"), col("target_contact"), col("target_country"))
            ))

        # Step 3 - create merge key
        mergeDF=filterDF.withColumn('MERGEKEY', concat(*join_columns))

        # Step 4 - create null merge key for matching records
        dummyDF=filterDF.filter('target_BrandId is not null').withColumn('MERGEKEY', lit(None))

        # Step 5 - combine step 3 and 4
        scdDF=mergeDF.union(dummyDF)
        scdDF.createOrReplaceTempView('scdDF')

        # Step 6 - apply merge statement
        spark.sql('''
            merge into silver.brands_silver as target
            --merge into test_merge2 as target

            using scdDF as source
            on  target.brand_code = source.MERGEKEY and target.active_status="Y"

            when matched then update set
                active_status='N',
                valid_to=date_sub(CAST(current_date() as DATE), 1)

            when not matched then insert (brand_code, name, contact, country, file_path, file_size, file_modification_time, active_status, valid_from, valid_to)
                values(
                source.brand_code, 
                source.name, 
                source.contact, 
                source.country,
                source.file_path, 
                source.file_size, 
                source.file_modification_time, 
                'Y', 
                current_date(),
                '9999-12-31')'''
        )

        print('Daily load for Brands table completed.')
      

    # send file/s to archive folder
    for f in dbutils.fs.ls(f'dbfs:{landing_path}{table_name}'):
        dbutils.fs.mv(f.path, os.path.join(f'dbfs:{archive_path}{table_name}', os.path.basename(f.path)))

else:
    print('No files to process for "Brands"')




No files to process for "Brands"


# Process customers

In [0]:
# process files
table_name = 'customers'
table_schema=StructType([StructField('customer_code', StringType(), True), StructField('name', StringType(), True), StructField('surname', StringType(), True), StructField('email', StringType(), True), StructField('country', StringType(), True), StructField('city', StringType(), True), StructField('address', StringType(), True), StructField('county', StringType(), True), StructField('state', StringType(), True), StructField('zip', StringType(), True)])

df=(spark.read.csv(landing_path + table_name, header=True, schema=table_schema).select('*', '_metadata.*').drop('file_name'))

# Remove any possible duplicates
df=df.dropDuplicates(subset=['customer_code'])

if first_full_load:
    # add columns for scd 2
    df=df.withColumn("active_status", lit('Y')).withColumn("valid_from", lit("2023-01-01")).withColumn("valid_to", lit("9999-12-31"))
    # create temp view
    df.createOrReplaceTempView('df')
    # insert values
    spark.sql('''insert into silver.customers_silver (customer_code, name, surname, email, country, city, address, county, state, zip, file_path, file_size, file_modification_time, active_status, valid_from, valid_to)
        select * from df''')        

    print('*** Full historical load for customers table completed. ***')

else:
    spark.sql('create database if not exists silver')
    spark.sql('''create table if not exists silver.customers_silver using delta
        location "/FileStore/mnt/test_bucket/silver/customers_silver"''')
    
    join_columns=["customer_code"]
    main_columns_source=['name', 'surname', 'email', 'country', 'city', 'address', 'county', 'state', 'zip']
    main_columns_target=[f"target_{x}" for x in main_columns_source]
    all_columns=['customer_code', 'name', 'surname', 'email', 'country', 'city', 'address', 'county', 'state', 'zip', 'file_path', 'file_size', 'file_modification_time', "active_status", "valid_from", "valid_to"]

    targetDF=spark.read.format('delta').table('silver.customers_silver')
    sourceDF=df
    
    # Step 1 - left join to new data the main table
    joinDF=(
        sourceDF.join(
            targetDF.where(col("active_status")=='Y'),
            *join_columns,
            'leftouter'
        )
        .select(
            sourceDF['*'],
            targetDF["CustomerId"].alias("target_CustomerId"),
            targetDF["customer_code"].alias("target_customer_code"), targetDF["name"].alias("target_name"), targetDF["surname"].alias("target_surname"), targetDF["email"].alias("target_email"), targetDF["country"].alias("target_country"), targetDF["city"].alias("target_city"), targetDF["address"].alias("target_address"), targetDF["county"].alias("target_county"), targetDF["state"].alias("target_state"), targetDF["zip"].alias("target_zip")
        )
    )

    # Step 2 - Filter out only changed records
    filterDF=(joinDF.filter(
            xxhash64(col("name"), col("surname"), col("email"), col("country"), col("city"), col("address"), col("county"), col("state"), col("zip"))
            != xxhash64(col("target_name"), col("target_surname"), col("target_email"), col("target_country"), col("target_city"), col("target_address"), col("target_county"), col("target_state"), col("target_zip"))
        ))

    # Step 3 - create merge key
    mergeDF=filterDF.withColumn('MERGEKEY', concat(*join_columns))

    # Step 4 - create null merge key for matching records
    dummyDF=filterDF.filter('target_CustomerId is not null').withColumn('MERGEKEY', lit(None))

    # Step 5 - combine step 3 and 4
    scdDF=mergeDF.union(dummyDF)

In [0]:
display(filterDF.select(col("name"), col("surname"), col("email"), col("country"), col("city"), col("address"), col("county"), col("state"), col("zip")).printSchema())

root
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- email: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- address: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)



In [0]:
display(filterDF.select(col("target_name"), col("target_surname"), col("target_email"), col("target_country"), col("target_city"), col("target_address"), col("target_county"), col("target_state"), col("target_zip")).printSchema())

root
 |-- target_name: string (nullable = true)
 |-- target_surname: string (nullable = true)
 |-- target_email: string (nullable = true)
 |-- target_country: string (nullable = true)
 |-- target_city: string (nullable = true)
 |-- target_address: string (nullable = true)
 |-- target_county: string (nullable = true)
 |-- target_state: string (nullable = true)
 |-- target_zip: string (nullable = true)



In [0]:
if len(dbutils.fs.ls(landing_path + 'customers')) > 0:
    # process files
    table_name = 'customers'
    table_schema=StructType([StructField('customer_code', StringType(), True), StructField('name', StringType(), True), StructField('surname', StringType(), True), StructField('email', StringType(), True), StructField('country', StringType(), True), StructField('city', StringType(), True), StructField('address', StringType(), True), StructField('county', StringType(), True), StructField('state', StringType(), True), StructField('zip', IntegerType(), True)])

    df=(spark.read.csv(landing_path + table_name, header=True, schema=table_schema).select('*', '_metadata.*').drop('file_name'))

    # Remove any possible duplicates
    df=df.dropDuplicates(subset=['customer_code'])

    if first_full_load:
        # add columns for scd 2
        df=df.withColumn("active_status", lit('Y')).withColumn("valid_from", lit("2023-01-01")).withColumn("valid_to", lit("9999-12-31"))
        # create temp view
        df.createOrReplaceTempView('df')
        # insert values
        spark.sql('''insert into silver.customers_silver (customer_code, name, surname, email, country, city, address, county, state, zip, file_path, file_size, file_modification_time, active_status, valid_from, valid_to)
            select * from df''')        

        print('*** Full historical load for customers table completed. ***')

    else:
        spark.sql('create database if not exists silver')
        spark.sql('''create table if not exists silver.customers_silver using delta
            location "/FileStore/mnt/test_bucket/silver/customers_silver"''')
        
        join_columns=["customer_code"]
        main_columns_source=['name', 'surname', 'email', 'country', 'city', 'address', 'county', 'state', 'zip']
        main_columns_target=[f"target_{x}" for x in main_columns_source]
        all_columns=['customer_code', 'name', 'surname', 'email', 'country', 'city', 'address', 'county', 'state', 'zip', 'file_path', 'file_size', 'file_modification_time', "active_status", "valid_from", "valid_to"]

        targetDF=spark.read.format('delta').table('silver.customers_silver')
        sourceDF=df
        
        # Step 1 - left join to new data the main table
        joinDF=(
            sourceDF.join(
                targetDF.where(col("active_status")=='Y'),
                *join_columns,
                'leftouter'
            )
            .select(
                sourceDF['*'],
                targetDF["CustomerId"].alias("target_CustomerId"),
                targetDF["customer_code"].alias("target_customer_code"), targetDF["name"].alias("target_name"), targetDF["surname"].alias("target_surname"), targetDF["email"].alias("target_email"), targetDF["country"].alias("target_country"), targetDF["city"].alias("target_city"), targetDF["address"].alias("target_address"), targetDF["county"].alias("target_county"), targetDF["state"].alias("target_state"), targetDF["zip"].alias("target_zip")
            )
        )

        # Step 2 - Filter out only changed records
        filterDF=(joinDF.filter(
                xxhash64(col("name"), col("surname"), col("email"), col("country"), col("city"), col("address"), col("county"), col("state"), col("zip"))
                != xxhash64(col("target_name"), col("target_surname"), col("target_email"), col("target_country"), col("target_city"), col("target_address"), col("target_county"), col("target_state"), col("target_zip"))
            ))

        # Step 3 - create merge key
        mergeDF=filterDF.withColumn('MERGEKEY', concat(*join_columns))

        # Step 4 - create null merge key for matching records
        dummyDF=filterDF.filter('target_CustomerId is not null').withColumn('MERGEKEY', lit(None))

        # Step 5 - combine step 3 and 4
        scdDF=mergeDF.union(dummyDF)
        scdDF.createOrReplaceTempView('scdDF')

        # Step 6 - apply merge statement
        spark.sql('''
            merge into silver.customers_silver as target
            --merge into test_merge2 as target

            using scdDF as source
            on  target.customer_code = source.MERGEKEY and target.active_status="Y"

            when matched then update set
                active_status='N',
                valid_to=date_sub(CAST(current_date() as DATE), 1)

            when not matched then insert (customer_code, name, surname, email, country, city, address, county, state, zip, file_path, file_size, file_modification_time, active_status, valid_from, valid_to)
                values(
                source.customer_code, 
                source.name, 
                source.surname, 
                source.email,
                source.country, 
                source.city, 
                source.address,
                source.county, 
                source.state,
                source.zip,
                source.file_path, 
                source.file_size, 
                source.file_modification_time, 
                'Y', 
                current_date(),
                '9999-12-31')'''
        )

        print('Daily load for customers table completed.')
      

    # send file/s to archive folder
    for f in dbutils.fs.ls(f'dbfs:{landing_path}{table_name}'):
        dbutils.fs.mv(f.path, os.path.join(f'dbfs:{archive_path}{table_name}', os.path.basename(f.path)))

else:
    print('No files to process for "customers"')





Daily load for customers table completed.


# Process sales

In [0]:
if len(dbutils.fs.ls(landing_path + 'sales')) > 0:
    numb_of_files=len(dbutils.fs.ls(landing_path + 'sales'))
    print(numb_of_files)

    # process files
    table_name = 'sales'
    table_schema=StructType([StructField('order_num', IntegerType(), True), StructField('order_ln', IntegerType(), True), StructField('date', DateType(), True), StructField('price', DoubleType(), True), StructField('qty', IntegerType(), True), StructField('discount', StringType(), True), StructField('product_code', StringType(), True), StructField('brand_code', StringType(), True), StructField('customer_code', StringType(), True), StructField('cost', DoubleType(), True)])

    df=(spark.read.csv(landing_path + table_name, header=True, schema=table_schema).select('*', '_metadata.*').drop('file_name'))
    #df=(spark.read.csv(landing_path + table_name, header=True, inferSchema=True).select('*', '_metadata.*').drop('file_name'))
    numb_of_rows_read=df.count()
    # Remove any possible duplicates
    #display(df.orderBy(col('order_num').desc(), col('order_ln').asc(), col('date').asc()))
    df=df.orderBy('order_num', 'order_ln', 'date').dropDuplicates(subset=['order_num', 'order_ln'])
    numb_of_rows_processed=df.count()


    df=(df
        .fillna({'customer_code':'UNKN'})
        .withColumn(
            'discount', regexp_replace(col('discount'), '%', '').cast('float')
            )
        .withColumn(
            'date_key', date_format(col('date'), 'yyyyMMdd')
            )
    )

    # audit table 1
    spark.sql(f'''
          insert into silver.dim_audit (etl_version, notebook_name, user, numb_of_rows_processed, numb_of_rows_read, files_read, run_time)
          values ('{etl_version}', '{notebook_name}', '{user_name}', {numb_of_rows_processed}, {numb_of_rows_read}, {numb_of_files}, '{run_time}')
          ''')
    
    # # create temp view
    df.createOrReplaceTempView('sales')      

    # in case is a new cluster
    spark.sql('create database if not exists silver')
    spark.sql('''create table if not exists silver.sales_silver using delta
    location "/FileStore/mnt/test_bucket/silver/sales_silver"''')

        
    # insert into table and assign SKs
    spark.sql('''
            insert into silver.sales_silver (DateKey, ProductId, BrandId,  CustomerId,  AuditId ,  order_number,  order_line,  price, qty, discount_perc, cost, file_path, file_size, file_modification_time)
            select
                s.date_key as DateKey,
                pr.ProductId,
                br.BrandId,
                cus.CustomerId,
                (select max(AuditId) from silver.dim_audit) as AuditId,
                s.order_num,
                s.order_ln,
                s.price,
                s.qty,
                s.discount as discount_perc,
                s.cost,
                s.file_path,
                s.file_size,
                s.file_modification_time

            from sales s

            left join silver.products_silver pr
                on s.product_code=pr.product_code and pr.active_status != 'N'

            left join silver.brands_silver br
                on s.brand_code=br.brand_code and br.active_status != 'N'

            left join silver.customers_silver cus
                on s.customer_code=cus.customer_code and cus.active_status != 'N'
              ''')
 

    # audit table 2 # add run successfull column?!
    end_time=datetime.now()-start_run
    time_taken=float(f"{end_time.seconds}.{end_time.microseconds}")
    spark.sql(f'''
          update silver.dim_audit
          set seconds_taken = {time_taken}
          where AuditId=(select max(AuditId) from silver.dim_audit)
          ''')



    ### QUIZAS GUARDAR EN SILVER SALES TABLE CON IDS Q VIENENE Y ASIGNAR SKS EN LA GLD SALES TABEL , YA TENIENDO LA AUDIT TABLE COMPLETA?
    ### PRBLEMA Q HAAGO CON CDF, NO LA USO PARA FACT SALES?



    # send file/s to archive folder
    for f in dbutils.fs.ls(f'dbfs:{landing_path}{table_name}'):
        dbutils.fs.mv(f.path, os.path.join(f'dbfs:{archive_path}{table_name}', os.path.basename(f.path)))


    print('Load for Sales table completed.')
    
    

1


In [0]:
    spark.sql('create database if not exists silver')
    spark.sql('''create table if not exists silver.sales_silver using delta
    location "/FileStore/mnt/test_bucket/silver/sales_silver"''')

In [0]:
%sql
select * from gold.dim_calendar

Date_Key,Date,Day_Number,Month_Number,Year,Month,MMM_YYYY,Day_Name,Is_weekend
20230101,2023-01-01,1,1,2023,January,Jan-2023,Sunday,1
20230102,2023-01-02,2,1,2023,January,Jan-2023,Monday,0
20230103,2023-01-03,3,1,2023,January,Jan-2023,Tuesday,0
20230104,2023-01-04,4,1,2023,January,Jan-2023,Wednesday,0
20230105,2023-01-05,5,1,2023,January,Jan-2023,Thursday,0
20230106,2023-01-06,6,1,2023,January,Jan-2023,Friday,0
20230107,2023-01-07,7,1,2023,January,Jan-2023,Saturday,1
20230108,2023-01-08,8,1,2023,January,Jan-2023,Sunday,1
20230109,2023-01-09,9,1,2023,January,Jan-2023,Monday,0
20230110,2023-01-10,10,1,2023,January,Jan-2023,Tuesday,0


In [0]:
%sql
select c.Date, p.product_code, *
from silver.sales_silver s
join gold.dim_calendar c
on s.DateKey=c.Date_Key
join silver.products_silver p
on s.ProductId=p.ProductId

order by s.order_number, s.order_line;

Date,product_code,Id,DateKey,ProductId,BrandId,CustomerId,AuditId,order_number,order_line,price,qty,discount_perc,cost,file_path,file_size,file_modification_time,Date_Key,Date.1,Day_Number,Month_Number,Year,Month,MMM_YYYY,Day_Name,Is_weekend,ProductId.1,product_code.1,product_name,category,description,gender,color,size,file_path.1,file_size.1,file_modification_time.1,active_status,valid_from,valid_to
2022-06-05,PR0001,1,20220605,1,2,4,1,1001,1,50,1,0.0,32,dbfs:/FileStore/mnt/test_bucket/landing/sales/2022_06_09_091745_sales.csv,541,2023-11-30T18:36:58.000+0000,20220605,2022-06-05,5,6,2022,June,Jun-2022,Sunday,1,1,PR0001,Product 1,shoes,running shoes,unisex,black,8,dbfs:/FileStore/mnt/test_bucket/landing/products/2022_06_09_091705_products.csv,398,2023-11-30T18:36:52.000+0000,Y,2023-01-01,9999-12-31
2022-06-05,PR0002,2,20220605,2,2,1,1,1002,1,30,2,0.0,13,dbfs:/FileStore/mnt/test_bucket/landing/sales/2022_06_09_091745_sales.csv,541,2023-11-30T18:36:58.000+0000,20220605,2022-06-05,5,6,2022,June,Jun-2022,Sunday,1,2,PR0002,Product 2,pants,long pants,female,white,42,dbfs:/FileStore/mnt/test_bucket/landing/products/2022_06_09_091705_products.csv,398,2023-11-30T18:36:52.000+0000,Y,2023-01-01,9999-12-31
2022-06-06,PR0003,3,20220606,3,3,1,1,1003,1,25,1,0.0,10,dbfs:/FileStore/mnt/test_bucket/landing/sales/2022_06_09_091745_sales.csv,541,2023-11-30T18:36:58.000+0000,20220606,2022-06-06,6,6,2022,June,Jun-2022,Monday,0,3,PR0003,Product 3,underwear,underwear male,male,red,M,dbfs:/FileStore/mnt/test_bucket/landing/products/2022_06_09_091705_products.csv,398,2023-11-30T18:36:52.000+0000,Y,2023-01-01,9999-12-31
2022-06-06,PR0001,4,20220606,1,2,2,1,1004,1,35,1,0.0,32,dbfs:/FileStore/mnt/test_bucket/landing/sales/2022_06_09_091745_sales.csv,541,2023-11-30T18:36:58.000+0000,20220606,2022-06-06,6,6,2022,June,Jun-2022,Monday,0,1,PR0001,Product 1,shoes,running shoes,unisex,black,8,dbfs:/FileStore/mnt/test_bucket/landing/products/2022_06_09_091705_products.csv,398,2023-11-30T18:36:52.000+0000,Y,2023-01-01,9999-12-31
2022-06-06,PR0004,5,20220606,4,4,1,1,1005,1,26,3,10.0,11,dbfs:/FileStore/mnt/test_bucket/landing/sales/2022_06_09_091745_sales.csv,541,2023-11-30T18:36:58.000+0000,20220606,2022-06-06,6,6,2022,June,Jun-2022,Monday,0,4,PR0004,Product 4,shoes,sport shoes,female,blue,7,dbfs:/FileStore/mnt/test_bucket/landing/products/2022_06_09_091705_products.csv,398,2023-11-30T18:36:52.000+0000,Y,2023-01-01,9999-12-31
2022-06-06,PR0005,6,20220606,5,3,1,1,1005,2,17,1,0.0,8,dbfs:/FileStore/mnt/test_bucket/landing/sales/2022_06_09_091745_sales.csv,541,2023-11-30T18:36:58.000+0000,20220606,2022-06-06,6,6,2022,June,Jun-2022,Monday,0,5,PR0005,Product 5,pants,long skinny,female,black,32,dbfs:/FileStore/mnt/test_bucket/landing/products/2022_06_09_091705_products.csv,398,2023-11-30T18:36:52.000+0000,Y,2023-01-01,9999-12-31
2022-06-06,PR0001,7,20220606,1,2,1,1,1005,3,27,1,0.0,32,dbfs:/FileStore/mnt/test_bucket/landing/sales/2022_06_09_091745_sales.csv,541,2023-11-30T18:36:58.000+0000,20220606,2022-06-06,6,6,2022,June,Jun-2022,Monday,0,1,PR0001,Product 1,shoes,running shoes,unisex,black,8,dbfs:/FileStore/mnt/test_bucket/landing/products/2022_06_09_091705_products.csv,398,2023-11-30T18:36:52.000+0000,Y,2023-01-01,9999-12-31
2022-06-07,PR0004,8,20220607,4,4,3,1,1006,1,95,2,5.0,11,dbfs:/FileStore/mnt/test_bucket/landing/sales/2022_06_09_091745_sales.csv,541,2023-11-30T18:36:58.000+0000,20220607,2022-06-07,7,6,2022,June,Jun-2022,Tuesday,0,4,PR0004,Product 4,shoes,sport shoes,female,blue,7,dbfs:/FileStore/mnt/test_bucket/landing/products/2022_06_09_091705_products.csv,398,2023-11-30T18:36:52.000+0000,Y,2023-01-01,9999-12-31
2022-06-08,PR0006,9,20220608,6,5,4,1,1007,1,16,1,0.0,9,dbfs:/FileStore/mnt/test_bucket/landing/sales/2022_06_09_091745_sales.csv,541,2023-11-30T18:36:58.000+0000,20220608,2022-06-08,8,6,2022,June,Jun-2022,Wednesday,0,6,PR0006,Product 6,accessories,accessories young girls,female,blue,UN,dbfs:/FileStore/mnt/test_bucket/landing/products/2022_06_09_091705_products.csv,398,2023-11-30T18:36:52.000+0000,Y,2023-01-01,9999-12-31
2022-06-08,PR0002,10,20220608,2,2,4,1,1007,2,22,1,0.0,13,dbfs:/FileStore/mnt/test_bucket/landing/sales/2022_06_09_091745_sales.csv,541,2023-11-30T18:36:58.000+0000,20220608,2022-06-08,8,6,2022,June,Jun-2022,Wednesday,0,2,PR0002,Product 2,pants,long pants,female,white,42,dbfs:/FileStore/mnt/test_bucket/landing/products/2022_06_09_091705_products.csv,398,2023-11-30T18:36:52.000+0000,Y,2023-01-01,9999-12-31


In [0]:
df2=spark.read.table('silver.sales_silver')
display(df2)

In [0]:
df2.withColumn('date_key', date_format(col('order_date'), 'yyyyMMdd')).printSchema()

In [0]:
%sql
select * from silver.customers_silver;

CustomerId,customer_code,name,surname,email,country,city,address,county,state,zip,file_path,file_size,file_modification_time,active_status,valid_from,valid_to
4,99,Kris,Marrier,kris@gmail.com,US,Baltimore,228 Runamuck Pl #2808,Baltimore City,MD,21224,dbfs:/FileStore/mnt/test_bucket/landing/customers/2022_06_09_091657_customers.csv,345.0,2023-11-30T18:36:45.000+0000,N,2023-01-01,2023-11-29
2,150,Minna,Amigon,minna_amigon@yahoo.com,US,Kulpsville,2371 Jerrold Ave,Montgomery,PA,19443,dbfs:/FileStore/mnt/test_bucket/landing/customers/2022_06_09_091657_customers.csv,345.0,2023-11-30T18:36:45.000+0000,Y,2023-01-01,9999-12-31
3,215,Abel,Maclead,amaclead@gmail.com,US,Middle Island,37275 St Rt 17m M,Suffolk,NY,11953,dbfs:/FileStore/mnt/test_bucket/landing/customers/2022_06_09_091657_customers.csv,345.0,2023-11-30T18:36:45.000+0000,Y,2023-01-01,9999-12-31
8,99,Kris,Marrier,kris@gmail.com,US,Metairie,426 Wolf St,Jefferson,LA,21224,dbfs:/FileStore/mnt/test_bucket/landing/customers/2022_06_11_091504_customers.csv,150.0,2023-11-30T19:20:35.000+0000,N,2023-11-30,2023-11-29
13,99,Kris,Marrier,kris@gmail.com,US,Metairie,426 Wolf St,Jefferson,LA,21224,dbfs:/FileStore/mnt/test_bucket/landing/customers/2022_06_11_091504_customers.csv,150.0,2023-11-30T19:33:27.000+0000,Y,2023-11-30,9999-12-31
5,201,Cammy,Albares,calbares@gmail.com,US,"Rousseaux, Michael Esq",56 E Morehead St,Wayne,MI,48180,dbfs:/FileStore/mnt/test_bucket/landing/customers/2022_06_10_091536_customers.csv,173.0,2023-11-30T19:05:44.000+0000,Y,2023-11-30,9999-12-31
1,UNKN,not applicable,not applicable,not applicable,not applicable,not applicable,not applicable,not applicable,not applicable,not applicable,not applicable,,,Y,2023-01-01,9999-12-31


In [0]:
%sql
SELECT *
	FROM silver.customers_silver
	where	'2023-11-30' between valid_from and valid_to;

CustomerId,customer_code,name,surname,email,country,city,address,county,state,zip,file_path,file_size,file_modification_time,active_status,valid_from,valid_to
2,150,Minna,Amigon,minna_amigon@yahoo.com,US,Kulpsville,2371 Jerrold Ave,Montgomery,PA,19443,dbfs:/FileStore/mnt/test_bucket/landing/customers/2022_06_09_091657_customers.csv,345.0,2023-11-30T18:36:45.000+0000,Y,2023-01-01,9999-12-31
3,215,Abel,Maclead,amaclead@gmail.com,US,Middle Island,37275 St Rt 17m M,Suffolk,NY,11953,dbfs:/FileStore/mnt/test_bucket/landing/customers/2022_06_09_091657_customers.csv,345.0,2023-11-30T18:36:45.000+0000,Y,2023-01-01,9999-12-31
8,99,Kris,Marrier,kris@gmail.com,US,Metairie,426 Wolf St,Jefferson,LA,21224,dbfs:/FileStore/mnt/test_bucket/landing/customers/2022_06_11_091504_customers.csv,150.0,2023-11-30T19:20:35.000+0000,Y,2023-11-30,9999-12-31
5,201,Cammy,Albares,calbares@gmail.com,US,"Rousseaux, Michael Esq",56 E Morehead St,Wayne,MI,48180,dbfs:/FileStore/mnt/test_bucket/landing/customers/2022_06_10_091536_customers.csv,173.0,2023-11-30T19:05:44.000+0000,Y,2023-11-30,9999-12-31
1,UNKN,not applicable,not applicable,not applicable,not applicable,not applicable,not applicable,not applicable,not applicable,not applicable,not applicable,,,Y,2023-01-01,9999-12-31
