<a href="https://colab.research.google.com/github/suwarnalatha-m/Task-1-Big-Data-Analysis/blob/main/Task1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install PySpark
!pip install pyspark

In [None]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display

In [None]:
# Create (or reuse) the Spark session that runs every Spark step in this notebook.
# getOrCreate() hands back the already-active session if one exists, so
# re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .appName("CODTECH Big Data Analysis")
    .getOrCreate()
)

print("Spark Session Created Successfully")

In [None]:
# List the files in /content to check which dataset files (and formats) are present
!ls /content

In [None]:
# CONVERTING Excel → CSV
# The workbook is re-saved as CSV so it can be loaded with spark.read.csv.
# index=False keeps pandas' row index out of the output file.
df_excel = pd.read_excel(io="/content/Online Retail.xlsx")
df_excel.to_csv(path_or_buf="/content/online_retail.csv", index=False)

print("Conversion Completed")

In [None]:
# List /content again to confirm that online_retail.csv was written by the conversion cell
!ls /content

In [None]:
# Loading Dataset using PySpark.
# "header": the first CSV line supplies the column names.
# "inferSchema": Spark scans the data to pick column types instead of reading
# every column as a string.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/online_retail.csv")
)

In [None]:
# Preview: pull the first 10 rows to the driver and render them as a styled table
display(
    df.limit(10)
    .toPandas()
    .style
    .background_gradient(cmap="Blues")
    .set_caption("Online Retail Dataset Preview")
)

In [None]:
# Dataset dimensions. count() runs a Spark job over the data;
# df.columns comes from the schema and needs no job.
print(f"Total Rows: {df.count()}")
print(f"Total Columns: {len(df.columns)}")

In [None]:
# Data Cleaning:
#   dropna()         -> discard any row that has a null in any column
#   dropDuplicates() -> keep a single copy of rows that are identical across all columns
df_clean = df.dropna().dropDuplicates()

print(f"Rows after cleaning: {df_clean.count()}")

In [None]:
# Revenue Column: line-level revenue = Quantity * UnitPrice
df_clean = df_clean.withColumn(
    "Revenue",
    df_clean["Quantity"] * df_clean["UnitPrice"],
)
# Preview the inputs next to the derived column
display(
    df_clean
    .select(["Description", "Quantity", "UnitPrice", "Revenue"])
    .limit(10)
    .toPandas()
)

In [None]:
# Top Selling Products: total units per product description, largest first.
# The aggregate column is named "sum(Quantity)".
top_products = (
    df_clean.groupBy("Description")
    .agg({"Quantity": "sum"})
    .orderBy("sum(Quantity)", ascending=False)
)

# Preview the ten best sellers
display(
    top_products.limit(10)
    .toPandas()
    .style
    .set_caption("Top Selling Products")
    .background_gradient(cmap="Greens")
)

In [None]:
# Country-wise Revenue: total Revenue per country, highest first.
# The aggregate column is named "sum(Revenue)".
country_sales = (
    df_clean.groupBy("Country")
    .agg({"Revenue": "sum"})
    .orderBy("sum(Revenue)", ascending=False)
)
# Preview the ten highest-revenue countries
display(
    country_sales.limit(10)
    .toPandas()
    .style
    .set_caption("Revenue by Country")
    .background_gradient(cmap="Oranges")
)


In [None]:
# Monthly Sales Trend
# Bucket each invoice by calendar year-month ("yyyy-MM"). month() alone returns
# only 1-12, so if the data spans more than one year the same month from
# different years would be merged into a single point. "yyyy-MM" strings also
# sort chronologically, so orderBy("Month") yields a proper time axis.
from pyspark.sql.functions import date_format

df_clean = df_clean.withColumn(
    "Month",
    date_format("InvoiceDate", "yyyy-MM")
)
# Aggregate revenue per year-month, oldest first (column: "sum(Revenue)")
monthly_sales = (
    df_clean.groupBy("Month")
    .sum("Revenue")
    .orderBy("Month")
)
# Collect the small per-month aggregate to pandas for plotting
monthly_pd = monthly_sales.toPandas()

In [None]:
!pip install plotly
import plotly.express as px

In [None]:
# Monthly Revenue Trend: interactive line chart of total revenue per Month
fig = px.line(
    monthly_pd, x="Month", y="sum(Revenue)", markers=True,
    title="Interactive Monthly Revenue Trend",
)

# Replace the default axis titles (column names) with readable labels
fig.update_xaxes(title_text="Month")
fig.update_yaxes(title_text="Revenue")

fig.show()

In [None]:
# Market Contribution --> Country Revenue
# Collect the Spark aggregate (one row per country) to pandas so Plotly can plot it.
# Without this, country_pd is undefined at this point.
country_pd = country_sales.toPandas()

fig = px.bar(
    country_pd,
    x="sum(Revenue)",
    y="Country",
    orientation="h",
    title="Interactive Revenue by Country",
    text="sum(Revenue)"
)

# Put the largest bar at the top (horizontal bars otherwise draw the first row
# at the bottom), matching the Top Products chart.
fig.update_layout(yaxis={'categoryorder': 'total ascending'})
# Show bar labels as whole numbers with thousands separators instead of raw floats
fig.update_traces(texttemplate="%{text:,.0f}")

