In [1]:
from email.policy import default
import mailbox
import email
import datetime
from datetime import datetime

# Data Cleaning

In [2]:
def replace_escape_characters(input_string):
    escape_characters = ["\\n", "\\t", "\\r", "\\b", "\\\\", "\\\"", "\\'", "\\f", "\\v", "\\a", '\"']
    
    for escape_char in escape_characters:
        input_string = input_string.replace(escape_char, "")
    
    return input_string.strip()

In [3]:
mbox_path = '/path/to/Unread.mbox'
mbox = mailbox.mbox(mbox_path, factory=lambda f: email.message_from_binary_file(f, policy=default))
 
email_data = []
for message in mbox:
    subject = message['subject']

    from_ = message['from'].replace(">", "").split("<")
    if len(from_) == 2:
        sender = replace_escape_characters(from_[0])
        email_sender = from_[1]
    else:
        if "@" in replace_escape_characters(from_[0]):
            sender = " "
            email_sender = replace_escape_characters(from_[0])
        else:
            sender = replace_escape_characters(from_[0])
            email_sender = " "

    email_receiver = "your mail"

    if "+" in message['date']:
        date = message['date'].split('+')[0].strip()
    elif "-" in message['date']:
        date = message['date'].split('-')[0].strip()
    extraction_date = datetime.strptime(date, '%a, %d %b %Y %H:%M:%S')

    if message.is_multipart():
        body = ""
        for part in message.walk():
            if part.get_content_type() == "text/plain":
                body += part.get_payload(decode=True).decode(part.get_content_charset())
    else:
        body = message.get_payload(decode=True).decode(message.get_content_charset())
    
    email_data.append((str(subject),sender,email_sender,"your name",str(email_receiver),extraction_date, body))

In [4]:
for i in email_data[1]:
    print(i)
    print("-"*50)

Congratulations on completing Communication Skills for University Success
--------------------------------------------------
Communication Skills for University Success
--------------------------------------------------
no-reply@t.mail.coursera.org
--------------------------------------------------
Trinh LeNhat
--------------------------------------------------
trinhlnde170294@fpt.edu.vn
--------------------------------------------------
2022-12-27 09:03:16
--------------------------------------------------
Dear Trinh Le Nhat,


Congratulations on finishing the course! Well done on all yourhard work and 
active participation on the course! Coursera will be in contactwith details 
on how to download your certificate.


Also, don’t forget to take a look at the other MOOCs in this 
Specialization:


Information & Digital Literacy for University Success
<https://eventing.coursera.org/redirectSigned/eyJrZXkiOiJlbWFpbC5saW5rLm9wZW4iLCJ2YWx1ZSI6eyJ1cmwiOiJodHRwczovL3d3dy5jb3Vyc2VyYS5vcmcvbGVh

# Create Dataframe using PySpark

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit

In [6]:
spark = SparkSession.builder \
    .appName("DataFrame") \
    .getOrCreate()

columns = ["subject","sender","email_sender","receiver","email_receiver","date", "content"]

rows = [Row(*row) for row in email_data]
rdd = spark.sparkContext.parallelize(rows)
df = spark.createDataFrame(rdd, columns)
df.show()

+--------------------+--------------------+--------------------+------------+--------------------+-------------------+--------------------+
|             subject|              sender|        email_sender|    receiver|      email_receiver|               date|             content|
+--------------------+--------------------+--------------------+------------+--------------------+-------------------+--------------------+
|You have new or u...|            Coursera|no-reply@t.mail.c...|Trinh LeNhat|trinhlnde170294@f...|2024-04-03 15:15:20|You have received...|
|Congratulations o...|Communication Ski...|no-reply@t.mail.c...|Trinh LeNhat|trinhlnde170294@f...|2022-12-27 09:03:16|Dear Trinh Le Nha...|
|Get started with ...|            Coursera|no-reply@t.mail.c...|Trinh LeNhat|trinhlnde170294@f...|2024-03-20 17:59:39|[coursera logo]\r...|
|Could You Be Part...|Artificial Intell...|no-reply@t.mail.c...|Trinh LeNhat|trinhlnde170294@f...|2023-11-20 16:01:15|Hi Trinh Le Nhat,...|
|You're invited to..

In [7]:
senders = df.select('sender', 'email_sender').distinct()

length = senders.count()

df_with_id = senders.withColumn("id_sender", lit(length) + monotonically_increasing_id())

senders = df_with_id.select("id_sender", *[col for col in df_with_id.columns if col != "id_sender"])

senders.show()

