In [42]:
!pip install pyspark



In [43]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session, putting the SQLite JDBC driver jar on the
# driver classpath so "org.sqlite.JDBC" can be loaded by the JDBC reader/writer.
spark = (
    SparkSession.builder
    .appName("JDBCFix")
    .config("spark.driver.extraClassPath", '/kaggle/input/jarfile/sqlite-jdbc-3.45.1.0.jar')
    .getOrCreate()
)

# Echo the effective setting to confirm the session picked it up.
print(spark.sparkContext.getConf().get("spark.driver.extraClassPath"))

/kaggle/input/jarfile/sqlite-jdbc-3.45.1.0.jar


In [44]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from py4j.java_gateway import java_import
import sqlite3
import pandas as pd
import os

# Single source of truth for the on-disk database location. The existence check,
# the file creation and the JDBC URL below all derive from it, so they can no
# longer point at different files when the working directory changes.
DB_LOCAL_PATH = '/kaggle/working/ecommerce.db'

if os.path.exists(DB_LOCAL_PATH):
    print('Already DB Exists')
else:
    # connect() creates the empty database file; close the handle right away so
    # no Python-side connection keeps the file open while Spark/JDBC uses it.
    sqlite3.connect(DB_LOCAL_PATH).close()

# JDBC URL used by every helper below (value: "jdbc:sqlite:/kaggle/working/ecommerce.db").
db_file_path = f"jdbc:sqlite:{DB_LOCAL_PATH}"
def sqltodf(query):
    """Run ``query`` against the SQLite database and return it as a Spark DataFrame.

    ``query`` is handed to the JDBC source as its ``dbtable`` option, so callers
    pass either a table name or a parenthesised sub-select, e.g.
    ``"(select * from stg_customers)"``.
    """
    reader = spark.read.format("jdbc")
    reader = reader.option("url", f"{db_file_path}")
    reader = reader.option("driver", "org.sqlite.JDBC")
    reader = reader.option("dbtable", f"{query}")
    return reader.load()

def writetodb(df_to_write,tbl_name):
    """Append ``df_to_write`` to the SQLite table ``tbl_name`` over JDBC.

    The target table's column list is fetched with a zero-row probe query
    (``where 1=0``) and the frame is projected onto exactly those columns, in
    table order, before being written in ``append`` mode.
    """
    target_columns = sqltodf(f'(select * from {tbl_name} where 1=0)').columns
    projected = df_to_write.select(*target_columns)

    writer = projected.write.format("jdbc")
    writer = writer.option("url", f"{db_file_path}")
    writer = writer.option("driver", "org.sqlite.JDBC")
    writer = writer.option("dbtable", f"{tbl_name}")
    writer.mode("append").save()

def excutequery(query):
    """Execute a single DDL/DML statement directly on the SQLite database.

    The statement is run through ``java.sql.DriverManager`` inside Spark's JVM
    (via the py4j gateway) rather than through a DataFrame, which is what allows
    CREATE / UPDATE / DELETE / INSERT ... SELECT statements.

    Parameters
    ----------
    query : str
        SQL text passed unchanged to ``Statement.executeUpdate``.

    Returns
    -------
    None

    The JDBC statement and connection are always closed, even when the statement
    raises, so repeated calls do not accumulate open handles on the database file.
    """
    gateway = spark.sparkContext._gateway
    java_import(gateway.jvm, 'java.sql.DriverManager')
    connection = gateway.jvm.DriverManager.getConnection(db_file_path)
    try:
        statement = connection.createStatement()
        try:
            statement.executeUpdate(query)
        finally:
            statement.close()
    finally:
        # Release the connection even if createStatement/executeUpdate failed.
        connection.close()

def remove_database_set(database_file_path):
    """Delete the file at ``database_file_path`` if present and report the outcome.

    Prints one line saying whether the file was removed or was not found.
    Returns ``None`` in both cases.
    """
    if not os.path.exists(database_file_path):
        print(f"The file '{database_file_path}' does not exist.")
        return

    os.remove(database_file_path)
    print(f"Database file '{database_file_path}' has been removed.")

