In [0]:
%python
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col
import pyspark.sql.functions as F
from collections import defaultdict
import matplotlib.pyplot as plt

In [0]:
%python
## step1: extract data from dbfs with spark
# Start a Spark session
try:
    spark = SparkSession.builder.appName("CSVIngestion").getOrCreate()
    # Read CSV file
    df = spark.read.format("csv").\
    option("header", "true").\
    load("dbfs:/FileStore/shared_uploads/linkekk@ad.unc.edu/OccupationalEmploymentandWageStatistics_final.csv")  
    # Show the first few rows of the DataFrame
    df.head()
    # Transform to data table
    df.createOrReplaceTempView("kk_table")
    result = spark.sql("SELECT * FROM kk_table WHERE Occupation = 'Legal'")
    result.show()
except Exception as e:
    print(f'Error during data extraction:{e}')


Row(Area='North Carolina', Time Period='2022', Occupation='Management', Occupation Code='110000', Industry='Total, All Industries', Employment='285,550', 10th %='$52,280', 25th %='$75,480', Entry level='$60,590', Median='$104,240', Mean='$126,640', Experienced='$159,180', 75th %='$158,220', 90th %='$208,000')

In [0]:
%python
## step2: Transform Data extracted from source file
# preprocess the data table
# 1. delete the characters in string and convert to integer
try:
    myHash = defaultdict(list)

    columns = ['10th %', '25th %', 'Entry level', 'Median', 'Mean', 'Experienced', \
    '75th %', '90th %']

    for column in columns:
        df = df.withColumn(column, regexp_replace(col(column), "[\$,]", "").cast("integer"))

except Exception as e:
    print(f'Error during characters deletion:{e}')

# 2. rename the table header
# Renaming columns
try:
    new_column_names = ['Area', 'TimePeriod', 'Occupation','OccupationCode','Industry','Employment','10th','25th','EntryLevel','Median','Mean','Experienced','75th','90th']  # List your new column names here

    for old_name, new_name in zip(df.columns, new_column_names):
        df = df.withColumnRenamed(old_name, new_name)
    
    display(df.limit(10))

except Exception as e:
    print(f'Error during column renaming:{e}')

+--------------+-----------+----------+---------------+--------------------+----------+-------+-------+-----------+-------+--------+-----------+--------+--------+
|          Area|Time Period|Occupation|Occupation Code|            Industry|Employment| 10th %| 25th %|Entry level| Median|    Mean|Experienced|  75th %|  90th %|
+--------------+-----------+----------+---------------+--------------------+----------+-------+-------+-----------+-------+--------+-----------+--------+--------+
|North Carolina|       2022|     Legal|         230000|Total, All Indust...|    28,060|$38,750|$50,190|    $43,580|$70,520|$102,780|   $131,940|$127,350|$208,000|
+--------------+-----------+----------+---------------+--------------------+----------+-------+-------+-----------+-------+--------+-----------+--------+--------+



Area,TimePeriod,Occupation,OccupationCode,Industry,Employment,10th,25th,EntryLevel,Median,Mean,Experienced,75th,90th
North Carolina,2022,Management,110000,"Total, All Industries",285550,52280,75480,60590,104240,126640,159180,158220,208000
North Carolina,2022,Business and Financial Operations,130000,"Total, All Industries",302740,41600,53990,46310,74530,82730,100670,100330,132610
North Carolina,2022,Computer and Mathematical,150000,"Total, All Industries",172210,49880,68990,57450,102370,104530,127710,133200,164980
North Carolina,2022,Architecture and Engineering,170000,"Total, All Industries",68190,46350,60940,51120,79560,85090,101820,102170,130970
North Carolina,2022,"Life, Physical, and Social Science",190000,"Total, All Industries",41840,38800,50010,43640,64470,74320,89440,86070,120640
North Carolina,2022,Community and Social Service,210000,"Total, All Industries",57300,31670,39000,34470,48190,51620,60060,59830,73060
North Carolina,2022,Legal,230000,"Total, All Industries",28060,38750,50190,43580,70520,102780,131940,127350,208000
North Carolina,2022,Educational Instruction and Library,250000,"Total, All Industries",268980,23850,32780,28260,47920,52930,65080,60130,78700
North Carolina,2022,"Arts, Design, Entertainment, Sports, and Media",270000,"Total, All Industries",48910,28700,36490,31130,51640,66670,84180,76260,105690
North Carolina,2022,Healthcare Practitioners and Technical,290000,"Total, All Industries",297090,36990,50300,42940,66650,87180,108970,95280,138150


