In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import os

In [2]:
# Create (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .master("local")
    .appName("TrainingDay1")
    .getOrCreate()
)

In [3]:
# Keep a handle on the SparkContext that backs the session.
sc=spark.sparkContext

In [4]:
from pyspark.sql.functions import col

## UDF: convert the letters of the name to upper case

In [5]:
# Three (seqno, names) rows used by the UDF examples.
data = [
    [1, "john jones"],
    [2, "tracey smith"],
    [3, "amy sanders"],
]
columns = ['seqno', 'names']
df = spark.createDataFrame(data, schema=columns)

In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [7]:
def toUpper(name):
    """Return ``name`` converted to upper case.

    Args:
        name: The string to convert, or ``None``.

    Returns:
        The upper-cased string, or ``None`` when ``name`` is ``None``.
        The guard matters when this function is used as a Spark UDF: a null
        column value arrives as ``None``, and ``None.upper()`` would raise
        ``AttributeError`` and fail the job.
    """
    if name is None:
        return None
    return name.upper()

In [58]:
# Wrap the plain Python function as a Spark UDF producing a string column.
toupper = udf(toUpper, returnType=StringType())

In [59]:
# Add an "upper" column by applying the UDF to every value of "names".
df=df.withColumn("upper",toupper(col('names')))

In [8]:
df.show()

+-----+------------+
|seqno|       names|
+-----+------------+
|    1|  john jones|
|    2|tracey smith|
|    3| amy sanders|
+-----+------------+



In [10]:
df.columns

['seqno', 'names']

In [13]:
# Bring the "names" column back to the driver as a plain Python list.
xx = [row[0] for row in df.select('names').collect()]

In [14]:
xx

['john jones', 'tracey smith', 'amy sanders']

### Another approach is to use the `@udf` decorator

In [61]:
@udf(returnType=StringType())
def say_hello(name):
    """Build the greeting text ``Hello <name>`` for a single column value."""
    greeting = f"Hello {name}"
    return greeting

In [62]:
# Add a "greet" column by applying the decorated UDF to "names".
df=df.withColumn("greet",say_hello(col('names')))

In [63]:
df.show()

+-----+------------+------------+------------------+
|seqno|       names|       upper|             greet|
+-----+------------+------------+------------------+
|    1|  john jones|  JOHN JONES|  Hello john jones|
|    2|tracey smith|TRACEY SMITH|Hello tracey smith|
|    3| amy sanders| AMY SANDERS| Hello amy sanders|
+-----+------------+------------+------------------+



### Registering the function for use in Spark SQL is another approach

In [66]:
# Register the plain Python function under the SQL name "toupper" so that
# it can be called from spark.sql(...) queries.
spark.udf.register("toupper", toUpper)

<function __main__.toUpper(name)>

In [68]:
# Expose the DataFrame to SQL under the view name "df_table".
df.createOrReplaceTempView("df_table")

In [69]:
# Call the registered "toupper" function from a SQL query over the temp view.
spark.sql("select toupper(names) from df_table").show()

+--------------+
|toupper(names)|
+--------------+
|    JOHN JONES|
|  TRACEY SMITH|
|   AMY SANDERS|
+--------------+



## Get value of a particular cell in PySpark Dataframe

In [4]:
# sample employee records: [id, name, company]
data =[["1","sravan","company 1"],
       ["2","ojaswi","company 2"],
       ["3","bobby","company 3"],
       ["4","rohith","company 2"],
       ["5","gnanesh","company 1"]]
  
# specify column names
columns=['Employee ID','Employee NAME',
         'Company Name']
  
# creating a dataframe from the lists of data
dataframe = spark.createDataFrame(data,columns)
  
# count the rows via the "Employee ID" column (this cell does not display the rows)
dataframe.select("Employee ID").count()

5

In [5]:
# Row count of the employee DataFrame, kept in a Python variable.
var=dataframe.count()

In [13]:
var

5

In [14]:
# Collect once: each dataframe.collect() call runs a Spark job and ships every
# row to the driver, so calling it inside the loop repeated that work per row.
rows = dataframe.collect()
for row in rows:
    # Index 0 is the first column of the row.
    print(row[0])

1
2
3
4
5


In [5]:
# first() fetches only the leading row instead of collecting the whole
# DataFrame to the driver just to read one cell.
print("first row=",dataframe.first()[0])

first row= 1


## How to TRIM leading spaces in values of column in Pyspark in Databricks