<function __main__.remove_database_set(database_file_path)>

In [36]:
#remove_database_set('/kaggle/working/ecommerce.db')

Database file '/kaggle/working/ecommerce.db' has been removed.


In [46]:
# Staging table: the five customer attributes plus two lineage columns
# (file_name, file_process_date). IF NOT EXISTS keeps this cell re-runnable.
query="""
    CREATE TABLE IF NOT EXISTS stg_customers (
        customer_id TEXT,
        customer_unique_id TEXT,
        customer_zip_code_prefix INTEGER,
        customer_city TEXT,
        customer_state TEXT,
        file_name Text,
        file_process_date Date
     )"""
excutequery(query)

# Persistent (history) table: the same five customer attributes plus the
# change-tracking columns insert_updt_flag, insert_updt_ts and active_flg,
# the source file_name, and created_date.
query="""
    CREATE TABLE IF NOT EXISTS per_customers (
        customer_id TEXT,
        customer_unique_id TEXT,
        customer_zip_code_prefix INTEGER,
        customer_city TEXT,
        customer_state TEXT,
        insert_updt_flag Text,
        insert_updt_ts timestamp,
        active_flg Text,
        file_name Text,
        created_date Date
     )"""
excutequery(query)

In [67]:
# List the tables currently present in the SQLite database.
df=sqltodf("(select tbl_name from sqlite_master)")
df.show(truncate=False)

# Maintenance loop, disabled by default: uncomment one of the statements to drop
# or empty every table. The explicit `pass` keeps the loop syntactically valid
# while both statements stay commented out.
for tbl in df.collect():
    #excutequery(f"drop table {tbl['tbl_name']}")
    #excutequery(f""" delete from {tbl['tbl_name']} """)
    pass

+-------------+
|tbl_name     |
+-------------+
|stg_customers|
|per_customers|
+-------------+



In [68]:
# Read the customers CSV (header row, comma separated) and tag every row with
# the source file name and today's date as the processing date.
df_src = (
    spark.read.csv("/kaggle/input/ecommerce-files/olist_customers_dataset.csv", header=True, sep=',')
    .withColumn('file_name', lit('olist_customers_dataset'))
    .withColumn('file_process_date', lit(current_date()).cast(DateType()))
)
df_src.show(5, truncate=False)
print(df_src.count())
#df_src=df_src.limit(100)

+--------------------------------+--------------------------------+------------------------+---------------------+--------------+-----------------------+-----------------+
|customer_id                     |customer_unique_id              |customer_zip_code_prefix|customer_city        |customer_state|file_name              |file_process_date|
+--------------------------------+--------------------------------+------------------------+---------------------+--------------+-----------------------+-----------------+
|06b8999e2fba1a1fbc88172c00ba8bc7|861eff4711a542e4b93843c6dd7febb0|14409                   |franca               |SP            |olist_customers_dataset|2025-11-16       |
|18955e83d337fd6b2def6b18a428ac77|290c77bc529b7ac935b93aa66c333dc3|09790                   |sao bernardo do campo|SP            |olist_customers_dataset|2025-11-16       |
|4e7b3e00288586ebd08712fdd0374a03|060e732b5b29e8181a18229c7b0b2b5e|01151                   |sao paulo            |SP            |olist_custo

In [69]:
# Full refresh of staging: empty stg_customers, then append the current source frame.
excutequery(""" delete from stg_customers """) 
writetodb(df_src,'stg_customers')

25/11/16 07:39:16 WARN JdbcUtils: Requested isolation level 1 is not supported; falling back to default isolation level 8
25/11/16 07:39:16 WARN JdbcUtils: Requested isolation level 1 is not supported; falling back to default isolation level 8
25/11/16 07:39:16 WARN JdbcUtils: Requested isolation level 1 is not supported; falling back to default isolation level 8
                                                                                

