# Question 1: Execute Manipulating, Droping, Sorting, Aggregations, Joining, GroupeBy  DataFrames

In [5]:
import pyspark
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [6]:
spark=SparkSession.builder.appName('Pyspark Assessment').getOrCreate()

In [7]:
spark

## Creating DataFrame

In [62]:
data = [("Rohan", 22, "Male","IT",10000),
        ("Pranjal", 23, "Female","Mechanical",20000),
        ("Rushi", 22, "Male","Comps",30000),
        ("Rohit", None, "Male","BBA",40000)]

schema = ["Name", "Age", "Gender","Degree","Salary"]

df = spark.createDataFrame(data, schema=schema)
df.show()

+-------+----+------+----------+------+
|   Name| Age|Gender|    Degree|Salary|
+-------+----+------+----------+------+
|  Rohan|  22|  Male|        IT| 10000|
|Pranjal|  23|Female|Mechanical| 20000|
|  Rushi|  22|  Male|     Comps| 30000|
|  Rohit|NULL|  Male|       BBA| 40000|
+-------+----+------+----------+------+



## Manipulating Data

## Selecting columns

In [63]:
select_columns = df.select("Name", "Age")
select_columns.show()

+-------+----+
|   Name| Age|
+-------+----+
|  Rohan|  22|
|Pranjal|  23|
|  Rushi|  22|
|  Rohit|NULL|
+-------+----+



## Filtering Data

In [64]:
filtered_df = df.filter(df["Age"] > 22)
filtered_df.show()

+-------+---+------+----------+------+
|   Name|Age|Gender|    Degree|Salary|
+-------+---+------+----------+------+
|Pranjal| 23|Female|Mechanical| 20000|
+-------+---+------+----------+------+



## Adding column

In [65]:
new_column = df.withColumn("Age+Two", df["Age"] + 5)
new_column.show()

+-------+----+------+----------+------+-------+
|   Name| Age|Gender|    Degree|Salary|Age+Two|
+-------+----+------+----------+------+-------+
|  Rohan|  22|  Male|        IT| 10000|     27|
|Pranjal|  23|Female|Mechanical| 20000|     28|
|  Rushi|  22|  Male|     Comps| 30000|     27|
|  Rohit|NULL|  Male|       BBA| 40000|   NULL|
+-------+----+------+----------+------+-------+



## Renaming column

In [66]:
renamed_df = df.withColumnRenamed("Age", "Years")
renamed_df.show()


+-------+-----+------+----------+------+
|   Name|Years|Gender|    Degree|Salary|
+-------+-----+------+----------+------+
|  Rohan|   22|  Male|        IT| 10000|
|Pranjal|   23|Female|Mechanical| 20000|
|  Rushi|   22|  Male|     Comps| 30000|
|  Rohit| NULL|  Male|       BBA| 40000|
+-------+-----+------+----------+------+



## Handling missing values

In [67]:
df_with_missing = df.fillna(0, subset=["Age"])
df_with_missing.show()

+-------+---+------+----------+------+
|   Name|Age|Gender|    Degree|Salary|
+-------+---+------+----------+------+
|  Rohan| 22|  Male|        IT| 10000|
|Pranjal| 23|Female|Mechanical| 20000|
|  Rushi| 22|  Male|     Comps| 30000|
|  Rohit|  0|  Male|       BBA| 40000|
+-------+---+------+----------+------+



## Dropping

In [68]:
df1=df
df1.show()

+-------+----+------+----------+------+
|   Name| Age|Gender|    Degree|Salary|
+-------+----+------+----------+------+
|  Rohan|  22|  Male|        IT| 10000|
|Pranjal|  23|Female|Mechanical| 20000|
|  Rushi|  22|  Male|     Comps| 30000|
|  Rohit|NULL|  Male|       BBA| 40000|
+-------+----+------+----------+------+



## Dropping Multiple column

In [69]:
columns_to_drop = ["Age", "Gender"]
df_dropped_columns = df.drop(*columns_to_drop)
df_dropped_columns.show()

+-------+----------+------+
|   Name|    Degree|Salary|
+-------+----------+------+
|  Rohan|        IT| 10000|
|Pranjal|Mechanical| 20000|
|  Rushi|     Comps| 30000|
|  Rohit|       BBA| 40000|
+-------+----------+------+



## Drop Rows with Missing Values

In [70]:
df_with_missing=df.na.drop(how='any',thresh=None,subset=None)
df_with_missing.show()

+-------+---+------+----------+------+
|   Name|Age|Gender|    Degree|Salary|
+-------+---+------+----------+------+
|  Rohan| 22|  Male|        IT| 10000|
|Pranjal| 23|Female|Mechanical| 20000|
|  Rushi| 22|  Male|     Comps| 30000|
+-------+---+------+----------+------+



## Dropping Duplicate

In [71]:
df_with_duplicates = df.union(df) 
df_dropped_duplicates = df_with_duplicates.dropDuplicates()
df_with_duplicates.show()
df_dropped_duplicates.show()


