### TRƯỜNG ĐẠI HỌC CÔNG NGHIỆP
### THÀNH PHỐ HỒ CHÍ MINH
 
### KHOA Công nghệ Thông tin   
## ĐỀ THI GIỮA KỲ
### Môn thi : Nhập môn dữ liệu lớn 
### Lớp/Lớp học phần:  DHKHDL17A
* Thời gian làm bài: 75 phút (Không kể thời gian phát đề)
* Thí sinh được sử dụng tài liệu và tra cứu tại trang web
  - https://spark.apache.org/
  - https://stackoverflow.com/
  - https://learn.microsoft.com/en-us/sql/t-sql
* Thí sinh làm bài và lưu lại với định dạng mssv_hovaten_gk.ipynb. Ví dụ bạn có mã số sinh viên là: 12131411, họ và tên: Nguyễn Văn A, thì nộp bài với tên: **12131411_NguyenVanA_gk.ipynb**
* Thí sinh sử dụng dữ liệu *emails.csv*, điều chỉnh biến 'dataPath' ở cell đầu tiên lại cho đúng với đường dẫn đến file data.
* Hoàn thành tất cả các vị trí có chữ **# YOUR CODE HERE** để hoàn thành yêu cầu của mỗi hàm.
#### LƯU Ý: KHÔNG THAY ĐỔI NHỮNG CHỖ KHÁC

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.rdd import RDD
from pyspark.sql.types import Row

# Default location of the e-mail CSV file; adjust it to match the local copy of the data.
dataPath = "./data/email.csv"

# NOTE: despite its name, `sc` holds a SparkSession (obtained or created by the builder).
sc = (
    SparkSession.builder
    .appName("Email Data Processing")
    .getOrCreate()
)

In [2]:
# A non-empty DATA_MIDTERM environment variable takes precedence over the default path.
dataPath = os.getenv("DATA_MIDTERM") or dataPath

In [3]:
#0.5
def loadAndProcessCsv(filePath: str, spark: SparkSession) -> DataFrame:
    '''
    Load a CSV file into a Spark DataFrame, cache it, drop incomplete rows and print the schema.

    The reader is configured with a header row, multi-line records, the double
    quote as escape character and schema inference.

    Args:
        filePath (str): Path to the CSV file.
        spark (SparkSession): Active Spark session.

    Returns:
        DataFrame: The loaded data without the rows that contain null values.
    '''

    data = None
    ### BEGIN SOLUTION
    csvReader = (
        spark.read.format("csv")
        .option("header", "true")
        .option("multiLine", "true")
        .option("escape", "\"")
        .option("inferSchema", "true")
    )
    loadedData = csvReader.load(filePath)
    # cache() is requested on the loaded frame; the null filtering is applied on top of it.
    loadedData.cache()
    data = loadedData.dropna()
    data.printSchema()
    ### END SOLUTION
    return data

In [4]:
data = loadAndProcessCsv(dataPath,sc)

root
 |-- file: string (nullable = true)
 |-- message: string (nullable = true)



In [5]:
data.columns

['file', 'message']

In [6]:
### BEGIN HIDDEN TESTS
dataPathTest = os.environ.get("DATA_MIDTERM_TEST")
# Fail early with a clear message instead of an obscure reader error.
assert dataPathTest, "Environment variable DATA_MIDTERM_TEST must point to the hidden test CSV"


def loadAndProcessCsvSolution(filePath: str, spark: SparkSession) -> DataFrame:
    '''Reference implementation used to build the trusted test DataFrame.'''
    data = spark.read.format("csv") \
        .option("header", "true") \
        .option("multiLine", "true") \
        .option("escape", "\"") \
        .option("inferSchema", "true") \
        .load(filePath)
    data.cache()
    data = data.dropna()
    data.printSchema()
    return data


# Trusted fixture reused by the later hidden tests.
dataTest = loadAndProcessCsvSolution(dataPathTest, sc)
# Grade the submitted function itself; previously only the reference output was checked.
studentDataTest = loadAndProcessCsv(dataPathTest, sc)
assert isinstance(studentDataTest, DataFrame), "loadAndProcessCsv() don't return correct dataType"
assert studentDataTest.columns == ['file', 'message'], "loadAndProcessCsv does not contain the correct columns"
assert studentDataTest.count() == dataTest.count(), "loadAndProcessCsv does not return the correct number of rows"
### END HIDDEN TESTS

root
 |-- file: string (nullable = true)
 |-- message: string (nullable = true)



In [7]:
# 0.5đ
def createRDD(data: DataFrame)->RDD[Row]:
    '''
    Return the content of a Spark DataFrame as an RDD of Rows.

    Args:
        data (DataFrame): Input DataFrame containing data loaded by Spark.

    Returns:
        RDD[Row]: The `rdd` attribute of the input DataFrame.
    '''
    outRDD = None

    ### BEGIN SOLUTION
    outRDD = data.rdd
    ### END SOLUTION
    
    return outRDD

