# `What is a DataFrame`:

  - A DataFrame is a distributed collection of data organized into rows and named columns (similar to a table).

###  `Properties of DataFrame`:
  1. **Distributed:** Data is distributed across multiple nodes in a cluster.
  2. **Immutable:** Once created, it cannot be changed. Transformations produce new DataFrames.
  3. **Lazy Evaluation:** Transformations on DataFrames are not computed immediately. Spark computes them only when an action requires a result to be returned to the driver program.
  4. **Schema:** Each DataFrame has a schema, representing the structure of the data, including column names and types.
  5. **Supports various data formats:** Can read and write data in various formats like CSV, JSON, Parquet, Avro, etc.


# Create DF from Reading Multiple File Formats

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Explicit schema for the books CSV, declared field by field.
books_shema = StructType(
    [
        StructField("book_id", StringType()),
        StructField("title", StringType()),
        StructField("author", StringType()),
        StructField("category", StringType()),
        StructField("price", IntegerType()),
    ]
)

# Alternative: drop the explicit schema and pass inferSchema=True so Spark
# infers the column types from the file instead.
books_csv_path = "dbfs:/mnt/adls_container/books-data.csv"
df_csv = (
    spark.read.format("csv")
    .option("header", True)
    .option("sep", ";")
    .schema(books_shema)
    .load(books_csv_path)
)

display(df_csv)
df_csv.printSchema()

In [0]:
# Read the customers JSON file; no schema is supplied to the reader.
customers_json_path = "dbfs:/mnt/adls_container/customers-data.json"
df_json = spark.read.format("json").load(customers_json_path)

# Preview the first 10 rows, then print the resulting schema.
json_preview = df_json.limit(10)
display(json_preview)
df_json.printSchema()

In [0]:
# Read every parquet file found under the directory.
parquet_dir = "dbfs:/FileStore/parquet/"
df_parquet = spark.read.format("parquet").load(parquet_dir)

# Preview 10 rows, print the schema, and finish with the total row count.
parquet_preview = df_parquet.limit(10)
display(parquet_preview)
df_parquet.printSchema()
df_parquet.count()

In [0]:
# Connection settings for the Azure SQL database read over JDBC.
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
url = "jdbc:sqlserver://azure-sql-server-1111.database.windows.net:1433;database=azure-sql-db-111"
table = "[SalesLT].[Customer]"

# Credentials come from a Databricks secret scope instead of being hard-coded,
# so the password never lands in the notebook source or its revision history.
# NOTE(review): the scope and key names below are placeholders — create them
# (or rename them here) to match your workspace before running this cell.
username = dbutils.secrets.get(scope="azure-sql", key="sql-username")
password = dbutils.secrets.get(scope="azure-sql", key="sql-password")


# Load the table through the generic JDBC data source.
customer_df = (spark.read
                   .format("jdbc")
                   .option("driver", driver)
                   .option("url", url)
                   .option("dbtable", table)
                   .option("user", username)
                   .option("password", password)
                   .load())

display(customer_df.limit(10))
customer_df.count()

In [0]:
# Sample employee rows: (empId, empName, empGender, empSalary, empCountry).
# Several rows are repeated verbatim (empId 5, 9, 10, 14 and 15).
emp_data = [
    (1, "John Doe", "Male", 60000.0, "USA"),
    (2, "Jane Smith", "Female", 55000.0, "Canada"),
    (3, "Alice Johnson", "Female", 65000.0, "UK"),
    (4, "Bob Williams", "Male", 62000.0, "Australia"),
    (5, "Eve Davis", "Female", 70000.0, "India"),
    (5, "Eve Davis", "Female", 70000.0, "India"),
    (6, "Charlie Brown", "Male", 58000.0, "Germany"),
    (7, "Diana Miller", "Female", 60000.0, "France"),
    (8, "Frank Johnson", "Male", 62000.0, "Spain"),
    (9, "Grace Wilson", "Female", 54000.0, "Italy"),
    (10, "Henry Davis", "Male", 68000.0, "Japan"),
    (9, "Grace Wilson", "Female", 54000.0, "Italy"),
    (10, "Henry Davis", "Male", 68000.0, "Japan"),
    (11, "Isabel Clark", "Female", 59000.0, "Brazil"),
    (12, "Jack Turner", "Male", 63000.0, "Mexico"),
    (13, "Katherine White", "Female", 67000.0, "South Africa"),
    (14, "Louis Harris", "Male", 56000.0, "Russia"),
    (15, "Mia Lee", "Female", 61000.0, "China"),
    (14, "Louis Harris", "Male", 56000.0, "Russia"),
    (15, "Mia Lee", "Female", 61000.0, "China"),
]