In [70]:
# Staging check: total row count, then a five-row sample.
sqltodf(f"(select cast(count(*) as int) from stg_customers)").show()
sqltodf(f"(select * from stg_customers)").show(5)

+---------------------+
|cast(count(*) as int)|
+---------------------+
|                99441|
+---------------------+

+--------------------+--------------------+------------------------+--------------------+--------------+--------------------+-----------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|           file_name|file_process_date|
+--------------------+--------------------+------------------------+--------------------+--------------+--------------------+-----------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|olist_customers_d...|       2025-11-16|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|olist_customers_d...|       2025-11-16|
|4e7b3e00288586ebd...|060e732b5b29e8181...|                    1151|           sao paulo|            SP|olist_customers_d...|       2025-11-16|
|b2b6027bc5c510

In [71]:
# History-table check before the first load: total row count, then a five-row sample.
sqltodf(f"(select cast(count(*) as int) from per_customers)").show()
sqltodf(f"(select * from per_customers)").show(5)

+---------------------+
|cast(count(*) as int)|
+---------------------+
|                    0|
+---------------------+

+-----------+------------------+------------------------+-------------+--------------+----------------+--------------+----------+---------+------------+
|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|insert_updt_flag|insert_updt_ts|active_flg|file_name|created_date|
+-----------+------------------+------------------------+-------------+--------------+----------------+--------------+----------+---------+------------+
+-----------+------------------+------------------------+-------------+--------------+----------------+--------------+----------+---------+------------+



In [72]:
# SCD Type 2, step 1: close out the currently active version of every customer
# whose zip / city / state in staging differs from what per_customers holds.
#
# Only rows with active_flg='Y' are touched. Without that filter, versions that
# were already closed by an earlier load (and still differ from staging) would be
# re-stamped with a new insert_updt_ts on every run, destroying the history timeline.
#
# COALESCE(..., 'a') substitutes a sentinel for NULL so that NULL vs non-NULL
# counts as a difference and NULL vs NULL does not.
query= """
UPDATE per_customers as tgt
set active_flg='N',
insert_updt_flag='U',
insert_updt_ts=current_timestamp
where tgt.active_flg='Y'
and exists (
    select 1 from stg_customers as stg 
    where stg.customer_unique_id=tgt.customer_unique_id 
    and stg.customer_id=tgt.customer_id
    and (
        COALESCE(cast(stg.customer_zip_code_prefix as int),'a')<>COALESCE(cast(tgt.customer_zip_code_prefix as int),'a') or
        COALESCE(stg.customer_city,'a') <> COALESCE(tgt.customer_city,'a') or
        COALESCE(stg.customer_state,'a') <> COALESCE(tgt.customer_state,'a')
    )
)
"""
excutequery(query)

In [73]:
# SCD Type 2, step 2: insert brand-new customers (flag 'I') and the new version of
# changed customers (flag 'U'); every inserted row is active ('Y').
#
# The CTE `per` picks ONE reference row per (customer_unique_id, customer_id) to
# compare staging against. The active row is ranked first and insert_updt_ts only
# breaks ties among the remaining rows. Ranking by timestamp alone let a closed
# row whose timestamp is equal to or later than the active row's win the
# comparison; staging then looked "changed" and a second active version was inserted.
#
# created_date is carried over from the reference row for updates, so all versions
# of a customer share the date of their first insert.
query= """
INSERT INTO per_customers (customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,insert_updt_flag,
insert_updt_ts,active_flg,file_name,created_date)
with per as (
select tgt.*,
row_number()over(
    partition by tgt.customer_unique_id, tgt.customer_id
    order by case when tgt.active_flg='Y' then 0 else 1 end, tgt.insert_updt_ts desc
) as rnk
from per_customers tgt
    where exists (
        select 1 from stg_customers as stg 
        where stg.customer_unique_id=tgt.customer_unique_id 
        and stg.customer_id=tgt.customer_id
    )
)
select stg.customer_id,stg.customer_unique_id,stg.customer_zip_code_prefix,stg.customer_city,stg.customer_state,
    case when per.customer_unique_id is null then 'I' else 'U' end as insert_updt_flag,
    current_timestamp as insert_updt_ts, 'Y' as active_flg,stg.file_name,
    case when per.customer_unique_id is null then current_timestamp else per.created_date end as created_date 
from stg_customers stg left join per 
on stg.customer_unique_id=per.customer_unique_id 
and stg.customer_id=per.customer_id 
and per.rnk=1
where (
        COALESCE(cast(stg.customer_zip_code_prefix as int),'a')<>COALESCE(cast(per.customer_zip_code_prefix as int),'a') or
        COALESCE(stg.customer_city,'a') <> COALESCE(per.customer_city,'a') or
        COALESCE(stg.customer_state,'a') <> COALESCE(per.customer_state,'a')
    ) or per.customer_unique_id is null

"""
excutequery(query)