fig.show()

In [None]:
# Top Products
# Collect the ten best sellers to pandas for Plotly. Without this,
# top_products_pd is undefined at this point.
top_products_pd = top_products.limit(10).toPandas()

fig = px.bar(
    top_products_pd,
    x="sum(Quantity)",
    y="Description",
    orientation="h",
    title="Top Selling Products"
)

# Order bars by value so the best seller appears at the top
fig.update_layout(yaxis={'categoryorder': 'total ascending'})
fig.show()

In [None]:
# User interactive country wise transactions
from ipywidgets import interact
from IPython.display import display

# Dropdown choices: every country, in country_sales order (highest revenue first).
# These are read straight from the Spark aggregate, so the cell does not depend
# on a pandas copy being defined elsewhere.
country_options = [
    row["Country"] for row in country_sales.select("Country").collect()
]


@interact(country=country_options)
def show_country(country):
    """Display the 10 highest-revenue transactions for ``country`` as a styled table.

    ``@interact`` renders a dropdown of ``country_options`` and re-invokes this
    function whenever the selection changes.
    """
    # Filter to the selected country. Sorting by Revenue before limit(10) makes
    # the rows genuinely the "top" transactions; without it, limit returns
    # an arbitrary 10 rows.
    filtered_df = (
        df_clean
        .filter(col("Country") == country)
        .orderBy(col("Revenue").desc())
        .select("Description", "Quantity", "UnitPrice", "Revenue")
        .limit(10)
        .toPandas()
    )

    # Professional styled table
    styled_table = (
        filtered_df.style
        .set_caption(f"Top Transactions — {country}")
        .set_table_styles([
            {"selector": "th",
             "props": [("background-color", "#2c3e50"),
                       ("color", "white"),
                       ("text-align", "center")]},
            {"selector": "td",
             "props": [("text-align", "center")]},
            {"selector": "caption",
             "props": [("font-size", "16px"),
                       ("font-weight", "bold")]}
        ])
        .set_properties(**{
            "background-color": "white",
            "border": "1px solid #ddd"
        })
    )

    display(styled_table)

In [None]:
# SCALABILITY DEMONSTRATION USING PYSPARK
#
# Times the same per-country aggregation in pandas (single machine) and in
# Spark (before and after repartitioning). It then prints the per-partition row
# counts and the execution plan as evidence of distributed processing.
#
# Durations are measured with time.perf_counter(), a monotonic high-resolution
# clock meant for intervals. time.time() follows the system wall clock and can
# jump when that clock is adjusted.

import pandas as pd
import time
from pyspark.sql.functions import spark_partition_id

print("\n--- Scalability Demonstration Started ---")

# Check Original Number of Partitions
print("\nOriginal Partitions:",
      df.rdd.getNumPartitions())

# Increase Partitions (Simulating Scalability).
# repartition(8) redistributes rows across 8 partitions via a shuffle.
df_scaled = df.repartition(8)

print("Partitions After Scaling:",
      df_scaled.rdd.getNumPartitions())

# Pandas Execution Timing (Single Machine Processing).
# The timing includes read_csv. df is not cached and Spark evaluates lazily,
# so the Spark timings below also re-read the CSV; the comparison is like-for-like.
start = time.perf_counter()

pandas_df = pd.read_csv("/content/online_retail.csv")
pandas_result = (
    pandas_df.groupby("Country")["Quantity"]
    .sum()
)

end = time.perf_counter()

print("\nPandas Total Execution Time:",
      f"{end - start:.3f} seconds")


# PySpark Execution Timing (Original Dataset).
# show() is the action that actually triggers the Spark job being timed.
start = time.perf_counter()

df.groupBy("Country").sum("Quantity").show()

end = time.perf_counter()

print("PySpark Execution Time (Original):",
      f"{end - start:.3f} seconds")

# PySpark Execution Timing AFTER SCALING (includes the repartition shuffle)
start = time.perf_counter()

df_scaled.groupBy("Country").sum("Quantity").show()

end = time.perf_counter()

print("PySpark Execution Time (Scaled):",
      f"{end - start:.3f} seconds")

# Show Partition Distribution (Proof of Parallelism): rows per partition id
print("\nPartition Distribution:")
df_scaled.groupBy(spark_partition_id()).count().show()


# Spark Execution Plan (Distributed Processing Proof):
# explain(True) prints the logical and physical plans
print("\nExecution Plan:")
df_clean.groupBy("Country").sum("Revenue").explain(True)

print("\n--- Scalability Demonstration Completed ---")

**Analytics:**

1. Sales Performance Analysis

2. Revenue Trends

3. Interactive Exploration

4. Big Data Processing


**Outcome:**

This work demonstrated big data analysis with PySpark: cleaning, transforming, and analyzing retail transaction data to produce business insights.
Scalability was demonstrated through distributed processing (repartitioning and parallel aggregation) and a performance comparison with Pandas, which highlighted the differences between single-machine and distributed frameworks.
On this relatively small dataset Pandas was faster. PySpark is better suited to large-scale analytics, where the benefits of distributed, horizontally scalable processing outweigh its overhead.
