In [None]:
!apt-get update # Update apt-get repository.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java.
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Sparks.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Unzip the tgz file.
!pip install -q findspark # Install findspark. Adds PySpark to the System path during runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Waiting for headers] [1 InRelease 3,626 B/3,0% [Connecting to archive.ubuntu.com (185.125.190.36)] [Waiting for headers] [Connecting to ppa.laun                                                                                                    Get:2 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [737 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:9 http://archive.

In [2]:
df=spark.read.format("csv").option("header", "true").load("sample_data/sales_data_sample.csv")

In [4]:
df.show(5);

+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+-----+----------+-------+---------+---------------+----------------+--------+
|ORDERNUMBER|QUANTITYORDERED|PRICEEACH|ORDERLINENUMBER|  SALES|      ORDERDATE| STATUS|QTR_ID|MONTH_ID|YEAR_ID|PRODUCTLINE|MSRP|PRODUCTCODE|        CUSTOMERNAME|           PHONE|        ADDRESSLINE1|ADDRESSLINE2|         CITY|STATE|POSTALCODE|COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-----------+---------------+---------+---------------+-------+---------------+-------+------+--------+-------+-----------+----+-----------+--------------------+----------------+--------------------+------------+-------------+-----+----------+-------+---------+---------------+----------------+--------+
|      10107|             30|     95.7|              2|   2871| 2/24/2003 0:00|Shipped| 

In [5]:
from pyspark import SparkContext, SparkConf


In [13]:
# Initialize Spark context using getOrCreate()
sc = SparkContext.getOrCreate(SparkConf().setAppName("Sales Analysis"))

In [17]:
sales_rdd = sc.textFile("sample_data/sales_data_sample.csv").filter(lambda line: not line.startswith("ORDERNUMBER"))

In [18]:

# Example: Filter the dataset to get sales with a quantity greater than 40
filtered_sales_rdd = sales_rdd.filter(lambda line: int(line.split(',')[1]) > 40)


In [19]:
# Example: Calculate total sales amount
total_sales_amount = sales_rdd.map(lambda line: float(line.split(',')[4])).sum()
print("Total Sales Amount:", total_sales_amount)


Total Sales Amount: 10032628.849999994


In [20]:
# Count the number of records in the RDD
record_count = sales_rdd.count()
print("Number of records:", record_count)

Number of records: 2823


DATA CLEANING

In [22]:
spark = SparkSession.builder.appName("DataCleaning").getOrCreate()

In [24]:
df = spark.read.csv("sample_data/sales_data_sample.csv", header=True, inferSchema=True)


In [29]:
# Use describe to get summary statistics
summary = df.describe()

# Show the summary statistics
summary.show()


+-------+------------------+-----------------+------------------+-----------------+------------------+--------------+---------+------------------+------------------+------------------+------------+------------------+-----------+-----------------+--------------------+--------------------+------------+------------+--------+------------------+---------+---------+---------------+----------------+--------+
|summary|       ORDERNUMBER|  QUANTITYORDERED|         PRICEEACH|  ORDERLINENUMBER|             SALES|     ORDERDATE|   STATUS|            QTR_ID|          MONTH_ID|           YEAR_ID| PRODUCTLINE|              MSRP|PRODUCTCODE|     CUSTOMERNAME|               PHONE|        ADDRESSLINE1|ADDRESSLINE2|        CITY|   STATE|        POSTALCODE|  COUNTRY|TERRITORY|CONTACTLASTNAME|CONTACTFIRSTNAME|DEALSIZE|
+-------+------------------+-----------------+------------------+-----------------+------------------+--------------+---------+------------------+------------------+------------------+------

In [35]:


# Convert the count column from string to integer
count_col = [col for col in summary.columns if "count" in col]
if count_col:
    count_col = count_col[0]
    summary = summary.withColumn(count_col, summary[count_col].cast("int"))

    # Identify columns with missing values (count less than total rows)
    missing_columns = [col for col in df.columns if summary.where(summary[count_col] < df.count()).select(col).count() > 0]

    print("Columns with missing values:", missing_columns)
else:
    print("Count column not found in the summary DataFrame.")


Count column not found in the summary DataFrame.
