# Setting Up PySpark

In [2]:
!apt-get update
!apt-get install openjdk-11-jdk -y
!pip install pyspark

Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:7 http://security.ubuntu.com/ubuntu jammy-security/multiverse amd64 Packages [56.4 kB]
Get:8 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,682 kB]
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:10 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,668 kB]
Get:11 http://archive.ubuntu.com/ubuntu jammy-updates/multiverse amd64 Packages [64.2 kB]
Get:12 http://security.ubuntu.com/ubuntu jammy-security/restricted amd64 Packages [3,774 kB]
Hit:13 https://ppa.launchpadcontent.net/ubuntugis/ppa

In [3]:
import importlib.util
import os

# Location of the JDK provided by the openjdk-11-jdk package.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

# Point SPARK_HOME at wherever the pyspark package is actually installed instead of
# hard-coding a python3.11 dist-packages path, which is wrong on any other Python version.
# If pyspark cannot be found, SPARK_HOME is left untouched and the import cell will fail clearly.
_pyspark_spec = importlib.util.find_spec("pyspark")
if _pyspark_spec is not None and _pyspark_spec.submodule_search_locations:
    os.environ["SPARK_HOME"] = list(_pyspark_spec.submodule_search_locations)[0]


# Working with PySpark

First things first: we need to get or create a Spark session.
## Set a Spark session
The `SparkSession` is the entry point for high-level Spark functionality.
It is created using `SparkSession.builder`.

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import DateType

In [5]:
# Start (or reuse) a Spark session running locally, named after this analysis.
spk = (
    SparkSession.builder
    .master("local[*]")
    .appName('globalSales')
    .getOrCreate()
)

In [12]:
# Show which Spark version the session is running.
print(spk.version)

3.5.5


Then we read in the CSV file using the session's reader, `spk.read.csv`.

In [13]:
# Load the sales records: the first line of the file is the header row, and
# column types are inferred from the data.
csv_path = "global_sales_records.csv"
reader_opts = {"header": True, "inferSchema": True}
global_df = spk.read.csv(csv_path, **reader_opts)

# Preview the first five rows.
global_df.show(5)

+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|    Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Middle East and N...|          Azerbaijan|       Snacks|       Online|             C| 10/8/2014|535113847|10/23/2014|       934|    152.58|    97.44|    142509.72|  91008.96|    51500.76|
|Central America a...|              Panama|    Cosmetics|      Offline|             L| 2/22/2015|874708545| 2/27/2015|      4551|     437.2|   263.33|    1989697.2|1198414.83|   791282.37|
|  Sub-Saharan Africa|Sao Tome and Prin...|       Fruit

In [14]:
# Report the size of the dataset: row count and number of columns.
n_rows = global_df.count()
n_cols = len(global_df.columns)
print(f"Dataset Shape\nRows: {n_rows:,}\nColumns: {n_cols}")

Dataset Shape
Rows: 100,000
Columns: 14


