# <div style="font-family: Trebuchet MS; background-color: #58D68D; color: #000000; padding: 12px; line-height: 1.5;"> Introduction 🎻</div>

### <div style="font-family: Trebuchet MS; background-color: #F4D03F; color: #000000; padding: 12px; line-height: 1.5;"> Hey Kagglers!! Today I am going to share with you a simple tool that you can leverage to speed up the big data processing involved in your own projects. Whether you are a fresher or an experienced practitioner, I believe that it is important for y'all to get a basic understanding of the Spark ecosystem, as many data-centric companies are continuing to adopt this technology.<br><br> In this notebook, I have tried to compile all the basic functionalities to get you started with Spark effortlessly.</div>

<div style="font-family: Trebuchet MS; background-color: #EAECEE; color: #000000; padding: 12px; line-height: 1;"><h3> Some basic guidelines that I have followed to make this notebook look interactive:</h3><h4><ul style="list-style-type:square"><li>Whenever there is a definition, I have highlighted it with a  <span style="background-color: #2E31FD;font-size: 25px">📣</span></li><br><li>Whenever there is a new function/method, I have highlighted it with a <span style="background-color: #00FF00;font-size: 25px">🌼</span></li><br><li>Whenever there is a suggestion from my side, I have highlighted it with a <span style="background-color: #F3FF00;font-size: 25px">📌</span></li></ul></h4></div>

### So what are you waiting for! Let's get started with the basics:

## <div style="padding: 12px"><span style="background-color: #2E31FD;font-size: 35px">📣</span> What is Apache Spark in technical terms?</div>

- Apache Spark is an open-source, distributed data processing and analytics framework designed for large-scale data processing tasks. 

- It provides a unified and flexible platform for performing various data processing operations, including batch processing, interactive queries, real-time stream processing, machine learning, and graph processing.

## <div style="padding: 12px"><span style="background-color: #2E31FD;font-size: 35px">📣</span> What is Apache Spark, in a simple analogy?</div>

- Apache Spark is like a supercharged engine for processing and analyzing really big piles of data. Imagine you have a massive amount of information, like a gigantic puzzle with millions of pieces. Trying to solve this puzzle on a single computer could take forever. But Spark lets you use many computers at once, like a team of puzzle solvers, to work on different parts of the puzzle together.

- These "puzzle solvers" (computers) can talk to each other and share their findings, making the work faster and more efficient. Spark also keeps everything organized and makes sure that even if one of the "puzzle solvers" takes a break or has a problem, the others can still continue working without losing progress.

- In simple words, Apache Spark helps you process huge amounts of data much faster by getting a bunch of computers to work together and collaborate on the job. It's like a team effort that makes solving big data problems much easier and quicker!

## <div style="padding: 12px"><span style="background-color: #2E31FD;font-size: 35px">📣</span> What is PySpark?</div>

- PySpark is the Python API for Spark. Its DataFrame interface will feel familiar if you have used Pandas.

- In simple words, PySpark is a special tool that combines the power of many computers with the simplicity of Python to help you handle really big piles of data without breaking a sweat!

## <div style="padding: 12px"><span style="background-color: #2E31FD;font-size: 35px">📣</span> Benefits of using PySpark over Pandas for Data Processing:</div>

#### 1. Scalability and Distributed Computing:

- PySpark is designed for processing large-scale data across clusters of machines. It can handle data sizes that may not fit in memory, as it utilizes distributed computing.
- Pandas, on the other hand, is designed for single-machine data processing and may struggle with extremely large datasets that exceed available memory.

#### 2. Performance:

- PySpark's in-memory processing and distributed computing can lead to better performance for certain operations on large datasets compared to pandas.
- While pandas is fast for single-machine operations, PySpark's parallel processing can provide significant performance gains for operations that can be parallelized across multiple nodes.

# <div style="font-family: Trebuchet MS; background-color: #B0E0E6; color: #000000; padding: 12px; line-height: 1.5;"> Importing Libraries 📚</div>

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import datetime as dt
import re
import os
from IPython.display import Image,display

## Suppressing warnings:
import warnings
warnings.filterwarnings("ignore")

In [None]:
!pip install pyspark

In [None]:
## importing essential spark libraries:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, count, when, regexp_replace, udf, struct, lit, isnull, trim, asc, desc, round, mean
from pyspark.sql.functions import to_timestamp, to_date, unix_timestamp, date_format   ### --> Date manipulation
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType

# <div style="font-family: Trebuchet MS; background-color: #B0E0E6; color: #000000; padding: 12px; line-height: 1.5;"> Getting Started with the Analysis 🔬</div>


#### The first step towards your adventure in Spark is to create a Spark Session. It is the entry point to the Spark ecosystem. Once you reach the Spark environment via the entry point, you can freely create and manipulate Spark RDDs, DataFrames and Datasets.

## <span style="background-color: #2E31FD;font-size: 35px">📣</span> What is an RDD?

You might be wondering what this new term is. Well, RDD stands for **Resilient Distributed Dataset**. It is the fundamental data structure of Spark.

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> SparkSession.builder

#### A SparkSession is created with the builder pattern, starting from the SparkSession.builder attribute:

In [None]:
##  Creating a Spark session:
spark = SparkSession.builder.appName('Sample').getOrCreate()

In [None]:
## Quick glance at the object
spark

##### Here, the spark object acts as the gateway to the Spark ecosystem. 

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> read.csv(), show()

To read a CSV file, and to display its rows.

In [None]:
df=spark.read.csv("/kaggle/input/food-delivery-dataset/train.csv",
                  header=True,
                  inferSchema=True)
#  Parameters:
## - inferSchema=True makes Spark scan the data and infer the type of each column. If False, all the
##     columns will be read as strings.
## - header=True tells Spark that the first row of the file contains the column names.

## Displaying the first 5 rows:
df.show(5)

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> first()

To view the data points of the first row.

In [None]:
df.first()   ### returns a Row object

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> toPandas(), head()

To view the Spark dataframe as a Pandas dataframe (this collects all the data onto the driver, so use it only when the data fits in memory).

In [None]:
## To convert a spark dataframe into a pandas dataframe
df.toPandas().head()

#### As you can see above, Time_taken(min) is the target variable.

#### Now we have read the CSV file into Spark. Let's check the type of the dataframe:

In [None]:
## Viewing the type
type(df)

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> printSchema()

To print the schema of the dataset.

In [None]:
## Printing the attributes of the table:
df.printSchema()

In [None]:
## Displaying the first 5 rows in the form of col-value pairs
df.head(5)

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> describe(), summary()

To view the basic statistics of the dataset.

In [None]:
## Basic statistics of the data:
df.describe()    ### df.summary()
df.describe().show()

#### NOTE: describe() gives the statistical summary of the dataframe, but it also includes the string columns (for which mean and stddev are null).

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> count(), columns

To count the number of rows, and to list the columns present in the dataframe.

In [None]:
## Shape of the dataframe is:
df.count(),len(df.columns)

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> col(), isNull(), alias()

To select a particular column for applying a transformation, to check whether a column has null values, and to rename a column after a transformation.

In [None]:
## Checking for null values:
df.select([count(when(df[c].isNull(), c)).alias(c) for c in df.columns]).show()

### <span style="background-color: #F3FF00;font-size: 35px">📌</span>Breaking down the above query by taking one sample column:

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> when()-otherwise()

To fill in values of a column based on a condition.

In [None]:
## The when-otherwise pair effectively works as the CASE WHEN THEN ELSE END expression of SQL:
df.select((when(df['Weatherconditions'].isNull(),"None").otherwise(df['Weatherconditions']))).show(2)

In [None]:
## Placing the alias changes the column name:
df.select((when(df['Weatherconditions'].isNull(),"None").otherwise(df['Weatherconditions'])).alias('Weatherconditions')).show(2)

In [None]:
## Wrapping it in count() returns the number of null rows (count() skips the nulls that when() produces for the non-matching rows):
df.select(count(when(df['Weatherconditions'].isNull(),"None")).alias("Count_Null_Weather")).show(2)

## Automating this expression for multiple columns using the list comprehension will yield the desired output.

#### Looks like there are no null values.
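For intuition, the per-column null count that the list comprehension builds can be mirrored in plain Python. This is only a sketch on a tiny hand-made table: the rows and the `null_counts` helper are made up for illustration and are not taken from the dataset.

```python
# Hypothetical mini-table: each dict is one row, None marks a missing value.
rows = [
    {"ID": "a1", "Weatherconditions": "Sunny", "City": None},
    {"ID": "a2", "Weatherconditions": None, "City": "Urban"},
    {"ID": "a3", "Weatherconditions": "Fog", "City": None},
]

def null_counts(rows):
    """Count the None values per column, like count(when(col.isNull(), ...))."""
    columns = rows[0].keys()
    return {c: sum(1 for r in rows if r[c] is None) for c in columns}

counts = null_counts(rows)
print(counts)
```

Note that a check like this only sees real nulls. A missing value stored as the text "NaN" would be counted as present.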

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> dtypes

To view the datatypes of the columns.

In [None]:
## Checking the dtypes:
df.dtypes

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> select()

To select columns for display.

In [None]:
## To view a few selected columns:
df.select(["ID","Delivery_person_ID"]).show()

In [None]:
df.printSchema()

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> cast()

To change the datatype of a column(s).

#### The various datatypes that a column can take up are integers, string, double, float, timestamp, etc...

#### To convert a column into:

1. double ---> use DoubleType()

2. int    ---> use IntegerType()

3. float  ---> use FloatType()

4. string ---> use StringType()

5. long   ---> use LongType()

#### all inside the cast() method.

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> withColumn()

#### In PySpark, withColumn() is one of the most widely used DataFrame **transformations**. It returns a new DataFrame and can be used to:

- change the values of an existing column,

- convert the datatype of an existing column,

- create a new column, etc.

In [None]:
## Have to correct the datatypes of some columns. Delivery_person_Age, Vehicle_condition, multiple_deliveries
df=df.withColumn('Delivery_person_Age',col('Delivery_person_Age').cast(IntegerType()))\
.withColumn('Vehicle_condition',col('Vehicle_condition').cast(IntegerType()))\
.withColumn('multiple_deliveries',col('multiple_deliveries').cast(IntegerType()))

In [None]:
## Checking after conversion:
df.dtypes

In [None]:
df.select(['Delivery_person_Age','Vehicle_condition','multiple_deliveries']).dtypes

In [None]:
## To display the PySpark dataframe as a pandas dataframe:
df.toPandas().head()

In [None]:
## Checking the numeric columns:
def get_num_cols(dataframe):
    ## dataframe.dtypes is a list of (column_name, type_string) pairs
    return [name for name, dtype in dataframe.dtypes if dtype in ('double', 'int')]

num_cols = get_num_cols(df)  ### list of numeric columns

df.describe(num_cols).show()

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> distinct()

To get the unique values.

In [None]:
### There are 1320 unique IDs
df.select('Delivery_person_ID').distinct().count()  

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> orderBy()

To sort by one or more columns.

In [None]:
### Counts per unique delivery person ID:
df.select('Delivery_person_ID').distinct().show(5)  ### show() prints 20 rows by default
df.groupBy('Delivery_person_ID').count().orderBy('count').show(5)

## <div style="color:white;display:fill;border-radius:5px;background-color:#DE3163;font-size:110%;font-family:Verdana;letter-spacing:0.5px"><p style="padding: 10px;color:white;">Visualising the Distributions</p></div>

In [None]:
#Visualising the distribution of the categorical variables:
cols = ['Delivery_person_Age','Delivery_person_Ratings','Weatherconditions','Road_traffic_density','multiple_deliveries','Festival','City']
num_plots = len(cols)
num_rows = (num_plots // 2) + (num_plots % 2)

fig, axes = plt.subplots(num_rows, 2, figsize=(20,15))

## Converting to pandas once, instead of on every loop iteration:
pdf = df.toPandas()

for i, column_name in enumerate(cols):
    ## Named r and c so that the imported pyspark col() function is not overwritten
    r = i // 2
    c = i % 2

    ax = axes[r, c]
    sns.countplot(data=pdf, x=column_name, order=pdf[column_name].value_counts().sort_index().index, ax=ax)

    ax.set_xlabel(column_name)
    ax.set_ylabel('No. of Orders')
    ax.set_title(column_name)
    ax.tick_params(axis='x', rotation=45)
    
if num_plots % 2 != 0:
    fig.delaxes(axes[-1, -1])

plt.tight_layout()
plt.show()

### The Delivery_person_Age column contains Age=0 values which are clearly outliers.

### Looks like we have to handle all the missing values. Although the data showed no missing values initially, the NaN values are stored as strings, hence the isNull() check wasn't able to detect them earlier.

### We can also bin the categories!

In [None]:
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()
address = [(1,"14851 Jeffrey Rd","DE"),
    (2,"Address nan ","NY"),
    (3,"13111 Siemon Ave","CA"),
    (4,"bougain nan ","WA")]
sample =spark.createDataFrame(address,["id","address","state"])
sample.show()

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> trim()

To remove leading and trailing spaces.

In [None]:
### Stripping the leading and trailing white space present in the categorical (string) columns:
cat=[name for name, dtype in df.dtypes if dtype == 'string']
for i in cat:
    df=df.withColumn(i,trim(df[i]))

In [None]:
## checking whether rows are trimmed:
df.show(2)

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> regexp_replace()

To replace characters of a string.

#### Here we are replacing the "NaN" string values with "None"

In [None]:
cols = ['Delivery_person_Age','Delivery_person_Ratings','Weatherconditions','Road_traffic_density','multiple_deliveries','Festival','City']
for i in cols:
    ## Displaying the changes in real time
    ## NOTE: THE PATTERN IS A REGULAR EXPRESSION. IT DOES NOT SUPPORT SQL-LIKE STRING MATCHING LIKE %,_,etc...
    df.withColumn(f"{i}_new",regexp_replace(i,"^(.*?)NaN","None")).select(f"{i}_new").distinct().show(5)
    ## Replacing the dataframe:
    df=df.withColumn(i,regexp_replace(i,"^(.*?)NaN","None"))

#### Now that we have temporarily replaced the NaN values with "None", we will treat them later one-by-one in the upcoming sections.
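To see what the pattern `^(.*?)NaN` does, it can be tried in plain Python first. This is a sketch with made-up cell values; Spark evaluates the pattern with Java's regex engine, but for a pattern this simple Python's `re` module behaves the same way. The `replace_nan_marker` helper is hypothetical.

```python
import re

# Same pattern as in the regexp_replace() call: an optional prefix followed by "NaN".
PATTERN = r"^(.*?)NaN"

def replace_nan_marker(value):
    """Replace a leading '<anything>NaN' with the placeholder string 'None'."""
    return re.sub(PATTERN, "None", value)

# Hypothetical cell values, chosen only to illustrate the three cases.
samples = ["NaN", "conditions NaN", "Sunny"]
cleaned = [replace_nan_marker(s) for s in samples]
print(cleaned)
```

Values without "NaN" pass through unchanged, while any text in front of "NaN" is swallowed by the `(.*?)` group and replaced together with it.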

In [None]:
df.select(df.Weatherconditions).distinct().show()

## <div style="color:white;display:fill;border-radius:5px;background-color:#5642C5;font-size:110%;font-family:Verdana;letter-spacing:0.5px"><p style="padding: 10px;color:white;">Feature Engineering Overview</p></div>
    
As observed from the above dataset, we can extract the following:

1. City from Delivery_person_ID ----> city  ✅

2. Bucket cities into Zones - North, South, East, West  ----> city_zone  ✅

3. Cleaning the Weatherconditions column ✅

4. Removing Zero-Aged delivery agents ✅

5. Time taken to process and package the delivery using Time_Orderd and Time_Order_picked ----> processing_time ✅

6. Time of the day - Morning, Lunch, Evening, Night, Midnight ----> day_zone

7. To clean up target variable - Time_taken(min)  ✅

8. Bucket Age - Delivery_person_Age ----> life_stage

9. Features using Latitude and Longitude ----> geodesic

10. Handle NaN values in all other columns

<blockquote><p style="font-size:20px; color:#159364; font-family:verdana;">1. City from delivery id:</p></blockquote>

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> udf()

#### In order to apply a function to a particular column, we have to create the function and register it as a UDF (User Defined Function) on Spark.

#### It is imported from the pyspark.sql.functions module.
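The function that gets wrapped is ordinary Python, so it can be checked on plain strings before it is registered as a UDF. A minimal sketch, assuming the IDs have the shape `<city code>RES<rest>`; the two sample IDs below are illustrative values, not rows quoted from the dataset.

```python
import re

def city_extract(delivery_person_id):
    """Return the text that precedes 'RES' in the ID."""
    return re.findall(r"(\S+)RES\S+", delivery_person_id)[0]

# Hypothetical IDs in the assumed '<city code>RES<...>' shape.
sample_ids = ["INDORES13DEL02", "BANGRES18DEL02"]
codes = [city_extract(i) for i in sample_ids]
print(codes)
```

If an ID does not contain "RES", `re.findall` returns an empty list and the `[0]` raises an `IndexError`, so it is worth testing the function on a few edge cases before applying it to the whole column.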

In [None]:
# Create custom function
def city_extract(x):
    return re.findall(r"(\S+)RES\S+",x)[0]   ## raw string, so that \S is not treated as an invalid escape

# Convert the function as a UDF using the udf function:
city_extract_UDF = udf(lambda x:city_extract(x),StringType()) 

# Apply the function on the desired column:
df=df.withColumn("City_code",city_extract_UDF(df["Delivery_person_ID"]))

## Having a glance at the new column:
df.select(['Delivery_person_ID','City_code']).show(5)

In [None]:
## There are 22 unique city codes in our data:
df.select("City_code").distinct().show(22)

### <span style="background-color: #F3FF00;font-size: 35px">📌</span> There are three ways by which you can apply a custom function to rows of a spark dataframe:

- User Defined Functions
- Map functions
- Custom Spark-native functions

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> groupBy(), sort()--asc()/desc()

To group the data by one or more columns, and to sort by one or more columns in ascending/descending order.
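As a mental model, a `groupBy().count()` followed by a descending sort does on a cluster what `collections.Counter` does on a single list. The sketch below uses a made-up list of city codes.

```python
from collections import Counter

# Hypothetical city codes, one entry per order.
city_codes = ["JAP", "RANCHI", "JAP", "BANG", "JAP", "BANG"]

# Counter plays the role of groupBy().count();
# most_common() returns the pairs sorted by descending count, like sort(desc("count")).
counts = Counter(city_codes).most_common()
print(counts)
```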

In [None]:
## To get count of the distinct cities:: (equivalent to value_counts() method in pandas)
df.groupBy("City_code").count().sort(desc("count")).show(22)  ### orderBy(desc(col("count")))  ## orderBy(desc("count")) ## orderBy('count', ascending=False) 

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> rdd.flatMap().collect()

Methods to convert a pyspark column into a list/array:

In [None]:
df.select("City_code").distinct().rdd.flatMap(lambda x: x).collect()  ### to convert a column into a list
df.select("City_code").distinct().toPandas().values.flatten()  ### to convert a column into a numpy array

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> withColumnRenamed()

To rename a column.

In [None]:
## Renaming the column to avoid name clash:
df=df.withColumnRenamed("City_code","City_short_form")

In [None]:
## Checking to see if change has reflected:
df.columns

In [None]:
### Created a manual list of the full form of the city encodings:
dic_city={"LUDH":"Ludhiana",
"CHEN":"Chennai",
"KOC":"Kochi",
"GOA":"Goa",
"AURG":"Aurangabad",
"JAP":"Jaipur",
"DEH":"Delhi",
"MUM":"Mumbai",
"AGR":"Agra",
"SUR":"Surat",
"INDO":"Indore",
"PUNE":"Pune",
"ALH":"Allahabad",
"MYS":"Mysore",
"COIMB":"Coimbatore",
"HYD":"Hyderabad",
"VAD":"Vadodara",
"RANCHI":"Ranchi",
"BHP":"Bhopal",
"KOL":"Kolkatta",
"KNP":"Kanpur",
"BANG":"Bangalore"}

NOTE: A UDF only accepts columns as arguments, so you cannot pass a Python dictionary to it as a parameter! Hence the below code cell will raise an error:

In [None]:
## Creating a udf to map the encodings with their original names:
def city_map(x,dic):
    return dic[x['City_short_form']]
udf_city_map=udf(lambda x:city_map(x,dic),StringType())
df=df.withColumn("City",udf_city_map("City_short_form"))

## Raises Error
# df.City.show(12)

Hence you have to make a small change in the way you define the function by creating a nested function for indirectly passing the dictionary as a parameter to the UDF:
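The nested-function trick is a plain Python closure: the outer function receives the dictionary, and the inner function remembers it. A minimal sketch without Spark, where `make_lookup` and `city_lookup` are names invented for this illustration:

```python
def make_lookup(mapping):
    """Return a one-argument function that remembers `mapping` (a closure)."""
    def lookup(key):
        # .get() returns None for unknown keys instead of raising KeyError
        return mapping.get(key)
    return lookup

# Two entries copied from dic_city above; the rest are omitted for brevity.
city_lookup = make_lookup({"KOC": "Kochi", "GOA": "Goa"})
print(city_lookup("KOC"), city_lookup("XYZ"))
```

Because the inner function takes only one argument, it has the shape a UDF needs: one value in per column passed, one value out.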

In [None]:
def get_city(mapping):
    def f(x):
        return mapping.get(x)
    return udf(f)

df=df.withColumn('City', get_city(dic_city)('City_short_form'))

In [None]:
## Checking the dataset for the new column:
df.select("City").show(5)

<blockquote><p style="font-size:20px; color:#159364; font-family:verdana;">2. Bucketing cities into various Zones - North, South, East, West:</p></blockquote>

In [None]:
## NOTE: THIS IS COMPLETELY BASED ON MY INTUITION. IF YOU FEEL LIKE SOMETHING IS OUT OF PLACE, PLZ CORRECT THIS IN YOUR OWN ANALYSIS:
dic_zones={"Ludhiana":"North",
"Chennai":"South",
"Kochi":"South",
"Goa":"West",
"Aurangabad":"West",
"Jaipur":"North",
"Delhi":"North",
"Mumbai":"West",
"Agra":"North",
"Surat":"East",
"Indore":"Central",
"Pune":"West",
"Allahabad":"North",
"Mysore":"South",
"Coimbatore":"South",
"Hyderabad":"South",
"Vadodara":"West",
"Ranchi":"North",
"Bhopal":"North",
"Kolkatta":"East",
"Kanpur":"North",
"Bangalore":"South"}

In [None]:
def get_zone(mapping):
    def f(x):
        return mapping.get(x)
    return udf(f)

df=df.withColumn('city_zone', get_zone(dic_zones)('City'))

In [None]:
## Checking the new columns:
df.select(["City","city_zone"]).show(5)

<blockquote><p style="font-size:20px; color:#159364; font-family:verdana;">3. Cleaning the Weatherconditions column:</p></blockquote>

In [None]:
df.select("Weatherconditions").show(5)

In [None]:
df.groupBy("Weatherconditions").count().sort(desc("count")).show()

Looks like there are None values as well. We will have to clean those **616** data points.

### To clean this, we will fill these rows with a randomly chosen weather condition, since the weather conditions are roughly uniformly distributed.
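The idea of the random fill can be sketched in plain Python before doing it in Spark. The labels and the column below are made up, and the seed is there only to make the sketch repeatable.

```python
import random

# Hypothetical labels; in the notebook the real list comes from the distinct non-"None" values.
weather = ["Sunny", "Cloudy", "Fog"]
column = ["Sunny", "None", "Fog", "None"]

rng = random.Random(0)  # seeded only so the sketch is repeatable
filled = [rng.choice(weather) if v == "None" else v for v in column]
print(filled)
```

Only the "None" entries are replaced; the known values stay as they are.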

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> filter(), sample()

In order to get a sample, we use the sample() function:

#### The collect() method returns a list of Row objects. To get a value from a Row object, index it with the column name, i.e. row['col_name'], which calls the \_\_getitem\_\_ magic method under the hood.

In [None]:
### list of unique weathers
weather=[i['Weatherconditions'] for i in df.filter(df['Weatherconditions']!="None").select('Weatherconditions').distinct().collect()]
weather

In [None]:
df.select('Weatherconditions').show()

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> alias()

To give a dataframe an alias. Since Spark dataframes are immutable, the aliased dataframe can be used like a copy of the table.

In [None]:
## creating a copy:  ### For testing.
df2=df.alias('df2')

In [None]:
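### A helper that picks a random weather condition from the list above:
def random_weather(x):
    ## x (the current cell value) is unused; it is only there so that the UDF can be called on a column
    ind=np.random.randint(len(weather))
    return weather[ind]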

## Creating a udf:
udf_random_weather=udf(lambda x:random_weather(x),StringType())

In [None]:
df=df.withColumn('Weatherconditions',when(df['Weatherconditions']=="None",udf_random_weather(df['Weatherconditions'])).otherwise(df['Weatherconditions']))

In [None]:
### Now we have successfully replaced the None with random values
df.groupBy('Weatherconditions').count().orderBy(desc('count')).show()

#### Now it seems that the None values have been replaced.

<blockquote><p style="font-size:20px; color:#159364; font-family:verdana;">4. Removing Zero Aged delivery agents:</p></blockquote>

In [None]:
df.columns

In [None]:
df.count()

In [None]:
## Around 4% of the delivery agents don't have an age:
df.filter(df["Delivery_person_Age"]==0).count()/df.count() * 100

In [None]:
## There seems to be no correlation between 'Delivery_person_Age' and any other column,
## apart from 'Time_Orderd', judging by eye
df.filter(df["Delivery_person_Age"]==0).show()

In [None]:
## Removing them from the dataset:
df=df.filter(df["Delivery_person_Age"]!=0)

## Verifying counts:
df.count()
df.filter(df["Delivery_person_Age"]==0).count()

In [None]:
## There are order times which are NaNs:
df.filter(df["Time_Orderd"]==np.nan).count()

In [None]:
### Viewing a few of the rows where Time_Orderd is NaN:
df.filter(df["Time_Orderd"]==np.nan).select(['Delivery_person_Age','Time_Orderd']).show(5)

In [None]:
df.filter(df["Time_Orderd"]==np.nan).select(['Delivery_person_Age','Time_Orderd']).groupby(df['Delivery_person_Age']).count().orderBy(desc('count')).show()

<blockquote><p style="font-size:20px; color:#159364; font-family:verdana;">5. Getting Processing time:</p></blockquote>

The important variables involved in the calculation of the processing time are **Time_Orderd**, **Order_Date** and **Time_Order_picked**.

#### - Analysing the **Time_Orderd** variable:

In [None]:
## Looks like there are ~1700 rows of null values in this column.
df.groupBy('Time_Orderd').count().sort(desc("count")).show(10)
# df.groupBy('Time_Orderd').count().sort(col("count").desc()).select("Time_Orderd").show(1)  ### To view the NaN string

In [None]:
## NOTE: a PySpark Column has no isNaN() method, so the line below raises an error. In this dataset the
## NaNs are encoded as strings anyway, as we have seen earlier:
## df.filter(df["Time_Orderd"].isNaN()).select("Time_Orderd").show()

In [None]:
### Using normal string NaN in the filter function:
df.filter(df['Time_Orderd']=="NaN").select('Time_Orderd').show(5)

In [None]:
df.filter(df['Time_Orderd']=="NaN").count()

#### - Analysing the **Order_Date** variable:

In [None]:
## There are no null values here:
df.groupBy('Order_Date').count().sort(desc("count")).show(10)

In [None]:
## NOTE: the line below raises an error, because a PySpark Column has no isNaN() method:
## df.filter(df["Order_Date"].isNaN()).select("Order_Date").show()

In [None]:
df.filter(df['Order_Date']=="NaN").count()

In [None]:
df.select('Order_Date').show(5)

#### We are combining the date and time of ordering together into a single timestamp.

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> struct()

We use struct to pass multiple columns as an argument to a udf.

In [None]:
df.select(['Order_Date','Time_Orderd']).show(2)

## Creating a udf without an explicit function, and using multiple columns with the help of struct function::
order_timestamp_udf = udf(lambda x: x[0]+" "+x[1], StringType())
df=df.withColumn("order_time_timestamp",order_timestamp_udf(struct('Order_Date','Time_Orderd')))#.select("order_time_timestamp")#.show(5)
## Viewing the created column:
df.select("order_time_timestamp").show(2)

#### -  Analysing the **Time_Order_picked** variable:

In [None]:
df.groupBy('Time_Order_picked').count().sort(desc("count")).show(10)

#### As you can see, there are NaN values present in the 'Time_Orderd' attribute only. We cannot calculate the processing time with NaNs in this column. How do we tackle this? ANY IDEAS?? Let me know your ideas in the comments.

### <span style="background-color: #F3FF00;font-size: 35px">📌</span>A go-to approach will be to calculate average pickup time using other non null rows and then imputing the null rows with the average obtained.

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> to_timestamp()

To convert a string to a timestamp.

Here's the list of all the metacharacters and their corresponding meanings:

In [None]:
from PIL import Image
img=Image.open('/kaggle/input/pyspark/Metacharacters Pyspark.png')
img=img.resize((700,500))
img

As seen from the above table, we can conclude that the metacharacter format for the final order placed timestamp can be represented as:

'**dd-MM-yyyy HH:mm:ss**'
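For comparison, the same layout can be parsed in plain Python. Note that `datetime.strptime` uses `%`-style directives rather than Spark's pattern letters, so the format string looks different even though it describes the same layout. The sample value is made up.

```python
from datetime import datetime

# Spark pattern 'dd-MM-yyyy HH:mm:ss' corresponds to these strptime directives.
PY_FORMAT = "%d-%m-%Y %H:%M:%S"

# Hypothetical value in the same shape as order_time_timestamp.
parsed = datetime.strptime("19-03-2022 11:30:00", PY_FORMAT)
print(parsed)
```

A value that does not match the layout (for example one that contains "NaN") makes `strptime` raise a `ValueError`, which is why the NaN rows are filtered out before the conversion.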

In [None]:
## selecting the Non-null rows
temp=df.filter(df['Time_Orderd']!="NaN").select(['Order_Date','Time_Orderd','order_time_timestamp'])\
            .withColumn('order_time_timestamp',to_timestamp('order_time_timestamp','dd-MM-yyyy HH:mm:ss'))
temp.show(5)

### <span style="background-color: #F3FF00;font-size: 35px">📌</span>NOTE: There is **to_date()** function also available which lets you convert a string date column into date format.

In [None]:
### Now we have converted a string to a timestamp
temp.dtypes

In [None]:
df.groupBy('order_time_timestamp').count().sort(desc("count")).show(5)

In [None]:
### Getting first row from the above table:
df.groupBy('order_time_timestamp').count().sort(desc("count")).first()['order_time_timestamp']

#### - Looks like NaN has been appended along with the date. Now we will calculate, for all the non-NaN rows, the difference between the ordered timestamp and the corresponding picked-up timestamp to get the processing time in minutes.

#### - Then we will impute the NaN containing timestamps with the mean of the processing time.

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> contains(), like()

To extract text containing a certain character sequence.

In [None]:
## We remove the NaN filled order time from the existing data and treat them separately:
## To get the non-NaN containing timestamps:(There are two ways to do so.)
# df.filter(~df["order_time_timestamp"].like("%NaN%")).show(2)
temp=df.filter(~df["order_time_timestamp"].contains("NaN")).select(['order_time_timestamp','Time_Order_picked'])

In [None]:
temp.show(5)

### Notice in the above table that the date components of the two timestamps don't match, although ideally they should. Hence they are misleading, so we will use the time components alone to calculate the difference.

In [None]:
temp.dtypes

In [None]:
## Converting datatype into timestamp:
temp=temp.withColumn('order_time_timestamp',to_timestamp(temp['order_time_timestamp'],'dd-MM-yyyy HH:mm:ss'))
temp.show(2)

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> date_format()

To extract parts of a timestamp.

In [None]:
temp = temp.withColumn('order_time', date_format('order_time_timestamp', 'HH:mm:ss'))\
           .withColumn('Time_picked', date_format('Time_Order_picked', 'HH:mm:ss'))

temp.show(2)

In [None]:
temp.dtypes

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> unix_timestamp()

To convert Date and Timestamp Column to Unix Time.

In [None]:
### Calculating the difference in seconds, then dividing by 60 to get minutes
temp=temp.withColumn('order_time',to_timestamp(temp['order_time']))\
         .withColumn('Time_picked', to_timestamp(temp['Time_picked']))\
         .withColumn('processing_time',round((unix_timestamp("Time_picked") - unix_timestamp('order_time'))/60))

In [None]:
## Got the desired processing_time in minutes:
temp.show(5)

In [None]:
## Replicating the above steps onto the actual dataframe:
df=df.withColumn('order_time_timestamp',to_timestamp(df['order_time_timestamp'],'dd-MM-yyyy HH:mm:ss'))\
     .withColumn('order_time', date_format('order_time_timestamp', 'HH:mm:ss'))\
     .withColumn('Time_picked', date_format('Time_Order_picked', 'HH:mm:ss'))

df=df.withColumn('order_time',to_timestamp(df['order_time']))\
     .withColumn('Time_picked', to_timestamp(df['Time_picked']))

In [None]:
## Seems to be right so far
df.show(1)

In [None]:
### Creating a column called processing time:
df=df.withColumn('processing_time',when(df['Time_Orderd']!="NaN",round((unix_timestamp("Time_picked") - unix_timestamp('order_time'))/60))\
                                   .otherwise(lit(np.nan)))

#### Checking whether columns are created properly:

In [None]:
df.filter(df['Time_Orderd']=="NaN").show(3)

In [None]:
df.filter(df['Time_Orderd']!="NaN").show(3)

Looks like we have done a good job. Now we will replace these NaN values with the mean of the corresponding column, after first checking for negative processing times:

In [None]:
df.filter(df['processing_time']<0).select(['order_time','Time_picked','processing_time']).show()
## Looks like we have negative values. This can be because:
## 00:05 - 23:50 = -1425. Hence to correct these negative values, we add 1440(24*60):
df=df.withColumn('processing_time',when(df['processing_time']<0,df['processing_time']+1440).otherwise(df['processing_time']))
df.filter(df['processing_time']<0).select(['order_time','Time_picked','processing_time']).show()

Looks like we have made the correction.
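The midnight correction is easy to verify by hand in plain Python. A minimal sketch, assuming both times are `HH:MM:SS` strings and that a pickup never happens more than 24 hours after the order; `minutes_between` is a helper invented for this illustration.

```python
from datetime import datetime

def minutes_between(order_time, picked_time):
    """Minutes from order to pickup for 'HH:MM:SS' strings, allowing a midnight crossover."""
    fmt = "%H:%M:%S"
    delta = datetime.strptime(picked_time, fmt) - datetime.strptime(order_time, fmt)
    minutes = round(delta.total_seconds() / 60)
    # A negative gap means the pickup happened after midnight, so shift by one day.
    return minutes + 1440 if minutes < 0 else minutes

print(minutes_between("23:50:00", "00:05:00"))
print(minutes_between("11:30:00", "11:45:00"))
```

For the first pair the raw difference is -1425 minutes, and adding 1440 gives the expected 15 minutes.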

### <span style="background-color: #00FF00;font-size: 35px">🌼</span> na.fill()

To fill the NaN values of a specified column with a given value (missing value imputation).
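What mean imputation does to a column can be sketched on a small list in plain Python. The values are made up, and `fill_with_mean` is a hypothetical helper, not a Spark function.

```python
import math

def fill_with_mean(values):
    """Replace NaN entries with the mean of the non-NaN entries."""
    known = [v for v in values if not math.isnan(v)]
    mean_value = sum(known) / len(known)
    return [mean_value if math.isnan(v) else v for v in values]

# Hypothetical processing times in minutes.
filled = fill_with_mean([5.0, float("nan"), 15.0, 10.0])
print(filled)
```

The mean is computed from the known values only, which is why the notebook filters out the NaN rows before taking the average.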

In [None]:
## The processing time taken by the restaurant is::
mean_processing=df.filter(df['Time_Orderd']!="NaN").select(mean("processing_time")).first()[0]
mean_processing

In [None]:
## Takes column name as parameter:
df=df.na.fill(mean_processing,'processing_time')

#### As we can see from below, we have got our final output:

In [None]:
df.filter(df['Time_Orderd']=="NaN").select(['order_time','Time_picked','processing_time']).show()

<blockquote><p style="font-size:20px; color:#159364; font-family:verdana;">6. Splitting the time of ordering into zones of a day - Morning, Lunch, Evening, Night, Midnight:</p></blockquote>

In [None]:
## All the available Datetime columns:
df.select(['Order_Date','Time_Orderd','order_time','order_time_timestamp','Time_picked','Time_Order_picked']).show(5)

#### We will use the Time_Orderd column for the preprocessing:

In [None]:
df.filter(df['Time_Orderd']==np.nan).show(5)

In [None]:
## Using the native split() function to take the hour part of the time string:
df=df.withColumn('Hour_order',split(df['Time_Orderd'],':')[0])

In [None]:
## a udf to split hour from time:
hour_udf=udf(lambda x:x.split(":")[0],StringType())
df=df.withColumn('Hour_order',hour_udf(df['Time_Orderd']))

In [None]:
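def time_of_day(hour):
    raise NotImplementedError  ## bucketing into Morning, Lunch, Evening, Night, Midnight is still to be written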

In [None]:
df.dtypes
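
One possible way to bucket the hour into parts of the day is sketched below in plain Python. The cut-off hours are purely assumptions made for this illustration; the notebook does not define them, so adjust them to your own definition of each zone.

```python
def time_of_day(hour):
    """Map an hour (0-23) to a part of the day. The cut-offs below are assumptions."""
    if 5 <= hour < 11:
        return "Morning"
    if 11 <= hour < 16:
        return "Lunch"
    if 16 <= hour < 20:
        return "Evening"
    if 20 <= hour < 24:
        return "Night"
    return "Midnight"  # hours 0 to 4

# Hour strings as produced by splitting 'HH:MM:SS' on ':'.
zones = [time_of_day(int(h)) for h in ["08", "13", "18", "22", "02"]]
print(zones)
```

Rows where Time_Orderd holds the string "NaN" need to be handled before a function like this is applied, because `int("NaN")` raises a `ValueError`.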

<blockquote><p style="font-size:20px; color:#159364; font-family:verdana;">7. Cleaning the target variable:</p></blockquote>

In [None]:
## Before transformation:
df.select("Time_taken(min)").show(5)

In [None]:
## Renaming the column name::
df=df.withColumnRenamed('Time_taken(min)','time_taken')

## Removing the prefix (i.e. '(min)') from the column values with the help of a UDF:
def target_clean(x):
    return x[-2:]

target_clean_udf=udf(lambda x:target_clean(x),StringType())
## Cleaning and Converting type to integer:
df=df.withColumn("time_taken",target_clean_udf(col("time_taken"))).withColumn("time_taken",col("time_taken").cast(IntegerType()))

In [None]:
## As you can see, the values have been cleaned and the type has been changed:
df.select("time_taken").show(5),df.select("time_taken").dtypes
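
Slicing the last two characters works as long as every value ends in exactly two digits. A slightly more defensive variant is sketched below in plain Python, assuming the raw values look like `(min) 24`; the sample values are made up.

```python
def target_clean(value):
    """Return the trailing number of a string such as '(min) 24' as an int."""
    # Taking the last whitespace-separated token also copes with values
    # that are not exactly two digits long, unlike value[-2:].
    return int(value.split()[-1])

# Hypothetical raw values in the assumed '(min) <number>' shape.
cleaned = [target_clean(v) for v in ["(min) 24", "(min) 9", "(min) 120"]]
print(cleaned)
```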

<blockquote><p style="font-size:20px; color:#159364; font-family:verdana;">8. Handling the Geo Data:</p></blockquote>

In [None]:

# from geopy.distance import geodesic 

# train['distance_diff_KM']=np.zeros(len(train))
# restaurant_cordinates_train=train[['Restaurant_latitude','Restaurant_longitude']].to_numpy()
# delivery_location_cordinates_train=train[['Delivery_location_latitude','Delivery_location_longitude']].to_numpy()

# for i in range(len(train)):
#     train['distance_diff_KM'].loc[i]=geodesic(restaurant_cordinates_train[i],delivery_location_cordinates_train[i])
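
The commented-out draft above relies on geopy's `geodesic`. As a dependency-free stand-in, the haversine formula gives the great-circle distance on a sphere. It is a different (and slightly less precise) technique than the ellipsoidal geodesic, but it is usually close enough for city-scale delivery distances. The coordinates below are made up.

```python
from math import radians, sin, cos, asin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two (lat, lon) points given in degrees."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))

# Hypothetical restaurant and delivery coordinates.
distance = haversine_km(12.9716, 77.5946, 12.9352, 77.6245)
print(distance)
```

In the notebook's terms, the four arguments would be Restaurant_latitude, Restaurant_longitude, Delivery_location_latitude and Delivery_location_longitude.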