## Spark DataFrame Fundamentals Practice


In [1]:
import os
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
# Set the environment variables PySpark reads when it launches Java/Spark and Python workers.
# Raw strings (r"...") keep the Windows backslashes literal: in a normal string literal
# "\j" and "\s" are invalid escape sequences (SyntaxWarning on Python 3.12+).
# NOTE(review): these paths are machine-specific; adjust them for your installation.
os.environ['JAVA_HOME'] = r"E:\jdk-22"
os.environ['SPARK_HOME'] = r"E:\spark-3.5.1-bin-hadoop3"
os.environ['PYSPARK_DRIVER_PYTHON'] = "jupyter"
# The variable is PYSPARK_DRIVER_PYTHON_OPTS; the previous "..._OPS" name was never read.
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "notebook"
os.environ['PYSPARK_PYTHON'] = "python"

In [3]:
# Start the SparkSession (getOrCreate() returns the active session if one already exists).
# The JDBC jar path is a raw string so its Windows backslashes stay literal
# ("\j" and "\p" are invalid escape sequences in a normal string literal).
spark = (
    SparkSession.builder
    .appName("Practice")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.jars", r"E:\jdbc-driver\postgresql-42.7.3.jar")
    .getOrCreate()
)

In [4]:
# Shortcut to the SparkContext underlying the SparkSession created above.
sc = spark.sparkContext

In [5]:
# Display the SparkContext; Jupyter renders a short summary of the running context.
sc

### 1. Creating DataFrames and Loading CSV, Text, JSON, and Parquet Files into DataFrames

In [None]:
# Create a small DataFrame from in-memory rows.
# Each tuple is one row whose values line up positionally with `columns`.
columns = ["firstname", "lastname", "dob", "gender", "salary"]

data = [
    ("James",   "Smith",    "1991-04-01", "M", 3000),
    ("Michael", "Rose",     "2000-05-19", "M", 4000),
    ("Robert",  "Williams", "1978-09-05", "M", 4000),
    ("Maria",   "Jones",    "1967-12-01", "F", 4000),
    ("Jen",     "Brown",    "1980-02-17", "F", 8500),
    ("Keith",   "Lina",     "1982-09-26", "F", 8000),
]

# A plain list of names as the schema: Spark infers each column's type from the values.
pyspark_df = spark.createDataFrame(data, columns)

In [None]:
# Print the schema: each column's name, inferred type and nullability.
pyspark_df.printSchema()

In [None]:
# Print the rows of the DataFrame as a text table.
pyspark_df.show()

In [None]:
# Read a CSV hosted on GitHub with pandas, then convert the pandas frame to a Spark DataFrame.
url_github = (
    "https://raw.githubusercontent.com/muttinenisairohith/Datasets/"
    "b0bb96f293adbb803e24c26b7780e078372d3703/data/test2.csv"
)
pd_df = pd.read_csv(url_github)
df_pyspark1 = spark.createDataFrame(pd_df)
df_pyspark1.show()

In [None]:
# Group rows by Departments and total the salary column within each department.
df_pyspark1.groupBy("Departments").sum("salary").show()

In [None]:
# Same per-department salary total, expressed with agg() and a {column: function} mapping.
df_pyspark1.groupBy("Departments").agg({"salary": "sum"}).show()

In [None]:
# Min, max, avg and mean of salary per department, printed as one table each.
# (avg and mean are aliases, so the last two tables are identical.)
by_department = df_pyspark1.groupBy("Departments")
for stat_name in ("min", "max", "avg", "mean"):
    getattr(by_department, stat_name)("salary").show()

In [None]:
# Download a CSV through SparkFiles and read it from the local copy.
# addFile() registers the URL with the SparkContext; SparkFiles.get() returns the local path.
from pyspark import SparkFiles

student_info = r"https://raw.githubusercontent.com/AISCIENCES/course-master-big-data-with-pyspark-and-aws/main/Code/03-Spark%20DFs/StudentData.csv"
spark.sparkContext.addFile(student_info)

local_student_csv = SparkFiles.get("StudentData.csv")
student_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(local_student_csv)
)
student_df.show()

In [None]:
# Shape of the student DataFrame: number of rows, then number of columns.
row_total = student_df.count()
column_total = len(student_df.columns)
print(row_total)
print(column_total)

In [None]:
# Drop every row that has a null in any column (dropna defaults to how="any").
student_df.dropna().show()

