In [217]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit,from_json,get_json_object,sum,avg
from pyspark.sql.types import StructType,StructField, StringType,IntegerType


spark = SparkSession.builder.appName("q1 answer").getOrCreate()

In [218]:
spark

In [219]:
schema = StructType([ \
    StructField("OrderId",StringType(),True), \
    StructField("OrderItemId",StringType(),True), \
    StructField("QuantityOrdered",IntegerType(),True), \
    StructField("ItemPrice", IntegerType(), True), \
    StructField("PromotionDiscount", StringType(), True), \
    StructField("batch_id", IntegerType(), True) \
  ])
 

In [220]:
promoSchema = StructType([ \
    StructField("CurrencyCode",StringType(),True), \
    StructField("Amount",StringType(),True)
  ])

In [221]:
df1 = spark.read.options(header=True, quote = '"').option('escape','"').schema(schema).csv("order_region_a.csv")

In [222]:
df2 = spark.read.options(header=True).option('escape','"').schema(schema).csv("order_region_b.csv")

In [223]:
df1 = df1.withColumn("region",lit("A"))
df2 = df2.withColumn("region",lit("B"))


In [224]:
df1.show(truncate=False,n=5)
df1.count()

+-------------------+-----------+---------------+---------+------------------------------------------+--------+------+
|OrderId            |OrderItemId|QuantityOrdered|ItemPrice|PromotionDiscount                         |batch_id|region|
+-------------------+-----------+---------------+---------+------------------------------------------+--------+------+
|171-0001135-1657958|1.11689E+13|1              |949      |{ "CurrencyCode": "INR", "Amount": "10"}  |359     |A     |
|171-0001497-9165123|1.97603E+13|1              |699      |{ "CurrencyCode": "INR", "Amount": "10.1"}|1135    |A     |
|171-0002127-1363507|5.94976E+12|1              |399      |{ "CurrencyCode": "INR", "Amount": "10"}  |297     |A     |
|171-0002370-0601169|5.75719E+13|1              |499      |{ "CurrencyCode": "INR", "Amount": "10.1"}|114     |A     |
|171-0004526-2028348|3.38513E+13|1              |1699     |{ "CurrencyCode": "INR", "Amount": "10"}  |764     |A     |
+-------------------+-----------+---------------

44494

In [225]:
df2.show(truncate=False,n=5)
df2.count()

+-------------------+-----------+---------------+---------+------------------------------------------+--------+------+
|OrderId            |OrderItemId|QuantityOrdered|ItemPrice|PromotionDiscount                         |batch_id|region|
+-------------------+-----------+---------------+---------+------------------------------------------+--------+------+
|171-0001135-1657958|1.11689E+13|1              |949      |{ "CurrencyCode": "INR", "Amount": "10"}  |359     |B     |
|171-0001497-9165123|1.97603E+13|1              |699      |{ "CurrencyCode": "INR", "Amount": "10.1"}|1135    |B     |
|171-0002127-1363507|5.94976E+12|1              |399      |{ "CurrencyCode": "INR", "Amount": "10"}  |297     |B     |
|171-0002370-0601169|5.75719E+13|1              |499      |{ "CurrencyCode": "INR", "Amount": "10.1"}|114     |B     |
|171-0004526-2028348|3.38513E+13|1              |1699     |{ "CurrencyCode": "INR", "Amount": "10"}  |764     |B     |
+-------------------+-----------+---------------

44494

In [226]:
df = df1.union(df2)

In [227]:
#total records
df.count()

88988

In [228]:
#df = df.withColumn("values",from_json(df["PromotionDiscount"],promoSchema))

In [229]:
df = df.withColumn("discount",get_json_object(df["PromotionDiscount"],'$.Amount'))

In [230]:
df = df.withColumn("discount",df["discount"].cast('float'))

In [231]:
df = df.withColumn("total_sales",df["QuantityOrdered"] * df["ItemPrice"])

In [232]:
df = df.filter(df["total_sales"] >= 0)

In [233]:
df.show(truncate = False,n=5)

+-------------------+-----------+---------------+---------+------------------------------------------+--------+------+--------+-----------+
|OrderId            |OrderItemId|QuantityOrdered|ItemPrice|PromotionDiscount                         |batch_id|region|discount|total_sales|
+-------------------+-----------+---------------+---------+------------------------------------------+--------+------+--------+-----------+
|171-0001135-1657958|1.11689E+13|1              |949      |{ "CurrencyCode": "INR", "Amount": "10"}  |359     |A     |10.0    |949        |
|171-0001497-9165123|1.97603E+13|1              |699      |{ "CurrencyCode": "INR", "Amount": "10.1"}|1135    |A     |10.1    |699        |
|171-0002127-1363507|5.94976E+12|1              |399      |{ "CurrencyCode": "INR", "Amount": "10"}  |297     |A     |10.0    |399        |
|171-0002370-0601169|5.75719E+13|1              |499      |{ "CurrencyCode": "INR", "Amount": "10.1"}|114     |A     |10.1    |499        |
|171-0004526-2028348