In [23]:
# Print each column's name, type and nullability.
# NOTE(review): this cell's execution count (23) is later than the rename (20) and
# date-conversion (22) cells that appear further down, which is why the saved output
# shows underscored names and Order_Date as a date. Run top-to-bottom on a fresh
# kernel, this cell prints the schema as loaded from the CSV instead.
global_df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Sales_Channel: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Order_ID: integer (nullable = true)
 |-- Ship_Date: string (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Unit_Cost: double (nullable = true)
 |-- Total_Revenue: double (nullable = true)
 |-- Total_Cost: double (nullable = true)
 |-- Total_Profit: double (nullable = true)



In [20]:
# Rename the columns, to remove the spaces
def rename_columns_remove_spaces(df: DataFrame) -> DataFrame:
    """Return a copy of ``df`` whose column names have spaces replaced by underscores.

    All columns are renamed in a single ``toDF`` projection rather than one
    ``withColumnRenamed`` call per column, so the query plan does not grow with
    the number of columns.

    Args:
        df: DataFrame whose column names may contain spaces.

    Returns:
        A new DataFrame with the same columns in the same order, with every
        space in a column name replaced by ``_``. ``df`` itself is not modified.
    """
    renamed = [col_name.replace(" ", "_") for col_name in df.columns]
    return df.toDF(*renamed)

# Rebind global_df to the renamed frame; later cells refer to the underscored
# names (e.g. "Order_Date", "Total_Revenue").
global_df = rename_columns_remove_spaces(global_df)

In [21]:
# Preview five rows again to check the column headers after the rename.
global_df.show(5)

+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|    Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Middle East and N...|          Azerbaijan|       Snacks|       Online|             C| 10/8/2014|535113847|10/23/2014|       934|    152.58|    97.44|    142509.72|  91008.96|    51500.76|
|Central America a...|              Panama|    Cosmetics|      Offline|             L| 2/22/2015|874708545| 2/27/2015|      4551|     437.2|   263.33|    1989697.2|1198414.83|   791282.37|
|  Sub-Saharan Africa|Sao Tome and Prin...|       Fruit

In [15]:
# Get null values if any
# NaN can only occur in float/double columns, and F.isnan fails analysis on other
# types such as date, so the NaN test is applied to those columns only; every
# column is still checked for NULL.
nan_capable_types = ("float", "double")
null_counts = global_df.select([
    F.count(
        F.when(
            (F.isnan(c) | F.isnull(c)) if dtype in nan_capable_types else F.isnull(c),
            c,
        )
    ).alias(c)
    for c, dtype in global_df.dtypes
])
null_counts.show()

+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+
|Region|Country|Item Type|Sales Channel|Order Priority|Order Date|Order ID|Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+
|     0|      0|        0|            0|             0|         0|       0|        0|         0|         0|        0|            0|         0|           0|
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+



In [22]:
# Data Transformation
# Parse Order_Date from its M/d/yyyy text form (e.g. 10/8/2014) into a date column.
order_date_parsed = F.to_date(F.col("Order_Date"), "M/d/yyyy")
global_df = global_df.withColumn("Order_Date", order_date_parsed)
# Ship_Date is not converted here.
# NOTE(review): presumably left as the assignment — confirm.


### Top-performing regions by revenue


In [35]:
# Total revenue per region, highest first.
revenue_by_region = (
    global_df
    .groupBy("Region")
    .agg(F.sum("Total_Revenue").alias("Revenue_Generated"))
    .orderBy(F.desc("Revenue_Generated"))
)

# Replace the totals with a thousands-separated, two-decimal rendering for display.
# The ordering above is applied before formatting, so it is by the numeric totals.
region_perf_rev = revenue_by_region.withColumn(
    "Revenue_Generated",
    F.format_number(F.col("Revenue_Generated"), 2),
)
region_perf_rev.show()

+--------------------+-----------------+
|              Region|Revenue_Generated|
+--------------------+-----------------+
|  Sub-Saharan Africa|34,958,453,406.17|
|              Europe|34,241,150,923.39|
|                Asia|19,293,401,219.82|
|Middle East and N...|16,921,412,794.52|
|Central America a...|14,553,730,165.29|
|Australia and Oce...|10,701,522,223.73|
|       North America| 2,937,002,333.49|
+--------------------+-----------------+



In [36]:
# Convert the regional summary to a pandas DataFrame and display it.
# NOTE(review): Revenue_Generated was overwritten with format_number output in the
# previous cell, so it is presumably formatted text rather than a numeric column
# here — confirm before plotting or doing arithmetic on it.
rg_pd = region_perf_rev.toPandas()
rg_pd

Unnamed: 0,Region,Revenue_Generated
0,Sub-Saharan Africa,34958453406.17
1,Europe,34241150923.39
2,Asia,19293401219.82
3,Middle East and North Africa,16921412794.52
4,Central America and the Caribbean,14553730165.29
5,Australia and Oceania,10701522223.73
6,North America,2937002333.49


### Underperforming countries by profit


### Sales channel comparison (online vs. offline) by Revenue
