In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# Install with: pip install pyspark
# PySpark references:
# https://sparkbyexamples.com/pyspark/pyspark-rename-dataframe-column/
# https://www.guru99.com/pyspark-tutorial.html#7

In [2]:
# Create the session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("spark demo")
    .getOrCreate()
)

In [3]:
spark

In [8]:
# Sample people records; each tuple follows the order given in `columns`.
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
data = [
    ("James", "", "Smith", "1991-04-01", "M", 3000),
    ("Michael", "Rose", "", "2000-05-19", "M", 4000),
    ("Robert", "", "Williams", "1978-09-05", "M", 4000),
    ("Maria", "Anne", "Jones", "1967-12-01", "F", 4000),
    ("Jen", "Mary", "Brown", "1980-02-17", "F", -1),
]

# Column types are inferred from the Python values; only the names are supplied.
df = spark.createDataFrame(data, columns)

In [6]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



In [7]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [15]:
# collect() brings every row back to the driver as a list of Row objects.
dataCollect = df.collect()
print(dataCollect)

print("------------------")
# Row fields can be read by column name, dictionary-style.
for row in dataCollect:
    print(",".join([row['firstname'], str(row['dob'])]))

[Row(firstname='James', middlename='', lastname='Smith', dob='1991-04-01', gender='M', salary=3000), Row(firstname='Michael', middlename='Rose', lastname='', dob='2000-05-19', gender='M', salary=4000), Row(firstname='Robert', middlename='', lastname='Williams', dob='1978-09-05', gender='M', salary=4000), Row(firstname='Maria', middlename='Anne', lastname='Jones', dob='1967-12-01', gender='F', salary=4000), Row(firstname='Jen', middlename='Mary', lastname='Brown', dob='1980-02-17', gender='F', salary=-1)]
------------------
James,1991-04-01
Michael,2000-05-19
Robert,1978-09-05
Maria,1967-12-01
Jen,1980-02-17


In [16]:
#select 

# Project two columns, referenced as attributes of the frame instead of by string name.
dataSelect = df.select(df.firstname, df.dob)
dataSelect.show(truncate=False)

+---------+----------+
|firstname|dob       |
+---------+----------+
|James    |1991-04-01|
|Michael  |2000-05-19|
|Robert   |1978-09-05|
|Maria    |1967-12-01|
|Jen      |1980-02-17|
+---------+----------+



In [8]:
# Second demo frame: one (firstname, lastname, country, state) record per person.
# Dedicated variable names are used so the `data` / `columns` that built `df`
# are not overwritten by this cell.
country_data = [
    ("James", "Smith", "USA", "CA"),
    ("Michael", "Rose", "USA", "NY"),
    ("Robert", "Williams", "USA", "CA"),
    ("Maria", "Jones", "USA", "FL"),
]
country_columns = ["firstname", "lastname", "country", "state"]
df_country = spark.createDataFrame(data=country_data, schema=country_columns)
df_country.show(truncate=False)

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|James    |Smith   |USA    |CA   |
|Michael  |Rose    |USA    |NY   |
|Robert   |Williams|USA    |CA   |
|Maria    |Jones   |USA    |FL   |
+---------+--------+-------+-----+



In [9]:
# Inner join: keep only the rows whose firstname appears in both frames.
# Both inputs have firstname/lastname columns, so the result contains each of them twice.
df_join = df.join(df_country, on=(df.firstname == df_country.firstname), how='inner')
df_join.show(truncate=False)

+---------+----------+--------+----------+------+------+---------+--------+-------+-----+
|firstname|middlename|lastname|dob       |gender|salary|firstname|lastname|country|state|
+---------+----------+--------+----------+------+------+---------+--------+-------+-----+
|James    |          |Smith   |1991-04-01|M     |3000  |James    |Smith   |USA    |CA   |
|Michael  |Rose      |        |2000-05-19|M     |4000  |Michael  |Rose    |USA    |NY   |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |Maria    |Jones   |USA    |FL   |
|Robert   |          |Williams|1978-09-05|M     |4000  |Robert   |Williams|USA    |CA   |
+---------+----------+--------+----------+------+------+---------+--------+-------+-----+



In [10]:
# Left join: every row of df is kept; rows without a match in df_country get nulls on the right.
df_join = df.join(df_country, on=(df.firstname == df_country.firstname), how='left')
df_join.show(truncate=False)

+---------+----------+--------+----------+------+------+---------+--------+-------+-----+
|firstname|middlename|lastname|dob       |gender|salary|firstname|lastname|country|state|
+---------+----------+--------+----------+------+------+---------+--------+-------+-----+
|James    |          |Smith   |1991-04-01|M     |3000  |James    |Smith   |USA    |CA   |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |null     |null    |null   |null |
|Michael  |Rose      |        |2000-05-19|M     |4000  |Michael  |Rose    |USA    |NY   |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |Maria    |Jones   |USA    |FL   |
|Robert   |          |Williams|1978-09-05|M     |4000  |Robert   |Williams|USA    |CA   |
+---------+----------+--------+----------+------+------+---------+--------+-------+-----+



