In [1]:
import os
import schedule
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import psycopg2
import pandas as pd
from sqlalchemy import create_engine
spark = SparkSession.builder \
                    .config("spark.jar", "C:\\Users\\thehu\\OneDrive\\Mia_town\\IT\\Data\\DE\\study_de\\postgresql-42.7.5.jar") \
                    .config("spark.driver.memory", "8g") \
                    .getOrCreate()

#DATABASE
host = 'localhost'
dbname = 'miatown'
user = 'postgres'
password = input('hay nhap pass')
port = '5432'
driver = "org.postgresql.Driver"

conn = psycopg2.connect(f'host= {host} \
                        dbname= {dbname} \
                        user= {user} \
                        password= {password} \
                        ')

#set commit automaticaly
conn.set_session(autocommit=True)
cur = conn.cursor()


#PATH
path = 'C:\\Users\\thehu\\OneDrive\\Mia_town\\IT\\Data\\MIATOWN\\raw\\'

### SOURCE CODE

In [None]:
# #CREATE TABLE
# conn = psycopg2.connect(f'host= {host} \
#                         dbname= {dbname} \
#                         user= {user} \
#                         password= {password} \
#                         ')

# #set commit automaticaly
# conn.set_session(autocommit=True)
# cur = conn.cursor()
def create_database():
    try:
        cur.execute("""
                    CREATE TABLE IF NOT EXISTS transaction(
                    area VARCHAR,
                    sub_area VARCHAR,
                    table_id VARCHAR,
                    table_name VARCHAR,
                    trans_id VARCHAR,
                    amount_origin INT,
                    voucher_amount_paid INT,
                    total_amount INT,
                    trans_date DATE,
                    voucher_name VARCHAR,
                    customer_name VARCHAR,
                    customer_phone VARCHAR);
                    
                    CREATE TABLE IF NOT EXISTS product(
                    trans_id VARCHAR,
                    trans_date DATE,
                    item_name VARCHAR,
                    category VARCHAR,
                    quantity FLOAT,
                    amount INT,
                    amount_discount_on_price INT);

                    """)
    except psycopg2.Error as e:
        print('error in creating table, table is existed')
    return print("Database work finished")