# Same five nullable fields, built incrementally with StructType.add().
emp_schema = (
    StructType()
    .add("empId", IntegerType(), True)
    .add("empName", StringType(), True)
    .add("empGender", StringType(), True)
    .add("empSalary", FloatType(), True)
    .add("empCountry", StringType(), True)
)

df_sample = spark.createDataFrame(emp_data, emp_schema)

df_sample.show(5)

# Transformations on Dataframes

In [0]:
# withColumn() : adds a column to the DataFrame
#   origin ==> constant column, every row gets the literal "india"
#   tax    ==> derived column, 12% of empSalary

from pyspark.sql.functions import lit

origin_value = lit("india")
tax_amount = df_sample["empSalary"] * 0.12

df2 = (
    df_sample
    .withColumn("origin", origin_value)
    .withColumn("tax", tax_amount)
)

display(df2.limit(5))

In [0]:
# withColumnRenamed() : renames a column
#   two columns are renamed here: origin -> empOrigin, tax -> empTax

df3 = (
    df2
    .withColumnRenamed("origin", "empOrigin")
    .withColumnRenamed("tax", "empTax")
)

display(df3.limit(5))

In [0]:
from pyspark.sql.functions import col

# One select() call showing three ways to reference a column:
# by name string, as a DataFrame attribute, and through col().
df3.select("empId", df3.empName, col("empGender")).show(5)

In [0]:
# Print the first 5 rows of df3.
df3.show(5)

In [0]:
# case conditions : when(), otherwise()
#   "Male"        ==> "m"
#   "Female"      ==> "f"
#   anything else ==> "u"

from pyspark.sql.functions import when

# Build the recoded gender column first; the alias keeps the original column name.
gender_code = (
    when(df3.empGender == "Male", "m")
    .when(df3.empGender == "Female", "f")
    .otherwise("u")
    .alias("empGender")
)

df4 = df3.select(
    "empId", "empName", gender_code, "empSalary", "empCountry", "empOrigin", "empTax"
)

df4.show(5)

In [0]:
# orderBy() / sort() : order the rows by one or more columns
# Here: highest salary first.

df4.sort("empSalary", ascending=False).show()

In [0]:
# dropDuplicates() : removes rows that are exact duplicates of another row
# The de-duplicated rows are then listed by salary, highest first.

df4.dropDuplicates().orderBy("empSalary", ascending=False).show()

In [0]:
## where() / filter() : keep only the rows that satisfy a condition

# Name each predicate, then combine them with & (logical AND).
salary_above_55k = df4.empSalary > 55000
gender_is_f = df4.empGender == 'f'
name_ends_with_e = df4.empName.like("%e")

(
    df4.dropDuplicates()
    .filter(salary_above_55k & gender_is_f & name_ends_with_e)
    .sort(df4.empSalary.desc())
    .show()
)

In [0]:
# Employee rows: (empId, empName, empGender, empSalary, empDepartment)
data = [
    (1, 'anil', 'M', 5000, 'IT'),
    (2, 'sandeep', 'M', 6000, 'IT'),
    (3, 'riya', 'F', 2500, 'payroll'),
    (4, 'prteek', 'M', 4000, 'HR'),
    (5, 'vani', 'F', 2000, 'HR'),
    (6, 'sunil', 'M', 2000, 'payroll'),
    (7, 'diksha', 'F', 3000, 'IT'),
    (8, 'rajesh', 'M', 4500, 'Finance'),
    (9, 'neha', 'F', 3500, 'Finance'),
    (10, 'amit', 'M', 3000, 'HR'),
    (11, 'pooja', 'F', 5500, 'IT'),
    (12, 'rohit', 'M', 6000, 'IT'),
]

