In [0]:
# creating spark session and df
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CodingAssessment").getOrCreate()
data=[(1,"John Doe",30,"Marketing",50000),
(2,"Jane Smith",35,"Finance",60000),
(3,"Michael Johnson",28,"Human Resources",45000),
(4,"Sarah Brown",40,"Engineering",70000),
(5,"Chris Lee",32,"Marketing",55000),
(6,"Emily Wilson",45,"Finance",65000),
(7,"David Kim",38,"Engineering",72000),
(8,"Lisa Garcia",29,"Human Resources",48000),
(9,"James Taylor",33,"Marketing",52000),
(10,"Amanda Clark",37,"Finance",62000)]
df = spark.createDataFrame(data, ['id', 'name', 'age', 'department', 'salary'])
df.show()

+---+---------------+---+---------------+------+
| id|           name|age|     department|salary|
+---+---------------+---+---------------+------+
|  1|       John Doe| 30|      Marketing| 50000|
|  2|     Jane Smith| 35|        Finance| 60000|
|  3|Michael Johnson| 28|Human Resources| 45000|
|  4|    Sarah Brown| 40|    Engineering| 70000|
|  5|      Chris Lee| 32|      Marketing| 55000|
|  6|   Emily Wilson| 45|        Finance| 65000|
|  7|      David Kim| 38|    Engineering| 72000|
|  8|    Lisa Garcia| 29|Human Resources| 48000|
|  9|   James Taylor| 33|      Marketing| 52000|
| 10|   Amanda Clark| 37|        Finance| 62000|
+---+---------------+---+---------------+------+



In [0]:
# manipulating df
# changing column name
df.withColumnRenamed("salary","salary_amount").show()

+---+---------------+---+---------------+-------------+
| id|           name|age|     department|salary_amount|
+---+---------------+---+---------------+-------------+
|  1|       John Doe| 30|      Marketing|        50000|
|  2|     Jane Smith| 35|        Finance|        60000|
|  3|Michael Johnson| 28|Human Resources|        45000|
|  4|    Sarah Brown| 40|    Engineering|        70000|
|  5|      Chris Lee| 32|      Marketing|        55000|
|  6|   Emily Wilson| 45|        Finance|        65000|
|  7|      David Kim| 38|    Engineering|        72000|
|  8|    Lisa Garcia| 29|Human Resources|        48000|
|  9|   James Taylor| 33|      Marketing|        52000|
| 10|   Amanda Clark| 37|        Finance|        62000|
+---+---------------+---+---------------+-------------+



In [0]:
print(df.take(3))

[Row(id=1, name='John Doe', age=30, department='Marketing', salary=50000), Row(id=2, name='Jane Smith', age=35, department='Finance', salary=60000), Row(id=3, name='Michael Johnson', age=28, department='Human Resources', salary=45000)]


In [0]:
# pivoting
df.groupBy("department").pivot("name").sum("salary").show()

+---------------+------------+---------+---------+------------+------------+----------+--------+-----------+---------------+-----------+
|     department|Amanda Clark|Chris Lee|David Kim|Emily Wilson|James Taylor|Jane Smith|John Doe|Lisa Garcia|Michael Johnson|Sarah Brown|
+---------------+------------+---------+---------+------------+------------+----------+--------+-----------+---------------+-----------+
|    Engineering|        NULL|     NULL|    72000|        NULL|        NULL|      NULL|    NULL|       NULL|           NULL|      70000|
|        Finance|       62000|     NULL|     NULL|       65000|        NULL|     60000|    NULL|       NULL|           NULL|       NULL|
|      Marketing|        NULL|    55000|     NULL|        NULL|       52000|      NULL|   50000|       NULL|           NULL|       NULL|
|Human Resources|        NULL|     NULL|     NULL|        NULL|        NULL|      NULL|    NULL|      48000|          45000|       NULL|
+---------------+------------+---------+-

In [0]:
# drop missing values
df_missing=spark.read.csv("dbfs:/FileStore/tables/test44.csv",header=True)
df_missing.show()

+--------+----------+------+
|    Name|Department|Salary|
+--------+----------+------+
| Vaibhav|        IT| 50000|
|    NULL|        HR| 45000|
| Shubham|        IT|  NULL|
|    NULL|        HR| 56000|
|Vaishali|        IT|  NULL|
|   Nisha|      NULL| 65000|
+--------+----------+------+



