In [51]:
import pyspark

from pyspark.sql import SparkSession  
from pyspark.sql.functions import col, regexp_extract, sum as pyspark_sum, when  ,desc,row_number

In [None]:
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("PeerIDYearProcessing")
    .getOrCreate()
)

# Column names, in the same order as the fields of each sample tuple.
columns = ["peer_id", "id_1", "id_2", "year"]

# Hand-written sample rows: two peer_id groups with six rows each.
data = [
    ('ABC17969(AB)', '1', 'ABC17969', 2022),
    ('ABC17969(AB)', '2', 'CDC52533', 2022),
    ('ABC17969(AB)', '3', 'DEC59161', 2023),
    ('ABC17969(AB)', '4', 'F43874', 2022),
    ('ABC17969(AB)', '5', 'MY06154', 2021),
    ('ABC17969(AB)', '6', 'MY4387', 2022),
    ('AE686(AE)', '7', 'AE686', 2023),
    ('AE686(AE)', '8', 'BH2740', 2021),
    ('AE686(AE)', '9', 'EG999', 2021),
    ('AE686(AE)', '10', 'AE0908', 2021),
    ('AE686(AE)', '11', 'QA402', 2022),
    ('AE686(AE)', '12', 'OM691', 2022),
]

df = spark.createDataFrame(data, schema=columns)

In [52]:
# Print the sample DataFrame so the input rows can be checked before any transformation.
df.show()

+------------+----+--------+----+
|     peer_id|id_1|    id_2|year|
+------------+----+--------+----+
|ABC17969(AB)|   1|ABC17969|2022|
|ABC17969(AB)|   2|CDC52533|2022|
|ABC17969(AB)|   3|DEC59161|2023|
|ABC17969(AB)|   4|  F43874|2022|
|ABC17969(AB)|   5| MY06154|2021|
|ABC17969(AB)|   6|  MY4387|2022|
|   AE686(AE)|   7|   AE686|2023|
|   AE686(AE)|   8|  BH2740|2021|
|   AE686(AE)|   9|   EG999|2021|
|   AE686(AE)|  10|  AE0908|2021|
|   AE686(AE)|  11|   QA402|2022|
|   AE686(AE)|  12|   OM691|2022|
+------------+----+--------+----+



In [12]:
# Q1: for each peer_id, find the year of the row whose id_2 occurs inside the peer_id string.
# target_year is set to `year` on matching rows and left null everywhere else.
# NOTE(review): contains() is a substring match, so an id_2 that appears anywhere in
# peer_id qualifies, not only the part before "(" -- confirm this is the intended rule.
df_with_year = (
    df.withColumn(
        "target_year",
        when(col("peer_id").contains(col("id_2")), col("year")),
    )
    .dropDuplicates()
    # Only a null target_year marks a non-matching row. A bare dropna() would also
    # discard matching rows that merely have a null in an unrelated column (e.g. id_1).
    .dropna(subset=["target_year"])
)
df_with_year.show()

+------------+----+--------+----+-----------+
|     peer_id|id_1|    id_2|year|target_year|
+------------+----+--------+----+-----------+
|ABC17969(AB)|   1|ABC17969|2022|       2022|
|   AE686(AE)|   7|   AE686|2023|       2023|
+------------+----+--------+----+-----------+



In [18]:
# Q2: count rows per (peer_id, year), then keep only the years that are not later
# than the peer's target_year.
df_grouped = df.groupBy("peer_id", "year").count()

# One lookup row per (peer_id, target_year). Without distinct(), a peer with several
# matching rows in df_with_year would fan out the join and every per-year count would
# appear more than once, inflating any running total computed downstream.
peer_target_years = df_with_year.select("peer_id", "target_year").distinct()

df_filtered = (
    df_grouped
    .join(peer_target_years, on="peer_id")
    .where(col("year") <= col("target_year"))
)
df_filtered.show()