In [11]:
# Full outer join: rows from both frames are kept; the unmatched side is filled with nulls.
df_join = df.join(df_country, on=(df.firstname == df_country.firstname), how='outer')
df_join.show(truncate=False)

+---------+----------+--------+----------+------+------+---------+--------+-------+-----+
|firstname|middlename|lastname|dob       |gender|salary|firstname|lastname|country|state|
+---------+----------+--------+----------+------+------+---------+--------+-------+-----+
|James    |          |Smith   |1991-04-01|M     |3000  |James    |Smith   |USA    |CA   |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |null     |null    |null   |null |
|Michael  |Rose      |        |2000-05-19|M     |4000  |Michael  |Rose    |USA    |NY   |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |Maria    |Jones   |USA    |FL   |
|Robert   |          |Williams|1978-09-05|M     |4000  |Robert   |Williams|USA    |CA   |
+---------+----------+--------+----------+------+------+---------+--------+-------+-----+



In [12]:
# Right join: every row of df_country is kept; rows of df without a match are dropped.
df_join = df.join(df_country, on=(df.firstname == df_country.firstname), how='right')
df_join.show(truncate=False)

+---------+----------+--------+----------+------+------+---------+--------+-------+-----+
|firstname|middlename|lastname|dob       |gender|salary|firstname|lastname|country|state|
+---------+----------+--------+----------+------+------+---------+--------+-------+-----+
|James    |          |Smith   |1991-04-01|M     |3000  |James    |Smith   |USA    |CA   |
|Michael  |Rose      |        |2000-05-19|M     |4000  |Michael  |Rose    |USA    |NY   |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |Maria    |Jones   |USA    |FL   |
|Robert   |          |Williams|1978-09-05|M     |4000  |Robert   |Williams|USA    |CA   |
+---------+----------+--------+----------+------+------+---------+--------+-------+-----+



In [6]:
# Date conversion helper

from datetime import datetime

def yearFromDate(dateStr):
    """Return the calendar year of a ``YYYY-MM-DD`` date string.

    Args:
        dateStr: Date text in ``%Y-%m-%d`` format, or ``None``.

    Returns:
        The year as an ``int``, or ``None`` when ``dateStr`` is ``None`` so
        that a missing date yields a missing year instead of a ``TypeError``.

    Raises:
        ValueError: If ``dateStr`` is not in ``%Y-%m-%d`` format.
    """
    if dateStr is None:
        # strptime() rejects None with a TypeError; treat a missing date as a missing year.
        return None
    return datetime.strptime(dateStr, '%Y-%m-%d').year

print(yearFromDate('1991-04-01'))

1991


In [7]:
#UDF Creation
from pyspark.sql.functions import col, udf,array_contains
from pyspark.sql.types import StringType

# Wrap yearFromDate as a Spark UDF. StringType() is udf()'s default return type, so
# passing it is optional. yearFromDate returns an int, so the value is converted with
# str() to really match the declared type, and null dates are passed through as null.
yearFromDF = udf(lambda z: None if z is None else str(yearFromDate(z)), StringType())

In [9]:
# Add a 'year' column by applying the yearFromDF UDF to each dob value.
# df is rebound to the widened frame, so later cells see the extra column.
df = df.withColumn('year',yearFromDF(col('dob')))
df.show(truncate=False)

+---------+----------+--------+----------+------+------+----+
|firstname|middlename|lastname|dob       |gender|salary|year|
+---------+----------+--------+----------+------+------+----+
|James    |          |Smith   |1991-04-01|M     |3000  |1991|
|Michael  |Rose      |        |2000-05-19|M     |4000  |2000|
|Robert   |          |Williams|1978-09-05|M     |4000  |1978|
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |1967|
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |1980|
+---------+----------+--------+----------+------+------+----+



In [10]:

# Creating a UDF is normally a two-step process: first write a plain Python function,
# then convert it to a UDF with the SQL udf() function. Both steps can be combined into one by applying udf as a decorator.

@udf(returnType=StringType())
def monthOfDate(dateStr):
    """UDF returning the month number of a ``YYYY-MM-DD`` date string, as text.

    Args:
        dateStr: Date text in ``%Y-%m-%d`` format, or ``None`` for a null cell.

    Returns:
        The month (1-12) as a string without zero padding, matching the
        declared ``StringType``; ``None`` when ``dateStr`` is ``None``.

    Raises:
        ValueError: If ``dateStr`` is not in ``%Y-%m-%d`` format.
    """
    if dateStr is None:
        # A null dob must yield a null month rather than a TypeError from strptime().
        return None
    # dt.month is an int; convert explicitly so the value matches the declared return type.
    return str(datetime.strptime(dateStr, '%Y-%m-%d').month)
    

