In [70]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [71]:
spark = SparkSession.builder.appName('Write to Files').master('local[*]').getOrCreate()

In [72]:
emp_df = spark.read.csv('HR-Dataset/core_dataset.csv',header=True,inferSchema=True)

#### count

In [73]:
emp_df.count()

302

#### where & filter

In [74]:
emp_df.where(col('State')=='TX').count()

3

In [75]:
emp_df.where(col('State')!='TX').count()

298

In [76]:
emp_df.filter(col('State')=='TX').show()

+---------------+---------------+-----+-------+---------+---+----+-----------+-------------------+---------------+--------------------+------------+-------------------+--------------------+-----------------+----------+--------------------+--------+-------------+------------------+--------------------+
|  Employee Name|Employee Number|State|    Zip|      DOB|Age| Sex|MaritalDesc|        CitizenDesc|Hispanic/Latino|            RaceDesc|Date of Hire|Date of Termination|     Reason For Term|Employment Status|Department|            Position|Pay Rate| Manager Name|   Employee Source|   Performance Score|
+---------------+---------------+-----+-------+---------+---+----+-----------+-------------------+---------------+--------------------+------------+-------------------+--------------------+-----------------+----------+--------------------+--------+-------------+------------------+--------------------+
| Murray, Thomas|     1406068403|   TX|78230.0| 7/4/1988| 29|Male|   Divorced|         US C

In [77]:
# select count(*) from emp where Sex='Male' and Age between 25 and 30 and MaritalDesc in ('Single', 'Seperated', 'Divorced')
emp_df.where( 
    (col('Sex')=='Male') & 
    (col('Age')<=30) & 
    (col('Age')>=25) & 
    ((col('MaritalDesc')=='Single') | (col('MaritalDesc')=='Seperated') | (col('MaritalDesc')=='Divorced'))
).count()

11

#### withColumn

In [78]:
emp_df.withColumn('First_name', split(col('Employee Name'),', ').getItem(0)).\
withColumn('Last_name', split(col('Employee Name'),', ').getItem(1)).\
withColumn('new_pay_rate', col('Pay Rate')*1.1 ).\
select('First_name','Last_name','new_pay_rate','Department').show()

+------------+----------+------------------+----------------+
|  First_name| Last_name|      new_pay_rate|      Department|
+------------+----------+------------------+----------------+
|       Brown|       Mia|             31.35|   Admin Offices|
|   LaRotonda| William  |              25.3|   Admin Offices|
|      Steans|  Tyrone  |31.900000000000002|   Admin Offices|
|      Howard|   Estelle|23.650000000000002|   Admin Offices|
|       Singh|      Nan |            18.216|   Admin Offices|
|       Smith| Leigh Ann|             22.55|   Admin Offices|
|     LeBlanc|Brandon  R| 60.50000000000001|   Admin Offices|
|       Quinn|      Sean| 60.50000000000001|   Admin Offices|
|    Boutwell|   Bonalyn| 38.44500000000001|   Admin Offices|
|Foster-Baker|       Amy| 38.44500000000001|   Admin Offices|
|        King|     Janet|              88.0|Executive Office|
|      Zamora|  Jennifer|              71.5|           IT/IS|
|      Becker|     Renee|47.300000000000004|           IT/IS|
|       

#### withColumnRenamed

In [79]:
emp_df = emp_df.withColumnRenamed('Hispanic/Latino','Hispanic_Latino')

In [80]:
emp_df.columns

['Employee Name',
 'Employee Number',
 'State',
 'Zip',
 'DOB',
 'Age',
 'Sex',
 'MaritalDesc',
 'CitizenDesc',
 'Hispanic_Latino',
 'RaceDesc',
 'Date of Hire',
 'Date of Termination',
 'Reason For Term',
 'Employment Status',
 'Department',
 'Position',
 'Pay Rate',
 'Manager Name',
 'Employee Source',
 'Performance Score']

In [81]:
for colu in emp_df.columns:
    emp_df = emp_df.withColumnRenamed(colu, colu.replace(' ','_').replace('/',' '))

In [82]:
emp_df.columns

['Employee_Name',
 'Employee_Number',
 'State',
 'Zip',
 'DOB',
 'Age',
 'Sex',
 'MaritalDesc',
 'CitizenDesc',
 'Hispanic_Latino',
 'RaceDesc',
 'Date_of_Hire',
 'Date_of_Termination',
 'Reason_For_Term',
 'Employment_Status',
 'Department',
 'Position',
 'Pay_Rate',
 'Manager_Name',
 'Employee_Source',
 'Performance_Score']

#### drop

In [83]:
emp_df = emp_df.drop('Zip', 'DOB')

#### Distinct

In [84]:
d = [
    {'id':1,'name':'anna','dept':'cse'},
    {'id':2,'name':'john','dept':'cse'},
    {'id':4,'name':'alice','dept':'cse'},
    {'id':3,'name':'bob','dept':'cse'},
    {'id':4,'name':'alice','dept':'cse'}
]
df = spark.createDataFrame(d)

In [85]:
df.show()