In [0]:
df_missing.na.drop().show()

+-------+----------+------+
|   Name|Department|Salary|
+-------+----------+------+
|Vaibhav|        IT| 50000|
+-------+----------+------+



In [0]:
df_missing.na.drop(how="any",thresh=2).show() 

+--------+----------+------+
|    Name|Department|Salary|
+--------+----------+------+
| Vaibhav|        IT| 50000|
|    NULL|        HR| 45000|
| Shubham|        IT|  NULL|
|    NULL|        HR| 56000|
|Vaishali|        IT|  NULL|
|   Nisha|      NULL| 65000|
+--------+----------+------+



In [0]:
df_missing.na.drop(how="any",subset=["name"]).show()

+--------+----------+------+
|    Name|Department|Salary|
+--------+----------+------+
| Vaibhav|        IT| 50000|
| Shubham|        IT|  NULL|
|Vaishali|        IT|  NULL|
|   Nisha|      NULL| 65000|
+--------+----------+------+



In [0]:
# sorting
df.sort(df["salary"].desc()).show()

+---+---------------+---+---------------+------+
| id|           name|age|     department|salary|
+---+---------------+---+---------------+------+
|  7|      David Kim| 38|    Engineering| 72000|
|  4|    Sarah Brown| 40|    Engineering| 70000|
|  6|   Emily Wilson| 45|        Finance| 65000|
| 10|   Amanda Clark| 37|        Finance| 62000|
|  2|     Jane Smith| 35|        Finance| 60000|
|  5|      Chris Lee| 32|      Marketing| 55000|
|  9|   James Taylor| 33|      Marketing| 52000|
|  1|       John Doe| 30|      Marketing| 50000|
|  8|    Lisa Garcia| 29|Human Resources| 48000|
|  3|Michael Johnson| 28|Human Resources| 45000|
+---+---------------+---+---------------+------+



In [0]:
df.sort("age","salary").show()

+---+---------------+---+---------------+------+
| id|           name|age|     department|salary|
+---+---------------+---+---------------+------+
|  3|Michael Johnson| 28|Human Resources| 45000|
|  8|    Lisa Garcia| 29|Human Resources| 48000|
|  1|       John Doe| 30|      Marketing| 50000|
|  5|      Chris Lee| 32|      Marketing| 55000|
|  9|   James Taylor| 33|      Marketing| 52000|
|  2|     Jane Smith| 35|        Finance| 60000|
| 10|   Amanda Clark| 37|        Finance| 62000|
|  7|      David Kim| 38|    Engineering| 72000|
|  4|    Sarah Brown| 40|    Engineering| 70000|
|  6|   Emily Wilson| 45|        Finance| 65000|
+---+---------------+---+---------------+------+



In [0]:
# groupby
df.groupBy("department").sum("salary").alias("total_salary").show()

+---------------+-----------+
|     department|sum(salary)|
+---------------+-----------+
|      Marketing|     157000|
|        Finance|     187000|
|Human Resources|      93000|
|    Engineering|     142000|
+---------------+-----------+



In [0]:
df.groupBy("department").count().alias("number_of_employees").show()

+---------------+-----+
|     department|count|
+---------------+-----+
|      Marketing|    3|
|        Finance|    3|
|Human Resources|    2|
|    Engineering|    2|
+---------------+-----+



In [0]:
# aggregation
df.groupBy("department").agg(({"salary":"sum"})).show()

+---------------+-----------+
|     department|sum(salary)|
+---------------+-----------+
|      Marketing|     157000|
|        Finance|     187000|
|Human Resources|      93000|
|    Engineering|     142000|
+---------------+-----------+



In [0]:
# joining
employees=spark.read.csv("dbfs:/FileStore/tables/employees.csv",header=True,inferSchema=True)
employees.show()

+-----------+-------------+-------------+
|employee_id|employee_name|department_id|
+-----------+-------------+-------------+
|          1|        Alice|          101|
|          2|          Bob|          102|
|          3|      Charlie|          101|
|          4|        David|          103|
+-----------+-------------+-------------+



