## Main

In [1]:
import findspark
findspark.init()
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
import os
from datetime import datetime
from functools import reduce
import time
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from functools import reduce
from pyspark.sql.window import Window
import pandas as pd

In [2]:
def getSparkSession(app_name="MyApp", master="local[*]", config_options=None):
    """
    Create a Spark session with the given app name and optional configurations.
    
    Parameters:
    - app_name (str): Name of the Spark application.
    - master (str): The master URL for the cluster.
    - config_options (dict): Additional Spark configurations as key-value pairs.
    
    Returns:
    - SparkSession: A SparkSession object.
    """
    builder = SparkSession.builder.appName(app_name).master(master)
    
    # Apply additional configurations if provided
    if config_options:
        for key, value in config_options.items():
            builder = builder.config(key, value)
    
    return builder.getOrCreate()

In [3]:
config= {"spark.jars.packages" : "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1"}
         #"spark.driver.memory" : "6g"}

In [4]:
spark = getSparkSession(app_name= "CassnadraJobETL", config_options= config)

In [5]:
spark_df = spark.read.format("org.apache.spark.sql.cassandra").options(table='tracking',keyspace='study_de').load()

In [6]:
spark_df.count()

6766

In [7]:
spark_df.show()

+--------------------+----+----------+-----------+---+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+----------+--------+---+--------+
|         create_time| bid|        bn|campaign_id| cd|custom_track|   de|                  dl|             dt|                  ed| ev|group_id|  id|job_id|  md|publisher_id|                  rl|       sr|                  ts|  tz|                  ua|                uid|utm_campaign|utm_content|utm_medium|utm_source|utm_term|  v|      vp|
+--------------------+----+----------+-----------+---+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+--------

In [10]:
spark_df.printSchema()

