In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce,col

In [2]:
spark = SparkSession.builder.config('spark.jars.packages','com.datastax.spark:spark-cassandra-connector_2.12:3.1.0').getOrCreate()

<b> Đọc data từ Cassandra</b>

In [3]:
data = spark.read.format("org.apache.spark.sql.cassandra").options(table = 'tracking',keyspace = 'study_de').load()

In [4]:
data.show(5)

+--------------------+----+----------+-----------+----+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+----+--------+--------------------+------+--------------------+-------------------+------------+-----------+----------+----------+--------+---+--------+
|         create_time| bid|        bn|campaign_id|  cd|custom_track|   de|                  dl|             dt|                  ed| ev|group_id|  id|job_id|  md|publisher_id|  rl|      sr|                  ts|    tz|                  ua|                uid|utm_campaign|utm_content|utm_medium|utm_source|utm_term|  v|      vp|
+--------------------+----+----------+-----------+----+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+----+--------+--------------------+------+--------------------+-------------------+------------+-----------+----------+----------+--------+---+--------+
|1dcc9e80-0cb0-1

<b> Đọc Data từ MySQL

In [5]:
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/data_engineering") \
    .option("dbtable", "events") \
    .option("user", "root") \
    .option("password", "1") \
    .load()

In [6]:
df.show(5)

+----+------+----------+-----+------------------------+---------------------+----------+----------+--------+-----------+------------+-------+------+-----------+----------+---------+
|  id|job_id|     dates|hours|disqualified_application|qualified_application|conversion|company_id|group_id|campaign_id|publisher_id|bid_set|clicks|impressions|spend_hour|  sources|
+----+------+----------+-----+------------------------+---------------------+----------+----------+--------+-----------+------------+-------+------+-----------+----------+---------+
|2089|    98|2022-07-08|    9|                       1|                    0|         1|         1|    null|          4|           1|    2.0|   108|       null|     216.0|Cassandra|
|2090|    98|2022-07-13|   15|                       1|                    0|         1|         1|    null|          4|           1|    2.0|     1|       null|       2.0|Cassandra|
|2091|   187|2022-07-08|    4|                       1|                    0|         1|  

