In [0]:
from pyspark.sql.functions import*
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,LongType
from pyspark.sql.functions import row_number,rank,dense_rank
from pyspark.sql.window import Window
from pyspark.sql.functions import udf
from pyspark.sql.functions import current_date,date_format,to_date,datediff,months_between,add_months

In [0]:
# Sample bank records: (bank, address, 16-digit account number).
data = [
    ("HDFC", "Chennai", 1050010123940000),
    ("CANARA", "Bangalore", 2050010123940001),
    ("INDIAN", "Hyderabad", 3050010123940002),
    ("UNION", "Mumbai", 4050010123940003),
]

# LongType is required: the account numbers are far beyond the 32-bit integer range.
schema = (
    StructType()
    .add("Bank", StringType())
    .add("Address", StringType())
    .add("AccountNumber", LongType())
)

df = spark.createDataFrame(data, schema)
display(df)

Bank,Address,AccountNumber
HDFC,Chennai,1050010123940000
CANARA,Bangalore,2050010123940001
INDIAN,Hyderabad,3050010123940002
UNION,Mumbai,4050010123940003


In [0]:
# Mask the account number: twelve asterisks followed by its last four digits.
star_df = df.withColumn(
    "AccountNumberStared",
    concat(
        lit('*' * 12),
        substring(df["AccountNumber"].cast("string"), -4, 4),
    ),
)
display(star_df)

Bank,Address,AccountNumber,AccountNumberStared
HDFC,Chennai,1050010123940000,************0000
CANARA,Bangalore,2050010123940001,************0001
INDIAN,Hyderabad,3050010123940002,************0002
UNION,Mumbai,4050010123940003,************0003


In [0]:
# Employee records: (name, department, salary).
data = [
    ("Nancy", "HR", 2000),
    ("Lishma", "HR", 3000),
    ("Lav", "IT", 3000),
    ("sowmi", "HR", 2500),
    ("joe", "IT", 3000),
    ("raj", "IT", 1500),
    ("rayan", "IT", 4000),
    ("Diya", "PayRoll", 2500),
    ("Vishnu", "PayRoll", 2000),
]
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("Name", StringType()),
        ("Dep", StringType()),
        ("Salary", IntegerType()),
    )
])
df = spark.createDataFrame(data, schema)

# Rank employees inside each department by ascending salary.
window = Window.partitionBy("Dep").orderBy("Salary")

# Apply the three ranking functions over the same window so that their
# handling of equal salaries can be compared side by side.
for column_name, ranking in [
    ("rownumber", row_number()),
    ("rank", rank()),
    ("denserank", dense_rank()),
]:
    df.withColumn(column_name, ranking.over(window)).show()

+------+-------+------+---------+
|  Name|    Dep|Salary|rownumber|
+------+-------+------+---------+
| Nancy|     HR|  2000|        1|
| sowmi|     HR|  2500|        2|
|Lishma|     HR|  3000|        3|
|   raj|     IT|  1500|        1|
|   Lav|     IT|  3000|        2|
|   joe|     IT|  3000|        3|
| rayan|     IT|  4000|        4|
|Vishnu|PayRoll|  2000|        1|
|  Diya|PayRoll|  2500|        2|
+------+-------+------+---------+

+------+-------+------+----+
|  Name|    Dep|Salary|rank|
+------+-------+------+----+
| Nancy|     HR|  2000|   1|
| sowmi|     HR|  2500|   2|
|Lishma|     HR|  3000|   3|
|   raj|     IT|  1500|   1|
|   Lav|     IT|  3000|   2|
|   joe|     IT|  3000|   2|
| rayan|     IT|  4000|   4|
|Vishnu|PayRoll|  2000|   1|
|  Diya|PayRoll|  2500|   2|
+------+-------+------+----+

+------+-------+------+---------+
|  Name|    Dep|Salary|denserank|
+------+-------+------+---------+
| Nancy|     HR|  2000|        1|
| sowmi|     HR|  2500|        2|
|Lishma| 

In [0]:
# Two employees: (id, name, salary, bonus).
data = [
    (1, "Nancy", 2000, 500),
    (2, "lish", 3000, 1000),
]
schema = (
    StructType()
    .add("ID", IntegerType())
    .add("Name", StringType())
    .add("Salary", IntegerType())
    .add("Bonus", IntegerType())
)
df = spark.createDataFrame(data, schema)
df.display()

ID,Name,Salary,Bonus
1,Nancy,2000,500
2,lish,3000,1000