# Schema for the rows above, built one nullable field at a time.
schema = (
    StructType()
    .add("empId", IntegerType(), True)
    .add("empName", StringType(), True)
    .add("empGender", StringType(), True)
    .add("empSalary", IntegerType(), True)
    .add("empDepartment", StringType(), True)
)

df = spark.createDataFrame(data=data, schema=schema)
df.show()
df.printSchema()

In [0]:
# aggregate functions : count, max, min, sum, avg

from pyspark.sql.functions import count, max, min, sum, avg

df.agg(count("*").alias("totalEmpCount")).show()
df.agg(max("empSalary").alias("maxSalary")).show()
df.agg(min("empSalary").alias("minSalary")).show()
df.agg(avg("empSalary").alias("avgSalary")).show()
df.agg(sum("empSalary").alias("sumSalary")).show()


In [0]:
df.agg(
    count("*").alias("totalEmpCount"),
    max("empSalary").alias("maxSalary"),
    min("empSalary").alias("minSalary"),
    avg("empSalary").alias("avgSalary"),
    sum("empSalary").alias("sumSalary")
).show()

In [0]:
# groupBy() : to group the data

df.groupBy("empDepartment").agg(
                                count("*").alias("totalEmpCount"),
                                max("empSalary").alias("maxSalary"),
                                min("empSalary").alias("minSalary"),
                                avg("empSalary").alias("avgSalary"),
                                sum("empSalary").alias("sumSalary")
                            ).show()

In [0]:
# union / unionAll : stack DataFrames vertically
# (both behave the same way and keep duplicate rows)

# January batch
schema1 = ['id', 'name', 'age']
data1 = [
    (1, 'Anil', 27),
    (2, 'sandeep', 28),
    (3, 'riya', 29),
]

# February batch -- the (3, 'riya', 29) row also appears in January
schema2 = ['id', 'name', 'age']
data2 = [
    (3, 'riya', 29),
    (4, 'rani', 26),
]

# March batch
schema3 = ['id', 'name', 'age']
data3 = [
    (5, 'liya', 29),
    (6, 'mani', 26),
]

df1 = spark.createDataFrame(data=data1, schema=schema1)
df2 = spark.createDataFrame(data=data2, schema=schema2)
df3 = spark.createDataFrame(data=data3, schema=schema3)

df1.show()
df2.show()
df3.show()

In [0]:
# Append the three monthly DataFrames one after another.
df_union = df1.union(df2)
df_union = df_union.union(df3)

# Remove the duplicated rows and list the result by id.
df_union.dropDuplicates().orderBy("id").show()

In [0]:
# Employees: (empId, empName, deptId, empSalary, cityId).
# Some rows repeat and some carry None for deptId or cityId.
emp_schema = ["empId", "empName", "deptId", "empSalary", "cityId"]
emp_data = [
    (1, "John", 1, 50000, 1),
    (2, "Alice", 2, 60000, 2),
    (3, "Bob", 3, 55000, 2),
    (2, "Alice", 2, 60000, 2),
    (3, "Bob", 3, 55000, 2),
    (4, "Jane", 4, 52000, 3),
    (5, "Eve", None, 48000, 4),
    (6, "Charlie", 4, 47000, None),
    (7, "David", 2, 55000, 3),
    (8, "Linda", 3, 53000, 1),
    (9, "Frank", None, 59000, 4),
    (7, "David", 2, 55000, 3),
    (8, "Linda", 3, 53000, None),
    (9, "Frank", 1, 59000, 4),
    (10, "Grace", 1, 49000, None),
    (10, "Grace", 1, 49000, 4),
]
emp_df = spark.createDataFrame(emp_data, emp_schema)

# Departments: (deptId, deptName)
dept_schema = ["deptId", "deptName"]
dept_data = [
    (1, "HR"),
    (2, "IT"),
    (3, "Sales"),
    (4, "Finance"),
]
dept_df = spark.createDataFrame(dept_data, dept_schema)