In [12]:
df = df.withColumn('month',monthOfDate(col('dob')))
df.show(truncate=False)

+---------+----------+--------+----------+------+------+----+-----+
|firstname|middlename|lastname|dob       |gender|salary|year|month|
+---------+----------+--------+----------+------+------+----+-----+
|James    |          |Smith   |1991-04-01|M     |3000  |1991|4    |
|Michael  |Rose      |        |2000-05-19|M     |4000  |2000|5    |
|Robert   |          |Williams|1978-09-05|M     |4000  |1978|9    |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |1967|12   |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |1980|2    |
+---------+----------+--------+----------+------+------+----+-----+



In [13]:
# Rename the derived 'year' column.
# NOTE(review): 'year_of_dod' looks like a typo for 'year_of_dob'. Later cells reference
# the name exactly as spelled here, so any correction must be applied to them as well.
df = df.withColumnRenamed('year','year_of_dod')
df.show(truncate=False)

+---------+----------+--------+----------+------+------+-----------+-----+
|firstname|middlename|lastname|dob       |gender|salary|year_of_dod|month|
+---------+----------+--------+----------+------+------+-----------+-----+
|James    |          |Smith   |1991-04-01|M     |3000  |1991       |4    |
|Michael  |Rose      |        |2000-05-19|M     |4000  |2000       |5    |
|Robert   |          |Williams|1978-09-05|M     |4000  |1978       |9    |
|Maria    |Anne      |Jones   |1967-12-01|F     |4000  |1967       |12   |
|Jen      |Mary      |Brown   |1980-02-17|F     |-1    |1980       |2    |
+---------+----------+--------+----------+------+------+-----------+-----+



In [14]:
dataCollect = df.collect()
print(dataCollect)

[Row(firstname='James', middlename='', lastname='Smith', dob='1991-04-01', gender='M', salary=3000, year_of_dod='1991', month='4'), Row(firstname='Michael', middlename='Rose', lastname='', dob='2000-05-19', gender='M', salary=4000, year_of_dod='2000', month='5'), Row(firstname='Robert', middlename='', lastname='Williams', dob='1978-09-05', gender='M', salary=4000, year_of_dod='1978', month='9'), Row(firstname='Maria', middlename='Anne', lastname='Jones', dob='1967-12-01', gender='F', salary=4000, year_of_dod='1967', month='12'), Row(firstname='Jen', middlename='Mary', lastname='Brown', dob='1980-02-17', gender='F', salary=-1, year_of_dod='1980', month='2')]


In [16]:
# One comma-separated line per collected row: firstname, lastname, dob.
for row in dataCollect:
    fields = [row['firstname'], str(row['lastname']), str(row['dob'])]
    print(",".join(fields))

James,Smith,1991-04-01
Michael,,2000-05-19
Robert,Williams,1978-09-05
Maria,Jones,1967-12-01
Jen,Brown,1980-02-17


In [19]:
# filter or where (both are same)
df.filter(df.year_of_dod == '1991').show()

+---------+----------+--------+----------+------+------+-----------+-----+
|firstname|middlename|lastname|       dob|gender|salary|year_of_dod|month|
+---------+----------+--------+----------+------+------+-----------+-----+
|    James|          |   Smith|1991-04-01|     M|  3000|       1991|    4|
+---------+----------+--------+----------+------+------+-----------+-----+



In [20]:
df.where(df.year_of_dod == '1991').show()

+---------+----------+--------+----------+------+------+-----------+-----+
|firstname|middlename|lastname|       dob|gender|salary|year_of_dod|month|
+---------+----------+--------+----------+------+------+-----------+-----+
|    James|          |   Smith|1991-04-01|     M|  3000|       1991|    4|
+---------+----------+--------+----------+------+------+-----------+-----+



In [27]:
# Several accepted values for one column: isin() is the compact form of OR-ing equality tests.
df.filter(df.year_of_dod.isin('1991', '1980')).show()

+---------+----------+--------+----------+------+------+-----------+-----+
|firstname|middlename|lastname|       dob|gender|salary|year_of_dod|month|
+---------+----------+--------+----------+------+------+-----------+-----+
|    James|          |   Smith|1991-04-01|     M|  3000|       1991|    4|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|       1980|    2|
+---------+----------+--------+----------+------+------+-----------+-----+



In [28]:
df.where((df.year_of_dod == '1991') | (df.year_of_dod == '1980')).show()

