In [43]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime, to_date, col, regexp_extract, min, desc, when

spark = SparkSession.builder.appName("IndieZ-Part2-B").getOrCreate()

In [44]:
df = spark.read.parquet('data/processing.parquet')
print(f'The number of row in DataFrame: {df.count()} \n')
df.show(5)

The number of row in DataFrame: 243144 

+------------+--------------------+---------------+---------+-------+
|{created_at}|     {campaign_name}|{activity_kind}|{country}|{store}|
+------------+--------------------+---------------+---------+-------+
|  1719842408|ss_min_NGi_Global...|     ad_revenue|       ru| itunes|
|  1719842412|ss_min_NGa_Global...|     ad_revenue|       ru| google|
|  1719842421|ss_min_NGa_Global...|     impression|       us| google|
|  1719842420|ss_min_NGa_Global...|     ad_revenue|       ru| google|
|  1719842422|ss_min_NGi_Global...|     ad_revenue|       ru| itunes|
+------------+--------------------+---------------+---------+-------+
only showing top 5 rows



Drop Duplicates

In [45]:
df = df.dropDuplicates()
print(f'The number of row after dropping duplicates in DataFrame: {df.count()}')

The number of row after dropping duplicates in DataFrame: 208761


Rename Column Name (remove '{' & '}')

In [46]:
for column in df.schema.names:
   new_column = column.replace('{', '').replace('}','')
   df = df.withColumnRenamed(column, new_column)
df.show(5)

+----------+--------------------+-------------+-------+------+
|created_at|       campaign_name|activity_kind|country| store|
+----------+--------------------+-------------+-------+------+
|1719320368|ss_min_NGa_Global...|      session|     ru|google|
|1719843448|ss_min_NGa_Global...|   impression|     ru|google|
|1719844285|ss_min_NGa_Global...|   impression|     ua|google|
|1719844786|unity_NGa_ROAS-Ad...|   ad_revenue|     ru|google|
|1719845511|ss_min_NGa_Global...|   impression|     ua|google|
+----------+--------------------+-------------+-------+------+
only showing top 5 rows



Parsed created_at to date and campaign_name to project_id

In [47]:
pattern = r"([A-Z]{2}[a-z]{1}+)"

df_processed = df.withColumn('created_at', to_date(from_unixtime(col('created_at')))) \
                 .withColumn('project_id', \
                              when(col('campaign_name').rlike(pattern), regexp_extract('campaign_name', pattern, 1)) \
                                 .otherwise(col('campaign_name')))
df_processed.show(5)

+----------+--------------------+-------------+-------+------+----------+
|created_at|       campaign_name|activity_kind|country| store|project_id|
+----------+--------------------+-------------+-------+------+----------+
|2024-06-25|ss_min_NGa_Global...|      session|     ru|google|       NGa|
|2024-07-01|ss_min_NGa_Global...|   impression|     ru|google|       NGa|
|2024-07-01|ss_min_NGa_Global...|   impression|     ua|google|       NGa|
|2024-07-01|unity_NGa_ROAS-Ad...|   ad_revenue|     ru|google|       NGa|
|2024-07-01|ss_min_NGa_Global...|   impression|     ua|google|       NGa|
+----------+--------------------+-------------+-------+------+----------+
only showing top 5 rows



List of unique projects

In [48]:
project_list = df_processed.select('project_id').distinct().toPandas()['project_id'].tolist()
print(f'List of unique projects: {project_list}')

List of unique projects: ['AZa', 'KNi', 'Malformed Advertising ID', 'IPs', 'TRi', 'Invalid Signature', 'NGi', 'NGa', 'японские кроссворды']


a. Return a list of project ids and their earliest recorded event time

In [49]:
df_processed.groupBy('project_id').agg(min('created_at').alias('created_at')).orderBy('created_at').show()

df_a = df_processed.groupBy('project_id').agg(min('created_at').alias('created_at')).orderBy('created_at')
list_project_ids_event_time = df_a.toPandas().to_dict('records')
print(list_project_ids_event_time)

+--------------------+----------+
|          project_id|created_at|
+--------------------+----------+
|                 TRi|2024-02-12|
|                 KNi|2024-04-09|
|                 NGi|2024-05-04|
|                 NGa|2024-05-06|
|                 AZa|2024-05-14|
|                 IPs|2024-06-03|
|Malformed Adverti...|2024-07-01|
|   Invalid Signature|2024-07-01|
| японские кроссворды|2024-07-01|
+--------------------+----------+

[{'project_id': 'TRi', 'created_at': datetime.date(2024, 2, 12)}, {'project_id': 'KNi', 'created_at': datetime.date(2024, 4, 9)}, {'project_id': 'NGi', 'created_at': datetime.date(2024, 5, 4)}, {'project_id': 'NGa', 'created_at': datetime.date(2024, 5, 6)}, {'project_id': 'AZa', 'created_at': datetime.date(2024, 5, 14)}, {'project_id': 'IPs', 'created_at': datetime.date(2024, 6, 3)}, {'project_id': 'Malformed Advertising ID', 'created_at': datetime.date(2024, 7, 1)}, {'project_id': 'Invalid Signature', 'created_at': datetime.date(2024, 7, 1)}, {'proje

b. Return a list of all project ids transactions sorted by number of events

In [50]:
df_processed.groupBy('project_id').count().withColumnRenamed('count','transaction_num').orderBy(desc('transaction_num')).show()

df_b = df_processed.groupBy('project_id').count().withColumnRenamed('count','transaction_num').orderBy(desc('transaction_num'))
list_project_ids_transactions_num = df_b.toPandas().to_dict('records')
print(list_project_ids_transactions_num)

+--------------------+---------------+
|          project_id|transaction_num|
+--------------------+---------------+
|                 TRi|          99560|
|                 NGa|          63403|
|                 KNi|          31097|
|                 NGi|          13917|
|                 IPs|            263|
|   Invalid Signature|            256|
|                 AZa|            201|
| японские кроссворды|             56|
|Malformed Adverti...|              8|
+--------------------+---------------+

[{'project_id': 'TRi', 'transaction_num': 99560}, {'project_id': 'NGa', 'transaction_num': 63403}, {'project_id': 'KNi', 'transaction_num': 31097}, {'project_id': 'NGi', 'transaction_num': 13917}, {'project_id': 'IPs', 'transaction_num': 263}, {'project_id': 'Invalid Signature', 'transaction_num': 256}, {'project_id': 'AZa', 'transaction_num': 201}, {'project_id': 'японские кроссворды', 'transaction_num': 56}, {'project_id': 'Malformed Advertising ID', 'transaction_num': 8}]


---

In [51]:
spark.stop()