<a href="https://colab.research.google.com/github/rohith7livingston/Book-store/blob/main/ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Exp - 1
# Experiment 1 - Sales data operations and analysis with PySpark DataFrames.
# Reads sales_data.csv (expects at least the columns Product, Category,
# Region, Sales and Quantity), demonstrates select / filter / derive / rename /
# sort, then aggregates and plots two summaries with matplotlib.
#
# Spark SQL functions are used through the `F` namespace on purpose: importing
# `sum` and `round` directly would rebind Python's built-ins for every later
# cell in this kernel (e.g. `sum([1, 2])` would return a Spark Column, not 3).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import matplotlib.pyplot as plt

# 1. Create a local Spark session that uses all available cores.
spark = SparkSession.builder.appName("SalesDataAnalysis").master("local[*]").getOrCreate()

# 2. Load the CSV dataset (first row is the header, column types inferred).
df = spark.read.csv("sales_data.csv", header=True, inferSchema=True)

print("‚úÖ Original Data")
df.show()

# -------------------------------
# DATA OPERATIONS
# -------------------------------

# 3. Select specific columns.
selected_df = df.select("Product", "Category", "Region", "Sales")
print("\nüîπ Selected Columns:")
selected_df.show()

# 4. Filter rows (Sales > 10000).
filtered_df = df.filter(df["Sales"] > 10000)
print("\nüîπ Filtered Rows (Sales > 10000):")
filtered_df.show()

# 5. Add a derived column: Revenue = Sales * Quantity.
df = df.withColumn("Revenue", df["Sales"] * df["Quantity"])
print("\nüîπ Added Revenue Column:")
df.show()

# 6. Rename the derived column; every later step refers to Total_Revenue.
df = df.withColumnRenamed("Revenue", "Total_Revenue")
print("\nüîπ Renamed Revenue Column:")
df.show()

# 7. Sort by Sales, highest first.
sorted_df = df.orderBy(df["Sales"].desc())
print("\nüîπ Sorted by Sales (Descending):")
sorted_df.show()

# -------------------------------
# DATA ANALYSIS OPERATIONS
# -------------------------------

# 8. Total and average sales per category. groupBy output has no guaranteed
#    row order, so sort it to keep the table and the bar chart reproducible.
category_agg = df.groupBy("Category").agg(
    F.sum("Sales").alias("Total_Sales"),
    F.avg("Sales").alias("Avg_Sales")
).orderBy("Category")
print("\nüìä Total & Average Sales by Category:")
category_agg.show()

# 9. Total quantity and average revenue (rounded to 2 decimals) per region,
#    sorted for the same reproducibility reason as above.
region_stats = df.groupBy("Region").agg(
    F.sum("Quantity").alias("Total_Quantity"),
    F.round(F.avg("Total_Revenue"), 2).alias("Avg_Revenue")
).orderBy("Region")
print("\nüìä Quantity & Avg Revenue by Region:")
region_stats.show()

# 10. Top 3 records by Total_Revenue.
top3 = df.orderBy(F.desc("Total_Revenue")).limit(3)
print("\nüèÜ Top 3 Products by Revenue:")
top3.show()

# 11. Descriptive statistics for the numeric columns.
print("\nüìà Summary Statistics:")
df.describe(["Sales", "Quantity", "Total_Revenue"]).show()

# 12. Pearson correlation between Sales and Quantity.
correlation = df.stat.corr("Sales", "Quantity")
print(f"\nüîó Correlation between Sales and Quantity: {correlation:.3f}")

# -------------------------------
# Visualization
# -------------------------------
# The aggregates are small, so collecting them to the driver is safe.
cat_pd = category_agg.toPandas()
region_pd = region_stats.toPandas()

# Total Sales by Category
plt.figure(figsize=(6,4))
plt.bar(cat_pd["Category"], cat_pd["Total_Sales"], color='lightblue', edgecolor='black')
plt.title("Total Sales by Category")
plt.xlabel("Category")
plt.ylabel("Total Sales")
plt.tight_layout()
plt.show()

# Average Revenue by Region
plt.figure(figsize=(6,4))
plt.bar(region_pd["Region"], region_pd["Avg_Revenue"], color='orange', edgecolor='black')
plt.title("Average Revenue by Region")
plt.xlabel("Region")
plt.ylabel("Average Revenue")
plt.tight_layout()
plt.show()