root
 |-- create_time: string (nullable = false)
 |-- bid: integer (nullable = true)
 |-- bn: string (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- cd: integer (nullable = true)
 |-- custom_track: string (nullable = true)
 |-- de: string (nullable = true)
 |-- dl: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- ed: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- ev: integer (nullable = true)
 |-- group_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- job_id: integer (nullable = true)
 |-- md: string (nullable = true)
 |-- publisher_id: integer (nullable = true)
 |-- rl: string (nullable = true)
 |-- sr: string (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- tz: integer (nullable = true)
 |-- ua: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- utm_campaign: string (nullable = true)
 |-- utm_content: string (nullable = true)
 |-- utm_medium: string (nu

In [6]:
from utils import readMySQL


def get_latest_time_MySQL(spark):
    table= """(SELECT lastest_update_time FROM events) lastest_update_time"""
    mysql_latest_time= readMySQL(spark, database="data_warehouse", nameTable=table)
    mysql_latest_time= mysql_latest_time.take(1)[0][0]
    if mysql_latest_time is None:
        mysql_latest_time = '1998-01-01 23:59:59'
    else : 
        mysql_latest_time = mysql_latest_time
    return mysql_latest_time


In [37]:
def readCassandra_time(spark, table, keyspace, lastest_time):
    df = spark.read.format("org.apache.spark.sql.cassandra") \
        .options(table= table,keyspace= keyspace) \
        .load() \
        .where(F.date_trunc("second", F.col("ts")) > lastest_time)
    return df

In [59]:
table= """(SELECT max(lastest_update_time) FROM events) lastest_update_time"""
mysql_latest_time= readMySQL(spark, database="data_warehouse", nameTable=table)

In [60]:
mysql_time = mysql_latest_time.take(1)

In [63]:
mysql_time[0][0] is None

True

In [27]:
sql_time= get_latest_time_MySQL(spark)

In [38]:
data_new= readCassandra_time(spark, "tracking", "study_de", sql_time)

In [46]:
data_new.count()

AttributeError: 'DataFrame' object has no attribute 'isnull'

In [36]:
data_new.select(F.date_trunc("second", F.col("ts"))).show(truncate= False)

+----------------------+
|date_trunc(second, ts)|
+----------------------+
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
|2023-01-15 04:56:05   |
+----------------------+



In [34]:
cass_time= get_latest_time_cassandra(spark)

In [35]:
cass_time

datetime.datetime(2023, 1, 15, 4, 56, 5)

In [43]:
cass_time

datetime.datetime(2023, 1, 15, 4, 56, 5)

In [33]:
sql_time

datetime.datetime(2023, 1, 15, 4, 56, 5)

In [39]:
cass_time == sql_time

False

In [24]:
 # Extract the value from the Row object if needed
formatted_time = mysql_latest_time.strftime("%Y-%m-%d %H:%M:%S")

print(formatted_time)

2023-01-15 04:56:05


In [15]:
mysql_latest_time.take(1)[0][0]

datetime.datetime(2023, 1, 15, 4, 56, 5)

In [47]:
df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="tracking", keyspace="study_de") \
    .load() \
    .filter(col("ts") > "2023-01-01 00:00:00")

AttributeError: 'DataFrameReader' object has no attribute 'filter'

In [8]:
import pyspark.sql.functions as F
def process_customtrack_bid(df, custom_track):
    data= df.filter(col("custom_track")== custom_track)
    data= data.selectExpr("date(ts) as date", "hour(ts) as hour", "job_id", \
                            "publisher_id", "campaign_id", "group_id","bid")
    data= data.na.fill(0)
    data= data.groupBy("date", "hour", "job_id", "publisher_id", "campaign_id", "group_id") \
        .agg(
            round(F.avg("bid"), 2).alias("bid_set"),
            F.sum("bid").alias("spend_hour"),
            F.count("*").alias(custom_track)
        )
    return data

def process_customtrack_withoutBid(df, custom_track):
    data= df.filter(col("custom_track")== custom_track)
    data= data.selectExpr("date(ts) as date", "hour(ts) as hour", "job_id", \
                            "publisher_id", "campaign_id", "group_id","bid")
    data= data.na.fill(0)
    data= data.groupBy("date", "hour", "job_id", "publisher_id", "campaign_id", "group_id") \
        .agg(
            F.count("*").alias(custom_track)
        )
    return data

In [9]:
def join_df_final(list_df):    
    join_columns = ['job_id', 'date', 'hour', 'publisher_id', 'campaign_id', 'group_id']
    total_df= list_df[0]
    for df in list_df[1:]:
        total_df= total_df.join(df, join_columns, "full")
    return total_df

In [10]:
def process_cassandra_data(df):
    customtrack= [process_customtrack_bid(df, customtrack) for customtrack in ["click"]]
    customtrack_withoutbid= [process_customtrack_withoutBid(df, customtrack) for customtrack in ["conversion", "qualified", "unqualified"]]
    customtrack.extend(customtrack_withoutbid)
    return join_df_final(customtrack)

In [11]:
df= process_cassandra_data(spark_df)

In [12]:
df.show()

+------+----------+----+------------+-----------+--------+-------+----------+-----+----------+---------+-----------+
|job_id|      date|hour|publisher_id|campaign_id|group_id|bid_set|spend_hour|click|conversion|qualified|unqualified|
+------+----------+----+------------+-----------+--------+-------+----------+-----+----------+---------+-----------+
|     0|2022-07-06|  16|           0|          0|       0|    0.0|         0|    1|      NULL|     NULL|       NULL|
|     0|2022-07-06|  22|           0|          0|       0|   NULL|      NULL| NULL|         2|     NULL|       NULL|
|     0|2022-07-07|   9|           0|          0|       0|    0.0|         0|    3|      NULL|     NULL|       NULL|
|     0|2022-07-07|  10|           0|          0|       0|    0.0|         0|    2|      NULL|     NULL|       NULL|
|   187|2022-07-07|  10|           1|         48|       0|   0.33|         2|    6|      NULL|     NULL|       NULL|
|     0|2022-07-08|   9|           0|          0|       0|    0.

In [28]:
# test= click_data.selectExpr("date(ts) as date", "date_format(ts, 'HH:mm:ss') as time", "job_id", \
#                             "publisher_id", "campaign_id", "group_id","bid")

In [16]:
from utils import readMySQL, importToMySQL

In [13]:
def retrieve_company_data(spark):
    df= readMySQL(spark, "data_warehouse", "job")
    company= df.selectExpr("id as job_id", "company_id", "group_id", "campaign_id")
    return company

In [14]:
company = retrieve_company_data(spark)

-----------------------------------
--------Read From Database--------
-----------------------------------
--------Finished--------
------------------------


In [15]:
final_output = df.join(company,'job_id','left').drop(company.group_id).drop(company.campaign_id)

In [17]:
final_output.show(5)

+------+----------+----+------------+-----------+--------+-------+----------+-----+----------+---------+-----------+----------+
|job_id|      date|hour|publisher_id|campaign_id|group_id|bid_set|spend_hour|click|conversion|qualified|unqualified|company_id|
+------+----------+----+------------+-----------+--------+-------+----------+-----+----------+---------+-----------+----------+
|     0|2022-07-06|  16|           0|          0|       0|    0.0|         0|    1|      NULL|     NULL|       NULL|      NULL|
|     0|2022-07-06|  22|           0|          0|       0|   NULL|      NULL| NULL|         2|     NULL|       NULL|      NULL|
|     0|2022-07-07|   9|           0|          0|       0|    0.0|         0|    3|      NULL|     NULL|       NULL|      NULL|
|     0|2022-07-07|  10|           0|          0|       0|    0.0|         0|    2|      NULL|     NULL|       NULL|      NULL|
|   187|2022-07-07|  10|           1|         48|       0|   0.33|         2|    6|      NULL|     NULL|

In [20]:
final_output = final_output.select('job_id','date','hour','publisher_id','company_id','campaign_id','group_id','unqualified','qualified','conversion','click','bid_set','spend_hour')
final_output = final_output.withColumnRenamed('date','dates').withColumnRenamed('hour','hours').withColumnRenamed('qualified','qualified_application').\
withColumnRenamed('unqualified','disqualified_application').withColumnRenamed('click','clicks')
final_output = final_output.withColumn('sources',F.lit('Cassandra'))
final_output = final_output.withColumn('lastest_update_time', F.lit(spark_df.selectExpr("max(ts)").take(1)[0][0]))

In [39]:
final_output = final_output.withColumn('lastest_update_time', F.lit(spark_df.selectExpr("max(ts)").take(1)[0][0]))

In [40]:
final_output.show(5)

+------+----------+-----+------------+----------+-----------+--------+------------------------+---------------------+----------+------+-------+----------+---------+--------------------+
|job_id|     dates|hours|publisher_id|company_id|campaign_id|group_id|disqualified_application|qualified_application|conversion|clicks|bid_set|spend_hour|  sources| lastest_update_time|
+------+----------+-----+------------+----------+-----------+--------+------------------------+---------------------+----------+------+-------+----------+---------+--------------------+
|     0|2022-07-06|   16|           0|      NULL|          0|       0|                    NULL|                 NULL|      NULL|     1|    0.0|         0|Cassandra|2023-01-15 04:56:...|
|     0|2022-07-06|   22|           0|      NULL|          0|       0|                    NULL|                 NULL|         2|  NULL|   NULL|      NULL|Cassandra|2023-01-15 04:56:...|
|     0|2022-07-07|    9|           0|      NULL|          0|       0|

In [41]:
importToMySQL(final_output, "data_warehouse", "events")

----------------------------------------
--------Saving data to Database--------
----------------------------------------
--------Data Import Successfully--------
----------------------------------------


### Spark connect CassandraDB

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1") \
    .appName("CassandraSparkConnector") \
    .config("spark.cassandra.connection.host", "127.0.0.1") \
    .config("spark.cassandra.connection.port", "9042") \
    .getOrCreate()

In [None]:
spark_df = spark.read.format("org.apache.spark.sql.cassandra").options(table='tracking',keyspace='study_de').load()

In [None]:
spark_df.show()

### Spark connect MongoDB

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

mongodb_uri = "mongodb+srv://studyDE:studyDE123456@studyde.gn2os2e.mongodb.net/?retryWrites=true&w=majority&appName=studyDE"

# Initialize Spark session
spark = SparkSession.builder \
    .appName("MongoSparkConnectorIntro") \
    .config("spark.jars.packages","org.mongodb.spark:mongo-spark-connector_2.12:10.3.0") \
    .getOrCreate()


In [None]:
MONGODB_URI= os.getenv('MONGODB_URI') 
def readMongoDB(spark, database, nameTable):
    print('-----------------------------------')
    print(f"--------Read From Database--------")
    print('-----------------------------------')
    df = spark.read \
        .format("mongodb") \
        .option("spark.mongodb.read.connection.uri", MONGODB_URI) \
        .option("spark.mongodb.write.connection.uri", MONGODB_URI) \
        .option("database", database) \
        .option("collection", nameTable) \
        .load()
    print("--------Finished--------")
    print("------------------------")
    return df

In [None]:
df = spark.read \
        .format("mongodb") \
        .option("database", "sample_mflix") \
        .option("collection", "movies") \
        .load()

In [None]:
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
uri = "mongodb+srv://studyDE:studyDE123456@studyde.gn2os2e.mongodb.net/?retryWrites=true&w=majority&appName=studyDE"
# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))
# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


## Process Data before import to Cassandra

In [14]:
from cassandra.cluster import Cluster
cluster = Cluster(['localhost'])
session = cluster.connect("study_de")

In [None]:
import time_uuid
from datetime import datetime, timedelta
import pandas as pd

In [None]:
def set_time(uuid_str):
    timeuuid_obj= time_uuid.TimeUUID(uuid_str)
    datetime= timeuuid_obj.get_datetime()
    return datetime

In [None]:
df= pd.read_csv("../Class4_DataETLPipelineProject/Cassandra/tracking_trans_f.csv")

In [None]:
df.head()

Unnamed: 0,create_time,bid,bn,campaign_id,cd,custom_track,de,dl,dt,ev,...,ua,uid,utm_campaign,utm_content,utm_medium,utm_source,utm_term,v,vp,ed
0,5af328a0-0d60-11ed-90e9-7cd44fe229db,1.0,Chrome 103,48.0,24.0,,UTF-8,http://fe.dev.gotoro.io/candidate-portal/job/3...,CandidatePortal,1.0,...,Mozilla/5.0 (Windows NT 10.0; Win64; x64) Appl...,1-347wwfkr-l632u1zk,,,,,,1.0,1366x625,
1,d6b1f400-0188-11ed-b23e-8dfcae6c0dfd,,Chrome 103,,24.0,click,UTF-8,http://150.136.2.86/candidate-portal/job,CandidatePortal,2.0,...,Mozilla/5.0 (Windows NT 10.0; Win64; x64) Appl...,1-0d5ciljy-l4pdlaxd,,,,,,1.0,1455x929,"{'customEvent': 'click', 'jobId': '81', 'publi..."
2,9c1b1660-0191-11ed-8872-72e1f40ca1d2,,Chrome 103,,24.0,click,UTF-8,http://localhost:4200/candidate-portal/job,CandidatePortal,2.0,...,Mozilla/5.0 (Windows NT 10.0; Win64; x64) Appl...,1-rrc3k5vd-l4o0b4yy,,,,,,1.0,1367x929,"{'customEvent': 'click', 'jobId': '192', 'publ..."
3,38455980-0c1c-11ed-9604-2837d6f241c5,,Chrome 103,,24.0,,UTF-8,http://fe.stag.gotoro.io/candidate-portal/conf...,CandidatePortal,1.0,...,Mozilla/5.0 (Windows NT 10.0; Win64; x64) Appl...,1-7sdmvw7b-l60r3ocj,,,,,,1.0,1920x947,
4,855c7090-0a51-11ed-84f7-2d4940c4b8bc,,Safari 604,,32.0,,UTF-8,http://fe.test.gotoro.io/candidate-portal/job?...,CandidatePortal,1.0,...,Mozilla/5.0 (iPhone; CPU iPhone OS 15_5 like M...,1-m7pas2e6-l5xilolj,,,,,,1.0,375x640,


In [None]:
for i in range(len(df)):
    datetime= set_time(df['create_time'][i])
    df['ts'][i]= datetime

You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the original DataFrame or Series, because the intermediate object on which we are setting values will behave as a copy.
A typical example is when you are setting values in a column of a DataFrame, like:

df["col"][row_indexer] = value

Use `df.loc[row_indexer, "col"] = values` instead, to perform the assignment in a single step and ensure this keeps updating the original `df`.

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

  df['ts'][i]= datetime
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['ts'][i]= datetime
You are setting val

In [None]:
df['ts'].head(3)

0    2022-07-27 03:58:21.226000
1    2022-07-12 02:17:54.752000
2    2022-07-12 03:20:41.926000
Name: ts, dtype: object

In [None]:
df.to_csv("../Class4_DataETLPipelineProject/Cassandra/tracking_trans_f1.csv", index= False, header= True)

In [None]:
import uuid
from cassandra.util import datetime_from_uuid1
uuid_str = "3e48ed3e-9456-11ed-b45d-596499863832"


# Create a UUID object
uuid_obj = uuid.UUID(uuid_str)
# foo = uuid.uuid1(uuid_obj)
dt_foo = datetime_from_uuid1(uuid_obj)
dt_foo

datetime.datetime(2023, 1, 14, 21, 56, 5, 141843)

In [None]:
dt_foo.date()

datetime.date(2022, 7, 27)