In [0]:
departments=spark.read.csv("dbfs:/FileStore/tables/department.csv",header=True,inferSchema=True)
departments.show()

+-------------+---------------+
|department_id|department_name|
+-------------+---------------+
|          101|    Engineering|
|          102|          Sales|
|          103|      Marketing|
|          104|             HR|
+-------------+---------------+



In [0]:
employees.join(departments,employees.department_id ==  departments.department_id,"inner") .show()

+-----------+-------------+-------------+-------------+---------------+
|employee_id|employee_name|department_id|department_id|department_name|
+-----------+-------------+-------------+-------------+---------------+
|          1|        Alice|          101|          101|    Engineering|
|          2|          Bob|          102|          102|          Sales|
|          3|      Charlie|          101|          101|    Engineering|
|          4|        David|          103|          103|      Marketing|
+-----------+-------------+-------------+-------------+---------------+



In [0]:
employees.join(departments,employees.department_id ==  departments.department_id,"outer") .show()

+-----------+-------------+-------------+-------------+---------------+
|employee_id|employee_name|department_id|department_id|department_name|
+-----------+-------------+-------------+-------------+---------------+
|          1|        Alice|          101|          101|    Engineering|
|          3|      Charlie|          101|          101|    Engineering|
|          2|          Bob|          102|          102|          Sales|
|          4|        David|          103|          103|      Marketing|
|       NULL|         NULL|         NULL|          104|             HR|
+-----------+-------------+-------------+-------------+---------------+



In [0]:
employees.join(departments,employees.department_id ==  departments.department_id,"left").show()

+-----------+-------------+-------------+-------------+---------------+
|employee_id|employee_name|department_id|department_id|department_name|
+-----------+-------------+-------------+-------------+---------------+
|          1|        Alice|          101|          101|    Engineering|
|          2|          Bob|          102|          102|          Sales|
|          3|      Charlie|          101|          101|    Engineering|
|          4|        David|          103|          103|      Marketing|
+-----------+-------------+-------------+-------------+---------------+



In [0]:
employees.join(departments,employees.department_id ==  departments.department_id,"right") .show()

+-----------+-------------+-------------+-------------+---------------+
|employee_id|employee_name|department_id|department_id|department_name|
+-----------+-------------+-------------+-------------+---------------+
|          3|      Charlie|          101|          101|    Engineering|
|          1|        Alice|          101|          101|    Engineering|
|          2|          Bob|          102|          102|          Sales|
|          4|        David|          103|          103|      Marketing|
|       NULL|         NULL|         NULL|          104|             HR|
+-----------+-------------+-------------+-------------+---------------+



In [0]:
employees.join(departments,employees.department_id ==  departments.department_id,"leftsemi") .show()

+-----------+-------------+-------------+
|employee_id|employee_name|department_id|
+-----------+-------------+-------------+
|          1|        Alice|          101|
|          2|          Bob|          102|
|          3|      Charlie|          101|
|          4|        David|          103|
+-----------+-------------+-------------+



In [0]:
employees.join(departments,employees.department_id ==  departments.department_id,"leftanti") .show()

+-----------+-------------+-------------+
|employee_id|employee_name|department_id|
+-----------+-------------+-------------+
+-----------+-------------+-------------+



In [0]:
# creating session
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("CodingAssessment2").config("spark.sql.catalogImplementation","hive").getOrCreate()

In [0]:
# creating database
spark.sql("CREATE DATABASE IF NOT EXISTS employees_db COMMENT 'This is employees table' LOCATION '/user'")

DataFrame[]

In [0]:
# creating employeesData table in the database
spark.sql("CREATE TABLE IF NOT EXISTS employeesData(emp_id INT, emp_name String, emp_dept_id INT, emp_salary INT)")

DataFrame[]

In [0]:
# inserting data into employeesData table
spark.sql("INSERT INTO employeesData VALUES (1,'Alice',101,10000),(2,'Bob',102,12000),(3,'Charlie',103,15000),(4,'David',101,9000)")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# showing employeesData table
spark.sql("SELECT * FROM employeesData").show()