In [74]:
# Summarise per_customers by load timestamp, active flag and insert/update flag.
query=f"""(
    select insert_updt_ts,active_flg,insert_updt_flag, cast(count(*) as int) as cnt 
    from per_customers group by insert_updt_ts,active_flg,insert_updt_flag
)"""
sqltodf(query).show(5)

# per_customers was empty before this load, so every staged row shows up as an insert ('I').

+-------------------+----------+----------------+--------------------+
|     insert_updt_ts|active_flg|insert_updt_flag|                 cnt|
+-------------------+----------+----------------+--------------------+
|2025-11-16 07:40:13|         Y|               I|99441.00000000000...|
+-------------------+----------+----------------+--------------------+



In [75]:
# History-table check after the first load: total row count, then a five-row sample.
sqltodf(f"(select cast(count(*) as int) from per_customers)").show()
sqltodf(f"(select * from per_customers)").show(5)

+---------------------+
|cast(count(*) as int)|
+---------------------+
|                99441|
+---------------------+

+--------------------+--------------------+------------------------+--------------------+--------------+----------------+-------------------+----------+--------------------+------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|insert_updt_flag|     insert_updt_ts|active_flg|           file_name|created_date|
+--------------------+--------------------+------------------------+--------------------+--------------+----------------+-------------------+----------+--------------------+------------+
|06b8999e2fba1a1fb...|861eff4711a542e4b...|                   14409|              franca|            SP|               I|2025-11-16 07:40:13|         Y|olist_customers_d...|  2025-11-16|
|18955e83d337fd6b2...|290c77bc529b7ac93...|                    9790|sao bernardo do c...|            SP|               I|2025-11-16

In [76]:
# Read the second customers extract, tag it with file name and processing date,
# preview it, then replace the contents of stg_customers with it.
df_src = (
    spark.read.csv("/kaggle/input/ecommerce-file/olist_customers_dataset_20251116.csv", header=True, sep=',')
    .withColumn('file_name', lit('olist_customers_dataset_20251116'))
    .withColumn('file_process_date', lit(current_date()).cast(DateType()))
)
df_src.show(5, truncate=False)
print(df_src.count())

excutequery(""" delete from stg_customers """)
writetodb(df_src, 'stg_customers')

+--------------------------------+--------------------------------+------------------------+---------------------+--------------+--------------------------------+-----------------+
|customer_id                     |customer_unique_id              |customer_zip_code_prefix|customer_city        |customer_state|file_name                       |file_process_date|
+--------------------------------+--------------------------------+------------------------+---------------------+--------------+--------------------------------+-----------------+
|06b8999e2fba1a1fbc88172c00ba8bc7|861eff4711a542e4b93843c6dd7febb0|14419                   |franca               |SP            |olist_customers_dataset_20251116|2025-11-16       |
|18955e83d337fd6b2def6b18a428ac77|290c77bc529b7ac935b93aa66c333dc3|09791                   |sao bernardo do campo|SP            |olist_customers_dataset_20251116|2025-11-16       |
|4e7b3e00288586ebd08712fdd0374a03|060e732b5b29e8181a18229c7b0b2b5e|01152                   |sao