+---------+----------+--------+----------+------+------+-----------+-----+
|firstname|middlename|lastname|       dob|gender|salary|year_of_dod|month|
+---------+----------+--------+----------+------+------+-----------+-----+
|    James|          |   Smith|1991-04-01|     M|  3000|       1991|    4|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|       1980|    2|
+---------+----------+--------+----------+------+------+-----------+-----+



In [33]:
df.filter(col('lastname').like("%Smi%")).show(truncate=False)  

+---------+----------+--------+----------+------+------+-----------+-----+
|firstname|middlename|lastname|dob       |gender|salary|year_of_dod|month|
+---------+----------+--------+----------+------+------+-----------+-----+
|James    |          |Smith   |1991-04-01|M     |3000  |1991       |4    |
+---------+----------+--------+----------+------+------+-----------+-----+



In [39]:
# Sample data for distinct() / dropDuplicates().
# It contains one fully repeated row (James) and one name that occurs with two salaries (Jen).
duplicateDataExp = [
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jen", "Finance", 4900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
]
columnsForDropDup = ["employee_name", "department", "salary"]
dfExampleForDuplicates = spark.createDataFrame(duplicateDataExp, columnsForDropDup)
dfExampleForDuplicates.printSchema()
dfExampleForDuplicates.show(truncate=False)

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jen          |Finance   |4900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



In [36]:
dfExampleForDuplicates.distinct().show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|          Jen|   Finance|  3900|
|      Michael|     Sales|  4600|
|        Scott|   Finance|  3300|
|        Kumar| Marketing|  2000|
|        James|     Sales|  3000|
|       Robert|     Sales|  4100|
|         Jeff| Marketing|  3000|
|         Saif|     Sales|  4100|
|        Maria|   Finance|  3000|
+-------------+----------+------+



In [38]:
dfExampleForDuplicates.dropDuplicates().show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|          Jen|   Finance|  3900|
|      Michael|     Sales|  4600|
|        Scott|   Finance|  3300|
|        Kumar| Marketing|  2000|
|        James|     Sales|  3000|
|       Robert|     Sales|  4100|
|         Jeff| Marketing|  3000|
|         Saif|     Sales|  4100|
|        Maria|   Finance|  3000|
+-------------+----------+------+



In [40]:
dfExampleForDuplicates.dropDuplicates(["employee_name"]).show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        Scott|   Finance|  3300|
|        James|     Sales|  3000|
|          Jen|   Finance|  3900|
|      Michael|     Sales|  4600|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
|        Maria|   Finance|  3000|
|       Robert|     Sales|  4100|
|         Jeff| Marketing|  3000|
+-------------+----------+------+



In [4]:
# Employee sample used by the sort, group, pivot and window examples.
# Each tuple follows the order given in `columns`.
simpleData = [
    ("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Raman", "Finance", "CA", 99000, 40, 24000),
    ("Scott", "Finance", "NY", 83000, 36, 19000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000),
]
columns = ["employee_name", "department", "state", "salary", "age", "bonus"]
dfForSort = spark.createDataFrame(simpleData, columns)
dfForSort.show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [8]:
# sort() and orderBy() are interchangeable; with no direction given the order is ascending.
dfForSort.sort("salary").show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Raman        |Finance   |CA   |99000 |40 |24000|
+-------------+----------+-----+------+---+-----+



