In [2]:
# The virtualenv cannot be activated from inside a notebook: `!source .../activate`
# runs in a separate shell that exits immediately, leaving this kernel unchanged.
# Run the notebook on a kernel backed by the venv's interpreter instead, e.g.
#   /Users/macos/Documents/Python/Project-CV/VNG/VNG-Assignment/venv/bin/python -m ipykernel install --user --name vng-assignment
# and install dependencies into the running kernel's environment with %pip (not !pip3):
# %pip install -r /Users/macos/Documents/Python/Project-CV/VNG/VNG-Assignment/requirements.txt

In [3]:
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, BooleanType, TimestampType, DoubleType
import redis
# Build (or reuse) the Spark session: pull in the Kafka, Redis and Cassandra
# connector packages and point the Redis and Cassandra clients at localhost.
spark = (
    SparkSession.builder
    .appName("KafkaStructuredStreaming")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,"
        "com.redislabs:spark-redis_2.12:3.1.0,"
        "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0",
    )
    # Redis target used by the spark-redis connector.
    .config("spark.redis.host", "localhost")
    .config("spark.redis.port", "6379")
    .config("spark.redis.db", "0")
    # Cassandra contact point used by the spark-cassandra-connector.
    .config('spark.cassandra.connection.host', 'localhost')
    .getOrCreate()
)
    



In [4]:
# Read the sample events file (multi-line JSON) using an explicit schema:
# three nullable string id columns plus a nullable timestamp.
# NOTE(review): absolute, machine-specific path — consider making it configurable.
event_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in ("user_id", "role_id", "game_id")]
    + [StructField("event_time", TimestampType(), True)]
)

df = (
    spark.read
    .format('json')
    .option('multiline', 'true')
    .schema(event_schema)
    .load('/Users/macos/Documents/Python/Project-CV/VNG/VNG-Assignment/files/sample_file.json')
)

# df.show(truncate=False)
# df.printSchema()



In [5]:
# Spark datetime patterns used to derive the per-minute and per-day columns.
# NOTE: `timestamp_format` is no longer applied. `event_schema` already reads
# `event_time` as TimestampType, so re-parsing it with to_timestamp() was a no-op;
# and had the column been a string, this pattern (with `.SSS`) would not match
# second-precision values such as "2024-07-16T23:55:09".
timestamp_format = "yyyy-MM-dd'T'HH:mm:ss.SSS"
# Truncate each event to its minute; each distinct value counts as one played minute.
dateformats = "yyyy-MM-dd'T'HH:mm:00"
# Day-window bounds rendered in the session time zone (XXX prints the offset, e.g. +07:00).
# Seconds are a literal 00 here — the former `ss` only printed 00 because
# event_time had already been truncated to the minute.
start_window = "yyyy-MM-dd'T'00:00:00XXX"
end_window = "yyyy-MM-dd'T'23:59:59XXX"
date = "yyyy-MM-dd"
date_sample = "2024-07-17"

# event_time becomes a minute-truncated string; the window columns are derived from it.
df_2 = (
    df.withColumn("event_time", date_format(col("event_time"), dateformats))
    .withColumn("time_start_window", date_format(col("event_time"), start_window))
    .withColumn("time_end_window", date_format(col("event_time"), end_window))
)
df_dropDuplicate = df_2.dropDuplicates()

# Keep a single day. Compare against current_date() instead of date_sample to process "today".
df_dropDuplicate = df_dropDuplicate.filter(date_format(col("event_time"), date) == date_sample)
df_dropDuplicate.show(truncate=False)


                                                                                

+-------+-------+-------+-------------------+-------------------------+-------------------------+
|user_id|role_id|game_id|event_time         |time_start_window        |time_end_window          |
+-------+-------+-------+-------------------+-------------------------+-------------------------+
|220018 |role009|game2  |2024-07-17T00:34:00|2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|
|220018 |role006|game2  |2024-07-17T00:50:00|2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|
|220018 |role009|game2  |2024-07-17T01:24:00|2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|
|220018 |role006|game2  |2024-07-17T02:41:00|2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|
|220018 |role003|game2  |2024-07-17T02:46:00|2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|
|220018 |role006|game2  |2024-07-17T03:10:00|2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|
|220011 |role003|game2  |2024-07-17T03:13:00|2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|
|220097 |role006|gam

In [6]:

# Per (user, game, day window): number of distinct event_time values for that game.
transformed_df = (
    df_dropDuplicate
    .groupBy("user_id", "game_id", "time_start_window", "time_end_window")
    .agg(count_distinct("user_id", "game_id", "event_time").alias("playing_time_minutes"))
)

