<a href="https://colab.research.google.com/github/ktatarch/Coffee_Machine/blob/master/Big_Data_Internship.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Big Data Internship test task (Avenga Academy)**

---
# Goal:

Using [a dataset](https://www.kaggle.com/datasets/anandaramg/taxi-trip-data-nyc?resource=download) and Apache Spark (Scala or Python), get the following report:

* **Vendor**: string (nullable = true) - name of vendor
* **Payment Type**: string (nullable = true) - name of payment type
* **Payment Rate**: double (nullable = true) - payment_total / passengers count per vendor and payment type
* **Next Payment Rate**: double (nullable = true) - next record(bigger) payment rate for vendor
* **Max Payment Rate**: double (nullable = true) - max payment rate for vendor
* **Percents to next rate**: string (nullable = true) - how many points (in percents) is necessary to achieve the next record payment rate

I start with updating our environment and instaling:

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark
!pip install pandas

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:6 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:7 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Packages [902 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelea

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

In [4]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate() 

In [5]:
import pandas as pd
import numpy as np

After everything necessary is installed - I retrieve data from the taxi_tripdata dataset using spark.read (the dataset itself was initially downloaded from [kaddle](https://www.kaggle.com/datasets/anandaramg/taxi-trip-data-nyc?resource=download)) and uploaded to Google Colab folder for further use.

As at the moment I'm more familiar with Pandas - I'll convert received spark dataset to Pandas on read. However, I plan to rewrite this project with PySpark as soon as I investigate PySpark a bit more. 

In [6]:
df = spark.read.csv('taxi_tripdata.csv', header=True, inferSchema=True).toPandas()

I'm copying received dataset specifying columns I would work with, in order to prevent calculations and displaying of the data from unused fields:

In [7]:
df = df[['VendorID', 'payment_type', 'passenger_count', 'total_amount']].copy()

I've decided to start with getting **Payment Rate** and **Max Payment Rate** column values. 

**Payment Rate** should be calculated as `payment_total / passengers count` per vendor and payment type. It also can include Null values. 

As our dataset also includes 0 values for **Passengers Count**, which will cause the **Payment Rate** to be infinity and negatively infulence on our report quality.

As by the information provideed at the data source, data for this column is the number of passengers in the vehicle, and is a driver-entered value. So, I decided to replace zero values with Null.

After **Payment Rate** values are calculated, I can proceed with adding a new column for storing **Max Payment Rate** values, which should display `max payment rate` for vendor.

In [8]:
df['passenger_count'] = df['passenger_count'].replace(0, np.nan)
df['payment_rate'] = df.total_amount / df.passenger_count

df = df.join(df.groupby('VendorID')['payment_rate'].max(), on='VendorID', rsuffix='_max')
print(df)

       VendorID  payment_type  passenger_count  total_amount  payment_rate  \
0           1.0           2.0              1.0          7.30          7.30   
1           2.0           2.0              2.0         43.30         21.65   
2           2.0           1.0              1.0         10.14         10.14   
3           2.0           2.0              1.0          7.80          7.80   
4           2.0           2.0              1.0          8.30          8.30   
...         ...           ...              ...           ...           ...   
83686       NaN           NaN              NaN         59.84           NaN   
83687       NaN           NaN              NaN         25.87           NaN   
83688       NaN           NaN              NaN         22.75           NaN   
83689       NaN           NaN              NaN         54.12           NaN   
83690       NaN           NaN              NaN         48.89           NaN   

       payment_rate_max  
0                184.21  
1          

Next, we'll be getting **Next Payment Rate** values. It should be represented as next record(bigger) payment rate for vendor. 

In order to determine next(bigger) payment rates for provider - let's sort and get unique payment rates values per provider `using sort_values` and `groupby`

After we get all values per provider, we can shift it by one using `Numpy roll()`
It's also important to take care about the last sorted value per each provider, in order to make sure that it's not shifted to the smaller value than it was.

In case the maximum payment rate is already achieved - no need to shift this value. There is no Next Payment Rate for it, so we'll replace it with NaN.


In [9]:
unique = df.sort_values(['payment_rate']).groupby('VendorID')['payment_rate'].unique()

print(f"Unique Payment Rate values per provider: {unique}")

new = np.roll(unique[1], -1)
new[-1] = np.nan

new1 = np.roll(unique[2], -1)
new1[-1] = np.nan

print(f"Shifted by 1 Payment Rate values per provider: {new}{new1}")

df1 = pd.DataFrame({'VendorID': 1.0, 'payment_rate': unique[1]}) #create a dataset from retrieved values for Vendor 1.0
df1['next_payment_rate'] = new

df2 = pd.DataFrame({'VendorID': 2.0, 'payment_rate': unique[2]}) #create a dataset from retrieved values for Vendor 2.0
df2['next_payment_rate'] = new1

combined_df = df1.append(df2) #append datasets in order to merge it with our main dataset later on
print(combined_df)

Unique Payment Rate values per provider: VendorID
1.0    [0.0, 0.8, 1.075, 1.0999999999999999, 1.45, 1....
2.0    [-150.3, -57.3, -30.3, -25.15, -20.3, -18.3, -...
Name: payment_rate, dtype: object
Shifted by 1 Payment Rate values per provider: [  0.8     1.075   1.1   ... 184.21      nan     nan][-57.3  -30.3  -25.15 ... 480.31    nan    nan]
      VendorID  payment_rate  next_payment_rate
0          1.0         0.000              0.800
1          1.0         0.800              1.075
2          1.0         1.075              1.100
3          1.0         1.100              1.450
4          1.0         1.450              1.600
...        ...           ...                ...
3450       2.0       207.480            225.000
3451       2.0       225.000            443.370
3452       2.0       443.370            480.310
3453       2.0       480.310                NaN
3454       2.0           NaN                NaN

[4484 rows x 3 columns]


Last value we did not get yet - is the **Percents to next rate**, which should describe how many points (in percents) is necessary to achieve the next record payment rate. 

We can calculate it as `Next Payment Rate (in persents) - Payment Rate (in persents)` for each data entry, taking the Max Payment Rate per provider as 100%

As we have a request of string type for this column - I can assume, that % mark should be added.

In [10]:
merged_data = df.merge(combined_df, how="left", on=["VendorID","payment_rate"]) #performing left join to get the data merged

merged_data['percent'] = ((merged_data.next_payment_rate * 100.00 / merged_data.payment_rate_max) - (merged_data.payment_rate * 100.00 / merged_data.payment_rate_max)) #getting Next Payment Rate values
merged_data.loc[merged_data['payment_rate'] == merged_data['payment_rate_max'], 'percent'] = "0" #taking care of our max payment rate values, by setting percent to 0%
merged_data.loc[merged_data['percent'].notna() == True, 'percent'] = merged_data['percent'].astype(str) + "%" #converting column into String, and adding % sign to not NaN values

print(merged_data)

       VendorID  payment_type  passenger_count  total_amount  payment_rate  \
0           1.0           2.0              1.0          7.30          7.30   
1           2.0           2.0              2.0         43.30         21.65   
2           2.0           1.0              1.0         10.14         10.14   
3           2.0           2.0              1.0          7.80          7.80   
4           2.0           2.0              1.0          8.30          8.30   
...         ...           ...              ...           ...           ...   
83686       NaN           NaN              NaN         59.84           NaN   
83687       NaN           NaN              NaN         25.87           NaN   
83688       NaN           NaN              NaN         22.75           NaN   
83689       NaN           NaN              NaN         54.12           NaN   
83690       NaN           NaN              NaN         48.89           NaN   

       payment_rate_max  next_payment_rate                 perc

Final steps: replacing values from VendorID and payment_type with corresponding names from documentation. Also, renaming all fields and dropping columns don't need in our final report.

In [11]:
merged_data.VendorID.replace([1.0, 2.0], ["Creative Mobile Technologies, LLC", "VeriFone Inc."], inplace=True)
merged_data.payment_type.replace([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], ["Credit card", "Cash", "No charge", "Dispute", "Unknown", "Voided trip"], inplace=True)

merged_data.rename(columns = {'VendorID':'Vendor', 'payment_type' : 'Payment Type', 'payment_rate' : 'Payment Rate', 'next_payment_rate' : 'Next Payment Rate', 'payment_rate_max' : 'Max Payment Rate', 'percent': 'Percents to next rate'}, inplace = True)
final = merged_data.drop(['passenger_count', 'total_amount'], axis=1)
print(merged_data)


                                  Vendor Payment Type  passenger_count  \
0      Creative Mobile Technologies, LLC         Cash              1.0   
1                          VeriFone Inc.         Cash              2.0   
2                          VeriFone Inc.  Credit card              1.0   
3                          VeriFone Inc.         Cash              1.0   
4                          VeriFone Inc.         Cash              1.0   
...                                  ...          ...              ...   
83686                                NaN          NaN              NaN   
83687                                NaN          NaN              NaN   
83688                                NaN          NaN              NaN   
83689                                NaN          NaN              NaN   
83690                                NaN          NaN              NaN   

       total_amount  Payment Rate  Max Payment Rate  Next Payment Rate  \
0              7.30          7.30    

Creating a custom Schema with prefered column types, and converting Pandas dataframe back into PySpark. 

In [12]:
mySchema = StructType([
    StructField('Vendor', StringType(), True),
    StructField('Payment Type', StringType(), True),
    StructField('Payment Rate', DoubleType(), True),
    StructField('Next Payment Rate', DoubleType(), True),
    StructField('Max Payment Rate', DoubleType(), True),
    StructField('Percents to next rate', StringType(), True)
    ])

taxiReport = spark.createDataFrame(final, schema=mySchema)
taxiReport.printSchema()
taxiReport.show()


root
 |-- Vendor: string (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Payment Rate: double (nullable = true)
 |-- Next Payment Rate: double (nullable = true)
 |-- Max Payment Rate: double (nullable = true)
 |-- Percents to next rate: string (nullable = true)

+--------------------+------------+------------+-----------------+-----------------+---------------------+
|              Vendor|Payment Type|Payment Rate|Next Payment Rate| Max Payment Rate|Percents to next rate|
+--------------------+------------+------------+-----------------+-----------------+---------------------+
|Creative Mobile T...|        Cash|         7.3|           184.21|            7.315| 0.008142880408229...|
|       VeriFone Inc.|        Cash|       21.65|           480.31|            21.66| 0.002081988715620...|
|       VeriFone Inc.| Credit card|       10.14|           480.31|            10.15| 0.002081988715621...|
|       VeriFone Inc.|        Cash|         7.8|           480.31|          

Next part of the task - is to save the data as csv file, and send it via email using python/scala function. 

Be default, spark is deviding csv files on creation to a few parts. I've used `coalesce` in order to get the complete file as one document.

In [24]:
taxiReport.coalesce(1).write.format("csv").option("header", "true").save("reportDataTaxi.csv") # saving dataframe as csv file

# at this step, I'll manually rename the generated csv file under reportTaxiData.csv folder to reportDataTaxi.csv

In [25]:
df = spark.read.csv("reportDataTaxi.csv/reportDataTaxi.csv", header=True, inferSchema=True)

df.printSchema()
df.show()

root
 |-- Vendor: string (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Payment Rate: double (nullable = true)
 |-- Next Payment Rate: double (nullable = true)
 |-- Max Payment Rate: double (nullable = true)
 |-- Percents to next rate: string (nullable = true)

+--------------------+------------+------------+-----------------+-----------------+---------------------+
|              Vendor|Payment Type|Payment Rate|Next Payment Rate| Max Payment Rate|Percents to next rate|
+--------------------+------------+------------+-----------------+-----------------+---------------------+
|Creative Mobile T...|        Cash|         7.3|           184.21|            7.315| 0.008142880408229...|
|       VeriFone Inc.|        Cash|       21.65|           480.31|            21.66| 0.002081988715620...|
|       VeriFone Inc.| Credit card|       10.14|           480.31|            10.15| 0.002081988715621...|
|       VeriFone Inc.|        Cash|         7.8|           480.31|          

Now when we have the data ready for submitting, I'll create a secure connection with Gmail’s SMTP server, using the SMTP_SSL() of smtplib to initiate a TLS-encrypted connection, and send email with attached CSV file. 

In [26]:
import smtplib, ssl

import email, smtplib, ssl

from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

port = 465  # For SSL
smtp_server = "smtp.gmail.com"
mail_subject = 'Test Email'
sender_email = input('enter sender mail address:')  # Enter sender address
receiver_email = input('enter receiver mail address:')  # Enter receiver address
password = input("Type your password and press enter: ")

subject = "Avenga Academy Big Data Internship test task"
body = """Test task submission by Kateryna Tatarchenko"""
html = """\
<html>
  <body>
    <p>Hi there! Hope you are doing well!<br>
       Thank you for sending me this task, this was my first practice with Apache Spark (Python) and it was a bit challenging but really interesting!<br>
       For this attemp, I've used spark through Google Colab (.ipunb), and most data manipulation were performed using Pandas.<br>
       I've tried to keep the initial dataset without changes, as NUll values were allowed for all fields. However, there was a necessary replacement of 0 values by Null for Passengers Count,in order to perform correct Rate Calculations.
      <br><br>
       All the explanations of performed steps can be found in my Big Data Internship.ipynb file,
       <a href="http://www.realpython.com">link to Github</a> 
       <br>
       Any feedback would be greatly appreaciated!

       <br>
       <br>
       Have a great day ahead, and look forward to hearing from you!
       <br>
       Kind regards,<br>
       Kate Tatarchenko
    </p>
  </body>
</html>
"""

message = MIMEMultipart()
message["From"] = sender_email
message["To"] = receiver_email
message["Subject"] = subject

part1 = MIMEText(body, "plain")
part2 = MIMEText(html, "html")

message.attach(part1)
message.attach(part2)

with open("/content/reportDataTaxi.csv/reportDataTaxi.csv", "rb") as attachment:
    # Add file as application/octet-stream
    # Email client can usually download this automatically as attachment
    part = MIMEBase("application", "octet-stream")
    part.set_payload(attachment.read())

# Encode file in ASCII characters to send by email    
encoders.encode_base64(part)

# Add header as key/value pair to attachment part
part.add_header(
    "Content-Disposition",
    f"attachment; filename= {'reportDataTaxi.csv'}",
)

# Add attachment to message and convert message to string
message.attach(part)
text = message.as_string()

context = ssl.create_default_context()
with smtplib.SMTP_SSL(smtp_server, port, context=context) as server:
    server.login(sender_email, password)
    server.sendmail(sender_email, receiver_email, message.as_string())

print("Message Sent!")

enter sender mail address:katetatarchenko@gmail.com
enter receiver mail address:ktatarch.test@gmail.com
Type your password and press enter: oeyscbxkaqxdqdan
Message Sent!