In [None]:
# Convert the Spark DataFrame to a pandas DataFrame for rich notebook display.
# NOTE: toPandas() collects every row to the driver, so use it only on small data.
student_df.toPandas()

## Reading and Processing Text Data with RDDs (`textFile`)

In [None]:
# Classic RDD word count: split each line on spaces, pair each word with 1,
# add up the pairs per word, and order by count (most frequent first).
rdd_data = spark.sparkContext.textFile("./data/data.txt")
result_rdd = (
    rdd_data
    .flatMap(lambda text: text.split(" "))
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[1], ascending=False)
)

In [None]:
# Return the 10 most frequent (word, count) pairs as a Python list.
result_rdd.take(10)

### Using DataFrames

In [None]:
from pyspark.sql.functions import desc

# DataFrame word count: spark.read.text gives one row per line in column `value`;
# explode the split words into rows, count each word, most frequent first.
df_textData = spark.read.text("./data/data.txt")
result_textData = (
    df_textData
    .selectExpr("explode(split(value, ' ')) as word")
    .groupBy("word")
    .count()
    .orderBy(desc("count"))
)

In [None]:
# Print the top rows of the word counts (show() prints 20 rows by default).
result_textData.show()

In [None]:
# Return the 10 most frequent words as a list of Row(word, count) objects.
result_textData.take(10)

## Read CSV file into DataFrame

In [None]:
%%bash
# Preview the first 10 lines of the raw CSV before loading it with Spark.
# NOTE(review): %%bash needs a bash executable on PATH; the E:\ paths above suggest Windows.
head -10 ./data/products.csv

## Read CSV with header

In [None]:
# Read the CSV, using its first line as column names.
# No schema and no inferSchema, so every column is read as a string.
csv_file_path = "./data/products.csv"
product_df = spark.read.option("header", True).csv(csv_file_path)

In [None]:
# Display schema of DataFrame (all string columns: no schema or inferSchema was given)
product_df.printSchema()

# Display content of DataFrame (first 5 rows)
product_df.show(5)

### Read CSV with an explicit schema definition

In [None]:
# import necessary types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [None]:
# Define the schema explicitly as (column name, type) pairs; every field is nullable.
# NOTE(review): CSV columns are matched by position, so this order is assumed to match products.csv.
_product_columns = [
    ("id", IntegerType()),
    ("name", StringType()),
    ("category", StringType()),
    ("quantity", IntegerType()),
    ("price", DoubleType()),
]
schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _product_columns]
)

# Read CSV file into DataFrame with the explicit schema (no type inference pass)
csv_file_path = "./data/products.csv"
product_df1 = spark.read.schema(schema).option("header", True).csv(csv_file_path)

In [None]:
# Display schema of DataFrame (the types come from the explicit schema above)
product_df1.printSchema()

# Display content of DataFrame (first 5 rows)
product_df1.show(5)

#### Read CSV with `inferSchema=True` to automatically detect column types

In [None]:
# Read the CSV with a header row and let Spark infer column types
# (inference needs an extra pass over the data).
csv_file_path = "./data/products.csv"
product_df2 = spark.read.options(header=True, inferSchema=True).csv(csv_file_path)

In [None]:
# Display schema of DataFrame (types inferred from the data)
product_df2.printSchema()
# Display content of DataFrame (first 5 rows)
product_df2.show(5)

### Read JSON file into DataFrame
### Single Line JSON

In [None]:
%%bash
# Preview the first 15 lines of the single-line JSON file (one record per line).
head -15 ./data/products_singleline.json

In [None]:
# Read single-line JSON: each line of the file is one complete JSON record.
json_file_path = "./data/products_singleline.json"
json_data_df = spark.read.format("json").load(json_file_path)

In [None]:
# Display schema of DataFrame (inferred from the JSON records)
json_data_df.printSchema()
# Display content of DataFrame (first 15 rows)
json_data_df.show(15)

## Multi-line JSON (an Array of Records)

In [None]:
%%bash
# Preview the first 20 lines of the multi-line JSON file (records span several lines).
head -20 ./data/products_multiline.json

In [None]:
# Read multi-line JSON: the file is one JSON array of comma-separated records, and each
# record spans several lines, so the multiLine option must be enabled.
json_file_path = "./data/products_multiline.json"
json_data_df = spark.read.option("multiLine", True).json(json_file_path)

