## For resolving python version mismatch

In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

### Importing and initiation the spark session

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/25 04:47:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Creating custom data frames

In [3]:
test_df = [("James","Sales","NY",90000,34,10000),
    ("Michael","Sales","NY",86000,56,20000),
    ("Robert","Sales","CA",81000,30,23000),
    ("Maria","Finance","CA",90000,24,23000),
    ("Raman","Finance","CA",99000,40,24000),
    ("Scott","Finance","NY",83000,36,19000),
    ("Jen","Finance","NY",79000,53,15000),
    ("Jeff","Marketing","CA",80000,25,18000),
    ("Kumar","Marketing","NY",91000,50,21000)
  ]

### Creating custom schema

In [4]:
udf_scehma = ["employee_name","department","state","salary","age","bonus"]

In [5]:
df = spark.createDataFrame(data=test_df,schema = udf_scehma)

In [6]:
df.show()

                                                                                

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



### Applying groupBy to tht data frames

In [14]:
df.groupBy('department').agg(sum("salary"),avg("bonus")).show()

[Stage 5:>                                                          (0 + 2) / 2]

+----------+-----------+------------------+
|department|sum(salary)|        avg(bonus)|
+----------+-----------+------------------+
|     Sales|     257000|17666.666666666668|
|   Finance|     351000|           20250.0|
| Marketing|     171000|           19500.0|
+----------+-----------+------------------+



                                                                                

### Creating a temp View to write native SQL commands

In [15]:
df.createOrReplaceTempView("ida")

In [17]:
spark.sql("select * from ida").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



### Applying Group by with aggregation functions

In [18]:
temp = df.groupBy("department","state").agg(sum("salary").alias("sum_salary"),avg("salary").alias("avg_salary")).where(col("avg_salary")>80000)

In [19]:
temp.show()

+----------+-----+----------+----------+
|department|state|sum_salary|avg_salary|
+----------+-----+----------+----------+
|     Sales|   CA|     81000|   81000.0|
|   Finance|   CA|    189000|   94500.0|
|     Sales|   NY|    176000|   88000.0|
|   Finance|   NY|    162000|   81000.0|
| Marketing|   NY|     91000|   91000.0|
+----------+-----+----------+----------+



### Creating a new  Managed database

In [20]:
spark.sql("CREATE DATABASE idashell")

DataFrame[]

In [21]:
spark.sql("USE idashell")

DataFrame[]

### Saving the DataFrames into table

In [22]:
df.write.saveAsTable("test_tbl")

                                                                                

In [23]:
test = spark.sql("DESCRIBE EXTENDED test_tbl")
test.show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|       employee_name|              string|   null|
|          department|              string|   null|
|               state|              string|   null|
|              salary|              bigint|   null|
|                 age|              bigint|   null|
|               bonus|              bigint|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|            idashell|       |
|               Table|            test_tbl|       |
|        Created Time|Mon Sep 25 05:23:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.4.1|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|           

In [24]:
spark.sql("select * from test_tbl").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|        James|     Sales|   NY| 90000| 34|10000|
|      Michael|     Sales|   NY| 86000| 56|20000|
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|        Scott|   Finance|   NY| 83000| 36|19000|
|          Jen|   Finance|   NY| 79000| 53|15000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
|        Kumar| Marketing|   NY| 91000| 50|21000|
+-------------+----------+-----+------+---+-----+



In [25]:
spark.sql("SELECT * FROM test_tbl WHERE state ='CA'").show()

+-------------+----------+-----+------+---+-----+
|employee_name|department|state|salary|age|bonus|
+-------------+----------+-----+------+---+-----+
|       Robert|     Sales|   CA| 81000| 30|23000|
|        Maria|   Finance|   CA| 90000| 24|23000|
|        Raman|   Finance|   CA| 99000| 40|24000|
|         Jeff| Marketing|   CA| 80000| 25|18000|
+-------------+----------+-----+------+---+-----+



In [28]:
spark.sql("drop table test_tbl")

DataFrame[]

In [29]:
spark.sql("drop database idashell")

DataFrame[]

In [30]:
spark.sql("CREATE DATABASE spark")
spark.sql("USE DATABASE spark")

DataFrame[]

In [31]:
df.write.saveAsTable("employees")



In [32]:
spark.sql("SELECT * FROM employees")

DataFrame[employee_name: string, department: string, state: string, salary: bigint, age: bigint, bonus: bigint]

In [33]:
df.rdd.getNumPartitions()

2

## Saving the data frame in the local machine

In [34]:
df.write.csv('/home/labuser/test/')

### Saving the data based on a single Column

In [36]:
df.write.partitionBy("department").csv("/home/labuser/test_01/")

                                                                                

### Saving the data based on multiple columns

In [37]:
df.write.partitionBy("state","department").csv("/home/labuser/test_02/")

                                                                                

### For Each

In [38]:
def f(df):
    print(df.state)

In [39]:
df.foreach(f)

CAtage 22:>                                                         (0 + 2) / 2]
NY
NY
CA
NY
NY
NY
CA
CA
                                                                                

In [40]:
json_data = [
    '{"name": "Alice", "age": 25}',
    '{"name": "Bob", "age": 30}',
    '{"name": "Charlie", "age": 35}'
]

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