In [8]:
# Build the RDD used by the following cells, then check its type and the type of its first element.
emailRDD = createRDD(data)
assert isinstance(emailRDD, RDD), "createRDD() does not return the correct data type (RDD)"
assert isinstance(emailRDD.first(),Row), "createRDD() contains elements that are not of type Row"

In [9]:
### BEGIN HIDDEN TESTS
def createRRDSolution(dataF):
    # Reference implementation: the DataFrame's rows as an RDD.
    return  dataF.rdd
# Reference RDD over the hidden test data; later hidden test cells reuse it.
rddTest = createRRDSolution(dataTest)
# Output of the graded function on the same data.
result = createRDD(dataTest)
assert isinstance(result, RDD), "createRDD() don't return correct dataType"
assert isinstance(result.first(),Row), "createRDD() have element not correct datatype"
# The hidden test data set is expected to hold exactly 500 rows.
assert result.count() == 500, "createRDD() does not contain the correct number of elements"
### END HIDDEN TESTS

In [10]:
import email
from typing import List, Optional
from pyspark.sql.types import Row

def splitEmailAddresses(emailString: str) -> List[Optional[str]]:
    '''
    Split a comma-separated string of email addresses into a list of unique addresses.

    Every address is stripped of surrounding whitespace (including the line
    breaks of folded headers). Duplicates are removed while keeping the order
    of first appearance, so the same input always yields the same list.

    Args:
        emailString: A string containing email addresses separated by commas.
            May be None or empty.

    Returns:
        A list of unique email addresses in order of first appearance, or an
        empty list when emailString is None or empty.
    '''
    if not emailString:
        return []
    strippedAddresses = (address.strip() for address in emailString.split(','))
    # dict.fromkeys de-duplicates like a set but keeps insertion order; the
    # iteration order of a frozenset of strings changes with hash randomisation.
    return list(dict.fromkeys(strippedAddresses))

def extractEmailDetailsFromRawText(rawEmail: str) -> Row:
    '''
    Parse a raw email message and return its main fields as a Row.

    Args:
        rawEmail: A string holding the complete raw email message.

    Returns:
        A Row with the fields Date, From, To, Subject, CC and Content.
        From, To and CC are lists produced by splitEmailAddresses; Content is
        the concatenation of the payloads of all text/plain parts.
    '''
    parsedMessage = email.message_from_string(rawEmail)

    # Only text/plain parts contribute to the content; their payloads are used as returned.
    plainTextPayloads = [
        messagePart.get_payload()
        for messagePart in parsedMessage.walk()
        if messagePart.get_content_type() == 'text/plain'
    ]

    return Row(
        Date=parsedMessage.get("Date"),
        From=splitEmailAddresses(parsedMessage.get("From")),
        To=splitEmailAddresses(parsedMessage.get("To")),
        Subject=parsedMessage.get("Subject"),
        CC=splitEmailAddresses(parsedMessage.get("Cc")),
        Content=''.join(plainTextPayloads)
    )

# Extract structured email details from the first email message
# Extract structured email details from the first email message.
# The bare expression on the last line makes the notebook display the resulting Row.
firstEmailData = data.first()
structuredEmail = extractEmailDetailsFromRawText(firstEmailData.message)
structuredEmail