# Cities: (cityId, cityName)
add_schema = ["cityId", "cityName"]
address_data = [
    (1, "hyd"),
    (2, "blr"),
    (3, "chn"),
    (4, "kkt"),
]
address_df = spark.createDataFrame(address_data, add_schema)

# Print each DataFrame under its own label.
for frame_label, frame in (
    ("emp_df :", emp_df),
    ("dept_df :", dept_df),
    ("address_df :", address_df),
):
    print(frame_label)
    frame.show()

In [0]:
## join types : inner, left, right, full, cross, self

from pyspark.sql.functions import col, current_timestamp

# Join conditions, named so the chain below reads top to bottom.
inner_dept_match = emp_df.deptId == dept_df.deptId
inner_city_match = emp_df.cityId == address_df.cityId

# Employees inner-joined with their department and then their city.
joined_inner = (
    emp_df
    .join(dept_df, on=inner_dept_match, how="inner")
    .join(address_df, on=inner_city_match, how="inner")
)

# Drop the employee-side key columns, remove duplicate rows, keep only the
# "chn" city and stamp each row with the current timestamp.
df_inner = (
    joined_inner
    .drop(emp_df.cityId, emp_df.deptId)
    .dropDuplicates()
    .sort("empId")
    .filter(col("CityName") == "chn")
    .withColumn("createdAt", current_timestamp())
)

df_inner.show(truncate=False)

In [0]:
# Full outer joins: employees with departments, then with cities.
full_dept_match = emp_df.deptId == dept_df.deptId
full_city_match = emp_df.cityId == address_df.cityId

joined_full = (
    emp_df
    .join(dept_df, on=full_dept_match, how="full")
    .join(address_df, on=full_city_match, how="full")
)

# NOTE(review): de-duplication here is keyed on empId only, whereas the
# inner-join cell de-duplicates on the whole row -- confirm this is intended.
df_full = (
    joined_full
    .drop(emp_df.cityId, emp_df.deptId)
    .dropDuplicates(["empId"])
    .sort("empId")
    .filter(col("CityName") == "chn")
    .withColumn("createdAt", current_timestamp())
)

df_full.show(truncate=False)

In [0]:
## ranking functions : row_number, rank, dense_rank

# Example of how the three functions number rows ordered by salary:
#
# salary     row_number      rank(skip)      dense_rank(no_skipping)
# 100         1               1                 1
# 100         2               1                 1
# 200         3               3                 2
# 300         4               4                 3
# 300         5               4                 3
# 300         6               4                 3
# 400         7               7                 4
# 500         8               8                 5
# 500         9               8                 5
# 500         10              8                 5
# 500         11              8                 5
# 600         12              12                6

In [0]:
# Rows are (empName, salary, department); several salaries are tied.
schema = ["empName", "salary", "department"]
data = [
    ("John", 50000, "IT"),
    ("Alice", 60000, "HR"),
    ("Bob", 55000, "IT"),
    ("Jane", 52000, "Finance"),
    ("Eve", 48000, "HR"),
    ("lavanya", 55000, "IT"),
    ("ALEX", 60000, "HR"),
    ("Charlie", 47000, "Finance"),
    ("David", 55000, "IT"),
    ("Linda", 53000, "Finance"),
    ("Frank", 59000, "HR"),
    ("Grace", 49000, "IT"),
    ("Raghu", 53000, "Finance"),
]

df = spark.createDataFrame(data=data, schema=schema)
df.show()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col

# One window over the whole DataFrame, ordered by salary, highest first.
salary_desc = col("salary").desc()
window_spec = Window.orderBy(salary_desc)

# Append the three ranking columns in a single select().
df1 = df.select(
    "*",
    row_number().over(window_spec).alias("row_numColumn"),
    rank().over(window_spec).alias("rankColumn"),
    dense_rank().over(window_spec).alias("dense_rankColumn"),
)

display(df1)

In [0]:
# second highest salary : the rows whose dense rank is 2

df1.where(df1["dense_rankColumn"] == 2).show()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col

# One window per department, each ordered by salary, highest first.
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