In [8]:
# Sample rows whose "name" values carry leading spaces.
nullDF = spark.createDataFrame(
    [
        (11, "  Ravi", 100),
        (12, "   Verma", 200),
        (14, "       Sam", 300),
    ],
    ["id", "name", "salary"],
)

In [50]:
from pyspark.sql.functions import length, ltrim,rtrim

In [10]:
# Record the character length of each name, padding included.
lengthDF = nullDF.withColumn("length", length(col("name")))

In [None]:
## Lengths here are 6, 8 and 10 (leading spaces included)

In [16]:
lengthDF.show()

+---+----------+------+------+
| id|      name|salary|length|
+---+----------+------+------+
| 11|      Ravi|   100|     6|
| 12|     Verma|   200|     8|
| 14|       Sam|   300|    10|
+---+----------+------+------+



In [15]:
# Strip leading spaces from "name". The column is taken from lengthDF itself
# (the frame being transformed) rather than from nullDF, so the cell does not
# depend on which DataFrame the name nullDF happens to be bound to.
trimmed_df=lengthDF.withColumn("name",ltrim(lengthDF.name))

In [18]:
# Recompute "length" from the trimmed names.
trimmed_df = trimmed_df.withColumn("length", length(col("name")))

In [20]:
## Lengths have been reduced to 4, 5 and 3

In [19]:
trimmed_df.show()

+---+-----+------+------+
| id| name|salary|length|
+---+-----+------+------+
| 11| Ravi|   100|     4|
| 12|Verma|   200|     5|
| 14|  Sam|   300|     3|
+---+-----+------+------+



## How to trim trailing spaces in column values in PySpark
## How to trim leading & trailing spaces in column values in PySpark (Databricks)
#### `rtrim` removes trailing spaces (shown below); `trim` removes both leading and trailing spaces

In [35]:
# Sample rows whose "name" values carry trailing spaces (the last one has none).
nullDF = spark.createDataFrame(
    [
        (11, "Ravi     ", 100),
        (12, "Verma     ", 200),
        (14, "Sammy", 300),
    ],
    ["id", "name", "salary"],
)

In [36]:
# Record the character length of each name, padding included.
lengthDF = nullDF.withColumn("length", length(col("name")))

In [37]:
lengthDF.show()


+---+----------+------+------+
| id|      name|salary|length|
+---+----------+------+------+
| 11| Ravi     |   100|     9|
| 12|Verma     |   200|    10|
| 14|     Sammy|   300|     5|
+---+----------+------+------+



In [38]:
# Strip trailing spaces from "name". The column is taken from lengthDF itself
# (the frame being transformed) rather than from nullDF, so the cell does not
# depend on which DataFrame the name nullDF happens to be bound to.
trimmed_df=lengthDF.withColumn("name",rtrim(lengthDF.name))

In [39]:
# Recompute "length" from the trimmed names.
trimmed_df = trimmed_df.withColumn("length", length(col("name")))

In [40]:
trimmed_df.show()

+---+-----+------+------+
| id| name|salary|length|
+---+-----+------+------+
| 11| Ravi|   100|     4|
| 12|Verma|   200|     5|
| 14|Sammy|   300|     5|
+---+-----+------+------+



## How to Remove the space in Columns & converting into camelCase

In [43]:
# Employee rows; the third field (salary) is missing for two of them.
data = [
    ["1", "sravan", 100],
    ["2", "ojaswi", None],
    ["3", "bobby", 102],
    ["4", "rohith", None],
    ["5", "gnanesh", 200],
]

In [44]:
# specify column names
columns=['emp id','first name',
         'emp salary']

In [45]:
# Build the DataFrame from the rows and the space-containing column names.
mydf=spark.createDataFrame(data,columns)

In [46]:
mydf.show()

+------+----------+----------+
|emp id|first name|emp salary|
+------+----------+----------+
|     1|    sravan|       100|
|     2|    ojaswi|      null|
|     3|     bobby|       102|
|     4|    rohith|      null|
|     5|   gnanesh|       200|
+------+----------+----------+



In [47]:
mydf.printSchema()

root
 |-- emp id: string (nullable = true)
 |-- first name: string (nullable = true)
 |-- emp salary: long (nullable = true)



In [51]:
# Current column names, as a Python list of strings.
cols=mydf.columns

In [52]:
# Title-case every column name, then drop all whitespace characters
# (for example "emp id" becomes "EmpId").
titleCols = [
    "".join(ch for ch in name.title() if not ch.isspace())
    for name in cols
]

