# Start

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


In [2]:
CASSANDRA_HOST = "etl_cassandra"
CASSANDRA_KEYSPACE = "de_project"
CASSANDRA_TABLE = "tracking"

MYSQL_HOST = "localhost"
MYSQL_DATABASE = "Data_Warehouse"
# MYSQL_ROOT_PASSWORD = "123456"


In [3]:
spark = SparkSession.builder.config('spark.jars.packages','com.datastax.spark:spark-cassandra-connector_2.12:3.1.0').getOrCreate()

In [4]:
spark_df = spark.read.format("org.apache.spark.sql.cassandra").options(table=CASSANDRA_TABLE,keyspace=CASSANDRA_KEYSPACE)\
    .load()

In [5]:
spark_df.printSchema()

root
 |-- create_time: string (nullable = false)
 |-- bid: integer (nullable = true)
 |-- bn: string (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- cd: integer (nullable = true)
 |-- custom_track: string (nullable = true)
 |-- de: string (nullable = true)
 |-- dl: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- ed: string (nullable = true)
 |-- ev: integer (nullable = true)
 |-- group_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- job_id: integer (nullable = true)
 |-- md: string (nullable = true)
 |-- publisher_id: integer (nullable = true)
 |-- rl: string (nullable = true)
 |-- sr: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- tz: integer (nullable = true)
 |-- ua: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- utm_campaign: string (nullable = true)
 |-- utm_content: string (nullable = true)
 |-- utm_medium: string (nullable = true)
 |-- utm_source: string (nullable = true)
 |-- utm_term

In [6]:
spark_df.show()

+--------------------+----+----------+-----------+----+------------+-----+--------------------+---------------+--------------------+----+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+----------+--------+----+--------+
|         create_time| bid|        bn|campaign_id|  cd|custom_track|   de|                  dl|             dt|                  ed|  ev|group_id|  id|job_id|  md|publisher_id|                  rl|       sr|                  ts|  tz|                  ua|                uid|utm_campaign|utm_content|utm_medium|utm_source|utm_term|   v|      vp|
+--------------------+----+----------+-----------+----+------------+-----+--------------------+---------------+--------------------+----+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+

In [None]:
# from pyspark.sql import Row

# columns = ('ts','job_id','custom_track','bid','campaign_id','group_id','publisher_id')
# sampleDF = spark.createDataFrame(data, columns)
# sampleDF.show()

In [None]:
data = spark_df.select('ts','job_id','custom_track','bid','campaign_id','group_id','publisher_id')


In [7]:
# #create some fake data to insert to mysql
# columns = ["id", "event"]
# data = [(1, "clicks"), (2, "clicks"),
#     (3, "exit"),(4, "apply")]
# from pyspark.sql import Row
# rows = [Row(id=x[0], event=x[1]) for x in data]
# sampleDF = spark.createDataFrame(rows, columns)
# sampleDF = spark.sparkContext.parallelize(data).toDF(columns)
# sampleDF.show()

## Select column

In [8]:
data = spark_df.select('ts','job_id','custom_track','bid','campaign_id','group_id','publisher_id')
data = data.filter((data.job_id.isNotNull()) & (data.custom_track.isNotNull()))
data.show()

+--------------------+------+------------+---+-----------+--------+------------+
|                  ts|job_id|custom_track|bid|campaign_id|group_id|publisher_id|
+--------------------+------+------------+---+-----------+--------+------------+
|2022-07-26 13:57:...|  1527|       click|  0|        222|    NULL|           1|
| 2023-01-15 04:47:44|   671|   qualified|  0|         12|      17|          34|
|2022-07-26 09:48:...|   336|       click|  1|         15|    NULL|           1|
|2022-07-25 16:54:...|  1532|       click|  0|        222|    NULL|           1|
|2022-07-25 17:02:...|  1527|       click|  0|        222|    NULL|           1|
|2022-07-26 09:53:...|   273|       click|  0|         48|    NULL|           1|
|2022-07-26 14:05:...|  1532|       click|  0|        222|    NULL|           1|
|2022-07-08 16:43:...|    98|       click|  2|          4|    NULL|           1|
|2022-07-26 10:02:...|  1533|       click|  0|        222|    NULL|           1|
|2022-07-08 16:40:...|    98

# Pocessing

## Click processing

In [9]:
click_data = data.filter(data.custom_track == 'click')
click_data = click_data.na.fill({'job_id':0})
click_data = click_data.na.fill({'custom_track':0})
click_data = click_data.na.fill({'bid':0})
click_data = click_data.na.fill({'campaign_id':0})
click_data = click_data.na.fill({'group_id':0})
click_data = click_data.na.fill({'publisher_id':0})

click_data.createTempView('clicktable')

In [10]:
click_output = spark.sql("""SELECT job_id, date(ts) as dates, hour(ts) as hours, group_id, campaign_id, publisher_id, Round(AVG(bid),2) as bid_set, count(*) as clicks, sum(bid) as spend_hour 
                       FROM clicktable 
                       GROUP BY job_id, campaign_id, group_id, publisher_id, date(ts), hour(ts)""")
click_output.show()

+------+----------+-----+--------+-----------+------------+-------+------+----------+
|job_id|     dates|hours|group_id|campaign_id|publisher_id|bid_set|clicks|spend_hour|
+------+----------+-----+--------+-----------+------------+-------+------+----------+
|  1533|2022-07-27|   11|       0|        222|           1|    0.0|    11|         0|
|  1530|2022-07-25|   16|       0|        222|           1|    0.0|    11|         0|
|  1531|2022-07-27|   11|       0|        222|           1|    0.0|    15|         0|
|   187|2022-07-26|   13|      34|         48|           1|    1.0|    10|        10|
|  1533|2022-07-26|   10|       0|        222|           1|    0.0|    11|         0|
|   273|2022-07-26|    9|       0|         48|           1|    0.0|    13|         0|
|  1532|2022-07-26|   10|       0|        222|           1|    0.0|     7|         0|
|   187|2022-07-07|   10|       0|         48|           1|   0.33|     6|         2|
|   188|2022-07-24|   17|      34|         48|        

## Conversion processing

In [11]:
conversion_data = data.filter(data.custom_track == 'conversion')
conversion_data = conversion_data.na.fill({'job_id':0})
conversion_data = conversion_data.na.fill({'custom_track':0})
conversion_data = conversion_data.na.fill({'bid':0})
conversion_data = conversion_data.na.fill({'campaign_id':0})
conversion_data = conversion_data.na.fill({'group_id':0})
conversion_data = conversion_data.na.fill({'publisher_id':0})

conversion_data.createTempView('conversiontable')

In [12]:
conversion_output = spark.sql("""SELECT job_id, date(ts) as dates, hour(ts) as hours, group_id, campaign_id, publisher_id, Round(AVG(bid),2) as bid_set, count(*) as conversion, sum(bid) as spend_hour 
                       FROM conversiontable 
                       GROUP BY job_id, campaign_id, group_id, publisher_id, date(ts), hour(ts)""")
conversion_output.show()

+------+----------+-----+--------+-----------+------------+-------+----------+----------+
|job_id|     dates|hours|group_id|campaign_id|publisher_id|bid_set|conversion|spend_hour|
+------+----------+-----+--------+-----------+------------+-------+----------+----------+
|   672|2023-01-15|    4|      11|         61|          15|    0.0|         1|         0|
|   125|2023-01-15|    4|      32|          1|          29|    0.0|         1|         0|
|   858|2023-01-15|    4|      32|          1|          15|    1.0|         1|         1|
|   355|2023-01-15|    4|      21|         61|           2|    0.0|         1|         0|
|   458|2023-01-15|    4|      25|         48|          17|    0.0|         1|         0|
|  1822|2023-01-15|    4|      27|         55|          22|    0.0|         1|         0|
|  1616|2023-01-15|    4|      26|         48|          31|    1.0|         1|         1|
|   969|2023-01-15|    4|      35|          5|          13|    1.0|         1|         1|
|  1059|20

## Qualified processing

In [13]:
qualified_data = data.filter(data.custom_track == 'qualified')
qualified_data = qualified_data.na.fill({'job_id':0})
qualified_data = qualified_data.na.fill({'custom_track':0})
qualified_data = qualified_data.na.fill({'bid':0})
qualified_data = qualified_data.na.fill({'campaign_id':0})
qualified_data = qualified_data.na.fill({'group_id':0})
qualified_data = qualified_data.na.fill({'publisher_id':0})

qualified_data.createTempView('qualifiedtable')

In [14]:
qualified_output = spark.sql("""SELECT job_id, date(ts) as dates, hour(ts) as hours, group_id, campaign_id, publisher_id, Round(AVG(bid),2) as bid_set, count(*) as qualified, sum(bid) as spend_hour 
                       FROM qualifiedtable 
                       GROUP BY job_id, campaign_id, group_id, publisher_id, date(ts), hour(ts)""")
qualified_output.show()

+------+----------+-----+--------+-----------+------------+-------+---------+----------+
|job_id|     dates|hours|group_id|campaign_id|publisher_id|bid_set|qualified|spend_hour|
+------+----------+-----+--------+-----------+------------+-------+---------+----------+
|  1218|2023-01-15|    4|      22|         33|          12|    1.0|        1|         1|
|  1644|2023-01-15|    4|      10|        141|          22|    1.0|        1|         1|
|   263|2023-01-15|    4|      26|         59|           3|    1.0|        1|         1|
|   671|2023-01-15|    4|      17|         12|          34|    0.0|        1|         0|
|   192|2023-01-15|    4|      41|         51|          15|    0.0|        1|         0|
|   339|2023-01-15|    4|      41|         12|          29|    0.0|        1|         0|
|  1656|2023-01-15|    4|      17|         58|          12|    0.0|        1|         0|
|  1981|2023-01-15|    4|      15|         13|          30|    1.0|        1|         1|
|  1400|2023-01-15|  

## Unqualified processing


In [15]:
unqualified_data = data.filter(data.custom_track == 'unqualified')
unqualified_data = unqualified_data.na.fill({'job_id':0})
unqualified_data = unqualified_data.na.fill({'custom_track':0})
unqualified_data = unqualified_data.na.fill({'bid':0})
unqualified_data = unqualified_data.na.fill({'campaign_id':0})
unqualified_data = unqualified_data.na.fill({'group_id':0})
unqualified_data = unqualified_data.na.fill({'publisher_id':0})

unqualified_data.createTempView('unqualifiedtable')

In [16]:
unqualified_output = spark.sql("""SELECT job_id, date(ts) as dates, hour(ts) as hours, group_id, campaign_id, publisher_id, Round(AVG(bid),2) as bid_set, count(*) as unqualified, sum(bid) as spend_hour 
                       FROM unqualifiedtable 
                       GROUP BY job_id, campaign_id, group_id, publisher_id, date(ts), hour(ts)""")
unqualified_output.show()

+------+----------+-----+--------+-----------+------------+-------+-----------+----------+
|job_id|     dates|hours|group_id|campaign_id|publisher_id|bid_set|unqualified|spend_hour|
+------+----------+-----+--------+-----------+------------+-------+-----------+----------+
|  2051|2023-01-15|    4|      41|        122|           9|    0.0|          1|         0|
|  2002|2023-01-15|    4|      17|         15|          33|    1.0|          1|         1|
|   311|2023-01-15|    4|      14|         53|          12|    0.0|          1|         0|
|   285|2023-01-15|    4|      41|         57|           9|    1.0|          1|         1|
|   197|2023-01-15|    4|      17|         12|          15|    1.0|          1|         1|
|   990|2023-01-15|    4|      21|         70|          10|    1.0|          1|         1|
|  1280|2023-01-15|    4|      37|          9|           9|    1.0|          1|         1|
|  1997|2023-01-15|    4|      27|         61|          15|    1.0|          1|         1|

## Merge to get final data

In [17]:
final_data = click_output.join(conversion_output,['job_id','dates','hours','publisher_id','campaign_id','group_id', 'bid_set', 'spend_hour'],'full').\
    join(qualified_output,['job_id','dates','hours','publisher_id','campaign_id','group_id', 'bid_set', 'spend_hour'],'full').\
    join(unqualified_output,['job_id','dates','hours','publisher_id','campaign_id','group_id', 'bid_set', 'spend_hour'],'full')
final_data = final_data.withColumnRenamed('qualified', 'qualified_application').withColumnRenamed('unqualified', 'disqualified_application')
final_data.show()

+------+----------+-----+------------+-----------+--------+-------+----------+------+----------+---------------------+------------------------+
|job_id|     dates|hours|publisher_id|campaign_id|group_id|bid_set|spend_hour|clicks|conversion|qualified_application|disqualified_application|
+------+----------+-----+------------+-----------+--------+-------+----------+------+----------+---------------------+------------------------+
|    76|2023-01-15|    4|          33|         54|      10|    1.0|         1|  NULL|      NULL|                 NULL|                       1|
|    81|2023-01-15|    4|          11|          1|      10|    1.0|         1|  NULL|         1|                 NULL|                    NULL|
|    90|2023-01-15|    4|          33|        198|      10|    1.0|         1|  NULL|      NULL|                 NULL|                       1|
|    98|2022-07-08|   16|           1|          4|       0|    2.0|       216|   108|      NULL|                 NULL|                  

## Get company_df 

In [18]:
company_df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("url", "jdbc:mysql://{host}:3306/{db}".format(host=MYSQL_HOST, db=MYSQL_DATABASE)) \
    .option("dbtable", """(SELECT id as job_id, company_id, group_id, campaign_id FROM job) test""") \
    .option("user", "root") \
    .option("password", "1") \
    .load()
company_df.show()

+------+----------+--------+-----------+
|job_id|company_id|group_id|campaign_id|
+------+----------+--------+-----------+
|     2|         1|      10|          1|
|     3|         1|      10|          1|
|     4|         1|      10|          1|
|     5|         1|      10|          1|
|     6|         1|      10|          1|
|     7|         1|      10|          1|
|     8|         1|      10|          1|
|     9|         1|      10|          1|
|    39|         1|    NULL|          1|
|    40|         1|      10|          1|
|    41|         1|      10|          1|
|    42|         1|      10|          1|
|    43|         1|    NULL|          1|
|    44|         1|      10|          1|
|    45|         1|      10|          1|
|    46|         1|      10|          1|
|    47|         1|      10|          1|
|    48|         1|      10|          1|
|    49|         1|      10|          1|
|    50|         1|      10|          1|
+------+----------+--------+-----------+
only showing top

## Merge company & final data

In [19]:
final_output = final_data.join(company_df,'job_id','left').drop(company_df.campaign_id).drop(company_df.group_id)
final_output.show()

+------+----------+-----+------------+-----------+--------+-------+----------+------+----------+---------------------+------------------------+----------+
|job_id|     dates|hours|publisher_id|campaign_id|group_id|bid_set|spend_hour|clicks|conversion|qualified_application|disqualified_application|company_id|
+------+----------+-----+------------+-----------+--------+-------+----------+------+----------+---------------------+------------------------+----------+
|    76|2023-01-15|    4|          33|         54|      10|    1.0|         1|  NULL|      NULL|                 NULL|                       1|         1|
|    81|2023-01-15|    4|          11|          1|      10|    1.0|         1|  NULL|         1|                 NULL|                    NULL|         1|
|    90|2023-01-15|    4|          33|        198|      10|    1.0|         1|  NULL|      NULL|                 NULL|                       1|         1|
|    98|2022-07-08|   16|           1|          4|       0|    2.0|   

In [20]:
from pyspark.sql.functions import lit

final_output = final_output.na.fill({'clicks':'0'})
final_output = final_output.na.fill({'conversion':'0'})
final_output = final_output.na.fill({'qualified_application':'0'})
final_output = final_output.na.fill({'disqualified_application':'0'})
final_output = final_output.withColumn('sources', lit('Cassandra'))
final_output.show()


+------+----------+-----+------------+-----------+--------+-------+----------+------+----------+---------------------+------------------------+----------+---------+
|job_id|     dates|hours|publisher_id|campaign_id|group_id|bid_set|spend_hour|clicks|conversion|qualified_application|disqualified_application|company_id|  sources|
+------+----------+-----+------------+-----------+--------+-------+----------+------+----------+---------------------+------------------------+----------+---------+
|    76|2023-01-15|    4|          33|         54|      10|    1.0|         1|     0|         0|                    0|                       1|         1|Cassandra|
|    81|2023-01-15|    4|          11|          1|      10|    1.0|         1|     0|         1|                    0|                       0|         1|Cassandra|
|    90|2023-01-15|    4|          33|        198|      10|    1.0|         1|     0|         0|                    0|                       1|         1|Cassandra|
|    98|20

# Output


## Add timestamp to output

In [21]:
latest_modified_time = spark_df.select(F.max('ts')).collect()[0][0]
latest_modified_time

datetime.datetime(2023, 1, 15, 4, 56, 5)

In [22]:
latest_modified_time = spark_df.select(F.max('ts')).collect()[0][0]
final_output = final_output.withColumn('latest_modified_time', F.lit(latest_modified_time))

In [23]:
final_output.show()

+------+----------+-----+------------+-----------+--------+-------+----------+------+----------+---------------------+------------------------+----------+---------+--------------------+
|job_id|     dates|hours|publisher_id|campaign_id|group_id|bid_set|spend_hour|clicks|conversion|qualified_application|disqualified_application|company_id|  sources|latest_modified_time|
+------+----------+-----+------------+-----------+--------+-------+----------+------+----------+---------------------+------------------------+----------+---------+--------------------+
|    76|2023-01-15|    4|          33|         54|      10|    1.0|         1|     0|         0|                    0|                       1|         1|Cassandra| 2023-01-15 04:56:05|
|    81|2023-01-15|    4|          11|          1|      10|    1.0|         1|     0|         1|                    0|                       0|         1|Cassandra| 2023-01-15 04:56:05|
|    90|2023-01-15|    4|          33|        198|      10|    1.0|   

In [24]:
final_output.write.format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/Data_Warehouse") \
    .option("dbtable", "events") \
    .mode("append") \
    .option("user", "root") \
    .option("password", "1") \
    .save()

In [25]:
spark = SparkSession.builder.config('spark.jars.packages','com.mysql:mysql-connector-j:8.0.33').getOrCreate()

df = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.jdbc.Driver") \
    .option("url", "jdbc:mysql://{host}:3306/{db}".format(host=MYSQL_HOST, db=MYSQL_DATABASE)) \
    .option("dbtable", "events") \
    .option("user", "root") \
    .option("password", "1") \
    .load()

df.show()

+---+------+----------+-----+------------+----------+-----------+--------+------------------------+---------------------+----------+------+-------+----------+---------+--------------------+
| id|job_id|     dates|hours|publisher_id|company_id|campaign_id|group_id|disqualified_application|qualified_application|conversion|clicks|bid_set|spend_hour|  sources|latest_modified_time|
+---+------+----------+-----+------------+----------+-----------+--------+------------------------+---------------------+----------+------+-------+----------+---------+--------------------+
|  1|    76|2023-01-15|    4|          33|         1|         54|      10|                       1|                    0|         0|     0|    1.0|       1.0|Cassandra| 2023-01-15 04:56:05|
|  2|    81|2023-01-15|    4|          11|         1|          1|      10|                       0|                    0|         1|     0|    1.0|       1.0|Cassandra| 2023-01-15 04:56:05|
|  3|    90|2023-01-15|    4|          33|        