In [81]:
from pyspark.sql import SparkSession

In [82]:
spark = SparkSession.builder.appName("Daily transactions processing") \
            .config("spark.jars", "postgresql-42.6.2.jar") \
            .getOrCreate()

# Thiết lập thông tin kết nối JDBC
# spark.conf.set("spark.driver.extraClassPath", "postgresql-42.6.2.jar")  # Đường dẫn tới PostgreSQL JDBC driver
# spark.conf.set("spark.executor.extraClassPath", "postgresql-42.6.2.jar")  # Đường dẫn tới PostgreSQL JDBC driver


In [83]:
# spark.stop()

In [84]:
#khai báo có 2 cách
url = 'jdbc:postgresql://localhost:5432/postgres?currentSchema=dwh'

### cách 1
properties = {
    "user": "postgres",
    "password": "login1234",
    "driver": "org.postgresql.Driver"
    # "schema": "dwh"  # Đặt schema ở đây
}

### lấy dữ liệu transaction
acc_table = 'account'
trans_table = 'transaction'
currency_table = 'currency'
product_table = 'product'
booking_date = '2024-01-31'

query = f"""
    (
    SELECT a.account_number , a.customer_id , a.opening_date , a.product_id , p.product_name , a.currency , c.rate , t.trans_amount , t.channel , t.booking_date 
    from {acc_table} a 
    join {trans_table} t  
    on a.account_number = t.account_number
    join {currency_table} c 
    on a.currency = c.currency 
    join {product_table} p 
    on a.product_id = p.product_id 
    where booking_date = '{booking_date}'
) as subquery
"""
df = spark.read.jdbc(url= url, table = query, properties= properties)

### cách 2
# trans_df = spark.read.format("jdbc").\
#     options(url=url,
#      dbtable='transaction',
#      user='postgres',
#      password='login1234',
#      driver='org.postgresql.Driver').\
#     load()

df.show()

+--------------+-----------+------------+----------+--------------------+--------+-------+------------+-------+------------+
|account_number|customer_id|opening_date|product_id|        product_name|currency|   rate|trans_amount|channel|booking_date|
+--------------+-----------+------------+----------+--------------------+--------+-------+------------+-------+------------+
|  744761900043|   58000981|  2022-02-14|      1001|Tai khoan thong t...|     VND|    1.0|      5127.0|EWALLET|  2024-01-31|
|  555286461200|   60035164|  2023-03-23|      1001|Tai khoan thong t...|     VND|    1.0|      4509.0|     MB|  2024-01-31|
|  539668218911|   89506712|  2023-06-29|      1001|Tai khoan thong t...|     VND|    1.0|      2628.0|     MB|  2024-01-31|
|  482534465189|   25136560|  2023-05-24|      1001|Tai khoan thong t...|     VND|    1.0|     -9003.0|     MB|  2024-01-31|
|  119790767773|   23819437|  2022-04-03|      1001|Tai khoan thong t...|     VND|    1.0|      7180.0|     IB|  2024-01-31|


In [85]:
df.printSchema()

