<a href="https://colab.research.google.com/github/tiasaxena/PySpark/blob/main/Pyspark_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Databricks notebook - [notebook link](https://community.cloud.databricks.com/?o=18716907705525#notebook/2636093781425835/command/2636093781425838  )

## 1. Reading CSV in Spark

### DataFrame reader API
The API used for reading files in Spark is `spark.read`, which returns a `DataFrameReader`. Supported sources include CSV, JSON, Parquet, and JDBC databases, among others. If no format is specified, Parquet is used by default.

**Syntax**
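`spark.read.format("__file format__").option("__key__", "__value__").schema(__schema__).load("__path location__")`

`.schema()` is optional. When it is used, it must come before `.load()`, because `.load()` already returns the DataFrame.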

3 modes, set with `.option("mode", "...")`:
* FAILFAST - If there is any corrupted record in our data, fail the execution with an error.
* DROPMALFORMED - Drop the corrupted records.
* PERMISSIVE - The default. Keep every record and set the corrupted fields to null.

In [None]:
spark

In [None]:
flight_df = (
    spark.read.format("csv")
    .option("header", "false")
    .option("inferschema", "false")
    .option("mode", "FAILFAST")
    .load("/FileStore/tables/flight_data_pyspark.csv")
)

flight_df.show(5)

In [None]:
flight_df_header = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferschema", "false")
    .option("mode", "FAILFAST")
    .load("/FileStore/tables/flight_data_pyspark.csv")
)

flight_df_header.show(5)

In [None]:
flight_df_header.printSchema()

In [None]:
flight_df_schema = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "FAILFAST")
    .load("/FileStore/tables/flight_data_pyspark.csv")
)

flight_df_schema.printSchema()

## 2. Creating Manual Schema


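There are two ways to create a schema:
1. `StructType` made up of `StructField`s
2. A DDL string, e.g. `"DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"`, which can be passed directly to `.schema()`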


Task - We will explicitly provide the schema (column names and data types) and read the file in PERMISSIVE mode.


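**skipRows:** skips the first line of the file, which contains the column names, since we provide the schema explicitly. If `skipRows` is not available in your environment, `.option("header", "true")` together with an explicit `.schema(...)` also skips the header line.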

In [None]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

my_schema = StructType([
  StructField("DEST_COUNTRY_NAME", StringType(), True),
  StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
  StructField("count", IntegerType(), True)
])

flight_db = spark.read.format("csv")\
                      .option("inferschema", "false")\
                      .option("skipRows", 1)\
                      .option("header", "false")\
                      .option("mode", "PERMISSIVE")\
                      .schema(my_schema)\
                      .load("/FileStore/tables/flight_data_pyspark.csv")

flight_db.show(5)

## 3. Handling Corrupted Record in Spark

When is a record considered corrupted?
1. JSON - the `{` and `}` braces are not closed properly.
2. CSV - if 5 fields are expected, every line must contain 5 comma-separated values. A field that itself contains a comma (e.g., an address) and is not quoted produces an extra value, so the record no longer matches the schema.
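To see why an unquoted comma inside a field breaks a CSV record, here is a small plain-Python sketch using the standard `csv` module (not Spark). The five-column layout and the sample rows are hypothetical. Spark's CSV reader also uses `"` as its default quote character, so quoting the field avoids the problem there as well.

```python
import csv
import io

EXPECTED_FIELDS = 5  # hypothetical layout: id, name, age, salary, address

raw = (
    '1,Tia,21,100000,"Civil Lines, Allahabad"\n'  # quoted: inner comma stays in the field
    '2,Ria,22,90000,Civil Lines, Allahabad\n'      # unquoted: comma splits the address
)

def classify(text):
    """Return (field_count, 'ok' or 'corrupt') for every CSV line in text."""
    result = []
    for fields in csv.reader(io.StringIO(text)):
        status = "ok" if len(fields) == EXPECTED_FIELDS else "corrupt"
        result.append((len(fields), status))
    return result

print(classify(raw))  # [(5, 'ok'), (6, 'corrupt')]
```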

In the different modes, the behavior is:
1. PERMISSIVE - all records are kept, and the corrupted values are replaced with NULL (if the schema has a `_corrupt_record` column, the raw record is stored there).
2. FAILFAST - the read fails with an error as soon as a corrupted record is found.
3. DROPMALFORMED - only the corrupted rows are dropped.
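The three modes can be mimicked in plain Python to make the difference concrete. This is a toy model, not Spark's implementation: it assumes a hypothetical schema `(id INT, name STRING, age INT)` and treats a value that cannot be parsed as an integer as corrupt. In PERMISSIVE mode it nulls the bad field and keeps the raw line in `_corrupt_record`, similar to what Spark does when that column is part of the schema.

```python
VALID_MODES = {"PERMISSIVE", "DROPMALFORMED", "FAILFAST"}

def to_int(text):
    """Return int(text), or None when the text is not a valid integer."""
    try:
        return int(text)
    except ValueError:
        return None

def parse_rows(lines, mode="PERMISSIVE"):
    """Toy model of Spark's read modes for the schema (id INT, name STRING, age INT)."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode: {mode}")
    rows = []
    for line in lines:
        raw_id, name, raw_age = line.split(",")
        row = {"id": to_int(raw_id), "name": name, "age": to_int(raw_age)}
        is_corrupt = row["id"] is None or row["age"] is None
        if not is_corrupt:
            row["_corrupt_record"] = None
            rows.append(row)
        elif mode == "FAILFAST":
            # stop the whole read at the first bad record
            raise ValueError(f"Malformed record: {line!r}")
        elif mode == "PERMISSIVE":
            # keep the row: unparseable fields are already None, raw text is preserved
            row["_corrupt_record"] = line
            rows.append(row)
        # DROPMALFORMED: the corrupt row is silently skipped
    return rows

lines = ["1,Tia,21", "2,Ria,twenty"]
print(parse_rows(lines, "PERMISSIVE"))
print(parse_rows(lines, "DROPMALFORMED"))
```

Calling `parse_rows(lines, "FAILFAST")` raises `ValueError` on the second line, just as FAILFAST aborts the read.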

In [None]:
employee_df1 = spark.read.format("csv")\
                        .option("inferschema", "true")\
                        .option("header", "true")\
                        .option("mode", "PERMISSIVE")\
                        .load("/FileStore/tables/employee.csv")

employee_df1.show()

In [None]:
employee_df2 = spark.read.format("csv")\
                        .option("inferschema", "true")\
                        .option("header", "true")\
                        .option("mode", "FAILFAST")\
                        .load("/FileStore/tables/employee.csv")

employee_df2.show()

In [None]:
employee_df3 = spark.read.format("csv")\
                        .option("inferschema", "true")\
                        .option("header", "true")\
                        .option("mode", "DROPMALFORMED")\
                        .load("/FileStore/tables/employee.csv")

employee_df3.show()

Thus, we observe:
1. PERMISSIVE will replace corrupt values with null.
2. FAILFAST will raise an error if any corrupt value is present.
3. DROPMALFORMED will show only the non-corrupt rows.

In [None]:

# Printing the corrupted records
from pyspark.sql.types import *

emp_schema = StructType(
  [
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("salary", IntegerType(), True),
    StructField("address", StringType(), True),
    StructField("nominee", StringType(), True),
    StructField("_corrupt_record", StringType(), True)
  ]
)

employee_df = spark.read.format("csv")\
                        .option("inferschema", "true")\
                        .option("header", "true")\
                        .option("mode", "PERMISSIVE")\
                        .schema(emp_schema)\
                        .load("/FileStore/tables/employee.csv")

employee_df.show(truncate=False)

In [None]:
# Write the corrupt records to a separate location (badRecordsPath is a Databricks option)
employee_df = spark.read.format("csv")\
                        .option("inferschema", "true")\
                        .option("header", "true")\
                        .option("badRecordsPath", "/FileStore/tables/corrupt_records_employee")\
                        .load("/FileStore/tables/employee.csv")

employee_df.show(truncate=False)

In [None]:
# The timestamped sub-folder is created by each run; adjust it to match your own run
corrupt_data_df = spark.read.format("json").load("/FileStore/tables/corrupt_records_employee/20241208T153124/bad_records")
corrupt_data_df.show(truncate=False)

## 4. Reading JSON file in Spark

1. File uploaded to /FileStore/tables/multiline_correct.json
2. File uploaded to /FileStore/tables/multiline_incorrect.json
3. File uploaded to /FileStore/tables/line_deliminated_json.json
4. File uploaded to /FileStore/tables/line_deliminated_json_extrafield.json
5. File uploaded to /FileStore/tables/corrupted.json

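**Note:** Line-delimited JSON (one record per line) is faster to read than multiline JSON: each line can be parsed independently, so Spark can split the file across tasks, whereas a multiline file has to be parsed as a whole.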


In [None]:
spark.read.format("json")\
          .option("inferschema", "true")\
          .option("mode", "PERMISSIVE")\
          .load("/FileStore/tables/line_deliminated_json.json").show(truncate=False)


In [None]:
spark.read.format("json")\
          .option("inferschema", "true")\
          .option("mode", "PERMISSIVE")\
          .load("/FileStore/tables/line_deliminated_json_extrafield.json").show(truncate=False)

**Note:** If our data is multiline, we must say so in the options; otherwise Spark treats each physical line as a separate record, and we end up with corrupt records or an error.
`.option("multiline", "true")`

In [None]:
spark.read.format("json")\
          .option("inferschema", "true")\
          .option("mode", "PERMISSIVE")\
          .option("multiline", "true")\
          .load("/FileStore/tables/multiline_correct.json").show(truncate=False)

**Note:** In multiline mode, several JSON objects must be wrapped in a list (`[...]`). Otherwise, only the first object is read, as in the example below.
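The difference between the two JSON layouts can be reproduced with Python's standard `json` module. This is only an analogy for how Spark reads the files, with hypothetical sample data: in line-delimited JSON every line is a self-contained record, while in multiline mode the text is parsed as one JSON value, so several top-level objects without an enclosing `[...]` yield just the first one.

```python
import json

# Hypothetical sample data in the three shapes discussed above.
json_lines = '{"id": 1, "name": "Tia"}\n{"id": 2, "name": "Ria"}\n'
json_array = '[{"id": 1, "name": "Tia"},\n {"id": 2, "name": "Ria"}]'
two_objects = '{"id": 1, "name": "Tia"}\n{"id": 2, "name": "Ria"}'  # no enclosing [...]

def read_json_lines(text):
    """Line-delimited: each non-empty line is parsed on its own."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

def read_multiline(text):
    """Multiline: parse the text as ONE JSON value; an array yields many records."""
    value, _end = json.JSONDecoder().raw_decode(text)  # stops after the first value
    return value if isinstance(value, list) else [value]

print(read_json_lines(json_lines))   # 2 records
print(read_multiline(json_array))    # 2 records
print(read_multiline(two_objects))   # only the first object
```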

In [None]:
spark.read.format("json")\
          .option("inferschema", "true")\
          .option("mode", "PERMISSIVE")\
          .option("multiline", "true")\
          .load("/FileStore/tables/multiline_incorrect.json").show(truncate=False)

In [None]:
spark.read.format("json")\
          .option("inferschema", "true")\
          .option("mode", "PERMISSIVE")\
          .load("/FileStore/tables/corrupted.json").show(truncate=False)

## 5. Reading Parquet in Spark


What is Parquet?
* Column-based file format -> data is stored column by column rather than row by row. That is, all values of col1, followed by all values of col2, etc. (within each row group).
* In big data, we usually **write data once and read it many times.** In a row-based file format (CSV, JSON, text files), consecutive values on disk are `row1col1_val, row1col2_val, row1col3_val, ..., row2col1_val, row2col2_val, row2col3_val, ...`, while in a column-based file format they are `col1row1_val, col1row2_val, ..., col2row1_val, col2row2_val, ...`. Thus, all the data of one column is stored contiguously, so a query that needs only a few columns reads less data and takes less time.
* Data is stored in a binary file format, which isn't human-readable.
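A plain-Python sketch of the two layouts, using a hypothetical three-row table. It flattens the same data row by row and column by column, then shows that reading one column is a single contiguous slice in the columnar layout but a strided read in the row layout. Real Parquet files add row groups, encoding, and compression on top of this idea.

```python
rows = [
    ("Tia", 21, "Allahabad"),
    ("Ria", 22, "Delhi"),
    ("Sam", 17, "Patna"),
]
n_rows, n_cols = len(rows), len(rows[0])

# Row-based layout: every value of row 1, then row 2, ...
row_layout = [value for row in rows for value in row]

# Column-based layout: every value of column 1, then column 2, ...
column_layout = [value for column in zip(*rows) for value in column]

# Reading only the age column (column index 1):
age_from_columns = column_layout[1 * n_rows : 2 * n_rows]  # one contiguous slice
age_from_rows = row_layout[1::n_cols]                      # every n_cols-th value

print(row_layout)
print(column_layout)
print(age_from_columns, age_from_rows)  # [21, 22, 17] [21, 22, 17]
```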

If row-based formats waste so much processing time on reads, why do we still need them?
* OLAP - Online Analytical Processing. Queries read only a few columns over many rows, so we use column-based file formats.
* OLTP - Online Transactional Processing. Workloads write data (insert, update, delete) one record at a time, which is easiest when each row's values are stored contiguously. So, for OLTP, we use row-based file formats.

**Note:** Our aim is to:
1. Reduce cost
2. Reduce lookup time
3. Increase performance

**Parquet files store their schema in the file metadata, so we don't need to provide options like `inferschema`.**

In [None]:
df = spark.read.format("parquet").load("/FileStore/tables/flight_data.parquet")
df.show()

## 6. How to write a DataFrame to disk in Spark?

Syntax - `df.write.format().option().mode().partitionBy().bucketBy().save()`, where `df.write` returns a `DataFrameWriter`.

**Note:** We can provide the path location in `.save()` or in `.option()`.

Save modes in the DataFrameWriter API, set with `.mode()`:
* append - adds the new files to the location, keeping the existing data.
* overwrite - deletes the existing data and writes the new files.
* errorIfExists - the default. If data already exists at the location, the write fails with an error.
* ignore - if data already exists at the location, nothing is written and the existing data is left untouched.
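The four save modes can be modeled with a dictionary standing in for storage. The `write` function below is hypothetical and only mirrors the semantics described above; in PySpark the mode is passed with `df.write.mode("overwrite")` (or `"append"`, `"ignore"`, `"errorifexists"`).

```python
def write(store, path, rows, mode="errorifexists"):
    """Toy model of DataFrameWriter save modes; `store` maps path -> list of rows."""
    exists = path in store
    if mode == "append":
        store.setdefault(path, []).extend(rows)
    elif mode == "overwrite":
        store[path] = list(rows)
    elif mode in ("errorifexists", "error"):
        if exists:
            raise FileExistsError(f"path {path} already exists")
        store[path] = list(rows)
    elif mode == "ignore":
        if not exists:
            store[path] = list(rows)
    else:
        raise ValueError(f"unknown mode: {mode}")
    return store

storage = {}
write(storage, "/out", [1, 2])                  # first write: the path is new
write(storage, "/out", [3], mode="append")      # -> [1, 2, 3]
write(storage, "/out", [9], mode="ignore")      # unchanged: [1, 2, 3]
write(storage, "/out", [9], mode="overwrite")   # -> [9]
print(storage)  # {'/out': [9]}
```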

In [None]:
df = spark.read.format("csv").option("header", "true").load("/FileStore/tables/employee_write_data.csv")

df.show()

In [None]:
df.write.format("csv")\
        .option("header", "true")\
        .mode("overwrite")\
        .option("path", "/FileStore/tables/csv_write")\
        .save()

In [None]:
# Check if the file is present
dbutils.fs.ls("/FileStore/tables/csv_write/")

### Repartition
Here, `repartition(3)` splits the DataFrame into 3 partitions before writing, so the output is written as 3 separate part files.

In [None]:
df.repartition(3).write.format("csv")\
        .option("header", "true")\
        .mode("overwrite")\
        .option("path", "/FileStore/tables/csv_write_repartition")\
        .save()

In [None]:
dbutils.fs.ls("/FileStore/tables/csv_write_repartition")

We can thus see that 3 separate part files are created.

## 7. Partitioning

Partitioning splits data into separate directories based on a column's value. For example, we can store the data for all `INDIANS` in one directory, `RUSSIANS` in another, etc. This is useful when we have to filter data and work on a specific section of the dataset only, since Spark can skip the directories that don't match the filter.

If we partition on a column whose values are all unique, every row ends up in its own partition:
`# of partitions == # of rows`

**Note:** The order of the columns passed to `partitionBy` matters: it sets the directory nesting, e.g. `gender=f/address=.../`.
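To see why the order matters, here is a plain-Python sketch that groups rows into Hive-style `column=value` directories, the layout that `partitionBy` produces. The rows and the `/out` base path are hypothetical. As in Spark, the partition columns are encoded in the directory names rather than stored inside the files.

```python
from collections import defaultdict

def partition_layout(rows, partition_cols, base="/out"):
    """Group rows into Hive-style col=value directories, nested in the given order."""
    layout = defaultdict(list)
    for row in rows:
        subdir = "/".join(f"{c}={row[c]}" for c in partition_cols)
        # partition values live in the path, so they are dropped from the stored row
        stored = {k: v for k, v in row.items() if k not in partition_cols}
        layout[f"{base}/{subdir}"].append(stored)
    return dict(layout)

rows = [
    {"id": 1, "name": "Tia", "gender": "f", "address": "india"},
    {"id": 2, "name": "Sam", "gender": "m", "address": "us"},
    {"id": 3, "name": "Ria", "gender": "f", "address": "india"},
]

for path in partition_layout(rows, ["gender", "address"]):
    print(path)   # /out/gender=f/address=india, /out/gender=m/address=us
for path in partition_layout(rows, ["address", "gender"]):
    print(path)   # /out/address=india/gender=f, /out/address=us/gender=m
```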

In [None]:
df.write.format("csv")\
        .option("header", "true")\
        .option("path", "/FileStore/tables/partition_by_gender_address")\
        .mode("overwrite")\
        .partitionBy("gender", "address")\
        .save()

In [None]:
dbutils.fs.ls("/FileStore/tables/partition_by_gender_address")

In [None]:
dbutils.fs.ls("/FileStore/tables/partition_by_gender_address/gender=f/")

## 8. Bucketing in Spark

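Bucketing divides data into a fixed number of buckets by hashing the values of the bucketing column(s) and taking the result modulo the number of buckets. Rows with the same value always land in the same bucket; the buckets are roughly equal in size only when the values are spread evenly.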

Bucketing helps where partitioning does not, e.g. when the column is unique and partitioning would give `# of partitions = # of rows`.

**Note:** if we use `.save()`, we will get error: `AnalysisException: 'save' does not support bucketBy right now.`. So, instead of `.save()` we do: `.saveAsTable(name="__name__")`.
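A plain-Python sketch of bucket assignment: hash the bucketing column and take the result modulo the number of buckets. Spark uses a Murmur3-based hash; `zlib.crc32` is only a stand-in here, so the bucket numbers will not match Spark's. The point is that the same key always lands in the same bucket, which can let Spark avoid a shuffle when joining tables bucketed the same way on the join key.

```python
import zlib

NUM_BUCKETS = 3  # matches bucketBy(3, "id") in the cell below

def bucket_for(value, num_buckets=NUM_BUCKETS):
    """Stand-in for Spark's bucket id: hash(value) mod num_buckets.

    zlib.crc32 replaces Spark's Murmur3 hash, so ids will differ from Spark's.
    """
    return zlib.crc32(str(value).encode("utf-8")) % num_buckets

ids = list(range(1, 11))  # hypothetical id values
buckets = {b: [] for b in range(NUM_BUCKETS)}
for emp_id in ids:
    buckets[bucket_for(emp_id)].append(emp_id)

print(buckets)
```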

In [None]:
df.write.format("csv")\
        .option("header", "true")\
        .option("path", "/FileStore/tables/bucket_by_id")\
        .bucketBy(3, "id")\
        .saveAsTable(name="bucket_by_id_table")

In [None]:
dbutils.fs.ls("/FileStore/tables/bucket_by_id")

## 9. Create DataFrame in Spark

**Schema** is defined as **Column Name** + **Column Datatype**

In [None]:
data = [
  (1, "Tia"),
  (2, "Ria")
]

schema = ["id", "name"]

my_df = spark.createDataFrame(data=data, schema=schema)
my_df.show()

## 10. DataFrame Transformations in Spark

In [None]:
from pyspark.sql.types import *

emp_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True),
  StructField("age", IntegerType(), True),
  StructField("salary", IntegerType(), True),
  StructField("address", StringType(), True),
  StructField("nominee", StringType(), True),
  StructField("_corrupt_record", StringType(), True),
])

employee_df = spark.read.format("csv")\
                        .option("inferschema", "true")\
                        .option("header", "true")\
                        .option("mode", "PERMISSIVE")\
                        .schema(emp_schema)\
                        .load("/FileStore/tables/employee.csv")

employee_df.show()

**Note:** Each record of the DataFrame is represented as a `Row()` object (this is what you get back from `collect()`). Internally, Spark stores the rows in a compact binary format rather than as individual Python objects.

In [None]:
# Creating a row
from pyspark.sql import Row

row = Row("6", "Tia", "21", "100000", "Allahabad", "nominee3")

### Multiple ways of selecting columns


**Notes:**
* Columns are expressions.
* A column is, thus, a set of transformations applied to one or more values of each record.

In [None]:
# Select a single column
employee_df.select('id').show()

In [None]:
# Derive new values from a column (returns a new DataFrame; the original is unchanged)
from pyspark.sql.functions import *
from pyspark.sql.types import *

employee_df.select(col("id")+5).show()

In [None]:
# Select multiple columns
employee_df.select("id", "name", "age").show()

employee_df.select("*").show()

#--------COMBINING ALL METHODS-------
employee_df.select(col("id"), col("name"), employee_df["age"], employee_df.address).show()

### Expressions

`expr()` accepts **SQL expression** strings (not full SQL statements), so any SQL expression can be written inside it.

In [None]:
employee_df.select(expr("id + 5")).show()

# SQL expressions: aliasing and functions
employee_df.select(expr("id as employee_id"), expr("name as employee_name"), expr("concat(age, address)")).show()

### SparkSQL

Spark SQL lets us query DataFrames with plain SQL. To do so, we first register the **`DataFrame`** as a **`TempView`**.

In [None]:
# Create a virtual view of the DataFrame
employee_df.createOrReplaceTempView("employee_table")

In [None]:
spark.sql(
    """
    SELECT * FROM employee_table
    """
).show(truncate=False)

### Operations
1. Aliasing - `alias()`
2. Filter/Where - `.filter()` or `.where()`
3. Literal - `lit()`
4. Adding Columns - `withColumn()`. If the column already exists, its data is overwritten; otherwise, a new column is created.
5. Renaming Columns - `withColumnRenamed()`
6. Casting Data Types - `withColumn()` with `.cast()`
7. Removing Columns - `.drop()`
8. Union & UnionAll - `.union()` and `.unionAll()`
9. If-Else - `when()` / `.otherwise()` in the DataFrame API; `CASE WHEN ... THEN ... ELSE ... END` in SQL
10. Find Unique - `.distinct()`
11. Drop Duplicates - `.dropDuplicates()` (alias `.drop_duplicates()`)
12. Sort data - `.sort(col("salary").asc())`, using `.asc()` or `.desc()`

**Note:**
* `.distinct()` - Removes duplicate rows based on all columns in the DataFrame.
* `.dropDuplicates()` - Removes duplicate rows based on a selected subset of columns (or on all columns if no subset is given).
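A plain-Python sketch of the difference between `.distinct()` and `.dropDuplicates(subset)`, using a few rows taken from the `emp_df` sample data used later in this section. Spark does not guarantee which of several duplicate rows `dropDuplicates` keeps; this sketch simply keeps the first one.

```python
columns = ["id", "name", "salary", "mngr_id"]
rows = [
    (13, "Nidhi", 60000, 17),
    (14, "Priya", 80000, 18),
    (13, "Nidhi", 60000, 17),  # exact duplicate of the first row
    (14, "Priya", 90000, 18),  # same id/name as row 2, different salary
]

def distinct(rows):
    """Like .distinct(): drop rows identical in ALL columns (keeps first seen)."""
    seen, out = set(), []
    for row in rows:
        if row not in seen:
            seen.add(row)
            out.append(row)
    return out

def drop_duplicates(rows, subset):
    """Like .dropDuplicates(subset): drop rows repeating the SUBSET columns."""
    positions = [columns.index(c) for c in subset]
    seen, out = set(), []
    for row in rows:
        key = tuple(row[p] for p in positions)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out

print(distinct(rows))                          # 3 rows: both Priya rows survive
print(drop_duplicates(rows, ["id", "name"]))   # 2 rows: one per (id, name)
```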

In [None]:
# Alias
employee_df.select(col("id").alias("emp_id"), "name").show()

In [None]:
# Filter
# Employees with salary above 1.5 lakh (150000) and age below 18
employee_df.filter((col("salary")>150000) & (col("age")<18)).show()

In [None]:
# Add a column `last_name` holding a real null (lit("null") would store the string "null")
employee_df.select("*", lit(None).cast("string").alias("last_name")).show()

In [None]:
# Add a column
employee_df.withColumn("Last_Name", lit("null")).show()

In [None]:
# Rename column - rename id --> emp_id
employee_df.withColumnRenamed("id", "emp_id").show()

In [None]:
# Casting - change the data type of a column from one type to another
employee_df.withColumn("id", col("id").cast("string")).withColumn("salary", col("salary").cast("long")).printSchema()

In [None]:
# Remove columns
employee_df.drop("age", "address").show()

All the above operations can be done using SQL as well.

In [None]:
spark.sql("""
          select *, "Saxena" as last_name, concat(name, last_name) as full_name, cast(id as string) from employee_table where age < 18 and salary > 150000
          """).show()

spark.sql("""
          select *, "Saxena" as last_name, concat(name, last_name) as full_name, cast(id as string) from employee_table where age < 18 and salary > 150000
          """).printSchema()

In [None]:
data=[(10 ,'Anil',50000, 18),
(11 ,'Vikas',75000,  16),
(12 ,'Nisha',40000,  18),
(13 ,'Nidhi',60000,  17),
(14 ,'Priya',80000,  18),
(15 ,'Mohit',45000,  18),
(16 ,'Rajesh',90000, 10),
(17 ,'Raman',55000, 16),
(18 ,'Sam',65000,   17)]

schema = ["id", "name", "salary", "mngr_id"]

manager_df = spark.createDataFrame(data, schema)

manager_df.show()

In [None]:
manager_df.count()

In [None]:
data1=[(19 ,'Sohan',50000, 18),
(20 ,'Sima',75000,  17),
(17 ,'Raman',55000, 16),
(18 ,'Sam',65000,   17)]

manager_df1 = spark.createDataFrame(data1, schema)

manager_df1.show()

In [None]:
manager_df.union(manager_df1).show() # appends ALL records of table 2 below table 1; duplicates are kept
manager_df.unionAll(manager_df1).show() # same result as union() in PySpark

In [None]:
manager_df.union(manager_df).distinct().show()

**Note:** Thus, in PySpark, **`union()`** and **`unionAll()`** behave the same: both keep duplicates. To keep only distinct rows, add **`.distinct()`**.

In [None]:
wrong_column_data=[(19 ,50000, 18,'Sohan'),
(20 ,75000,  17,'Sima')]

new_schema = ["id", "salary", "mngr_id", "name"]

wrong_df = spark.createDataFrame(wrong_column_data, new_schema)

manager_df.union(wrong_df).show()

In [None]:
manager_df.union(wrong_df).show()
manager_df.unionAll(wrong_df).show()

**Thus, when combining data whose columns are in a different order, `union()` and `unionAll()` don't give any error, because they match columns by position, not by name. To match columns by name, we must use `unionByName()`.**
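A plain-Python sketch of positional versus by-name union, using one row from each of `manager_df` and `wrong_df` above. The helper functions are hypothetical; they only mirror how `union()` pairs columns by position while `unionByName()` reorders them by name first. Neither removes duplicates.

```python
left_cols = ["id", "name", "salary", "mngr_id"]        # manager_df
left_rows = [(10, "Anil", 50000, 18)]
right_cols = ["id", "salary", "mngr_id", "name"]       # wrong_df
right_rows = [(19, 50000, 18, "Sohan")]

def union(left_rows, right_rows):
    """Like union()/unionAll(): append rows by position, keep duplicates."""
    return left_rows + right_rows

def union_by_name(left_cols, left_rows, right_cols, right_rows):
    """Like unionByName(): reorder right rows into the left column order first."""
    order = [right_cols.index(c) for c in left_cols]
    return left_rows + [tuple(row[i] for i in order) for row in right_rows]

print(union(left_rows, right_rows))   # the "name" position now holds 50000 for Sohan's row
print(union_by_name(left_cols, left_rows, right_cols, right_rows))
```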

In [None]:
manager_df.unionByName(wrong_df).show()

In [None]:
# If-else
emp_data = [
(1,'manish',26,20000,'india','IT'),
(2,'rahul',None,40000,'germany','engineering'),
(3,'pawan',12,60000,'india','sales'),
(4,'roshini',44,None,'uk','engineering'),
(5,'raushan',35,70000,'india','sales'),
(6,None,29,200000,'uk','IT'),
(7,'adam',37,65000,'us','IT'),
(8,'chris',16,40000,'us','sales'),
(None,None,None,None,None,None),
(7,'adam',37,65000,'us','IT')
]
schema = ["id", "name", "age", "salary", "location", "field"]

employee_df = spark.createDataFrame(emp_data, schema=schema)
employee_df.withColumn("adult", when(col("age") > 18, "Yes").when(col("age") < 18, "No").otherwise("Novalue")).show()

In [None]:
employee_df.createOrReplaceTempView("temp_table")

spark.sql("""
          select *,
          case when age < 18 then "minor"
          when age > 18 then "major"
          else "no-value"
          end as adult
          from temp_table
          """).show()


In [None]:
data=[(10 ,'Anil',50000, 18),
(11 ,'Vikas',75000,  16),
(12 ,'Nisha',40000,  18),
(13 ,'Nidhi',60000,  17),
(14 ,'Priya',80000,  18),
(15 ,'Mohit',45000,  18),
(16 ,'Rajesh',90000, 10),
(17 ,'Raman',55000, 16),
(18 ,'Sam',65000,   17),
(15 ,'Mohit',45000,  18),
(13 ,'Nidhi',60000,  17),
(14 ,'Priya',90000,  18),
(18 ,'Sam',65000,   17)
     ]

emp_df = spark.createDataFrame(data, ["id", "name", "salary", "mngr_id"])
emp_df.show()

In [None]:
emp_df.count()

In [None]:
# Unique
emp_df.distinct().count()

In [None]:
# Find distinct records on the basis of specific columns
emp_df.select("name", "id").distinct().show()

In [None]:
# Drop duplicates
drop_dup_emp_df = emp_df.drop_duplicates(["id", "name", "salary", "mngr_id"])
drop_dup_emp_df.count()

In [None]:
manager_df.sort(col("salary").asc(), col("name").desc()).show() # order matters

### Aggregation Functions

All aggregation functions are written inside **`.select()`** (or **`.agg()`**, e.g. after `groupBy()`).
1. Count - `count("col")` skips all the `null` values, whereas `df.count()` counts every row
2. Min
3. Max
4. Avg
5. Sum

**Note:** `count` can be an **action** or a **transformation**, depending on how it is used.
1. `df.count()` - action: it runs a job and returns the number of rows.
2. `df.select(count("col"))` - transformation: no **job** is created until an action like **`.show()`** is called.
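The null handling of `count` can be checked in plain Python on the same `emp_data` used in the next cell. The helpers are hypothetical stand-ins: `count_rows` mirrors `df.count()` and `count_col` mirrors `count("col")`.

```python
emp_data = [
    (1, 'manish', 26, 20000, 'india', 'IT'),
    (2, 'rahul', None, 40000, 'germany', 'engineering'),
    (3, 'pawan', 12, 60000, 'india', 'sales'),
    (4, 'roshini', 44, None, 'uk', 'engineering'),
    (5, 'raushan', 35, 70000, 'india', 'sales'),
    (6, None, 29, 200000, 'uk', 'IT'),
    (7, 'adam', 37, 65000, 'us', 'IT'),
    (8, 'chris', 16, 40000, 'us', 'sales'),
    (None, None, None, None, None, None),
    (7, 'adam', 37, 65000, 'us', 'IT'),
]
columns = ["id", "name", "age", "salary", "location", "field"]

def count_rows(rows):
    """Like df.count(): every row counts, even one that is entirely null."""
    return len(rows)

def count_col(rows, col):
    """Like count("col"): only rows where that column is not null."""
    i = columns.index(col)
    return sum(1 for row in rows if row[i] is not None)

print(count_rows(emp_data), count_col(emp_data, "name"))  # 10 8
```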

In [None]:
emp_data = [
(1,'manish',26,20000,'india','IT'),
(2,'rahul',None,40000,'germany','engineering'),
(3,'pawan',12,60000,'india','sales'),
(4,'roshini',44,None,'uk','engineering'),
(5,'raushan',35,70000,'india','sales'),
(6,None,29,200000,'uk','IT'),
(7,'adam',37,65000,'us','IT'),
(8,'chris',16,40000,'us','sales'),
(None,None,None,None,None,None),
(7,'adam',37,65000,'us','IT')
]

emp_df = spark.createDataFrame(emp_data, ["id", "name", "age", "salary", "location", "field"])

In [None]:
emp_df.count()

In [None]:
emp_df.select(count("name")).show() # skips the null values

In [None]:
# Max, Min, Sum, Avg
emp_df.select(sum("salary").alias("Total Salary"), max("salary").alias("Max Salary"), min("salary"), avg("salary").cast("int")).show()

### GroupBy

Groups records based on specific column with similar values.

Questions:
1. Total salary paid to all employees
2. Total salary per department
3. Total salary per department and per country
4. All columns, plus one extra column with the total salary of each department
5. Percentage of the department's total salary given to each employee

In [None]:
emp_data = [(1,'manish',50000,'IT'),
(2,'vikash',60000,'sales'),
(3,'raushan',70000,'marketing'),
(4,'mukesh',80000,'IT'),
(5,'pritam',90000,'sales'),
(6,'nikita',45000,'marketing'),
(7,'ragini',55000,'marketing'),
(8,'rakesh',100000,'IT'),
(9,'aditya',65000,'IT'),
(10,'rahul',50000,'marketing')]

emp_df = spark.createDataFrame(emp_data, ['id', 'name', 'salary', 'department'])
emp_df.show()

In [None]:
# 1. Total salary given to its employee
emp_df.select(sum('salary')).show()

In [None]:
# 2. Total salary given per department
emp_df.groupBy('department')\
      .agg(sum("salary")).show()

To include the name, department, salary, and the department's `sum(salary)` in the same result, we have two options:
1. **join** - store the grouped result in a table and join it back on `department`
2. **window** function
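Both options compute the department total once and attach it to every row. A plain-Python sketch of that result on the `emp_data` above (questions 4 and 5), with the percentage rounded to two decimals; the window example in section 11 does the same with `sum("salary").over(Window.partitionBy("department"))`.

```python
from collections import defaultdict

emp_data = [(1, 'manish', 50000, 'IT'),
            (2, 'vikash', 60000, 'sales'),
            (3, 'raushan', 70000, 'marketing'),
            (4, 'mukesh', 80000, 'IT'),
            (5, 'pritam', 90000, 'sales'),
            (6, 'nikita', 45000, 'marketing'),
            (7, 'ragini', 55000, 'marketing'),
            (8, 'rakesh', 100000, 'IT'),
            (9, 'aditya', 65000, 'IT'),
            (10, 'rahul', 50000, 'marketing')]

# Step 1: one total per department, like groupBy("department").agg(sum("salary"))
dept_total = defaultdict(int)
for _emp_id, _name, salary, dept in emp_data:
    dept_total[dept] += salary

# Step 2: attach that total to every employee row (what the join or the window
# produces), then derive each employee's share of the department total
with_total = [
    (emp_id, name, salary, dept, dept_total[dept],
     round(100 * salary / dept_total[dept], 2))
    for emp_id, name, salary, dept in emp_data
]

for row in with_total:
    print(row)
```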

In [None]:
# 3. Total salary per dpt and per country wise

emp_data_with_country = [
(1,'manish',50000,'IT','india'),
(2,'vikash',60000,'sales','us'),
(3,'raushan',70000,'marketing','india'),
(4,'mukesh',80000,'IT','us'),
(5,'pritam',90000,'sales','india'),
(6,'nikita',45000,'marketing','us'),
(7,'ragini',55000,'marketing','india'),
(8,'rakesh',100000,'IT','us'),
(9,'aditya',65000,'IT','india'),
(10,'rahul',50000,'marketing','us'),
]

emp_df_with_country = spark.createDataFrame(emp_data_with_country, ["id", "name", "salary", "department", "country"])
emp_df_with_country.groupBy("department", "country")\
                   .agg(sum("salary")).show()


### Join in Spark

* A join needs a join condition, usually equality on **at least one column** that the tables share (a cross join is the exception).
* It is usually an **expensive** operation because it causes **data shuffling** (unless one side is small enough to be broadcast).
* **Syntax** - `df1.join(df2, condition, join_type)`.
* **Multiple Joins** - chain the calls: `df1.join(df2, cond1, type1).join(df3, cond2, type2)`

Types of joins:
1. Inner join - Records with a match in both tables
2. Left join - All rows of the left table; the right table's columns are NULL where there is no match
3. Right join - All rows of the right table; the left table's columns are NULL where there is no match
4. Outer join - All rows from both tables, with NULLs where there is no match
5. Left Semi - Like an inner join, but returns only the left table's columns, and each left row at most once
6. Left Anti - Only the rows of the left table that have no match in the right table
7. Cross Join - Very expensive operation. Returns the Cartesian product of both tables: len(A cross_join B) == len(A) * len(B)
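The join types can be compared in plain Python on the `customer_id` keys of `customer_df` and `sales_df` below. The helpers are hypothetical and work on bare keys only; they show how many rows each join type returns and why a left semi join is not the same as an inner join when the right side has repeated keys.

```python
customer_ids = list(range(1, 11))             # customer_df.customer_id
sales_ids = [1, 1, 2, 5, 7, 9, 2, 1, 5, 11]   # sales_df.customer_id

def inner(left, right):
    """Every matching (left, right) pair; repeated keys multiply the output."""
    return [(lk, rk) for lk in left for rk in right if lk == rk]

def left_outer(left, right):
    """All left keys; None on the right where there is no match."""
    out = []
    for lk in left:
        matches = [rk for rk in right if rk == lk]
        if matches:
            out.extend((lk, rk) for rk in matches)
        else:
            out.append((lk, None))
    return out

def left_semi(left, right):
    """Left keys with at least one match, each returned once, left side only."""
    right_keys = set(right)
    return [lk for lk in left if lk in right_keys]

def left_anti(left, right):
    """Left keys with no match at all."""
    right_keys = set(right)
    return [lk for lk in left if lk not in right_keys]

def cross(left, right):
    """Cartesian product: every left key paired with every right key."""
    return [(lk, rk) for lk in left for rk in right]

print(len(inner(customer_ids, sales_ids)))       # 9
print(len(left_outer(customer_ids, sales_ids)))  # 14
print(left_semi(customer_ids, sales_ids))        # [1, 2, 5, 7, 9]
print(left_anti(customer_ids, sales_ids))        # [3, 4, 6, 8, 10]
print(len(cross(customer_ids, sales_ids)))       # 100
```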

In [None]:

customer_data = [(1,'manish','patna',"30-05-2022"),
(2,'vikash','kolkata',"12-03-2023"),
(3,'nikita','delhi',"25-06-2023"),
(4,'rahul','ranchi',"24-03-2023"),
(5,'mahesh','jaipur',"22-03-2023"),
(6,'prantosh','kolkata',"18-10-2022"),
(7,'raman','patna',"30-12-2022"),
(8,'prakash','ranchi',"24-02-2023"),
(9,'ragini','kolkata',"03-03-2023"),
(10,'raushan','jaipur',"05-02-2023")]

customer_schema=['customer_id','customer_name','address','date_of_joining']


sales_data = [(1,22,10,"01-06-2022"),
(1,27,5,"03-02-2023"),
(2,5,3,"01-06-2023"),
(5,22,1,"22-03-2023"),
(7,22,4,"03-02-2023"),
(9,5,6,"03-03-2023"),
(2,1,12,"15-06-2023"),
(1,56,2,"25-06-2023"),
(5,12,5,"15-04-2023"),
(11,12,76,"12-03-2023")]

sales_schema=['customer_id','product_id','quantity','date_of_purchase']


product_data = [(1, 'fanta',20),
(2, 'dew',22),
(5, 'sprite',40),
(7, 'redbull',100),
(12,'mazza',45),
(22,'coke',27),
(25,'limca',21),
(27,'pepsi',14),
(56,'sting',10)]

product_schema=['id','name','price']

customer_df = spark.createDataFrame(customer_data, customer_schema)
sales_df = spark.createDataFrame(sales_data, sales_schema)
product_df = spark.createDataFrame(product_data, product_schema)

In [None]:
# Give the ids of customers who came to our platform and did buy at least one product
customer_df.join(sales_df, customer_df["customer_id"] == sales_df["customer_id"], "inner").select(customer_df["customer_id"]).distinct().show()


In [None]:
# Give the name and customer id of all customers who came to the site but did not buy anything.
customer_df.join(sales_df, customer_df["customer_id"] == sales_df["customer_id"], "left")\
          .filter(sales_df["customer_id"].isNull())\
          .select(customer_df["customer_id"], customer_df["customer_name"]).show()

## 11. Window function in Spark

A **window function** looks at a specific set of rows (called a *window*) and calculates something for each row based on that group. Unlike regular group functions (like `SUM` or `AVG`), which give just one result for the entire group, window functions give a result for every row in the group.

For example:

If you have a table of employees with their salaries, you could use a window function to calculate the **average salary** of all employees **but still show each individual employee's salary**.

**Syntax:**
`from pyspark.sql.window import Window`

`window = Window.partitionBy("col_name")`

The above syntax creates one window per distinct value of `col_name`; each row is then evaluated against the rows of its own window.

Key components:
1. `partitionBy("col_name")`: This is like splitting the data into small, manageable groups based on a specific column.
2. `orderBy("col_name")`: This sorts the rows inside each partition, usually by a specific column.
3. Window frame: Defines the specific range of rows to look at for each calculation (e.g., all rows, just the previous row, etc.).

Options: `UNBOUNDED PRECEDING` | `offset PRECEDING` | `CURRENT ROW` | `offset FOLLOWING` | `UNBOUNDED FOLLOWING`
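A plain-Python sketch of two frames over the IT salaries sorted ascending: a running total (`UNBOUNDED PRECEDING` to `CURRENT ROW`) and a three-row moving sum (`1 PRECEDING` to `1 FOLLOWING`). In PySpark these frames would be written as `.rowsBetween(Window.unboundedPreceding, Window.currentRow)` and `.rowsBetween(-1, 1)` on a window spec.

```python
it_salaries = [50000, 50000, 65000, 80000, 100000]  # IT dept, ordered by salary

def running_sum(values):
    """ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW."""
    total, out = 0, []
    for v in values:
        total += v
        out.append(total)
    return out

def moving_sum(values, preceding=1, following=1):
    """ROWS BETWEEN <preceding> PRECEDING AND <following> FOLLOWING."""
    n = len(values)
    return [sum(values[max(0, i - preceding): min(n, i + following + 1)]) for i in range(n)]

print(running_sum(it_salaries))  # [50000, 100000, 165000, 245000, 345000]
print(moving_sum(it_salaries))   # [100000, 165000, 195000, 245000, 180000]
```

When a window has `orderBy` but no explicit frame, Spark defaults to a RANGE frame from `UNBOUNDED PRECEDING` to `CURRENT ROW`, in which rows with equal ordering values are peers; the two 50000 rows would then both get 100000 instead of 50000 and 100000.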



In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
emp_data = [(1,'manish',50000,'IT','m'),
(2,'vikash',60000,'sales','m'),
(3,'raushan',70000,'marketing','m'),
(4,'mukesh',80000,'IT','m'),
(5,'priti',90000,'sales','f'),
(6,'nikita',45000,'marketing','f'),
(7,'ragini',55000,'marketing','f'),
(8,'rashi',100000,'IT','f'),
(9,'aditya',65000,'IT','m'),
(10,'rahul',50000,'marketing','m'),
(11,'rakhi',50000,'IT','f'),
(12,'akhilesh',90000,'sales','m')]

emp_df = spark.createDataFrame(emp_data, ["id", "name", "salary", "department", "gender"])
emp_df.show()

+---+--------+------+----------+------+
| id|    name|salary|department|gender|
+---+--------+------+----------+------+
|  1|  manish| 50000|        IT|     m|
|  2|  vikash| 60000|     sales|     m|
|  3| raushan| 70000| marketing|     m|
|  4|  mukesh| 80000|        IT|     m|
|  5|   priti| 90000|     sales|     f|
|  6|  nikita| 45000| marketing|     f|
|  7|  ragini| 55000| marketing|     f|
|  8|   rashi|100000|        IT|     f|
|  9|  aditya| 65000|        IT|     m|
| 10|   rahul| 50000| marketing|     m|
| 11|   rakhi| 50000|        IT|     f|
| 12|akhilesh| 90000|     sales|     m|
+---+--------+------+----------+------+



In [None]:
# List all column name with one extra column where total salary of each dpt is mentioned
from pyspark.sql.window import Window

window = Window.partitionBy("department")
print(f"The windows so created are: \n {window}")

emp_df.withColumn("total_salary", sum(col("salary")).over(window)).show(truncate=False)

The windows so created are: 
 <pyspark.sql.window.WindowSpec object at 0x7f2c26af11f0>
+---+--------+------+----------+------+------------+
|id |name    |salary|department|gender|total_salary|
+---+--------+------+----------+------+------------+
|1  |manish  |50000 |IT        |m     |345000      |
|4  |mukesh  |80000 |IT        |m     |345000      |
|8  |rashi   |100000|IT        |f     |345000      |
|9  |aditya  |65000 |IT        |m     |345000      |
|11 |rakhi   |50000 |IT        |f     |345000      |
|3  |raushan |70000 |marketing |m     |220000      |
|6  |nikita  |45000 |marketing |f     |220000      |
|7  |ragini  |55000 |marketing |f     |220000      |
|10 |rahul   |50000 |marketing |m     |220000      |
|2  |vikash  |60000 |sales     |m     |240000      |
|5  |priti   |90000 |sales     |f     |240000      |
|12 |akhilesh|90000 |sales     |m     |240000      |
+---+--------+------+----------+------+------------+



### Row Number, Rank, Dense Rank

Row Number - Assigns a **unique number** to each row, starting from 1, in the order specified. No ties are allowed; every row gets a different number.

Rank - Assigns a rank based on the order, but rows with the **same value (ties) get the same rank**. After ties, the rank **skips numbers**.

Dense Rank -  Similar to rank, but it **does not skip numbers after ties**. Rows with the same value still get the same rank.
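A plain-Python sketch of the three numbering functions for one already-sorted partition. The helper is hypothetical; run on the IT and sales salaries, it reproduces the `row number`, `rank`, and `dense rank` columns in the output below.

```python
def rank_columns(sorted_values):
    """Return (value, row_number, rank, dense_rank) for one partition.

    `sorted_values` must already be ordered, as orderBy does inside a window.
    """
    out = []
    rank = dense_rank = 0
    previous = object()  # sentinel that never equals a real value
    for row_number, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = row_number   # after a tie, rank jumps to the row position
            dense_rank += 1     # dense rank only ever moves up by one
            previous = value
        out.append((value, row_number, rank, dense_rank))
    return out

it_salaries = [50000, 50000, 65000, 80000, 100000]
sales_salaries = [60000, 90000, 90000]
for row in rank_columns(it_salaries):
    print(row)
```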

In [None]:
# Give the row no., rank and dense rank for the window in asc order by salary over dept
window = Window.partitionBy("department").orderBy("salary")

emp_df.withColumn("row number", row_number().over(window)).withColumn("rank", rank().over(window)).withColumn("dense rank", dense_rank().over(window)).show(truncate=False)

+---+--------+------+----------+------+----------+----+----------+
|id |name    |salary|department|gender|row number|rank|dense rank|
+---+--------+------+----------+------+----------+----+----------+
|1  |manish  |50000 |IT        |m     |1         |1   |1         |
|11 |rakhi   |50000 |IT        |f     |2         |1   |1         |
|9  |aditya  |65000 |IT        |m     |3         |3   |2         |
|4  |mukesh  |80000 |IT        |m     |4         |4   |3         |
|8  |rashi   |100000|IT        |f     |5         |5   |4         |
|6  |nikita  |45000 |marketing |f     |1         |1   |1         |
|10 |rahul   |50000 |marketing |m     |2         |2   |2         |
|7  |ragini  |55000 |marketing |f     |3         |3   |3         |
|3  |raushan |70000 |marketing |m     |4         |4   |4         |
|2  |vikash  |60000 |sales     |m     |1         |1   |1         |
|5  |priti   |90000 |sales     |f     |2         |2   |2         |
|12 |akhilesh|90000 |sales     |m     |3         |2   |2         |
+---+--------+------+----------+------+----------+----+----------+

In [None]:
# Give the row no., rank and dense rank for the window in asc order by salary, and gender over dept
window = Window.partitionBy("department", "gender").orderBy("salary")

emp_df.withColumn("row number", row_number().over(window)).withColumn("rank", rank().over(window)).withColumn("dense rank", dense_rank().over(window)).show(truncate=False)

+---+--------+------+----------+------+----------+----+----------+
|id |name    |salary|department|gender|row number|rank|dense rank|
+---+--------+------+----------+------+----------+----+----------+
|11 |rakhi   |50000 |IT        |f     |1         |1   |1         |
|8  |rashi   |100000|IT        |f     |2         |2   |2         |
|1  |manish  |50000 |IT        |m     |1         |1   |1         |
|9  |aditya  |65000 |IT        |m     |2         |2   |2         |
|4  |mukesh  |80000 |IT        |m     |3         |3   |3         |
|6  |nikita  |45000 |marketing |f     |1         |1   |1         |
|7  |ragini  |55000 |marketing |f     |2         |2   |2         |
|10 |rahul   |50000 |marketing |m     |1         |1   |1         |
|3  |raushan |70000 |marketing |m     |2         |2   |2         |
|5  |priti   |90000 |sales     |f     |1         |1   |1         |
|2  |vikash  |60000 |sales     |m     |1         |1   |1         |
|12 |akhilesh|90000 |sales     |m     |2         |2   |2         |
+---+--------+------+----------+------+----------+----+----------+

In [None]:
# Top 2 salary levels in each department (dense_rank keeps ties, so a department can return more than 2 employees)
window = Window.partitionBy("department").orderBy(desc("salary"))

emp_df.withColumn("Dense Rank", dense_rank().over(window)).filter(col("Dense Rank") <= 2).show(truncate=False)

+---+--------+------+----------+------+----------+
|id |name    |salary|department|gender|Dense Rank|
+---+--------+------+----------+------+----------+
|8  |rashi   |100000|IT        |f     |1         |
|4  |mukesh  |80000 |IT        |m     |2         |
|3  |raushan |70000 |marketing |m     |1         |
|7  |ragini  |55000 |marketing |f     |2         |
|5  |priti   |90000 |sales     |f     |1         |
|12 |akhilesh|90000 |sales     |m     |1         |
|2  |vikash  |60000 |sales     |m     |2         |
+---+--------+------+----------+------+----------+
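The sales department returns three rows above because priti and akhilesh tie at 90000. The following is a plain-Python model of the tie rules, not Spark code. `rankings` is a hypothetical helper, and the salaries are copied from `emp_df`. It shows which employees a `<= 2` filter keeps with each ranking function.

```python
def rankings(sorted_values):
    """Return (row_number, rank, dense_rank) for values already sorted,
    following the tie rules of Spark's ranking functions."""
    result = []
    rank = dense_rank = 0
    previous = object()  # sentinel that never equals a salary
    for position, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = position   # rank jumps past the tied rows
            dense_rank += 1   # dense_rank only ever advances by one
            previous = value
        result.append((position, rank, dense_rank))
    return result


# Sales department of emp_df, ordered by salary descending (Spark's order among ties is arbitrary).
sales_dept = [("priti", 90000), ("akhilesh", 90000), ("vikash", 60000)]
ranks = rankings([salary for _, salary in sales_dept])

top2_row_number = [n for (n, _), (rn, _, _) in zip(sales_dept, ranks) if rn <= 2]
top2_rank = [n for (n, _), (_, rk, _) in zip(sales_dept, ranks) if rk <= 2]
top2_dense = [n for (n, _), (_, _, dr) in zip(sales_dept, ranks) if dr <= 2]

print(top2_row_number)
print(top2_rank)
print(top2_dense)
```

Only `dense_rank` still reaches the second-highest salary (vikash) when the top salary is shared. This is why it is the usual choice for "top N salary levels".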



### Lead and Lag in Spark

- **Lead**: Fetches the value from the **next row** in the same column within a partition.  
- **Lag**: Fetches the value from the **previous row** in the same column within a partition.  
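Below is a plain-Python sketch of that behaviour, assuming the rows are partitioned by product and ordered by month. `lag_lead` is a hypothetical helper and not part of PySpark. The rows are a small subset of the product data used below, with the date replaced by a month number. Rows at a partition edge get `None`, which matches the null that Spark's `lag`/`lead` return by default.

```python
from itertools import groupby

# Hypothetical subset of product sales: (product_name, month, sales).
rows = [
    ("iphone", 1, 1500000),
    ("iphone", 2, 1300000),
    ("iphone", 3, 1600000),
    ("samsung", 1, 1100000),
    ("samsung", 2, 1120000),
]


def lag_lead(rows, offset=1, default=None):
    """Mimic lag/lead over Window.partitionBy(product_name).orderBy(month).

    Returns (name, month, sales, lag_value, lead_value) tuples. Rows whose
    neighbour falls outside the partition get `default` instead.
    """
    out = []
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))     # partition key, then order key
    for _, group in groupby(ordered, key=lambda r: r[0]):  # one partition per product
        part = list(group)
        for i, (name, month, sales) in enumerate(part):
            lag_value = part[i - offset][2] if i - offset >= 0 else default
            lead_value = part[i + offset][2] if i + offset < len(part) else default
            out.append((name, month, sales, lag_value, lead_value))
    return out


result = lag_lead(rows)
for r in result:
    print(r)
```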

In [None]:
product_data = [
(1,"iphone","01-01-2023",1500000),
(2,"samsung","01-01-2023",1100000),
(3,"oneplus","01-01-2023",1100000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]


product_df = spark.createDataFrame(product_data, ['product_id', 'product_name', 'sales_date', 'sales'])
product_df.show()

+----------+------------+----------+-------+
|product_id|product_name|sales_date|  sales|
+----------+------------+----------+-------+
|         1|      iphone|01-01-2023|1500000|
|         2|     samsung|01-01-2023|1100000|
|         3|     oneplus|01-01-2023|1100000|
|         1|      iphone|01-02-2023|1300000|
|         2|     samsung|01-02-2023|1120000|
|         3|     oneplus|01-02-2023|1120000|
|         1|      iphone|01-03-2023|1600000|
|         2|     samsung|01-03-2023|1080000|
|         3|     oneplus|01-03-2023|1160000|
|         1|      iphone|01-04-2023|1700000|
|         2|     samsung|01-04-2023|1800000|
|         3|     oneplus|01-04-2023|1170000|
|         1|      iphone|01-05-2023|1200000|
|         2|     samsung|01-05-2023| 980000|
|         3|     oneplus|01-05-2023|1175000|
|         1|      iphone|01-06-2023|1100000|
|         2|     samsung|01-06-2023|1100000|
|         3|     oneplus|01-06-2023|1200000|
+----------+------------+----------+-------+



In [None]:
# Month-over-month % gain/loss: lag() fetches the previous month's sales, and the change is divided by the current month's sales

window = Window.partitionBy("product_name").orderBy('sales_date')

product_df.withColumn('previous_month_sale', lag(col('sales'), 1).over(window))\
          .withColumn('percentage_loss_gain', round((col('sales') - col('previous_month_sale')) * 100 / col('sales'), 2))\
          .show(truncate=False)

+----------+------------+----------+-------+-------------------+--------------------+
|product_id|product_name|sales_date|sales  |previous_month_sale|percentage_loss_gain|
+----------+------------+----------+-------+-------------------+--------------------+
|1         |iphone      |01-01-2023|1500000|null               |null                |
|1         |iphone      |01-02-2023|1300000|1500000            |-15.38              |
|1         |iphone      |01-03-2023|1600000|1300000            |18.75               |
|1         |iphone      |01-04-2023|1700000|1600000            |5.88                |
|1         |iphone      |01-05-2023|1200000|1700000            |-41.67              |
|1         |iphone      |01-06-2023|1100000|1200000            |-9.09               |
|3         |oneplus     |01-01-2023|1100000|null               |null                |
|3         |oneplus     |01-02-2023|1120000|1100000            |1.79                |
|3         |oneplus     |01-03-2023|1160000|1120000   
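
The query above divides the change by the current month's `sales`. Growth is more commonly measured against the previous month, which would mean dividing by `col('previous_month_sale')` instead. The short arithmetic sketch below compares both formulas on the iphone January-to-February figures.

```python
# iphone sales for January and February, taken from product_df above.
previous_sales, current_sales = 1500000, 1300000

# Formula used in the cell above: change divided by the current month's sales.
vs_current = round((current_sales - previous_sales) * 100 / current_sales, 2)
# Conventional month-over-month growth: change divided by the previous month's sales.
vs_previous = round((current_sales - previous_sales) * 100 / previous_sales, 2)

print(vs_current, vs_previous)
```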

### `Rows Between` and `Range Between` in Spark

* Frame clauses solve problems that lead, lag, row_number, rank and dense_rank cannot handle on their own, because they let an aggregate such as `first`, `last`, `sum` or `avg` see a chosen range of rows around the current row.

Questions:
1. Find the difference in each product's sales between its first month and its latest month.
2. Email every employee who spent less than 8 hours in the office on a day they came in.
3. Evaluate sales performance using the average of the last 3 months.

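* **`UNBOUNDED PRECEDING`** (**default frame start**) - Refers to the first row in the partition, regardless of the current row's position. When a window has an `orderBy` but no explicit frame, Spark uses `rangeBetween(Window.unboundedPreceding, Window.currentRow)`. Without an `orderBy`, the frame is the whole partition.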

Syntax: `.rowsBetween(Window.unboundedPreceding, Window.currentRow)`

* **`UNBOUNDED FOLLOWING`** - Refers to the last row in the partition, regardless of its position.

Syntax: `.rowsBetween(Window.currentRow, Window.unboundedFollowing)`

* **`CURRENT ROW`** - Refers to the row being processed at the moment.

Syntax: `.rowsBetween(Window.currentRow, Window.currentRow)`

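* **`ROWS BETWEEN`** - Defines a fixed number of rows around the current row, counted by position. Negative offsets are preceding rows and positive offsets are following rows, so `.rowsBetween(-2, 0)` covers the two previous rows plus the current row.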

Syntax: `.rowsBetween(start_offset, end_offset)`

* **`RANGE BETWEEN`** - Defines the frame by the values of the `orderBy` column (e.g. dates or scores) relative to the current row's value. Rows with equal values (peers) always fall into the same frame.

Syntax: `.rangeBetween(Window.unboundedPreceding, Window.currentRow)`
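The difference between the two frame types shows up when the `orderBy` column has ties. The plain-Python model below is a sketch of the semantics, not the Spark API. `rows_frame_sum` and `range_frame_sum` are hypothetical helpers, and `None` stands for an unbounded edge. The keys and values are made-up sample data.

```python
def rows_frame_sum(values, start, end):
    """Sum over a ROWS frame: positions i+start .. i+end around each row i.
    None stands for unbounded; values must already be in ORDER BY order."""
    n = len(values)
    sums = []
    for i in range(n):
        lo = 0 if start is None else max(0, i + start)
        hi = n - 1 if end is None else min(n - 1, i + end)
        sums.append(sum(values[lo:hi + 1]))
    return sums


def range_frame_sum(keys, values, start, end):
    """Sum over a RANGE frame: rows whose ORDER BY key lies in
    [key + start, key + end]. None stands for unbounded; ties share a frame."""
    sums = []
    for key in keys:
        total = 0
        for other_key, value in zip(keys, values):
            if (start is None or other_key >= key + start) and (end is None or other_key <= key + end):
                total += value
        sums.append(total)
    return sums


keys = [1, 2, 2, 3]        # hypothetical ORDER BY column, with a tie at 2
values = [10, 20, 30, 40]  # hypothetical values being summed

running_rows = rows_frame_sum(values, None, 0)          # like rowsBetween(unboundedPreceding, currentRow)
running_range = range_frame_sum(keys, values, None, 0)  # like rangeBetween(unboundedPreceding, currentRow)
sliding_rows = rows_frame_sum(values, -1, 0)            # like rowsBetween(-1, 0)
print(running_rows, running_range, sliding_rows)
```

In the range frame, both rows with key 2 see the total 60, because peers are included together. The rows frame stops exactly at the current position and gives 30 and then 60.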


**Note:** To debug a query and inspect its physical plan, call **`.explain()`** on the resulting DataFrame.

In [None]:
# Data for Question 1
product_data = [
(2,"samsung","01-01-1995",11000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-01-2006",15000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(3,"oneplus","01-01-2010",23000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]

product_schema=["product_id","product_name","sales_date","sales"]

product_df = spark.createDataFrame(data=product_data,schema=product_schema)

product_df.show()

+----------+------------+----------+-------+
|product_id|product_name|sales_date|  sales|
+----------+------------+----------+-------+
|         2|     samsung|01-01-1995|  11000|
|         1|      iphone|01-02-2023|1300000|
|         2|     samsung|01-02-2023|1120000|
|         3|     oneplus|01-02-2023|1120000|
|         1|      iphone|01-03-2023|1600000|
|         2|     samsung|01-03-2023|1080000|
|         3|     oneplus|01-03-2023|1160000|
|         1|      iphone|01-01-2006|  15000|
|         1|      iphone|01-04-2023|1700000|
|         2|     samsung|01-04-2023|1800000|
|         3|     oneplus|01-04-2023|1170000|
|         1|      iphone|01-05-2023|1200000|
|         2|     samsung|01-05-2023| 980000|
|         3|     oneplus|01-05-2023|1175000|
|         1|      iphone|01-06-2023|1100000|
|         3|     oneplus|01-01-2010|  23000|
|         2|     samsung|01-06-2023|1100000|
|         3|     oneplus|01-06-2023|1200000|
+----------+------------+----------+-------+



In [None]:
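# 1. Difference in sales of each product between its first month and its latest month
# sales_date is a dd-MM-yyyy string; parse it before ordering, because sorting the raw string compares the day and month before the year
# The frame spans the whole partition so that last() returns the partition's final row, not the current row
window = Window.partitionBy("product_name").orderBy(to_date("sales_date", "dd-MM-yyyy")).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)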

product_df.withColumn("first_month_sale", first(col("sales")).over(window))\
          .withColumn("latest_month_sale", last(col("sales")).over(window))\
          .withColumn("latest_minus_first_month_sales", col("latest_month_sale") - col("first_month_sale"))\
          .select("product_id","product_name", "latest_minus_first_month_sales").distinct().show()

+----------+------------+------------------------------+
|product_id|product_name|latest_minus_first_month_sales|
+----------+------------+------------------------------+
|         1|      iphone|                       1085000|
|         3|     oneplus|                       1177000|
|         2|     samsung|                       1089000|
+----------+------------+------------------------------+
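The explicit `rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)` matters for `last()`. With an `orderBy` and no explicit frame, the frame ends at the current row (and its peers), so `last()` simply returns each row's own value. This plain-Python sketch models both frames for the iphone rows. The list is copied from the Question 1 data in date order, and it assumes no ties in `sales_date`.

```python
# iphone sales in sales_date order, taken from the Question 1 data (the 2006 row sorts first).
iphone = [15000, 1300000, 1600000, 1700000, 1200000, 1100000]

# Default frame with orderBy and no ties: partition start .. current row.
last_with_default_frame = [iphone[: i + 1][-1] for i in range(len(iphone))]

# rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing): the whole partition.
first_with_full_frame = [iphone[0] for _ in iphone]
last_with_full_frame = [iphone[-1] for _ in iphone]

difference = last_with_full_frame[0] - first_with_full_frame[0]
print(last_with_default_frame)
print(difference)
```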



In [None]:
# Data for Question 2
emp_data = [(1,"manish","11-07-2023","10:20"),
        (1,"manish","11-07-2023","11:20"),
        (2,"rajesh","11-07-2023","11:20"),
        (1,"manish","11-07-2023","11:50"),
        (2,"rajesh","11-07-2023","13:20"),
        (1,"manish","11-07-2023","19:20"),
        (2,"rajesh","11-07-2023","17:20"),
        (1,"manish","12-07-2023","10:32"),
        (1,"manish","12-07-2023","12:20"),
        (3,"vikash","12-07-2023","09:12"),
        (1,"manish","12-07-2023","16:23"),
        (3,"vikash","12-07-2023","18:08")]

emp_schema = ["id", "name", "date", "time"]
emp_df = spark.createDataFrame(data=emp_data, schema=emp_schema)

emp_df.show()

+---+------+----------+-----+
| id|  name|      date| time|
+---+------+----------+-----+
|  1|manish|11-07-2023|10:20|
|  1|manish|11-07-2023|11:20|
|  2|rajesh|11-07-2023|11:20|
|  1|manish|11-07-2023|11:50|
|  2|rajesh|11-07-2023|13:20|
|  1|manish|11-07-2023|19:20|
|  2|rajesh|11-07-2023|17:20|
|  1|manish|12-07-2023|10:32|
|  1|manish|12-07-2023|12:20|
|  3|vikash|12-07-2023|09:12|
|  1|manish|12-07-2023|16:23|
|  3|vikash|12-07-2023|18:08|
+---+------+----------+-----+



In [None]:
# 2. Email every employee who spent less than 8 hours in the office on a day they came in

# Combine date and time into one timestamp so the values can be ordered and subtracted.
# from_unixtime() returns a 'yyyy-MM-dd HH:mm:ss' string, which is converted back with to_timestamp() later
emp_df = emp_df.withColumn("timestamp", from_unixtime(unix_timestamp(expr("CONCAT(date, ' ', time)"), "dd-MM-yyyy HH:mm")))

emp_df.show()

+---+------+----------+-----+-------------------+
| id|  name|      date| time|          timestamp|
+---+------+----------+-----+-------------------+
|  1|manish|11-07-2023|10:20|2023-07-11 10:20:00|
|  1|manish|11-07-2023|11:20|2023-07-11 11:20:00|
|  2|rajesh|11-07-2023|11:20|2023-07-11 11:20:00|
|  1|manish|11-07-2023|11:50|2023-07-11 11:50:00|
|  2|rajesh|11-07-2023|13:20|2023-07-11 13:20:00|
|  1|manish|11-07-2023|19:20|2023-07-11 19:20:00|
|  2|rajesh|11-07-2023|17:20|2023-07-11 17:20:00|
|  1|manish|12-07-2023|10:32|2023-07-12 10:32:00|
|  1|manish|12-07-2023|12:20|2023-07-12 12:20:00|
|  3|vikash|12-07-2023|09:12|2023-07-12 09:12:00|
|  1|manish|12-07-2023|16:23|2023-07-12 16:23:00|
|  3|vikash|12-07-2023|18:08|2023-07-12 18:08:00|
+---+------+----------+-----+-------------------+



In [None]:
# Order by the full timestamp: 'date' is constant within each partition, so ordering by it
# would leave first()/last() dependent on whatever order the rows happen to arrive in
window = Window.partitionBy('id', 'date').orderBy('timestamp').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

new_df = emp_df.withColumn('login', first('timestamp').over(window))\
               .withColumn('logout', last('timestamp').over(window))\
               .withColumn('login', to_timestamp('login', 'yyyy-MM-dd HH:mm:ss'))\
               .withColumn('logout', to_timestamp('logout', 'yyyy-MM-dd HH:mm:ss'))\
               .withColumn("total_time_seconds", unix_timestamp('logout') - unix_timestamp('login'))\
               .withColumn("total_time_hours", round(col("total_time_seconds") / 3600, 2))

new_df.show(truncate=False)

+---+------+----------+-----+-------------------+-------------------+-------------------+------------------+----------------+
|id |name  |date      |time |timestamp          |login              |logout             |total_time_seconds|total_time_hours|
+---+------+----------+-----+-------------------+-------------------+-------------------+------------------+----------------+
|1  |manish|11-07-2023|10:20|2023-07-11 10:20:00|2023-07-11 10:20:00|2023-07-11 19:20:00|32400             |9.0             |
|1  |manish|11-07-2023|11:20|2023-07-11 11:20:00|2023-07-11 10:20:00|2023-07-11 19:20:00|32400             |9.0             |
|1  |manish|11-07-2023|11:50|2023-07-11 11:50:00|2023-07-11 10:20:00|2023-07-11 19:20:00|32400             |9.0             |
|1  |manish|11-07-2023|19:20|2023-07-11 19:20:00|2023-07-11 10:20:00|2023-07-11 19:20:00|32400             |9.0             |
|1  |manish|12-07-2023|10:32|2023-07-12 10:32:00|2023-07-12 10:32:00|2023-07-12 16:23:00|21060             |5.85      

In [None]:
mailing_list = new_df.filter(col('total_time_hours') < 8).select("id", "name", "date", "total_time_hours").distinct()
mailing_list.show()

+---+------+----------+----------------+
| id|  name|      date|total_time_hours|
+---+------+----------+----------------+
|  1|manish|12-07-2023|            5.85|
|  2|rajesh|11-07-2023|             6.0|
+---+------+----------+----------------+
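The same result can be cross-checked without Spark. This plain-Python sketch groups the badge events from `emp_data` by `(id, name, date)` and treats the earliest and latest timestamps as login and logout. It uses `min`/`max` instead of `first`/`last` over an ordered window. That swap is an assumption of this sketch, chosen because it does not depend on row order.

```python
from collections import defaultdict
from datetime import datetime

# Badge events copied from emp_data above: (id, name, date, time).
emp_data = [(1, "manish", "11-07-2023", "10:20"), (1, "manish", "11-07-2023", "11:20"),
            (2, "rajesh", "11-07-2023", "11:20"), (1, "manish", "11-07-2023", "11:50"),
            (2, "rajesh", "11-07-2023", "13:20"), (1, "manish", "11-07-2023", "19:20"),
            (2, "rajesh", "11-07-2023", "17:20"), (1, "manish", "12-07-2023", "10:32"),
            (1, "manish", "12-07-2023", "12:20"), (3, "vikash", "12-07-2023", "09:12"),
            (1, "manish", "12-07-2023", "16:23"), (3, "vikash", "12-07-2023", "18:08")]

# Collect parsed timestamps per employee and day (dates are dd-MM-yyyy).
events = defaultdict(list)
for emp_id, name, date, time in emp_data:
    events[(emp_id, name, date)].append(datetime.strptime(f"{date} {time}", "%d-%m-%Y %H:%M"))

# Hours between the first and the last badge event of each day.
hours = {key: round((max(ts) - min(ts)).total_seconds() / 3600, 2) for key, ts in events.items()}

mailing_list = sorted((key, h) for key, h in hours.items() if h < 8)
print(mailing_list)
```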



In [None]:
# Question 3 Data
product_data = [
(1,"iphone","01-01-2023",1500000),
(2,"samsung","01-01-2023",1100000),
(3,"oneplus","01-01-2023",1100000),
(1,"iphone","01-02-2023",1300000),
(2,"samsung","01-02-2023",1120000),
(3,"oneplus","01-02-2023",1120000),
(1,"iphone","01-03-2023",1600000),
(2,"samsung","01-03-2023",1080000),
(3,"oneplus","01-03-2023",1160000),
(1,"iphone","01-04-2023",1700000),
(2,"samsung","01-04-2023",1800000),
(3,"oneplus","01-04-2023",1170000),
(1,"iphone","01-05-2023",1200000),
(2,"samsung","01-05-2023",980000),
(3,"oneplus","01-05-2023",1175000),
(1,"iphone","01-06-2023",1100000),
(2,"samsung","01-06-2023",1100000),
(3,"oneplus","01-06-2023",1200000)
]

product_schema=["product_id","product_name","sales_date","sales"]

product_df = spark.createDataFrame(data=product_data,schema=product_schema)

product_df.show()

+----------+------------+----------+-------+
|product_id|product_name|sales_date|  sales|
+----------+------------+----------+-------+
|         1|      iphone|01-01-2023|1500000|
|         2|     samsung|01-01-2023|1100000|
|         3|     oneplus|01-01-2023|1100000|
|         1|      iphone|01-02-2023|1300000|
|         2|     samsung|01-02-2023|1120000|
|         3|     oneplus|01-02-2023|1120000|
|         1|      iphone|01-03-2023|1600000|
|         2|     samsung|01-03-2023|1080000|
|         3|     oneplus|01-03-2023|1160000|
|         1|      iphone|01-04-2023|1700000|
|         2|     samsung|01-04-2023|1800000|
|         3|     oneplus|01-04-2023|1170000|
|         1|      iphone|01-05-2023|1200000|
|         2|     samsung|01-05-2023| 980000|
|         3|     oneplus|01-05-2023|1175000|
|         1|      iphone|01-06-2023|1100000|
|         2|     samsung|01-06-2023|1100000|
|         3|     oneplus|01-06-2023|1200000|
+----------+------------+----------+-------+



In [None]:
# 3. Sales performance based on the average of the last 3 months
# Note: January and February are dropped later because they do not have 3 months of history yet

# Window frame covering the 2 preceding rows plus the current row (3 rows in total)
window = Window.partitionBy("product_id").orderBy("sales_date").rowsBetween(-2, 0)

product_df = product_df.withColumn("running_sum", sum("sales").over(window))

product_df.show()

+----------+------------+----------+-------+-----------+
|product_id|product_name|sales_date|  sales|running_sum|
+----------+------------+----------+-------+-----------+
|         1|      iphone|01-01-2023|1500000|    1500000|
|         1|      iphone|01-02-2023|1300000|    2800000|
|         1|      iphone|01-03-2023|1600000|    4400000|
|         1|      iphone|01-04-2023|1700000|    4600000|
|         1|      iphone|01-05-2023|1200000|    4500000|
|         1|      iphone|01-06-2023|1100000|    4000000|
|         2|     samsung|01-01-2023|1100000|    1100000|
|         2|     samsung|01-02-2023|1120000|    2220000|
|         2|     samsung|01-03-2023|1080000|    3300000|
|         2|     samsung|01-04-2023|1800000|    4000000|
|         2|     samsung|01-05-2023| 980000|    3860000|
|         2|     samsung|01-06-2023|1100000|    3880000|
|         3|     oneplus|01-01-2023|1100000|    1100000|
|         3|     oneplus|01-02-2023|1120000|    2220000|
|         3|     oneplus|01-03-

In [None]:
product_df = product_df.withColumn("row_number", row_number().over(Window.partitionBy("product_id").orderBy("sales_date")))
product_df.show()

+----------+------------+----------+-------+-----------+----------+
|product_id|product_name|sales_date|  sales|running_sum|row_number|
+----------+------------+----------+-------+-----------+----------+
|         1|      iphone|01-01-2023|1500000|    1500000|         1|
|         1|      iphone|01-02-2023|1300000|    2800000|         2|
|         1|      iphone|01-03-2023|1600000|    4400000|         3|
|         1|      iphone|01-04-2023|1700000|    4600000|         4|
|         1|      iphone|01-05-2023|1200000|    4500000|         5|
|         1|      iphone|01-06-2023|1100000|    4000000|         6|
|         2|     samsung|01-01-2023|1100000|    1100000|         1|
|         2|     samsung|01-02-2023|1120000|    2220000|         2|
|         2|     samsung|01-03-2023|1080000|    3300000|         3|
|         2|     samsung|01-04-2023|1800000|    4000000|         4|
|         2|     samsung|01-05-2023| 980000|    3860000|         5|
|         2|     samsung|01-06-2023|1100000|    

In [None]:
product_df = product_df.filter(col("row_number") > 2)

product_df.show()


+----------+------------+----------+-------+-----------+----------+
|product_id|product_name|sales_date|  sales|running_sum|row_number|
+----------+------------+----------+-------+-----------+----------+
|         1|      iphone|01-03-2023|1600000|    4400000|         3|
|         1|      iphone|01-04-2023|1700000|    4600000|         4|
|         1|      iphone|01-05-2023|1200000|    4500000|         5|
|         1|      iphone|01-06-2023|1100000|    4000000|         6|
|         2|     samsung|01-03-2023|1080000|    3300000|         3|
|         2|     samsung|01-04-2023|1800000|    4000000|         4|
|         2|     samsung|01-05-2023| 980000|    3860000|         5|
|         2|     samsung|01-06-2023|1100000|    3880000|         6|
|         3|     oneplus|01-03-2023|1160000|    3380000|         3|
|         3|     oneplus|01-04-2023|1170000|    3450000|         4|
|         3|     oneplus|01-05-2023|1175000|    3505000|         5|
|         3|     oneplus|01-06-2023|1200000|    

In [None]:
product_df.withColumn("running_avg", round(col("running_sum")/3, 2)).show()

+----------+------------+----------+-------+-----------+----------+-----------+
|product_id|product_name|sales_date|  sales|running_sum|row_number|running_avg|
+----------+------------+----------+-------+-----------+----------+-----------+
|         1|      iphone|01-03-2023|1600000|    4400000|         3| 1466666.67|
|         1|      iphone|01-04-2023|1700000|    4600000|         4| 1533333.33|
|         1|      iphone|01-05-2023|1200000|    4500000|         5|  1500000.0|
|         1|      iphone|01-06-2023|1100000|    4000000|         6| 1333333.33|
|         2|     samsung|01-03-2023|1080000|    3300000|         3|  1100000.0|
|         2|     samsung|01-04-2023|1800000|    4000000|         4| 1333333.33|
|         2|     samsung|01-05-2023| 980000|    3860000|         5| 1286666.67|
|         2|     samsung|01-06-2023|1100000|    3880000|         6| 1293333.33|
|         3|     oneplus|01-03-2023|1160000|    3380000|         3| 1126666.67|
|         3|     oneplus|01-04-2023|1170
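
The three steps above (3-row sum, drop the first two months, divide by 3) amount to a trailing 3-month moving average. The plain-Python sketch below reproduces the iphone column. `rolling_average` is a hypothetical helper, and the sales figures are copied from `product_df`. In Spark, `avg("sales").over(window)` on the same `rowsBetween(-2, 0)` frame should give the same values once the rows with `row_number <= 2` are filtered out.

```python
def rolling_average(values, size=3):
    """Average of each row and the `size - 1` rows before it, like
    avg(...) over rowsBetween(-(size - 1), 0), keeping only rows that
    have a full window (the same effect as the row_number > 2 filter)."""
    return [
        round(sum(values[i - size + 1 : i + 1]) / size, 2)
        for i in range(size - 1, len(values))
    ]


# iphone sales for January to June, taken from product_df above.
iphone_sales = [1500000, 1300000, 1600000, 1700000, 1200000, 1100000]
averages = rolling_average(iphone_sales)
print(averages)
```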

## 12. Flatten nested JSON in Spark

In [None]:
# multiline=true: each JSON record spans several lines instead of one record per line
restaurant_df = (
    spark.read.format("json")
    .option("multiline", "true")
    .option("inferschema", "true")
    .load("/FileStore/tables/resturant_json_data.json")
)
restaurant_df.show()

+----+-------+--------------------+-------------+-------------+-------------+------+
|code|message|         restaurants|results_found|results_shown|results_start|status|
+----+-------+--------------------+-------------+-------------+-------------+------+
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17066603}, b9...|         6835|           20|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17093124}, b9...|         8680|           20|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|                  []|            0|            0|            1|  null|
|null|   null|[{{{17580142}, b9...|          943|           20|            1|  null|
|null|   null|                  []|            0|            0|  

In [None]:
restaurant_df.printSchema()

root
 |-- code: long (nullable = true)
 |-- message: string (nullable = true)
 |-- restaurants: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- restaurant: struct (nullable = true)
 |    |    |    |-- R: struct (nullable = true)
 |    |    |    |    |-- res_id: long (nullable = true)
 |    |    |    |-- apikey: string (nullable = true)
 |    |    |    |-- average_cost_for_two: long (nullable = true)
 |    |    |    |-- cuisines: string (nullable = true)
 |    |    |    |-- currency: string (nullable = true)
 |    |    |    |-- deeplink: string (nullable = true)
 |    |    |    |-- establishment_types: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- events_url: string (nullable = true)
 |    |    |    |-- featured_image: string (nullable = true)
 |    |    |    |-- has_online_delivery: long (nullable = true)
 |    |    |    |-- has_table_booking: long (nullable = true)
 |    |    |    |-- i

- We therefore have to flatten both **array** and **struct** types: arrays with **`explode()`**, which emits one row per element, and structs by accessing child fields with **`.` (dot)** notation.


`explode()` drops any row whose array is `null` or empty. To keep such rows, with `null` in the exploded column, use **`explode_outer()`**.
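The difference between the two can be modelled in plain Python. This is a sketch of the semantics only, not Spark code. The `explode`/`explode_outer` functions here are hypothetical stand-ins operating on lists of dicts, and the restaurant rows are made-up sample data.

```python
def explode(rows, col):
    """Emit one row per array element; rows whose array is None or empty vanish."""
    out = []
    for row in rows:
        for item in row[col] or []:
            out.append({**row, col: item})
    return out


def explode_outer(rows, col):
    """Like explode, but a None or empty array still yields one row with None."""
    out = []
    for row in rows:
        for item in row[col] or [None]:
            out.append({**row, col: item})
    return out


# Hypothetical restaurants: one with two types, one with an empty list, one with null.
rows = [
    {"res_id": 1, "establishment_types": ["Cafe", "Bar"]},
    {"res_id": 2, "establishment_types": []},
    {"res_id": 3, "establishment_types": None},
]

exploded = explode(rows, "establishment_types")
exploded_outer = explode_outer(rows, "establishment_types")
print(exploded)
print(exploded_outer)
```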

In [None]:
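# explode() removes rows whose restaurants array is empty;
# explode_outer() keeps restaurants that have no establishment types (null in the new column).
# drop() only matches top-level column names, so nested struct fields cannot be removed with it.
restaurant_df.select("*", explode("restaurants").alias("flattened_restaurants"))\
            .drop("restaurants")\
            .select("*", "flattened_restaurants.restaurant.R.res_id",
                explode_outer("flattened_restaurants.restaurant.establishment_types").alias("flattened_establishment_types"))\
            .show()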

+----+-------+-------------+-------------+-------------+------+---------------------+--------+-----------------------------+
|code|message|results_found|results_shown|results_start|status|flattened_restaurants|  res_id|flattened_establishment_types|
+----+-------+-------------+-------------+-------------+------+---------------------+--------+-----------------------------+
|null|   null|         6835|           20|            1|  null| {{{17066603}, b90...|17066603|                         null|
|null|   null|         6835|           20|            1|  null| {{{17059541}, b90...|17059541|                         null|
|null|   null|         6835|           20|            1|  null| {{{17064405}, b90...|17064405|                         null|
|null|   null|         6835|           20|            1|  null| {{{17057797}, b90...|17057797|                         null|
|null|   null|         6835|           20|            1|  null| {{{17057591}, b90...|17057591|                         null|
