
A **DataFrame** is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs.

**Creating a basic DF with some data**





In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(14, "Tom"), (23, "Alice"), (16, "Bob")],
     ["age", "name"]
    )

In [None]:
df.printSchema()  # Shows each column's name, data type, and whether it is nullable

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [None]:
df.show()  # Shows the first 20 rows of the DF

+---+-----+
|age| name|
+---+-----+
| 14|  Tom|
| 23|Alice|
| 16|  Bob|
+---+-----+



**Selecting from dataframe**

In [None]:
df.select(df.name, df.age).show()

+-----+---+
| name|age|
+-----+---+
|  Tom| 14|
|Alice| 23|
|  Bob| 16|
+-----+---+



**Filtering from the DF**

In [None]:
df.filter((df.age>15) & (df.name.contains("o"))).show()

+---+----+
|age|name|
+---+----+
| 16| Bob|
+---+----+



**Describing the DF**





In [None]:
df.describe().show()

+-------+------------------+-----+
|summary|               age| name|
+-------+------------------+-----+
|  count|                 3|    3|
|   mean|17.666666666666668| NULL|
| stddev| 4.725815626252608| NULL|
|    min|                14|Alice|
|    max|                23|  Tom|
+-------+------------------+-----+



**Using withColumn to update an existing column**


In [None]:
df.withColumn("age",df.age+1).show()

+---+-----+
|age| name|
+---+-----+
| 15|  Tom|
| 24|Alice|
| 17|  Bob|
+---+-----+



**Using withColumn with when/otherwise to create a new column**


In [None]:
from pyspark.sql.functions import when
df.withColumn("type", when(df.age <= 15, "kid").otherwise("adult")).show()

+---+-----+-----+
|age| name| type|
+---+-----+-----+
| 14|  Tom|  kid|
| 23|Alice|adult|
| 16|  Bob|adult|
+---+-----+-----+



In [None]:
people = spark.createDataFrame([
    {"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
    {"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
    {"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
    {"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])

In [None]:
display(people)

DataFrame[age: bigint, deptId: bigint, gender: string, name: string, salary: bigint]

In [None]:
people.show()

+---+------+------+-------------+------+
|age|deptId|gender|         name|salary|
+---+------+------+-------------+------+
| 40|     1|     M| Hyukjin Kwon|    50|
| 50|     1|     M|Takuya Ueshin|   100|
| 60|     2|     F| Xinrong Meng|   150|
| 20|     3|     M|  Haejoon Lee|   200|
+---+------+------+-------------+------+



https://sparkbyexamples.com/spark/spark-dataframe-withcolumn/

In [None]:
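# No header option is set, so the header line is read as a data row
# and the columns get the default names _c0, _c1, ...
df_cal = spark.read.csv("/content/sample_data/california_housing_train.csv")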
df_cal.show()
df_cal.printSchema()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|        _c0|      _c1|               _c2|        _c3|           _c4|        _c5|        _c6|          _c7|               _c8|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|    

In [None]:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructField, StringType, FloatType, IntegerType, StructType, DateType, DecimalType

df_cal_schema = StructType([
    StructField("longitude", FloatType(), True),
    StructField("latitude", FloatType(), True),
    StructField("housing_median_age", DecimalType(), True),
    StructField("total_rooms", DecimalType(), True),
    StructField("total_bedrooms", DecimalType(), True),
    StructField("population", DecimalType(), True),
    StructField("households", DecimalType(), True),
    StructField("median_income", FloatType(), True),
    StructField("median_house_value", DecimalType(), True)
])

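# header=True is not set, so the header line fails to parse against the schema
# and shows up as a row of NULLs in the output.
df_cal = spark.read.csv("/content/sample_data/california_housing_train.csv", schema=df_cal_schema)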
display(df_cal)

DataFrame[longitude: float, latitude: float, housing_median_age: decimal(10,0), total_rooms: decimal(10,0), total_bedrooms: decimal(10,0), population: decimal(10,0), households: decimal(10,0), median_income: float, median_house_value: decimal(10,0)]

In [None]:
df_cal.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|     NULL|    NULL|              NULL|       NULL|          NULL|      NULL|      NULL|         NULL|              NULL|
|  -114.31|   34.19|                15|       5612|          1283|      1015|       472|       1.4936|             66900|
|  -114.47|    34.4|                19|       7650|          1901|      1129|       463|         1.82|             80100|
|  -114.56|   33.69|                17|        720|           174|       333|       117|       1.6509|             85700|
|  -114.57|   33.64|                14|       1501|           337|       515|       226|       3.1917|             73400|
|  -114.57|   33.57|    


1.   Using withColumn to perform 3 transformations
2.   withColumnRenamed to rename a column
3.   drop to drop a column
4.   Using when/otherwise
5.   Filtering data
6.   Casting a datatype





In [None]:
from pyspark.sql.functions import col, lit, when

df_trans_val = df_cal\
.withColumn("loaded_dt", lit("2025-03-01"))\
.withColumn("population_in_K", col("population")/1000)\
.withColumn("hosuing_age", when(col("housing_median_age") <= 15 , lit("newer")).otherwise(lit("older")))\
.withColumnRenamed("longitude", "long")\
.drop("latitude")\
.filter(col("total_bedrooms") >= 4)\
.withColumn("population", col("population").cast("decimal(15,0)"))

In [None]:
df_trans_val.printSchema()

root
 |-- long: float (nullable = true)
 |-- housing_median_age: decimal(10,0) (nullable = true)
 |-- total_rooms: decimal(10,0) (nullable = true)
 |-- total_bedrooms: decimal(10,0) (nullable = true)
 |-- population: decimal(15,0) (nullable = true)
 |-- households: decimal(10,0) (nullable = true)
 |-- median_income: float (nullable = true)
 |-- median_house_value: decimal(10,0) (nullable = true)
 |-- loaded_dt: string (nullable = false)
 |-- population_in_K: decimal(16,6) (nullable = true)
 |-- hosuing_age: string (nullable = false)





1.   Selecting cols from DF
2.   Column Alias


In [None]:
df_trans_val.select(df_trans_val.housing_median_age, df_trans_val.population_in_K.alias("pop_in_k")).show()

+------------------+--------+
|housing_median_age|pop_in_k|
+------------------+--------+
|                15|1.015000|
|                19|1.129000|
|                17|0.333000|
|                14|0.515000|
|                20|0.624000|
|                29|0.671000|
|                25|1.841000|
|                41|0.375000|
|                34|3.134000|
|                46|0.787000|
|                16|2.434000|
|                21|1.182000|
|                48|0.580000|
|                31|1.346000|
|                15|0.949000|
|                17|1.005000|
|                28|0.666000|
|                21|0.064000|
|                17|0.775000|
|                17|0.029000|
+------------------+--------+
only showing top 20 rows



**Row number in a DF**

Partition by `hosuing_age` (the column name as created above) and order by `housing_median_age`.

https://sparkbyexamples.com/spark/spark-dataframe-how-to-select-the-first-row-of-each-group/

In [None]:
from pyspark.sql.window import Window # Import the Window class
from pyspark.sql.functions import row_number, rank, desc

windowSpec = Window.partitionBy("hosuing_age").orderBy("housing_median_age")
windowSpec2 = Window.partitionBy("hosuing_age").orderBy(desc("median_house_value"))

# Number rows within each partition, rank them by house value, and keep the first 10 per partition
df_fnl_val = df_trans_val.withColumn("row_num", row_number().over(windowSpec))\
.withColumn("rank", rank().over(windowSpec2))\
.filter(col("row_num") <= 10)


In [None]:
df_fnl_val.count()

20

**Sorting a DF**

In [None]:
df_cal.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|     NULL|    NULL|              NULL|       NULL|          NULL|      NULL|      NULL|         NULL|              NULL|
|  -114.31|   34.19|                15|       5612|          1283|      1015|       472|       1.4936|             66900|
|  -114.47|    34.4|                19|       7650|          1901|      1129|       463|         1.82|             80100|
|  -114.56|   33.69|                17|        720|           174|       333|       117|       1.6509|             85700|
|  -114.57|   33.64|                14|       1501|           337|       515|       226|       3.1917|             73400|
|  -114.57|   33.57|    

In [None]:
from pyspark.sql.functions import asc, desc

df_cal.sort(asc("total_bedrooms"), desc("total_rooms")).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|     NULL|    NULL|              NULL|       NULL|          NULL|      NULL|      NULL|         NULL|              NULL|
|   -122.5|   37.79|                52|          8|             1|        13|         1|      15.0001|            500001|
|  -117.79|   35.21|                 4|          2|             2|         6|         2|        2.375|            137500|
|  -117.27|   34.17|                16|         30|             3|        49|         8|        4.625|            250000|
|  -119.23|   34.25|                28|         26|             3|        29|         9|          8.0|            275000|
|  -117.76|   35.22|    

In [None]:
from pyspark.sql.functions import col

# Same sort as above, written with Column methods instead of the asc()/desc() functions
df_cal.sort(col("total_bedrooms").asc(), col("total_rooms").desc()).show()


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|     NULL|    NULL|              NULL|       NULL|          NULL|      NULL|      NULL|         NULL|              NULL|
|   -122.5|   37.79|                52|          8|             1|        13|         1|      15.0001|            500001|
|  -117.79|   35.21|                 4|          2|             2|         6|         2|        2.375|            137500|
|  -117.27|   34.17|                16|         30|             3|        49|         8|        4.625|            250000|
|  -119.23|   34.25|                28|         26|             3|        29|         9|          8.0|            275000|
|  -117.76|   35.22|    

**Union of DF**

In [None]:


df_cal_1 = df_cal
df_cal_2 = df_cal
df_cal_3 = df_cal
df_cal_1_2_3 = df_cal_1.union(df_cal_2).union(df_cal_3)
print(df_cal_1_2_3.count())

df_cal_1_2_3_dst = df_cal_1.union(df_cal_2).union(df_cal_3).distinct()
print(df_cal_1_2_3_dst.count())

51003
17001


**Getting common elements between 2 DF**

In [None]:
intersected_df = df_cal_1.intersect(df_cal_2)
intersected_df.count()

17001

**One DF minus the other**

In [None]:
subtracted_df = df_cal_1.subtract(df_cal_2)
subtracted_df.count()

0

**Getting distinct values only**

In [None]:
df_cal_1_dst = df_cal_1.distinct()
df_cal_1_dst.count()

17001

In [None]:
from pyspark.sql.functions import col

# Remove the rows whose population is null (the sort is not needed for this)
df_new = df_cal_1_dst.subtract(df_cal_1_dst.filter(col("population").isNull()))
df_new.count()

17000

**Dropping rows with nulls**

In [None]:
from pyspark.sql.functions import col

df_new.na.drop("any").count()  # drops a row if any column is null
df_new.na.drop("all").count()  # drops a row only if all columns are null

# Drops a row if any column in the given subset is null.
# Only this last expression's result is displayed by the notebook.
df_new.na.drop(how="any", subset=["housing_median_age", "total_rooms"]).count()

17000