def data_extract(path):
    transaction_combine = None 
    print('----------  combine file  ----------')
    print('----------  create transaction data  ----------')
    json_file = [f'{path}{file}' for file in os.listdir(path) if file.endswith('.json')]        #print a list of exactly json file from folder
    for file in json_file:
        print(f'----------  Selecting from {file}  ----------')
        raw = spark.read.json(file)
        transaction = (
                raw.withColumn('customer_name', col('extra_data.customer_name')) \
                    .withColumn('customer_phone', col('extra_data.customer_phone')) \
                    .withColumn('trans_id', col('tran_id')) \
                    # .withColumn('trans_id', col('sale_detail').getItem(0).getItem('tran_id')) \
                    .withColumn("trans_date", (from_unixtime(col("created_at") / 1000)).cast('timestamp') - expr("INTERVAL 7 HOURS")) #set timezone manually
                    .withColumn("area", 
                                when((col("table_name").contains("CAFE KIDS")) | (col("table_name").contains("QUẦY VÉ")), "KID") \
                                .when(col("table_name").contains("BIDA"), "BIDA") \
                                .when(col("table_name").contains("PS5"), "GAMING") \
                                .otherwise(col("table_name"))
                                )
                    .withColumn("sub_area",
                                when(col("table_name").contains("NHÂN VIÊN"), "NHÂN VIÊN") \
                                .when(col("table_name").contains("BIDA LỖ"), "BIDA LỖ") \
                                .when(col("table_name").contains("BIDA BĂNG"), "BIDA BĂNG") \
                                .when(col("table_name").contains("BIDA LIBRE"), "BIDA LIBRE") \
                                .when(col("table_name").contains("ROOM"), "ROOM") \
                                .when(col("table_name").contains("GHẾ"), "GHẾ") \
                                .when(col("table_name").contains("CAFE KIDS"), "CAFE KIDS") \
                                .when(col("table_name").contains("QUẦY VÉ"), "QUẦY VÉ")
                                .otherwise(col("table_name"))
                                )
        )
        transaction = transaction.select(
                    'area',
                    'sub_area',
                    'table_id',
                    'table_name', 
                    'trans_id', 
                    'amount_origin',
                    'voucher_amount_paid', 
                    'total_amount', 
                    'trans_date', 
                    'voucher_name', 
                    'customer_name',
                    'customer_phone' )
        print(f'----------  Union {file}  ----------')
        transaction_combine = transaction if transaction_combine is None else transaction_combine.union(transaction)
    print('----------  DONE  ----------')
    

    product_combine = None 
    print('----------  combine file  ----------')
    print('----------  CREATE PRODUCT DATA  ----------')
    json_file = [f'{path}{file}' for file in os.listdir(path) if file.endswith('.json')]        #print a list of exactly json file from folder
    for file in json_file:
        print(f'----------  Selecting from {file}  ----------')
        raw = spark.read.json(file)
        raw = raw.withColumn("sale", explode(col("sale_detail")))
        raw = raw.withColumn("topping", explode_outer(col("sale.toppings")))
                
        product = raw.select(
                                col("tran_id").alias("trans_id"),
                                ((from_unixtime(col("created_at") / 1000)).cast('timestamp') - expr("INTERVAL 7 HOURS")).alias("trans_date"),
                                "sale.item_name",
                                when(col("sale.item_name").contains("COMBO"), "COMBO").otherwise(col("sale.item_class_name")).alias("category"),
                                "sale.quantity",
                                "sale.amount",
                                "sale.amount_discount_on_price"
                                )

        topping = raw.select(
                                col("tran_id").alias("trans_id"),
                                ((from_unixtime(col("created_at") / 1000)).cast('timestamp') - expr("INTERVAL 7 HOURS")).alias("trans_date"),
                                "topping.item_name",
                                when(col("sale.item_name").contains("COMBO"), "COMBO").otherwise(col("sale.item_class_name")).alias("category"),
                                "topping.quantity",
                                "topping.amount",
                                "topping.amount_discount_on_price"
                    )
        windowSpec = Window.partitionBy("trans_id", "item_name").orderBy(col("trans_id"))
        product = product.withColumn("row_num", row_number().over(windowSpec))
        topping = topping.withColumn("row_num", row_number().over(windowSpec))

        #UPDATE LẠI SỐ LƯỢNG COMBO
        combo_qty = product.groupBy("trans_id").agg(max(col("row_num")))
        product = product.join(combo_qty, "trans_id", "left") 
        product = product.withColumn(
                                    "quantity", 
                                    when(col("item_name").contains("COMBO"),
                                            when(col("max(row_num)") == 1, col("quantity")).otherwise(                                                                            
                                            when((col("max(row_num)") % 2 != 0) & (col("max(row_num)") > 1), col("max(row_num)") / 3).otherwise(col("max(row_num)") / 2)
                                            )
                                        ).otherwise(col("quantity"))
            ) \
                        .withColumn(
                            "amount",
                            when(col("item_name").contains("COMBO"), col("quantity")*col("amount")).otherwise(col("amount"))
                        )
        
        #Filtering
        product = product.filter((col("item_name") != "COMBO SÁNG BIDA") | (col("row_num") == 1))
        product = product.drop("max(row_num)")
        topping = topping.filter(col("topping.item_name").isNotNull())
        final_product = product.union(topping)
        final_product = final_product.drop("row_num")

        print(f'----------  Union {file}  ----------')
        product_combine = final_product if product_combine is None else product_combine.union(final_product)   

    print('----------  DONE  ----------')

    return transaction_combine, product_combine