# Stop the Spark session so later cells start from a clean session.
spark.stop()



In [1]:
#exp2
# Experiment 2 - Count supermarket sales records by customer gender, using a
# dataset downloaded from Kaggle with kagglehub.

#box1
import glob
import os

import kagglehub
from pyspark.sql import SparkSession

#box2
# kagglehub.dataset_download returns the local *directory* the dataset was
# extracted into, not a CSV file. Handing that directory straight to the CSV
# reader makes Spark parse every file in it as CSV, so collect the .csv files
# explicitly and fail early (before starting Spark) if there are none.
path = kagglehub.dataset_download("faresashraf1001/supermarket-sales")
if os.path.isfile(path):
    csv_file_paths = [path]
else:
    csv_file_paths = sorted(glob.glob(os.path.join(path, "**", "*.csv"), recursive=True))
if not csv_file_paths:
    raise FileNotFoundError(f"No CSV files found in downloaded dataset: {path}")

spark = SparkSession.builder \
    .appName("Supermarket Sales by Gender Analysis") \
    .getOrCreate()

# DataFrameReader.load accepts a list of paths; all listed CSVs are read
# together (header row, inferred column types).
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_file_paths)

# Number of rows per value of the Gender column.
gender_group = df.groupBy("Gender").count()
gender_group.show()
spark.stop()

SyntaxError: invalid character 'üìä' (U+1F4CA) (ipython-input-2928454268.py, line 142)

In [2]:
#Exp - 3
# Experiment 3 - Employee Data Analysis using PySpark
# Loads employees.json (expects at least `department` and `salary` fields),
# filters / groups / sorts it, and charts per-department aggregates.
#
# Spark SQL functions are used through the `F` namespace: the previous
# `from pyspark.sql.functions import avg, max, min, count` rebound Python's
# built-in `max` and `min` (which this cell never used) for every later cell
# in the kernel.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import matplotlib.pyplot as plt

# 1. Create a local Spark session on all available cores.
spark = SparkSession.builder \
    .appName("Employee Data Analysis") \
    .master("local[*]") \
    .getOrCreate()

# 2. Load the JSON dataset (schema inferred by Spark).
file_path = "employees.json"
df = spark.read.json(file_path)

print("‚úÖ Employee Data Loaded Successfully\n")

# 3. Display the dataset.
df.show()

# 4. Display the inferred schema.
df.printSchema()

# ------------------------------
# DATA MANIPULATION OPERATIONS
# ------------------------------

# 5. Filtering: employees with salary > 50000.
high_salary = df.filter(df["salary"] > 50000)
print("\nüí∞ Employees with Salary > 50000:")
high_salary.show()

# 6. Grouping: average salary per department. groupBy output order is not
#    guaranteed, so sort it to keep the table and chart reproducible.
dept_avg_salary = df.groupBy("department").agg(F.avg("salary").alias("avg_salary")).orderBy("department")
print("\nüìä Average Salary by Department:")
dept_avg_salary.show()

# 7. Sorting: highest salary first.
sorted_df = df.orderBy(df["salary"].desc())
print("\n‚¨áÔ∏è Employees Sorted by Salary:")
sorted_df.show()

# 8. Aggregation: number of employees per department (sorted as in step 6).
dept_count = df.groupBy("department").agg(F.count("*").alias("employee_count")).orderBy("department")
print("\nüë• Number of Employees per Department:")
dept_count.show()

# ------------------------------
# VISUALIZATION
# ------------------------------

# The aggregates are small, so collecting them to pandas is safe.
dept_avg_pd = dept_avg_salary.toPandas()
dept_count_pd = dept_count.toPandas()

# --- Bar Chart: Average Salary by Department ---
plt.figure(figsize=(6,4))
plt.bar(dept_avg_pd["department"], dept_avg_pd["avg_salary"], color='skyblue', edgecolor='black')
plt.title("Average Salary by Department")
plt.xlabel("Department")
plt.ylabel("Average Salary")
plt.tight_layout()
plt.show()

