In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as Type
from pyspark.sql.window import Window

from datetime import datetime
import matplotlib.pyplot as plt



Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
8,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Reuse the active Spark session if there is one, otherwise create one named "ExcelReadExample".
spark = SparkSession.builder.appName("ExcelReadExample").getOrCreate()
# Absolute path of the input workbook; the calls in the cells below pass it to each question function.
excel_file_path = r'/home/glue_user/workspace/jupyter_workspace/Input_v2.xlsx'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# class DataValidator:
    # def __init__(self, excel_file_path):
    #     self.file_path = excel_file_path
    #     self.spark = SparkSession.builder.appName("Project1").getOrCreate()

def read_file(excel_file_path):
    """Load an Excel workbook into a Spark DataFrame.

    The workbook is parsed with pandas (openpyxl engine) and the resulting
    pandas frame is converted with the module-level ``spark`` session.

    Args:
        excel_file_path: Path of the ``.xlsx`` file to read.

    Returns:
        A Spark DataFrame holding the contents read by ``pd.read_excel``.
    """
    pandas_frame = pd.read_excel(excel_file_path, engine='openpyxl')
    return spark.createDataFrame(pandas_frame)

# ----------------------- exploratory data analysis ----------------------- 

def data_analysis(excel_file_path):
    """Print an exploratory summary of the workbook.

    Prints, in order: the schema, ``describe()`` statistics, the number of
    rows that belong to fully duplicated groups, five sample rows, and for
    every column that has any, the count of NaN / NULL / empty-string values.

    Args:
        excel_file_path: Path of the ``.xlsx`` file to analyse.

    Returns:
        None. Everything is written to stdout.
    """
    df0 = read_file(excel_file_path)

    # schema
    print('1. data schema')
    df0.printSchema()

    # statistical data summary
    print('2. statistical data summary')
    df0.describe().show()

    # check duplicates: total number of rows that are part of a duplicated group
    print('3. check duplicates')
    df0.groupBy(df0.columns).count().where(F.col('count') > 1).select(F.sum('count')).show()

    print('4. sample data')
    df0.show(5, truncate=False)

    # check NaN / NULL / empty string
    print('5. check NaN / NULL / empty string')
    column_types = df0.dtypes
    if not column_types:
        return

    # Build all three counters for every column up front so that a single
    # aggregation job replaces the three separate count() jobs per column.
    # NOTE(review): isnan()/== '' on columns of another type rely on Spark's
    # implicit casts, exactly as the per-column filters did.
    counters = []
    for col, _ in column_types:
        value = F.col(col)
        counters.append(F.count(F.when(F.isnan(value), 1)))
        counters.append(F.count(F.when(value.isNull(), 1)))
        counters.append(F.count(F.when(value == '', 1)))

    totals = df0.agg(*counters).collect()[0]

    # Results are read back positionally (three slots per column) so that
    # unusual column names can never collide with generated aliases.
    for position, (col, data_type) in enumerate(column_types):
        nan_count, null_count, empty_str_count = totals[3 * position: 3 * position + 3]

        if (nan_count > 0) or (null_count > 0) or (empty_str_count > 0):
            print(f"{col} with {data_type} -> nan: {nan_count}, null: {null_count}, '': {empty_str_count}")

def general_preprocess(excel_file_path, remove_nan=False):
    """Read the workbook and apply the shared clean-up steps.

    Steps:
      1. Normalise column headers: strip surrounding whitespace, replace
         spaces with underscores, lowercase.
      2. Optionally drop every row containing a NULL or NaN value.
      3. Cast every non-string column to integer.

    Args:
        excel_file_path: Path of the ``.xlsx`` file to read.
        remove_nan: When truthy, rows with any NULL/NaN value are dropped
            before the integer cast.

    Returns:
        A Spark DataFrame with normalised column names and integer-typed
        non-string columns. Missing numeric values are NULL, never 0.
    """
    df0 = read_file(excel_file_path)

    # ----------------------- data pre-processing -----------------------

    # preprocess for column header
    # 1. remove leading and trailing space
    # 2. replace ' ' with '_'
    # 3. change uppercase char to lowercase char

    df1 = df0.select([F.col(col).alias(col.strip().replace(" ", "_").lower()) for col in df0.columns])

    # Truthiness check instead of `is True`, so values such as 1 or a numpy
    # bool are honoured rather than silently ignored.
    if remove_nan:
        df1 = df1.na.drop()

    # cast double / long data type into int (single projection instead of
    # one withColumn per column)
    projected = []
    for col, data_type in df1.dtypes:
        value = df1[col]
        if data_type == 'string':
            projected.append(value)
            continue
        if data_type in ('double', 'float'):
            # Casting NaN to int yields 0 in non-ANSI Spark (and raises in
            # ANSI mode), which would turn missing cells into fake zeros.
            # Map NaN to NULL first so missing values stay missing.
            value = F.when(F.isnan(value), F.lit(None)).otherwise(value)
        projected.append(value.cast(Type.IntegerType()).alias(col))

    return df1.select(projected)