def load_data_to_database(transaction_combine, product_combine): ## ĐỂ Ý LẠI LÚC 2 BẢNG CHÊNH LỆCH DATA KHI IMPORT VÀO, TỐI ƯU LẠI 
    print('Connect to database')
    url = f"jdbc:postgresql://{host}:{port}/{dbname}"
    properties = {
        "user": f"{user}",
        "password": f"{password}",
        "driver": f"{driver}"
    }
    print('Completed')

    print("Checking database")
    transaction_query = "(SELECT COUNT(*) AS row_count FROM transaction) as temp"
    count_old_data_trans = spark.read.jdbc(url, transaction_query, properties=properties)
    transaction_row_count = count_old_data_trans.collect()[0]["row_count"]
    old_data_trans = spark.read.jdbc(url ,"(SELECT trans_id FROM transaction)", properties = properties)

    if transaction_row_count == 0:
        print("Table 'transaction' is empty. Appending new data...")
        # Append data
        transaction_combine.write.jdbc(url, "transaction", mode = 'append', properties = properties)
        print("Data successfully appended to 'TRANSACTION' table.")

    else:
        print(f"Table 'transaction' already contains {transaction_row_count} rows. Checking for new data.")
        transaction_combine.createOrReplaceTempView("new_data_trans")
        old_data_trans.createOrReplaceTempView("old_data_trans")
        trans_checking = spark.sql( """
                SELECT trans_id from new_data_trans
                EXCEPT
                SELECT trans_id from old_data_trans
    """)
        new_trans_data = spark.sql("""
                                   with id AS(
                                    SELECT trans_id from new_data_trans
                                    EXCEPT
                                    SELECT trans_id from old_data_trans
                                   )
                                   SELECT ndt.* 
                                   FROM id i
                                   INNER JOIN new_data_trans ndt
                                   ON i.trans_id = ndt.trans_id
    """)
        if trans_checking.count() == 0:
            print("No new data, finished process")
        else:
            print(f"There's {trans_checking.count()} new data in transaction, appending...." )
            new_trans_data.write.jdbc(url, "transaction", mode = 'append', properties = properties )
            print('Completed')


    product_query = "(SELECT COUNT(*) AS row_count FROM product) as temp_"
    count_old_data_prod = spark.read.jdbc(url, product_query, properties=properties)
    product_row_count = count_old_data_prod.collect()[0]["row_count"]
    old_data_prod = spark.read.jdbc(url ,"(SELECT trans_id FROM product)", properties = properties)

    if product_row_count == 0:
        print("Table 'product' is empty. Appending new data...")
        # Append data
        product_combine.write.jdbc(url, "product", mode = 'append', properties = properties)
        print("Data successfully appended to 'PRODUCT' table.")

    else:
        print(f"Table 'PRODUCT' already contains {product_row_count} rows. Checking for new data.")
        product_combine.createOrReplaceTempView("new_data_prod")
        old_data_prod.createOrReplaceTempView("old_data_prod")
        prod_checking = spark.sql( """
                SELECT trans_id from new_data_prod
                EXCEPT
                SELECT trans_id from old_data_prod
    """)
        new_prod_data = spark.sql("""
                                   with id AS(
                                    SELECT trans_id from new_data_prod
                                    EXCEPT
                                    SELECT trans_id from old_data_prod
                                   )
                                   SELECT ndp.* 
                                   FROM id i
                                   INNER JOIN new_data_prod ndp
                                   ON i.trans_id = ndp.trans_id
    """)
        if prod_checking.count() == 0:
            print("No new data, finished process")
        else:
            print(f"There's {prod_checking.count()} new data in product, appending...." )
            new_prod_data.write.jdbc(url, "product", mode = 'append', properties = properties )
            print('Completed')
    print("----------EXTRACT COMPLETE----------")
    # return new_trans_data, new_prod_data



def main_task(path):
    create_database()
    transaction_combine, product_combine = data_extract(path)
    result = load_data_to_database(transaction_combine, product_combine)
    print("Finished")
    

    return result




In [48]:
path = 'C:\\Users\\thehu\\OneDrive\\Mia_town\\IT\\Data\\MIATOWN\\raw\\'
main_task(path)

## CHECK LẠI CÁI UNION TRONG FUNC COMBINE


Database work finished
----------  combine file  ----------
----------  create transaction data  ----------
----------  Selecting from C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_18t05_2025.json  ----------
----------  Union C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_18t05_2025.json  ----------
----------  Selecting from C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_19_05_2025.json  ----------
----------  Union C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_19_05_2025.json  ----------
----------  Selecting from C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_1_17t05_2025.json  ----------
----------  Union C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_1_17t05_2025.json  ----------
----------  Selecting from C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_t01_2025.json  ----------
----------  Union C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_t01_2025.json  ----------
----------  Selecting fr