In [0]:
def totalpay(s,b):
    """Return the sum of salary ``s`` and bonus ``b``.

    Args:
        s: Salary value, or None when the source column is null.
        b: Bonus value, or None when the source column is null.

    Returns:
        ``s + b``, or None when either input is None.
    """
    # Spark passes SQL NULL to a Python UDF as None; propagate it the way SQL
    # arithmetic does instead of raising TypeError on ``None + int``.
    if s is None or b is None:
        return None
    return s + b
# Register totalpay as a Spark UDF with an integer result, then add the
# computed column and print the outcome.
TotalPayment = udf(totalpay, IntegerType())
df.withColumn(
    "totalpay",
    TotalPayment(df["Salary"], df["Bonus"]),
).show()

+---+-----+------+-----+--------+
| ID| Name|Salary|Bonus|totalpay|
+---+-----+------+-----+--------+
|  1|Nancy|  2000|  500|    2500|
|  2| lish|  3000| 1000|    4000|
+---+-----+------+-----+--------+



In [0]:
# Two employees: (id, name, salary, bonus).
data = [
    (1, "Nancy", 2000, 500),
    (2, "lish", 3000, 1000),
]
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ID", IntegerType()),
        ("Name", StringType()),
        ("Salary", IntegerType()),
        ("Bonus", IntegerType()),
    )
])
df = spark.createDataFrame(data, schema)

# Expose the DataFrame to SQL under the name 'emp'.
df.createOrReplaceTempView('emp')

In [0]:
%sql
-- Return every row and column of emp.
select * from emp

ID,Name,Salary,Bonus
1,Nancy,2000,500
2,lish,3000,1000


In [0]:
# Two-row DataFrame (ids 0 and 1) stamped with today's date.
df = spark.range(2)
df1 = df.withColumn("currentdate", current_date())
df1.show()

# Spark datetime patterns are case-sensitive: 'MM' is month-of-year, whereas
# 'mm' is minute-of-hour. The earlier 'yyyy-dd-mm' pattern therefore did not
# describe a date layout at all; use year-month-day, which matches how the
# date column is rendered.
df2 = df1.withColumn("currentdate", to_date(df1.currentdate, 'yyyy-MM-dd'))
df2.show()

+---+-----------+
| id|currentdate|
+---+-----------+
|  0| 2024-01-02|
|  1| 2024-01-02|
+---+-----------+

+---+-----------+
| id|currentdate|
+---+-----------+
|  0| 2024-01-02|
|  1| 2024-01-02|
+---+-----------+



In [0]:
# A single row holding two date strings, d1 and d2.
df = spark.createDataFrame(
    [('2023-11-20', '2023-12-20')],
    ['d1', 'd2'],
)
df.show()

# Difference between d2 and d1, first with datediff and then with months_between.
df.withColumn("datediff", datediff("d2", "d1")).show()
df.withColumn('monthsbetween', months_between("d2", "d1")).show()

# Shift d2 forward by ten months and backward by three months.
df.withColumn('addmonths', add_months("d2", 10)).show()
df.withColumn('submonth', add_months("d2", -3)).show()

+----------+----------+
|        d1|        d2|
+----------+----------+
|2023-11-20|2023-12-20|
+----------+----------+

+----------+----------+--------+
|        d1|        d2|datediff|
+----------+----------+--------+
|2023-11-20|2023-12-20|      30|
+----------+----------+--------+

+----------+----------+-------------+
|        d1|        d2|monthsbetween|
+----------+----------+-------------+
|2023-11-20|2023-12-20|          1.0|
+----------+----------+-------------+

+----------+----------+----------+
|        d1|        d2| addmonths|
+----------+----------+----------+
|2023-11-20|2023-12-20|2024-10-20|
+----------+----------+----------+

+----------+----------+----------+
|        d1|        d2|  submonth|
+----------+----------+----------+
|2023-11-20|2023-12-20|2023-09-20|
+----------+----------+----------+



In [0]:
data = [
    ('Mohamed', 'Rayan'),
    ('Mohamed', 'Iayan'),
]
rdd = spark.sparkContext.parallelize(data)

# Extend every tuple with a third element: first and last name joined together.
rdd1 = rdd.map(lambda x: (*x, x[0] + '' + x[1]))
print(rdd1.collect())

[('Mohamed', 'Rayan', 'MohamedRayan'), ('Mohamed', 'Iayan', 'MohamedIayan')]