+------------+----+-----+-----------+
|     peer_id|year|count|target_year|
+------------+----+-----+-----------+
|ABC17969(AB)|2021|    1|       2022|
|ABC17969(AB)|2022|    4|       2022|
|   AE686(AE)|2022|    2|       2023|
|   AE686(AE)|2021|    3|       2023|
|   AE686(AE)|2023|    1|       2023|
+------------+----+-----+-----------+



In [35]:
# Shared window: rows of one peer_id, ordered from the most recent year to the oldest.
newest_year_first = pyspark.sql.Window.partitionBy("peer_id").orderBy(desc("year"))

# Despite their names, these two are window *column expressions*, not window specs:
# a running total of `count` and a 1-based row position within each peer_id.
windowSpec = pyspark_sum("count").over(newest_year_first)
windowSpec2 = row_number().over(newest_year_first)

df_cumulative = (
    df_filtered
    .withColumn("cumulative_count", windowSpec)
    .withColumn("rn", windowSpec2)
)
df_cumulative.show()

+------------+----+-----+-----------+----------------+---+
|     peer_id|year|count|target_year|cumulative_count| rn|
+------------+----+-----+-----------+----------------+---+
|ABC17969(AB)|2022|    4|       2022|               4|  1|
|ABC17969(AB)|2021|    1|       2022|               5|  2|
|   AE686(AE)|2023|    1|       2023|               1|  1|
|   AE686(AE)|2022|    2|       2023|               3|  2|
|   AE686(AE)|2021|    3|       2023|               6|  3|
+------------+----+-----+-----------+----------------+---+



In [55]:
# Threshold that the running total (cumulative_count) is compared against.
size = 3

# Keep only the rows whose running total has already reached the threshold.
df_tmp = df_cumulative.filter(col("cumulative_count") >= size)
df_tmp.show()

# Smallest qualifying running total per peer_id.
# The aggregated column is named "min(cumulative_count)", which later cells rely on.
df_tmp1 = df_tmp.groupBy("peer_id").agg({"cumulative_count": "min"})
df_tmp1.show()

+------------+----+-----+-----------+----------------+---+
|     peer_id|year|count|target_year|cumulative_count| rn|
+------------+----+-----+-----------+----------------+---+
|ABC17969(AB)|2022|    4|       2022|               4|  1|
|ABC17969(AB)|2021|    1|       2022|               5|  2|
|   AE686(AE)|2022|    2|       2023|               3|  2|
|   AE686(AE)|2021|    3|       2023|               6|  3|
+------------+----+-----+-----------+----------------+---+

+------------+---------------------+
|     peer_id|min(cumulative_count)|
+------------+---------------------+
|ABC17969(AB)|                    4|
|   AE686(AE)|                    3|
+------------+---------------------+



In [60]:
# Final selection: for each peer_id keep the most recent years up to and including the
# first year at which the running total reaches `size`.
#
# A left join is used so that a peer whose total never reaches `size` (and therefore has
# no row in df_tmp1) is not silently dropped: its "min(cumulative_count)" is null and all
# of its rows are kept.
reached_threshold = col("cumulative_count") <= col("min(cumulative_count)")
never_reached = col("min(cumulative_count)").isNull()

df_result = (
    df_cumulative
    .join(df_tmp1, on="peer_id", how="left")
    .where(never_reached | reached_threshold)
)
df_result.orderBy('peer_id', desc('year')).show()

+------------+----+-----+-----------+----------------+---+---------------------+
|     peer_id|year|count|target_year|cumulative_count| rn|min(cumulative_count)|
+------------+----+-----+-----------+----------------+---+---------------------+
|ABC17969(AB)|2022|    4|       2022|               4|  1|                    4|
|   AE686(AE)|2023|    1|       2023|               1|  1|                    3|
|   AE686(AE)|2022|    2|       2023|               3|  2|                    3|
+------------+----+-----+-----------+----------------+---+---------------------+