In [None]:
# Display schema of DataFrame (inferred from the JSON array's records)
json_data_df.printSchema()
# Display content of DataFrame (first 5 rows)
json_data_df.show(5)

## Save as a Parquet File

In [None]:
# Write the multi-line JSON DataFrame to Parquet.
# mode("overwrite") makes this cell safe to re-run. The default save mode raises an error when
# the path already exists, which is why the write used to be commented out. That left the
# read below depending on a file left over from an earlier session.
parquet_file_path = "./data/products.parquet"
json_data_df.write.mode("overwrite").parquet(parquet_file_path)

## Read parquet file into DataFrame

In [None]:
# Read the Parquet output back; the schema is stored in the Parquet files themselves.
parquet_df = spark.read.format("parquet").load(parquet_file_path)

In [None]:
# Display schema of DataFrame (read from the Parquet metadata)
parquet_df.printSchema()
# Display content of DataFrame (first 5 rows)
parquet_df.show(5)

## 2. DataFrame Operations

In [None]:
%%bash
# Preview the first 10 lines of the stock data (comma-separated with a header row).
head -10 ./data/stocks.txt

In [None]:
# Load the synthetic stock data: read as CSV with a header row, column types inferred.
data_file_path = "./data/stocks.txt"
stocks_data_df = spark.read.options(header=True, inferSchema=True).csv(data_file_path)

In [None]:
# Show the inferred schema, then the first 10 rows.
stocks_data_df.printSchema()
stocks_data_df.show(10)

### Select: Choose specific columns.

In [None]:
# Select specific columns: keep only id, name and price.
wanted_columns = ["id", "name", "price"]
selected_columns = stocks_data_df.select(*wanted_columns)
print("Selected Columns:")
selected_columns.show(10)

### Filter: Apply conditions to filter rows.

In [None]:
# Keep rows whose quantity is greater than 15 (where() is an alias of filter()).
filtered_data = stocks_data_df.where(stocks_data_df["quantity"] > 15)
matching_rows = filtered_data.count()
print("Filtered Data:", matching_rows)
filtered_data.show()

**GroupBy: Group data based on specific columns**

**Aggregations: Perform functions like sum, average, etc., on grouped data**

In [None]:
# Total quantity and average price per category, using the dict form of agg().
aggregations = {"quantity": "sum", "price": "avg"}
grouped_data = stocks_data_df.groupBy("category").agg(aggregations)
print("Grouped and Aggregated Data:")
grouped_data.show()

# One output row per category, so this is the number of distinct categories.
category_count = grouped_data.count()
print("Count Row no:", category_count)

### Join: Combine multiple DataFrames based on specified columns.

In [None]:
# Join with another DataFrame: take a 15-row (id, category) slice and inner-join it back on "id".
# orderBy("id") before limit(15) pins which rows form the slice, because limit() alone does not
# guarantee which rows are kept. Renaming the slice's category column keeps the joined result
# from having two columns both named "category", which would be ambiguous to select.
df2 = (
    stocks_data_df.select("id", "category")
    .orderBy("id")
    .limit(15)
    .withColumnRenamed("category", "df2_category")
)
joined_data = stocks_data_df.join(df2, "id", "inner")
print("Joined Data:")
joined_data.show()

### Sort: Arrange rows based on one or more columns.

In [None]:
# Sort ascending by price (sort() is an alias of orderBy()).
sorted_data = stocks_data_df.sort("price")
print("Sorted Data:")
sorted_data.show(10)

In [None]:
# Sort by price descending, breaking ties by id descending.
from pyspark.sql.functions import col, desc
sorted_data = stocks_data_df.orderBy(desc("price"), desc("id"))
print("Sorted Data Descending:")
sorted_data.show(10)

### Distinct: Get unique rows.

In [None]:
# Unique product categories: project to the category column, then drop repeated rows.
distinct_rows = stocks_data_df.select("category").dropDuplicates()
print("Distinct Product Categories:")
distinct_rows.show()

### Drop: Remove specified columns.

In [None]:
# Remove the quantity and category columns; the remaining columns keep their order.
columns_to_drop = ("quantity", "category")
dropped_columns = stocks_data_df.drop(*columns_to_drop)
print("Dropped Columns:")
dropped_columns.show(10)

### WithColumn: Add new calculated columns.