In [0]:
data = [
    ('Mohamed', 'Rayan'),
    ('Mohamed', 'Iayan'),
]
df = spark.createDataFrame(data, ['firstname', 'lastname'])

# Drop to the RDD API to append the joined name to each row, then rebuild a
# DataFrame with a name for the extra column.
rdd1 = df.rdd.map(lambda x: tuple(x) + (x[0] + '' + x[1],))
df1 = rdd1.toDF(['firstname', 'lastname', 'fullname'])
df1.show()

+---------+--------+------------+
|firstname|lastname|    fullname|
+---------+--------+------------+
|  Mohamed|   Rayan|MohamedRayan|
|  Mohamed|   Iayan|MohamedIayan|
+---------+--------+------------+



In [0]:
def fullname(x):
    """Return ``x`` extended with one extra element: ``x[0]`` and ``x[1]`` joined.

    Args:
        x: A tuple-like record whose first two items are strings.

    Returns:
        A tuple containing every item of ``x`` followed by the joined string.
    """
    first, last = x[0], x[1]
    joined = first + '' + last
    return x + (joined,)

data = [
    ('Mohamed', 'Rayan'),
    ('Mohamed', 'Iayan'),
]
df = spark.createDataFrame(data, ['firstname', 'lastname'])

# Apply fullname to each row through the RDD API, then name the resulting columns.
rdd1 = df.rdd.map(fullname)
df1 = rdd1.toDF(['firstname', 'lastname', 'fullname'])
df1.show()

+---------+--------+------------+
|firstname|lastname|    fullname|
+---------+--------+------------+
|  Mohamed|   Rayan|MohamedRayan|
|  Mohamed|   Iayan|MohamedIayan|
+---------+--------+------------+



In [0]:
# Each record pairs a key with an integer list; the lists differ in length and
# one of them contains a null element.
df = spark.createDataFrame(
    sc.parallelize([
        ['ABC', [1, 2, 3]],
        ['XYZ', [2, None, 4]],
        ['KLM', [8, 7]],
        ['IJK', [5]],
    ]),
    schema=["key", "value"],
)
display(df)

key,value
ABC,"List(1, 2, 3)"
XYZ,"List(2, null, 4)"
KLM,"List(8, 7)"
IJK,List(5)


In [0]:
# Project the first three array positions as separate columns.
df.select("key", *(df.value[position] for position in range(3))).display()

key,value[0],value[1],value[2]
ABC,1,2.0,3.0
XYZ,2,,4.0
KLM,8,7.0,
IJK,5,,


In [0]:
from pyspark.sql.functions import size, col

# Keep both original columns and add the number of elements in each array.
dfSize = df.select(
    col("key"),
    col("value"),
    size(col("value")).alias("NoofArrayElements"),
)
dfSize.display()

key,value,NoofArrayElements
ABC,"List(1, 2, 3)",3
XYZ,"List(2, null, 4)",3
KLM,"List(8, 7)",2
IJK,List(5),1


In [0]:
# Largest array length across all rows, read from the single aggregated row.
max_value = dfSize.agg({"NoofArrayElements": "max"}).first()[0]
print(max_value)

3


In [0]:
def arraySplitIntoCols(df, maxElements, arrayCol="value", prefix="new_col"):
    """Spread the leading elements of an array column into separate columns.

    Args:
        df: DataFrame that contains the array column.
        maxElements: Number of leading array positions to extract. ``None``
            is treated as zero so that a missing maximum does not raise.
        arrayCol: Name of the array column to read. Defaults to ``"value"``.
        prefix: Prefix of the generated column names; the array position is
            appended to it. Defaults to ``"new_col"``.

    Returns:
        The DataFrame with columns ``{prefix}0`` .. ``{prefix}{maxElements-1}``
        added, each holding the array element at that position.
    """
    # ``maxElements or 0`` keeps range() from failing when the caller passes None.
    for i in range(maxElements or 0):
        df = df.withColumn(f"{prefix}{i}", df[arrayCol][i])
    return df

In [0]:
# Expand the array column into one column per position, up to the longest array.
dfout = arraySplitIntoCols(df, maxElements=max_value)
display(dfout)

key,value,new_col0,new_col1,new_col2
ABC,"List(1, 2, 3)",1,2.0,3.0
XYZ,"List(2, null, 4)",2,,4.0
KLM,"List(8, 7)",8,7.0,
IJK,List(5),5,,