In [10]:
dfForSort.orderBy(col("salary").desc()).show(truncate = False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Raman        |Finance   |CA   |99000 |40 |24000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Jen          |Finance   |NY   |79000 |53 |15000|
+-------------+----------+-----+------+---+-----+



In [14]:
dfForSort.createOrReplaceTempView("EMPLOYEE_SALARY")


In [15]:
spark.sql("select * from EMPLOYEE_SALARY order by salary desc").show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Raman        |Finance   |CA   |99000 |40 |24000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Jen          |Finance   |NY   |79000 |53 |15000|
+-------------+----------+-----+------+---+-----+



In [16]:
spark.sql("select * from EMPLOYEE_SALARY order by salary asc").show(truncate=False)

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|Jen          |Finance   |NY   |79000 |53 |15000|
|Jeff         |Marketing |CA   |80000 |25 |18000|
|Robert       |Sales     |CA   |81000 |30 |23000|
|Scott        |Finance   |NY   |83000 |36 |19000|
|Michael      |Sales     |NY   |86000 |56 |20000|
|Maria        |Finance   |CA   |90000 |24 |23000|
|James        |Sales     |NY   |90000 |34 |10000|
|Kumar        |Marketing |NY   |91000 |50 |21000|
|Raman        |Finance   |CA   |99000 |40 |24000|
+-------------+----------+-----+------+---+-----+



In [17]:
# groupBy() returns a GroupedData object; an aggregation such as count, sum, min,
# max or agg has to be applied to it to get a DataFrame back.
dfForSort.groupBy("department").sum("salary").show(truncate=False)

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Sales     |257000     |
|Finance   |351000     |
|Marketing |171000     |
+----------+-----------+



In [19]:
dfForSort.groupBy(col('department'),col('state')).sum('salary','bonus').show(truncate = False)

+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|Finance   |NY   |162000     |34000     |
|Marketing |NY   |91000      |21000     |
|Sales     |CA   |81000      |23000     |
|Marketing |CA   |80000      |18000     |
|Finance   |CA   |189000     |47000     |
|Sales     |NY   |176000     |30000     |
+----------+-----+-----------+----------+



In [24]:
spark.sql("select department,state,sum(salary),sum(bonus) from EMPLOYEE_SALARY group by department,state").show(truncate = False)

spark.sql("select department,state,sum(salary),sum(bonus),max(bonus),min(bonus) from EMPLOYEE_SALARY group by department,state").show(truncate = False)

+----------+-----+-----------+----------+
|department|state|sum(salary)|sum(bonus)|
+----------+-----+-----------+----------+
|Finance   |NY   |162000     |34000     |
|Marketing |NY   |91000      |21000     |
|Sales     |CA   |81000      |23000     |
|Marketing |CA   |80000      |18000     |
|Finance   |CA   |189000     |47000     |
|Sales     |NY   |176000     |30000     |
+----------+-----+-----------+----------+

+----------+-----+-----------+----------+----------+----------+
|department|state|sum(salary)|sum(bonus)|max(bonus)|min(bonus)|
+----------+-----+-----------+----------+----------+----------+
|Finance   |NY   |162000     |34000     |19000     |15000     |
|Marketing |NY   |91000      |21000     |21000     |21000     |
|Sales     |CA   |81000      |23000     |23000     |23000     |
|Marketing |CA   |80000      |18000     |18000     |18000     |
|Finance   |CA   |189000     |47000     |24000     |23000     |
|Sales     |NY   |176000     |30000     |20000     |10000     |
+--

In [28]:
# Aggregate functions used by the examples in this part of the notebook.
# NOTE: sum, max and min imported here shadow the Python builtins of the same name
# in the notebook namespace; the window-aggregation cell relies on these Spark versions.
# NOTE(review): sumDistinct is deprecated in newer PySpark releases in favour of
# sum_distinct - confirm against the installed version.
from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness 
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp,  var_pop

# collect_list gathers every salary value into a single array column (duplicates kept).
dfForSort.select(collect_list("salary")).show(truncate=False)
# Same aggregation through Spark SQL; comparable to LISTAGG in other SQL dialects.
spark.sql("select collect_list(salary) from EMPLOYEE_SALARY").show(truncate=False)

+---------------------------------------------------------------+
|collect_list(salary)                                           |
+---------------------------------------------------------------+
|[90000, 86000, 81000, 90000, 99000, 83000, 79000, 80000, 91000]|
+---------------------------------------------------------------+

+---------------------------------------------------------------+
|collect_list(salary)                                           |
+---------------------------------------------------------------+
|[90000, 86000, 81000, 90000, 99000, 83000, 79000, 80000, 91000]|
+---------------------------------------------------------------+



In [29]:
spark.sql("select department,collect_list(salary) from EMPLOYEE_SALARY group by department").show(truncate=False)

+----------+----------------------------+
|department|collect_list(salary)        |
+----------+----------------------------+
|Sales     |[90000, 86000, 81000]       |
|Finance   |[90000, 99000, 83000, 79000]|
|Marketing |[80000, 91000]              |
+----------+----------------------------+



In [40]:
df_Temp= dfForSort.groupby("department").agg(collect_list("salary").alias("collect_data"))
df_Temp.show(truncate = False)

+----------+----------------------------+
|department|collect_data                |
+----------+----------------------------+
|Sales     |[90000, 86000, 81000]       |
|Finance   |[90000, 99000, 83000, 79000]|
|Marketing |[80000, 91000]              |
+----------+----------------------------+



In [41]:
from pyspark.sql.functions import explode
df_Temp.select(df_Temp.department,explode(df_Temp.collect_data)).show(truncate = False)

+----------+-----+
|department|col  |
+----------+-----+
|Sales     |90000|
|Sales     |86000|
|Sales     |81000|
|Finance   |90000|
|Finance   |99000|
|Finance   |83000|
|Finance   |79000|
|Marketing |80000|
|Marketing |91000|
+----------+-----+



In [43]:
# Pivot Table
#Spark pivot() function is used to pivot/rotate the data from one DataFrame column into multiple columns 
#(transform rows to columns) and unpivot is used to transform it back (transform columns to rows).
dfForSort.groupBy("department").pivot("state").sum("salary").show(truncate = False)


+----------+------+------+
|department|CA    |NY    |
+----------+------+------+
|Sales     |81000 |176000|
|Finance   |189000|162000|
|Marketing |80000 |91000 |
+----------+------+------+



In [44]:
dfForSort.groupBy("state").pivot("department").sum("salary").show(truncate = False)

+-----+-------+---------+------+
|state|Finance|Marketing|Sales |
+-----+-------+---------+------+
|CA   |189000 |80000    |81000 |
|NY   |162000 |91000    |176000|
+-----+-------+---------+------+



In [47]:
#PySpark Window Functions
#PySpark Window functions operate on a group of rows (like frame, partition) and return a single value for every input row.

#row_number Window Function
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number,rank
windowSpec  = Window.partitionBy("department").orderBy("salary")

dfForSort.withColumn('row_number',row_number().over(windowSpec)).show(truncate = False)


+-------------+----------+-----+------+---+-----+----------+
|employee_name|department|state|salary|age|bonus|row_number|
+-------------+----------+-----+------+---+-----+----------+
|Robert       |Sales     |CA   |81000 |30 |23000|1         |
|Michael      |Sales     |NY   |86000 |56 |20000|2         |
|James        |Sales     |NY   |90000 |34 |10000|3         |
|Jen          |Finance   |NY   |79000 |53 |15000|1         |
|Scott        |Finance   |NY   |83000 |36 |19000|2         |
|Maria        |Finance   |CA   |90000 |24 |23000|3         |
|Raman        |Finance   |CA   |99000 |40 |24000|4         |
|Jeff         |Marketing |CA   |80000 |25 |18000|1         |
|Kumar        |Marketing |NY   |91000 |50 |21000|2         |
+-------------+----------+-----+------+---+-----+----------+



In [48]:
dfForSort.withColumn('rank',rank().over(windowSpec)).orderBy('rank').show(truncate = False)

+-------------+----------+-----+------+---+-----+----+
|employee_name|department|state|salary|age|bonus|rank|
+-------------+----------+-----+------+---+-----+----+
|Jen          |Finance   |NY   |79000 |53 |15000|1   |
|Robert       |Sales     |CA   |81000 |30 |23000|1   |
|Jeff         |Marketing |CA   |80000 |25 |18000|1   |
|Michael      |Sales     |NY   |86000 |56 |20000|2   |
|Kumar        |Marketing |NY   |91000 |50 |21000|2   |
|Scott        |Finance   |NY   |83000 |36 |19000|2   |
|James        |Sales     |NY   |90000 |34 |10000|3   |
|Maria        |Finance   |CA   |90000 |24 |23000|3   |
|Raman        |Finance   |CA   |99000 |40 |24000|4   |
+-------------+----------+-----+------+---+-----+----+



In [52]:
# lag: value taken from an earlier row of the same window partition
from pyspark.sql.functions import lag    
dfForSort.withColumn("lag",lag("salary",1).over(windowSpec)).select('employee_name','department','state','salary','lag').show()

+-------------+----------+-----+------+-----+
|employee_name|department|state|salary|  lag|
+-------------+----------+-----+------+-----+
|       Robert|     Sales|   CA| 81000| null|
|      Michael|     Sales|   NY| 86000|81000|
|        James|     Sales|   NY| 90000|86000|
|          Jen|   Finance|   NY| 79000| null|
|        Scott|   Finance|   NY| 83000|79000|
|        Maria|   Finance|   CA| 90000|83000|
|        Raman|   Finance|   CA| 99000|90000|
|         Jeff| Marketing|   CA| 80000| null|
|        Kumar| Marketing|   NY| 91000|80000|
+-------------+----------+-----+------+-----+



In [53]:
dfForSort.withColumn("lag",lag("salary",2).over(windowSpec)).select('employee_name','department','state','salary','lag').show()

+-------------+----------+-----+------+-----+
|employee_name|department|state|salary|  lag|
+-------------+----------+-----+------+-----+
|       Robert|     Sales|   CA| 81000| null|
|      Michael|     Sales|   NY| 86000| null|
|        James|     Sales|   NY| 90000|81000|
|          Jen|   Finance|   NY| 79000| null|
|        Scott|   Finance|   NY| 83000| null|
|        Maria|   Finance|   CA| 90000|79000|
|        Raman|   Finance|   CA| 99000|83000|
|         Jeff| Marketing|   CA| 80000| null|
|        Kumar| Marketing|   NY| 91000| null|
+-------------+----------+-----+------+-----+



In [59]:
# lead: value taken from a later row of the same window partition
from pyspark.sql.functions import lead    
dfForSort.withColumn("lead",lead("salary",1).over(windowSpec)).select('employee_name','department','state','salary','lead').show()

+-------------+----------+-----+------+-----+
|employee_name|department|state|salary| lead|
+-------------+----------+-----+------+-----+
|       Robert|     Sales|   CA| 81000|86000|
|      Michael|     Sales|   NY| 86000|90000|
|        James|     Sales|   NY| 90000| null|
|          Jen|   Finance|   NY| 79000|83000|
|        Scott|   Finance|   NY| 83000|90000|
|        Maria|   Finance|   CA| 90000|99000|
|        Raman|   Finance|   CA| 99000| null|
|         Jeff| Marketing|   CA| 80000|91000|
|        Kumar| Marketing|   NY| 91000| null|
+-------------+----------+-----+------+-----+



In [60]:
dfForSort.withColumn("lead",lead("salary",2).over(windowSpec)).select('employee_name','department','state','salary','lead').show()

+-------------+----------+-----+------+-----+
|employee_name|department|state|salary| lead|
+-------------+----------+-----+------+-----+
|       Robert|     Sales|   CA| 81000|90000|
|      Michael|     Sales|   NY| 86000| null|
|        James|     Sales|   NY| 90000| null|
|          Jen|   Finance|   NY| 79000|90000|
|        Scott|   Finance|   NY| 83000|99000|
|        Maria|   Finance|   CA| 90000| null|
|        Raman|   Finance|   CA| 99000| null|
|         Jeff| Marketing|   CA| 80000| null|
|        Kumar| Marketing|   NY| 91000| null|
+-------------+----------+-----+------+-----+



In [61]:
dfForSort.withColumn("lead",lead("salary",0).over(windowSpec)).select('employee_name','department','state','salary','lead').show()

+-------------+----------+-----+------+-----+
|employee_name|department|state|salary| lead|
+-------------+----------+-----+------+-----+
|       Robert|     Sales|   CA| 81000|81000|
|      Michael|     Sales|   NY| 86000|86000|
|        James|     Sales|   NY| 90000|90000|
|          Jen|   Finance|   NY| 79000|79000|
|        Scott|   Finance|   NY| 83000|83000|
|        Maria|   Finance|   CA| 90000|90000|
|        Raman|   Finance|   CA| 99000|99000|
|         Jeff| Marketing|   CA| 80000|80000|
|        Kumar| Marketing|   NY| 91000|91000|
+-------------+----------+-----+------+-----+



In [66]:
# Window aggregation functions: with partitionBy only (no orderBy) the aggregate is
# computed over the whole department and repeated on every row of that department.
# The functions are reached through an explicit namespace so this cell does not depend
# on the builtins sum/max having been shadowed by an import in another cell.
from pyspark.sql import functions as F

windowSpecAgg = Window.partitionBy("department")
dfForSort.withColumn("sum", F.sum(F.col("salary")).over(windowSpecAgg)).select('employee_name','department','state','salary','sum').show(truncate=False)
dfForSort.withColumn("max", F.max(F.col("salary")).over(windowSpecAgg)).select('employee_name','department','state','salary','max').show(truncate=False)

+-------------+----------+-----+------+------+
|employee_name|department|state|salary|sum   |
+-------------+----------+-----+------+------+
|James        |Sales     |NY   |90000 |257000|
|Michael      |Sales     |NY   |86000 |257000|
|Robert       |Sales     |CA   |81000 |257000|
|Maria        |Finance   |CA   |90000 |351000|
|Raman        |Finance   |CA   |99000 |351000|
|Scott        |Finance   |NY   |83000 |351000|
|Jen          |Finance   |NY   |79000 |351000|
|Jeff         |Marketing |CA   |80000 |171000|
|Kumar        |Marketing |NY   |91000 |171000|
+-------------+----------+-----+------+------+

+-------------+----------+-----+------+-----+
|employee_name|department|state|salary|max  |
+-------------+----------+-----+------+-----+
|James        |Sales     |NY   |90000 |90000|
|Michael      |Sales     |NY   |86000 |90000|
|Robert       |Sales     |CA   |81000 |90000|
|Maria        |Finance   |CA   |90000 |99000|
|Raman        |Finance   |CA   |99000 |99000|
|Scott        |Finan

In [46]:
# Machine Learning Example
# Source CSV for the example, fetched over HTTPS (network access is required).
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
from pyspark import SparkFiles
sc = spark.sparkContext
# addFile() registers the remote file with Spark; SparkFiles.get() then resolves
# the path of the downloaded copy by its file name.
sc.addFile(url)
# header=True takes column names from the first row; inferSchema=True lets Spark
# guess each column's type from the data instead of reading everything as string.
dfAdultData = spark.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
dfAdultData.show(truncate=False)

+---+---+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|x  |age|workclass       |fnlwgt|education   |educational-num|marital-status    |occupation       |relationship |race              |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+----------------+------+------------+---------------+------------------+-----------------+-------------+------------------+------+------------+------------+--------------+--------------+------+
|1  |25 |Private         |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child    |Black             |Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private         |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband      |White             |Male  |0           |0           |50            |United-S

In [67]:
from pyspark.sql.types import *
def convertColumn(df, names, newType):
    """Cast each column listed in `names` to `newType`.

    Args:
        df: DataFrame-like object supporting `df[name]` and `withColumn`.
        names: iterable of column names to cast.
        newType: target type handed to each column's `cast`.

    Returns:
        The frame produced by replacing every listed column, one at a time,
        with its cast version. If `names` is empty, `df` itself is returned.
    """
    converted = df
    for column_name in names:
        cast_column = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, cast_column)
    return converted

