In [1]:
#importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, format_number, col


# defining what goes into our main construct
if __name__ == "__main__":
    # Cell entry point: read sales_data.csv into a Spark DataFrame, show its
    # schema and a preview, cast the numeric columns, print per-market totals,
    # and build the (lazy) per-market / per-sub-category aggregation that is
    # left in `profit_and_sales_at_sub_cat` for later cells.
    #
    # NOTE: `sum` in this file is pyspark.sql.functions.sum (the import at the
    # top shadows the builtin), so it must only be applied to Spark columns.

    # Getting the SparkContext from the SparkSession and setting the log level.
    session = SparkSession.builder.appName('emretl').getOrCreate()
    sc = session.sparkContext
    sc.setLogLevel('ERROR')

    # Creating a DataFrame reader and loading the CSV (first row is the header,
    # column types are inferred from the data).
    dataframereader = session.read
    sales_data = (
        dataframereader
        .option("header", "true")
        .option("inferSchema", True)
        .csv("sales_data.csv")
    )

    # Printing out the schema of our data.
    print("===Schema ===")
    sales_data.printSchema()

    # Previewing the top 20 rows of our data.
    sales_data.show()

    # Checking the data types of the various columns. A bare `sales_data.dtypes`
    # expression inside this `if` block is evaluated and thrown away, so it has
    # to be printed explicitly to be visible.
    print(sales_data.dtypes)

    # Selected columns and the data type we want to cast each of them to.
    columns_to_cast = {
        "sales": "int",
        "quantity": "int",
        "discount": "double",
        "profit": "double",
        "shipping_cost": "double",
    }

    # Columns that are absent are still skipped (as before), but no longer
    # silently: the aggregations below reference them and would fail later
    # with a less obvious error.
    missing_columns = [name for name in columns_to_cast if name not in sales_data.columns]
    if missing_columns:
        print(f"WARNING: columns not found, cast skipped: {missing_columns}")

    casts_to_apply = {
        name: data_type
        for name, data_type in columns_to_cast.items()
        if name in sales_data.columns
    }

    # When schema inference reads a numeric column as string, at least some of
    # its values are not plain numbers. With Spark's default (non-ANSI) cast
    # behaviour those values become NULL and are then ignored by sum(), which
    # silently understates the totals. Count them up front so the loss is
    # visible. This costs one extra pass over the data.
    if casts_to_apply:
        lost_values = sales_data.agg(*[
            sum(
                (col(name).isNotNull() & col(name).cast(data_type).isNull()).cast("int")
            ).alias(name)
            for name, data_type in casts_to_apply.items()
        ]).first()
        for name, data_type in casts_to_apply.items():
            # The aggregate is NULL (None) on an empty frame; treat that as zero.
            if lost_values[name]:
                print(
                    f"WARNING: {lost_values[name]} non-null value(s) in '{name}' "
                    f"cannot be cast to {data_type} and will become NULL"
                )

    # Casting the columns; withColumn with an existing name replaces it in place.
    for column, data_type in casts_to_apply.items():
        sales_data = sales_data.withColumn(column, col(column).cast(data_type))

    # Showing the total sales and profit for each market. The rows are ordered
    # by total profit, so the banner says so.
    print("=======Showing total sales and profit for each market, ordered by profit descending======")
    (
        sales_data.groupBy("market")
        .agg(
            sum("sales").alias("Total_Sales"),
            sum("profit").alias("Total_Profit"),
        )
        .orderBy("Total_Profit", ascending=False)
        .show()
    )

    # Building the total quantity, sales and profit for each market at the
    # sub-category level. Nothing is computed here: the DataFrame is evaluated
    # when an action such as .show() is called on it.
    print("=======Saving the amount of sales and profit for each market at the sub category level======" )

    profit_and_sales_at_sub_cat = (
        sales_data.groupBy("market", "sub_category")
        .agg(
            sum("quantity").alias("Number_Of_Items"),
            sum("sales").alias("TotalSales"),
            sum("profit").alias("Profit_Made"),
        )
        .orderBy("Profit_Made", ascending=False)
    )

===Schema ===
root
 |-- order_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- ship_date: string (nullable = true)
 |-- ship_mode: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- market: string (nullable = true)
 |-- region: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- sales: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- profit: double (nullable = true)
 |-- shipping_cost: double (nullable = true)
 |-- order_priority: string (nullable = true)
 |-- year: string (nullable = true)

+---------------+----------+---------+--------------+-----------------+-----------+-----------------+--------------+------+-----

In [2]:
# Trigger the sub-category aggregation and print its first 20 rows
# (long string values are truncated in the rendered table).
profit_and_sales_at_sub_cat.show(n=20, truncate=True)

+------+------------+---------------+----------+------------------+
|market|sub_category|Number_Of_Items|TotalSales|       Profit_Made|
+------+------------+---------------+----------+------------------+
|  APAC|      Phones|           2631|    207396| 81314.05619999998|
|  APAC|     Copiers|           2385|    246704|        80854.0452|
|  APAC|   Bookcases|           2361|    224562| 67656.50850000003|
|  APAC|      Chairs|           3594|    276254| 63779.61820000002|
|    EU|   Bookcases|           1785|    181844|56407.269000000015|
|    EU|     Copiers|           1733|    178332|56199.653999999995|
|    US|     Copiers|            234|     19860|        55617.8249|
|    EU|  Appliances|           1187|     67400|46337.433000000005|
|    US|      Phones|           3509|    214721|        44449.0791|
|  APAC|  Appliances|           1310|     81650| 42128.70900000001|
|    US| Accessories|           2976|    119408| 41936.63570000002|
| LATAM|     Copiers|           2211|    226741|