In [1]:
# Install Java & Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

# Initialize SparkSession
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PySparkOnColab").getOrCreate()
spark


In [3]:
from pyspark.sql import SparkSession

# Start SparkSession
spark = SparkSession.builder.appName("ProductData").getOrCreate()

# Data
data = [
    ("Laptop", 60000, "India"),
    ("Smartphone", 25000, "USA"),
    ("Headphones", 2000, "Germany"),
    ("Refrigerator", 30000, "India"),
    ("Microwave", 7000, "Japan"),
    ("Air Conditioner", 35000, "China"),
    ("Washing Machine", 25000, "USA"),
    ("Smartwatch", 5000, "South Korea"),
    ("Television", 40000, "Germany"),
    ("Tablet", 18000, "Canada"),
    ("Printer", 6000, "India"),
    ("Camera", 15000, "France")
]

# Column names
columns = ["Product", "Amount", "Country"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

# Show DataFrame
df.show()


+---------------+------+-----------+
|        Product|Amount|    Country|
+---------------+------+-----------+
|         Laptop| 60000|      India|
|     Smartphone| 25000|        USA|
|     Headphones|  2000|    Germany|
|   Refrigerator| 30000|      India|
|      Microwave|  7000|      Japan|
|Air Conditioner| 35000|      China|
|Washing Machine| 25000|        USA|
|     Smartwatch|  5000|South Korea|
|     Television| 40000|    Germany|
|         Tablet| 18000|     Canada|
|        Printer|  6000|      India|
|         Camera| 15000|     France|
+---------------+------+-----------+



In [5]:
df.groupBy("Product").pivot("Country").sum("Amount").show()

+---------------+------+-----+------+-------+-----+-----+-----------+-----+
|        Product|Canada|China|France|Germany|India|Japan|South Korea|  USA|
+---------------+------+-----+------+-------+-----+-----+-----------+-----+
|   Refrigerator|  null| null|  null|   null|30000| null|       null| null|
|     Smartwatch|  null| null|  null|   null| null| null|       5000| null|
|         Laptop|  null| null|  null|   null|60000| null|       null| null|
|      Microwave|  null| null|  null|   null| null| 7000|       null| null|
|Air Conditioner|  null|35000|  null|   null| null| null|       null| null|
|         Camera|  null| null| 15000|   null| null| null|       null| null|
|     Television|  null| null|  null|  40000| null| null|       null| null|
|Washing Machine|  null| null|  null|   null| null| null|       null|25000|
|         Tablet| 18000| null|  null|   null| null| null|       null| null|
|        Printer|  null| null|  null|   null| 6000| null|       null| null|
|     Smartp

In [7]:
df1=df.groupBy("Product").pivot("Country").sum("Amount")
df1.show()

+---------------+------+-----+------+-------+-----+-----+-----------+-----+
|        Product|Canada|China|France|Germany|India|Japan|South Korea|  USA|
+---------------+------+-----+------+-------+-----+-----+-----------+-----+
|   Refrigerator|  null| null|  null|   null|30000| null|       null| null|
|     Smartwatch|  null| null|  null|   null| null| null|       5000| null|
|         Laptop|  null| null|  null|   null|60000| null|       null| null|
|      Microwave|  null| null|  null|   null| null| 7000|       null| null|
|Air Conditioner|  null|35000|  null|   null| null| null|       null| null|
|         Camera|  null| null| 15000|   null| null| null|       null| null|
|     Television|  null| null|  null|  40000| null| null|       null| null|
|Washing Machine|  null| null|  null|   null| null| null|       null|25000|
|         Tablet| 18000| null|  null|   null| null| null|       null| null|
|        Printer|  null| null|  null|   null| 6000| null|       null| null|
|     Smartp

In [9]:
from pyspark.sql.functions import expr
df1.select("Product", expr("Stack(3, 'India', India, 'USA', USA) AS (Country, Amount)")).show()

+---------------+-------+------+
|        Product|Country|Amount|
+---------------+-------+------+
|   Refrigerator|  India| 30000|
|   Refrigerator|    USA|  null|
|   Refrigerator|   null|  null|
|     Smartwatch|  India|  null|
|     Smartwatch|    USA|  null|
|     Smartwatch|   null|  null|
|         Laptop|  India| 60000|
|         Laptop|    USA|  null|
|         Laptop|   null|  null|
|      Microwave|  India|  null|
|      Microwave|    USA|  null|
|      Microwave|   null|  null|
|Air Conditioner|  India|  null|
|Air Conditioner|    USA|  null|
|Air Conditioner|   null|  null|
|         Camera|  India|  null|
|         Camera|    USA|  null|
|         Camera|   null|  null|
|     Television|  India|  null|
|     Television|    USA|  null|
+---------------+-------+------+
only showing top 20 rows



USER DEFINED FUNCTIONS (UDF) IS THE MOST USEFUL FEATURES OF SPARK SQL AND DATAFRAME THAT IS USED TO EXTEND THE PYSPARK BUILD IN CAPABILITIES

UDF ARE USED TO EXPAND THE FUNCTIONS OF THE FRAMEWORK AND RE USE IT MULTIPLE
😊

In [18]:
data = [("Finance ", 10),
("Marketing", 20),
("Sales", 30),
("IT", 40)
]

columns = ["dept_name", "dept_id"]
df = spark.createDataFrame(data, columns)
df.show()

+---------+-------+
|dept_name|dept_id|
+---------+-------+
| Finance |     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



In [21]:
from pyspark.sql.functions import udf

from pyspark.sql.types import LongType
def addone(a):
  return a+1
addone_df=udf(addone,LongType())


In [23]:
df.select('dept_name','dept_id',addone_df('dept_id').alias("addone_dept_id")).show()

+---------+-------+--------------+
|dept_name|dept_id|addone_dept_id|
+---------+-------+--------------+
| Finance |     10|            11|
|Marketing|     20|            21|
|    Sales|     30|            31|
|       IT|     40|            41|
+---------+-------+--------------+



In [24]:
data = [(1,'manish',1000),(2,'ravi',10000),(3,'Sumney',3000)]

In [25]:
columns=['id','name','salary']

In [26]:
df = spark.createDataFrame(data,columns)
df.show()

+---+------+------+
| id|  name|salary|
+---+------+------+
|  1|manish|  1000|
|  2|  ravi| 10000|
|  3|Sumney|  3000|
+---+------+------+



In [32]:
df = spark.sql("SELECT * FROM Employee")
df.show()
df = spark.sql("SELECT max(salary) FROM Employee")
df.show()


+---+------+------+
| id|  name|salary|
+---+------+------+
|  1|manish|  1000|
|  2|  ravi| 10000|
|  3|Sumney|  3000|
+---+------+------+

+-----------+
|max(salary)|
+-----------+
|      10000|
+-----------+