Row(Date='Thu, 1 Feb 2001 08:00:00 -0800 (PST)', From=['tana.jones@enron.com'], To=['darren.vanek@enron.com', 'carol.clair@enron.com', 'janie.aguayo@enron.com', 'debbie.brackett@enron.com', 'sara.shackleton@enron.com', 'brent.hendry@enron.com', 'marilyn.colbert@enron.com', 'larry.hunter@enron.com', 'rudwell.johnson@enron.com', 'kevin.meredith@enron.com', 'russell.diamond@enron.com', 'celeste.cisneros@enron.com', 'tanya.rohauer@enron.com', 'lesli.campbell@enron.com', 'paul.radous@enron.com', 'bill.hare@enron.com', 'diane.anderson@enron.com', 'tom.moran@enron.com', 'frank.sayre@enron.com', 'frank.davis@enron.com', 'mary.cook@enron.com', 'william.bradford@enron.com', 'lisa.lees@enron.com', 'georgi.landau@enron.com', 'samuel.schott@enron.com', 'susan.bailey@enron.com', 'samantha.boyd@enron.com', 'derek.bailey@enron.com', 'bob.bowen@enron.com', 'laurel.adams@enron.com', 'gordon.heaney@enron.com', 'carrie.southard@enron.com', 'adnan.patel@enron.com', 'dale.neuner@enron.com', 'melissa.murphy@

In [11]:
#0.5đ
def createStructuredEmailRDD(emailRDD: RDD[Row]) -> RDD[Row]:
    '''
    Turn an RDD of raw email rows into an RDD of structured email rows.

    Args:
        emailRDD: An RDD whose Rows expose the raw email text in their `message` field.

    Returns:
        A new RDD in which each element is the Row returned by
        extractEmailDetailsFromRawText (Date, From, To, Subject, CC, Content).
    '''
    structuredEmailRDD = None
    ### BEGIN SOLUTION
    def toStructuredRow(rawRow):
        # Parse the raw text carried by one input Row.
        return extractEmailDetailsFromRawText(rawRow.message)

    structuredEmailRDD = emailRDD.map(toStructuredRow)
    ### END SOLUTION
    return structuredEmailRDD

In [12]:
# Build the structured RDD used by the following cells, then check its type and element type.
structuredEmailRDD = createStructuredEmailRDD(emailRDD)
assert isinstance(structuredEmailRDD, RDD), "createStructuredEmailRDD() doesn't return an RDD"
assert isinstance(structuredEmailRDD.first(), Row), "createStructuredEmailRDD() elements are not of type Row"

In [13]:
### BEGIN HIDDEN TESTS
def createStructuredEmailRDDSolution(emailRDD):
    # Reference implementation: parse the `message` field of every row.
    return emailRDD.map(lambda row: extractEmailDetailsFromRawText(row.message))
# Run the graded function on the reference RDD of the hidden test data.
result =  createStructuredEmailRDD(rddTest)
firstRow = result.first()
# Every expected field must be present in the structured Row.
assert "Date" in firstRow, "'Date' field is missing in the structured Row"
assert "From" in firstRow, "'From' field is missing in the structured Row"
assert "To" in firstRow, "'To' field is missing in the structured Row"
assert "Subject" in firstRow, "'Subject' field is missing in the structured Row"
assert "CC" in firstRow, "'CC' field is missing in the structured Row"
assert "Content" in firstRow, "'Content' field is missing in the structured Row"

# Spot-check one parsed value of the first row.
assert firstRow.Date == 'Thu, 1 Feb 2001 08:00:00 -0800 (PST)', "structuredEmailRDD() have error when get content from Row"
### END HIDDEN TESTS

In [14]:
#1.
def countNumberEmail(structuredEmailRDD: RDD[Row], k: int)->int:
    '''
    Count the emails that have more than `k` email addresses in the 'To' field.

    Args:
    - structuredEmailRDD: RDD of Row objects, each containing an email's structured data.
    - k: The threshold for the number of emails in the 'To' field.

    Returns:
    - int: The count of emails with more than `k` email addresses in the 'To' field
      (0 when the RDD is empty).
    '''
    count = -1
    ### BEGIN SOLUTION
    # filter + count yields 0 for an empty RDD, whereas map + reduce raises
    # ValueError because reduce() has no initial value to fall back on.
    count = structuredEmailRDD.filter(lambda row: len(row.To) > k).count()
    ### END SOLUTION
    return count

In [15]:
countNumberEmail(structuredEmailRDD,40)

647

In [16]:
### BEGIN HIDDEN TESTS
# Structured reference RDD over the hidden test data; later hidden test cells reuse it.
structuredEmailRDDTest = createStructuredEmailRDDSolution(rddTest)
countTest = countNumberEmail(structuredEmailRDDTest,12)
assert countTest == 39, "Count return wrong number email"
### END HIDDEN TESTS

In [17]:
# 1đ
def countUniqueEmailDomains(structuredEmailRDD: RDD[Row], k: int) -> dict:
    '''
    Count how often each email domain occurs in the "CC" field and return the k most frequent domains.

    The domain of an address is the text that follows its first '@'. Entries
    of the CC list that contain no '@' are ignored.

    Args:
    - structuredEmailRDD: An RDD containing Row objects, each representing an email's structured data.
    - k: The number of most frequent domains to return.

    Returns:
    - dict: A dictionary mapping each of the k most frequent domains to its number of occurrences in the "CC" field.
      Example:
        If k = 3, the result might look like:
        {
          'enron.com': 16452,
          'aol.com': 122,
          'hotmail.com': 101
        }
    '''
    results = {}
    ### BEGIN SOLUTION
    def extractDomains(row):
        # Entries without '@' have no domain part; skipping them avoids an IndexError.
        return [address.split('@')[1] for address in row.CC if '@' in address]

    domainCounts = structuredEmailRDD.flatMap(extractDomains) \
                                    .map(lambda domain: (domain, 1)) \
                                    .reduceByKey(lambda a, b: a + b)
    sortedDomainCounts = domainCounts.sortBy(lambda pair: pair[1], ascending=False)
    results = dict(sortedDomainCounts.take(k))
    ### END SOLUTION
    return results
    

In [18]:
countUniqueEmailDomains(structuredEmailRDD, 10)

{'enron.com': 18864,
 'aol.com': 92,
 'hotmail.com': 79,
 'duke-energy.com': 68,
 'haas.berkeley.edu': 50,
 'caiso.com': 47,
 'yahoo.com': 44,
 'enron.com>': 43,
 'iepa.com': 37,
 'akllp.com': 35}

In [19]:
### BEGIN HIDDEN TESTS
# The three most frequent CC domains of the hidden test data must match exactly.
countDomains = countUniqueEmailDomains(structuredEmailRDDTest,3)
assert countDomains == {'enron.com': 447, 'socalgas.com': 6, 'haas.berkeley.edu': 5}, "countUniqueEmailDomains wrong answer"
### END HIDDEN TESTS

In [20]:
# 0.5đ ;
def countEmailsBySubject(structuredEmailRDD: RDD[Row], subjectContent: str) -> int:
    '''
    Count the emails whose subject contains `subjectContent`, ignoring case.

    Emails without a subject (Subject is None) are treated as having an empty subject.

    Args:
    - structuredEmailRDD: RDD of Row objects containing an email's structured data.
    - subjectContent: The text to look for inside the subject.

    Returns:
    - numEmails: The number of emails whose subject contains subjectContent as a substring.
    '''
    numEmails =  -1
    ### BEGIN SOLUTION
    # Lower-case the search text once instead of once per email.
    loweredSubjectContent = subjectContent.lower()
    # Subject is None when the header is missing; fall back to "" to avoid an AttributeError.
    filteredEmailsRDD = structuredEmailRDD.filter(
        lambda emailRow: loweredSubjectContent in (emailRow.Subject or "").lower()
    )
    numEmails = filteredEmailsRDD.count()
    ### END SOLUTION
    return numEmails

In [21]:
countEmailsBySubject(structuredEmailRDD, "bank")

78

In [22]:
### BEGIN HIDDEN TESTS
# NOTE: the variable name is historical; it holds the result of countEmailsBySubject.
countEmailsByRecipient =  countEmailsBySubject(structuredEmailRDDTest, "email")
assert countEmailsByRecipient == 2, "countEmailsBySubject wrong answer"
# The match must not depend on the letter case of the search text.
assert countEmailsBySubject(structuredEmailRDDTest, "email") == countEmailsBySubject(structuredEmailRDDTest, "EMaIL"), "countEmailsBySubject is not case insensitive"
### END HIDDEN TESTS

In [23]:
# 1.5d
def getTopKFrequentWordsInContentBySender(structuredEmailRDD: RDD[Row], sender: str, k: int) -> dict:
    '''
    Return the k most frequent words in the content of the emails sent by one sender.

    The content of each matching email is lower-cased and split on whitespace
    before the words are counted.

    Args:
    - structuredEmailRDD: RDD of Row objects, where each Row represents an email with structured data.
    - sender: The sender's email address; an email matches when this address is in its From list.
    - k: The number of most frequent words to return.

    Returns:
    - topKWordsDict: A dictionary whose keys are the k most frequent words and whose values are
                     their frequencies in the content of the matching emails.
                    example:
                    {'to': 12,
                     'the': 9,
                     'your': 8,
                     'a': 5,
                     'is': 4,
                     'survey': 4,
                     'and': 4,
                     'you': 4,
                     'of': 3,
                     'very': 3}
    '''
    topKWordsDict = {}
    ### BEGIN SOLUTION
    emailsFromSender = structuredEmailRDD.filter(lambda emailRow: sender in emailRow.From)
    loweredWords = emailsFromSender.flatMap(lambda emailRow: emailRow.Content.lower().split())
    wordFrequencies = loweredWords.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

    # All (word, count) pairs are collected and ranked locally by descending count.
    rankedWords = sorted(wordFrequencies.collect(), key=lambda pair: pair[1], reverse=True)
    topKWordsDict = dict(rankedWords[:k])
    ### END SOLUTION
    return topKWordsDict

In [24]:
getTopKFrequentWordsInContentBySender(structuredEmailRDD, "tana.jones@enron.com", 10)

{'the': 1501,
 'to': 962,
 'and': 703,
 'of': 642,
 'a': 524,
 'i': 487,
 'on': 448,
 'you': 447,
 'for': 441,
 'tana': 391}

In [25]:
### BEGIN HIDDEN TESTS
# The four most frequent words of this sender in the hidden test data must match exactly.
topkWord = getTopKFrequentWordsInContentBySender(structuredEmailRDDTest, "tana.jones@enron.com", 4)
assert topkWord == {'the': 25, 'a': 20, 'to': 14, '=20': 14}, "getTopKFrequentWordsInContentBySender wrong answer"
### END HIDDEN TESTS

In [26]:
# SQL Query
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, size, to_date, year, month, weekofyear
# Switch to the legacy time parser (presumably needed for the e-mail date pattern below — confirm).
sc.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# Convert RDD to DataFrame and add the parsed date plus the number of To and CC addresses.
dfEmais = (
    structuredEmailRDD.toDF()
    .withColumn('DateTime', to_date(col('Date'), "EEE, d MMM yyyy HH:mm:ss Z"))
    .withColumn('Num_To', size(col('To')))
    .withColumn('Num_CC', size(col('CC')))
)

In [27]:
# SQLContext takes a SparkContext as its first argument; `sc` is a SparkSession,
# so pass its underlying context and the session itself explicitly.
sqlContext = SQLContext(sc.sparkContext, sc)
tableName = "Emails"
# Register the DataFrame under the table name used by the SQL exercises.
dfEmais.createOrReplaceTempView(tableName)



In [28]:
# 0.5đ
def getTopKRowsBySQL(sqlContext: SQLContext, tableName: str, k: int) -> DataFrame:
    '''
    Return the first k rows of a table by running a `SELECT * ... LIMIT k` SQL query.

    Args:
    - sqlContext: The SQLContext used to run the query.
    - tableName: The name of the table from which the rows will be selected.
    - k: The number of rows to return. It must be a positive integer.

    Returns:
    - result: A DataFrame with at most k rows of the table.
    '''
    ### BEGIN SOLUTION
    assert isinstance(sqlContext, SQLContext), f"sqlContext is not the correct data type, expected SQLContext but got {type(sqlContext)}"
    assert isinstance(tableName, str), f"tableName is not the correct data type, expected string but got {type(tableName)}"
    assert isinstance(k, int) and k > 0, f"k must be a positive integer, but got {k}"

    # The table name and the limit are interpolated directly into the statement.
    result = sqlContext.sql(f"SELECT * FROM {tableName} LIMIT {k}")
    ### END SOLUTION
    return result


def getTopKRowsByDFOperations(dataFrame: DataFrame, k: int) -> DataFrame:
    '''
    Return the first k rows of a DataFrame using DataFrame operations.

    Args:
    - dataFrame: The DataFrame from which the rows will be selected.
    - k: The number of rows to return. It must be a positive integer.

    Returns:
    - result: A DataFrame containing the first k rows of the input.
              If the input contains fewer than k rows, all available rows are returned.
    '''
    ### BEGIN SOLUTION
    # The message now names the argument that is actually checked (it used to mention sqlContext).
    assert isinstance(dataFrame, DataFrame), f"dataFrame is not the correct data type, expected DataFrame but got {type(dataFrame)}"
    assert isinstance(k, int) and k > 0, f"k must be a positive integer, but got {k}"
    result =  dataFrame.limit(k)
    ### END SOLUTION
    return result

In [29]:
getTopKRowsBySQL(sqlContext,tableName,1).show()

+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+
|                Date|                From|                  To|         Subject|                  CC|             Content|  DateTime|Num_To|Num_CC|
+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+
|Thu, 1 Feb 2001 0...|[tana.jones@enron...|[carol.clair@enro...|Deutsche Bank AG|[larry.gagliardi@...|We have received ...|2001-02-01|    64|     3|
+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+



In [30]:
getTopKRowsByDFOperations(dfEmais,1).show()

+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+
|                Date|                From|                  To|         Subject|                  CC|             Content|  DateTime|Num_To|Num_CC|
+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+
|Thu, 1 Feb 2001 0...|[tana.jones@enron...|[carol.clair@enro...|Deutsche Bank AG|[larry.gagliardi@...|We have received ...|2001-02-01|    64|     3|
+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+



In [31]:
### BEGIN HIDDEN TESTS
# Build the test DataFrame with the same derived columns as dfEmais and register it as a view.
dfEmaisTest = structuredEmailRDDTest.toDF()
dfEmaisTest = dfEmaisTest.withColumn('DateTime', to_date(col('Date'), "EEE, d MMM yyyy HH:mm:ss Z"))
dfEmaisTest = dfEmaisTest.withColumn('Num_To', size(col('To')))
dfEmaisTest = dfEmaisTest.withColumn('Num_CC', size(col('CC')))
tableNameTest = "EmailsTest"
dfEmaisTest.createOrReplaceTempView(tableNameTest)
def getTopKRowsBySQLSolution(sqlContext: SQLContext, tableName: str, k: int) -> DataFrame:
    # Reference implementation: SELECT * with a LIMIT of k rows.
    query = f"SELECT * FROM {tableName} LIMIT {k}"
    result = sqlContext.sql(query)
    return result
# Compare at most four rows of the graded result with the reference, for k = 5 and k = 1.
assert getTopKRowsBySQL(sqlContext,tableNameTest,5).take(4) == getTopKRowsBySQLSolution(sqlContext,tableNameTest,5).take(4), "Wrong in function getTopKRowsBySQL"
assert getTopKRowsBySQL(sqlContext,tableNameTest,1).take(4) == getTopKRowsBySQLSolution(sqlContext,tableNameTest,1).take(4), "Wrong in function getTopKRowsBySQL"
### END HIDDEN TESTS

In [32]:
### BEGIN HIDDEN TESTS
# Compare at most four rows of the graded result with DataFrame.limit, for k = 5 and k = 1.
assert getTopKRowsByDFOperations(dfEmaisTest,5).take(4) == dfEmaisTest.limit(5).take(4), "Wrong in function getTopKRowsByDFOperations"
assert getTopKRowsByDFOperations(dfEmaisTest,1).take(4) == dfEmaisTest.limit(1).take(4), "Wrong in function getTopKRowsByDFOperations"
### END HIDDEN TESTS

In [33]:
# 1đ, 6.5đ
def countEmailsWithToGreaterThanKBySQL(sqlContext: SQLContext, tableName: str, k: int) -> int:
    '''
    Count, with a SQL query, the emails whose To field holds more than k email addresses.

    Args:
    - sqlContext: The SQLContext used to run the query.
    - tableName: The name of the table that contains the email data.
    - k: The threshold number of email addresses in the To field (non-negative integer).

    Returns:
    - count: The number of emails where the To field has more than k email addresses.
    '''
    ### BEGIN SOLUTION
    assert isinstance(sqlContext, SQLContext), f"sqlContext is not the correct data type, expected SQLContext but got {type(sqlContext)}"
    assert isinstance(tableName, str), f"tableName is not the correct data type, expected string but got {type(tableName)}"
    assert isinstance(k, int) and k >= 0, f"k must be a non-negative integer, but got {k}"
    # The aggregate query returns a single row holding the count.
    countRow = sqlContext.sql(
        f"SELECT COUNT(*) as count FROM {tableName} WHERE size(To) > {k}"
    ).first()
    count = countRow['count']
    ### END SOLUTION
    return count
def countEmailsWithToGreaterThanKByDFOperations(dataFrame: DataFrame, k: int) -> int:
    '''
    Count, with DataFrame operations, the emails whose To field holds more than k email addresses.

    Args:
    - dataFrame: A DataFrame containing the email data.
    - k: The threshold number of email addresses in the To field (non-negative integer).

    Returns:
    - count: The number of emails where the To field has more than k email addresses.
    '''
    ### BEGIN SOLUTION
    assert isinstance(dataFrame, DataFrame), f"dataFrame is not the correct data type, expected DataFrame but got {type(dataFrame)}"
    assert isinstance(k, int) and k >= 0, f"k must be a non-negative integer, but got {k}"

    hasMoreThanKRecipients = size(col('To')) > k
    result = dataFrame.where(hasMoreThanKRecipients).count()
    ### END SOLUTION
    return result


In [34]:
countEmailsWithToGreaterThanKBySQL(sqlContext,tableName,50)

525

In [35]:
### BEGIN HIDDEN TESTS
# Expected counts on the hidden test data for the thresholds 50 and 10.
assert countEmailsWithToGreaterThanKBySQL(sqlContext,tableNameTest,50) == 12, "Wrong in function countEmailsWithToGreaterThanKBySQL"
assert countEmailsWithToGreaterThanKBySQL(sqlContext,tableNameTest,10) == 46, "Wrong in function countEmailsWithToGreaterThanKBySQL"
### END HIDDEN TESTS

In [36]:
### BEGIN HIDDEN TESTS
# Expected counts on the hidden test data for the thresholds 50 and 10.
assert countEmailsWithToGreaterThanKByDFOperations(dfEmaisTest, 50) == 12, "Wrong in function countEmailsWithToGreaterThanKByDFOperations"
assert countEmailsWithToGreaterThanKByDFOperations(dfEmaisTest, 10) == 46, "Wrong in function countEmailsWithToGreaterThanKByDFOperations"
### END HIDDEN TESTS

In [37]:
#1đ, 7.5

def countEmailsWithMoreToThanCCInJanuaryBySQL(sqlContext: SQLContext, tableName: str) -> int:
    '''
    Count, with a SQL query, the emails dated in January whose Num_To is greater than Num_CC.

    Args:
    - sqlContext: The SQLContext used to run the query.
    - tableName: The name of the table that contains the email data.

    Returns:
    - email_count: The number of emails that meet both criteria.
    '''
    # NOTE(review): unlike the other graded functions in this file, this body is not
    # wrapped in BEGIN/END SOLUTION markers — confirm that this is intended.
    assert isinstance(sqlContext, SQLContext), f"sqlContext is not the correct data type, expected SQLContext but got {type(sqlContext)}"
    assert isinstance(tableName, str), f"tableName is not the correct data type, expected string but got {type(tableName)}"

    query = f"""
        SELECT COUNT(*) as email_count
        FROM {tableName}
        WHERE Num_To > Num_CC
        AND MONTH(DateTime) = 1
    """

    # The aggregate query returns a single row holding the count.
    countRow = sqlContext.sql(query).first()
    return countRow['email_count']

def countEmailsWithMoreToThanCCInJanuaryByDFOperations(dataFrame: DataFrame) -> int:
    '''
    Count, with DataFrame operations, the emails dated in January whose Num_To is greater than Num_CC.

    Args:
    - dataFrame: A DataFrame containing the email data.

    Returns:
    - email_count: The number of emails that meet both criteria.
    '''
    # NOTE(review): unlike the other graded functions in this file, this body is not
    # wrapped in BEGIN/END SOLUTION markers — confirm that this is intended.
    assert isinstance(dataFrame, DataFrame), f"dataFrame is not the correct data type, expected DataFrame but got {type(dataFrame)}"

    moreToThanCC = col("Num_To") > col("Num_CC")
    datedInJanuary = month(col("DateTime")) == 1
    return dataFrame.filter(moreToThanCC & datedInJanuary).count()


In [38]:
countEmailsWithMoreToThanCCInJanuaryBySQL(sqlContext, tableName)

1567

In [39]:
countEmailsWithMoreToThanCCInJanuaryByDFOperations(dfEmais)

1567

In [40]:
### BEGIN HIDDEN TESTS
# NOTE: datetime is imported here but not used in this cell.
import datetime
assert countEmailsWithMoreToThanCCInJanuaryBySQL(sqlContext, tableNameTest) == 43, "Wrong in function countEmailsWithMoreToThanCCInJanuaryBySQL"
### END HIDDEN TESTS

In [41]:
### BEGIN HIDDEN TESTS
# NOTE: datetime is imported here but not used in this cell.
import datetime
assert countEmailsWithMoreToThanCCInJanuaryByDFOperations(dfEmaisTest) == 43, "Wrong in function countEmailsWithMoreToThanCCInJanuaryByDFOperations"
### END HIDDEN TESTS

In [42]:
# 1đ, 8.5
def countEmailsInMonthBySQL(sqlContext: SQLContext, tableName: str, monthValue: int) -> int:
    '''
    Count, with a SQL query, the emails whose DateTime falls in the given month.

    Args:
    - sqlContext: The SQLContext used to run the query.
    - tableName: The name of the table that contains the email data.
    - monthValue: The month to filter the emails by (1-12).

    Returns:
    - email_count: The number of emails dated in the given month.
    '''
    ### BEGIN SOLUTION    
    assert isinstance(sqlContext, SQLContext), f"sqlContext is not the correct data type, expected SQLContext but got {type(sqlContext)}"
    assert isinstance(tableName, str), f"tableName is not the correct data type, expected string but got {type(tableName)}"
    # The message now states the range that is actually enforced (it used to say "(1,30)").
    assert isinstance(monthValue, int) and 1 <= monthValue <= 12, f"monthValue must be an integer between 1 and 12, but got {monthValue}"

    query = f"""
        SELECT COUNT(*) as email_count
        FROM {tableName}
        WHERE MONTH(DateTime) = {monthValue}
    """
    
    resultDF = sqlContext.sql(query)
    # The aggregate query returns a single row holding the count.
    emailCount = resultDF.collect()[0]['email_count']
    ### END SOLUTION
    return emailCount

def countEmailsInMonthByDFOperations(dataFrame: DataFrame, monthValue: int) -> int:
    '''
    Count, with DataFrame operations, the emails whose DateTime falls in the given month.

    Args:
    - dataFrame: A DataFrame containing the email data.
    - monthValue: The month to filter the emails by (1-12).

    Returns:
    - email_count: The number of emails dated in the given monthValue.
    '''
    assert isinstance(dataFrame, DataFrame), f"dataFrame is not the correct data type, expected DataFrame but got {type(dataFrame)}"
    # The message now names the checked argument and its enforced range (it used to say "date").
    assert isinstance(monthValue, int) and 1 <= monthValue <= 12, f"monthValue must be an integer between 1 and 12, but got {monthValue}"
    ### BEGIN SOLUTION 

    result = dataFrame.filter(month(col("DateTime")) == monthValue).count()
    ### END SOLUTION
    return result


In [43]:
countEmailsInMonthBySQL(sqlContext,tableName,5)

1826

In [44]:
countEmailsInMonthByDFOperations(dfEmais,5)

1826

In [45]:
### BEGIN HIDDEN TESTS
# Expected counts on the hidden test data for May and January.
assert countEmailsInMonthBySQL(sqlContext,tableNameTest,5) == 46, "Wrong in function countEmailsInMonthBySQL"
assert countEmailsInMonthBySQL(sqlContext,tableNameTest,1) == 51, "Wrong in function countEmailsInMonthBySQL"
### END HIDDEN TESTS

In [46]:
### BEGIN HIDDEN TESTS
# Expected counts on the hidden test data for May and January.
assert countEmailsInMonthByDFOperations(dfEmaisTest,5) == 46, "Wrong in function countEmailsInMonthByDFOperations"
assert countEmailsInMonthByDFOperations(dfEmaisTest,1) == 51, "Wrong in function countEmailsInMonthByDFOperations"
### END HIDDEN TESTS

In [47]:
# 1đ
def countEmailsPerWeekBySQL(sqlContext: SQLContext, tableName: str) -> DataFrame:
    '''
    Count the emails per week of the year with a SQL query.

    The rows are grouped by WEEKOFYEAR(DateTime) and the result is ordered by
    the week number in descending order (not by the number of emails).

    Args:
    - sqlContext: The SQLContext used to run the query.
    - tableName: The name of the table that contains the email data.

    Returns:
    - resultDF: A DataFrame containing the number of emails per week, sorted in descending order by week number.
                With schema:
                    root
                     |-- week: integer (nullable = true)
                     |-- count: long (nullable = false)
                    
    '''
    assert isinstance(sqlContext, SQLContext), f"sqlContext is not the correct data type, expected SQLContext but got {type(sqlContext)}"
    assert isinstance(tableName, str), f"tableName is not the correct data type, expected string but got {type(tableName)}"
    ### BEGIN SOLUTION 
    query = f"""
        SELECT WEEKOFYEAR(DateTime) as week, COUNT(*) as count
        FROM {tableName}
        GROUP BY WEEKOFYEAR(DateTime)
        ORDER BY week DESC
    """
    
    result = sqlContext.sql(query)
    ### END SOLUTION 
    return result


def countEmailsPerWeekByDFOperations(dataFrame: DataFrame) -> DataFrame:
    '''
    Count the emails per week of the year with DataFrame operations.

    The rows are grouped by the week of year of DateTime and the result is
    ordered by the week number in descending order (not by the number of emails).

    Args:
    - dataFrame: A DataFrame containing the email data.

    Returns:
    - resultDF: A DataFrame containing the number of emails per week, sorted in descending order by week number.
                With schema:
                    root
                     |-- week: integer (nullable = true)
                     |-- count: long (nullable = false)
    '''
    assert isinstance(dataFrame, DataFrame), f"dataFrame is not the correct data type, expected DataFrame but got {type(dataFrame)}"
    ### BEGIN SOLUTION
    weekColumn = weekofyear(col("DateTime")).alias("week")
    emailsPerWeek = dataFrame.groupBy(weekColumn).count()
    result = emailsPerWeek.orderBy(col("week").desc())
    ### END SOLUTION
    return result

In [48]:
countEmailsPerWeekBySQL(sqlContext,tableName).show()

+----+-----+
|week|count|
+----+-----+
|  52|  193|
|  51|  338|
|  50|  616|
|  49|  491|
|  48|  700|
|  47|  518|
|  46|  628|
|  45|  481|
|  44|  503|
|  43|  768|
|  42|  576|
|  41|  405|
|  40|  349|
|  39|  350|
|  38|  285|
|  37|  311|
|  36|  227|
|  35|  307|
|  34|  285|
|  33|  225|
+----+-----+
only showing top 20 rows



In [49]:
countEmailsPerWeekByDFOperations(dfEmais).show()

+----+-----+
|week|count|
+----+-----+
|  52|  193|
|  51|  338|
|  50|  616|
|  49|  491|
|  48|  700|
|  47|  518|
|  46|  628|
|  45|  481|
|  44|  503|
|  43|  768|
|  42|  576|
|  41|  405|
|  40|  349|
|  39|  350|
|  38|  285|
|  37|  311|
|  36|  227|
|  35|  307|
|  34|  285|
|  33|  225|
+----+-----+
only showing top 20 rows



In [50]:
### BEGIN HIDDEN TESTS
def countEmailsPerWeekBySQLSolution(sqlContext: SQLContext, tableName: str) -> DataFrame:
    # Reference implementation: emails per week of year, ordered by week number descending.
    query = f"""
        SELECT WEEKOFYEAR(DateTime) as week, COUNT(*) as count
        FROM {tableName}
        GROUP BY WEEKOFYEAR(DateTime)
        ORDER BY week DESC
    """
    result = sqlContext.sql(query)
    return result

# Compare prefixes of 10, 1 and up to 1000 rows of the graded result with the reference.
assert countEmailsPerWeekBySQL(sqlContext,tableNameTest).take(10)  == countEmailsPerWeekBySQLSolution(sqlContext,tableNameTest).take(10), "Wrong in function countEmailsPerWeekBySQL"
assert countEmailsPerWeekBySQL(sqlContext,tableNameTest).take(1)  == countEmailsPerWeekBySQLSolution(sqlContext,tableNameTest).take(1), "Wrong in function countEmailsPerWeekBySQL"
assert countEmailsPerWeekBySQL(sqlContext,tableNameTest).take(1000)  == countEmailsPerWeekBySQLSolution(sqlContext,tableNameTest).take(1000), "Wrong in function countEmailsPerWeekBySQL"
### END HIDDEN TESTS

In [51]:
### BEGIN HIDDEN TESTS
def countEmailsPerWeekByDFOperationsSolution(dataFrame: DataFrame) -> DataFrame:
    # Reference implementation: emails per week of year, ordered by week number descending.
    result = dataFrame.groupBy(weekofyear(col("DateTime")).alias("week")).count()
    result = result.orderBy(col("week").desc())
    return result

# Compare prefixes of 1, 5 and up to 1000 rows of the graded result with the reference.
assert countEmailsPerWeekByDFOperations(dfEmaisTest).take(1) == countEmailsPerWeekByDFOperationsSolution(dfEmaisTest).take(1), "Wrong in function countEmailsPerWeekByDFOperations"
assert countEmailsPerWeekByDFOperations(dfEmaisTest).take(5) == countEmailsPerWeekByDFOperationsSolution(dfEmaisTest).take(5), "Wrong in function countEmailsPerWeekByDFOperations"
assert countEmailsPerWeekByDFOperations(dfEmaisTest).take(1000) == countEmailsPerWeekByDFOperationsSolution(dfEmaisTest).take(1000), "Wrong in function countEmailsPerWeekByDFOperations"
### END HIDDEN TESTS