root
 |-- account_number: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- opening_date: date (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- rate: double (nullable = true)
 |-- trans_amount: double (nullable = true)
 |-- channel: string (nullable = true)
 |-- booking_date: date (nullable = true)



In [86]:
account = f"""
(SELECT * from {acc_table}) as subquery
"""
acc_df = spark.read.jdbc(url= url, table = account, properties= properties)
acc_df.show()


+--------------+-----------+------------+----------+--------+
|account_number|customer_id|opening_date|product_id|currency|
+--------------+-----------+------------+----------+--------+
|  744761900043|   58000981|  2022-02-14|      1001|     VND|
|  555286461200|   60035164|  2023-03-23|      1001|     VND|
|  539668218911|   89506712|  2023-06-29|      1001|     VND|
|  482534465189|   25136560|  2023-05-24|      1001|     VND|
|  119790767773|   23819437|  2022-04-03|      1001|     VND|
|  691818423482|   22193436|  2022-11-06|      1001|     VND|
|  722018019543|   45327340|  2022-06-13|      1001|     VND|
|  646298294265|   61795072|  2024-01-13|      1001|     VND|
|  728044457443|   55877657|  2023-09-17|      1023|     VND|
|  016035655440|   38322959|  2020-07-08|      1001|     VND|
|  632472381170|   19657954|  2024-03-04|      1001|     VND|
|  370463821619|   70721257|  2020-08-18|      1001|     VND|
|  518435934032|   80618324|  2022-10-06|      1023|     VND|
|  36535

In [87]:
currency = f"""
(SELECT * from {currency_table}) as subquery
"""
currency_df = spark.read.jdbc(url= url, table = currency, properties= properties)
currency_df = currency_df.withColumnRenamed('currency', 'currency_code')
currency_df.show()


+-------------+-------+
|currency_code|   rate|
+-------------+-------+
|          EUR|27156.0|
|          USD|24595.0|
|          VND|    1.0|
+-------------+-------+



In [88]:
product = f"""
(SELECT * from {product_table}) as subquery
"""
product_df = spark.read.jdbc(url= url, table = product, properties= properties)
product_df = product_df.withColumnRenamed('product_id', 'product_code')
product_df.show()


+------------+--------------------+
|product_code|        product_name|
+------------+--------------------+
|        1001|Tai khoan thong t...|
|        1023|    Tai khoan so dep|
+------------+--------------------+



In [89]:
trans_query = f"""
    (SELECT * from {trans_table} where booking_date = '{booking_date}') as subquery
"""
trans_df = spark.read.jdbc(url= url, table = trans_query, properties= properties)
trans_df = trans_df.withColumnRenamed('account_number', 'account_id')
trans_df.show()


+------------+------------+-------+------------+
|  account_id|trans_amount|channel|booking_date|
+------------+------------+-------+------------+
|744761900043|      5127.0|EWALLET|  2024-01-31|
|555286461200|      4509.0|     MB|  2024-01-31|
|539668218911|      2628.0|     MB|  2024-01-31|
|482534465189|     -9003.0|     MB|  2024-01-31|
|119790767773|      7180.0|     IB|  2024-01-31|
|691818423482|      3994.0|EWALLET|  2024-01-31|
|722018019543|     -9740.0|     MB|  2024-01-31|
|646298294265|     -7321.0|     MB|  2024-01-31|
|728044457443|      4446.0|     MB|  2024-01-31|
|016035655440|      2514.0|     MB|  2024-01-31|
|632472381170|      -707.0|EWALLET|  2024-01-31|
|370463821619|     -1335.0|EWALLET|  2024-01-31|
|518435934032|     -2914.0|     MB|  2024-01-31|
|365359460293|      7218.0|     MB|  2024-01-31|
|257405675114|      3460.0|EWALLET|  2024-01-31|
|513375293172|     -1426.0|EWALLET|  2024-01-31|
|756859177628|     -3977.0|     MB|  2024-01-31|
|771350308703|      

In [90]:
# customer_db = 'customer'
# target_db = 'target'
   # left join target t
    # on c.target = t.target_code

customer_query = """
    (SELECT c.*, target_name from customer c left join target t on c.target_code = t.target_code
    )
"""
customer_df = spark.read.jdbc(url= url, table = customer_query, properties= properties)
customer_df = customer_df.withColumnRenamed('customer_id', 'customer_code')
customer_df.show()


+-------------+-----------------+-----------+------+----------+-------+-----------+
|customer_code|        full_name|target_code|gender|       dob|segment|target_name|
+-------------+-----------------+-----------+------+----------+-------+-----------+
|     63205802|     Daniel Lewis|       4000|FEMALE|1981-11-30|   KHUT|       KHCN|
|     83990555|      James Lewis|       4000|  MALE|1996-04-04|   KHUT|       KHCN|
|     62980626|       John Davis|       4000|FEMALE|1995-07-02|   KHCL|       KHCN|
|     59803307|     Emily Garcia|       4000|  MALE|1988-12-07|   KHUT|       KHCN|
|     67706131|      James White|       4000|FEMALE|1985-07-27|   KHUT|       KHCN|
|     56993630|Abigail Hernandez|       2000|  MALE|1984-09-17|   KHCL|       KHDN|
|     71320427|     Evelyn Davis|       4000|  MALE|1991-10-24|   KHTT|       KHCN|
|     98154506|    James Jackson|       4000|  MALE|2001-07-23|   KHUT|       KHCN|
|     19035426|     Ava Martinez|       4000|FEMALE|2005-10-21|   KHUT|     

In [91]:
summary_df = acc_df.join(customer_df, acc_df['customer_id'] == customer_df['customer_code'], how='inner')\
                    .join(trans_df, acc_df['account_number'] == trans_df['account_id'], how ='left')\
                    .join(product_df, acc_df['product_id'] == product_df['product_code'], how ='left')\
                    .join(currency_df, acc_df['currency'] == currency_df['currency_code'], how = 'left')
                # .selectExpr("account_number")#, 'acc_df.customer_id', 'acc_df.opening_date', 'acc_df.product_id', 'acc_df.currency',
                            # 'trans_df.trans_amount', 'trans_df.channel', 'trans_df.booking_date')
summary_df = summary_df.selectExpr('account_number', 'customer_id', 'full_name', 'target_code', 'target_name', 'gender', 'dob', 'segment',
                                   'opening_date', 'product_id','product_name', 'currency', 'rate',
                            'trans_amount', 'channel', 'booking_date')
summary_df.show()

+--------------+-----------+----------------+-----------+-----------+------+----------+-------+------------+----------+--------------------+--------+-------+------------+-------+------------+
|account_number|customer_id|       full_name|target_code|target_name|gender|       dob|segment|opening_date|product_id|        product_name|currency|   rate|trans_amount|channel|booking_date|
+--------------+-----------+----------------+-----------+-----------+------+----------+-------+------------+----------+--------------------+--------+-------+------------+-------+------------+
|  052685520990|   59082550|   John Gonzalez|       4000|       KHCN|  MALE|2001-06-21|   KHCL|  2024-01-30|      1001|Tai khoan thong t...|     USD|24595.0|     -5955.0|     MB|  2024-01-31|
|  062636477133|   52240487|    Joseph Smith|       4000|       KHCN|FEMALE|1999-09-15|   KHTT|  2023-03-11|      1001|Tai khoan thong t...|     VND|    1.0|      3760.0|     IB|  2024-01-31|
|  068832082487|   77967137|Isabella Jac

In [93]:
test_query = 'select * from rptb_transaction'

test_Df = spark.read.jdbc(url= url, table = test_query, properties= properties)
# customer_df = customer_df.withColumnRenamed('customer_id', 'customer_code')
test_Df.show()


# if summary_df is not None:
#     summary_df.createOrReplaceTempView("temp_transaction")
#     spark.sql("INSERT INTO rptb_transaction SELECT * FROM temp_transaction")
# else:
#     print("Error: summary_df is None or empty.")

Py4JJavaError: An error occurred while calling o822.jdbc.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "select"
  Position: 15
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
	at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190)
	at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:134)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:68)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:241)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:37)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:249)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