+---------+--------------------+--------------------+
|id_sender|              sender|        email_sender|
+---------+--------------------+--------------------+
|       43|           Log2Base2|support@log2base2...|
|       44|Artificial Intell...|no-reply@t.mail.c...|
|       45|          Lam Le Huu|    LamLH9@fe.edu.vn|
|       46|     Phan Truong Lam|no-reply@t.mail.c...|
|       47|              Google|no-reply@accounts...|
|       48|Artificial Intell...|no-reply@t.mail.c...|
|       49|Communication Ski...|no-reply@t.mail.c...|
|       50|Information & Dig...|no-reply@t.mail.c...|
|       51|FPT University + ...|no-reply@t.mail.c...|
|       52|            Coursera|no-reply@t.mail.c...|
|       53|Luk danang Campus...|notifications+kwe...|
|       54|                    |noreplyfptunivers...|
|       55|                    |  it.fudn@fpt.edu.vn|
|       56|          Vu Bui Cao|      vubc@fe.edu.vn|
|       57|FPTStudentCareSystem| dvsv.fudn@fe.edu.vn|
|       58|           Hoàng 

In [8]:
receivers = df.select('receiver', 'email_receiver').distinct()

length = receivers.count()

df_with_id = receivers.withColumn("id_receiver", lit(length) + monotonically_increasing_id())

receivers = df_with_id.select("id_receiver", *[col for col in df_with_id.columns if col != "id_receiver"])

receivers.show()

+-----------+------------+--------------------+
|id_receiver|    receiver|      email_receiver|
+-----------+------------+--------------------+
|          1|Trinh LeNhat|trinhlnde170294@f...|
+-----------+------------+--------------------+



In [9]:
infor_email = df.join(senders, (df['sender'] == senders['sender']) & (df["email_sender"] == senders['email_sender']), "inner")

infor_email = infor_email.join(receivers, (infor_email['receiver'] == receivers['receiver']) & (infor_email["email_receiver"] == receivers['email_receiver']), "inner")

infor_email = infor_email.select(
    infor_email['subject'],
    infor_email['id_sender'],
    receivers['id_receiver'],
    infor_email['date'],
    infor_email['content']
)

length = infor_email.count()
df_with_id = infor_email.withColumn("id_email", lit(length) + monotonically_increasing_id())
infor_email = df_with_id.select("id_email", *[col for col in df_with_id.columns if col != "id_email"])

infor_email.show()

+--------+--------------------+---------+-----------+-------------------+--------------------+
|id_email|             subject|id_sender|id_receiver|               date|             content|
+--------+--------------------+---------+-----------+-------------------+--------------------+
|     331|Big Summer Sale |...|       43|          1|2023-05-31 13:17:38|<!DOCTYPE html> <...|
|     332|Could You Be Part...|       44|          1|2023-11-20 16:01:15|Hi Trinh Le Nhat,...|
|     333|The Secret to Eff...|       44|          1|2024-03-14 13:22:07|Hey Trinh Le Nhat...|
|     334| [RESFES_2024] V/...|       45|          1|2024-05-04 06:33:05|Dear bạn - trưởng...|
|     335|Your invitation i...|       46|          1|2024-01-19 08:53:53|<https://eventing...|
|     336|Your invitation i...|       46|          1|2024-01-23 02:49:21|<https://eventing...|
|     337|Enrol in 2-step v...|       47|          1|2023-05-23 01:43:10|[image: Google]\r...|
|     338|Enrol in 2-step v...|       47|         

# Insert to MySQL

In [11]:
import mysql.connector
from mysql.connector import Error

host = "127.0.0.1"
port = "3306"
db = "mail"
user = "user"
password = "password"
 
connection = mysql.connector.connect(
        host=host,
        database=db,
        user=user,
        password=password
    )

if connection.is_connected():
    cursor = connection.cursor()
    print("conneted")

conneted


In [12]:
try:
        create_senders_table = """
        CREATE TABLE IF NOT EXISTS senders (
            id_sender INT AUTO_INCREMENT PRIMARY KEY,
            sender VARCHAR(255) NOT NULL,
            email_sender VARCHAR(255) NOT NULL
        );
        """
        cursor.execute(create_senders_table)

        create_receivers_table = """
        CREATE TABLE IF NOT EXISTS receivers (
            id_receiver INT AUTO_INCREMENT PRIMARY KEY,
            receiver VARCHAR(255) NOT NULL,
            email_receiver VARCHAR(255) NOT NULL
        );
        """
        cursor.execute(create_receivers_table)

        create_emails_table = """
        CREATE TABLE IF NOT EXISTS emails (
            id_email INT AUTO_INCREMENT PRIMARY KEY,
            subject VARCHAR(255) NOT NULL,
            id_sender INT NOT NULL,
            id_receiver INT NOT NULL,
            date DATETIME NOT NULL,
            content TEXT NOT NULL,
            FOREIGN KEY (id_sender) REFERENCES senders(id_sender),
            FOREIGN KEY (id_receiver) REFERENCES receivers(id_receiver)
        );
        """ 
        cursor.execute(create_emails_table)
