# Custom schema in pyspark
# Importing files
# various analytics functionalities ( selectiong columns, filtering on the basis of conditions,aggregation functions)
# Sql Querries
# Joins 
# Missing data handling 
# optimization techniques

In [4]:
# !pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pyspark_practice").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/24 11:13:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/24 11:13:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
# Custom schema in pyspark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

data = [
    (1,"john","delhi"),
    (2,"stark","new yark"),
    (3,"ronita","paris")
]

schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("emp_name", StringType(), True),
        StructField("city", StringType(), True),
    ]
)





In [6]:
custom_df = spark.createDataFrame(data,schema=schema)

custom_df.show()

                                                                                

+---+--------+--------+
| id|emp_name|    city|
+---+--------+--------+
|  1|    john|   delhi|
|  2|   stark|new yark|
|  3|  ronita|   paris|
+---+--------+--------+



# Importing files data

In [10]:
file_path = r"../data/department_pipe_delimeted.csv"
import_df = spark.read.csv(file_path,inferSchema=True,header=True,sep="|")
import_df.show()

+-------+---------+------+
|dept_id|dept_name|emp_id|
+-------+---------+------+
|      1|       HR|     1|
|      2|  Finance|     2|
|      3|       IT|     3|
|      4|Marketing|     4|
|      5|    Sales|     5|
|      6|       HR|     6|
|      7|  Finance|     7|
|      8|       IT|     8|
|      9|Marketing|     9|
|     10|    Sales|    10|
|     11|       HR|    11|
|     12|  Finance|    12|
|     13|       IT|    13|
|     14|Marketing|    14|
|     15|    Sales|    15|
|     16|       HR|    16|
|     17|  Finance|    17|
|     18|       IT|    18|
|     19|Marketing|    19|
|     20|    Sales|    20|
+-------+---------+------+



In [14]:
file_path = r"../data/department_pipe_delimeted.csv"
custom_import_df = spark.read.format("csv").option("header","true").option("inferSchema","true").option("delimiter","|").load(file_path)
custom_import_df.show()

+-------+---------+------+
|dept_id|dept_name|emp_id|
+-------+---------+------+
|      1|       HR|     1|
|      2|  Finance|     2|
|      3|       IT|     3|
|      4|Marketing|     4|
|      5|    Sales|     5|
|      6|       HR|     6|
|      7|  Finance|     7|
|      8|       IT|     8|
|      9|Marketing|     9|
|     10|    Sales|    10|
|     11|       HR|    11|
|     12|  Finance|    12|
|     13|       IT|    13|
|     14|Marketing|    14|
|     15|    Sales|    15|
|     16|       HR|    16|
|     17|  Finance|    17|
|     18|       IT|    18|
|     19|Marketing|    19|
|     20|    Sales|    20|
+-------+---------+------+



# Analytics with Pyspak

In [20]:
# select the particular columns
file_path = r"../data/department_pipe_delimeted.csv"

analytics_df = spark.read.csv(file_path,inferSchema=True,header=True,sep="|")

analytics_df.select("dept_name","emp_id").show()


# analytics_df.show()

+---------+------+
|dept_name|emp_id|
+---------+------+
|       HR|     1|
|  Finance|     2|
|       IT|     3|
|Marketing|     4|
|    Sales|     5|
|       HR|     6|
|  Finance|     7|
|       IT|     8|
|Marketing|     9|
|    Sales|    10|
|       HR|    11|
|  Finance|    12|
|       IT|    13|
|Marketing|    14|
|    Sales|    15|
|       HR|    16|
|  Finance|    17|
|       IT|    18|
|Marketing|    19|
|    Sales|    20|
+---------+------+



In [22]:
# filtering data on some  conditions
file_path = r"../data/employee_pipe_delimeted.csv"
emp_df = spark.read.csv(file_path,header=True,inferSchema=True,sep="|")

emp_df.filter("salary >6500").show()

+------+---------------+------+
|emp_id|       emp_name|salary|
+------+---------------+------+
|     2|     Jane Smith|  7000|
|     3|Michael Johnson|  8000|
|     5|  William Brown|  9000|
|     7|  Richard Moore|  7500|
|     8| Jessica Taylor|  8500|
|    10|Patricia Thomas|  7200|
|    12|    Susan White|  8100|
|    13|  Daniel Harris|  6600|
|    14|   Sarah Martin|  7700|
|    15|  Paul Thompson|  8800|
|    16|   Karen Garcia|  6900|
|    19|   George Clark|  7000|
+------+---------------+------+



In [25]:
# Union methods
file_path =r"../data/department_pipe_delimeted.csv"
dept_df = spark.read.csv(file_path, inferSchema=True, header=True, sep="|")

combined_df = emp_df.join(dept_df,emp_df.emp_id ==dept_df.emp_id)

combined_df.show()