# --- Pie Chart: Employee Count by Department ---
# matplotlib cycles through `colors` if there are more wedges than colors.
plt.figure(figsize=(6,6))
plt.pie(
    dept_count_pd["employee_count"],
    labels=dept_count_pd["department"],
    autopct="%1.1f%%",
    startangle=140,
    colors=['lightcoral', 'lightgreen', 'lightskyblue', 'gold']
)
plt.title("Employee Distribution by Department")
plt.tight_layout()
plt.show()

# ------------------------------
# Stop Spark Session
# ------------------------------
spark.stop()
print("\n‚úÖ Spark Session stopped successfully.")


In [None]:
#exp4
# Experiment 4 - Spark SQL over a temporary view: select, filter, group,
# order and aggregate a small in-memory employee table, then chart two of
# the query results.
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt

# Step 1: Start (or reuse) the Spark session.
spark = SparkSession.builder.appName("Spark SQL Operations").getOrCreate()

# Step 2: Build the employee DataFrame from literal (id, name, department, salary) rows.
employee_rows = [
    (1, "Lakshmi", "HR", 50000),
    (2, "Subbu", "IT", 60000),
    (3, "Gowri", "HR", 55000),
    (4, "Durga", "Finance", 70000),
    (5, "Ganesh", "IT", 75000),
]
df = spark.createDataFrame(employee_rows, ["id", "name", "department", "salary"])

# Step 3: Make the DataFrame queryable from SQL under the name "employees".
df.createOrReplaceTempView("employees")

# Step 4: Run each query in insertion order, showing its result immediately.
#   all    - every record
#   filter - salary strictly above 60000
#   group  - average salary per department
#   order  - all employees, highest salary first
#   max    - the single highest salary
sql_queries = {
    "all": "SELECT * FROM employees",
    "filter": "SELECT * FROM employees WHERE salary > 60000",
    "group": "SELECT department, AVG(salary) as avg_salary FROM employees GROUP BY department",
    "order": "SELECT * FROM employees ORDER BY salary DESC",
    "max": "SELECT MAX(salary) as max_salary FROM employees",
}
query_results = {}
for query_name, query_text in sql_queries.items():
    query_results[query_name] = spark.sql(query_text)
    query_results[query_name].show()


def _show_bar_chart(pdf, x_col, y_col, bar_color, x_label, y_label, chart_title):
    """Draw one bar chart from two pandas columns (rotated x ticks) and display it."""
    plt.bar(pdf[x_col], pdf[y_col], color=bar_color)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.title(chart_title)
    plt.xticks(rotation=45)
    plt.show()


# Step 4.6: Average salary per department.
_show_bar_chart(query_results["group"].toPandas(), 'department', 'avg_salary', 'purple',
                'Department', 'Average Salary', 'Average Salary by Department (SQL)')

# Step 4.7: Individual salaries, highest first.
_show_bar_chart(query_results["order"].toPandas(), 'name', 'salary', 'pink',
                'Employee Name', 'Salary', 'Employee Salary Distribution (SQL)')

# Optional: release the Spark session.
spark.stop()

In [None]:
# Experiment 5 - Building a Data Pipeline with Apache Spark
# Extract (inline sample rows) -> Transform (clean, filter, aggregate)
# -> Load (CSV output directory), on a local Spark session.
#
# Spark functions are used through the `F` namespace: importing `sum` and
# `round` directly would shadow Python's built-ins for every later cell in
# this kernel.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 1. Initialize a local Spark session on all available cores.
spark = SparkSession.builder \
    .appName("Sales Data Pipeline") \
    .master("local[*]") \
    .getOrCreate()

# ----------------------------
# STAGE 1: Extract (Raw Data)
# ----------------------------

# The raw data is defined inline here (no file is read).
# Columns: id, product, category, region, sales, quantity
data = [
    (1, "Laptop", "Electronics", "North", 60000, 2),
    (2, "Mobile", "Electronics", "South", 25000, 1),
    (3, "Chair", "Furniture", "East", 8000, 3),
    (4, "Desk", "Furniture", "West", 12000, 2),
    (5, "Headphones", "Electronics", "North", 5000, 5),
    (6, "Sofa", "Furniture", "East", 20000, 1),
    (7, "Monitor", "Electronics", "South", 15000, 2),
    (8, "Table", "Furniture", "North", 9000, 4)
]

