# Problem description

## Challenge: Spark-Based Social Media Post Analysis

### Goal
   Create a PySpark script that takes a CSV file as input and processes and analyzes a dataset of social media posts. The dataset includes various metrics and information about each post.
### Dataset
File: *social_media_data.csv*. Contains columns for:

* Post ID (post_id)
* Like Count (like_count)
* Comment Count (comment_count)
* Post Description (post_description)
* Post Date (post_date)
* Username of the Influencer (username)
* Follower Count of the Influencer (follower_count)

### Tasks
   * Data Preparation (has to be solved within 30 minutes to continue further):
	   * Load the dataset from social_media_data.csv.
	   * Clean the data by addressing missing or improperly formatted values according to the following rules:
			* Treat all numerical strings as numbers, except for post_id.
			* Default undefined numbers to zero.
			* In mixed string-number fields, extract and use only the numeric part. For example, "123ab" should be considered as "123" (as number)

   * Data Enrichment:
	   * Add a new column with the post date in ISO format (yyyy-MM-dd).
	   * Compute and add an engagement rate for each post using the formula: (Likes + Comments) / Followers.

   * Data Summarization:
	   * Aggregate the data to calculate the total engagement rate for each influencer.
	   * Create a summary dataframe featuring total likes, total comments, and total engagement rate for each influencer.

   * Content Analysis:
	   * Extract and store hashtags (e.g., #hashtag) from the post descriptions into a new column. Could be multiple or none.
	   * Extract and store user mentions (e.g., @username) from the post descriptions into another new column. Could be multiple or none.

   * Data Filtering and Ordering:
	   * Filter to include only the posts from the last week of November.
	   * Sort this subset by engagement rate in descending order.


### Deliverables
A Colab Notebook that uses PySpark to perform the specified tasks.

### Evaluation Criteria
* Accuracy and effectiveness in data manipulation and analysis.
* Clarity and maintainability of the code.
* Performance and scalability of the solution.
* Depth and relevance of insights during the challenge.

### Time Frame
Complete the challenge within 1 hour.

# PySpark installation

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.0 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=1c5813d468adac3486e3a2ba0ea1aed6eb4ab622788ace8ab4b5540a2e4d1445
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


# Download data

In [None]:
!wget -O social_media_data.csv https://drive.google.com/uc?id=1qweracaidNpm5k3j3myxHo0ODp757fKm

--2024-02-29 09:29:36--  https://drive.google.com/uc?id=1qweracaidNpm5k3j3myxHo0ODp757fKm
Resolving drive.google.com (drive.google.com)... 74.125.141.138, 74.125.141.113, 74.125.141.101, ...
Connecting to drive.google.com (drive.google.com)|74.125.141.138|:443... connected.
HTTP request sent, awaiting response... 303 See Other
Location: https://drive.usercontent.google.com/download?id=1qweracaidNpm5k3j3myxHo0ODp757fKm [following]
--2024-02-29 09:29:36--  https://drive.usercontent.google.com/download?id=1qweracaidNpm5k3j3myxHo0ODp757fKm
Resolving drive.usercontent.google.com (drive.usercontent.google.com)... 74.125.196.132, 2607:f8b0:400c:c36::84
Connecting to drive.usercontent.google.com (drive.usercontent.google.com)|74.125.196.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 28040 (27K) [application/octet-stream]
Saving to: ‘social_media_data.csv’


2024-02-29 09:29:36 (42.2 MB/s) - ‘social_media_data.csv’ saved [28040/28040]



# Code

## Data Preparation

### Load the dataset from social_media_data.csv.

In [None]:
from pyspark.sql import SparkSession

# Start SparkSession
spark = SparkSession.builder.master("local[*]").appName("SocialMediaDataAnalysis").getOrCreate()


In [None]:
df = spark.read.format("csv").option("header","true").option("delimiter", "|").load("social_media_data.csv")

In [None]:
df.show()

+-------------------+----------+-------------+--------------------+----------------+--------------+----------+
|            post_id|like_count|comment_count|    post_description|        username|follower_count| post_date|
+-------------------+----------+-------------+--------------------+----------------+--------------+----------+
|3247472340898021032|      1900|           14|I see, I buy it.?...|grebenisancarmen|        691934|1701348910|
|3247012161022596826|      1666|           14|We call it “bârfe...|grebenisancarmen|        691934|1701294183|
|3246621589263013232|      1414|           27|How to style a le...|grebenisancarmen|        691934|1701247491|
|3245884952484566080|      1931|           29|Teddy sets are my...|grebenisancarmen|        691934|1701159677|
|3245155721815515538|       818|           12|Styling ZARA MEN ...|grebenisancarmen|        691934|1701072747|
|3243258903187852328|      1732|           24|Am descoperit ace...|grebenisancarmen|        691934|1700846703|
|

### Clean the data by addressing missing or improperly formatted values according to the following rules:
- Treat all numerical strings as numbers, except for post_id.
- Default undefined numbers to zero.
- In mixed string-number fields, extract and use only the numeric part. For example, "123ab" should be considered as "123" (as number)

In [None]:
import pyspark.sql.functions as f

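def clean_numeric_string(column):
    # Extracts the first run of digits (e.g. "123ab" -> 123). With no digits the extract is "", the cast gives
    # NULL, and the fillna(0) below turns it into 0. "long" avoids int overflow on large counts or timestamps.
    return f.regexp_extract(column, r'(\d+)', 0).cast('long')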


numeric_fields = ["like_count","comment_count","follower_count","post_date"]
for column in numeric_fields:
  df = df.withColumn(column, clean_numeric_string(column))

df_cleaned = df.fillna(0, subset=numeric_fields)


In [None]:
df_cleaned.show(50)

+-------------------+----------+-------------+--------------------+----------------+--------------+----------+
|            post_id|like_count|comment_count|    post_description|        username|follower_count| post_date|
+-------------------+----------+-------------+--------------------+----------------+--------------+----------+
|3247472340898021032|      1900|           14|I see, I buy it.?...|grebenisancarmen|        691934|1701348910|
|3247012161022596826|      1666|           14|We call it “bârfe...|grebenisancarmen|        691934|1701294183|
|3246621589263013232|      1414|           27|How to style a le...|grebenisancarmen|        691934|1701247491|
|3245884952484566080|      1931|           29|Teddy sets are my...|grebenisancarmen|        691934|1701159677|
|3245155721815515538|       818|           12|Styling ZARA MEN ...|grebenisancarmen|        691934|1701072747|
|3243258903187852328|      1732|           24|Am descoperit ace...|grebenisancarmen|        691934|1700846703|
|
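The cleaning rules can be checked outside Spark with a plain-Python sketch of what `clean_numeric_string` followed by `fillna(0)` does to a single value. `clean_numeric_value` is a hypothetical helper for illustration only: it keeps the first run of digits anywhere in the string and defaults to 0.

```python
import re

# Hypothetical helper mirroring the Spark cleaning rules for one value:
# first run of digits anywhere in the string, otherwise 0.
_DIGITS = re.compile(r"\d+")


def clean_numeric_value(value):
    if value is None:
        return 0  # undefined numbers default to zero
    match = _DIGITS.search(str(value))
    return int(match.group(0)) if match else 0


for raw in ["1900", "123ab", "abc", None, "1,234"]:
    print(repr(raw), "->", clean_numeric_value(raw))
```

The loop prints `'1900' -> 1900`, `'123ab' -> 123`, `'abc' -> 0`, `None -> 0` and `'1,234' -> 1`. The last case shows a limitation of the first-run-of-digits rule: a value with a thousands separator is truncated. Whether this dataset contains such values has not been checked here.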

### Data Enrichment:

Add a new column with the post date in ISO format (yyyy-MM-dd).
Compute and add an engagement rate for each post using the formula: (Likes + Comments) / Followers.


In [None]:
df_iso = df_cleaned.withColumn("post_date_iso", f.from_unixtime(f.col("post_date"), format="yyyy-MM-dd"))

+-------------------+----------+-------------+--------------------+----------------+--------------+----------+-------------+
|            post_id|like_count|comment_count|    post_description|        username|follower_count| post_date|post_date_iso|
+-------------------+----------+-------------+--------------------+----------------+--------------+----------+-------------+
|3247472340898021032|      1900|           14|I see, I buy it.?...|grebenisancarmen|        691934|1701348910|   2023-11-30|
|3247012161022596826|      1666|           14|We call it “bârfe...|grebenisancarmen|        691934|1701294183|   2023-11-29|
|3246621589263013232|      1414|           27|How to style a le...|grebenisancarmen|        691934|1701247491|   2023-11-29|
|3245884952484566080|      1931|           29|Teddy sets are my...|grebenisancarmen|        691934|1701159677|   2023-11-28|
|3245155721815515538|       818|           12|Styling ZARA MEN ...|grebenisancarmen|        691934|1701072747|   2023-11-27|


In [None]:
df_iso.show()
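The task also asks for a per-post engagement rate, which this notebook only computes later at influencer level. In Spark, a per-post column could be added with `df_iso.withColumn("engagement_rate", (f.col("like_count") + f.col("comment_count")) / f.col("follower_count"))`. As a Spark-free check of that arithmetic, here is a plain-Python sketch. `post_engagement_rate` is a hypothetical helper, and returning `None` for zero followers is an assumption meant to mirror Spark's default (non-ANSI) behaviour of yielding NULL on division by zero.

```python
def post_engagement_rate(like_count, comment_count, follower_count):
    """(likes + comments) / followers for one post; None when followers is 0 (like Spark's NULL)."""
    if not follower_count:
        return None
    return (like_count + comment_count) / follower_count


# First row shown above: post 3247472340898021032 (1900 likes, 14 comments, 691934 followers)
rate = post_engagement_rate(1900, 14, 691934)
print(f"{rate:.6f} ({rate * 100:.2f}%)")
```

This prints `0.002766 (0.28%)`, i.e. the raw task formula gives a fraction, while the summary below reports a percentage.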

### Data Summarization:

Aggregate the data to calculate the total engagement rate for each influencer.
Create a summary dataframe featuring total likes, total comments, and total engagement rate for each influencer.




#### Reviewing duplicates:

In [None]:
print(df_iso.count())
print(df_iso.dropDuplicates(['post_id']).count())

120
120


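There are no duplicates now, but future extracts could contain some. To guard against that, keep only the row with the latest `post_date` for each `post_id`. A window with `row_number()` is used because `orderBy()` followed by `dropDuplicates()` does not guarantee that the first row in sort order is the one kept.

In [None]:
from pyspark.sql import Window
df_unique = df_iso.withColumn("rn", f.row_number().over(Window.partitionBy("post_id").orderBy(f.col("post_date").desc()))).filter(f.col("rn") == 1).drop("rn")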

#### Calculating engagement

In [None]:
def calculate_engagement(sum_like_count, sum_comment_count, follower_count):
  # Engagement rate as a percentage: ((likes + comments) / followers) * 100, rounded to 2 decimals
  return f.round(((sum_like_count + sum_comment_count) / follower_count) * 100, 2).alias("engagement_rate_%")

df_summary = (
    df_unique
    .groupBy("username", "follower_count")
    .agg(
        f.sum("like_count").alias("total_likes"),
        f.sum("comment_count").alias("total_comments"),
        calculate_engagement(f.sum("like_count"), f.sum("comment_count"), f.col("follower_count")),
    )
)

+----------------+--------------+-----------+--------------+-----------------+
|        username|follower_count|total_likes|total_comments|engagement_rate_%|
+----------------+--------------+-----------+--------------+-----------------+
|     kyliejenner|     398959125|   59407681|        184003|            14.94|
|      lidiabuble|       1431173|     140723|          1885|             9.96|
|     selenagomez|     429686292|  148086531|        940585|            34.68|
|grebenisancarmen|        691934|     134393|         14540|            21.52|
|    r.e.m.beauty|       1985029|    1901359|         17130|            96.65|
|    arianagrande|     380785988|   62332695|           949|            16.37|
|    interviewmag|       1266574|     666865|          2653|            52.86|
|        lizgillz|      14019525|    3407189|         11827|            24.39|
+----------------+--------------+-----------+--------------+-----------------+



In [None]:
df_summary.show()
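The `engagement_rate_%` column can be sanity-checked by hand from the totals in the summary table. The sketch below recomputes three rows in plain Python; `engagement_rate_pct` is a hypothetical helper, not part of the notebook.

```python
def engagement_rate_pct(total_likes, total_comments, follower_count):
    """Summary-level engagement as a percentage, rounded to 2 decimals like f.round(..., 2)."""
    if not follower_count:
        return None
    return round((total_likes + total_comments) / follower_count * 100, 2)


# Totals copied from the summary table: (total_likes, total_comments, follower_count)
summary = {
    "grebenisancarmen": (134393, 14540, 691934),
    "kyliejenner": (59407681, 184003, 398959125),
    "r.e.m.beauty": (1901359, 17130, 1985029),
}
for user, totals in summary.items():
    print(user, engagement_rate_pct(*totals))
```

This prints 21.52, 14.94 and 96.65, matching the table. Because every group shares one `follower_count` (it is a grouping key), (sum of likes + sum of comments) / followers equals the sum of the per-post rates, which is why it can be read as the influencer's total engagement rate. Spark's `round()` rounds half up while Python's built-in `round()` rounds exact ties to even; for these three values there are no ties.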


### Content Analysis:

Extract and store hashtags (e.g., #hashtag) from the post descriptions into a new column. Could be multiple or none.
Extract and store user mentions (e.g., @username) from the post descriptions into another new column. Could be multiple or none.


In [None]:
# Extract hashtags and mentions, replacing empty arrays with NULL.
hashtag_pattern = r"(#\w+)"
mention_pattern = r"(@\w+)"
df_content = df_iso\
             .withColumn("hashtags", f.regexp_extract_all(str=f.col("post_description"), regexp=f.lit(hashtag_pattern)))\
             .withColumn("mentions", f.regexp_extract_all(str=f.col("post_description"), regexp=f.lit(mention_pattern)))\
             .withColumn("hashtags", f.when(f.size(f.col("hashtags"))>0, f.col("hashtags")).otherwise(None))\
             .withColumn("mentions", f.when(f.size(f.col("mentions"))>0, f.col("mentions")).otherwise(None))

In [None]:
df_content.show()

+-------------------+----------+-------------+--------------------+----------------+--------------+----------+-------------+--------------------+--------------------+
|            post_id|like_count|comment_count|    post_description|        username|follower_count| post_date|post_date_iso|            hashtags|            mentions|
+-------------------+----------+-------------+--------------------+----------------+--------------+----------+-------------+--------------------+--------------------+
|3247472340898021032|      1900|           14|I see, I buy it.?...|grebenisancarmen|        691934|1701348910|   2023-11-30|                NULL|[@hm, @piecesoffi...|
|3247012161022596826|      1666|           14|We call it “bârfe...|grebenisancarmen|        691934|1701294183|   2023-11-29|                NULL|     [@nordik_cabin]|
|3246621589263013232|      1414|           27|How to style a le...|grebenisancarmen|        691934|1701247491|   2023-11-29|                NULL|[@cosstores, @oys...
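A plain-Python sketch can be used to sanity-check the hashtag and mention patterns outside Spark. It uses Python's `re` module rather than Spark, and `MENTION_WITH_DOTS_PATTERN` is a hypothetical variant not used in the notebook: with `@\w+`, a dotted handle such as `@r.e.m.beauty` (one of the influencers in the dataset) would be captured only as `@r`.

```python
import re

HASHTAG_PATTERN = r"#\w+"
MENTION_PATTERN = r"@\w+"
# Hypothetical variant: allows dots inside a username, but not a trailing dot.
MENTION_WITH_DOTS_PATTERN = r"@\w+(?:\.\w+)*"


def extract_all(pattern, text):
    """All matches in text, or None when there are none (mirrors the NULL handling above)."""
    matches = re.findall(pattern, text or "")
    return matches or None


sample = "New drop with @r.e.m.beauty and @hm. #ootd #fall2023"
print(extract_all(HASHTAG_PATTERN, sample))            # ['#ootd', '#fall2023']
print(extract_all(MENTION_PATTERN, sample))            # ['@r', '@hm']
print(extract_all(MENTION_WITH_DOTS_PATTERN, sample))  # ['@r.e.m.beauty', '@hm']
print(extract_all(HASHTAG_PATTERN, "No tags here"))    # None
```

One caveat when comparing results: Spark evaluates these patterns with Java regular expressions, where `\w` matches only ASCII letters, digits and underscore unless the `(?U)` flag is set, whereas Python 3's `\w` also matches non-ASCII letters such as `â`. A hashtag containing such letters could therefore be cut short in Spark but kept whole by this sketch.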

### Data Filtering and Ordering:

Filter to include only the posts from the last week of November.
Sort this subset by engagement rate in descending order.

In [None]:
from datetime import date, datetime, timedelta
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType


def is_in_november_last_week(post_date_timestamp: int) -> bool:
  # Year-agnostic check: does the Unix timestamp fall in the last calendar week of November of its own year,
  # i.e. between the Monday of the week containing Nov 30 and Nov 30 itself (inclusive)?
  if post_date_timestamp is None:
    return False
  post_date = datetime.fromtimestamp(post_date_timestamp).date()
  last_day_of_november = date(post_date.year, 11, 30)
  first_day_of_last_week = last_day_of_november - timedelta(days=last_day_of_november.weekday())  # weekday(): Monday == 0
  # Compare dates, not datetimes: datetime(year, 11, 30) is midnight, which would exclude posts made later on Nov 30
  return first_day_of_last_week <= post_date <= last_day_of_november

is_in_november_last_week_udf = udf(is_in_november_last_week, BooleanType()) # Maybe it's not the most optimized solution.
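To make the definition used by `is_in_november_last_week` explicit, the plain-Python sketch below prints the window it implies for a few years. `last_week_of_november` is a hypothetical helper that follows the UDF's calendar-week reading (Monday of the week containing Nov 30, up to Nov 30).

```python
from datetime import date, timedelta


def last_week_of_november(year):
    """Calendar-week reading: Monday of the week containing Nov 30, up to Nov 30 (inclusive)."""
    last_day = date(year, 11, 30)
    first_day = last_day - timedelta(days=last_day.weekday())  # weekday(): Monday == 0
    return first_day, last_day


for year in (2020, 2023, 2024, 2025):
    start, end = last_week_of_november(year)
    print(year, start.isoformat(), "to", end.isoformat(), f"({(end - start).days + 1} day(s))")
```

For 2023 this gives 2023-11-27 to 2023-11-30, the window the UDF applies to this dataset. In 2020, when Nov 30 fell on a Monday, the window is a single day. If "last week" is instead read as the last seven days of November, it would always be Nov 24 to Nov 30.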


In [None]:
df_ordered = df_iso\
              .join(df_summary.select("username","engagement_rate_%"), ["username"])\
              .filter(is_in_november_last_week_udf(f.col("post_date"))==True)\
              .orderBy(f.col("engagement_rate_%").desc())


In [None]:
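# Native Spark alternative to the UDF (no Python round-trip). Sketch that hard-codes November 2023 and keeps the
# last seven days of the month (Nov 24 to Nov 30), wider than the UDF's calendar week (Nov 27 to Nov 30 in 2023).
last_day_november = f.to_date(f.lit("2023-11-30"))
df_last_week = df_iso.filter(f.datediff(last_day_november, f.to_date(f.col("post_date_iso"))).between(0, 6))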

In [None]:
df_ordered.show(20)

+----------------+-------------------+----------+-------------+--------------------+--------------+----------+-------------+-----------------+
|        username|            post_id|like_count|comment_count|    post_description|follower_count| post_date|post_date_iso|engagement_rate_%|
+----------------+-------------------+----------+-------------+--------------------+--------------+----------+-------------+-----------------+
|    interviewmag|3245331176155211241|    666865|         2653|@kyliejenner @int...|       1266574|1701093645|   2023-11-27|            52.86|
|grebenisancarmen|3247012161022596826|      1666|           14|We call it “bârfe...|        691934|1701294183|   2023-11-29|            21.52|
|grebenisancarmen|3246621589263013232|      1414|           27|How to style a le...|        691934|1701247491|   2023-11-29|            21.52|
|grebenisancarmen|3245884952484566080|      1931|           29|Teddy sets are my...|        691934|1701159677|   2023-11-28|            21.52|

Sort the influencers in descending order by the number of unique mentions in their post descriptions.
For example, if influencer A has 3 unique mentions and influencer B has 5 unique mentions in their post descriptions, the order should be:
  01. B - 5
  02. A - 3

In [None]:
df_exploded = df_content.withColumn("explode_mentions", f.explode(f.col("mentions")))
df_exploded.show()

+-------------------+----------+-------------+--------------------+----------------+--------------+----------+-------------+--------+--------------------+-----------------+
|            post_id|like_count|comment_count|    post_description|        username|follower_count| post_date|post_date_iso|hashtags|            mentions| explode_mentions|
+-------------------+----------+-------------+--------------------+----------------+--------------+----------+-------------+--------+--------------------+-----------------+
|3247472340898021032|      1900|           14|I see, I buy it.?...|grebenisancarmen|        691934|1701348910|   2023-11-30|    NULL|[@hm, @piecesoffi...|              @hm|
|3247472340898021032|      1900|           14|I see, I buy it.?...|grebenisancarmen|        691934|1701348910|   2023-11-30|    NULL|[@hm, @piecesoffi...|  @piecesofficial|
|3247472340898021032|      1900|           14|I see, I buy it.?...|grebenisancarmen|        691934|1701348910|   2023-11-30|    NULL|[@

In [None]:
df_exploded.dropDuplicates(['username', 'explode_mentions']).groupBy("username").count().orderBy(f.col("count").desc()).show()

+----------------+-----+
|        username|count|
+----------------+-----+
|grebenisancarmen|   25|
|    interviewmag|   12|
|     selenagomez|   12|
|     kyliejenner|    7|
|      lidiabuble|    6|
|    arianagrande|    6|
+----------------+-----+
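The same ranking can be expressed in plain Python, which makes the rule easy to unit-test on the A/B example above. `rank_by_unique_mentions` is a hypothetical helper; it also breaks ties by username, an assumption that is not part of the original task.

```python
from collections import defaultdict


def rank_by_unique_mentions(posts):
    """posts: iterable of (username, mentions) pairs, where mentions may be None.

    Returns [(username, unique_mention_count), ...] sorted by count descending, then username.
    """
    unique = defaultdict(set)
    for username, mentions in posts:
        for mention in mentions or []:
            unique[username].add(mention)
    return sorted(((user, len(m)) for user, m in unique.items()), key=lambda item: (-item[1], item[0]))


sample_posts = [
    ("A", ["@x", "@y"]),
    ("A", ["@y", "@z"]),
    ("B", ["@p", "@q", "@r", "@s", "@t"]),
    ("C", None),
]
for rank, (user, count) in enumerate(rank_by_unique_mentions(sample_posts), start=1):
    print(f"{rank:02d}. {user} - {count}")
```

This prints `01. B - 5` and `02. A - 3`. Influencer C has no mentions and is dropped, just as `f.explode` drops rows whose `mentions` is NULL. In Spark, the count could also be written as `groupBy("username").agg(f.countDistinct("explode_mentions"))`, and adding `f.col("username")` as a secondary sort key would make the order of ties (for example `interviewmag` and `selenagomez`, both 12) deterministic.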

