In [1]:
from  pyspark.sql import functions as Fun
from pyspark.sql.types import *


In [2]:
# Sample rows: (string key, two ints, an array of strings). The None and the
# empty list in col4 let the array-function cells below show their null/empty
# behaviour.
data = [
    ('a1', 10, 20, ['d1', 'd2']),
    ('b1', 25, 40, ['d2', 'd4', 'd1']),
    ('c1', 25, 400, None),
    ('d1', 40, 15, []),
]
inpdf = spark.createDataFrame(data, ['col1', 'col2', 'col3', 'col4'])

In [15]:
# Display the sample frame (defaults spelled out: first 20 rows, long cells truncated).
inpdf.show(n=20, truncate=True)

+----+----+----+------------+
|col1|col2|col3|        col4|
+----+----+----+------------+
|  a1|  10|  20|    [d1, d2]|
|  b1|  25|  40|[d2, d4, d1]|
|  c1|  25| 400|        null|
|  d1|  40|  15|          []|
+----+----+----+------------+



## Check whether at least one element of an array matches a given string

In [29]:
# col5: does col4 contain 'd4'? The result is null when col4 itself is null.
# col6: SQL three-valued logic applies -- `null OR true` is true, so the row
# with a null col4 can still be true through the col1 = 'c1' branch.
inpdf.select(
    '*',
    Fun.array_contains('col4', 'd4').alias('col5'),
    (
        (Fun.array_contains('col4', 'd4') | (Fun.col('col1') == 'c1'))
        & (Fun.col('col2') > 10)
    ).alias('col6'),
).show()

+----+----+----+------------+-----+-----+
|col1|col2|col3|        col4| col5| col6|
+----+----+----+------------+-----+-----+
|  a1|  10|  20|    [d1, d2]|false|false|
|  b1|  25|  40|[d2, d4, d1]| true| true|
|  c1|  25| 400|        null| null| true|
|  d1|  40|  15|          []|false|false|
+----+----+----+------------+-----+-----+



## Sort using multiple columns

In [17]:
# Order by col2 descending; ties on col2 (b1 and c1 both have 25) are broken
# by col3 ascending.
inpdf.orderBy(Fun.col('col2').desc(), Fun.col('col3').asc()).show()

+----+----+----+------------+
|col1|col2|col3|        col4|
+----+----+----+------------+
|  d1|  40|  15|          []|
|  b1|  25|  40|[d2, d4, d1]|
|  c1|  25| 400|        null|
|  a1|  10|  20|    [d1, d2]|
+----+----+----+------------+



## Collect all values of a column into a single list

In [26]:
# An aggregate with no groupBy collapses the frame to one row holding every
# col2 value. Duplicates are kept (25 appears twice); Spark does not guarantee
# the element order.
inpdf.select(Fun.collect_list('col2')).show()

+------------------+
|collect_list(col2)|
+------------------+
|  [10, 25, 25, 40]|
+------------------+



## Explode an array column into one row per element

In [46]:
# Replace col4 with its elements, one output row per element. Rows whose col4
# is null or empty (c1 and d1) produce no rows at all; Fun.explode_outer would
# keep them with a null col4.
inpdf.withColumn('col4', Fun.explode(Fun.col('col4'))).show()

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|  a1|  10|  20|  d1|
|  a1|  10|  20|  d2|
|  b1|  25|  40|  d2|
|  b1|  25|  40|  d4|
|  b1|  25|  40|  d1|
+----+----+----+----+



## Sort the elements within an array column

In [6]:
# Sort each array ascending into a new column. A null array stays null and an
# empty array stays empty.
inpdf.withColumn('col6', Fun.sort_array('col4', asc=True)).show()

+----+----+----+------------+------------+
|col1|col2|col3|        col4|        col6|
+----+----+----+------------+------------+
|  a1|  10|  20|    [d1, d2]|    [d1, d2]|
|  b1|  25|  40|[d2, d4, d1]|[d1, d2, d4]|
|  c1|  25| 400|        null|        null|
|  d1|  40|  15|          []|          []|
+----+----+----+------------+------------+



## Get the source file name of each row

In [2]:
# Read every file matched by the glob as '|'-separated CSV, treating the
# first line of the input as the header (header=True).
inp = spark.read.format('csv').options(header=True, sep='|').load('input/*')

In [9]:
# Append the path of the file each row was read from as a trailing column.
inp.select('*', Fun.input_file_name().alias('input_file_name')).show()

+---+---+---+--------------------+
| c1| c2| c3|     input_file_name|
+---+---+---+--------------------+
| a1| 15|  5|file:///Users/sri...|
| a2| 25|  6|file:///Users/sri...|
| a1| 10| 20|file:///Users/sri...|
+---+---+---+--------------------+



In [10]:
# Raw-text route: split every line on '|'. Unlike spark.read.csv(header=True),
# nothing removes header lines here, so each file's header ("c1|c2|c3") ends up
# as an ordinary data row (visible in the output of the next cell).
inp_rdd = sc.textFile('input/*').map(lambda line: line.split('|'))

In [13]:
# Build a frame (columns _1, _2, _3) from the split lines and tag each row with
# its file. The captured output shows paths here too, with a 'file:/' prefix
# instead of the 'file:///' seen from spark.read.csv.
(
    spark.createDataFrame(inp_rdd)
    .select('*', Fun.input_file_name().alias('input_file_name'))
    .show()
)

+---+---+---+--------------------+
| _1| _2| _3|     input_file_name|
+---+---+---+--------------------+
| c1| c2| c3|file:/Users/srika...|
| a1| 15|  5|file:/Users/srika...|
| a2| 25|  6|file:/Users/srika...|
| c1| c2| c3|file:/Users/srika...|
| a1| 10| 20|file:/Users/srika...|
+---+---+---+--------------------+



In [7]:
# Partition id: tag each row with the (0-based) id of the partition holding it.
# This is unrelated to the input-file-name cells above it.
inpdf.select('*', Fun.spark_partition_id().alias('col6')).show()

+----+----+----+------------+----+
|col1|col2|col3|        col4|col6|
+----+----+----+------------+----+
|  a1|  10|  20|    [d1, d2]|   0|
|  b1|  25|  40|[d2, d4, d1]|   1|
|  c1|  25| 400|        null|   2|
|  d1|  40|  15|          []|   3|
+----+----+----+------------+----+