# Append the three ranking columns in a single select().
df1 = df.select(
    "*",
    row_number().over(window_spec).alias("row_numColumn"),
    rank().over(window_spec).alias("rankColumn"),
    dense_rank().over(window_spec).alias("dense_rankColumn"),
)

display(df1)

In [0]:
# second highest salary for each department : dense rank 2 within its window

df1.where(df1["dense_rankColumn"] == 2).show()

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Rows are (id, name, age, city); None marks a missing value.
data = [
    (1, "Alice", 29, "New York"),
    (2, "Bob", None, "Los Angeles"),
    (3, None, 23, None),
    (4, "David", None, "San Francisco"),
    (5, "Eve", 35, None),
    (6, None, None, None),
]

# id and age are integer columns, name and city are strings; all nullable.
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("name", StringType(), True)
    .add("age", IntegerType(), True)
    .add("city", StringType(), True)
)

df = spark.createDataFrame(data=data, schema=schema)
display(df)

In [0]:
# Remove every row that has a null in any column.

df.dropna().show()

In [0]:
# Remove rows that have a null in the listed columns only.

columns_to_check = ["name", "age"]
df.dropna(subset=columns_to_check).show()

In [0]:
# thresh=3 keeps only the rows that have at least 3 non-null values.

df.dropna(thresh=3).show()

In [0]:
# filling nulls in all columns
# na.fill() only touches columns whose type matches the replacement value, so
# a string value alone leaves the integer `age` column null. A second fill
# with a numeric value covers the integer columns.
# NOTE(review): 0 is a placeholder for missing numbers -- pick a default that suits the data.

df.na.fill("unknown").na.fill(0).show()

In [0]:
# fill in specific columns
# A dict gives each column a replacement of its own type. `age` is an integer
# column, so a string value with subset=["age", "city"] would silently skip it.
# NOTE(review): 0 is a placeholder for a missing age -- pick a default that suits the data.

df.na.fill({"age": 0, "city": "unknown"}).show()

In [0]:
# Split the rows on whether `age` is null.

# rows where age is missing
df.where(df["age"].isNull()).show()

# rows where age is present
df.where(df["age"].isNotNull()).show()

In [0]:
# coalesce() : returns the first non-null value among its arguments

from pyspark.sql.functions import coalesce, lit

# nw_col takes `age` when it is present and falls back to `id` otherwise.
age_or_id = coalesce(df["age"], df["id"]).alias("nw_col")
df1 = df.select("*", age_or_id)
df1.show()

In [0]:
# One string column per row, holding four comma-separated values.
columns = ["full_details"]
data = [
    ("John,Doe,25,Engineer",),
    ("Jane,Smith,30,Data Scientist",),
    ("Bob,Johnson,22,Analyst",),
    ("Alice,Williams,28,Manager",),
    ("Charlie,Brown,35,Developer",),
]

df = spark.createDataFrame(data=data, schema=columns)
display(df)

In [0]:
## split() : breaks a string column into an array on a delimiter

from pyspark.sql.functions import split

# Split once, then pick each array element by position.
detail_parts = split(df.full_details, ",")

df1 = (
    df
    .withColumn("firstname", detail_parts.getItem(0))
    .withColumn("lastName", detail_parts.getItem(1))
    .withColumn("Age", detail_parts.getItem(2))
    .withColumn("role", detail_parts.getItem(3))
    .drop("full_details")
)

display(df1)

In [0]:
# concat() : joins several columns / literals into one string column

from pyspark.sql.functions import concat, lit

# email = <firstname>.<lastName>@gmail.com
email_address = concat(
    df1["firstname"],
    lit("."),
    df1["lastName"],
    lit("@gmail.com"),
)
df2 = df1.withColumn("email", email_address)

display(df2)

In [0]:
# assignment : extract the domain from an email address (hint: charindex, split)

In [0]:
# substring / substring_index : extract part of a string

from pyspark.sql.functions import substring, substring_index

# domain   : everything after the last "@" in the email
# new_role : the first three characters of role
email_domain = substring_index(df2["email"], "@", -1)
role_prefix = substring("role", 1, 3)

df3 = (
    df2
    .withColumn("domain", email_domain)
    .withColumn("new_role", role_prefix)
)

display(df3)