In [234]:
df = df.withColumn("net_sales",df["total_sales"] - df["discount"])

In [235]:
df.show(truncate=False,n=5)

+-------------------+-----------+---------------+---------+------------------------------------------+--------+------+--------+-----------+---------+
|OrderId            |OrderItemId|QuantityOrdered|ItemPrice|PromotionDiscount                         |batch_id|region|discount|total_sales|net_sales|
+-------------------+-----------+---------------+---------+------------------------------------------+--------+------+--------+-----------+---------+
|171-0001135-1657958|1.11689E+13|1              |949      |{ "CurrencyCode": "INR", "Amount": "10"}  |359     |A     |10.0    |949        |939.0    |
|171-0001497-9165123|1.97603E+13|1              |699      |{ "CurrencyCode": "INR", "Amount": "10.1"}|1135    |A     |10.1    |699        |688.9    |
|171-0002127-1363507|5.94976E+12|1              |399      |{ "CurrencyCode": "INR", "Amount": "10"}  |297     |A     |10.0    |399        |389.0    |
|171-0002370-0601169|5.75719E+13|1              |499      |{ "CurrencyCode": "INR", "Amount": "10.1"

In [236]:
##  total sales based on region 
df_regionSales = df.groupBy("region").agg(sum("total_sales").alias("Region Sales"))
df_regionSales.show(truncate=False,n=5)




+------+------------+
|region|Region Sales|
+------+------------+
|A     |37210388    |
|B     |37210388    |
+------+------------+



In [237]:
##  net sales based on region 
df_regionSales = df.groupBy("region").agg(sum("net_sales").alias("net Region Sales"))
df_regionSales.show(truncate=False,n=5)

+------+--------------------+
|region|net Region Sales    |
+------+--------------------+
|A     |3.6767371867889404E7|
|B     |3.6767371867889404E7|
+------+--------------------+



In [238]:
## avgerage sales based on total sales
df_avgTotalSale = df.agg(avg("total_sales"))
df_avgTotalSale.show(truncate=False,n=5)

+----------------+
|avg(total_sales)|
+----------------+
|844.136657516844|
+----------------+



In [239]:
## avgerage sales based on net sales
df_avgTotalSale = df.agg(avg("net_sales"))
df_avgTotalSale.show(truncate=False,n=5)

+-----------------+
|avg(net_sales)   |
+-----------------+
|834.0866102831017|
+-----------------+



In [240]:


df_avgNetSale = df.agg(avg("net_sales"))
df_avgNetSale.show(truncate=False,n=5)

+-----------------+
|avg(net_sales)   |
+-----------------+
|834.0866102831017|
+-----------------+



In [241]:
### Removing duplicates based on OrderID

df = df.drop_duplicates(subset=['OrderId'])

In [242]:
df.show(truncate=False,n=5)


+-------------------+-----------+---------------+---------+------------------------------------------+--------+------+--------+-----------+---------+
|OrderId            |OrderItemId|QuantityOrdered|ItemPrice|PromotionDiscount                         |batch_id|region|discount|total_sales|net_sales|
+-------------------+-----------+---------------+---------+------------------------------------------+--------+------+--------+-----------+---------+
|171-0008037-3788355|2.24569E+13|1              |499      |{ "CurrencyCode": "INR", "Amount": "10.1"}|232     |A     |10.1    |499        |488.9    |
|171-0008662-0057942|6.63267E+13|1              |599      |{ "CurrencyCode": "INR", "Amount": "10.1"}|122     |A     |10.1    |599        |588.9    |
|171-0010803-5365973|2.11352E+13|1              |299      |{ "CurrencyCode": "INR", "Amount": "10.1"}|669     |A     |10.1    |299        |288.9    |
|171-0015668-5065935|8.91807E+12|1              |699      |{ "CurrencyCode": "INR", "Amount": "10"} 

In [243]:
df.printSchema()

root
 |-- OrderId: string (nullable = true)
 |-- OrderItemId: string (nullable = true)
 |-- QuantityOrdered: integer (nullable = true)
 |-- ItemPrice: integer (nullable = true)
 |-- PromotionDiscount: string (nullable = true)
 |-- batch_id: integer (nullable = true)
 |-- region: string (nullable = false)
 |-- discount: float (nullable = true)
 |-- total_sales: integer (nullable = true)
 |-- net_sales: float (nullable = true)