# Same rows plus a "<user_id>:<game_id>" key column (presumably the Redis key; see the
# commented-out write with key.column="compose_id").
compose_key = transformed_df.withColumn(
    "compose_id",
    concat(col("user_id"), lit(":"), col("game_id")),
)

transformed_df.orderBy(col("playing_time_minutes").desc()).show(truncate=False)
compose_key.orderBy(col("playing_time_minutes").asc()).show(truncate=False)


+-------+-------+-------------------------+-------------------------+--------------------+
|user_id|game_id|time_start_window        |time_end_window          |playing_time_minutes|
+-------+-------+-------------------------+-------------------------+--------------------+
|220018 |game2  |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|266                 |
|220074 |game1  |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|99                  |
|220044 |game1  |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|92                  |
|220016 |game2  |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|88                  |
|220086 |game1  |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|88                  |
|220064 |game2  |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|87                  |
|220063 |game4  |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|86                  |
|220061 |game3  |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|86                  |

In [7]:

# Per (user, day window): number of distinct (game_id, event_time) pairs across all games,
# so a minute in which the user has events for two games is counted twice.
aggregate = (
    df_dropDuplicate
    .groupBy("user_id", "time_start_window", "time_end_window")
    .agg(count_distinct("user_id", "game_id", "event_time").alias("playing_time_minutes"))
)
aggregate.orderBy(col("playing_time_minutes").desc()).show(truncate=False)

+-------+-------------------------+-------------------------+--------------------+
|user_id|time_start_window        |time_end_window          |playing_time_minutes|
+-------+-------------------------+-------------------------+--------------------+
|220018 |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|548                 |
|220044 |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|386                 |
|220097 |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|383                 |
|220074 |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|381                 |
|220058 |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|379                 |
|220077 |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|378                 |
|220061 |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|370                 |
|220084 |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|370                 |
|220045 |2024-07-17T00:00:00+07:00|2024-07-17T23:59:59+07:00|369                 |
|220

In [8]:
# compose_key.write \
#     .format("org.apache.spark.sql.redis") \
#     .option("table", "playing_time") \
#     .option("key.column", "compose_id") \
#     .mode("overwrite") \
#     .save()

# aggregate.write \
#     .format("org.apache.spark.sql.redis") \
#     .option("table", "total_playing_time") \
#     .option("key.column", "user_id") \
#     .mode("overwrite") \
#     .save()



In [9]:
# data = [
#     {"user_id": "220011", "role_id": "role004", "game_id": "game2", "event_time": "2024-07-16T23:55:09"},
#     {"user_id": "220012", "role_id": "role010", "game_id": "game2", "event_time": "2024-07-16T23:55:10"},
#     {"user_id": "220013", "role_id": "role005", "game_id": "game2", "event_time": "2024-07-16T23:55:11"},
#     {"user_id": "220014", "role_id": "role010", "game_id": "game2", "event_time": "2024-07-16T23:55:12"},
#     {"user_id": "220015", "role_id": "role008", "game_id": "game2", "event_time": "2024-07-16T23:55:13"},
#     {"user_id": "220016", "role_id": "role004", "game_id": "game2", "event_time": "2024-07-16T23:55:14"},
#     {"user_id": "220017", "role_id": "role002", "game_id": "game2", "event_time": "2024-07-16T23:55:15"},
# ]

# df = spark.createDataFrame(data)


In [10]:
def create_cassandra_connection(contact_points=("localhost",)):
    """Connect to a Cassandra cluster and return a driver session.

    Args:
        contact_points: host names/addresses handed to ``Cluster``. Defaults to
            the local node only, which is what this notebook always used.

    Returns:
        The session returned by ``Cluster.connect()``, or ``None`` when the
        connection could not be established (the error is printed).
    """
    cluster = None
    try:
        cluster = Cluster(list(contact_points))
        return cluster.connect()
    except Exception as e:
        # Best-effort by design: report and let the caller decide how to proceed.
        print(f"could not create cassandra connection due to {e}")
        if cluster is not None:
            # Release resources held by the Cluster object that failed to connect.
            cluster.shutdown()
        return None


session = create_cassandra_connection()



In [11]:
import uuid
import hashlib
def generate_uuid():
    """Return a fresh random (version 4) UUID as its canonical string form."""
    return str(uuid.uuid4())


# Spark treats UDFs as deterministic by default and may drop duplicate calls or
# invoke them extra times during optimization. A random-UUID generator must be
# flagged non-deterministic so the optimizer does not rely on repeatable results.
uuid_udf = udf(generate_uuid, StringType()).asNondeterministic()