In [53]:
# Lower-case only the first character to turn TitleCase into camelCase.
# Slicing (column[:1]) instead of indexing (column[0]) keeps an empty name
# from raising IndexError; it simply stays empty.
camelCols=[column[:1].lower()+column[1:] for column in titleCols]

In [54]:
# Rename all columns positionally to their camelCase names.
finalColsDf=mydf.toDF(*camelCols)

In [55]:
finalColsDf.show()

+-----+---------+---------+
|empId|firstName|empSalary|
+-----+---------+---------+
|    1|   sravan|      100|
|    2|   ojaswi|     null|
|    3|    bobby|      102|
|    4|   rohith|     null|
|    5|  gnanesh|      200|
+-----+---------+---------+



In [56]:
finalColsDf.printSchema()

root
 |-- empId: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- empSalary: long (nullable = true)



In [58]:
# Pass a real boolean: a string given as `extended` is interpreted by newer
# PySpark versions as an explain *mode* name, and "true" is not a valid mode.
finalColsDf.explain(extended=True)

== Parsed Logical Plan ==
Project [emp id#221 AS empId#237, first name#222 AS firstName#238, emp salary#223L AS empSalary#239L]
+- LogicalRDD [emp id#221, first name#222, emp salary#223L], false

== Analyzed Logical Plan ==
empId: string, firstName: string, empSalary: bigint
Project [emp id#221 AS empId#237, first name#222 AS firstName#238, emp salary#223L AS empSalary#239L]
+- LogicalRDD [emp id#221, first name#222, emp salary#223L], false

== Optimized Logical Plan ==
Project [emp id#221 AS empId#237, first name#222 AS firstName#238, emp salary#223L AS empSalary#239L]
+- LogicalRDD [emp id#221, first name#222, emp salary#223L], false

== Physical Plan ==
*(1) Project [emp id#221 AS empId#237, first name#222 AS firstName#238, emp salary#223L AS empSalary#239L]
+- Scan ExistingRDD[emp id#221,first name#222,emp salary#223L]


## Get the number of records in each partition

In [30]:
# Employee records used to inspect how rows are spread across partitions.
data_df = [
    ["1", "sravan", "company 1"],
    ["2", "ojaswi", "company 2"],
    ["3", "bobby", "company 3"],
    ["4", "rohith", "company 2"],
    ["5", "gnanesh", "company 1"],
]

# Column names for the records above.
columns = ['Employee ID', 'Employee NAME', 'Company Name']

# Build the DataFrame from the rows and column names.
my_dataframe = spark.createDataFrame(data_df, schema=columns)

In [31]:
from pyspark.sql.functions import spark_partition_id

In [32]:
# Tag each row with the id of the partition that holds it, then count rows per partition.
# NOTE(review): this rebinds my_dataframe to the aggregated counts, so the original
# employee rows are no longer reachable under this name — confirm that is intended.
my_dataframe=my_dataframe.withColumn("partition_id",spark_partition_id()).groupBy("partition_id").count()

In [33]:
my_dataframe.show()

+------------+-----+
|partition_id|count|
+------------+-----+
|           0|    5|
+------------+-----+



In [14]:
my_dataframe.show()

+-----------+-------------+------------+
|Employee ID|Employee NAME|Company Name|
+-----------+-------------+------------+
|          1|       sravan|   company 1|
|          2|       ojaswi|   company 2|
|          3|        bobby|   company 3|
|          4|       rohith|   company 2|
|          5|      gnanesh|   company 1|
+-----------+-------------+------------+



# Given yearly run totals, label each year's performance against the previous year

***
Input
* year    runs 
* 2016    360
* 2017    312
* 2018    275
* 2019    301
* 2020    103

 

 output:
* year    runs    performance
* 2016    360        null
* 2017    312        bad
* 2018    275        bad
* 2019    301        good
* 2020    103        bad

***

In [117]:
# (year, runs) pairs from the problem statement.
data = [
    [2016, 360],
    [2017, 312],
    [2018, 275],
    [2019, 301],
    [2020, 103],
]
columns = ['year', 'runs']
score_df = spark.createDataFrame(data, schema=columns)

In [118]:
score_df.show()

+----+----+
|year|runs|
+----+----+
|2016| 360|
|2017| 312|
|2018| 275|
|2019| 301|
|2020| 103|
+----+----+



In [98]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [122]:
# "perf" holds the previous year's runs (null for the earliest year).
prev_year_runs = lag('runs').over(Window.orderBy(asc('year')))
score_df = score_df.withColumn("perf", prev_year_runs)


In [123]:
# Fewer runs than the previous year is 'bad'; everything else is 'good'.
scored_fewer_runs = col("runs") < col("perf")
score_df = score_df.withColumn("out", when(scored_fewer_runs, 'bad').otherwise('good'))

In [134]:
# Rows without a previous year get a real SQL NULL rather than the string
# 'null': when() without otherwise() yields NULL for unmatched rows, so
# isNull() and other null-aware operations keep working on "out".
score_df=score_df.withColumn("out",when(score_df.perf.isNotNull(),col('out')))

In [135]:
score_df.show()

+----+----+----+----+
|year|runs|perf| out|
+----+----+----+----+
|2016| 360|null|null|
|2017| 312| 360| bad|
|2018| 275| 312| bad|
|2019| 301| 275|good|
|2020| 103| 301| bad|
+----+----+----+----+



In [119]:
# One-step variant of the lag comparison, without the intermediate "perf" column.
# NOTE(review): the earliest year has no previous row, so it falls through to
# otherwise('good') — the expected output above shows null for that year instead.
score_df.withColumn("out",when( col("runs") < lag('runs').over(Window.orderBy(asc('year'))),'bad').otherwise('good')).show()

+----+----+----+
|year|runs| out|
+----+----+----+
|2016| 360|good|
|2017| 312| bad|
|2018| 275| bad|
|2019| 301|good|
|2020| 103| bad|
+----+----+----+



## Handling None and null in Pyspark 

In [136]:
# Python's None is how a missing value is written when building the rows.
null_df=spark.createDataFrame([[1,None],[2,"ravi"]],('id','name'))

In [137]:
null_df.show()

+---+----+
| id|name|
+---+----+
|  1|null|
|  2|ravi|
+---+----+



In [138]:
# `null` is not a name in Python, so evaluating it raises NameError.
# The error is caught and printed so that this demonstration does not abort
# a "Restart & Run All"; null_df keeps its previous value.
try:
    null_df=spark.createDataFrame([[1,null],[2,"ravi"]],('id','name'))
except NameError as exc:
    print(f"{type(exc).__name__}: {exc}")

NameError: name 'null' is not defined

In [140]:
# Same data plus a row in which both columns are missing.
null_df=spark.createDataFrame([[1,None],[2,"ravi"],[None,None]],('id','name'))

In [141]:
null_df.show()

+----+----+
|  id|name|
+----+----+
|   1|null|
|   2|ravi|
|null|null|
+----+----+



In [145]:
#The isNull function returns True if the value is null and False otherwise.
null_df.withColumn("check_null",null_df.name.isNull()).show()

+----+----+----------+
|  id|name|check_null|
+----+----+----------+
|   1|null|      true|
|   2|ravi|     false|
|null|null|      true|
+----+----+----------+



In [149]:
# Add a constant column (1 in every row) to compare "id" against.
null_df=null_df.withColumn("other_id",lit(1))

In [150]:
# == is not null-safe: when either operand is null the result is null, not true or false.
null_df.withColumn("check_null",null_df.id==null_df.other_id).show()

+----+----+--------+----------+
|  id|name|other_id|check_null|
+----+----+--------+----------+
|   1|null|       1|      true|
|   2|ravi|       1|     false|
|null|null|       1|      null|
+----+----+--------+----------+



In [None]:
#When one value is null and the other is not null, return False
#When both values are null, return True

In [151]:
# Hand-written null-safe equality: both null -> True, exactly one null -> False,
# otherwise the ordinary comparison.
id_is_null = null_df.id.isNull()
other_is_null = null_df.other_id.isNull()
null_safe_equal = (
    when(id_is_null & other_is_null, True)
    .when(id_is_null | other_is_null, False)
    .otherwise(null_df.id == null_df.other_id)
)
null_df.withColumn("num1_eq_num2", null_safe_equal).show()

+----+----+--------+------------+
|  id|name|other_id|num1_eq_num2|
+----+----+--------+------------+
|   1|null|       1|        true|
|   2|ravi|       1|       false|
|null|null|       1|       false|
+----+----+--------+------------+



### null safe equality
### We can perform the same null safe equality comparison with the built-in eqNullSafe function.

In [152]:
# Built-in null-safe equality: the result is always true or false, never null.
null_df.withColumn("num1_eqNullSafe_num2", null_df.id.eqNullSafe(null_df.other_id)).show()


+----+----+--------+--------------------+
|  id|name|other_id|num1_eqNullSafe_num2|
+----+----+--------+--------------------+
|   1|null|       1|                true|
|   2|ravi|       1|               false|
|null|null|       1|               false|
+----+----+--------+--------------------+