In [44]:
transaction_combine.filter(col("trans_id").contains("TIDM1")).show()

+----+--------+----------+---------------+--------------------+-------------+-------------------+------------+-------------------+------------+-------------+--------------+
|area|sub_area|  table_id|     table_name|            trans_id|amount_origin|voucher_amount_paid|total_amount|         trans_date|voucher_name|customer_name|customer_phone|
+----+--------+----------+---------------+--------------------+-------------+-------------------+------------+-------------------+------------+-------------+--------------+
|BIDA| BIDA LỖ|TABLE-1M2D|Bàn 9 - BIDA LỖ|4PW46ZJ6VPRKF3AKY...|     165945.0|                0.0|    165945.0|2025-04-01 05:28:42|            |    Giang Kuu|   84931915988|
+----+--------+----------+---------------+--------------------+-------------+-------------------+------------+-------------------+------------+-------------+--------------+



In [4]:
# import schedule
# import time
# import os

# def run_etl():
#     print("Starting ETL Process...")
#     os.system("python C:\\Users\\thehu\\OneDrive\\Mia_town\\IT\\Data\\ETL\\etl_script.py")
#     print("ETL Process Completed.")

# # Schedule the job to run daily at 11:30 AM
# schedule.every().day.at("11:30").do(run_etl)

# while True:
#     schedule.run_pending()
#     time.sleep(60)  # Check every minute

In [None]:
# product_combine.filter(col("item_name").contains("COMBO SÁNG")).show()

+--------------------+-------------------+---------------+--------+--------+--------+------------------------+-------+
|             tran_id|         trans_date|      item_name|category|quantity|  amount|amount_discount_on_price|row_num|
+--------------------+-------------------+---------------+--------+--------+--------+------------------------+-------+
|4PW46ZJ6VPRK20SFX...|2025-05-02 09:09:14|COMBO SÁNG BIDA|   COMBO|     1.0| 58000.0|                       0|      1|
|4PW46ZJ6VPRKK7WGT...|2025-05-02 09:58:49|COMBO SÁNG BIDA|   COMBO|     1.0| 58000.0|                       0|      1|
|4PW46ZJ6VPRKK7WH2...|2025-05-02 10:02:46|COMBO SÁNG BIDA|   COMBO|     1.0| 58000.0|                       0|      1|
|P63JDN7MGD2P7A1LN...|2025-05-02 14:12:35|COMBO SÁNG BIDA|   COMBO|     4.0|232000.0|                       0|      1|
+--------------------+-------------------+---------------+--------+--------+--------+------------------------+-------+