def create_keyspace(session):
    """Create the ``spark_streams`` keyspace if it does not exist yet.

    The keyspace uses SimpleStrategy with a replication factor of 1. A
    confirmation line is printed after the statement is executed.

    Args:
        session: an open Cassandra session; only its ``execute`` method is used.
    """
    ddl = """
                    CREATE KEYSPACE IF NOT EXISTS spark_streams
                    WITH replication = {'class': 'SimpleStrategy','replication_factor': '1'};
                    """
    session.execute(ddl)
    print('keyspace is created')


def create_table(session):
    """Create ``spark_streams.created_users`` if it does not exist yet.

    Columns: ``id`` (UUID, primary key) plus TEXT columns ``user_id``,
    ``game_id``, ``role_id`` and ``event_time``. The ``spark_streams`` keyspace
    must already exist. A confirmation line is printed afterwards.

    NOTE(review): ``event_time`` is stored as TEXT even though the DataFrame
    column is a timestamp, and the random ``id`` key means re-running the write
    appends duplicate events rather than upserting them.

    Args:
        session: an open Cassandra session; only its ``execute`` method is used.
    """
    ddl = """
                    CREATE TABLE IF NOT EXISTS spark_streams.created_users(
                    id UUID PRIMARY KEY,
                    user_id TEXT,
                    game_id TEXT,
                    role_id TEXT,
                    event_time TEXT)
                    """
    session.execute(ddl)
    print("Table created successfully")
# create_cassandra_connection() returns None when it cannot connect; without this
# guard the next call would fail with an opaque AttributeError on None.
if session is None:
    raise RuntimeError(
        "No Cassandra session: create_cassandra_connection() failed; see the message printed above."
    )
create_keyspace(session)
create_table(session)

# Attach a random per-row id, used as the `id` primary key of created_users.
# NOTE(review): this rebinds `df`. If the earlier transformation cell is re-run after
# this one, it sees the extra unique `id` column and dropDuplicates() keeps every row.
# Consider binding the result to a new name.
df = df.withColumn("id", uuid_udf())
df.show(truncate=False)


# compose_key.write \
#     .format("org.apache.spark.sql.redis") \
#     .option("table", "playing_time") \
#     .option("key.column", "compose_id") \
#     .mode("overwrite") \
#     .save()


keyspace is created
Table created successfully
+-------+-------+-------+-------------------+------------------------------------+
|user_id|role_id|game_id|event_time         |id                                  |
+-------+-------+-------+-------------------+------------------------------------+
|220018 |role004|game2  |2024-07-16 23:55:09|110cb6d5-1008-467e-9312-99e6f684fe68|
|220018 |role010|game2  |2024-07-16 23:55:10|ef621c16-6753-4672-8e9f-a2290d8e6f51|
|220018 |role005|game2  |2024-07-16 23:55:11|c5f4e951-dc83-4024-a477-80928fce61ef|
|220018 |role010|game2  |2024-07-16 23:55:12|29854cfb-6a9c-4114-9459-56b792ebcabe|
|220018 |role008|game2  |2024-07-16 23:55:13|5d0d97dc-88ce-40b3-a1c8-d8198c3ca799|
|220018 |role004|game2  |2024-07-16 23:55:14|1931071e-d02a-4a8b-bc23-da4ee02d8bd7|
|220018 |role002|game2  |2024-07-16 23:55:15|4ab9f634-b333-4fa5-97f1-f6ad5c16f2d2|
|220018 |role008|game2  |2024-07-16 23:55:16|e70a3779-98be-4522-bbab-480a0b1c79a5|
|220018 |role006|game2  |2024-07-16 23:5

In [12]:
# Append the id-tagged events to spark_streams.created_users through the
# Spark Cassandra connector.
(
    df.write
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace='spark_streams', table='created_users')
    .mode('append')
    .save()
)


                                                                                

25/01/05 16:34:54 WARN ControlConnection: [s0] Error connecting to Node(endPoint=localhost/127.0.0.1:9042, hostId=8ee8285b-58a9-40aa-b712-0e0295b29a85, hashCode=646dc423), trying next node (ConnectionInitException: [s0|control|id: 0x4bdeb48f, L:/127.0.0.1:65255 - R:localhost/127.0.0.1:9042] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=6ee91e56-90d9-471c-9e01-03ff8a4b2a16, APPLICATION_NAME=Spark-Cassandra-Connector-local-1736069543625}): unexpected failure (com.datastax.oss.driver.api.core.connection.ClosedConnectionException: Lost connection to remote peer))
25/01/05 16:34:54 WARN ChannelPool: [s0|localhost/127.0.0.1:9042]  Error while opening new channel (ConnectionInitException: [s0|id: 0xf4831b7c, L:/127.0.0.1:65265 - R:localhost/127.0.0.1:9042] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R),