# ***Problem Statement:***

# **Assignment: Log Intelligence System**

Level 1: Foundational Log Processing

Level 2: Metadata Enrichment & Optimization

# **Background**

Your company operates several web-based services that generate large volumes of server logs. These logs are critical for understanding system performance, user behavior, and detecting operational issues. Additionally, business and product teams rely on enriched metrics for decision-making.


# **Level 1 Tasks: Log Intelligence Pipeline**
1. Load & Explore

  Ingest the log data and perform data profiling.

  A sample is provided; generate a log dataset of at least 100,000 rows.

  Connect to AWS S3 for data retrieval or storage.


2. Clean & Prepare

  Clean and standardize the dataset.

  Handle incorrect and missing values.

  Prepare the dataset for further processing.

3. Feature Engineering

  Derive new informative fields to enhance the dataset.
Hint: hour_of_day, day_of_week, is_error (status >= 400), etc.

4. Analyze Usage Patterns and Insights

  Analyze traffic patterns, usage behavior, and performance trends.
Hint: slowest APIs, most-hit endpoints, suspicious users detected.

5. Store & Summarize

  Store cleaned and processed data.

  Create summary outputs for further reporting
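Task 1 asks for a generated log dataset of at least 100,000 rows. Below is a minimal generator sketch in plain Python, assuming the column layout of `server_log_dataset.csv` that the notebook reads. The value pools (log levels, endpoints, status codes, IP prefixes, a start date of 2023-01-01 spanning roughly three years) and the helper names `generate_log_rows` and `write_log_csv` are hypothetical, not taken from the provided sample.

```python
import csv
import random
import uuid
from datetime import datetime, timedelta

# Column order mirrors server_log_dataset.csv as read in the notebook.
COLUMNS = ["timestamp", "log_level", "user_id", "ip_address",
           "endpoint", "status_code", "response_time", "request_id"]
# Hypothetical value pools: the real sample's distributions are not known.
LOG_LEVELS = ["DEBUG", "INFO", "WARN", "ERROR"]
ENDPOINTS = ["/api/login", "/api/logout", "/api/register",
             "/api/search", "/api/update", "/api/delete"]
STATUS_CODES = [200, 201, 204, 301, 400, 401, 403, 404, 500, 502, 503]


def generate_log_rows(n, null_rate=0.05, seed=42):
    """Return n synthetic log rows; each field is None with probability null_rate."""
    rng = random.Random(seed)  # seeded, so the output is reproducible
    start = datetime(2023, 1, 1)
    span = int(timedelta(days=3 * 365).total_seconds())
    rows = []
    for i in range(1, n + 1):
        row = {
            "timestamp": (start + timedelta(seconds=rng.randrange(span)))
                         .strftime("%Y-%m-%d %H:%M:%S"),
            "log_level": rng.choice(LOG_LEVELS),
            "user_id": f"user_{i}",  # one row per user, as in the visible sample
            "ip_address": ".".join([rng.choice(["10", "172", "192"])]
                                   + [str(rng.randrange(256)) for _ in range(3)]),
            "endpoint": rng.choice(ENDPOINTS),
            "status_code": rng.choice(STATUS_CODES),
            "response_time": round(rng.uniform(0.05, 5.0), 3),
            "request_id": str(uuid.UUID(int=rng.getrandbits(128), version=4)),
        }
        # Inject missing values so the cleaning steps have something to handle.
        rows.append({k: (None if rng.random() < null_rate else v) for k, v in row.items()})
    return rows


def write_log_csv(path, n=100_000, null_rate=0.05, seed=42):
    """Write n synthetic rows to CSV (None becomes an empty field) and return n."""
    rows = generate_log_rows(n, null_rate, seed)
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=COLUMNS)
        writer.writeheader()
        writer.writerows(rows)
    return len(rows)
```

Calling `write_log_csv("server_log_dataset.csv")` would produce 100,000 rows. Empty fields are read back as nulls by `spark.read.csv`, which is what the cleaning steps expect.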


# **Level 2 Tasks: Metadata Enrichment & Optimization**
6. Enrichment

  Integrate the user_metadata dataset with the log dataset using user_id.
Generate a user_metadata dataset of about 5,000 (5k) rows.

7. Aggregated Analysis
  Perform grouped analysis using enriched fields.

  Analyze patterns across account types, regions, and activity status.

  Example: activity trends of Free vs. Premium users.

8. Optimization & Output Strategy

Apply appropriate optimization techniques to improve performance.

Organize output for efficient querying and reuse.

Structure your saved data so that subsets can be efficiently accessed without scanning the entire dataset.

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Log Intelligence Pipeline").getOrCreate()

In [141]:
df_serverlog = spark.read.option('header','true').csv("server_log_dataset.csv", inferSchema = True)

In [142]:
df_serverlog.show()

+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+
|          timestamp|log_level|user_id|     ip_address|     endpoint|status_code|response_time|          request_id|
+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+
|2023-02-22 19:28:34|    ERROR| user_1|   192.113.2.76|/api/register|      463.0|        1.411|093c96b7-7dae-48e...|
|               NULL|     INFO| user_2|  172.254.4.144|  /api/delete|      109.0|        1.927|4b15d36c-3b74-499...|
|2023-06-09 22:51:32|    ERROR| user_3|  192.15.237.65|  /api/logout|       NULL|        2.431|                NULL|
|2024-11-26 03:43:14|     WARN| user_4|  172.52.177.15|   /api/login|       NULL|         0.94|cd76fb1a-b0c0-4c2...|
|2023-06-18 23:44:57|     INFO| user_5|   192.25.49.86|  /api/logout|       NULL|        0.225|058ee99a-6a35-4b2...|
|2024-09-18 17:54:00|     INFO| user_6|172.126.189.228|  /api/up

**Dealing with null values. Columns such as timestamp, ip_address, request_id, and user_id are assumed not to be recoverable from other values in the same column, so rows with nulls in these columns are dropped.**

In [143]:
from pyspark.sql.functions import col

In [144]:
df_serverlog = df_serverlog.filter(col("timestamp").isNotNull())

In [145]:
# user_2 has been dropped from the dataset because its timestamp was null

df_serverlog.show()

+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+
|          timestamp|log_level|user_id|     ip_address|     endpoint|status_code|response_time|          request_id|
+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+
|2023-02-22 19:28:34|    ERROR| user_1|   192.113.2.76|/api/register|      463.0|        1.411|093c96b7-7dae-48e...|
|2023-06-09 22:51:32|    ERROR| user_3|  192.15.237.65|  /api/logout|       NULL|        2.431|                NULL|
|2024-11-26 03:43:14|     WARN| user_4|  172.52.177.15|   /api/login|       NULL|         0.94|cd76fb1a-b0c0-4c2...|
|2023-06-18 23:44:57|     INFO| user_5|   192.25.49.86|  /api/logout|       NULL|        0.225|058ee99a-6a35-4b2...|
|2024-09-18 17:54:00|     INFO| user_6|172.126.189.228|  /api/update|      309.0|         NULL|b427fa91-5bf8-4fa...|
|2023-12-08 11:24:56|    ERROR| user_9|   10.93.214.59|  /api/lo

In [146]:
df_serverlog.where(col("user_id") == "NULL").show()

+---------+---------+-------+----------+--------+-----------+-------------+----------+
|timestamp|log_level|user_id|ip_address|endpoint|status_code|response_time|request_id|
+---------+---------+-------+----------+--------+-----------+-------------+----------+
+---------+---------+-------+----------+--------+-----------+-------------+----------+



In [147]:
df_serverlog = df_serverlog.where(col("user_id") != "NULL")

In [148]:
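# No user_id equal to the string "NULL" was found, but the filter is kept as a safeguard.
# Note: a comparison with a true null evaluates to null, which where() discards,
# so `!= "NULL"` also removes real nulls, whereas `== "NULL"` never matches them.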

df_serverlog.show()

+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+
|          timestamp|log_level|user_id|     ip_address|     endpoint|status_code|response_time|          request_id|
+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+
|2023-02-22 19:28:34|    ERROR| user_1|   192.113.2.76|/api/register|      463.0|        1.411|093c96b7-7dae-48e...|
|2023-06-09 22:51:32|    ERROR| user_3|  192.15.237.65|  /api/logout|       NULL|        2.431|                NULL|
|2024-11-26 03:43:14|     WARN| user_4|  172.52.177.15|   /api/login|       NULL|         0.94|cd76fb1a-b0c0-4c2...|
|2023-06-18 23:44:57|     INFO| user_5|   192.25.49.86|  /api/logout|       NULL|        0.225|058ee99a-6a35-4b2...|
|2024-09-18 17:54:00|     INFO| user_6|172.126.189.228|  /api/update|      309.0|         NULL|b427fa91-5bf8-4fa...|
|2023-12-08 11:24:56|    ERROR| user_9|   10.93.214.59|  /api/lo

In [149]:
df_serverlog = df_serverlog.where(col("ip_address") != "NULL")

In [150]:
# user_23 has been dropped because its ip_address was NULL

df_serverlog.show()

+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+
|          timestamp|log_level|user_id|     ip_address|     endpoint|status_code|response_time|          request_id|
+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+
|2023-02-22 19:28:34|    ERROR| user_1|   192.113.2.76|/api/register|      463.0|        1.411|093c96b7-7dae-48e...|
|2023-06-09 22:51:32|    ERROR| user_3|  192.15.237.65|  /api/logout|       NULL|        2.431|                NULL|
|2024-11-26 03:43:14|     WARN| user_4|  172.52.177.15|   /api/login|       NULL|         0.94|cd76fb1a-b0c0-4c2...|
|2023-06-18 23:44:57|     INFO| user_5|   192.25.49.86|  /api/logout|       NULL|        0.225|058ee99a-6a35-4b2...|
|2024-09-18 17:54:00|     INFO| user_6|172.126.189.228|  /api/update|      309.0|         NULL|b427fa91-5bf8-4fa...|
|2023-12-08 11:24:56|    ERROR| user_9|   10.93.214.59|  /api/lo

In [151]:
df_serverlog = df_serverlog.where(col("request_id") != "NULL")

In [152]:
# user_3 and user_21 have been dropped because their request_id is NULL

df_serverlog.show()

+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+
|          timestamp|log_level|user_id|     ip_address|     endpoint|status_code|response_time|          request_id|
+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+
|2023-02-22 19:28:34|    ERROR| user_1|   192.113.2.76|/api/register|      463.0|        1.411|093c96b7-7dae-48e...|
|2024-11-26 03:43:14|     WARN| user_4|  172.52.177.15|   /api/login|       NULL|         0.94|cd76fb1a-b0c0-4c2...|
|2023-06-18 23:44:57|     INFO| user_5|   192.25.49.86|  /api/logout|       NULL|        0.225|058ee99a-6a35-4b2...|
|2024-09-18 17:54:00|     INFO| user_6|172.126.189.228|  /api/update|      309.0|         NULL|b427fa91-5bf8-4fa...|
|2023-12-08 11:24:56|    ERROR| user_9|   10.93.214.59|  /api/logout|      495.0|        1.314|1683c6a0-cacb-456...|
|2024-02-11 05:59:14|     WARN|user_10|   10.81.117.60|  /api/lo

**With rows containing nulls in timestamp, user_id, ip_address, or request_id dropped, nulls in the remaining columns are replaced: categorical columns (log_level, endpoint) with their most frequent value, and numeric columns (status_code, response_time) with their mean.**

In [153]:
# Only the functions used below; note that importing round shadows Python's built-in round()
from pyspark.sql.functions import avg, count, mean, round, when

In [154]:
# Replacing NULL log_level values with the most frequent log_level (ERROR).
# Nulls are excluded first so that NULL itself can never be chosen as the mode.

most_used_log_level = (
    df_serverlog.where(col("log_level").isNotNull())
    .groupBy("log_level")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
    .first()[0]
)

In [155]:
print(most_used_log_level)

ERROR


In [165]:
df_serverlog = df_serverlog.fillna({'log_level' : most_used_log_level})

In [170]:
# user_30's log_level was NULL and has been filled with ERROR

df_serverlog.where(col("user_id") == "user_30").show()

+-------------------+---------+-------+------------+-----------+-----------+-------------+--------------------+
|          timestamp|log_level|user_id|  ip_address|   endpoint|status_code|response_time|          request_id|
+-------------------+---------+-------+------------+-----------+-----------+-------------+--------------------+
|2025-04-21 20:49:05|    ERROR|user_30|172.69.76.78|/api/search|      296.0|         1.81|efea30a9-0cc9-4c6...|
+-------------------+---------+-------+------------+-----------+-----------+-------------+--------------------+



In [167]:
# Replacing NULL endpoints with the most frequent endpoint (nulls excluded from the count)

most_used_endpoint = (
    df_serverlog.where(col("endpoint").isNotNull())
    .groupBy("endpoint")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
    .first()[0]
)

In [168]:
print(most_used_endpoint)

/api/delete


In [169]:
df_serverlog = df_serverlog.fillna({'endpoint' : most_used_endpoint})

In [174]:
# user_22 and user_27 now have the endpoint /api/delete in place of NULL; showing user_27 as an example

df_serverlog.where(col("user_id") == "user_27").show()

+-------------------+---------+-------+--------------+-----------+-----------+-------------+--------------------+
|          timestamp|log_level|user_id|    ip_address|   endpoint|status_code|response_time|          request_id|
+-------------------+---------+-------+--------------+-----------+-----------+-------------+--------------------+
|2023-07-05 17:52:37|     WARN|user_27|192.137.86.221|/api/delete|       75.0|        4.047|6cddcac5-0c59-44b...|
+-------------------+---------+-------+--------------+-----------+-----------+-------------+--------------------+



In [172]:
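# Replacing NULL status codes with the rounded average of all status codes.
# Note: an average status code (390.0 here) need not be a valid HTTP status code.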

avg_status_code = df_serverlog.select(round(avg("status_code"))).collect()[0][0]

In [173]:
df_serverlog = df_serverlog.fillna({'status_code' : avg_status_code})

In [175]:
# user_4, user_5, and user_16 now have status_code 390.0 in place of NULL; showing user_4 as an example

df_serverlog.where(col("user_id") == "user_4").show()

+-------------------+---------+-------+-------------+----------+-----------+-------------+--------------------+
|          timestamp|log_level|user_id|   ip_address|  endpoint|status_code|response_time|          request_id|
+-------------------+---------+-------+-------------+----------+-----------+-------------+--------------------+
|2024-11-26 03:43:14|     WARN| user_4|172.52.177.15|/api/login|      390.0|         0.94|cd76fb1a-b0c0-4c2...|
+-------------------+---------+-------+-------------+----------+-----------+-------------+--------------------+



In [176]:
#Replacing NULL response_time with mean response time#

mean_response_time = df_serverlog.select(round(mean("response_time"),3)).collect()[0][0]

In [177]:
df_serverlog = df_serverlog.fillna({'response_time' : mean_response_time})

In [178]:
# user_6's NULL response_time has been filled with the mean, 2.523

df_serverlog.where(col("user_id") == "user_6").show()

+-------------------+---------+-------+---------------+-----------+-----------+-------------+--------------------+
|          timestamp|log_level|user_id|     ip_address|   endpoint|status_code|response_time|          request_id|
+-------------------+---------+-------+---------------+-----------+-----------+-------------+--------------------+
|2024-09-18 17:54:00|     INFO| user_6|172.126.189.228|/api/update|      309.0|        2.523|b427fa91-5bf8-4fa...|
+-------------------+---------+-------+---------------+-----------+-----------+-------------+--------------------+



In [179]:
df_serverlog.count()

72864

**Cleaning the dataset. Status codes are assumed to be integers below 1000, and any code at or below 15% of that range (status_code <= 150) is assumed to be invalid, so those rows are dropped.**

In [180]:
# Keep rows with status_code above 150 (equivalent to status_code / 1000 > 0.15)
df_serverlog = df_serverlog.where(col("status_code") > 150)

In [181]:
# Row count after the status-code filter: it drops by 10,434 (from 72,864 to 62,430)

df_serverlog.count()

62430

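**Adding feature columns: hour_of_the_day, day_of_the_week, and is_error (true when log_level is ERROR). Two further flags are added under stated assumptions: good_status means status_code >= 400, and good_response means response_time <= 2 seconds. Note that by HTTP convention, codes >= 400 signal client or server errors, which is how the task hint defines is_error.**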

In [182]:
from pyspark.sql.functions import hour

df_serverlog = df_serverlog.withColumn("timestamp", df_serverlog["timestamp"].cast("timestamp"))
df_serverlog = df_serverlog.withColumn("hour_of_the_day", hour(df_serverlog["timestamp"]))

In [183]:
#As you can see, I have added the column named hour of the day in 24 hour format#

df_serverlog.show()

+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+---------------+
|          timestamp|log_level|user_id|     ip_address|     endpoint|status_code|response_time|          request_id|hour_of_the_day|
+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+---------------+
|2023-02-22 19:28:34|    ERROR| user_1|   192.113.2.76|/api/register|      463.0|        1.411|093c96b7-7dae-48e...|             19|
|2024-11-26 03:43:14|     WARN| user_4|  172.52.177.15|   /api/login|      390.0|         0.94|cd76fb1a-b0c0-4c2...|              3|
|2023-06-18 23:44:57|     INFO| user_5|   192.25.49.86|  /api/logout|      390.0|        0.225|058ee99a-6a35-4b2...|             23|
|2024-09-18 17:54:00|     INFO| user_6|172.126.189.228|  /api/update|      309.0|        2.523|b427fa91-5bf8-4fa...|             17|
|2023-12-08 11:24:56|    ERROR| user_9|   10.93.214.59|  /api/logout|

In [184]:
from pyspark.sql.functions import date_format

df_serverlog = df_serverlog.withColumn("day_of_the_week", date_format(df_serverlog["timestamp"], "EEEE"))

In [185]:
# Added a day_of_the_week column to tell whether a request was made on a weekday or at the weekend

df_serverlog.show()

+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+---------------+---------------+
|          timestamp|log_level|user_id|     ip_address|     endpoint|status_code|response_time|          request_id|hour_of_the_day|day_of_the_week|
+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+---------------+---------------+
|2023-02-22 19:28:34|    ERROR| user_1|   192.113.2.76|/api/register|      463.0|        1.411|093c96b7-7dae-48e...|             19|      Wednesday|
|2024-11-26 03:43:14|     WARN| user_4|  172.52.177.15|   /api/login|      390.0|         0.94|cd76fb1a-b0c0-4c2...|              3|        Tuesday|
|2023-06-18 23:44:57|     INFO| user_5|   192.25.49.86|  /api/logout|      390.0|        0.225|058ee99a-6a35-4b2...|             23|         Sunday|
|2024-09-18 17:54:00|     INFO| user_6|172.126.189.228|  /api/update|      309.0|        2.523|b427fa91-5b

In [186]:
from pyspark.sql.functions import when

df_serverlog = df_serverlog.withColumn("is_error",when(df_serverlog['log_level'] == "ERROR", True).otherwise(False))

In [187]:
#Added the column named is_error to check whether log_level is ERROR or NOT#

df_serverlog.show()

+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+---------------+---------------+--------+
|          timestamp|log_level|user_id|     ip_address|     endpoint|status_code|response_time|          request_id|hour_of_the_day|day_of_the_week|is_error|
+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+---------------+---------------+--------+
|2023-02-22 19:28:34|    ERROR| user_1|   192.113.2.76|/api/register|      463.0|        1.411|093c96b7-7dae-48e...|             19|      Wednesday|    true|
|2024-11-26 03:43:14|     WARN| user_4|  172.52.177.15|   /api/login|      390.0|         0.94|cd76fb1a-b0c0-4c2...|              3|        Tuesday|   false|
|2023-06-18 23:44:57|     INFO| user_5|   192.25.49.86|  /api/logout|      390.0|        0.225|058ee99a-6a35-4b2...|             23|         Sunday|   false|
|2024-09-18 17:54:00|     INFO| user_6|172.126.189.2

In [188]:
df_serverlog = df_serverlog.withColumn("good_status",when(df_serverlog['status_code'] >=400, True).otherwise(False))

In [189]:
#Added the column named good_status. Assuming good_status has status_code >= 400#

df_serverlog.show()

+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+---------------+---------------+--------+-----------+
|          timestamp|log_level|user_id|     ip_address|     endpoint|status_code|response_time|          request_id|hour_of_the_day|day_of_the_week|is_error|good_status|
+-------------------+---------+-------+---------------+-------------+-----------+-------------+--------------------+---------------+---------------+--------+-----------+
|2023-02-22 19:28:34|    ERROR| user_1|   192.113.2.76|/api/register|      463.0|        1.411|093c96b7-7dae-48e...|             19|      Wednesday|    true|       true|
|2024-11-26 03:43:14|     WARN| user_4|  172.52.177.15|   /api/login|      390.0|         0.94|cd76fb1a-b0c0-4c2...|              3|        Tuesday|   false|      false|
|2023-06-18 23:44:57|     INFO| user_5|   192.25.49.86|  /api/logout|      390.0|        0.225|058ee99a-6a35-4b2...|             23|         Sunday|  

In [190]:
df_serverlog = df_serverlog.withColumn("good_response",when(df_serverlog['response_time'] <= 2.000, True).otherwise(False))

In [191]:
# Added a good_response column, assuming a good response has response_time <= 2.000; showing only the relevant columns

df_serverlog.select("user_id","response_time","good_response").show()

+-------+-------------+-------------+
|user_id|response_time|good_response|
+-------+-------------+-------------+
| user_1|        1.411|         true|
| user_4|         0.94|         true|
| user_5|        0.225|         true|
| user_6|        2.523|        false|
| user_9|        1.314|         true|
|user_10|        2.523|        false|
|user_12|        1.201|         true|
|user_13|        1.083|         true|
|user_14|        0.215|         true|
|user_16|        3.091|        false|
|user_18|        4.229|        false|
|user_22|        2.609|        false|
|user_24|        1.759|         true|
|user_25|        2.985|        false|
|user_26|        3.635|        false|
|user_29|         4.34|        false|
|user_30|         1.81|         true|
|user_31|        4.296|        false|
|user_33|        0.635|         true|
|user_34|        2.523|        false|
+-------+-------------+-------------+
only showing top 20 rows



In [192]:
# Considering the slowest API to be the endpoint with the highest average response_time

mean_endpoint_response_time = df_serverlog.groupBy("endpoint").agg(
    avg("response_time").alias("mean_response_time")
).orderBy(col("mean_response_time").desc())  # sort by the aggregated mean, not by endpoint name

print(mean_endpoint_response_time.first()[0])


In [193]:
# A suspicious user is assumed to be one with a request whose status_code is greater than 500
# and response_time is less than 1 second (null endpoints were already filled above)

suspicious_user_id_df = df_serverlog.filter(col("status_code") > 500)

In [194]:
suspicious_user_id_df = suspicious_user_id_df.filter(col("response_time") < 1)

In [195]:
suspicious_user_id_df.select("user_id").show()

+--------+
| user_id|
+--------+
| user_33|
| user_54|
| user_64|
|user_113|
|user_139|
|user_158|
|user_183|
|user_194|
|user_221|
|user_257|
|user_269|
|user_296|
|user_305|
|user_324|
|user_343|
|user_365|
|user_377|
|user_409|
|user_503|
|user_573|
+--------+
only showing top 20 rows



In [196]:
(df_serverlog.write
    .option("header", True)
    .mode("overwrite")
    .csv("output/cleaned_server_log_data"))

In [197]:
(suspicious_user_id_df.write
    .option("header", True)
    .mode("overwrite")
    .csv("output/cleaned_suspicious_user_data"))

In [198]:
server_log_summary_df = df_serverlog.summary()

In [199]:
server_log_summary_df.show()

+-------+---------+----------+-------------+-----------+------------------+-----------------+--------------------+-----------------+---------------+
|summary|log_level|   user_id|   ip_address|   endpoint|       status_code|    response_time|          request_id|  hour_of_the_day|day_of_the_week|
+-------+---------+----------+-------------+-----------+------------------+-----------------+--------------------+-----------------+---------------+
|  count|    62430|     62430|        62430|      62430|             62430|            62430|               62430|            62430|          62430|
|   mean|     NULL|      NULL|         NULL|       NULL| 439.4622777510812|2.524618789043812|                NULL| 11.4853275668749|           NULL|
| stddev|     NULL|      NULL|         NULL|       NULL|161.56784812341596|1.355815827281236|                NULL|6.912774143850606|           NULL|
|    min|    DEBUG|    user_1|    10.0.1.14|/api/delete|             151.0|             0.05|00008032-a958

In [200]:
(server_log_summary_df.write
    .option("header", True)
    .mode("overwrite")
    .csv("output/summary_server_log_data"))

In [201]:
df_usermetadata = spark.read.option('header','true').csv("user_metadata.csv",inferSchema=True)

In [202]:
df_usermetadata.show()

+-------+------------+-------------+-----------+---------+
|user_id|account_type|       region|signup_date|is_active|
+-------+------------+-------------+-----------+---------+
| user_1|        Free|North America| 2016-09-10|     true|
| user_2|     Premium|      Oceania| 2018-05-03|    false|
| user_3|     Premium|       Europe| 2024-06-30|    false|
| user_4|  Enterprise|      Oceania| 2018-12-08|     true|
| user_5|     Premium|North America| 2024-10-26|     true|
| user_6|     Premium|         NULL| 2024-11-29|    false|
| user_7|     Premium|South America| 2015-03-17|    false|
| user_8|        Free|South America| 2016-04-14|     true|
| user_9|  Enterprise|South America| 2019-05-04|     true|
|user_10|  Enterprise|         Asia| 2019-05-06|     true|
|user_11|        NULL|North America| 2021-03-29|    false|
|user_12|  Enterprise|         NULL| 2024-01-28|     true|
|user_13|  Enterprise|         NULL| 2024-07-21|     true|
|user_14|        NULL|South America| 2024-01-24|    fals

**Inner-joining the two datasets on user_id, broadcasting user_metadata, for the Level 2 tasks.**

In [203]:
from pyspark.sql.functions import broadcast

In [204]:
df_joined = df_serverlog.join(broadcast(df_usermetadata), on='user_id', how='inner')

In [205]:
df_joined.count()

3148

In [206]:
df_joined.show()

+-------+-------------------+---------+---------------+-------------+-----------+-------------+--------------------+---------------+---------------+--------+-----------+-------------+------------+-------------+-----------+---------+
|user_id|          timestamp|log_level|     ip_address|     endpoint|status_code|response_time|          request_id|hour_of_the_day|day_of_the_week|is_error|good_status|good_response|account_type|       region|signup_date|is_active|
+-------+-------------------+---------+---------------+-------------+-----------+-------------+--------------------+---------------+---------------+--------+-----------+-------------+------------+-------------+-----------+---------+
| user_1|2023-02-22 19:28:34|    ERROR|   192.113.2.76|/api/register|      463.0|        1.411|093c96b7-7dae-48e...|             19|      Wednesday|    true|       true|         true|        Free|North America| 2016-09-10|     true|
| user_4|2024-11-26 03:43:14|     WARN|  172.52.177.15|   /api/login

**Getting the total number of active users per account type.
Inactive users and users with unknown status are not counted; counting them only requires changing the condition inside when().**

In [207]:
account_type_active_user = df_joined.groupBy("account_type").agg(
    count(when(col("is_active"), True)).alias("active_users")  # is_active is a boolean column
)

In [208]:
account_type_active_user.show()

+------------+------------+
|account_type|active_users|
+------------+------------+
|     Premium|         435|
|        NULL|         147|
|  Enterprise|         421|
|        Free|         432|
+------------+------------+



**Getting the total number of active users per region.
Inactive users and users with unknown status are not counted; counting them only requires changing the condition inside when().**

In [209]:
region_active_user = df_joined.groupBy("region").agg(
    count(when(col("is_active"), True)).alias("active_users")  # is_active is a boolean column
)

In [210]:
region_active_user.show()

+-------------+------------+
|       region|active_users|
+-------------+------------+
|       Europe|         221|
|       Africa|         207|
|         NULL|         136|
|North America|         209|
|South America|         212|
|      Oceania|         232|
|         Asia|         218|
+-------------+------------+



**Getting the total number of active users per signup year.
Inactive users and users with unknown status are not counted; counting them only requires changing the condition inside when().**

In [211]:
from pyspark.sql.functions import year

df_joined = df_joined.withColumn("signup_year",year(df_joined["signup_date"]))

In [212]:
df_joined.select("user_id","signup_date","signup_year").show()

+-------+-----------+-----------+
|user_id|signup_date|signup_year|
+-------+-----------+-----------+
| user_1| 2016-09-10|       2016|
| user_4| 2018-12-08|       2018|
| user_5| 2024-10-26|       2024|
| user_6| 2024-11-29|       2024|
| user_9| 2019-05-04|       2019|
|user_10| 2019-05-06|       2019|
|user_12| 2024-01-28|       2024|
|user_13| 2024-07-21|       2024|
|user_14| 2024-01-24|       2024|
|user_16| 2016-12-31|       2016|
|user_18|       NULL|       NULL|
|user_22| 2016-03-08|       2016|
|user_24| 2017-03-27|       2017|
|user_25| 2022-03-04|       2022|
|user_26|       NULL|       NULL|
|user_29| 2022-05-15|       2022|
|user_30| 2017-01-12|       2017|
|user_31| 2017-03-15|       2017|
|user_33| 2017-10-16|       2017|
|user_34| 2017-04-10|       2017|
+-------+-----------+-----------+
only showing top 20 rows



In [213]:
signup_year_active_users = df_joined.groupBy("signup_year").agg(
    count(when(col("is_active"), True)).alias("signup_year_active_user")  # is_active is boolean
).orderBy(col("signup_year").asc())

In [214]:
signup_year_active_users.show()

+-----------+-----------------------+
|signup_year|signup_year_active_user|
+-----------+-----------------------+
|       NULL|                    156|
|       2015|                    131|
|       2016|                    118|
|       2017|                    122|
|       2018|                    134|
|       2019|                    122|
|       2020|                    133|
|       2021|                    133|
|       2022|                    129|
|       2023|                    129|
|       2024|                    128|
+-----------+-----------------------+



In [215]:
(account_type_active_user.write
    .option("header", True)
    .mode("overwrite")
    .csv("output/account_type_active_user"))

In [216]:
(region_active_user.write
    .option("header", True)
    .mode("overwrite")
    .csv("output/region_active_user"))

In [217]:
(signup_year_active_users.write
    .option("header", True)
    .mode("overwrite")
    .csv("output/signup_year_active_user"))