columns = ["id", "product", "category", "region", "sales", "quantity"]

df_raw = spark.createDataFrame(data, columns)
print("‚úÖ Raw Data Loaded (Extract Stage):")
df_raw.show()

# ----------------------------
# STAGE 2: Transform (Clean & Process)
# ----------------------------

# Clean: drop every row that contains a null in any column.
df_clean = df_raw.na.drop()

# Filter: keep high-value transactions only (sales strictly above 10000).
df_filtered = df_clean.filter(df_clean["sales"] > 10000)

# Aggregate per (category, region): total sales, average sales rounded to
# 2 decimals, and the number of transactions.
df_summary = df_filtered.groupBy("category", "region").agg(
    F.sum("sales").alias("total_sales"),
    F.round(F.avg("sales"), 2).alias("avg_sales"),
    F.count("*").alias("transaction_count")
)

print("\nüìä Transformed Data (Transform Stage):")
df_summary.show()

# ----------------------------
# STAGE 3: Load (Save Processed Data)
# ----------------------------

# Spark writes a directory of part-files (each with a header row); mode
# "overwrite" replaces the output of previous runs.
output_path = "output/sales_summary"
df_summary.write.mode("overwrite").csv(output_path, header=True)

print(f"\n‚úÖ Processed data written to: {output_path}")

# ----------------------------
# Stop Spark
# ----------------------------
spark.stop()
print("\n‚úÖ Data pipeline executed successfully.")


In [None]:
#exp6
# Experiment 6 - Load supermarket sales from CSV, persist a Parquet copy, and
# run Spark SQL aggregations over a temporary view.
from pyspark.sql import SparkSession

# 1. Create (or reuse) a Spark session.
spark = SparkSession.builder.appName("SupermarketSalesSQL").getOrCreate()

# 2. Read the CSV file (Colab upload path; header row, inferred column types).
df = spark.read.csv("/content/supermarket_sales.csv", header=True, inferSchema=True)

# 3. Display the schema and the first rows.
print("üìÑ Schema of dataset:")
df.printSchema()

print("\nüìÑ First 10 rows from CSV:")
df.show(10)

# 4. Save a Parquet copy, replacing output from any previous run.
df.write.mode("overwrite").parquet("supermarket_sales_parquet")

# 5. Register the DataFrame as a SQL view named "sales".
df.createOrReplaceTempView("sales")

# 6. SQL Query 1: total sales per product line. Column names containing a
#    space must be quoted with backticks in Spark SQL.
print("\nüìä Total Sales per Product line:")
spark.sql("""
    SELECT `Product line`, ROUND(SUM(Total), 2) AS Total_Sales
    FROM sales
    GROUP BY `Product line`
    ORDER BY Total_Sales DESC
""").show()

# 7. SQL Query 2: average sale per branch (the dataset has no gross income
#    column, so Total is used).
print("\nüè¢ Average Sales per Branch:")
spark.sql("""
    SELECT Branch, ROUND(AVG(Total), 2) AS Avg_Sales
    FROM sales
    GROUP BY Branch
    ORDER BY Avg_Sales DESC
""").show()

# 8. SQL Query 3: number of transactions per city.
print("\nüåÜ Number of Transactions by City:")
spark.sql("""
    SELECT City, COUNT(*) AS Transactions
    FROM sales
    GROUP BY City
    ORDER BY Transactions DESC
""").show()

# 9. Stop the session, as every other experiment cell does. If it is left
#    running, the next cell's SparkSession.builder...getOrCreate() reuses it,
#    so startup-only builder settings there (master, spark.sql.extensions for
#    Delta, ...) would not take effect.
spark.stop()


In [None]:
# Experiment 7 - Developing a Parquet Table into a Data Platform Container
# Aggregates inline sample sales data, writes it as a category-partitioned
# Parquet table under a container path, reads it back and queries it via SQL.
#
# Spark functions are used through the `F` namespace: importing `sum` and
# `round` directly would shadow Python's built-ins for every later cell in
# this kernel.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 1. Create a local Spark session on all available cores.
spark = SparkSession.builder \
    .appName("Develop Parquet Table to Data Container") \
    .master("local[*]") \
    .getOrCreate()

