In [1]:
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col

# Initialize Glue context and Spark context
sc = SparkContext.getOrCreate() #get or create, important for jupyter.
glueContext = GlueContext(sc)
spark = glueContext.spark_session


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
df=spark.read.csv("sample_data/retail_db/orders/part-00000_orders.csv", header=True)
df.count()

                                                                                

68883

In [3]:
df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)



In [17]:
distinct_count_ostatus=df.select("order_status").distinct().count()
print(f" there are total {distinct_count_ostatus} distinct order status available in the table")

 there are total 9 distinct order status available in the table


In [27]:
df.createOrReplaceTempView("dft")
spark.sql("select * from dft").show(5)

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
+--------+--------------------+-----------------+---------------+
only showing top 5 rows



In [31]:
spark.sql("select count(*) from dft").show()

+--------+
|count(1)|
+--------+
|   68883|
+--------+



In [40]:
# print the top 5 dates having max orders placed
spark.sql("select date(order_date),count(*) from dft group by date(order_date)\
order by count(*) desc ")\
.show(5)

+----------+--------+
|order_date|count(1)|
+----------+--------+
|2013-11-03|     347|
|2013-11-24|     292|
|2013-11-14|     287|
|2013-10-04|     287|
|2013-12-26|     286|
+----------+--------+
only showing top 5 rows



In [158]:
# print the top 5 dates having max orders placed but using API
df.groupBy(df.order_date.cast("date").alias("order_Date")).count().sort(col("count").desc()).show(5)

+----------+-----+
|order_Date|count|
+----------+-----+
|2013-11-03|  347|
|2013-11-24|  292|
|2013-11-14|  287|
|2013-10-04|  287|
|2013-12-26|  286|
+----------+-----+
only showing top 5 rows



In [159]:
df.show(5)

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
+--------+--------------------+-----------------+---------------+
only showing top 5 rows



In [190]:
o=spark.read.csv("sample_data/retail_db/orders/part-00000_orders.csv", header=True)
oi=spark.read.csv("sample_data/retail_db/order_items/part-00000-order_items.csv", header=True)
o.printSchema()
oi.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)

root
 |-- order_item_id: string (nullable = true)
 |-- order_item_order_id: string (nullable = true)
 |-- order_item_product_id: string (nullable = true)
 |-- order_item_quantity: string (nullable = true)
 |-- order_item_subtotal: string (nullable = true)
 |-- order_item_product_price: string (nullable = true)



In [161]:
oi.count()

172198

In [173]:
o.createOrReplaceTempView("o")
oi.createOrReplaceTempView("oi")

In [174]:
spark.sql("select count(*) from oi").show()

+--------+
|count(1)|
+--------+
|  172198|
+--------+



In [177]:
spark.sql("select * from o").show(3)

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
+--------+--------------------+-----------------+---------------+
only showing top 3 rows



In [187]:
spark.sql("select * from oi").show(10)
print(199.99+ 250+129.99)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
|            6| 

In [194]:
# print order id wise total sales
query='''
select order_id,sum(oi.order_item_subtotal) from o
inner join oi
on o.order_id=oi.order_item_order_id
group by o.order_id
order by sum(oi.order_item_subtotal) desc

'''
spark.sql(query).show(5)

+--------+------------------------+
|order_id|sum(order_item_subtotal)|
+--------+------------------------+
|   68703|      3449.9100000000003|
|   68724|      2859.8900000000003|
|   68858|                 2839.91|
|   68809|                 2779.86|
|   68766|                  2699.9|
+--------+------------------------+
only showing top 5 rows



In [222]:
# help(o.join)

# cond = [ o["order_id"] == oi["order_item_order_id"] ]
# cond = [ o["order_id"] == oi["order_item_order_id"] ]
cond = [ o.order_id == oi.order_item_order_id ]
# o.join(oi, cond, 'inner').groupBy(o.order_id).agg(sum(oi.order_item_subtotal))

o.join(oi, cond, "inner") \
    .groupBy(o.order_id) \
    .agg(sum(oi.order_item_subtotal).alias("total_sales")) \
    .orderBy(col("total_sales").desc()) \
    .show(5)



+--------+------------------+
|order_id|       total_sales|
+--------+------------------+
|   68703|3449.9100000000003|
|   68724|2859.8900000000003|
|   68858|           2839.91|
|   68809|           2779.86|
|   68766|            2699.9|
+--------+------------------+
only showing top 5 rows



In [239]:
o.select("order_id")

DataFrame[order_id: string]

In [240]:
o.select(o["order_id"])

DataFrame[order_id: string]

In [241]:
o.select(o.order_id)

DataFrame[order_id: string]

In [242]:
o.select(col("order_id"))

DataFrame[order_id: string]