<a href="https://colab.research.google.com/github/sachinibuddhika/Big-Data-Project-Analysis-on-Swiggy-Zomato-Order-Information-Dataseton/blob/main/BigData_195006X.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Mount google drive**

In [324]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Initialize spark context**

In [325]:
#install pyspark
!pip install pyspark

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col



In [326]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("OrderAnalysis") \
    .getOrCreate()

In [327]:
spark

In [328]:
conf = spark.sparkContext.getConf()

# Print the configuration settings
print("spark.app.name = ", conf.get("spark.app.name"))
print("spark.master = ", conf.get("spark.master"))
print("spark.executor.memory = ", conf.get("spark.executor.memory"))

spark.app.name =  OrderAnalysis
spark.master =  local[*]
spark.executor.memory =  None


In [329]:
# Load the CSV file into a DataFrame
df = spark.read.csv('/content/drive/My Drive/BigData/DataSet/Rider-Info.csv', header=True)


**Read the dataset**

In [330]:
#specifying the dataset to read
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path: Union[str, List[str]], schema: Union[pyspark.sql.types.StructType, str, NoneType] = None, sep: Optional[str] = None, encoding: Optional[str] = None, quote: Optional[str] = None, escape: Optional[str] = None, comment: Optional[str] = None, header: Union[bool, str, NoneType] = None, inferSchema: Union[bool, str, NoneType] = None, ignoreLeadingWhiteSpace: Union[bool, str, NoneType] = None, ignoreTrailingWhiteSpace: Union[bool, str, NoneType] = None, nullValue: Optional[str] = None, nanValue: Optional[str] = None, positiveInf: Optional[str] = None, negativeInf: Optional[str] = None, dateFormat: Optional[str] = None, timestampFormat: Optional[str] = None, maxColumns: Union[str, int, NoneType] = None, maxCharsPerColumn: Union[str, int, NoneType] = None, maxMalformedLogPerPartition: Union[str, int, NoneType] = None, mode: Optional[str] = None, columnNameOfCorruptRecord: Optional[str] = None, multiLine: Union[bool, str, NoneType] 

In [331]:
# Load the CSV file into a DataFrame
df = spark.read.csv('/content/drive/My Drive/BigData/DataSet/Rider-Info.csv', header=True)

In [332]:
df.show(5)

+-------------------+--------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+-------------------+------------------+--------------+----------------+---------+------------------+--------------------+-------------------+-------------------+----------------+------------------+--------------+
|         order_time|order_id|         order_date|         allot_time|        accept_time|        pickup_time|     delivered_time|rider_id|first_mile_distance|last_mile_distance|alloted_orders|delivered_orders|cancelled|undelivered_orders|lifetime_order_count|reassignment_method|reassignment_reason|reassigned_order|      session_time|cancelled_time|
+-------------------+--------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+-------------------+------------------+--------------+----------------+---------+------------------+--------------------+-------------------+----------------

In [333]:
df.columns


['order_time',
 'order_id',
 'order_date',
 'allot_time',
 'accept_time',
 'pickup_time',
 'delivered_time',
 'rider_id',
 'first_mile_distance',
 'last_mile_distance',
 'alloted_orders',
 'delivered_orders',
 'cancelled',
 'undelivered_orders',
 'lifetime_order_count',
 'reassignment_method',
 'reassignment_reason',
 'reassigned_order',
 'session_time',
 'cancelled_time']

**Preprocess data**

In [334]:
from pyspark.sql import functions as F

Dropping columns

In [335]:
columns_to_drop = ['accept_time', 'undelivered_orders','reassignment_method', 'reassignment_reason','cancelled_time']
df2 = df.drop(*columns_to_drop)

df2.show(5)