def eliminate_outliers(df, col_name):
    """Drop rows whose value in ``col_name`` is a z-score outlier.

    A row is kept when ``abs((value - mean) / stddev) < 3``, with mean and
    sample standard deviation computed over ``df[col_name]``.

    Args:
        df: Spark DataFrame to filter.
        col_name: Name of the numeric column to test.

    Returns:
        A DataFrame with the same columns as ``df`` and the outlier rows
        removed. Rows where ``col_name`` is NULL are also removed, because
        their z-score is NULL and fails the comparison. If the standard
        deviation is zero or undefined (constant column, fewer than two
        values), ``df`` is returned unchanged instead of an empty frame.
    """
    outlier_threshold = 3

    # One aggregation returning real numbers; describe() would return the
    # statistics as strings and needed two separate collects.
    stats = df.agg(
        F.mean(col_name).alias("mean"),
        F.stddev(col_name).alias("stddev"),
    ).first()
    mean_val = stats["mean"]
    stddev_val = stats["stddev"]

    # `x != x` is the NaN test; a zero/NULL/NaN spread cannot define a z-score.
    if mean_val is None or stddev_val is None or stddev_val != stddev_val or stddev_val == 0:
        return df

    z_score = (F.col(col_name) - F.lit(mean_val)) / F.lit(stddev_val)

    # Filtering on the expression directly avoids the temporary z-score
    # column (the original drop referenced an undefined name and raised
    # NameError on every call).
    return df.filter(F.abs(z_score) < outlier_threshold)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# ----------------------- question 1: group_by_age function -----------------------

def group_by_age(excel_file_path):
    """Map each age to the list of names of the people with that age.

    Age is the current calendar year minus ``birth_year``. Rows containing
    NULL/NaN values are dropped by the preprocessing step
    (``remove_nan=True``), so there is no bucket for unknown ages.

    Args:
        excel_file_path: Path of the ``.xlsx`` file to read.

    Returns:
        dict mapping age -> list of names, built from rows ordered by age.
    """
    people = general_preprocess(excel_file_path, remove_nan=True)

    this_year = datetime.today().year

    # derive the age of every person
    with_age = people.withColumn("age", this_year - F.col("birth_year"))

    # one row per age, carrying the collected names
    names_by_age = (
        with_age.groupBy("age")
        .agg(F.collect_list("name").alias("names"))
        .orderBy("age")
        .select("age", "names")
    )

    # df -> rdd of (age, names) pairs -> dict
    return names_by_age.rdd.collectAsMap()

