In [1]:
import sys
import os

In [2]:
# Point this kernel at the cluster's Spark 2 client, Hadoop configuration, JVM,
# and the Python interpreter PySpark should use.
os.environ.update({
    "SPARK_HOME": "/usr/hdp/current/spark2-client",
    "HADOOP_CONF_DIR": "/etc/hadoop/conf",
    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64",
    "PYSPARK_PYTHON": "/bin/python3.6",
})
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Put the bundled PySpark and py4j archives at the front of the import path
# (pyspark.zip first, then py4j) so `import pyspark` resolves to them.
# Shell alternative: PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
sys.path[:0] = [
    os.environ["PYLIB"] + "/pyspark.zip",
    os.environ["PYLIB"] + "/py4j-0.10.6-src.zip",
]

In [3]:
from pyspark.sql import SparkSession,Row
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd

In [4]:
# Application name passed to the SparkSession builder.
jobName = "order_det_summary"

In [5]:
# Get the active session (or create one) named after the job, with Hive support
# enabled so the saveAsTable calls further down can target Hive tables.
spark = (
    SparkSession.builder
    .appName(jobName)
    .enableHiveSupport()
    .getOrCreate()
)

In [6]:

print("processing started")

# Raw inputs: one row per order item / one row per order.
oiDf = spark.read.json("/public/retail_db_json/order_items")
oDf = spark.read.json("/public/retail_db_json/orders")

print(f"count of orders:{oDf.count()}")
print(f"count of order items:{oiDf.count()}")

# Enrich every order item with its order's customer and status.
# FIX: the join key must be the item's order reference (order_item_order_id),
# not the item's own id (order_item_id). Joining item id to order id attaches the
# customer/status of an unrelated order and turns every item whose id exceeds the
# largest order id into a spurious "EXCEPTION" row.
# The left join keeps items without a matching order; they surface with a null
# order_status and are reported as exceptions below.
orders = (
    oiDf.join(oDf, oiDf.order_item_order_id == oDf.order_id, "left")
    .select([*oiDf.columns, "order_customer_id", "order_status"])
)

# Aggregate expressions shared by the summaries below.
total_amt = F.sum("order_item_subtotal").cast(DecimalType(18, 2)).alias("total_amt")
tot_qty = F.sum("order_item_quantity").alias("tot_qty")

# Items that found their order vs. items that did not.
orders_matched = orders.where("order_status is not null")
orders_unmatched = orders.where("order_status is null")

# Processed order summarization

# Order summary per status; unmatched items are bucketed under "EXCEPTION".
orders_sum_to_status = (
    orders.groupBy("order_status")
    .agg(total_amt, F.count("order_item_id").alias("order_items_total"))
    .withColumn("order_status", F.coalesce(F.col("order_status"), F.lit("EXCEPTION")))
)

# Order summary per product
orders_sum_to_product = orders_matched.groupBy("order_item_product_id").agg(total_amt, tot_qty)

# Order summary per customer
orders_sum_to_customer = orders_matched.groupBy("order_customer_id").agg(total_amt)

# Order summary per customer for each product
orders_sum_to_customer_product = (
    orders_matched.groupBy("order_customer_id", "order_item_product_id").agg(total_amt, tot_qty)
)

# Order summary per customer for each status.
# FIX: orders_total counts distinct orders; a plain count() over the item rows
# counted one per order item and therefore over-reported multi-item orders.
orders_sum_to_customer_status = (
    orders_matched.groupBy("order_customer_id", "order_status")
    .agg(F.countDistinct("order_item_order_id").alias("orders_total"), total_amt)
)

# Exception orders: items with no matching order. The customer id is always null
# for these rows, so it is dropped.
orders_exception = (
    orders_unmatched
    .withColumn("order_status", F.lit("EXCEPTION"))
    .drop("order_customer_id")
)

# Exception order summary per product
orders_ex_sum_to_prod = orders_exception.groupBy("order_item_product_id").agg(total_amt, tot_qty)

# Writing aggregated tables to Hive (each table is fully overwritten on every run).
tables_to_write = [
    ("orders_sum_to_status", orders_sum_to_status),
    ("orders_sum_to_product", orders_sum_to_product),
    ("orders_sum_to_customer", orders_sum_to_customer),
    ("orders_sum_to_customer_product", orders_sum_to_customer_product),
    ("orders_sum_to_customer_status", orders_sum_to_customer_status),
    ("orders_exception", orders_exception),
    ("orders_ex_sum_to_prod", orders_ex_sum_to_prod),
]
for table_name, table_df in tables_to_write:
    table_df.write.mode("overwrite").saveAsTable(f"pathirippilly_db.{table_name}")

print("processing completed successfully")


processing started
count of orders:68883
count of order items:172198
processing completed successfully


In [7]:
# Load the test CSV; the first line is the header and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/user/pathirippilly/test_data/testData.csv")
)

In [9]:
# Preview the loaded rows (show()'s defaults spelled out: first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+----+-------------------+
|port|          timestamp|
+----+-------------------+
|9200|2020-06-19 02:12:41|
|9200|2020-06-19 03:54:23|
|  51|2020-06-19 05:32:11|
|  22|2020-06-20 06:07:43|
|  22|2020-06-20 01:11:12|
|  51|2020-06-20 07:38:49|
+----+-------------------+



In [15]:
# Number of rows per calendar date and port, listed by date and then port.
(
    df.groupBy(F.to_date("timestamp").alias("date"), "port")
    .count()
    .orderBy("date", "port")
    .show()
)

+----------+----+-----+
|      date|port|count|
+----------+----+-----+
|2020-06-19|  51|    1|
|2020-06-19|9200|    2|
|2020-06-20|  22|    2|
|2020-06-20|  51|    1|
+----------+----+-----+



In [None]:
# Selects the 'avro' output format on the DataFrame writer for the per-product summary.
# NOTE(review): no save()/saveAsTable() call follows, so this cell only builds a
# writer and writes nothing — add the intended destination to complete it.
# NOTE(review): the 'avro' source may need an extra Avro package on this Spark
# build — confirm before running.
orders_sum_to_product.write.format('avro')