+-------------------+--------+-------------------+-------------------+-------------------+-------------------+--------+-------------------+------------------+--------------+----------------+---------+--------------------+----------------+------------------+
|         order_time|order_id|         order_date|         allot_time|        pickup_time|     delivered_time|rider_id|first_mile_distance|last_mile_distance|alloted_orders|delivered_orders|cancelled|lifetime_order_count|reassigned_order|      session_time|
+-------------------+--------+-------------------+-------------------+-------------------+-------------------+--------+-------------------+------------------+--------------+----------------+---------+--------------------+----------------+------------------+
|2021-01-26 02:21:35|  556753|2021-01-26 00:00:00|2021-01-26 02:21:59|2021-01-26 02:32:51|2021-01-26 02:49:47|   11696|             1.5666|              2.65|          46.0|            46.0|        0|               621.0|     

In [336]:
df2.columns

['order_time',
 'order_id',
 'order_date',
 'allot_time',
 'pickup_time',
 'delivered_time',
 'rider_id',
 'first_mile_distance',
 'last_mile_distance',
 'alloted_orders',
 'delivered_orders',
 'cancelled',
 'lifetime_order_count',
 'reassigned_order',
 'session_time']

Dropping null rows

In [337]:
# display null values
df2.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df2.columns]).show()

+----------+--------+----------+----------+-----------+--------------+--------+-------------------+------------------+--------------+----------------+---------+--------------------+----------------+------------+
|order_time|order_id|order_date|allot_time|pickup_time|delivered_time|rider_id|first_mile_distance|last_mile_distance|alloted_orders|delivered_orders|cancelled|lifetime_order_count|reassigned_order|session_time|
+----------+--------+----------+----------+-----------+--------------+--------+-------------------+------------------+--------------+----------------+---------+--------------------+----------------+------------+
|         0|       0|         0|         0|       2421|          5218|       0|                  0|                 0|         16948|           17341|        0|                  53|          436247|        3675|
+----------+--------+----------+----------+-----------+--------------+--------+-------------------+------------------+--------------+----------------+--

In [338]:
df2 = df2.filter(df.accept_time.isNotNull())

In [339]:
rows_to_drop = ['pickup_time','delivered_time']

In [340]:
for row in rows_to_drop:
  df2 = df2.filter(df[row].isNotNull())

In [341]:
# display null values
df2.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df2.columns]).show()

+----------+--------+----------+----------+-----------+--------------+--------+-------------------+------------------+--------------+----------------+---------+--------------------+----------------+------------+
|order_time|order_id|order_date|allot_time|pickup_time|delivered_time|rider_id|first_mile_distance|last_mile_distance|alloted_orders|delivered_orders|cancelled|lifetime_order_count|reassigned_order|session_time|
+----------+--------+----------+----------+-----------+--------------+--------+-------------------+------------------+--------------+----------------+---------+--------------------+----------------+------------+
|         0|       0|         0|         0|          0|             0|       0|                  0|                 0|         16299|           16486|        0|                   2|          431751|        3531|
+----------+--------+----------+----------+-----------+--------------+--------+-------------------+------------------+--------------+----------------+--

In [342]:
df2.count()

444782

Filling missing values

In [343]:
from pyspark.sql.functions import mean

In [344]:
# Fill missing values with mean in specified columns
columns_to_fill = ['alloted_orders', 'delivered_orders', 'lifetime_order_count', 'session_time']
mean_values = df2.select([mean(col).alias(col) for col in columns_to_fill]).collect()[0].asDict()
df2 = df2.fillna(mean_values)

In [345]:
# display null values
df2.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df2.columns]).show()

+----------+--------+----------+----------+-----------+--------------+--------+-------------------+------------------+--------------+----------------+---------+--------------------+----------------+------------+
|order_time|order_id|order_date|allot_time|pickup_time|delivered_time|rider_id|first_mile_distance|last_mile_distance|alloted_orders|delivered_orders|cancelled|lifetime_order_count|reassigned_order|session_time|
+----------+--------+----------+----------+-----------+--------------+--------+-------------------+------------------+--------------+----------------+---------+--------------------+----------------+------------+
|         0|       0|         0|         0|          0|             0|       0|                  0|                 0|             0|               0|        0|                   0|          431751|           0|
+----------+--------+----------+----------+-----------+--------------+--------+-------------------+------------------+--------------+----------------+--