+----+---+-----+
|dept| id| name|
+----+---+-----+
| cse|  1| anna|
| cse|  2| john|
| cse|  4|alice|
| cse|  3|  bob|
| cse|  4|alice|
+----+---+-----+



In [86]:
df.drop_duplicates().show()

+----+---+-----+
|dept| id| name|
+----+---+-----+
| cse|  1| anna|
| cse|  2| john|
| cse|  4|alice|
| cse|  3|  bob|
+----+---+-----+



In [87]:
df.distinct().show()

+----+---+-----+
|dept| id| name|
+----+---+-----+
| cse|  1| anna|
| cse|  2| john|
| cse|  4|alice|
| cse|  3|  bob|
+----+---+-----+



#### Group BY

In [88]:
# select sex, min(`Pay Rate`) min_pay_rate, max(`Pay Rate`) max_pay_rate
# from emp
# group by sex

In [89]:
emp_df.groupby('Sex').max('Pay_Rate').show()

+------+-------------+
|   Sex|max(Pay_Rate)|
+------+-------------+
|  NULL|         NULL|
|Female|         80.0|
|  male|         29.0|
|  Male|         65.0|
+------+-------------+



In [90]:
emp_df = emp_df.withColumn('Sex', lower(col('Sex')))

In [91]:
emp_df.groupby('Sex').max('Pay_Rate').show()

+------+-------------+
|   Sex|max(Pay_Rate)|
+------+-------------+
|  NULL|         NULL|
|female|         80.0|
|  male|         65.0|
+------+-------------+



In [92]:
emp_df.groupby('Sex').min('Pay_Rate').show()

+------+-------------+
|   Sex|min(Pay_Rate)|
+------+-------------+
|  NULL|         NULL|
|female|         14.0|
|  male|         14.0|
+------+-------------+



In [93]:
emp_df.groupby('Sex').count().show()

+------+-----+
|   Sex|count|
+------+-----+
|  NULL|    1|
|female|  174|
|  male|  127|
+------+-----+



#### Join

In [94]:
d = [
    {'dept_name':'Sales','HOD':'John'},
    {'dept_name':'Admin Offices','HOD':'Joshaf'},
    {'dept_name':'Production','HOD':'Ram'},
    {'dept_name':'Executive Office','HOD':'Vamsi'},
    {'dept_name':'Software Engineering','HOD':'Mahesh'},
    {'dept_name':'IT/IS','HOD':'Mahesh'}
]

In [95]:
dept_df = spark.createDataFrame(d)

In [96]:
dept_df.show()

+------+--------------------+
|   HOD|           dept_name|
+------+--------------------+
|  John|               Sales|
|Joshaf|       Admin Offices|
|   Ram|          Production|
| Vamsi|    Executive Office|
|Mahesh|Software Engineering|
|Mahesh|               IT/IS|
+------+--------------------+



In [97]:
# select * from emp
# left join dept on (emp.Department=dept.dept_name)

emp_df = emp_df.join(dept_df, emp_df['Department']==dept_df['dept_name'],'left')

#### Union

In [100]:
d1 = [
    {'id':1,'name':'anna','dept':'cse'},
    {'id':2,'name':'john','dept':'cse'},
    {'id':4,'name':'alice','dept':'cse'},
    {'id':3,'name':'bob','dept':'cse'},
    {'id':4,'name':'alice','dept':'cse'}
]
df_1 = spark.createDataFrame(d1)

d2 = [
    {'id':1,'name':'mahesh','dept':'cse'},
    {'id':1,'name':'mahesh','dept':'cse'},
    {'name':'mahesh','dept':'cse'},
]
df_2 = spark.createDataFrame(d2)

In [101]:
df_1.union(df_2).show()

+----+----+------+
|dept|  id|  name|
+----+----+------+
| cse|   1|  anna|
| cse|   2|  john|
| cse|   4| alice|
| cse|   3|   bob|
| cse|   4| alice|
| cse|   1|mahesh|
| cse|   1|mahesh|
| cse|NULL|mahesh|
+----+----+------+



In [102]:
df_u = df_1.union(df_2)

#### collect

In [103]:

df_u.collect()

[Row(dept='cse', id=1, name='anna'),
 Row(dept='cse', id=2, name='john'),
 Row(dept='cse', id=4, name='alice'),
 Row(dept='cse', id=3, name='bob'),
 Row(dept='cse', id=4, name='alice'),
 Row(dept='cse', id=1, name='mahesh'),
 Row(dept='cse', id=1, name='mahesh'),
 Row(dept='cse', id=None, name='mahesh')]

#### cache

In [105]:
emp_df = emp_df.cache()

#### cast

In [107]:
df_u = df_u.withColumn('id_new', col('id').cast('bigint'))

#### alias

In [111]:
df_u.select(col('id').alias('identifier'), 'dept').show()

+----------+----+
|identifier|dept|
+----------+----+
|         1| cse|
|         2| cse|
|         4| cse|
|         3| cse|
|         4| cse|
|         1| cse|
|         1| cse|
|      NULL| cse|
+----------+----+

