In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FoodOrderDelivery").getOrCreate()


In [2]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType
import pyspark.sql.functions as f
from pyspark.sql.functions import *

schema = StructType([
    StructField("orders", StringType())
])

ordersDF = spark.read.csv('/datacartridge/streamData/foodDelivery/Raw/delTimeAssigned/*.csv', header=True, schema=schema)

#ordersDF.show(10)

finalDF=ordersDF.withColumn('source_zip',f.split('orders',',')[0])\
                .withColumn('order_id',f.split('orders',',')[1])\
                .withColumn('order_time',f.split('orders',',')[2])\
                .withColumn('order_value',f.split('orders',',')[3])\
                .withColumn('destination_zip',f.split('orders',',')[4])\
                .withColumn('deliveryPerson_id',f.split('orders',',')[5])\
                .withColumn('delivery_time',f.split('orders',',')[6]).drop('orders')#.show()
                
timeFmt = "yyyy-MM-dd HH:mm:ss"
timeDiff = (f.unix_timestamp('delivery_time', format=timeFmt) - f.unix_timestamp('order_time', format=timeFmt))

finalDF.show()

+----------+--------+-------------------+-----------+---------------+-----------------+-------------------+
|source_zip|order_id|         order_time|order_value|destination_zip|deliveryPerson_id|      delivery_time|
+----------+--------+-------------------+-----------+---------------+-----------------+-------------------+
|     65009| 729-975|2018-08-09 18:32:18|        592|          75415|              420|2018-08-11 06:54:19|
|     58603| 366-147|2018-08-09 01:17:32|        077|          44332|              175|2018-08-11 12:12:25|
|     80669| 451-711|2018-08-09 05:46:17|        383|          66459|              932|2018-08-11 11:39:03|
|     81133| 180-810|2018-08-09 03:37:44|        801|          81045|              527|2018-08-11 11:22:14|
|     62615| 385-628|2018-08-09 19:04:27|        361|          24615|              827|2018-08-11 06:22:26|
|     89569| 080-189|2018-08-09 17:26:35|        784|          48874|              247|2018-08-11 01:14:42|
|     09868| 113-542|2018-08

**Top 10 from_zip_code**

In [3]:
finalDF.groupby("destination_zip").count().sort(desc("count")).show()

+---------------+-----+
|destination_zip|count|
+---------------+-----+
|          31487|    4|
|          55458|    4|
|          43896|    4|
|          92407|    4|
|          79111|    4|
|          43462|    3|
|          81194|    3|
|          84316|    3|
|          86278|    3|
|          44954|    3|
|          13447|    3|
|          39167|    3|
|          28180|    3|
|          36884|    3|
|          92929|    3|
|          68012|    3|
|          24155|    3|
|          53401|    3|
|          89128|    3|
|          38798|    3|
+---------------+-----+
only showing top 20 rows



**Total value of orders that are in order_delivered per delivery person**

In [4]:
finalDF.groupBy("deliveryPerson_id").agg(f.sum("order_value")).show()

+-----------------+----------------+
|deliveryPerson_id|sum(order_value)|
+-----------------+----------------+
|              829|          8267.0|
|              296|         18893.0|
|              675|         10118.0|
|              691|         13406.0|
|              467|          5848.0|
|              451|          9912.0|
|              125|         12686.0|
|              853|          8795.0|
|              800|          9433.0|
|              944|         14673.0|
|              919|          9343.0|
|              870|          6445.0|
|              666|          7392.0|
|              926|          8304.0|
|              124|         12417.0|
|              591|          8612.0|
|              447|         10443.0|
|              718|         10368.0|
|              574|          7872.0|
|              307|         10997.0|
+-----------------+----------------+
only showing top 20 rows



**Average delivery time for delivered orders per delivery person**

In [8]:
durationDF=finalDF.withColumn("Duration", timeDiff/3600)#.show()

durationDF.groupBy("deliveryPerson_id").agg(f.avg("Duration")).show()

+-----------------+------------------+
|deliveryPerson_id|     avg(Duration)|
+-----------------+------------------+
|              829| 49.43887037037037|
|              296| 47.68906862745097|
|              675|47.938225308641975|
|              467| 48.07998015873016|
|              691| 50.47842171717173|
|              451| 49.76264619883041|
|              125| 45.95642361111111|
|              853| 49.86302469135804|
|              944| 51.70226666666667|
|              800|51.352048611111115|
|              919|48.333763888888896|
|              870| 44.31285185185185|
|              666|46.334357638888896|
|              926| 47.58458333333334|
|              124| 49.59237268518519|
|              591|45.816765873015875|
|              447| 48.40842995169083|
|              574| 44.45122222222222|
|              475| 46.74936213991769|
|              613| 46.73893518518518|
+-----------------+------------------+
only showing top 20 rows

