In [0]:
data = [['A' ,'Blue'],['A','Red'],['A','Blue'],['A','Blue'],['B','Purple'],['B','Green'],['B','Purple'],['B','Green'],['C','Black']]
header = ['Key','Value']

In [0]:
display(data)

_1,_2
A,Blue
A,Red
A,Blue
A,Blue
B,Purple
B,Green
B,Purple
B,Green
C,Black


In [0]:
df = spark.createDataFrame(data, header)

In [0]:
df.show()

+---+------+
|Key| Value|
+---+------+
|  A|  Blue|
|  A|   Red|
|  A|  Blue|
|  A|  Blue|
|  B|Purple|
|  B| Green|
|  B|Purple|
|  B| Green|
|  C| Black|
+---+------+



In [0]:
# Solution 1 , based on RDD

In [0]:
# create rdd from dataframe
r_dd = df.rdd

In [0]:
sol1 = r_dd.map(lambda l: (l, 1))\
    .reduceByKey(lambda a,b: a+b)\
    .sortBy(lambda l : (l[0][1]))\
    .map(lambda l: (l[0][0],(l[0][1],l[1])))\
    .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))\
    .map(lambda l: (l[0],l[1][0]))\
    .collect()
display(sol1)

_1,_2
B,Green
C,Black
A,Blue


In [0]:
# Solution 2 by dataframe

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [0]:
df.printSchema()

root
 |-- Key: string (nullable = true)
 |-- Value: string (nullable = true)



In [0]:
df1 = df.groupBy('Key','Value').agg(F.count('*').alias("cnt")).withColumn('mx', F.max("cnt").over(
    Window.partitionBy("Key").orderBy(F.asc("Value"))
)).filter("cnt = mx").groupBy('Key').agg(F.first('Value').alias("Value"))

In [0]:
display(df1)

Key,Value
A,Blue
B,Green
C,Black


In [0]:
# Solution 3 by dataframe

In [0]:
df2 = df.withColumn("index", F.lit(1)).groupBy('Key','Value').agg(F.sum('index').alias('flag'))
df3 =df2.groupBy('Key').agg(F.max('flag').alias('flag')).select(F.col("Key").alias('Key1'),'flag')
df_f = df2.join(df3, (df2.Key==df3.Key1) & (df2.flag==df3.flag),how='inner').groupBy('Key').agg(F.min('Value').alias('Value')) 
display(df_f) 

Key,Value
A,Blue
B,Green
C,Black


In [0]:
# solution 4, based on sql

In [0]:
df.createOrReplaceTempView('tbl')
df.printSchema()

root
 |-- Key: string (nullable = true)
 |-- Value: string (nullable = true)



In [0]:
spark.sql(" select T.Key, min(TT.Value) as Value from  (select Key , max(weight) as mx from " +
          "(select Key, Value, count(Value) as weight from tbl group by Key, Value) group by Key)T inner join" +
          "(select Key, Value, count(Value) as weight from tbl group by Key, Value) TT "+
          " on T.Key= TT.Key and T.mx=TT.weight group by T.Key ").show()

+---+-----+
|Key|Value|
+---+-----+
|  A| Blue|
|  B|Green|
|  C|Black|
+---+-----+