except Error as e:
    print(f"{e}")


In [13]:
for row in senders.collect():
    sql_query = """
                INSERT INTO senders (id_sender,sender, email_sender)
                VALUES (%s, %s, %s)
            """
    cursor.execute(sql_query, row)
        
connection.commit()
print("inserted data into senders")

inserted data into senders


In [14]:
for row in receivers.collect():
    sql_query = """
                INSERT INTO receivers (id_receiver,receiver,email_receiver)
                VALUES (%s, %s, %s)
            """
    cursor.execute(sql_query, row)
        
connection.commit()

print("inserted data into receivers")

inserted data into receivers


In [15]:
for row in infor_email.collect():
    sql_query = """
            INSERT INTO emails (id_email, subject, id_sender, id_receiver, date, content)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
    cursor.execute(sql_query, row)
        
connection.commit()

print("inserted data into email")

inserted data into email


# Elasticsearch API

In [4]:
from elasticsearch import Elasticsearch

cloud_id = "cloud_id"
username = "username" 
password = "password" 
cloud_auth = (username, password)

es = Elasticsearch(cloud_id=cloud_id, basic_auth=cloud_auth)

In [5]:
import mysql.connector
mysql_conn = mysql.connector.connect(
    port = "3306",
    db = "mail",
    user = "user",
    password = "password"
)

In [6]:
cursor = mysql_conn.cursor()
cursor.execute("SELECT * FROM senders")
results = cursor.fetchall()

for row in results:
    doc = {
        'id': row[0],
        'sender': row[1],
        'email_sender': row[2],
    }
    es.index(index='senders',id=row[0], body=doc)

In [7]:
cursor = mysql_conn.cursor()
cursor.execute("SELECT * FROM receivers")
results = cursor.fetchall()

for row in results:
    doc = {
        'id': row[0],
        'receiver': row[1],
        'email_receiver': row[2],
    }
    es.index(index='receivers',id=row[0], body=doc)

In [8]:
cursor = mysql_conn.cursor()
cursor.execute("SELECT * FROM emails")
results = cursor.fetchall()

for row in results:
    doc = {
        'id_email': row[0],
        'subject': row[1],
        'id_sender': row[2],
        'id_receiver': row[3],
        'date': row[4],
        'content': row[5]
    }
    es.index(index='emails',id=row[0], body=doc)

In [14]:
row_ = input("row : ")
search_keyword = input("search_keyword : ")

search_results = es.search(index='emails', body={'query': {'match': {row_: search_keyword}}})

for hit in search_results['hits']['hits']:
    id_email = hit['_source']['id_email']
    query = f"""select e.id_email,e.subject,s.sender,s.email_sender,r.receiver,r.email_receiver,e.date,e.content
                from emails as e
                inner join receivers as r on e.id_receiver = r.id_receiver
                inner join senders as s on e.id_sender = s.id_sender 
                where e.id_email = {id_email}
                """
    cursor = mysql_conn.cursor()
    cursor.execute(query)
    results = cursor.fetchall()
    for i in results:
        for j in i:
            print(j)
            print("-"*20)
        print("+"*100)

560
--------------------
Don't lose momentum!
--------------------
Coursera
--------------------
no-reply@t.mail.coursera.org
--------------------
Trinh LeNhat
--------------------
trinhlnde170294@fpt.edu.vn
--------------------
2022-12-27 09:50:17
--------------------
[coursera logo]


It only takes a few minutes


Hi Trinh Le Nhat! Welcome to Academic Skills for University Success: 
Capstone. Your first lecture only takes 1 minutes.

Welcome to the Academic Skills for University Success Specialization!
Lecture • 1 min
<https://eventing.coursera.org/redirectSigned/eyJrZXkiOiJlbWFpbC5saW5rLm9wZW4iLCJ2YWx1ZSI6eyJ1cmwiOiJodHRwczovL3d3dy5jb3Vyc2VyYS5vcmcvbGVhcm4vYWNhZGVtaWMtc2tpbGxzLXByb2plY3QvaXRlbS9ZU25zRj91dG1fbWVkaXVtPWVtYWlsJnV0bV9zb3VyY2U9b3RoZXImdXRtX2NhbXBhaWduPW9wZW5jb3Vyc2Uud2VsY29tZS5hY2FkZW1pYy1za2lsbHMtcHJvamVjdC5-b3BlbmNvdXJzZS53ZWxjb21lLktKenFydWNnRWVXeEt4S1pZekR2a3cuIiwidHJhY2tpbmciOnsidXNlcklkIjoxMTgwMjE4ODIsInVzZXJFbWFpbCI6InRyaW5obG5kZTE3MDI5NEBmcHQuZWR1LnZuIiwibm90aWZp