In [0]:
%python
## Step3: Load Operation: Load Data to Delta Lake
try:
    df.write.format("delta").mode("overwrite").saveAsTable("ncemployment")
except Exception as e:
    print(f"Error During loading result to Delta Lake:{e}")

In [0]:
%sql
SELECT * FROM ncemployment


Area,TimePeriod,Occupation,OccupationCode,Industry,Employment,10th,25th,EntryLevel,Median,Mean,Experienced,75th,90th
North Carolina,2022,Management,110000,"Total, All Industries",285550,52280,75480,60590,104240,126640,159180,158220,208000
North Carolina,2022,Business and Financial Operations,130000,"Total, All Industries",302740,41600,53990,46310,74530,82730,100670,100330,132610
North Carolina,2022,Computer and Mathematical,150000,"Total, All Industries",172210,49880,68990,57450,102370,104530,127710,133200,164980
North Carolina,2022,Architecture and Engineering,170000,"Total, All Industries",68190,46350,60940,51120,79560,85090,101820,102170,130970
North Carolina,2022,"Life, Physical, and Social Science",190000,"Total, All Industries",41840,38800,50010,43640,64470,74320,89440,86070,120640
North Carolina,2022,Community and Social Service,210000,"Total, All Industries",57300,31670,39000,34470,48190,51620,60060,59830,73060
North Carolina,2022,Legal,230000,"Total, All Industries",28060,38750,50190,43580,70520,102780,131940,127350,208000
North Carolina,2022,Educational Instruction and Library,250000,"Total, All Industries",268980,23850,32780,28260,47920,52930,65080,60130,78700
North Carolina,2022,"Arts, Design, Entertainment, Sports, and Media",270000,"Total, All Industries",48910,28700,36490,31130,51640,66670,84180,76260,105690
North Carolina,2022,Healthcare Practitioners and Technical,290000,"Total, All Industries",297090,36990,50300,42940,66650,87180,108970,95280,138150


In [0]:
%python
## step4: Data Validation
# load delta data table
# delta_table_path = "dbfs:/user/hive/warehouse/ncemployment"  
# ncemployment = spark.read.format("delta").load(delta_table_path)
# ncemployment = spark.sql("SELECT * FROM ncemployment")
# check if certain column is NULL
try:
    if ncemployment.filter((col("OccupationCode").isNull())|(col("Occupation").isNull())).count() > 0:
        raise ValueError("Invalid cell: Table contains NULL values")
    if ncemployment.filter((col("10th")<0)|(col("10th")<0)|(col("25th")<0)|(col("EntryLevel")<0)|(col("Median")<0)|(col("Mean")<0)|(col("Experienced")<0)|(col("75th")<0)|(col("90th")<0)).count() > 0:
        raise ValueError("Invalid cell: Table contains invalid negative values")
except ValueError as ve:
    print("Data Invalidation Error:",ve)


In [0]:
%python
## step 5: Data Visualization
# 1. Analyze the data and store the result(max, min, mean value) in hash table
myHash = defaultdict(list)
meanHash = defaultdict()
maxHash = defaultdict()
minHash = defaultdict()

columns = ['10th', '25th', 'EntryLevel', 'Median', 'Mean', 'Experienced', \
'75th', '90th']

for column in columns:
    meanHash[column] = ncemployment.select(F.mean(col(column))).first()[0]
    maxHash[column] = ncemployment.select(F.max(col(column))).first()[0]
    minHash[column] = ncemployment.select(F.min(col(column))).first()[0]

# print(myHash)
print("Mean Value information")
print(meanHash)
print("Max Value information")
print(maxHash)
print("Min Value information")
print(minHash)

# 2. plot the analysis result
# Function to plot a histogram
def plot_histogram(data, title, ax):
    ax.bar(data.keys(), data.values())
    ax.set_title(title)
    ax.set_xlabel('Categories')
    ax.set_ylabel('Values')
    ax.tick_params(axis='x', rotation=45)

# Create subplots
fig, axes = plt.subplots(nrows=1, ncols=3, figsize=(18, 6))

# Plot each histogram
plot_histogram(meanHash, 'Mean Values', axes[0])
plot_histogram(maxHash, 'Max Values', axes[1])
plot_histogram(minHash, 'Min Values', axes[2])

# Adjust layout
plt.tight_layout()

# Show the plot
plt.show()