In [None]:
# Add a derived column: revenue = quantity * price, row by row.
revenue_expr = stocks_data_df["quantity"] * stocks_data_df["price"]
df_with_new_column = stocks_data_df.withColumn("revenue", revenue_expr)
print("DataFrame with New Column:")
df_with_new_column.show(10)

### Alias: Rename columns (`withColumnRenamed`) for better readability.

In [None]:
# Rename price -> product_price and category -> Category_Name.
df_with_alias = (
    stocks_data_df
    .withColumnRenamed("price", "product_price")
    .withColumnRenamed("category", "Category_Name")
)
print("DataFrame with Aliased Column:")
df_with_alias.show(10)

## 3. Spark SQL Operations

In [None]:
%%bash
# Preview the first 10 lines of the person data (CSV with a header row).
head -10 ./data/persons.csv

In [None]:
# Load the synthetic person data: CSV with a header row, column types inferred.
data_file_path = "./data/persons.csv"
person_data_df = spark.read.options(header=True, inferSchema=True).csv(data_file_path)

In [None]:
# Display schema of DataFrame (types inferred from the CSV)
person_data_df.printSchema()
# Show the initial DataFrame (first 10 rows)
print("Initial DataFrame:")
person_data_df.show(10)

## Register the DataFrame as a Temporary View

In [None]:
# Register the DataFrame as a temporary view named "person_table" so spark.sql() can query it.
# The view is scoped to this SparkSession.
person_data_df.createOrReplaceTempView("person_table")

## Now, We Can Perform SQL-like Queries

In [None]:
# Select all rows where age is greater than 25, via SQL over the person_table view.
older_than_25_sql = "SELECT * FROM person_table WHERE age > 25"
result = spark.sql(older_than_25_sql)
result.show()

In [None]:
# Average salary per gender, computed with SQL over the person_table view.
avg_salary_sql = "SELECT gender, AVG(salary) as avg_salary FROM person_table GROUP BY gender"
avg_salary_by_gender = spark.sql(avg_salary_sql)
avg_salary_by_gender.show()

## Creating and managing temporary views.

In [None]:
# Create a second temporary view, "people", over the same person DataFrame.
person_data_df.createOrReplaceTempView("people")

In [None]:
# Query the temporary view "people": everyone older than 30.
over_30_sql = "SELECT * FROM people WHERE age > 30"
result = spark.sql(over_30_sql)
result.show()

In [None]:
# Check if a temporary view exists (True here: "people" was just registered).
view_exists = spark.catalog.tableExists("people")
view_exists

In [None]:
# Drop the "people" temporary view from the session catalog.
spark.catalog.dropTempView("people")

In [None]:
# Check again after dropping: "people" should no longer exist.
view_exists = spark.catalog.tableExists("people")
view_exists

## SQL Subqueries

In [None]:
# Create two small DataFrames for the subquery examples:
# employees(id, name) and salaries(department, id, salary), linked by id.
employee_names = ["John", "Alice", "Bob", "Emily", "David", "Sarah", "Michael", "Lisa", "William"]
employee_data = list(enumerate(employee_names, start=1))
employees = spark.createDataFrame(employee_data, ["id", "name"])

salary_data = [
    ("HR", 1, 60000), ("HR", 2, 55000), ("HR", 3, 58000),
    ("IT", 4, 70000), ("IT", 5, 72000), ("IT", 6, 68000),
    ("Sales", 7, 75000), ("Sales", 8, 78000), ("Sales", 9, 77000),
]
salaries = spark.createDataFrame(salary_data, ["department", "id", "salary"])

for frame in (employees, salaries):
    frame.show()

In [None]:
# Register both DataFrames as temporary views for the SQL queries below.
employees.createOrReplaceTempView("employees")
salaries.createOrReplaceTempView("salaries")

In [None]:
# Subquery to find employees with salaries above average.
# The innermost query computes the overall average salary, the middle query keeps the ids
# earning more than that, and the outer query returns those employees' names.
result = spark.sql("""
    SELECT name
    FROM employees
    WHERE id IN (
        SELECT id
        FROM salaries
        WHERE salary > (SELECT AVG(salary) FROM salaries)
    )
""")
result.show()

### Working with Window Functions in PySpark

