In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, LongType, StringType, StructType, StructField, BooleanType, MapType
from pyspark.sql import Row

In [0]:
# Sample records: (first name, middle name, age, list of points, state map).
# Each tuple is unpacked positionally into a Row, so the Rows carry no field
# names of their own.
data = [
    Row(*fields)
    for fields in [
        ("Kenny", "", 10, [50, 90, 80], {"status": "Active"}),
        ("Elis", "Robert", 20, [10, 56, 43, 20], {"status": "Inactive"}),
        ("Myck", "Mendes", 30, [18, 50, 32], {"status": "Active"}),
        ("Edson", "Eliot", 40, [60, 87, 3], {"status": "Active"}),
    ]
]

# Distribute the sample rows as an RDD through the ambient `spark` session.
rdd = spark.sparkContext.parallelize(data)


In [0]:
# Explicit schema applied to the sample rows; every field is nullable.
# NOTE(review): `points` is declared as an array of strings although the
# sample rows hold Python ints. The later array_contains(..., "50") lookup
# compares against a string, so the two must be changed together if the
# element type is ever switched to IntegerType.
scheme = (
    StructType()
    .add("firstname", StringType(), True)
    .add("middlename", StringType(), True)
    .add("age", IntegerType(), True)
    .add("points", ArrayType(StringType()), True)
    .add("user_state", MapType(StringType(), StringType()), True)
)

In [0]:
# Build the base DataFrame from the RDD, applying the explicit schema
# instead of letting Spark infer column names and types.
df = spark.createDataFrame(rdd, schema=scheme)

In [0]:
# Print the column tree to confirm the explicit schema was applied
# (note that `points` elements come out as strings).
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- points: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- user_state: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [0]:
# Rich (notebook) rendering of the base DataFrame.
display(df)

firstname,middlename,age,points,user_state
Kenny,,10,"List(50, 90, 80)",Map(status -> Active)
Elis,Robert,20,"List(10, 56, 43, 20)",Map(status -> Inactive)
Myck,Mendes,30,"List(18, 50, 32)",Map(status -> Active)
Edson,Eliot,40,"List(60, 87, 3)",Map(status -> Active)


In [0]:
# Plain-text rendering of the same frame; truncate=False keeps the full
# array and map cell contents visible.
df.show(truncate=False)

+---------+----------+---+----------------+--------------------+
|firstname|middlename|age|points          |user_state          |
+---------+----------+---+----------------+--------------------+
|Kenny    |          |10 |[50, 90, 80]    |{status -> Active}  |
|Elis     |Robert    |20 |[10, 56, 43, 20]|{status -> Inactive}|
|Myck     |Mendes    |30 |[18, 50, 32]    |{status -> Active}  |
|Edson    |Eliot     |40 |[60, 87, 3]     |{status -> Active}  |
+---------+----------+---+----------------+--------------------+



In [0]:
# One output row per element of `points`, keeping the owner's first name and
# the `status` entry pulled out of the `user_state` map.
# NOTE(review): the saved output below lists no rows for Edson even though his
# `points` list is non-empty in the data cell - it looks stale or truncated;
# re-run the cell to refresh it.
df02 = df.select(
    F.col("firstname"),
    F.explode(F.col("points")).alias("point"),
    F.col("user_state.status"),
)

display(df02)

firstname,point,status
Kenny,50,Active
Kenny,90,Active
Kenny,80,Active
Elis,10,Inactive
Elis,56,Inactive
Elis,43,Inactive
Elis,20,Inactive
Myck,18,Active
Myck,50,Active
Myck,32,Active


In [0]:
# Keep only the rows whose `points` array contains "50" (compared as a string,
# matching the declared element type), then expose the second element of the
# array as `get_point`. element_at positions are 1-based.
df03 = (
    df
    .where(F.array_contains("points", "50"))
    .withColumn("get_point", F.element_at("points", 2))
)
display(df03)

firstname,middlename,age,points,user_state,get_point
Kenny,,10,"List(50, 90, 80)",Map(status -> Active),90
Myck,Mendes,30,"List(18, 50, 32)",Map(status -> Active),50


In [0]:
# Append `get_point`, the first element of each row's `points` array
# (element_at positions are 1-based), to all existing columns.
df04 = df.select("*", F.element_at("points", 1).alias("get_point"))
display(df04)

firstname,middlename,age,points,user_state,get_point
Kenny,,10,"List(50, 90, 80)",Map(status -> Active),50
Elis,Robert,20,"List(10, 56, 43, 20)",Map(status -> Inactive),10
Myck,Mendes,30,"List(18, 50, 32)",Map(status -> Active),18
Edson,Eliot,40,"List(60, 87, 3)",Map(status -> Active),60


In [0]:
# Group by the `status` value of the `user_state` map and gather the distinct
# first points of each group into an array column named "Points".
# NOTE(review): collect_set does not guarantee element order, so the array
# contents may be listed differently between runs.
df05 = df04.groupBy(F.col("user_state.status")).agg(
    F.collect_set(F.col("get_point")).alias("Points")
)
display(df05)

status,Points
Active,"List(60, 50, 18)"
Inactive,List(10)


In [0]:
# Rename the aggregated column. This rebinds `df05`, so the earlier frame
# with the "Points" column is no longer reachable under that name.
df05 = df05.withColumnRenamed(existing="Points", new="New_Points")

In [0]:
# Plain-text view of the renamed aggregate; truncate=False keeps the full
# array contents visible.
df05.show(truncate=False)

+--------+------------+
|status  |New_Points  |
+--------+------------+
|Active  |[60, 50, 18]|
|Inactive|[10]        |
+--------+------------+