# -----------------------------
# Step 1: Create Sample Data
# -----------------------------
data = [
    (1, "Laptop", "Electronics", "North", 60000, 2),
    (2, "Mobile", "Electronics", "South", 25000, 1),
    (3, "Chair", "Furniture", "East", 8000, 3),
    (4, "Desk", "Furniture", "West", 12000, 2),
    (5, "Headphones", "Electronics", "North", 5000, 5),
    (6, "Sofa", "Furniture", "East", 20000, 1),
    (7, "Monitor", "Electronics", "South", 15000, 2),
    (8, "Table", "Furniture", "North", 9000, 4)
]
columns = ["id", "product", "category", "region", "sales", "quantity"]

df = spark.createDataFrame(data, columns)
print("‚úÖ Sample DataFrame Created:\n")
df.show()

# -----------------------------
# Step 2: Transform the Data
# -----------------------------
# Per (category, region): average sales (2 decimals), total sales, row count.
df_summary = df.groupBy("category", "region").agg(
    F.round(F.avg("sales"), 2).alias("avg_sales"),
    F.sum("sales").alias("total_sales"),
    F.count("*").alias("transactions")
)

print("\nüìä Aggregated Sales Summary:")
df_summary.show()

# -----------------------------
# Step 3: Define Data Container Path (Local or Cloud)
# -----------------------------

# Local container directory; an object-store URI could be used instead.
data_container_path = "data_container/sales_data/"

# -----------------------------
# Step 4: Write DataFrame as Parquet Table into the Container
# -----------------------------
# partitionBy writes one `category=<value>` subdirectory per category;
# "overwrite" replaces output from previous runs.
df_summary.write \
    .mode("overwrite") \
    .partitionBy("category") \
    .parquet(data_container_path)

print(f"\n‚úÖ Parquet Table successfully written to Data Container: {data_container_path}")

# -----------------------------
# Step 5: Read from the Parquet Container (Verify)
# -----------------------------
# Spark rediscovers `category` from the partition directory names.
print("\nüì¶ Reading Parquet Table from Data Container:")
parquet_df = spark.read.parquet(data_container_path)
parquet_df.show()

# -----------------------------
# Step 6: Register as SQL Table
# -----------------------------
parquet_df.createOrReplaceTempView("sales_summary")

print("\nüß† Running SQL Query on Container Table:")
spark.sql("""
    SELECT category, region, total_sales
    FROM sales_summary
    WHERE total_sales > 20000
    ORDER BY total_sales DESC
""").show()

# -----------------------------
# Step 7: Stop Spark Session
# -----------------------------
spark.stop()
print("\n‚úÖ Spark Session stopped successfully.")


In [None]:
# Experiment 8 - Running SQL Queries on Data in a NoSQL (MongoDB) Table
# Reads the company.employees collection through the MongoDB Spark Connector
# and queries it with Spark SQL.
# NOTE(review): the MongoDB Spark Connector v10.x must be on the Spark
# classpath (e.g. via spark.jars.packages with an artifact matching this
# Spark/Scala build) - confirm for the target environment.

from pyspark.sql import SparkSession

# 1. Create a Spark session configured for the MongoDB connector.
#    spark.mongodb.read/write.connection.uri are the v10.x connector's keys;
#    the URI path selects <database>.<collection>.
spark = SparkSession.builder \
    .appName("Run SQL Queries on NoSQL Table") \
    .master("local[*]") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/company.employees") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/company.output") \
    .getOrCreate()

# -----------------------------
# Step 2: Read Data from MongoDB Collection
# -----------------------------
# The v10.x connector registers its data source as "mongodb". The former
# "mongo" name belongs to the 3.x connector, which does not read the
# spark.mongodb.read.* keys configured above.
df = spark.read.format("mongodb").load()

print("‚úÖ Data successfully loaded from MongoDB collection 'employees':")
df.show()

# -----------------------------
# Step 3: Register as Temporary View for SQL Queries
# -----------------------------
df.createOrReplaceTempView("employees")

# -----------------------------
# Step 4: Run SQL Queries
# -----------------------------
# Assumes documents carry name / department / salary fields - confirm
# against the collection.