#### Reference
https://www.analyticsvidhya.com/blog/2024/03/working-with-window-functions-in-pyspark/#:~:text=Approach%20for%20PySpark%20code,sal%E2%80%9D%20column%20in%20descending%20order.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [None]:
# Attach each employee's name to their salary row (left join on id).
# Result columns: department, id, salary (from salaries.*), then name.
employee_salary = spark.sql("""
    select  salaries.*, employees.name
    from salaries 
    left join employees on salaries.id = employees.id
""")

employee_salary.show()

In [None]:
# Create a window specification: one window per department, rows ordered by salary
# from highest to lowest.
window_spec = Window.partitionBy("department").orderBy(F.desc("salary"))

In [None]:
# Calculate the rank of employees within each department based on salary
# (rank 1 = highest salary; equal salaries would share a rank).
employee_salary.withColumn("rank", F.rank().over(window_spec)).show()

## Reference Link
https://www.analyticsvidhya.com/blog/2024/03/working-with-window-functions-in-pyspark/#:~:text=Approach%20for%20PySpark%20code,sal%E2%80%9D%20column%20in%20descending%20order

In [None]:
# Create Sample Dataframe rows for an EMP-style table. Tuple fields are, in order:
# empno, ename, job, hiredate (a "d-Mon-yy" string), sal, comm, deptno
# (the column names are given in the createDataFrame call below).
# NOTE(review): this rebinds `employees`, which earlier held the employees DataFrame;
# the "employees" SQL view registered from that DataFrame is not affected.
employees = [
    (7369, "SMITH", "CLERK", "17-Dec-80", 800, 20, 10),
    (7499, "ALLEN", "SALESMAN", "20-Feb-81", 1600, 300, 30),
    (7521, "WARD", "SALESMAN", "22-Feb-81", 1250, 500, 30),
    (7566, "JONES", "MANAGER", "2-Apr-81", 2975, 0, 20),
    (7654, "MARTIN", "SALESMAN", "28-Sep-81", 1250, 1400, 30),
    (7698, "BLAKE", "MANAGER", "1-May-81", 2850, 0, 30),
    (7782, "CLARK", "MANAGER", "9-Jun-81", 2450, 0, 10),
    (7788, "SCOTT", "ANALYST", "19-Apr-87", 3000, 0, 20),
    (7629, "ALEX", "SALESMAN", "28-Sep-79", 1150, 1400, 30),
    (7839, "KING", "PRESIDENT", "17-Nov-81", 5000, 0, 10),
    (7844, "TURNER", "SALESMAN", "8-Sep-81", 1500, 0, 30),
    (7876, "ADAMS", "CLERK", "23-May-87", 1100, 0, 20)
]

In [None]:
# Build the emp DataFrame from the tuples above; hiredate stays a string column.
emp_columns = ["empno", "ename", "job", "hiredate", "sal", "comm", "deptno"]
emp_df = spark.createDataFrame(employees, emp_columns)
emp_df.show()


In [None]:
# Checking the schema (Python ints are inferred as long, str values as string)
emp_df.printSchema()

In [None]:
# Register emp_df as the temporary view "emp" for the Spark SQL window queries below.
emp_df.createOrReplaceTempView("emp")

In [None]:
# Using Spark SQL
# RANK() numbers rows within each deptno by sal (highest first). Tied salaries share a rank
# and the following rank is skipped (e.g. 1, 2, 2, 4).
rank_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) AS rank FROM emp""")
rank_df.show()


In [None]:
# Using PySpark
# Same RANK() as the SQL cell, expressed with the DataFrame API.
windowSpec = Window.partitionBy("deptno").orderBy(F.desc("sal"))
rank_column = F.rank().over(windowSpec).alias("rank")
ranking_result_df = emp_df.select("empno", "ename", "job", "deptno", "sal", rank_column)
ranking_result_df.show()

In [None]:
# Using Spark SQL
# DENSE_RANK() works like RANK() but leaves no gaps after ties (e.g. 1, 2, 2, 3).
dense_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        DENSE_RANK() OVER (PARTITION BY deptno ORDER BY sal DESC) 
        AS dense_rank FROM emp""")
dense_df.show()

In [None]:
# Using PySpark
# DENSE_RANK() via the DataFrame API: ties share a rank and no rank numbers are skipped.
windowSpec = Window.partitionBy("deptno").orderBy(F.desc("sal"))
dense_rank_column = F.dense_rank().over(windowSpec).alias("dense_rank")
dense_ranking_df = emp_df.select("empno", "ename", "job", "deptno", "sal", dense_rank_column)
dense_ranking_df.show()