# Question 1: build the age -> names mapping for the input workbook.
result = group_by_age(excel_file_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():

In [7]:
# ----------------------- question 2: vowel in middle of the name -----------------------

def check_vowel(excel_file_path):
    """Return the names whose middle character is a vowel.

    Only names with an odd number of characters have a single middle
    character; even-length, empty, and missing names never match. The test
    is case-insensitive and covers all five vowels (a, e, i, o, u).

    Args:
        excel_file_path: Path of the ``.xlsx`` file to read.

    Returns:
        list of matching names, in the order Spark returns them.
    """
    df1 = general_preprocess(excel_file_path)

    vowels = ('a', 'e', 'i', 'o', 'u')

    def _has_middle_vowel(name):
        """True when ``name`` is an odd-length string with a vowel in the middle."""
        # NULL names reach the UDF as None; len(None) would fail the whole job.
        if not isinstance(name, str) or len(name) % 2 == 0:
            return False
        return name[len(name) // 2].lower() in vowels

    udf_is_vowel = F.udf(_has_middle_vowel, Type.BooleanType())

    # Add a new column "vowel_in_middle"
    df2 = df1.withColumn("vowel_in_middle", udf_is_vowel(F.col("name")))

    # collect into list
    matching_rows = df2.filter(F.col("vowel_in_middle")).select("name").collect()

    return [row["name"] for row in matching_rows]

# Question 2: collect the names that have a vowel as their middle character.
result = check_vowel(excel_file_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['Blair', 'Blake', 'Brent', 'Archibald', 'Atwater', 'Avery', 'Jed', 'Keene', 'Keith', 'Kim', 'Grant', 'Hal', 'Ian', 'Isaac']
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():

In [9]:
# ----------------------- question 3: age category -----------------------

def age_category(excel_file_path):
    """Add ``age`` and ``AGE_GROUP`` columns to the preprocessed data.

    Age is the current calendar year minus ``birth_year``. Groups:
    0-17 -> 'TEENAGER', 18-39 -> 'YOUNG', 40-100 -> 'OLD', anything else
    -> 'NA'. Rows with NULL/NaN values are dropped during preprocessing.

    Args:
        excel_file_path: Path of the ``.xlsx`` file to read.

    Returns:
        Spark DataFrame with the two extra columns.
    """
    people = general_preprocess(excel_file_path, remove_nan=True)

    this_year = datetime.today().year
    age = F.col("age")

    # bucket expression, evaluated top to bottom
    age_group = (
        F.when(age.between(0, 17), 'TEENAGER')
        .when(age.between(18, 39), 'YOUNG')
        .when(age.between(40, 100), 'OLD')
        .otherwise('NA')
    )

    return (
        people
        .withColumn("age", this_year - F.col("birth_year"))
        .withColumn('AGE_GROUP', age_group)
    )

# Question 3: build the frame with age and AGE_GROUP; uncomment show() to inspect it.
df3 = age_category(excel_file_path)
# df3.show(100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+------------+----------+------------+------------+-------------+-------+----------+---+---------+
| id|        name|birth_year|assignment_a|assignment_b|mid_term_exam|project|final_exam|age|AGE_GROUP|
+---+------------+----------+------------+------------+-------------+-------+----------+---+---------+
|D95|        Bert|      1985|          58|          70|           66|     90|        95| 39|    YOUNG|
|I67|       Bevis|      1965|          63|          65|           74|     75|        99| 59|      OLD|
|H45|      Blaine|      1984|          57|           0|           62|     90|        91| 40|      OLD|
|A68|       Blair|      1974|          90|          73|           59|     85|        94| 50|      OLD|
|B45|       Blake|      1955|          73|          56|           77|     95|        46| 69|      OLD|
|G56|        Bond|      1978|          99|          43|           73|     85|        75| 46|      OLD|
|C87|       Boris|      1984|          88|           0|           82|    

In [10]:
# ----------------------- question 4: check ID uniqueness  -----------------------
def check_id_uniqueness(excel_file_path):
    """Report whether the ``id`` column contains duplicates.

    Prints 'has duplicates in id' or 'no duplicate in id'.

    Args:
        excel_file_path: Path of the ``.xlsx`` file to read.

    Returns:
        Spark DataFrame with columns ``id`` and ``count`` listing every id
        that occurs more than once. The frame is empty (not None) when all
        ids are unique, so callers can always call ``.show()`` on it.
    """
    df1 = general_preprocess(excel_file_path)

    duplicated_ids = df1.groupBy('id').count().filter(F.col('count') > 1)

    # Fetching at most one row is enough to know whether any duplicate
    # exists; it replaces two full-table counts.
    if duplicated_ids.take(1):
        print('has duplicates in id')
    else:
        print('no duplicate in id')

    return duplicated_ids

        
# Question 4: run the id uniqueness check and display what it returns.
df = check_id_uniqueness(excel_file_path)
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

has duplicates in id
+---+-----+
| id|count|
+---+-----+
|Q28|    2|
|I67|    2|
|J29|    3|
|G56|    2|
|A68|    2|
+---+-----+

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():

In [12]:
# ----------------------- question 5: age group average  -----------------------


def calculate_and_save_average_age(excel_file_path, output_path="output.csv"):
    """Write each person's age group and that group's average age as CSV.

    The output has the columns ID, NAME, BIRTHYEAR, AGE, AGE_GROUP and
    AVERAGE, where AVERAGE is the mean age of the row's AGE_GROUP rounded
    to two decimals.

    Args:
        excel_file_path: Path of the ``.xlsx`` file to read.
        output_path: Destination passed to Spark's CSV writer. Defaults to
            "output.csv", the location that was previously hard-coded.
            Existing output at that location is overwritten.

    Returns:
        None.
    """
    df3 = age_category(excel_file_path)

    # partition by the age group set in previous question and find the average
    window = Window.partitionBy("AGE_GROUP")
    avg_age = F.avg("age").over(window)

    # Single projection: rename, pick and compute the exported columns at once.
    report = df3.select(
        F.col("id").alias("ID"),
        F.col("name").alias("NAME"),
        F.col("birth_year").alias("BIRTHYEAR"),
        F.col("age").alias("AGE"),
        F.col("AGE_GROUP"),
        F.round(avg_age, 2).alias("AVERAGE"),
    )

    # export .csv
    report.write.csv(output_path, header=True, mode="overwrite")

# calculate_and_save_average_age(excel_file_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():

In [19]:
# Inspect the column names produced by general_preprocess.
df1 = general_preprocess(excel_file_path)
df1.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['id', 'name', 'birth_year', 'assignment_a', 'assignment_b', 'mid_term_exam', 'project', 'final_exam']
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():

In [15]:
# ----------------------- question 6: get grades  -----------------------

def get_grades(excel_file_path):
    """Compute the weighted final grade of every student.

    Outliers are removed for each assessed column in turn (each pass works
    on the rows that survived the previous one), then ``GRADE`` is added as
    the weighted sum of the five assessment scores.

    Args:
        excel_file_path: Path of the ``.xlsx`` file to read.

    Returns:
        Spark DataFrame with the preprocessed columns plus ``GRADE``.
    """
    df1 = general_preprocess(excel_file_path)

    # provided weights
    weights = {
        "assignment_a": 0.07,
        "assignment_b": 0.13,
        "mid_term_exam": 0.20,
        "project": 0.25,
        "final_exam": 0.35
    }

    # eliminate_outliers returns a new DataFrame, so the result must be
    # reassigned on every pass; otherwise the filtering is discarded.
    for criteria in weights:
        df1 = eliminate_outliers(df1, criteria)

    # create expression for weights
    weighted_avg_expr = sum(F.col(criteria) * percentage for criteria, percentage in weights.items())

    return df1.withColumn("GRADE", weighted_avg_expr)

# df3 = get_grades(excel_file_path)
# df3.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+
|summary|  id|   name|       birth_year|      assignment_a|      assignment_b|    mid_term_exam|          project|        final_exam|
+-------+----+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+
|  count| 100|    100|              100|               100|               100|              100|              100|               100|
|   mean|null|   null|          1901.26|             74.11|             74.78|            68.29|            79.45|             81.09|
| stddev|null|   null|390.2527443137901|29.798276338192327|20.271680001938915|9.580382945046473|9.997348133228614|16.520629894807644|
|    min| A27|Anthony|                0|                 0|                 0|               52|               60|                 0|
|    max| Z18|   Kirk|             2000|               208|   

In [17]:
# ----------------------- question 7: draw a chart to show the trend of student -----------------------


def return_chart_student_performance():
    """Plot one line per student across the five assessments.

    Reads the workbook at the module-level ``excel_file_path`` with pandas,
    normalises the headers, keeps only rows where every score is <= 100,
    and draws each remaining student's scores on a shared figure.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    score_columns = ['assignment_a', 'assignment_b', 'mid_term_exam', 'project', 'final_exam']
    tick_labels = ['Assignment A', 'Assignment B', 'Mid Term Exam', 'Project', 'Final Exam']

    scores = pd.read_excel(excel_file_path, engine='openpyxl')
    scores.columns = [header.strip().lower().replace(' ', '_') for header in scores.columns]

    # discard rows with any score above 100
    valid_scores = scores.query("""
        assignment_a <= 100 and \
        assignment_b <= 100 and \
        mid_term_exam <= 100 and \
        project <= 100 and \
        final_exam <= 100
    """)

    plt.figure(figsize=(10, 6))

    # one labelled line per student
    for _, student in valid_scores.iterrows():
        plt.plot(student[score_columns], label=student['name'])

    plt.xlabel('Assessment Type')
    plt.ylabel('Score')
    plt.title('Student Performance Trend During the Semester')
    plt.xticks(range(len(tick_labels)), tick_labels)
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True)
    plt.show()

# Question 7: draw the per-student score trend chart.
return_chart_student_performance()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…