+-------+----+------+----------+------+
|   Name| Age|Gender|    Degree|Salary|
+-------+----+------+----------+------+
|  Rohan|  22|  Male|        IT| 10000|
|Pranjal|  23|Female|Mechanical| 20000|
|  Rushi|  22|  Male|     Comps| 30000|
|  Rohit|NULL|  Male|       BBA| 40000|
|  Rohan|  22|  Male|        IT| 10000|
|Pranjal|  23|Female|Mechanical| 20000|
|  Rushi|  22|  Male|     Comps| 30000|
|  Rohit|NULL|  Male|       BBA| 40000|
+-------+----+------+----------+------+

+-------+----+------+----------+------+
|   Name| Age|Gender|    Degree|Salary|
+-------+----+------+----------+------+
|  Rohan|  22|  Male|        IT| 10000|
|Pranjal|  23|Female|Mechanical| 20000|
|  Rushi|  22|  Male|     Comps| 30000|
|  Rohit|NULL|  Male|       BBA| 40000|
+-------+----+------+----------+------+



## Aggregations

In [72]:
df.agg(({"Age":"sum"})).show()

+--------+
|sum(Age)|
+--------+
|      67|
+--------+



In [73]:
df.agg(({"Age":"avg"})).show()

+------------------+
|          avg(Age)|
+------------------+
|22.333333333333332|
+------------------+



In [74]:
df.agg({"Name":"count"}).show()

+-----------+
|count(Name)|
+-----------+
|          4|
+-----------+



## Joining

In [75]:
data1 = [("Rohan","Jalgaon"),
        ("Pranjal","Mumbai"),
        ("Rushi","Pune"),
        ("Rohit","Nashik")]

schema1 = ["Name","Country"]

df1 = spark.createDataFrame(data1, schema=schema1)
df1.show()

+-------+-------+
|   Name|Country|
+-------+-------+
|  Rohan|Jalgaon|
|Pranjal| Mumbai|
|  Rushi|   Pune|
|  Rohit| Nashik|
+-------+-------+



## Inner join

In [76]:
df.join(df1,df.Name ==  df1.Name,"inner").show()

+-------+----+------+----------+------+-------+-------+
|   Name| Age|Gender|    Degree|Salary|   Name|Country|
+-------+----+------+----------+------+-------+-------+
|Pranjal|  23|Female|Mechanical| 20000|Pranjal| Mumbai|
|  Rohan|  22|  Male|        IT| 10000|  Rohan|Jalgaon|
|  Rohit|NULL|  Male|       BBA| 40000|  Rohit| Nashik|
|  Rushi|  22|  Male|     Comps| 30000|  Rushi|   Pune|
+-------+----+------+----------+------+-------+-------+



## Left join

In [77]:
df.join(df1,df.Name ==  df1.Name,"left").show()

+-------+----+------+----------+------+-------+-------+
|   Name| Age|Gender|    Degree|Salary|   Name|Country|
+-------+----+------+----------+------+-------+-------+
|  Rohan|  22|  Male|        IT| 10000|  Rohan|Jalgaon|
|Pranjal|  23|Female|Mechanical| 20000|Pranjal| Mumbai|
|  Rushi|  22|  Male|     Comps| 30000|  Rushi|   Pune|
|  Rohit|NULL|  Male|       BBA| 40000|  Rohit| Nashik|
+-------+----+------+----------+------+-------+-------+



## Right join

In [78]:
df.join(df1,df.Name ==  df1.Name,"right").show()

+-------+----+------+----------+------+-------+-------+
|   Name| Age|Gender|    Degree|Salary|   Name|Country|
+-------+----+------+----------+------+-------+-------+
|  Rohan|  22|  Male|        IT| 10000|  Rohan|Jalgaon|
|Pranjal|  23|Female|Mechanical| 20000|Pranjal| Mumbai|
|  Rushi|  22|  Male|     Comps| 30000|  Rushi|   Pune|
|  Rohit|NULL|  Male|       BBA| 40000|  Rohit| Nashik|
+-------+----+------+----------+------+-------+-------+



## Left semi join

In [79]:
df.join(df1,df.Name ==  df1.Name,"left_semi").show()

+-------+----+------+----------+------+
|   Name| Age|Gender|    Degree|Salary|
+-------+----+------+----------+------+
|Pranjal|  23|Female|Mechanical| 20000|
|  Rohan|  22|  Male|        IT| 10000|
|  Rohit|NULL|  Male|       BBA| 40000|
|  Rushi|  22|  Male|     Comps| 30000|
+-------+----+------+----------+------+



## Left Anti Join

In [80]:
df.join(df1,df.Name ==  df1.Name,"left_anti").show()

+----+---+------+------+------+
|Name|Age|Gender|Degree|Salary|
+----+---+------+------+------+
+----+---+------+------+------+



## Group by

In [84]:
df.groupBy('Age').sum('Salary').show()
df.groupBy('Age').min('Salary').show()