25/11/16 07:40:44 WARN JdbcUtils: Requested isolation level 1 is not supported; falling back to default isolation level 8


In [77]:
# SCD Type 2, step 1 (second load): close out the currently active version of every
# customer whose zip / city / state in staging differs from what per_customers holds.
#
# Only rows with active_flg='Y' are touched. Without that filter, versions that
# were already closed by an earlier load (and still differ from staging) would be
# re-stamped with a new insert_updt_ts on every run, destroying the history timeline.
#
# COALESCE(..., 'a') substitutes a sentinel for NULL so that NULL vs non-NULL
# counts as a difference and NULL vs NULL does not.
query= """
UPDATE per_customers as tgt
set active_flg='N',
insert_updt_flag='U',
insert_updt_ts=current_timestamp
where tgt.active_flg='Y'
and exists (
    select 1 from stg_customers as stg 
    where stg.customer_unique_id=tgt.customer_unique_id 
    and stg.customer_id=tgt.customer_id
    and (
        COALESCE(cast(stg.customer_zip_code_prefix as int),'a')<>COALESCE(cast(tgt.customer_zip_code_prefix as int),'a') or
        COALESCE(stg.customer_city,'a') <> COALESCE(tgt.customer_city,'a') or
        COALESCE(stg.customer_state,'a') <> COALESCE(tgt.customer_state,'a')
    )
)
"""
excutequery(query)

In [83]:
# SCD Type 2, step 2 (second load): insert brand-new customers (flag 'I') and the
# new version of changed customers (flag 'U'); every inserted row is active ('Y').
#
# The CTE `per` picks ONE reference row per (customer_unique_id, customer_id) to
# compare staging against. The active row is ranked first and insert_updt_ts only
# breaks ties among the remaining rows. Ranking by timestamp alone let a closed
# row whose timestamp is equal to or later than the active row's win the
# comparison; staging then looked "changed" and a second active version was
# inserted when this cell was executed again.
#
# created_date is carried over from the reference row for updates, so all versions
# of a customer share the date of their first insert.
query= """
INSERT INTO per_customers (customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,insert_updt_flag,
insert_updt_ts,active_flg,file_name,created_date)
with per as (
select tgt.*,
row_number()over(
    partition by tgt.customer_unique_id, tgt.customer_id
    order by case when tgt.active_flg='Y' then 0 else 1 end, tgt.insert_updt_ts desc
) as rnk
from per_customers tgt
    where exists (
        select 1 from stg_customers as stg 
        where stg.customer_unique_id=tgt.customer_unique_id 
        and stg.customer_id=tgt.customer_id
    )
)
select stg.customer_id,stg.customer_unique_id,stg.customer_zip_code_prefix,stg.customer_city,stg.customer_state,
    case when per.customer_unique_id is null then 'I' else 'U' end as insert_updt_flag,
    current_timestamp as insert_updt_ts, 'Y' as active_flg,stg.file_name,
    case when per.customer_unique_id is null then current_timestamp else per.created_date end as created_date 
from stg_customers stg left join per 
on stg.customer_unique_id=per.customer_unique_id 
and stg.customer_id=per.customer_id 
and per.rnk=1
where (
        COALESCE(cast(stg.customer_zip_code_prefix as int),'a')<>COALESCE(cast(per.customer_zip_code_prefix as int),'a') or
        COALESCE(stg.customer_city,'a') <> COALESCE(per.customer_city,'a') or
        COALESCE(stg.customer_state,'a') <> COALESCE(per.customer_state,'a')
    ) or per.customer_unique_id is null

"""
excutequery(query)