In [7]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- job_id: integer (nullable = true)
 |-- dates: date (nullable = true)
 |-- hours: integer (nullable = true)
 |-- disqualified_application: integer (nullable = true)
 |-- qualified_application: integer (nullable = true)
 |-- conversion: integer (nullable = true)
 |-- company_id: integer (nullable = true)
 |-- group_id: integer (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- publisher_id: integer (nullable = true)
 |-- bid_set: double (nullable = true)
 |-- clicks: integer (nullable = true)
 |-- impressions: string (nullable = true)
 |-- spend_hour: double (nullable = true)
 |-- sources: string (nullable = true)



<b> Load dữ liệu vào bảng Test trong MySQL

In [9]:
df.write.format('jdbc').option('url','jdbc:mysql://localhost:3306/data_engineering')\
    .option('driver','com.mysql.cj.jdbc.Driver')\
    .option('dbtable','Test')\
    .option('user','root')\
    .option('password','1').mode('append').save()

In [8]:
data = data.select('ts','job_id','custom_track','bid','campaign_id','group_id','publisher_id')
data = data.filter(data.job_id.isNotNull())

In [9]:
data.show()

+--------------------+------+------------+---+-----------+--------+------------+
|                  ts|job_id|custom_track|bid|campaign_id|group_id|publisher_id|
+--------------------+------+------------+---+-----------+--------+------------+
|2022-07-25 09:24:...| 188.0|        null|1.0|       48.0|    34.0|         1.0|
|2022-07-24 14:08:...| 188.0|       click|1.0|       48.0|    34.0|         1.0|
|2022-07-25 09:51:...|1532.0|        null|0.0|      222.0|    null|         1.0|
| 2023-01-14 21:55:04| 311.0| unqualified|0.0|       53.0|    14.0|        12.0|
|2022-07-27 04:15:...|1530.0|        null|0.0|      222.0|    null|         1.0|
|2022-07-26 06:47:...| 258.0|       click|1.0|       93.0|    null|         1.0|
|2022-07-26 06:56:...| 273.0|       click|0.0|       48.0|    null|         1.0|
|2022-07-24 14:05:...| 188.0|        null|1.0|       48.0|    34.0|         1.0|
|2022-07-26 02:50:...| 273.0|        null|0.0|       48.0|    null|         1.0|
|2022-07-08 09:44:...|  98.0

In [10]:
data.select("custom_track").distinct().show()

+------------+
|custom_track|
+------------+
|  conversion|
|        null|
|       click|
| unqualified|
|       alive|
|   qualified|
+------------+



In [13]:
data

DataFrame[ts: string, job_id: double, custom_track: string, bid: double, campaign_id: double, group_id: double, publisher_id: double]

<b> Xử lý custom_track = click

In [11]:
click_data = data.filter(data.custom_track == 'click')
click_data.createOrReplaceTempView("click")

In [12]:
spark.sql("SELECT * FROM click").show(10)

+--------------------+------+------------+---+-----------+--------+------------+
|                  ts|job_id|custom_track|bid|campaign_id|group_id|publisher_id|
+--------------------+------+------------+---+-----------+--------+------------+
|2022-07-24 14:08:...| 188.0|       click|1.0|       48.0|    34.0|         1.0|
|2022-07-27 03:59:...| 188.0|       click|1.0|       48.0|    34.0|         1.0|
|2022-07-26 02:44:...| 440.0|       click|7.0|        1.0|    null|         1.0|
|2022-07-25 10:01:...|1527.0|       click|0.0|      222.0|    null|         1.0|
|2022-07-26 06:47:...| 258.0|       click|1.0|       93.0|    null|         1.0|
|2022-07-08 09:34:...|  98.0|       click|2.0|        4.0|    null|         1.0|
|2022-07-26 06:46:...| 258.0|       click|1.0|       93.0|    null|         1.0|
|2022-07-27 04:13:...|1529.0|       click|0.0|      222.0|    null|         1.0|
|2022-07-27 04:21:...|1532.0|       click|0.0|      222.0|    null|         1.0|
|2022-07-25 10:02:...|1527.0

In [13]:
click_tmp = spark.sql("""
                        SELECT 
                            DATE(ts) AS date,
                            HOUR(ts) AS hour,
                            job_id,
                            campaign_id,
                            group_id,
                            publisher_id,
                            ROUND(AVG(bid), 2) AS bid_set,
                            SUM(bid) AS spend_time,
                            COUNT(*) AS click
                        FROM
                            click
                        GROUP BY
                            date,
                            hour,
                            job_id,
                            campaign_id,
                            group_id,
                            publisher_id""")
click_tmp.show(10)

+----------+----+------+-----------+--------+------------+-------+----------+-----+
|      date|hour|job_id|campaign_id|group_id|publisher_id|bid_set|spend_time|click|
+----------+----+------+-----------+--------+------------+-------+----------+-----+
|2022-07-08|   4| 187.0|       48.0|    null|         1.0|    0.0|       0.0|    2|
|2023-01-14|  21|1592.0|       10.0|    32.0|        28.0|    1.0|       1.0|    1|
|2022-07-26|   3|1534.0|      222.0|    null|         1.0|    0.0|       0.0|   16|
|2022-07-25|   9| 188.0|       48.0|    34.0|         1.0|    1.0|      52.0|   52|
|2023-01-14|  21|1886.0|        1.0|    10.0|        13.0|    1.0|       1.0|    1|
|2022-07-26|   2|1527.0|      222.0|    null|         1.0|    0.0|       0.0|   10|
|2022-07-27|   4|1530.0|      222.0|    null|         1.0|    0.0|       0.0|   10|
|2023-01-14|  21|1156.0|       48.0|    10.0|        10.0|    1.0|       1.0|    1|
|2023-01-14|  21|1430.0|        1.0|    35.0|        32.0|    0.0|       0.0

<b>Xử lý custom_track = conversion

In [14]:
conversion_data = data.filter(data.custom_track == "conversion")
conversion_data.createOrReplaceTempView("conversion")

In [15]:
spark.sql("SELECT * FROM conversion").show(10)

+-------------------+------+------------+---+-----------+--------+------------+
|                 ts|job_id|custom_track|bid|campaign_id|group_id|publisher_id|
+-------------------+------+------------+---+-----------+--------+------------+
|2023-01-14 21:49:14| 247.0|  conversion|1.0|       12.0|    16.0|        20.0|
|2023-01-14 21:53:34|1806.0|  conversion|1.0|      101.0|    10.0|        17.0|
|2023-01-14 21:54:04|  81.0|  conversion|1.0|        1.0|    10.0|        11.0|
|2023-01-14 21:47:14| 355.0|  conversion|0.0|       61.0|    21.0|         2.0|
|2023-01-14 21:55:04|1723.0|  conversion|0.0|       79.0|    32.0|        13.0|
|2023-01-14 21:54:04|2009.0|  conversion|1.0|       48.0|    10.0|        31.0|
|2023-01-14 21:56:05| 125.0|  conversion|0.0|        1.0|    32.0|        29.0|
|2023-01-14 21:55:35| 672.0|  conversion|0.0|       61.0|    11.0|        15.0|
|2023-01-14 21:47:14| 858.0|  conversion|1.0|        1.0|    32.0|        15.0|
|2023-01-14 21:56:05|1686.0|  conversion

In [16]:
conversion_tmp = spark.sql("""
                            SELECT 
                                DATE(ts) AS date,
                                HOUR(ts) AS hour,
                                job_id,
                                campaign_id,
                                group_id,
                                publisher_id,
                                COUNT(*) AS conversion
                            FROM
                                conversion
                            GROUP BY
                                date,
                                hour,
                                job_id,
                                campaign_id,
                                group_id,
                                publisher_id""")
conversion_tmp.show(10)

+----------+----+------+-----------+--------+------------+----------+
|      date|hour|job_id|campaign_id|group_id|publisher_id|conversion|
+----------+----+------+-----------+--------+------------+----------+
|2023-01-14|  21| 125.0|        1.0|    32.0|        29.0|         1|
|2023-01-14|  21|1616.0|       48.0|    26.0|        31.0|         1|
|2023-01-14|  21| 458.0|       48.0|    25.0|        17.0|         1|
|2023-01-14|  21|1686.0|        1.0|    32.0|        32.0|         1|
|2023-01-14|  21| 355.0|       61.0|    21.0|         2.0|         1|
|2023-01-14|  21|2009.0|       48.0|    10.0|        31.0|         1|
|2023-01-14|  21|  81.0|        1.0|    10.0|        11.0|         1|
|2023-01-14|  21|1119.0|      223.0|    41.0|         3.0|         1|
|2023-01-14|  21| 672.0|       61.0|    11.0|        15.0|         1|
|2023-01-14|  21|1806.0|      101.0|    10.0|        17.0|         1|
+----------+----+------+-----------+--------+------------+----------+
only showing top 10 

<b>Xử lý custom_track = qualified

In [17]:
qualified_data = data.filter(data.custom_track == "qualified")
qualified_data.createOrReplaceTempView("qualified")

In [18]:
spark.sql("SELECT * FROM qualified").show(10)

+-------------------+------+------------+---+-----------+--------+------------+
|                 ts|job_id|custom_track|bid|campaign_id|group_id|publisher_id|
+-------------------+------+------------+---+-----------+--------+------------+
|2023-01-14 21:41:07| 245.0|   qualified|0.0|        1.0|    13.0|        24.0|
|2023-01-14 21:56:05| 782.0|   qualified|0.0|       48.0|    10.0|        17.0|
|2023-01-14 21:47:14| 192.0|   qualified|0.0|       51.0|    41.0|        15.0|
|2023-01-14 21:47:14| 339.0|   qualified|0.0|       12.0|    41.0|        29.0|
|2023-01-14 21:56:05|1656.0|   qualified|0.0|       58.0|    17.0|        12.0|
|2023-01-14 21:49:14|1400.0|   qualified|1.0|       79.0|    17.0|        30.0|
|2023-01-14 21:47:14|1981.0|   qualified|1.0|       13.0|    15.0|        30.0|
|2023-01-14 21:47:44| 671.0|   qualified|0.0|       12.0|    17.0|        34.0|
|2023-01-14 21:54:04| 496.0|   qualified|1.0|        1.0|    17.0|        38.0|
|2023-01-14 21:49:14| 379.0|   qualified

In [19]:
qualified_tmp = spark.sql("""
                            SELECT 
                                DATE(ts) AS date,
                                HOUR(ts) AS hour,
                                job_id,
                                campaign_id,
                                group_id,
                                publisher_id,
                                COUNT(*) AS qualified
                            FROM
                                qualified
                            GROUP BY
                                date,
                                hour,
                                job_id,
                                campaign_id,
                                group_id,
                                publisher_id""")
qualified_tmp.show(10)

+----------+----+------+-----------+--------+------------+---------+
|      date|hour|job_id|campaign_id|group_id|publisher_id|qualified|
+----------+----+------+-----------+--------+------------+---------+
|2023-01-14|  21| 339.0|       12.0|    41.0|        29.0|        1|
|2023-01-14|  21| 192.0|       51.0|    41.0|        15.0|        1|
|2023-01-14|  21|1086.0|       48.0|    13.0|        15.0|        1|
|2023-01-14|  21|1656.0|       58.0|    17.0|        12.0|        1|
|2023-01-14|  21| 379.0|       12.0|    22.0|         3.0|        1|
|2023-01-14|  21| 263.0|       59.0|    26.0|         3.0|        1|
|2023-01-14|  21|1400.0|       79.0|    17.0|        30.0|        1|
|2023-01-14|  21|1644.0|      141.0|    10.0|        22.0|        1|
|2023-01-14|  21| 496.0|        1.0|    17.0|        38.0|        1|
|2023-01-14|  21|1218.0|       33.0|    22.0|        12.0|        1|
+----------+----+------+-----------+--------+------------+---------+
only showing top 10 rows



<b>Xử lý custom_track = unqualified

In [20]:
unqualified_data = data.filter(data.custom_track == "unqualified")
unqualified_data.createOrReplaceTempView("unqualified")

In [21]:
spark.sql("SELECT * FROM unqualified").show(10)

+-------------------+------+------------+---+-----------+--------+------------+
|                 ts|job_id|custom_track|bid|campaign_id|group_id|publisher_id|
+-------------------+------+------------+---+-----------+--------+------------+
|2023-01-14 21:47:14| 512.0| unqualified|0.0|       61.0|    35.0|        17.0|
|2023-01-14 21:54:04|2002.0| unqualified|1.0|       15.0|    17.0|        33.0|
|2023-01-14 21:55:04|1945.0| unqualified|1.0|       13.0|    10.0|        33.0|
|2023-01-14 21:41:07|2051.0| unqualified|0.0|      122.0|    41.0|         9.0|
|2023-01-14 21:56:05| 538.0| unqualified|1.0|        5.0|    31.0|        36.0|
|2023-01-14 21:56:05| 285.0| unqualified|1.0|       57.0|    41.0|         9.0|
|2023-01-14 21:56:05| 990.0| unqualified|1.0|       70.0|    21.0|        10.0|
|2023-01-14 21:55:04| 311.0| unqualified|0.0|       53.0|    14.0|        12.0|
|2023-01-14 21:47:14|  76.0| unqualified|1.0|       54.0|    10.0|        33.0|
|2023-01-14 21:55:35| 760.0| unqualified

In [22]:
unqualified_tmp = spark.sql("""
                            SELECT 
                                DATE(ts) AS date,
                                HOUR(ts) AS hour,
                                job_id,
                                campaign_id,
                                group_id,
                                publisher_id,
                                COUNT(*) AS unqualified
                            FROM
                                unqualified
                            GROUP BY
                                date,
                                hour,
                                job_id,
                                campaign_id,
                                group_id,
                                publisher_id""")
unqualified_tmp.show(10)

+----------+----+------+-----------+--------+------------+-----------+
|      date|hour|job_id|campaign_id|group_id|publisher_id|unqualified|
+----------+----+------+-----------+--------+------------+-----------+
|2023-01-14|  21|1688.0|       61.0|    26.0|        17.0|          1|
|2023-01-14|  21|1791.0|        5.0|    10.0|        10.0|          1|
|2023-01-14|  21|2002.0|       15.0|    17.0|        33.0|          1|
|2023-01-14|  21|1632.0|       58.0|    10.0|        13.0|          1|
|2023-01-14|  21| 760.0|       13.0|    32.0|        37.0|          1|
|2023-01-14|  21| 538.0|        5.0|    31.0|        36.0|          1|
|2023-01-14|  21| 885.0|       13.0|    15.0|        12.0|          1|
|2023-01-14|  21|2051.0|      122.0|    41.0|         9.0|          1|
|2023-01-14|  21|1130.0|      117.0|    19.0|        30.0|          1|
|2023-01-14|  21|1414.0|       10.0|    10.0|        24.0|          1|
+----------+----+------+-----------+--------+------------+-----------+
only s

<b> Output mong muốn: Trong một giờ(ngày) đó có bao nhiêu click, conversion, qualified, unqualified với mỗi id tương ứng

In [23]:
final_data = click_tmp.join(conversion_tmp, on = ['date', 'hour', 'job_id', 'campaign_id', 'group_id', 'publisher_id'], how = 'full').\
                       join(qualified_tmp, on = ['date', 'hour', 'job_id', 'campaign_id', 'group_id', 'publisher_id'], how = 'full').\
                       join(unqualified_tmp,on = ['date', 'hour', 'job_id', 'campaign_id', 'group_id', 'publisher_id'], how = 'full')

final_data.show(5)

+----------+----+------+-----------+--------+------------+-------+----------+-----+----------+---------+-----------+
|      date|hour|job_id|campaign_id|group_id|publisher_id|bid_set|spend_time|click|conversion|qualified|unqualified|
+----------+----+------+-----------+--------+------------+-------+----------+-----+----------+---------+-----------+
|2023-01-14|  21| 125.0|        1.0|    32.0|        29.0|   null|      null| null|         1|     null|       null|
|2023-01-14|  21| 339.0|       12.0|    41.0|        29.0|   null|      null| null|      null|        1|       null|
|2022-07-08|   4| 187.0|       48.0|    null|         1.0|    0.0|       0.0|    2|      null|     null|       null|
|2023-01-14|  21|1592.0|       10.0|    32.0|        28.0|    1.0|       1.0|    1|      null|     null|       null|
|2022-07-26|   3|1534.0|      222.0|    null|         1.0|    0.0|       0.0|   16|      null|     null|       null|
+----------+----+------+-----------+--------+------------+------

<b> merge company_id của table company vào output 

In [27]:
df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/data_engineering") \
    .option("dbtable", "company") \
    .option("user", "root") \
    .option("password", "1") \
    .load()

In [28]:
id_company = df.select(df.id.alias('id_company'),"publisher_id")
id_company.show(10)

+----------+------------+
|id_company|publisher_id|
+----------+------------+
|         1|        null|
|         2|        null|
|         3|        null|
|         4|        null|
|         8|        null|
|        10|        null|
|        11|        null|
|        12|        null|
|        13|        null|
|        14|        null|
+----------+------------+
only showing top 10 rows



In [29]:
output_ETL_final = final_data.join(id_company, on = "publisher_id", how = "full")
output_ETL_final.show()

+------------+----------+----+------+-----------+--------+-------+----------+-----+----------+---------+-----------+----------+
|publisher_id|      date|hour|job_id|campaign_id|group_id|bid_set|spend_time|click|conversion|qualified|unqualified|id_company|
+------------+----------+----+------+-----------+--------+-------+----------+-----+----------+---------+-----------+----------+
|        29.0|2023-01-14|  21| 125.0|        1.0|    32.0|   null|      null| null|         1|     null|       null|      null|
|        29.0|2023-01-14|  21| 339.0|       12.0|    41.0|   null|      null| null|      null|        1|       null|      null|
|        29.0|2023-01-14|  21| 737.0|       12.0|    19.0|    0.0|       0.0|    1|      null|     null|       null|      null|
|        null|      null|null|  null|       null|    null|   null|      null| null|      null|     null|       null|         1|
|        null|      null|null|  null|       null|    null|   null|      null| null|      null|     null|

In [30]:
output_ETL_final.printSchema()

root
 |-- publisher_id: double (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- job_id: double (nullable = true)
 |-- campaign_id: double (nullable = true)
 |-- group_id: double (nullable = true)
 |-- bid_set: double (nullable = true)
 |-- spend_time: double (nullable = true)
 |-- click: long (nullable = true)
 |-- conversion: long (nullable = true)
 |-- qualified: long (nullable = true)
 |-- unqualified: long (nullable = true)
 |-- id_company: integer (nullable = true)



<b> Load dữ liệu vào bảng event trong Data warehouse

In [31]:
output_ETL_final.write.format('jdbc').option('url','jdbc:mysql://localhost:3306/data_engineering')\
                                     .option('driver','com.mysql.cj.jdbc.Driver')\
                                     .option('dbtable','event_DW')\
                                     .option('user','root')\
                                     .option('password','1').mode('append').save()

Check table

In [32]:
event_DW_df = spark.read \
                .format("jdbc") \
                .option("driver","com.mysql.cj.jdbc.Driver") \
                .option("url", "jdbc:mysql://localhost:3306/data_engineering") \
                .option("dbtable", "event_DW") \
                .option("user", "root") \
                .option("password", "1") \
                .load()

In [34]:
event_DW_df.filter(event_DW_df.publisher_id.isNotNull() & event_DW_df.id_company.isNotNull()).show(5)

+------------+----------+----+------+-----------+--------+-------+----------+-----+----------+---------+-----------+----------+
|publisher_id|      date|hour|job_id|campaign_id|group_id|bid_set|spend_time|click|conversion|qualified|unqualified|id_company|
+------------+----------+----+------+-----------+--------+-------+----------+-----+----------+---------+-----------+----------+
|         4.0|      null|null|  null|       null|    null|   null|      null| null|      null|     null|       null|        58|
|         1.0|2022-07-08|   4| 187.0|       48.0|    null|    0.0|       0.0|    2|      null|     null|       null|        59|
|         1.0|2022-07-26|   3|1534.0|      222.0|    null|    0.0|       0.0|   16|      null|     null|       null|        59|
|         1.0|2022-07-25|   9| 188.0|       48.0|    34.0|    1.0|      52.0|   52|      null|     null|       null|        59|
|         1.0|2022-07-26|   2|1527.0|      222.0|    null|    0.0|       0.0|   10|      null|     null|