# Query 1: all employees.
print("\nüü¢ Query 1: SELECT * FROM employees")
spark.sql("SELECT * FROM employees").show()

# Query 2: employees with salary > 50000, highest first.
print("\nüü¢ Query 2: Employees with Salary > 50000")
spark.sql("""
    SELECT name, department, salary
    FROM employees
    WHERE salary > 50000
    ORDER BY salary DESC
""").show()

# Query 3: average salary per department (2 decimals).
print("\nüü¢ Query 3: Average Salary by Department")
spark.sql("""
    SELECT department, ROUND(AVG(salary), 2) AS avg_salary
    FROM employees
    GROUP BY department
""").show()

# Query 4: number of employees per department.
print("\nüü¢ Query 4: Count of Employees by Department")
spark.sql("""
    SELECT department, COUNT(*) AS emp_count
    FROM employees
    GROUP BY department
""").show()

# -----------------------------
# Step 5: Stop Spark Session
# -----------------------------
spark.stop()
print("\n‚úÖ Spark Session stopped successfully.")


In [None]:
#Exp - 9
# Experiment 9 - Modify a Delta Lake table: (re)create it, update, append,
# delete, and show the final state.
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql.functions import col

# 1. Create a Spark session with Delta Lake support. The two configs register
#    Delta's SQL extension and catalog; configure_spark_with_delta_pip adds
#    the Delta jars that match the pip-installed delta-spark package, which
#    otherwise would not be on the classpath.
builder = SparkSession.builder \
    .appName("Modify Delta Lake Table") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# 2. Delta table location (shared with the following Delta example cell).
path = "delta_table/employees"

# 3. (Re)create the sample table, replacing any previous contents. The
#    overwriteSchema option is needed because another cell writes a table
#    with a different schema (no department column) to the same path; without
#    it, Delta's schema enforcement makes this write depend on which cell ran
#    last.
data = [
    (1, "Alice", "HR", 45000),
    (2, "Bob", "IT", 75000),
    (3, "Charlie", "Finance", 55000)
]
cols = ["id", "name", "department", "salary"]

df = spark.createDataFrame(data, cols)
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(path)
print("‚úÖ Delta Table Created Successfully")

# 4. Load the existing Delta table.
deltaTable = DeltaTable.forPath(spark, path)
print("\nüìÑ Original Table:")
deltaTable.toDF().show()

# 5. Update: increase salary by 10% for the IT department.
deltaTable.update(
    condition=col("department") == "IT",
    set={"salary": col("salary") * 1.10}
)

# 6. Insert: append a new employee record.
new_data = [(4, "David", "Sales", 50000)]
new_df = spark.createDataFrame(new_data, cols)
new_df.write.format("delta").mode("append").save(path)

# 7. Delete: remove employees with salary < 48000.
deltaTable.delete(condition=col("salary") < 48000)

# 8. Show the final table.
print("\n‚úÖ Updated Delta Table:")
deltaTable.toDF().show()

# 9. Stop the Spark session.
spark.stop()
print("\n‚úÖ Spark Session Stopped Successfully.")


In [None]:
# Delta Lake quick example: write, read and update a small table.
# NOTE: this uses the same path as Experiment 9 ("delta_table/employees") but a
# different schema (no department column), so the overwrite below also
# replaces the table schema.
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql.functions import col

# Register Delta's SQL extension and catalog; configure_spark_with_delta_pip
# adds the Delta jars that match the pip-installed delta-spark package.
builder = SparkSession.builder \
    .appName("DeltaLakeExample") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Create sample data
data = [(1, "Alice", 45000), (2, "Bob", 55000)]
cols = ["id", "name", "salary"]
df = spark.createDataFrame(data, cols)

# Write as a Delta table. overwriteSchema lets this replace a table that an
# earlier cell/run left at the same path with a different schema; without it
# the result depends on which cell ran last.
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("delta_table/employees")

# Read the Delta table back.
df_read = spark.read.format("delta").load("delta_table/employees")
df_read.show()

# Update: add 5000 to Alice's salary.
deltaTable = DeltaTable.forPath(spark, "delta_table/employees")
deltaTable.update(col("name") == "Alice", {"salary": col("salary") + 5000})

# Show the updated data.
deltaTable.toDF().show()

spark.stop()