In [None]:
# Continuous (numeric) feature columns; these are cast to float below.
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
df_string = convertColumn(dfAdultData, CONTI_FEATURES, FloatType())
# The row count must be passed before the keyword argument:
# `show(truncate=False, 5)` is a SyntaxError (positional after keyword).
df_string.show(5, truncate=False)

In [None]:
# Peek at the first rows of two numeric columns.
dfAdultData.select(['age', 'fnlwgt']).show(n=5)

# Summary statistics for the frame.
dfAdultData.describe().show()

# Contingency table of age against label; crosstab names its first column
# "<col1>_<col2>", hence the "age_label" sort key.
dfAdultData.stat.crosstab('age', 'label').orderBy("age_label").show()

In [None]:
# drop() returns a new frame; dfAdultData itself keeps 'education_num'.
dfAdultData.drop('education_num').columns

# Squared age as a standalone projection. This must read from dfAdultData:
# `df` is the small names/salary demo frame created at the top of the
# notebook and has no 'age' column.
age_square = dfAdultData.select(col("age")**2)

# Add the squared age as a new feature column on the working frame.
dfAdultData = dfAdultData.withColumn("age_square", col("age")**2)

In [None]:
# Columns kept for modelling, in the order they should appear in the frame.
COLUMNS = [
    'age', 'age_square', 'workclass', 'fnlwgt',
    'education', 'education_num', 'marital', 'occupation',
    'relationship', 'race', 'sex', 'capital_gain',
    'capital_loss', 'hours_week', 'native_country', 'label',
]
dfAdultData = dfAdultData.select(*COLUMNS)