+------+--------+-----------+----------+
|emp_id|emp_name|emp_dept_id|emp_salary|
+------+--------+-----------+----------+
|     1|   Alice|        101|     10000|
|     2|     Bob|        102|     12000|
|     3| Charlie|        103|     15000|
|     4|   David|        101|      9000|
+------+--------+-----------+----------+



In [0]:
# creating Departments table in the database
spark.sql("CREATE TABLE IF NOT EXISTS departments(dept_id INT, dept_name String)")

DataFrame[]

In [0]:
# inserting data into Departments table
spark.sql("INSERT INTO Departments VALUES (101,'Engineering'),(102,'Sales'),(103,'Marketing'),(104,'HR')")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Showing departments table
spark.sql("SELECT * FROM Departments").show()

+-------+-----------+
|dept_id|  dept_name|
+-------+-----------+
|    101|Engineering|
|    102|      Sales|
|    103|  Marketing|
|    104|         HR|
+-------+-----------+



In [0]:
# Inner join
spark.sql("SELECT * FROM employeesData INNER JOIN Departments ON employeesData.emp_dept_id=Departments.dept_id").show()

+------+--------+-----------+----------+-------+-----------+
|emp_id|emp_name|emp_dept_id|emp_salary|dept_id|  dept_name|
+------+--------+-----------+----------+-------+-----------+
|     1|   Alice|        101|     10000|    101|Engineering|
|     2|     Bob|        102|     12000|    102|      Sales|
|     3| Charlie|        103|     15000|    103|  Marketing|
|     4|   David|        101|      9000|    101|Engineering|
+------+--------+-----------+----------+-------+-----------+



In [0]:
# left join
spark.sql("SELECT * FROM employeesData LEFT JOIN Departments ON employeesData.emp_dept_id=Departments.dept_id").show()

+------+--------+-----------+----------+-------+-----------+
|emp_id|emp_name|emp_dept_id|emp_salary|dept_id|  dept_name|
+------+--------+-----------+----------+-------+-----------+
|     1|   Alice|        101|     10000|    101|Engineering|
|     2|     Bob|        102|     12000|    102|      Sales|
|     3| Charlie|        103|     15000|    103|  Marketing|
|     4|   David|        101|      9000|    101|Engineering|
+------+--------+-----------+----------+-------+-----------+



In [0]:
# right join
spark.sql("SELECT * FROM employeesData RIGHT JOIN Departments ON employeesData.emp_dept_id=Departments.dept_id").show()

+------+--------+-----------+----------+-------+-----------+
|emp_id|emp_name|emp_dept_id|emp_salary|dept_id|  dept_name|
+------+--------+-----------+----------+-------+-----------+
|     4|   David|        101|      9000|    101|Engineering|
|     1|   Alice|        101|     10000|    101|Engineering|
|     2|     Bob|        102|     12000|    102|      Sales|
|     3| Charlie|        103|     15000|    103|  Marketing|
|  NULL|    NULL|       NULL|      NULL|    104|         HR|
+------+--------+-----------+----------+-------+-----------+



In [0]:
# full/outer join
spark.sql("SELECT * FROM employeesData FULL JOIN Departments ON employeesData.emp_dept_id=Departments.dept_id").show()

+------+--------+-----------+----------+-------+-----------+
|emp_id|emp_name|emp_dept_id|emp_salary|dept_id|  dept_name|
+------+--------+-----------+----------+-------+-----------+
|     1|   Alice|        101|     10000|    101|Engineering|
|     4|   David|        101|      9000|    101|Engineering|
|     2|     Bob|        102|     12000|    102|      Sales|
|     3| Charlie|        103|     15000|    103|  Marketing|
|  NULL|    NULL|       NULL|      NULL|    104|         HR|
+------+--------+-----------+----------+-------+-----------+



In [0]:
# applying functions in a pandas dataframe
import pyspark.pandas as pd
marks=({"marks_scored":[90,67,98,56,87],
      "negative_marks":[12,5,7,19,20]})
my_df=pd.DataFrame(marks)
print(my_df)

   marks_scored  negative_marks
0            90              12
1            67               5
2            98               7
3            56              19
4            87              20


In [0]:
def calculate_scores(scores):
    return scores[0]-scores[1]

final_score=my_df.apply(calculate_scores)
print(final_score)

marks_scored      23
negative_marks     7
dtype: int64