In [35]:
def load_data_to_database(transaction_combine, product_combine): ## ĐỂ Ý LẠI LÚC 2 BẢNG CHÊNH LỆCH DATA KHI IMPORT VÀO, TỐI ƯU LẠI 
    print('Connect to database')
    url = f"jdbc:postgresql://{host}:{port}/{dbname}"
    properties = {
        "user": f"{user}",
        "password": f"{password}",
        "driver": f"{driver}"
    }
    print('Completed')

    print("Checking database")
    transaction_query = "(SELECT COUNT(*) AS row_count FROM transaction) as temp"
    count_old_data_trans = spark.read.jdbc(url, transaction_query, properties=properties)
    transaction_row_count = count_old_data_trans.collect()[0]["row_count"]
    old_data_trans = spark.read.jdbc(url ,"(SELECT trans_id FROM transaction)", properties = properties)

    if transaction_row_count == 0:
        print("Table 'transaction' is empty. Appending new data...")
        # Append data
        transaction_combine.write.jdbc(url, "transaction", mode = 'append', properties = properties)
        print("Data successfully appended to 'TRANSACTION' table.")

    else:
        print(f"Table 'transaction' already contains {transaction_row_count} rows. Checking for new data.")
        transaction_combine.createOrReplaceTempView("new_data_trans")
        old_data_trans.createOrReplaceTempView("old_data_trans")
        trans_checking = spark.sql( """
                SELECT trans_id from new_data_trans
                EXCEPT
                SELECT trans_id from old_data_trans
    """)
        new_trans_data = spark.sql("""
                                   with id AS(
                                    SELECT trans_id from new_data_trans
                                    EXCEPT
                                    SELECT trans_id from old_data_trans
                                   )
                                   SELECT ndt.* 
                                   FROM id i
                                   INNER JOIN new_data_trans ndt
                                   ON i.trans_id = ndt.trans_id
    """)
        if trans_checking.count() == 0:
            print("No new data, finished process")
        else:
            print(f"There's {trans_checking.count()} new data in transaction, appending...." )
            new_trans_data.write.jdbc(url, "transaction", mode = 'append', properties = properties )
            print('Completed')


    product_query = "(SELECT COUNT(*) AS row_count FROM product) as temp_"
    count_old_data_prod = spark.read.jdbc(url, product_query, properties=properties)
    product_row_count = count_old_data_prod.collect()[0]["row_count"]
    old_data_prod = spark.read.jdbc(url ,"(SELECT trans_id FROM product)", properties = properties)

    if product_row_count == 0:
        print("Table 'product' is empty. Appending new data...")
        # Append data
        product_combine.write.jdbc(url, "product", mode = 'append', properties = properties)
        print("Data successfully appended to 'PRODUCT' table.")

    else:
        print(f"Table 'PRODUCT' already contains {product_row_count} rows. Checking for new data.")
        product_combine.createOrReplaceTempView("new_data_prod")
        old_data_prod.createOrReplaceTempView("old_data_prod")
        prod_checking = spark.sql( """
                SELECT trans_id from new_data_prod
                EXCEPT
                SELECT trans_id from old_data_prod
    """)
        new_prod_data = spark.sql("""
                                   with id AS(
                                    SELECT trans_id from new_data_prod
                                    EXCEPT
                                    SELECT trans_id from old_data_prod
                                   )
                                   SELECT ndp.* 
                                   FROM id i
                                   INNER JOIN new_data_prod ndp
                                   ON i.trans_id = ndp.trans_id
    """)
        if prod_checking.count() == 0:
            print("No new data, finished process")
        else:
            print(f"There's {prod_checking.count()} new data in product, appending...." )
            new_prod_data.write.jdbc(url, "product", mode = 'append', properties = properties )
            print('Completed')
    print("----------EXTRACT COMPLETE----------")
    return new_trans_data, new_prod_data

In [36]:
path = 'C:\\Users\\thehu\\OneDrive\\Mia_town\\IT\\Data\\MIATOWN\\raw\\'
transaction_combine, product_combine = data_extract(path)



new_trans_data, new_prod_data = load_data_to_database(transaction_combine, product_combine)

----------  combine file  ----------
----------  create transaction data  ----------
----------  Selecting from C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_18t05_2025.json  ----------
----------  Union C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_18t05_2025.json  ----------
----------  Selecting from C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_19_05_2025.json  ----------
----------  Union C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_19_05_2025.json  ----------
----------  Selecting from C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_1_17t05_2025.json  ----------
----------  Union C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_1_17t05_2025.json  ----------
----------  Selecting from C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_t01_2025.json  ----------
----------  Union C:\Users\thehu\OneDrive\Mia_town\IT\Data\MIATOWN\raw\bida_t01_2025.json  ----------
----------  Selecting from C:\Users\thehu\OneDr

Py4JJavaError: An error occurred while calling o34951.jdbc.
: org.postgresql.util.PSQLException: ERROR: relation "product" does not exist
  Position: 50
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2733)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2420)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:372)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:517)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:434)
	at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:194)
	at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:137)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:68)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:241)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:37)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:249)
	at jdk.internal.reflect.GeneratedMethodAccessor135.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:842)


In [None]:
new_prod_data.show()

+--------+----------+---------+--------+--------+------+------------------------+
|trans_id|trans_date|item_name|category|quantity|amount|amount_discount_on_price|
+--------+----------+---------+--------+--------+------+------------------------+
+--------+----------+---------+--------+--------+------+------------------------+