In [0]:
# Read the file without a header row, then name its only column Col1.
df = (
    spark.read
    .format("csv")
    .option("header", "false")
    .load("dbfs:/FileStore/sample12__1_.csv")
)
df1 = df.toDF("Col1")
df1.show(truncate=False)

+--------------------------------------------------------------------------+
|Col1                                                                      |
+--------------------------------------------------------------------------+
|1001|Ram|28|Java|1002|Raj|24|Database|1004|Jam|28|DotNet|1005|Kesh|25|Java|
+--------------------------------------------------------------------------+



In [0]:
from pyspark.sql import functions as f

# The lazy pattern matches four consecutive pipe-terminated fields and "$0"
# re-emits the whole match, so a newline is inserted after every fourth "|".
df2 = df1.withColumn(
    'Col2',
    f.regexp_replace(df1["Col1"], "(.*?\\|){4}", "$0\n"),
)
df2.select(df2["Col2"]).show(truncate=False)

+--------------------------------------------------------------------------------+
|Col2                                                                            |
+--------------------------------------------------------------------------------+
|1001|Ram|28|Java|\n1002|Raj|24|Database|\n1004|Jam|28|DotNet|\n1005|Kesh|25|Java|
+--------------------------------------------------------------------------------+



In [0]:
df_header = ['eno', 'ename', 'age', 'tech']

# 1. Split Col2 on newlines and explode so that each record becomes its own row
#    (explode names its output column "col").
# 2. Split each record on a literal pipe. The raw string keeps the regex escape
#    intact; a plain "\|" is an invalid Python escape sequence.
# 3. Pick array items by position and name them after df_header. enumerate
#    supplies the position directly instead of searching the list per column.
(
    df2.select(f.explode(f.split("Col2", "\n")))
    .select(f.split("col", r"\|").alias("value"))
    .select(*[
        f.col("value").getItem(position).alias(name)
        for position, name in enumerate(df_header)
    ])
    .show()
)

+----+-----+---+--------+
| eno|ename|age|    tech|
+----+-----+---+--------+
|1001|  Ram| 28|    Java|
|1002|  Raj| 24|Database|
|1004|  Jam| 28|  DotNet|
|1005| Kesh| 25|    Java|
+----+-----+---+--------+



In [0]:
df_header = ['eno', 'ename', 'age', 'tech']

# Explode the newline-separated records into rows, split each on a literal pipe
# (raw string: a plain "\|" is an invalid Python escape sequence), then project
# the array items by position under the names listed in df_header.
(
    df2.select(f.explode(f.split("Col2", "\n")))
    .select(f.split("col", r"\|").alias("value"))
    .select(*[
        f.col("value").getItem(position).alias(name)
        for position, name in enumerate(df_header)
    ])
    .show()
)

+----+-----+---+--------+
| eno|ename|age|    tech|
+----+-----+---+--------+
|1001|  Ram| 28|    Java|
|1002|  Raj| 24|Database|
|1004|  Jam| 28|  DotNet|
|1005| Kesh| 25|    Java|
+----+-----+---+--------+



In [0]:
df_header = ['eno', 'ename', 'age', 'tech']

# Step 1: split Col2 on newlines into an array of records.
df2.select(f.split("Col2", "\n")).show(truncate=False)

# Step 2: explode that array so every record becomes its own row.
df2.select(f.explode(f.split("Col2", "\n"))).show(truncate=False)

# Step 3: split each record on a literal pipe. The raw string keeps the regex
# escape intact; a plain "\|" is an invalid Python escape sequence.
(
    df2.select(f.explode(f.split("Col2", "\n")))
    .select(f.split("col", r"\|").alias("value"))
    .show(truncate=False)
)

+----------------------------------------------------------------------------------+
|split(Col2, \n, -1)                                                               |
+----------------------------------------------------------------------------------+
|[1001|Ram|28|Java|, 1002|Raj|24|Database|, 1004|Jam|28|DotNet|, 1005|Kesh|25|Java]|
+----------------------------------------------------------------------------------+

+---------------------+
|col                  |
+---------------------+
|1001|Ram|28|Java|    |
|1002|Raj|24|Database||
|1004|Jam|28|DotNet|  |
|1005|Kesh|25|Java    |
+---------------------+

+---------------------------+
|value                      |
+---------------------------+
|[1001, Ram, 28, Java, ]    |
|[1002, Raj, 24, Database, ]|
|[1004, Jam, 28, DotNet, ]  |
|[1005, Kesh, 25, Java]     |
+---------------------------+