+------+-------------------+------+-------+---------+------+
|emp_id|           emp_name|salary|dept_id|dept_name|emp_id|
+------+-------------------+------+-------+---------+------+
|     1|           John Doe|  5000|      1|       HR|     1|
|     2|         Jane Smith|  7000|      2|  Finance|     2|
|     3|    Michael Johnson|  8000|      3|       IT|     3|
|     4|        Emily Davis|  6000|      4|Marketing|     4|
|     5|      William Brown|  9000|      5|    Sales|     5|
|     6|       Linda Wilson|  5500|      6|       HR|     6|
|     7|      Richard Moore|  7500|      7|  Finance|     7|
|     8|     Jessica Taylor|  8500|      8|       IT|     8|
|     9|   Charles Anderson|  6500|      9|Marketing|     9|
|    10|    Patricia Thomas|  7200|     10|    Sales|    10|
|    11|Christopher Jackson|  5400|     11|       HR|    11|
|    12|        Susan White|  8100|     12|  Finance|    12|
|    13|      Daniel Harris|  6600|     13|       IT|    13|
|    14|       Sarah Mar

In [28]:
combined_df.agg({"salary":"avg"}).show()

+-----------+
|avg(salary)|
+-----------+
|     6950.0|
+-----------+



In [29]:
combined_df.groupBy("dept_name").agg({"salary":"max"}).show()

+---------+-----------+
|dept_name|max(salary)|
+---------+-----------+
|    Sales|       9000|
|       HR|       6900|
|  Finance|       8100|
|Marketing|       7700|
|       IT|       8500|
+---------+-----------+



In [30]:
combined_df.groupBy("dept_name").agg({"salary":"sum"}).show()


+---------+-----------+
|dept_name|sum(salary)|
+---------+-----------+
|    Sales|      31300|
|       HR|      22800|
|  Finance|      28400|
|Marketing|      27200|
|       IT|      29300|
+---------+-----------+



# SQL queries with pyspark

In [31]:
# SQL Query

combined_df.createOrReplaceTempView("combined_table")



In [35]:
query = "select * from combined_table"

query2 = "select * from combined_table where salary >8000"


spark.sql(query2).show()

+------+--------------+------+-------+---------+------+
|emp_id|      emp_name|salary|dept_id|dept_name|emp_id|
+------+--------------+------+-------+---------+------+
|     5| William Brown|  9000|      5|    Sales|     5|
|     8|Jessica Taylor|  8500|      8|       IT|     8|
|    12|   Susan White|  8100|     12|  Finance|    12|
|    15| Paul Thompson|  8800|     15|    Sales|    15|
+------+--------------+------+-------+---------+------+



# Missing data handling


In [7]:
from pyspark.sql.functions import col, when, mean
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

data = [
    (1, "Alice", 25),
    (2, "Bob", None),
    (3, None, 28),
    (4, "David", 32),
    (5, "Emily", None)
]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

temp_df = spark.createDataFrame(data, schema) 
temp_df.show()

+---+-----+----+
| id| name| age|
+---+-----+----+
|  1|Alice|  25|
|  2|  Bob|null|
|  3| null|  28|
|  4|David|  32|
|  5|Emily|null|
+---+-----+----+



In [42]:
# find out the missing data
from pyspark.sql.functions import col,isnan,when,count

column= 'age'
missing_count = temp_df.filter(col(column).isNull() | isnan(col(column))).count()
print(f"Missing values in column '{column}': {missing_count}")


Missing values in column 'age': 2


In [44]:
for column in temp_df.columns:
    missing_count = temp_df.filter(col(column).isNull() | isnan(col(column))).count()
    print(f"Missing values in column '{column}': {missing_count}")


Missing values in column 'id': 0
Missing values in column 'name': 1
Missing values in column 'age': 2


In [46]:
temp_df.na.drop().show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 25|
|  4|David| 32|
+---+-----+---+



In [48]:
temp_df.select("name").dropna().show()

+-----+
| name|
+-----+
|Alice|
|  Bob|
|David|
|Emily|
+-----+



In [52]:
temp_df.dropna(subset=["name"]).show()


+---+-----+----+
| id| name| age|
+---+-----+----+
|  1|Alice|  25|
|  2|  Bob|null|
|  4|David|  32|
|  5|Emily|null|
+---+-----+----+



## fill the missing data with values

In [12]:
df_replaced = temp_df.withColumn("age", when(col("age").isNull(), 0).otherwise(col("age")))
df_replaced.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 25|
|  2|  Bob|  0|
|  3| null| 28|
|  4|David| 32|
|  5|Emily|  0|
+---+-----+---+



In [24]:
from pyspark.sql.functions import round

mean_age = temp_df.select(round(mean(col("age"))).alias("mean_age")).collect()[0][0]
print(mean_age)
df_replaced = temp_df.withColumn("age", when(col("age").isNull(), mean_age).otherwise(col("age")))
df_replaced.show()


28.0
+---+-----+----+
| id| name| age|
+---+-----+----+
|  1|Alice|25.0|
|  2|  Bob|28.0|
|  3| null|28.0|
|  4|David|32.0|
|  5|Emily|28.0|
+---+-----+----+