# Show the first row as a sanity check of the projection.
dfAdultData.first()

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Step 1: map each distinct workclass string to a numeric index.
stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(dfAdultData)
indexed = model.transform(dfAdultData)

# Step 2: one-hot encode the index; dropLast=False keeps a slot for every category.
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
# In Spark 2.x OneHotEncoder is a Transformer and can be applied directly.
# From Spark 3.0 it is an Estimator and must be fitted first, so fit only
# when the object offers `fit`.
encoder_model = encoder.fit(indexed) if hasattr(encoder, "fit") else encoder
encoded = encoder_model.transform(indexed)
encoded.show(2)

In [None]:
# ML pipeline: one StringIndexer + one-hot encoder stage pair per categorical column.

from pyspark.ml import Pipeline
try:
    from pyspark.ml.feature import OneHotEncoderEstimator
except ImportError:
    # Spark 3.0 removed OneHotEncoderEstimator; the same estimator is now
    # exposed as OneHotEncoder (still taking inputCols/outputCols).
    from pyspark.ml.feature import OneHotEncoder as OneHotEncoderEstimator

CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    # "<col>Index" holds the numeric category index, "<col>classVec" the one-hot vector.
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [None]:
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

# Assemble the one-hot vectors and the continuous columns into a single
# "features" vector column.
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# NOTE: this cell appends to `stages`; re-running it without first re-running
# the cell that resets `stages = []` adds these two stages a second time.
stages += [assembler]

In [None]:
from pyspark.ml.linalg import DenseVector

# Fit every stage on the data, then apply them to add the "newlabel" and
# "features" columns.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(dfAdultData)
model = pipelineModel.transform(dfAdultData)  # NOTE: `model` is the transformed DataFrame

# Keep only what the classifier needs: (numeric label, dense feature vector).
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))
df_train = spark.createDataFrame(input_data, ["label", "features"])

# Split the transformed frame, not the raw dfAdultData: the raw frame has no
# "features" column and its "label" is still a string, so a classifier
# configured with labelCol="label" / featuresCol="features" cannot be fitted on it.
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)
train_data.groupby('label').agg({'label': 'count'}).show()
test_data.groupby('label').agg({'label': 'count'}).show()

In [None]:
# Import `LogisticRegression`
from pyspark.ml.classification import LogisticRegression

# Configure the classifier: which columns hold the features and the label,
# the iteration cap, and the regularisation strength.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    regParam=0.3,
    maxIter=10,
)

# Train on the training split, then score the held-out split.
linearModel = lr.fit(train_data)
predictions = linearModel.transform(test_data)

# Compare the true label with the predicted class and its probability vector.
selected = predictions.select(["label", "prediction", "probability"])
selected.show(n=20)