+----+-----------+
| Age|sum(Salary)|
+----+-----------+
|  22|      40000|
|  23|      20000|
|NULL|      40000|
+----+-----------+

+----+-----------+
| Age|min(Salary)|
+----+-----------+
|  22|      10000|
|  23|      20000|
|NULL|      40000|
+----+-----------+



In [85]:
df.groupBy('Age').max('Salary').show()
df.groupBy('Age').avg('Salary').show()

+----+-----------+
| Age|max(Salary)|
+----+-----------+
|  22|      30000|
|  23|      20000|
|NULL|      40000|
+----+-----------+

+----+-----------+
| Age|avg(Salary)|
+----+-----------+
|  22|    20000.0|
|  23|    20000.0|
|NULL|    40000.0|
+----+-----------+



## Pivot

In [86]:
df.groupBy("Age").pivot("Gender").sum("Salary").show()

+----+------+-----+
| Age|Female| Male|
+----+------+-----+
|  22|  NULL|40000|
|NULL|  NULL|40000|
|  23| 20000| NULL|
+----+------+-----+



## Question 2:Execute Pyspark -sparksql joins & Applying Functions in a Pandas DataFrame

In [87]:
# data for the first DataFrame
data1 = [("Rohan", 23, "Male"),
         ("kirti", 26, "Female"),
         ("Rohit", 22, "Male")]

#data for the second DataFrame
data2 = [("Rohan", "Engineering"),
         ("kirti", "Marketing"),
         ("Rohit", "Finance")]

schema1 = ["Name", "Age", "Gender"]
schema2 = ["Name", "Department"]

df1 = spark.createDataFrame(data1, schema=schema1)
df2 = spark.createDataFrame(data2, schema=schema2)

## Creating View

In [88]:
# DataFrames as SQL tables
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")

## Inner join

In [89]:
spark.sql('SELECT * FROM table1 t1 INNER JOIN table2 t2 ON t1.Name=t2.Name').show()

+-----+---+------+-----+-----------+
| Name|Age|Gender| Name| Department|
+-----+---+------+-----+-----------+
|Rohan| 23|  Male|Rohan|Engineering|
|Rohit| 22|  Male|Rohit|    Finance|
|kirti| 26|Female|kirti|  Marketing|
+-----+---+------+-----+-----------+



## Left join

In [90]:
spark.sql('SELECT * FROM table1 t1 LEFT JOIN table2 t2 ON t1.Name=t2.Name').show()

+-----+---+------+-----+-----------+
| Name|Age|Gender| Name| Department|
+-----+---+------+-----+-----------+
|Rohan| 23|  Male|Rohan|Engineering|
|kirti| 26|Female|kirti|  Marketing|
|Rohit| 22|  Male|Rohit|    Finance|
+-----+---+------+-----+-----------+



## Right Join

In [91]:
spark.sql('SELECT * FROM table1 t1 Right JOIN table2 t2 ON t1.Name=t2.Name').show()

+-----+---+------+-----+-----------+
| Name|Age|Gender| Name| Department|
+-----+---+------+-----+-----------+
|Rohan| 23|  Male|Rohan|Engineering|
|kirti| 26|Female|kirti|  Marketing|
|Rohit| 22|  Male|Rohit|    Finance|
+-----+---+------+-----+-----------+



## Cross join

In [92]:
spark.sql('SELECT * FROM table1 t1 CROSS JOIN table2 t2 ON t1.Name=t2.Name').show()

+-----+---+------+-----+-----------+
| Name|Age|Gender| Name| Department|
+-----+---+------+-----+-----------+
|Rohan| 23|  Male|Rohan|Engineering|
|Rohit| 22|  Male|Rohit|    Finance|
|kirti| 26|Female|kirti|  Marketing|
+-----+---+------+-----+-----------+



## Function in Pandas DF

In [93]:
import pandas as pd

# Create a Pandas DataFrame
df = pd.read_csv('D:\Hexaware\Data_Engineering\Python\output_file.csv')
df

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.2500,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.9250,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1000,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.0500,,S
...,...,...,...,...,...,...,...,...,...,...,...,...
886,887,0,2,"Montvila, Rev. Juozas",male,27.0,0,0,211536,13.0000,,S
887,888,1,1,"Graham, Miss. Margaret Edith",female,19.0,0,0,112053,30.0000,B42,S
888,889,0,3,"Johnston, Miss. Catherine Helen ""Carrie""",female,,1,2,W./C. 6607,23.4500,,S
889,890,1,1,"Behr, Mr. Karl Howell",male,26.0,0,0,111369,30.0000,C148,C


## Function

In [97]:
def alive(row):
    if row["Survived"] == 0:
        return "Not alive"
    else:
        return "alive"

# Apply the custom function to a new column
df["Alive"] = df.apply(alive, axis=1)

# Show the modified DataFrame
df.head(7)

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Alive
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S,Not alive
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C,alive
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S,alive
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S,alive
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S,Not alive
5,6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q,Not alive
6,7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S,Not alive