In [None]:
# Using Spark SQL
# ROW_NUMBER() gives each row a distinct sequence number (1, 2, 3, ...) within its deptno.
# NOTE(review): rows with equal sal (WARD and MARTIN, both 1250 in dept 30) have no tie-breaker
# in the ORDER BY, so which of them gets the lower number is not guaranteed.
row_df = spark.sql(
        """SELECT empno, ename, job, deptno, sal, 
        ROW_NUMBER() OVER (PARTITION BY deptno ORDER BY sal DESC)
         AS row_num FROM emp """)
row_df.show()

In [None]:
# Using PySpark code
# ROW_NUMBER() via the DataFrame API: 1, 2, 3, ... within each department, highest sal first.
windowSpec = Window.partitionBy("deptno").orderBy(F.desc("sal"))
row_number_column = F.row_number().over(windowSpec).alias("row_num")
row_num_df = emp_df.select("empno", "ename", "job", "deptno", "sal", row_number_column)
row_num_df.show()

In [None]:
# Using Spark SQL
# Cumulative salary per deptno, ordered by sal (highest first).
# NOTE: with ORDER BY and no explicit frame, the window frame is RANGE-based. Rows with equal
# sal (WARD and MARTIN, both 1250 in dept 30) therefore get the same running_total.
running_sum_df = spark.sql(
          """SELECT empno, ename, job, deptno, sal, 
          SUM(sal) OVER (PARTITION BY deptno ORDER BY sal DESC) 
          AS running_total FROM emp
          """)
running_sum_df.show()

In [None]:
# Using PySpark
# Cumulative salary per department with the DataFrame API. The window uses the same default
# frame as the SQL version, so equal salaries share a running_total.
windowSpec = Window.partitionBy("deptno").orderBy(F.desc("sal"))
running_total_column = F.sum("sal").over(windowSpec).alias("running_total")
running_sum_sal_df = emp_df.select("empno", "ename", "job", "deptno", "sal", running_total_column)
running_sum_sal_df.show()

In [None]:
# Using Spark SQL
# LEAD(sal, 1): the sal of the next row within the deptno (ordered by sal descending).
# No default is given, so the last row of each department gets NULL.
next_sal_df = spark.sql(
    """SELECT empno, ename, job, deptno, sal, LEAD(sal, 1) 
    OVER (PARTITION BY deptno ORDER BY sal DESC) AS next_val FROM emp
    """)
next_sal_df.show()

In [None]:
# Using PySpark
# LEAD: the sal of the next row within the department (ordered by sal descending), i.e. the
# next salary down. No default is passed, so the last row of each department gets NULL,
# matching the Spark SQL `LEAD(sal, 1)` cell. The previous default=0 made the two versions
# disagree and implied a salary of 0.
windowSpec = Window.partitionBy(col('deptno')).orderBy(col('sal').desc())
next_val_column = F.lead('sal', offset=1).over(windowSpec).alias('next_val')
next_salary_df = emp_df.select('empno', 'ename', 'job', 'deptno', 'sal', next_val_column)
next_salary_df.show()

In [None]:
# Stop the SparkSession (and its underlying SparkContext) once the notebook has finished.
# This releases the session's resources. Re-running the SparkSession cell at the top
# (getOrCreate) starts a fresh session if you want to keep working afterwards.
spark.stop()

### Reference

### PySpark Tutorials

https://sparkbyexamples.com/pyspark/pyspark-read-csv-file-into-dataframe/

https://medium.com/codex/pyspark-for-begineers-part-2-pyspark-dataframe-60008da53e30

https://www.tutorialspoint.com/pyspark/pyspark_sparkcontext.htm

https://blog.devgenius.io/pyspark-for-begineers-part-3-pyspark-dataframe-db02f0fcd275


### Big Data: Hadoop Setup & Configuration and Command-Line Data Processing

https://medium.com/@jonty2245/install-apache-hadoop-on-windows-11-a-beginners-guide-45a149f47f8a

https://intellipaat.com/blog/tutorial/hadoop-tutorial/hdfs-operations/

https://www.projectpro.io/hadoop-tutorial/hadoop-hdfs-commands

https://www.geeksforgeeks.org/hdfs-commands/

https://medium.com/@ashwin_kumar_/hadoop-hdfs-commands-with-examples-and-usage-570038cbef07