In [84]:
# Summarise per_customers by load timestamp, active flag and insert/update flag
# to see how many rows each step of the second load closed out or inserted.
query=f"""(
    select insert_updt_ts,active_flg,insert_updt_flag, cast(count(*) as int) as cnt 
    from per_customers group by insert_updt_ts,active_flg,insert_updt_flag
)"""
sqltodf(query).show(10)

# Total row count and a five-row sample of the history table.
sqltodf(f"(select cast(count(*) as int) from per_customers)").show()
sqltodf(f"(select * from per_customers)").show(5)

+-------------------+----------+----------------+--------------------+
|     insert_updt_ts|active_flg|insert_updt_flag|                 cnt|
+-------------------+----------+----------------+--------------------+
|2025-11-16 07:40:13|         Y|               I|99437.00000000000...|
|2025-11-16 07:40:51|         N|               U|4.000000000000000000|
|2025-11-16 07:41:16|         Y|               I|2.000000000000000000|
|2025-11-16 07:42:06|         Y|               U|4.000000000000000000|
+-------------------+----------+----------------+--------------------+

+---------------------+
|cast(count(*) as int)|
+---------------------+
|                99447|
+---------------------+

+--------------------+--------------------+------------------------+--------------------+--------------+----------------+-------------------+----------+--------------------+------------+
|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|insert_updt_flag|  

In [85]:
# Show the rows flagged 'U': closed-out old versions (active_flg 'N') together
# with the replacement versions inserted for them (active_flg 'Y').
# NOTE(review): the ORDER BY sits inside the sub-select passed as `dbtable`;
# confirm the JDBC read preserves that order if the ordering matters.
query=f"""(
    select *
    from per_customers where insert_updt_flag='U' order by customer_id desc
)"""
sqltodf(query).show(10,truncate=False)

+--------------------------------+--------------------------------+------------------------+---------------------+--------------+----------------+-------------------+----------+--------------------------------+------------+
|customer_id                     |customer_unique_id              |customer_zip_code_prefix|customer_city        |customer_state|insert_updt_flag|insert_updt_ts     |active_flg|file_name                       |created_date|
+--------------------------------+--------------------------------+------------------------+---------------------+--------------+----------------+-------------------+----------+--------------------------------+------------+
|b2b6027bc5c5109e529d4dc6358b12c3|259dac757896d24d7702b9acbbff3f3c|8775                    |mogi das cruzes      |SP            |U               |2025-11-16 07:40:51|N         |olist_customers_dataset         |2025-11-16  |
|b2b6027bc5c5109e529d4dc6358b12c3|259dac757896d24d7702b9acbbff3f3c|8775                    |mogi das cru

In [88]:
# Integrity check: list every (customer_unique_id, customer_id) pair that has
# more than one active ('Y') row. An empty result means no pair has duplicate
# active versions.
query=f"""(
    select customer_unique_id,customer_id,count(*)
    from per_customers where active_flg='Y'
    group by customer_unique_id,customer_id
    having count(*)>1
)"""
sqltodf(query).show(10,truncate=False)

+------------------+-----------+--------+
|customer_unique_id|customer_id|count(*)|
+------------------+-----------+--------+
+------------------+-----------+--------+



In [9]:
# First-time load only: seed per_customers from staging while the table is empty.
#
# The NOT EXISTS guard enforces that precondition inside the statement: once
# per_customers holds any row the INSERT selects nothing, so executing this cell
# after the SCD cells (e.g. via "Run All") cannot append a second copy of every
# customer. The explicit column list removes the dependency on the physical
# column order of per_customers.
query= """
INSERT INTO per_customers (customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,
insert_updt_flag,insert_updt_ts,active_flg,file_name,created_date)
select customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state,
'I' as insert_updt_flag,current_timestamp as insert_updt_ts,'Y' as active_flg,file_name,current_timestamp as created_date from stg_customers
where not exists (select 1 from per_customers)
"""
excutequery(query)