# ML Ops Final Project
## Data Ingestion, Cleaning & Joining
### Dataset: LinkedIn Job Postings (2023-2024)
https://www.kaggle.com/datasets/arshkon/linkedin-job-postings/data

### Data Ingestion
- Downloaded dataset from Kaggle and unzipped
- Uploaded the .csv files to Databricks via the "Data Ingestion" tab

In [0]:
# List files in our Databricks filestore (DBFS)
# Can also be viewed in "Catalog" tab
files = dbutils.fs.ls('dbfs:/user/hive/warehouse')
display(files)

path,name,size,modificationTime
dbfs:/user/hive/warehouse/benefits_csv/,benefits_csv/,0,1731773617834
dbfs:/user/hive/warehouse/cleaned_data/,cleaned_data/,0,1731773617834
dbfs:/user/hive/warehouse/companies_csv/,companies_csv/,0,1731773617834
dbfs:/user/hive/warehouse/company_industries_1_csv/,company_industries_1_csv/,0,1731773617834
dbfs:/user/hive/warehouse/company_specialities_csv/,company_specialities_csv/,0,1731773617834
dbfs:/user/hive/warehouse/employee_counts_1_csv/,employee_counts_1_csv/,0,1731773617835
dbfs:/user/hive/warehouse/industries_csv/,industries_csv/,0,1731773617835
dbfs:/user/hive/warehouse/job_industries_csv/,job_industries_csv/,0,1731773617835
dbfs:/user/hive/warehouse/job_skills_csv/,job_skills_csv/,0,1731773617835
dbfs:/user/hive/warehouse/postings_csv/,postings_csv/,0,1731773617835


### Read data into Spark DataFrames, then convert to pandas DataFrames for data cleaning
- I chose pandas because I haven't worked with PySpark before, and the data is not so large that distributed operations are necessary

In [0]:
from pyspark.sql import SparkSession
import pandas as pd

In [0]:
# benefits.csv
benefits_df = spark.table("default.benefits_csv")
benefits_pd = benefits_df.toPandas()

# companies.csv
companies_df = spark.table("default.companies_csv")
companies_pd = companies_df.toPandas()

# company_industries.csv
company_industries_df = spark.table("default.company_industries_1_csv")
company_industries_pd = company_industries_df.toPandas()

# company_specialties.csv
company_specialities_df = spark.table("default.company_specialities_csv")
company_specialities_pd = company_specialities_df.toPandas()

# employee_counts.csv
employee_counts_df = spark.table("default.employee_counts_1_csv")
employee_counts_pd = employee_counts_df.toPandas()

# industries.csv
industries_map_df = spark.table("default.industries_csv")
industries_map_pd = industries_map_df.toPandas()

# job_industries.csv
job_industries_df = spark.table("default.job_industries_csv")
job_industries_pd = job_industries_df.toPandas()

# job_skills.csv
job_skills_df = spark.table("default.job_skills_csv")
job_skills_pd = job_skills_df.toPandas()

# salaries.csv
salaries_df = spark.table("default.salaries_csv")
salaries_pd = salaries_df.toPandas()

# skills.csv
skills_map_df = spark.table("default.skills_csv")
skills_map_pd = skills_map_df.toPandas()

# postings.csv
# this is the largest file by far (~0.5 GB)
postings_df = spark.table("default.postings_csv")
postings_pd = postings_df.toPandas()


### Data Cleaning
- Inspect structure of each dataframe and determine cleaning steps
- Perform cleaning and validate
- Save cleaned dataframe as new version

#### benefits_pd
- drop inferred - not important for our analysis
- rename type to benefit_type and convert to categorical, drop old 'type' column
- Perform one-hot encoding of benefit_type
- Aggregate the one-hot encoding by job_id
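
A plain-Python sketch of the same one-hot-then-aggregate logic can serve as a reference to check the pandas result against. The input shape (a list of `(job_id, benefit_type)` pairs) and the function name `one_hot_by_job` are illustrative assumptions, not part of the original pipeline.

```python
from collections import defaultdict


def one_hot_by_job(rows):
    """Turn (job_id, benefit_type) pairs into one dict of benefit counts per job_id.

    Mirrors pd.get_dummies(..., prefix='benefit') followed by groupby('job_id').sum():
    every benefit type seen anywhere becomes a column, and each job gets the number
    of rows it has for that benefit.
    """
    columns = sorted({f"benefit_{benefit}" for _, benefit in rows})
    counts = defaultdict(lambda: dict.fromkeys(columns, 0))
    for job_id, benefit in rows:
        counts[job_id][f"benefit_{benefit}"] += 1
    # groupby sorts its keys by default, so sort job_ids the same way
    return {job_id: counts[job_id] for job_id in sorted(counts)}


rows = [
    (3887473071, "Medical insurance"),
    (3887473071, "Dental insurance"),
    (3887473071, "401(k)"),
    (23221523, "401(k)"),
]
encoded = one_hot_by_job(rows)
print(encoded[23221523])
# {'benefit_401(k)': 1, 'benefit_Dental insurance': 0, 'benefit_Medical insurance': 0}
```

Because the aggregation is a sum, a job that listed the same benefit twice would get a 2 rather than a 1; aggregating with `.max()` instead of `.sum()` would keep strict 0/1 flags if that ever matters.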

In [0]:
# benefits_pd investigation

print(benefits_pd.info())
print(benefits_pd.head())
print(benefits_pd.describe(include='all'))
print(benefits_pd.isnull().sum())

# Get unique values of benefit type
unique_values = benefits_pd['type'].unique()
print(f"Unique values in benefit type: {unique_values}")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 67943 entries, 0 to 67942
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   job_id    67943 non-null  int64 
 1   inferred  67943 non-null  int32 
 2   type      67943 non-null  object
dtypes: int32(1), int64(1), object(1)
memory usage: 1.3+ MB
None
       job_id  inferred                     type
0  3887473071         0        Medical insurance
1  3887473071         0         Vision insurance
2  3887473071         0         Dental insurance
3  3887473071         0                   401(k)
4  3887473071         0  Student loan assistance
              job_id      inferred    type
count   6.794300e+04  67943.000000   67943
unique           NaN           NaN      12
top              NaN           NaN  401(k)
freq             NaN           NaN   24231
mean    3.896220e+09      0.594969     NaN
std     9.817292e+07      0.490902     NaN
min     2.322152e+07      0.000000     NaN

In [0]:
# benefits data cleaning

# drop inferred - not important for our analysis
benefits_pd.drop(columns=['inferred'], inplace=True)

# rename type to benefit_type and convert to categorical, drop old 'type' column
benefits_pd['benefit_type'] = benefits_pd['type'].astype('category')
benefits_pd.drop(columns=['type'], inplace=True)

# Perform one-hot encoding
benefits_one_hot = pd.get_dummies(benefits_pd, columns=['benefit_type'], prefix='benefit')

# Aggregate the one-hot encoding by job_id
benefits_clean = benefits_one_hot.groupby('job_id').sum().reset_index()


In [0]:
# check for NaN and duplicates in job_id
print(benefits_clean.isna().sum())
print(benefits_clean['job_id'].duplicated().sum())
benefits_clean.head()

job_id                             0
benefit_401(k)                     0
benefit_Child care support         0
benefit_Commuter benefits          0
benefit_Dental insurance           0
benefit_Disability insurance       0
benefit_Medical insurance          0
benefit_Paid maternity leave       0
benefit_Paid paternity leave       0
benefit_Pension plan               0
benefit_Student loan assistance    0
benefit_Tuition assistance         0
benefit_Vision insurance           0
dtype: int64
0


Unnamed: 0,job_id,benefit_401(k),benefit_Child care support,benefit_Commuter benefits,benefit_Dental insurance,benefit_Disability insurance,benefit_Medical insurance,benefit_Paid maternity leave,benefit_Paid paternity leave,benefit_Pension plan,benefit_Student loan assistance,benefit_Tuition assistance,benefit_Vision insurance
0,23221523,1,0,0,0,0,0,0,0,0,0,0,0
1,56482768,1,0,0,1,1,0,0,0,0,0,0,0
2,69333422,1,0,0,1,0,1,0,0,0,0,0,1
3,95428182,0,0,0,1,1,1,0,0,0,0,0,0
4,111513530,0,0,0,1,0,1,1,1,1,0,0,1


In [0]:
# Replace invalid characters in column names
benefits_clean.columns = benefits_clean.columns.str.replace(r'[ ,;{}()\n\t=]', '_', regex=True)

# Convert uint8 columns to int
benefits_clean = benefits_clean.astype({col: 'int64' for col in benefits_clean.select_dtypes('uint8').columns})

# Convert pandas DataFrame to PySpark DataFrame
spark_benefits_clean = spark.createDataFrame(benefits_clean)

# Overwrite the existing Delta table with schema migration
spark_benefits_clean.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("default.benefits_csv")
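
Delta Lake rejects column names containing any character in the class `[ ,;{}()\n\t=]` (space, comma, semicolon, braces, parentheses, newline, tab, equals sign) unless column mapping is enabled on the table, which is why the cell above rewrites them to underscores. A blind replace can also map two different names to the same string (for example `a b` and `a_b`). A minimal sketch of a collision-aware variant, with `sanitize_columns` as a hypothetical helper:

```python
import re

# Characters Delta Lake does not accept in column names (without column mapping)
INVALID_CHARS = re.compile(r"[ ,;{}()\n\t=]")


def sanitize_columns(names):
    """Replace invalid characters with '_' and raise if two names collide."""
    cleaned = [INVALID_CHARS.sub("_", name) for name in names]
    seen = {}
    for original, new in zip(names, cleaned):
        if new in seen and seen[new] != original:
            raise ValueError(f"{seen[new]!r} and {original!r} both map to {new!r}")
        seen[new] = original
    return cleaned


print(sanitize_columns(["job_id", "benefit_401(k)", "benefit_Medical insurance"]))
# ['job_id', 'benefit_401_k_', 'benefit_Medical_insurance']
```

In the notebook it would be applied as `benefits_clean.columns = sanitize_columns(list(benefits_clean.columns))`.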

#### companies_pd
- fill NaN in description with "No description provided"
- company_size: company grouping based on number of employees (1 = smallest, 7 = largest)
- fill company_size NaN with -1
- standardize text columns
- state and city columns are very messy and unusable for analysis without major cleaning. Since they only describe the company headquarters location, country is probably sufficient as a location feature
- 0 in the country column indicates unknown
- remove duplicates in name
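
The company cleaning steps above, applied to plain dict records, look roughly like the sketch below. It also maps the `'0'` country placeholder to `None`, which the bullet list describes but the pandas cell does not do; treat that mapping, the record layout, and the sample values as assumptions.

```python
def clean_companies(records):
    """Apply the company cleaning steps to a list of dict records.

    Keeps the first record for each stripped company name, like
    drop_duplicates(subset=['name'], keep='first') after str.strip().
    """
    cleaned, seen_names = [], set()
    for rec in records:
        name = rec["name"].strip()
        if name in seen_names:
            continue
        seen_names.add(name)
        description = rec.get("description")
        size = rec.get("company_size")
        country = rec["country"].upper()
        cleaned.append({
            "company_id": rec["company_id"],
            "name": name,
            "description": "No description provided" if description is None else description.strip(),
            "company_size": -1 if size is None else size,
            # Assumption: '0' marks an unknown headquarters country, so map it to None
            "company_hq_country": None if country == "0" else country,
        })
    return cleaned


companies = [
    {"company_id": 1, "name": "Acme ", "description": None, "company_size": 7.0, "country": "us"},
    {"company_id": 2, "name": "Acme", "description": "Same name", "company_size": None, "country": "US"},
    {"company_id": 3, "name": "Globex", "description": " Global services ", "company_size": None, "country": "0"},
]
print([c["company_id"] for c in clean_companies(companies)])
# [1, 3]
```

Deduplicating on `name` also drops distinct `company_id`s that share a name. The `describe` output shows `Confidential` appearing 5 times and 24429 unique names across 24473 rows, so roughly 44 company_ids are removed (more if stripping whitespace merges further names), and postings from those companies will not match in later joins on `company_id`.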

In [0]:
print(companies_pd.info())
print(companies_pd.describe(include='all'))
print(companies_pd.isnull().sum())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 24473 entries, 0 to 24472
Data columns (total 10 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   company_id    24473 non-null  int32  
 1   name          24473 non-null  object 
 2   description   24177 non-null  object 
 3   company_size  21699 non-null  float64
 4   state         24452 non-null  object 
 5   country       24473 non-null  object 
 6   city          24472 non-null  object 
 7   zip_code      24445 non-null  object 
 8   address       24454 non-null  object 
 9   url           24473 non-null  object 
dtypes: float64(1), int32(1), object(8)
memory usage: 1.8+ MB
None
          company_id  ...                                   url
count   2.447300e+04  ...                                 24473
unique           NaN  ...                                 24473
top              NaN  ...  https://www.linkedin.com/company/ibm
freq             NaN  ...                           

In [0]:
companies_pd.describe(include='all')

Unnamed: 0,company_id,name,description,company_size,state,country,city,zip_code,address,url
count,24473.0,24473,24177,21699.0,24452.0,24473,24472,24445.0,24454.0,24473
unique,,24429,24165,,789.0,81,4124,7779.0,19477.0,24473
top,,Confidential,Our Purpose:\nWe help people love the way they...,,0.0,US,New York,0.0,0.0,https://www.linkedin.com/company/ibm
freq,,5,3,,2175.0,21635,1185,3060.0,3972.0,1
mean,20522390.0,,,3.349233,,,,,,
std,31659290.0,,,1.904503,,,,,,
min,1009.0,,,1.0,,,,,,
25%,165404.0,,,2.0,,,,,,
50%,2738154.0,,,3.0,,,,,,
75%,26241420.0,,,5.0,,,,,,


In [0]:
companies_pd.head()

Unnamed: 0,company_id,name,description,company_size,state,country,city,zip_code,address,url
0,1009,IBM,"At IBM, we do more than work. We create. We cr...",7.0,NY,US,"Armonk, New York",10504,International Business Machines Corp.,https://www.linkedin.com/company/ibm
1,1016,GE HealthCare,Every day millions of people feel the impact o...,7.0,0,US,Chicago,0,-,https://www.linkedin.com/company/gehealthcare
2,1025,Hewlett Packard Enterprise,Official LinkedIn of Hewlett Packard Enterpris...,7.0,Texas,US,Houston,77389,1701 E Mossy Oaks Rd Spring,https://www.linkedin.com/company/hewlett-packa...
3,1028,Oracle,We’re a cloud technology company that provides...,7.0,Texas,US,Austin,78741,2300 Oracle Way,https://www.linkedin.com/company/oracle
4,1033,Accenture,Accenture is a leading global professional ser...,7.0,0,IE,Dublin 2,0,Grand Canal Harbour,https://www.linkedin.com/company/accenture


In [0]:
# Handle missing values
# (assign back instead of calling fillna(inplace=True) on a column selection,
# which is chained assignment and is deprecated in newer pandas versions)
companies_pd['description'] = companies_pd['description'].fillna("No description provided")
companies_pd['company_size'] = companies_pd['company_size'].fillna(-1)

# Standardize text columns
companies_pd['name'] = companies_pd['name'].str.strip()
companies_pd['description'] = companies_pd['description'].str.strip()
companies_pd['country'] = companies_pd['country'].str.upper()

# Remove duplicates (keeps the first row for each company name)
companies_pd.drop_duplicates(subset=['name'], inplace=True)

# Drop unnecessary columns
columns_to_drop = ['state', 'city', 'zip_code', 'address', 'url']
companies_pd.drop(columns=columns_to_drop, inplace=True)

# Rename country to company_hq_country
companies_pd.rename(columns={'country': 'company_hq_country'}, inplace=True)

companies_clean = companies_pd

# Validate: company_id must be unique before it is used as a join key
assert companies_clean['company_id'].is_unique

In [0]:
companies_clean.head()

Unnamed: 0,company_id,name,description,company_size,company_hq_country
0,1009,IBM,"At IBM, we do more than work. We create. We cr...",7.0,US
1,1016,GE HealthCare,Every day millions of people feel the impact o...,7.0,US
2,1025,Hewlett Packard Enterprise,Official LinkedIn of Hewlett Packard Enterpris...,7.0,US
3,1028,Oracle,We’re a cloud technology company that provides...,7.0,US
4,1033,Accenture,Accenture is a leading global professional ser...,7.0,IE


In [0]:
# Convert pandas DataFrame to PySpark DataFrame
spark_companies_clean = spark.createDataFrame(companies_clean)

# Overwrite the existing Delta table with schema migration
spark_companies_clean.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("default.companies_csv")

#### company_industries
- rename industry to company_industry
- 144 unique industries
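
Title-casing could in principle merge two industry labels that differ only in case, which is what the `nunique() == 144` assertion guards against. A minimal sketch of a check that also reports which raw labels collide, with `case_collisions` as a hypothetical helper name:

```python
from collections import defaultdict


def case_collisions(labels):
    """Group raw labels by their stripped, title-cased form; return groups with more than one spelling."""
    groups = defaultdict(set)
    for label in labels:
        groups[label.strip().title()].add(label)
    return {norm: sorted(raws) for norm, raws in groups.items() if len(raws) > 1}


labels = ["Book and Periodical Publishing", "Construction", "Book And Periodical Publishing"]
print(case_collisions(labels))
# {'Book And Periodical Publishing': ['Book And Periodical Publishing', 'Book and Periodical Publishing']}
```

One caveat with Python's `str.title()` (which pandas' `.str.title()` is documented as equivalent to): it treats an apostrophe as a word boundary, so `"women's health".title()` becomes `"Women'S Health"`. It is worth scanning the standardized labels for that pattern.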

In [0]:
print(company_industries_pd.info())
print(company_industries_pd.head())
print(company_industries_pd.describe(include='all'))
print(company_industries_pd.isnull().sum())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 24375 entries, 0 to 24374
Data columns (total 2 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   company_id  24375 non-null  int32 
 1   industry    24375 non-null  object
dtypes: int32(1), object(1)
memory usage: 285.8+ KB
None
   company_id                        industry
0      391906  Book and Periodical Publishing
1    22292832                    Construction
2       20300                         Banking
3     3570660  Book and Periodical Publishing
4      878353         Staffing and Recruiting
          company_id                 industry
count   2.437500e+04                    24375
unique           NaN                      144
top              NaN  Staffing and Recruiting
freq             NaN                     2387
mean    2.038566e+07                      NaN
std     3.158083e+07                      NaN
min     1.009000e+03                      NaN
25%     1.652215e+05           

In [0]:
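# Count the number of duplicates in the 'company_id' column
num_duplicates = company_industries_pd['company_id'].duplicated().sum()
#print(f"Number of duplicate company IDs: {num_duplicates}")

# 10 duplicate company_ids. Drop the 10 duplicate rows, keeping the first row per company
# (calling drop_duplicates(inplace=True) on the column selection would leave the DataFrame unchanged)
company_industries_pd = company_industries_pd.drop_duplicates(subset=['company_id'], keep='first')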

# Rename column for clarity
company_industries_pd.rename(columns={'industry': 'company_industry'}, inplace=True)

# Standardize text in the company_industry column
company_industries_pd['company_industry'] = company_industries_pd['company_industry'].str.strip().str.title()

# Ensure 144 unique industries
assert company_industries_pd['company_industry'].nunique() == 144, "Industry count mismatch!"

company_industries_clean = company_industries_pd

In [0]:
company_industries_clean.head()

Unnamed: 0,company_id,company_industry
0,391906,Book And Periodical Publishing
1,22292832,Construction
2,20300,Banking
3,3570660,Book And Periodical Publishing
4,878353,Staffing And Recruiting


In [0]:
# Convert pandas DataFrame to PySpark DataFrame
spark_company_industries_clean = spark.createDataFrame(company_industries_clean)

# Overwrite the existing Delta table with schema migration
spark_company_industries_clean.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("default.company_industries_1_csv")

#### company_specialties
- 73179 unique specialties after standardizing case (82960 before). Too many for one-hot encoding. Could do extensive feature engineering to combine them, but it may not be very useful.
- 17780 unique company_ids. Since there are too many specialties to one-hot encode, all specialties for a given company will be concatenated into a single column
- Probably won't keep these specialties, but will clean the table just in case
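
Because specialities are title-cased before grouping, one company could end up with the same speciality twice (for example `Healthcare` and `healthcare` in the raw rows); whether that actually happens in this dataset has not been checked here. A plain-Python sketch of the grouping that drops such within-company repeats while keeping first-seen order; the function name and sample rows are illustrative.

```python
def concat_specialities(rows, sep=", "):
    """Join (company_id, speciality) pairs into one string per company.

    Specialities are stripped and title-cased first (as in the notebook), then
    repeats within a company are dropped while keeping first-seen order.
    """
    grouped = {}
    for company_id, speciality in rows:
        grouped.setdefault(company_id, []).append(speciality.strip().title())
    # dict.fromkeys keeps insertion order and drops duplicates
    return {cid: sep.join(dict.fromkeys(items)) for cid, items in sorted(grouped.items())}


rows = [(1016, "Healthcare"), (1016, "healthcare"), (1016, "Biotechnology"), (1009, "Cloud")]
print(concat_specialities(rows))
# {1009: 'Cloud', 1016: 'Healthcare, Biotechnology'}
```

The pandas counterpart would be `groupby('company_id')['speciality'].agg(lambda s: ', '.join(dict.fromkeys(s)))`.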

In [0]:
print(company_specialities_pd.info())
print(company_specialities_pd.describe(include='all'))
print(company_specialities_pd.isnull().sum())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 169387 entries, 0 to 169386
Data columns (total 2 columns):
 #   Column      Non-Null Count   Dtype 
---  ------      --------------   ----- 
 0   company_id  169387 non-null  int32 
 1   speciality  169387 non-null  object
dtypes: int32(1), object(1)
memory usage: 1.9+ MB
None
          company_id   speciality
count   1.693870e+05       169387
unique           NaN        82960
top              NaN  Engineering
freq             NaN          601
mean    1.258875e+07          NaN
std     2.451128e+07          NaN
min     1.009000e+03          NaN
25%     1.103290e+05          NaN
50%     1.346497e+06          NaN
75%     1.067675e+07          NaN
max     1.034588e+08          NaN
company_id    0
speciality    0
dtype: int64


In [0]:
# Standardize speciality column
company_specialities_pd['speciality'] = company_specialities_pd['speciality'].str.strip().str.title()
print(company_specialities_pd['speciality'].value_counts().head(20))

# Count the number of unique values in the 'speciality' column
num_unique_values = company_specialities_pd['speciality'].nunique()
print(f"Number of unique values: {num_unique_values}")


Staffing                   750
Engineering                745
Recruiting                 742
Technology                 695
Consulting                 597
Healthcare                 571
Marketing                  567
Manufacturing              526
Project Management         440
Executive Search           438
Recruitment                437
Information Technology     436
Human Resources            426
Education                  418
Finance                    393
Construction               363
Retail                     354
Artificial Intelligence    346
Accounting                 340
Sales                      324
Name: speciality, dtype: int64
Number of unique values: 73179


In [0]:
# Count the number of unique values in the 'company_id' column
num_unique_values = company_specialities_pd['company_id'].nunique()

print(f"Number of unique values: {num_unique_values}")

Number of unique values: 17780


In [0]:
# Consolidate specialities into a single string for each company
specialities_grouped = company_specialities_pd.groupby('company_id')['speciality'].apply(lambda x: ', '.join(x)).reset_index()
specialities_grouped.rename(columns={'speciality': 'company_specialities'}, inplace=True)

company_specialities_clean = specialities_grouped

In [0]:
company_specialities_clean.head()

Unnamed: 0,company_id,company_specialities
0,1009,"Cloud, Mobile, Cognitive, Security, Research, ..."
1,1016,"Healthcare, Biotechnology"
2,1028,"Enterprise, Software, Applications, Database, ..."
3,1033,"Management Consulting, Systems Integration And..."
4,1035,"Business Software, Developer Tools, Home & Edu..."


In [0]:
# Convert pandas DataFrame to PySpark DataFrame
spark_company_specialities_clean = spark.createDataFrame(company_specialities_clean)

# Overwrite the existing Delta table with schema migration
spark_company_specialities_clean.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("default.company_specialities_csv")

#### employee_counts
- convert time_recorded (Unix seconds) to datetime
- duplicate company_ids indicate observations taken at different times. Keep the most recent observation.
- ranges of employee counts and follower counts are reasonable; no obviously incorrect outliers
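
Keeping the latest observation per company can be sketched without pandas as a single pass that retains the row with the largest `time_recorded`. This assumes, as the `unit='s'` conversion suggests, that `time_recorded` is a Unix timestamp in seconds; the tuple layout and the sample values are illustrative, not taken from the dataset.

```python
from datetime import datetime, timezone


def latest_per_company(rows):
    """Keep the row with the largest time_recorded for each company_id.

    rows: (company_id, employee_count, follower_count, time_recorded) tuples,
    with time_recorded in Unix seconds. On equal timestamps the row seen last wins.
    """
    latest = {}
    for row in rows:
        company_id, ts = row[0], row[3]
        if company_id not in latest or ts >= latest[company_id][3]:
            latest[company_id] = row
    return [latest[cid] for cid in sorted(latest)]


rows = [
    (1009, 310000, 16300000, 1_713_000_000),
    (1009, 311223, 16314846, 1_713_500_000),
    (1016, 57001, 2196350, 1_713_400_000),
]
for company_id, employees, followers, ts in latest_per_company(rows):
    # Same conversion as pd.to_datetime(..., unit='s'), but timezone-aware (UTC)
    print(company_id, employees, followers, datetime.fromtimestamp(ts, tz=timezone.utc).isoformat())
```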

In [0]:
print(employee_counts_pd.info())
print(employee_counts_pd.describe(include='all'))
print(employee_counts_pd.isnull().sum())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 35787 entries, 0 to 35786
Data columns (total 4 columns):
 #   Column          Non-Null Count  Dtype
---  ------          --------------  -----
 0   company_id      35787 non-null  int32
 1   employee_count  35787 non-null  int32
 2   follower_count  35787 non-null  int32
 3   time_recorded   35787 non-null  int32
dtypes: int32(4)
memory usage: 559.3 KB
None
         company_id  employee_count  follower_count  time_recorded
count  3.578700e+04    35787.000000    3.578700e+04   3.578700e+04
mean   1.668254e+07     6715.874256    2.012616e+05   1.713163e+09
std    2.924722e+07    29400.984643    1.114733e+06   3.990869e+05
min    1.009000e+03        0.000000    0.000000e+00   1.712346e+09
25%    6.059650e+04       56.000000    2.738000e+03   1.712861e+09
50%    1.339209e+06      418.000000    1.617800e+04   1.713393e+09
75%    1.544092e+07     2945.000000    7.412950e+04   1.713472e+09
max    1.034730e+08   751125.000000    3.270284e+07  

In [0]:
employee_counts_pd['time_recorded'] = pd.to_datetime(employee_counts_pd['time_recorded'], unit='s')
employee_counts_pd['time_recorded'].head()

0   2024-04-05 19:42:53
1   2024-04-05 19:42:53
2   2024-04-05 19:42:53
3   2024-04-05 19:42:53
4   2024-04-05 19:42:53
Name: time_recorded, dtype: datetime64[ns]

In [0]:
duplicates = employee_counts_pd['company_id'].duplicated()
print(f"Number of duplicate rows: {duplicates.sum()}")

# duplicate company_id's. keep the row with most recent time_recorded

employee_counts_pd = employee_counts_pd.sort_values(by=['company_id', 'time_recorded']).drop_duplicates(subset=['company_id'], keep='last')

print(employee_counts_pd.shape)


Number of duplicate rows: 11314
(24473, 4)


In [0]:
employee_counts_clean = employee_counts_pd
employee_counts_clean.head()

Unnamed: 0,company_id,employee_count,follower_count,time_recorded
30056,1009,311223,16314846,2024-04-19 04:34:15
35604,1016,57001,2196350,2024-04-19 23:23:51
21766,1025,79559,3588329,2024-04-18 15:14:18
32109,1028,191374,9497909,2024-04-19 15:21:13
35631,1033,565191,11890321,2024-04-19 23:30:51


In [0]:
# Convert pandas DataFrame to PySpark DataFrame
spark_employee_counts_clean = spark.createDataFrame(employee_counts_clean)

# Overwrite the existing Delta table with schema migration
spark_employee_counts_clean.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("default.employee_counts_1_csv")

#### industries_map
- This mapping is only used for job_industries
- Replace missing industry_name entries with "Unnamed Industry"
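
Since this table only translates `industry_id` values from `job_industries`, a dict-based sketch makes the two kinds of gap explicit: a known id with a missing name (filled with "Unnamed Industry", as above) versus an id that is absent from the map entirely. The "Unknown Industry" fallback for the second case and both function names are assumptions, not something the notebook does.

```python
def build_industry_map(rows):
    """Build {industry_id: name} from (industry_id, name) pairs, filling missing names and title-casing."""
    return {
        industry_id: (name if name is not None else "Unnamed Industry").strip().title()
        for industry_id, name in rows
    }


def industry_name(industry_map, industry_id, default="Unknown Industry"):
    # Hypothetical fallback for ids that never appear in industries.csv
    return industry_map.get(industry_id, default)


industry_map = build_industry_map([(1, "Defense and Space Manufacturing"), (2, None)])
print(industry_name(industry_map, 1))    # Defense And Space Manufacturing
print(industry_name(industry_map, 2))    # Unnamed Industry
print(industry_name(industry_map, 999))  # Unknown Industry
```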

In [0]:
print(industries_map_pd.info())
print(industries_map_pd.describe(include='all'))
print(industries_map_pd.isnull().sum())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 422 entries, 0 to 421
Data columns (total 2 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   industry_id    422 non-null    int32 
 1   industry_name  388 non-null    object
dtypes: int32(1), object(1)
memory usage: 5.1+ KB
None
        industry_id                    industry_name
count    422.000000                              388
unique          NaN                              388
top             NaN  Defense and Space Manufacturing
freq            NaN                                1
mean    1342.305687                              NaN
std     1212.022551                              NaN
min        1.000000                              NaN
25%      108.250000                              NaN
50%     1161.500000                              NaN
75%     2279.500000                              NaN
max     3253.000000                              NaN
industry_id       0
industry_name

In [0]:
# Replace missing industry_name entries with "Unnamed Industry"
industries_map_pd['industry_name'] = industries_map_pd['industry_name'].fillna("Unnamed Industry")

# Standardize industry names
industries_map_pd['industry_name'] = industries_map_pd['industry_name'].str.strip().str.title()

job_industries_map_clean = industries_map_pd

In [0]:
job_industries_map_clean.head()

Unnamed: 0,industry_id,industry_name
0,1,Defense And Space Manufacturing
1,3,Computer Hardware Manufacturing
2,4,Software Development
3,5,Computer Networking Products
4,6,"Technology, Information And Internet"


In [0]:
# Convert pandas DataFrame to PySpark DataFrame
spark_job_industries_map_clean = spark.createDataFrame(job_industries_map_clean)

# Overwrite the existing Delta table with schema migration
spark_job_industries_map_clean.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("default.industries_csv")

#### job_industries
- a single job_id can have multiple industries
- If a job_id has multiple industries, keep the one that occurs most frequently across the whole table
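
The "most frequent in the table" rule is a global ranking: each job keeps whichever of its industries appears most often across all of `job_industries`, not within that job. A plain-Python sketch with `collections.Counter`; on ties, `max` returns the first listed industry, which is the same tie-breaking the notebook's `most_frequent_global` function ends up with. The function name and sample pairs are illustrative.

```python
from collections import Counter


def primary_industry(pairs):
    """Map each job_id to the industry_id among its industries that is most frequent globally."""
    frequency = Counter(industry_id for _, industry_id in pairs)
    by_job = {}
    for job_id, industry_id in pairs:
        by_job.setdefault(job_id, []).append(industry_id)
    # max() returns the first maximal element, so ties keep the earliest-listed industry
    return {job: max(ids, key=frequency.__getitem__) for job, ids in sorted(by_job.items())}


pairs = [(1, 82), (1, 80), (2, 82), (3, 80), (3, 41), (4, 80)]
print(primary_industry(pairs))
# {1: 80, 2: 82, 3: 80, 4: 80}
```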

In [0]:
print(job_industries_pd.info())
print(job_industries_pd.describe(include='all'))
print(job_industries_pd.isnull().sum())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 164808 entries, 0 to 164807
Data columns (total 2 columns):
 #   Column       Non-Null Count   Dtype
---  ------       --------------   -----
 0   job_id       164808 non-null  int64
 1   industry_id  164808 non-null  int32
dtypes: int32(1), int64(1)
memory usage: 1.9 MB
None
             job_id    industry_id
count  1.648080e+05  164808.000000
mean   3.897074e+09     196.155284
std    7.624930e+07     594.230895
min    9.217160e+05       1.000000
25%    3.894876e+09      17.000000
50%    3.902342e+09      44.000000
75%    3.904719e+09      96.000000
max    3.906267e+09    3253.000000
job_id         0
industry_id    0
dtype: int64


In [0]:
duplicates = job_industries_pd['job_id'].duplicated()
print(f"Number of duplicate rows: {duplicates.sum()}")


Number of duplicate rows: 37683


In [0]:
job_industries_pd.head()

Unnamed: 0,job_id,industry_id
0,3884428798,82
1,3887473071,48
2,3887465684,41
3,3887467939,82
4,3887467939,80


In [0]:
# Calculate global frequency of each industry
industry_frequency = job_industries_pd['industry_id'].value_counts().to_dict()

# Group by job_id
job_industries_grouped = job_industries_pd.groupby('job_id')['industry_id'].apply(list).reset_index()

# Keep the globally most frequent industry for each job_id
# (max returns the first maximal element, so ties keep the earliest-listed industry)
def most_frequent_global(industries):
    return max(industries, key=lambda x: industry_frequency.get(x, 0))

job_industries_grouped['primary_industry_id'] = job_industries_grouped['industry_id'].apply(most_frequent_global)

# Drop the aggregated industry_id column
job_industries_grouped.drop(columns=['industry_id'], inplace=True)

job_industries_clean = job_industries_grouped

In [0]:
job_industries_clean.head()

Unnamed: 0,job_id,primary_industry_id
0,921716,44
1,1218575,14
2,2264355,89
3,9615617,142
4,10998357,32


In [0]:
# Convert pandas DataFrame to PySpark DataFrame
spark_job_industries_clean = spark.createDataFrame(job_industries_clean)

# Overwrite the existing Delta table with schema migration
spark_job_industries_clean.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("default.job_industries_csv")

#### skills_map
- no cleaning needed

In [0]:
print(skills_map_pd.info())
print(skills_map_pd.describe(include='all'))
print(skills_map_pd.isnull().sum())

# only 35 unique skills - seems kind of low, but maybe makes it more usable/interpretable
# join to jobs by skill_abr


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 35 entries, 0 to 34
Data columns (total 2 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   skill_abr   35 non-null     object
 1   skill_name  35 non-null     object
dtypes: object(2)
memory usage: 688.0+ bytes
None
       skill_abr    skill_name
count         35            35
unique        35            35
top          ART  Art/Creative
freq           1             1
skill_abr     0
skill_name    0
dtype: int64


#### job_skills
- only 35 unique skills; that seems low, but it may make the feature more usable and interpretable
- each job can have many skills, so will one-hot encode the skills and group by job_id
- need to join with skills_map_pd first to get the skill names
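
Joining abbreviations to names and multi-hot encoding per job can be sketched as below. Two details the pandas version leaves implicit are made explicit here: a `skill_abr` missing from the map (a left join would produce NaN, and `get_dummies` ignores NaN by default because `dummy_na=False`) raises an error instead of silently vanishing, and every skill in the map gets a column even if no job uses it. Both choices, and the function name, are assumptions about what you might want.

```python
def skills_multi_hot(job_skills, skills_map):
    """job_skills: (job_id, skill_abr) pairs; skills_map: {skill_abr: skill_name}.

    Returns {job_id: {"skill_<name>": 0 or 1}} with one column per skill in the map.
    """
    columns = [f"skill_{name}" for name in sorted(skills_map.values())]
    encoded = {}
    for job_id, abr in job_skills:
        if abr not in skills_map:
            raise KeyError(f"unknown skill abbreviation {abr!r} for job {job_id}")
        row = encoded.setdefault(job_id, dict.fromkeys(columns, 0))
        row[f"skill_{skills_map[abr]}"] = 1  # 0/1 flag, even if a pair is repeated
    return dict(sorted(encoded.items()))


skills_map = {"MRKT": "Marketing", "PR": "Public Relations", "SALE": "Sales"}
job_skills = [(3884428798, "MRKT"), (3884428798, "PR"), (3887473071, "SALE")]
print(skills_multi_hot(job_skills, skills_map)[3887473071])
# {'skill_Marketing': 0, 'skill_Public Relations': 0, 'skill_Sales': 1}
```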

In [0]:
print(job_skills_pd.info())
print(job_skills_pd.head())
print(job_skills_pd.describe(include='all'))
print(job_skills_pd.isnull().sum())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 213768 entries, 0 to 213767
Data columns (total 2 columns):
 #   Column     Non-Null Count   Dtype 
---  ------     --------------   ----- 
 0   job_id     213768 non-null  int64 
 1   skill_abr  213768 non-null  object
dtypes: int64(1), object(1)
memory usage: 3.3+ MB
None
       job_id skill_abr
0  3884428798      MRKT
1  3884428798        PR
2  3884428798       WRT
3  3887473071      SALE
4  3887465684       FIN
              job_id skill_abr
count   2.137680e+05    213768
unique           NaN        35
top              NaN        IT
freq             NaN     26137
mean    3.896849e+09       NaN
std     7.834902e+07       NaN
min     9.217160e+05       NaN
25%     3.894661e+09       NaN
50%     3.902323e+09       NaN
75%     3.904715e+09       NaN
max     3.906267e+09       NaN
job_id       0
skill_abr    0
dtype: int64


In [0]:
duplicates = job_skills_pd['job_id'].duplicated()
print(f"Number of duplicate rows: {duplicates.sum()}")


Number of duplicate rows: 86961


In [0]:
# Perform the join on skill_abr
job_skills_with_names = job_skills_pd.merge(skills_map_pd, on='skill_abr', how='left')

# Drop the skill_abr column
job_skills_with_names.drop(columns=['skill_abr'], inplace=True)

job_skills_with_names

Unnamed: 0,job_id,skill_name
0,3884428798,Marketing
1,3884428798,Public Relations
2,3884428798,Writing/Editing
3,3887473071,Sales
4,3887465684,Finance
...,...,...
213763,3902876855,Human Resources
213764,3902878689,Management
213765,3902878689,Manufacturing
213766,3902883233,Sales


In [0]:
# Step 1: One-hot encode the skills
skills_one_hot = pd.get_dummies(job_skills_with_names, columns=['skill_name'], prefix='skill')

# Step 2: Group by job_id and sum the one-hot encoded columns
skills_one_hot_grouped = skills_one_hot.groupby('job_id').sum().reset_index()

job_skills_clean = skills_one_hot_grouped


In [0]:
job_skills_clean.head()

Unnamed: 0,job_id,skill_Accounting/Auditing,skill_Administrative,skill_Advertising,skill_Analyst,skill_Art/Creative,skill_Business Development,skill_Consulting,skill_Customer Service,skill_Design,skill_Distribution,skill_Education,skill_Engineering,skill_Finance,skill_General Business,skill_Health Care Provider,skill_Human Resources,skill_Information Technology,skill_Legal,skill_Management,skill_Manufacturing,skill_Marketing,skill_Other,skill_Product Management,skill_Production,skill_Project Management,skill_Public Relations,skill_Purchasing,skill_Quality Assurance,skill_Research,skill_Sales,skill_Science,skill_Strategy/Planning,skill_Supply Chain,skill_Training,skill_Writing/Editing
0,921716,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0
1,1218575,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2,1829192,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
3,2264355,0,0,0,0,1,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
4,10998357,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [0]:
# Replace invalid characters in column names
job_skills_clean.columns = job_skills_clean.columns.str.replace(r'[ ,;{}()\n\t=]', '_', regex=True)

# Convert uint8 columns to int
job_skills_clean = job_skills_clean.astype({col: 'int64' for col in job_skills_clean.select_dtypes('uint8').columns})

# Convert pandas DataFrame to PySpark DataFrame
spark_job_skills_clean = spark.createDataFrame(job_skills_clean)

# Overwrite the existing Delta table with schema migration
spark_job_skills_clean.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("default.job_skills_csv")

#### salaries
- Not using this table
- Doesn't contain any additional salary information for the job_ids in postings_pd
- The job_ids in salaries_pd that are not in postings_pd do not have other supplementary info like description, skills, views, etc.
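
The job_id side of these claims reduces to set comparisons. A minimal sketch with made-up inputs; in the notebook the inputs would be `salaries_pd['job_id']` and `postings_pd['job_id']`. Checking that the overlapping rows carry no additional salary information would still require comparing the salary columns themselves.

```python
def job_id_overlap(salary_ids, posting_ids):
    """Count job_ids present in both tables, only in salaries, and only in postings."""
    salary_ids, posting_ids = set(salary_ids), set(posting_ids)
    return {
        "in_both": len(salary_ids & posting_ids),
        "salary_only": len(salary_ids - posting_ids),
        "posting_only": len(posting_ids - salary_ids),
    }


print(job_id_overlap([1, 2, 3], [2, 3, 4, 5]))
# {'in_both': 2, 'salary_only': 1, 'posting_only': 2}
```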

In [0]:
print(salaries_pd.info())
print(salaries_pd.head())
print(salaries_pd.describe(include='all'))
print(salaries_pd.isnull().sum())


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 40785 entries, 0 to 40784
Data columns (total 8 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   salary_id          40785 non-null  int32  
 1   job_id             40785 non-null  int64  
 2   max_salary         33947 non-null  float64
 3   med_salary         6838 non-null   float64
 4   min_salary         33947 non-null  float64
 5   pay_period         40785 non-null  object 
 6   currency           40785 non-null  object 
 7   compensation_type  40785 non-null  object 
dtypes: float64(3), int32(1), int64(1), object(3)
memory usage: 2.3+ MB
None
   salary_id      job_id  max_salary  ...  pay_period  currency compensation_type
0          1  3884428798         NaN  ...      HOURLY       USD       BASE_SALARY
1          2  3887470552        25.0  ...      HOURLY       USD       BASE_SALARY
2          3  3884431523    120000.0  ...      YEARLY       USD       BASE_SALARY
3      

In [0]:
salaries_pd

Unnamed: 0,salary_id,job_id,max_salary,med_salary,min_salary,pay_period,currency,compensation_type
0,1,3884428798,,20.0,,HOURLY,USD,BASE_SALARY
1,2,3887470552,25.00,,23.0,HOURLY,USD,BASE_SALARY
2,3,3884431523,120000.00,,100000.0,YEARLY,USD,BASE_SALARY
3,4,3884911725,200000.00,,10000.0,YEARLY,USD,BASE_SALARY
4,5,3887473220,35.00,,33.0,HOURLY,USD,BASE_SALARY
...,...,...,...,...,...,...,...,...
40780,40781,3902881498,,15.5,,HOURLY,USD,BASE_SALARY
40781,40782,3902883232,,25.0,,HOURLY,USD,BASE_SALARY
40782,40783,3902866633,21.53,,21.1,HOURLY,USD,BASE_SALARY
40783,40784,3902879720,125000.00,,100000.0,YEARLY,USD,BASE_SALARY


#### postings

In [0]:
# To access an older version of any of the tables (Delta time travel), run the following.
# Alternatively, go to Catalog -> table -> History

postings_df = spark.read.format("delta").option("versionAsOf", 1).table("default.postings_csv")
postings_pd = postings_df.toPandas()

In [0]:
print(postings_pd.info())
print(postings_pd.head())
print(postings_pd.describe(include='all'))
print(postings_pd.isnull().sum())

# Drop irrelevant columns:
#   - too many NaN: closed_time, skills_desc, posting_domain
#   - duplicate information (covered by formatted_work_type and normalized_salary): work_type, max_salary, min_salary, med_salary
#   - only one level: compensation_type, sponsored
# Drop rows where NaN are not acceptable
# For columns where NaN are acceptable, fill with placeholders
# Standardize text columns
# Convert time columns to datetime
# Parse location
# One-hot encode categorical variables


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 123849 entries, 0 to 123848
Data columns (total 31 columns):
 #   Column                      Non-Null Count   Dtype  
---  ------                      --------------   -----  
 0   job_id                      123849 non-null  int64  
 1   company_name                122130 non-null  object 
 2   title                       123849 non-null  object 
 3   description                 123842 non-null  object 
 4   max_salary                  29793 non-null   float64
 5   pay_period                  36073 non-null   object 
 6   location                    123849 non-null  object 
 7   company_id                  122132 non-null  float64
 8   views                       122160 non-null  float64
 9   med_salary                  6280 non-null    float64
 10  min_salary                  29793 non-null   float64
 11  formatted_work_type         123849 non-null  object 
 12  applies                     23320 non-null   float64
 13  original_liste

In [0]:
postings_pd.head()

Unnamed: 0,job_id,company_name,title,description,max_salary,pay_period,location,company_id,views,med_salary,min_salary,formatted_work_type,applies,original_listed_time,remote_allowed,job_posting_url,application_url,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,posting_domain,sponsored,work_type,currency,compensation_type,normalized_salary,zip_code,fips
0,921716,Corcoran Sawyer Smith,Marketing Coordinator,Job descriptionA leading real estate firm in N...,20.0,HOURLY,"Princeton, NJ",2774458.0,20.0,,17.0,Full-time,2.0,1713398000000.0,,https://www.linkedin.com/jobs/view/921716/?trk...,,ComplexOnsiteApply,1715990000000.0,,,Requirements: \n\nWe are seeking a College or ...,1713398000000.0,,0,FULL_TIME,USD,BASE_SALARY,38480.0,8540.0,34021.0
1,1829192,,Mental Health Therapist/Counselor,"At Aspen Therapy and Wellness , we are committ...",50.0,HOURLY,"Fort Collins, CO",,1.0,,30.0,Full-time,,1712858000000.0,,https://www.linkedin.com/jobs/view/1829192/?tr...,,ComplexOnsiteApply,1715450000000.0,,,,1712858000000.0,,0,FULL_TIME,USD,BASE_SALARY,83200.0,80521.0,8069.0
2,10998357,The National Exemplar,Assitant Restaurant Manager,The National Exemplar is accepting application...,65000.0,YEARLY,"Cincinnati, OH",64896719.0,8.0,,45000.0,Full-time,,1713278000000.0,,https://www.linkedin.com/jobs/view/10998357/?t...,,ComplexOnsiteApply,1715870000000.0,,,We are currently accepting resumes for FOH - A...,1713278000000.0,,0,FULL_TIME,USD,BASE_SALARY,55000.0,45202.0,39061.0
3,23221523,"Abrams Fensterman, LLP",Senior Elder Law / Trusts and Estates Associat...,Senior Associate Attorney - Elder Law / Trusts...,175000.0,YEARLY,"New Hyde Park, NY",766262.0,16.0,,140000.0,Full-time,,1712896000000.0,,https://www.linkedin.com/jobs/view/23221523/?t...,,ComplexOnsiteApply,1715488000000.0,,,This position requires a baseline understandin...,1712896000000.0,,0,FULL_TIME,USD,BASE_SALARY,157500.0,11040.0,36059.0
4,35982263,,Service Technician,Looking for HVAC service tech with experience ...,80000.0,YEARLY,"Burlington, IA",,3.0,,60000.0,Full-time,,1713452000000.0,,https://www.linkedin.com/jobs/view/35982263/?t...,,ComplexOnsiteApply,1716044000000.0,,,,1713452000000.0,,0,FULL_TIME,USD,BASE_SALARY,70000.0,52601.0,19057.0


In [0]:
postings_pd.describe(include='all')

Unnamed: 0,job_id,company_name,title,description,max_salary,pay_period,location,company_id,views,med_salary,min_salary,formatted_work_type,applies,original_listed_time,remote_allowed,job_posting_url,application_url,application_type,expiry,closed_time,formatted_experience_level,skills_desc,listed_time,posting_domain,sponsored,work_type,currency,compensation_type,normalized_salary,zip_code,fips
count,123849.0,122130,123849,123842,29793.0,36073,123849,122132.0,122160.0,6280.0,29793.0,123849,23320.0,123849.0,15246.0,123849,87184,123849,123849.0,1073.0,94440,2439,123849.0,83881,123849.0,123849,36073,36073,36073.0,102977.0,96434.0
unique,,24428,72521,107827,,5,8526,,,,,7,,,,123849,84800,4,,,6,2212,,4443,,7,6,1,,,
top,,Liberty Healthcare and Rehabilitation Services,Sales Manager,Position Summary: Our Sales Manager has managi...,,YEARLY,United States,,,,,Full-time,,,,https://www.linkedin.com/jobs/view/921716/?trk...,https://app.dataannotation.tech/worker_signup?...,OffsiteApply,,,Mid-Senior level,This position requires the following skills: E...,,www.click2apply.net,,FULL_TIME,USD,BASE_SALARY,,,
freq,,1108,673,474,,20628,8125,,,,,98814,,,,1,205,84607,,,41489,28,,3811,,98814,36058,36073,,,
mean,3896402000.0,,,,91939.42,,,12204010.0,14.618247,22015.619876,64910.85,,10.591981,1713152000000.0,1.0,,,,1716213000000.0,1712928000000.0,,,1713204000000.0,,0.0,,,,205327.0,50400.491887,28713.879887
std,84043550.0,,,,701110.1,,,25541430.0,85.903598,52255.873846,495973.8,,29.047395,484820900.0,0.0,,,,2321394000.0,362289300.0,,,398912200.0,,0.0,,,,5097627.0,30252.232515,16015.929825
min,921716.0,,,,1.0,,,1009.0,1.0,0.0,1.0,,1.0,1701811000000.0,1.0,,,,1712903000000.0,1712346000000.0,,,1711317000000.0,,0.0,,,,0.0,1001.0,1003.0
25%,3894587000.0,,,,48.28,,,14352.0,3.0,18.94,37.0,,1.0,1712863000000.0,1.0,,,,1715481000000.0,1712670000000.0,,,1712886000000.0,,0.0,,,,52000.0,24112.0,13121.0
50%,3901998000.0,,,,80000.0,,,226965.0,4.0,25.5,60000.0,,3.0,1713395000000.0,1.0,,,,1716042000000.0,1712670000000.0,,,1713408000000.0,,0.0,,,,81500.0,48059.0,29183.0
75%,3904707000.0,,,,140000.0,,,8047188.0,8.0,2510.5,100000.0,,8.0,1713478000000.0,1.0,,,,1716088000000.0,1713283000000.0,,,1713484000000.0,,0.0,,,,125000.0,78201.0,42077.0
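
The drop decisions above rest on two per-column statistics visible in `describe()`: the share of missing values and the number of distinct non-null levels. A minimal sketch for computing both directly; the toy frame and the 0.5 missing-fraction threshold are assumptions for illustration, not values chosen in this notebook.

```python
import numpy as np
import pandas as pd

# Toy frame: a mostly-missing column, a constant column, and a useful column
df = pd.DataFrame({
    "closed_time": [np.nan, np.nan, np.nan, 2.0, 1.0],
    "sponsored": [0, 0, 0, 0, 0],
    "views": [3.0, 4.0, np.nan, 8.0, 20.0],
})

MAX_MISSING = 0.5  # hypothetical threshold

missing_frac = df.isna().mean()      # fraction of NaN per column
n_levels = df.nunique(dropna=True)   # distinct non-null values per column

too_sparse = missing_frac[missing_frac > MAX_MISSING].index.tolist()
single_level = n_levels[n_levels <= 1].index.tolist()

print("mostly missing:", too_sparse)
print("single level:", single_level)
```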


In [0]:
# Drop irrelevant columns
postings_pd.drop(columns=['closed_time', 'skills_desc', 'max_salary', 'min_salary', 'med_salary', 'work_type', 'posting_domain', 'compensation_type', 'sponsored'], inplace=True)

# Drop rows where NaN are not acceptable
postings_pd.dropna(subset=['company_name', 'company_id', 'views'], inplace=True)

# For columns where NaN are acceptable, fill with placeholders
postings_pd['applies'] = postings_pd['applies'].fillna(0)   # no applies recorded
postings_pd['remote_allowed'] = postings_pd['remote_allowed'].fillna(0)   # remote_allowed not specified
postings_pd['formatted_experience_level'] = postings_pd['formatted_experience_level'].fillna('unspecified')

# Standardize text columns
text_columns = ['company_name', 'title', 'description', 'location']
for col in text_columns:
    postings_pd[col] = postings_pd[col].str.strip().str.lower()

# Convert time columns to datetime
time_columns = ['original_listed_time', 'expiry', 'listed_time']
for col in time_columns:
    postings_pd[col] = pd.to_datetime(postings_pd[col], unit='ms', errors='coerce')

# Parse location
location_split = postings_pd['location'].str.split(',', expand=True)
postings_pd['city'] = location_split[0].str.strip()  # Remove leading/trailing whitespace
postings_pd['state'] = location_split[1].str.strip() # Remove leading/trailing whitespace
postings_pd['state'] = postings_pd['state'].fillna('united states')  # No comma in location (e.g. 'united states'), so treat as country-level

# One-hot encode categorical variables
postings_pd = pd.get_dummies(postings_pd, columns=['formatted_work_type', 'formatted_experience_level', 'pay_period', 'currency', 'application_type'], prefix=['work_type', 'experience', 'pay_period', 'currency', 'app_type'])

# Create link_to_application: 1 if application_url is not NaN, 0 otherwise
postings_pd['link_to_application'] = postings_pd['application_url'].notna().astype(int)

# Create salary_listed: 1 if normalized_salary is not NaN, 0 otherwise
postings_pd['salary_listed'] = postings_pd['normalized_salary'].notna().astype(int)


In [0]:
postings_pd.shape

(120503, 49)
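
Two of the cleaning steps depend on details of the raw data: locations such as 'united states' contain no comma, so the split yields a missing state, and the time columns hold Unix epoch milliseconds. The sketch below reproduces both on a toy frame. It uses `n=1` in the split so any extra commas stay in the state part (an assumption; the cell above splits on every comma), and it reassigns the result of `fillna` instead of calling `fillna(..., inplace=True)` on a single column, a chained-assignment pattern that newer pandas versions warn about and that stops updating the DataFrame under copy-on-write.

```python
import pandas as pd

# Toy rows shaped like postings_pd after lower-casing (values are illustrative)
df = pd.DataFrame({
    "location": ["princeton, nj", "united states", "new hyde park, ny"],
    "listed_time": [1713398000000, 1712858000000, 1712896000000],  # epoch milliseconds
})

# Split "city, state" once; rows without a comma get a missing state
parts = df["location"].str.split(",", n=1, expand=True)
df["city"] = parts[0].str.strip()
df["state"] = parts[1].str.strip()

# Reassign rather than using inplace=True on a column
df["state"] = df["state"].fillna("united states")

# Epoch milliseconds -> datetime64
df["listed_time"] = pd.to_datetime(df["listed_time"], unit="ms")

print(df)
```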

In [0]:
postings_pd.describe(include='all', datetime_is_numeric=True)

Unnamed: 0,job_id,company_name,title,description,location,company_id,views,applies,original_listed_time,remote_allowed,job_posting_url,application_url,expiry,listed_time,normalized_salary,zip_code,fips,city,state,work_type_Contract,work_type_Full-time,work_type_Internship,work_type_Other,work_type_Part-time,work_type_Temporary,work_type_Volunteer,experience_Associate,experience_Director,experience_Entry level,experience_Executive,experience_Internship,experience_Mid-Senior level,experience_unspecified,pay_period_BIWEEKLY,pay_period_HOURLY,pay_period_MONTHLY,pay_period_WEEKLY,pay_period_YEARLY,currency_BBD,currency_CAD,currency_EUR,currency_GBP,currency_USD,app_type_ComplexOnsiteApply,app_type_OffsiteApply,app_type_SimpleOnsiteApply,app_type_UnknownApply,link_to_application,salary_listed
count,120503.0,120503,120503,120497,120503,120503.0,120503.0,120503.0,120503,120503.0,120503,86426,120503,120503,35095.0,100617.0,94227.0,120503,120503,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0,120503.0
unique,,24032,70336,104784,8438,,,,,,120503,84122,,,,,,6408,119,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
top,,liberty healthcare and rehabilitation services,sales manager,position summary: our sales manager has managi...,united states,,,,,,https://www.linkedin.com/jobs/view/921716/?trk...,https://app.dataannotation.tech/worker_signup?...,,,,,,united states,united states,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
freq,,1089,632,447,7725,,,,,,1,199,,,,,,7725,16366,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
mean,3897071000.0,,,,,12114730.0,14.630947,2.001668,2024-04-15 03:10:31.026248448,0.120121,,,2024-05-20 00:38:04.546882304,2024-04-15 17:57:18.672804608,208445.2,50421.635966,28736.956382,,,0.097508,0.798395,0.007842,0.003693,0.078313,0.009676,0.004573,0.079035,0.03053,0.301412,0.009867,0.011967,0.338788,0.228401,7.5e-05,0.119474,0.004158,0.001469,0.166062,1.7e-05,2.5e-05,4.1e-05,1.7e-05,0.291138,0.241297,0.695908,0.062787,8e-06,0.71721,0.291238
min,921716.0,,,,,1009.0,1.0,0.0,2023-12-05 21:08:53,0.0,,,2024-04-12 06:30:48,2024-03-24 21:50:14,0.0,1001.0,1003.0,,,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,3894604000.0,,,,,14158.0,3.0,0.0,2024-04-11 19:14:06,0.0,,,2024-05-12 02:24:52,2024-04-12 01:49:57,51809.65,24149.0,13121.0,,,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,3902300000.0,,,,,217526.0,4.0,0.0,2024-04-17 22:54:53,0.0,,,2024-05-18 14:12:01,2024-04-18 02:25:38,81706.5,48080.0,29183.0,,,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0
75%,3904706000.0,,,,,7934671.0,8.0,0.0,2024-04-18 22:00:27,0.0,,,2024-05-19 02:38:27,2024-04-18 23:16:29,125000.0,78201.0,42079.0,,,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0,1.0
max,3906267000.0,,,,,103473000.0,9975.0,967.0,2024-04-20 00:26:30,1.0,,,2024-10-17 00:25:46,2024-04-20 00:26:56,535600000.0,99901.0,56045.0,,,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0


In [0]:
postings_pd.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 120503 entries, 0 to 123848
Data columns (total 49 columns):
 #   Column                       Non-Null Count   Dtype         
---  ------                       --------------   -----         
 0   job_id                       120503 non-null  int64         
 1   company_name                 120503 non-null  object        
 2   title                        120503 non-null  object        
 3   description                  120497 non-null  object        
 4   location                     120503 non-null  object        
 5   company_id                   120503 non-null  float64       
 6   views                        120503 non-null  float64       
 7   applies                      120503 non-null  float64       
 8   original_listed_time         120503 non-null  datetime64[ns]
 9   remote_allowed               120503 non-null  float64       
 10  job_posting_url              120503 non-null  object        
 11  application_url           

#### Further clean 'state' column

In [0]:
print("Unique values in state (including NaN):")
print(postings_pd['state'].unique())

Unique values in state (including NaN):
['nj' 'oh' 'ny' 'nc' 'united states' 'ne' 'fl' 'mi' 'mo' 'ak' 'al' 'ga'
 'co' 'tx' 'ma' 'az' 'wa' 'wi' 'hi' 'pa' 'ca' 'la' 'ut' 'in' 'va' 'ia'
 'tn' 'mn' 'md' 'ky' 'or' 'nebraska metropolitan area' 'il' 'mt'
 'ohio metropolitan area' 'ok' 'dc' 'ms'
 'south carolina metropolitan area' 'sc' 'ks' 'ar'
 'texas metropolitan area' 'nm' 'ct' 'california' 'nv'
 'oregon metropolitan area' 'illinois metropolitan area' 'id'
 'south carolina area' 'nh' 'wy' 'sd' 'nd' 'de' 'alabama area' 'wv' 'vt'
 'kansas metropolitan area' 'massachusetts metropolitan area' 'me'
 'wisconsin metropolitan area' 'on' 'ri' 'missouri'
 'new york metropolitan area' 'virginia' 'colorado' 'florida' 'ohio'
 'north carolina metropolitan area' 'new york' 'texas' 'mi area' 'oregon'
 'illinois' 'massachusetts' 'arizona' 'hawaii' 'nevada' 'tennessee'
 'south dakota' 'indiana' 'district of columbia' 'pennsylvania'
 'south carolina' 'utah' 'georgia area' 'south holland'
 'indiana metropolit

In [0]:
# Define a mapping of state variations to their standard abbreviations
state_mapping = {
    # Valid state abbreviations
    'nj': 'nj', 'oh': 'oh', 'ny': 'ny', 'nc': 'nc', 'ne': 'ne', 'fl': 'fl', 'mi': 'mi',
    'mo': 'mo', 'ak': 'ak', 'al': 'al', 'ga': 'ga', 'co': 'co', 'tx': 'tx', 'ma': 'ma',
    'az': 'az', 'wa': 'wa', 'wi': 'wi', 'hi': 'hi', 'pa': 'pa', 'ca': 'ca', 'la': 'la',
    'ut': 'ut', 'in': 'in', 'va': 'va', 'ia': 'ia', 'tn': 'tn', 'mn': 'mn', 'md': 'md',
    'ky': 'ky', 'or': 'or', 'il': 'il', 'mt': 'mt', 'ok': 'ok', 'dc': 'dc', 'ms': 'ms',
    'sc': 'sc', 'ks': 'ks', 'ar': 'ar', 'nm': 'nm', 'ct': 'ct', 'nv': 'nv', 'nh': 'nh',
    'wy': 'wy', 'sd': 'sd', 'nd': 'nd', 'de': 'de', 'wv': 'wv', 'vt': 'vt', 'me': 'me',
    'ri': 'ri', 'id': 'id',

    # Variations of state names or areas
    'nebraska metropolitan area': 'ne',
    'ohio metropolitan area': 'oh',
    'south carolina metropolitan area': 'sc',
    'california': 'ca',
    'illinois metropolitan area': 'il',
    'massachusetts metropolitan area': 'ma',
    'north carolina metropolitan area': 'nc',
    'new york metropolitan area': 'ny',
    'texas metropolitan area': 'tx',
    'wisconsin metropolitan area': 'wi',
    'oregon metropolitan area': 'or',
    'florida': 'fl',
    'new york': 'ny',
    'colorado': 'co',
    'virginia': 'va',
    'missouri': 'mo',
    'georgia': 'ga',
    'illinois': 'il',
    'arizona': 'az',
    'hawaii': 'hi',
    'nevada': 'nv',
    'tennessee': 'tn',
    'south dakota': 'sd',
    'indiana': 'in',
    'district of columbia': 'dc',
    'pennsylvania': 'pa',
    'south carolina': 'sc',
    'utah': 'ut',
    'maryland': 'md',
    'north carolina': 'nc',
    'new jersey': 'nj',
    'alabama': 'al',
    'michigan': 'mi',
    'nebraska': 'ne',
    'maine': 'me',
    'oklahoma': 'ok',
    'louisiana': 'la',
    'minnesota': 'mn',
    'arkansas': 'ar',

    # General terms and non-U.S. values
    'united states': 'united states',
    'the gambia': 'unspecified',  # or drop these rows if preferred
    'qc': 'unspecified',  # Quebec, not a U.S. state
    'on': 'unspecified',  # Ontario, not a U.S. state

    # Further area variations and full state names
    'az area': 'az',
    'nc area': 'nc',
    'il area': 'il',
    'minnesota area': 'mn',
    'georgia area': 'ga',
    'alabama area': 'al',
    'indiana metropolitan area': 'in',
    'virginia metropolitan area': 'va',
    'louisiana metropolitan area': 'la',
    'texas': 'tx',
    'missouri area': 'mo',
    'oregon': 'or',
    'wisconsin': 'wi',
}

# Apply the mapping to the 'state' column
postings_pd['state'] = postings_pd['state'].str.strip().map(state_mapping)

# Fill any remaining NaN values with 'unspecified'
postings_pd['state'] = postings_pd['state'].fillna('unspecified')

# Check unique values after mapping
print(postings_pd['state'].unique())
print(postings_pd['state'].value_counts())


['nj' 'oh' 'ny' 'nc' 'united states' 'ne' 'fl' 'mi' 'mo' 'ak' 'al' 'ga'
 'co' 'tx' 'ma' 'az' 'wa' 'wi' 'hi' 'pa' 'ca' 'la' 'ut' 'in' 'va' 'ia'
 'tn' 'mn' 'md' 'ky' 'or' 'il' 'mt' 'ok' 'dc' 'ms' 'sc' 'ks' 'ar' 'nm'
 'ct' 'nv' 'id' 'unspecified' 'nh' 'wy' 'sd' 'nd' 'de' 'wv' 'vt' 'me' 'ri']
united states    16366
ca               11410
tx               10438
ny                6020
fl                5795
nc                4856
il                4396
pa                4069
va                3606
oh                3469
ma                3428
ga                3358
nj                3192
mi                2798
wa                2637
az                2460
co                2293
md                1929
mo                1922
tn                1839
wi                1824
mn                1804
in                1775
sc                1574
or                1239
ky                1164
ct                1154
la                1092
al                 991
dc                 979
ia                 9
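
Because `.map()` returns NaN for any value that is not a key in the dictionary, unmapped variants silently become 'unspecified'. In the unique values printed before the mapping, entries such as 'massachusetts', 'mi area', 'south carolina area', and 'kansas metropolitan area' are not keys of state_mapping, so they end up in that bucket. A minimal sketch for listing such values before applying the map; the toy series and trimmed-down mapping are illustrative.

```python
import pandas as pd

# Toy raw values and a trimmed-down mapping (both illustrative)
raw_state = pd.Series(["nj", "massachusetts", "mi area", "texas", "united states", None])
state_mapping = {"nj": "nj", "texas": "tx", "united states": "united states"}

cleaned = raw_state.str.strip()

# Values present in the data but absent from the mapping; .map() turns these into NaN
unmapped = sorted(set(cleaned.dropna()) - set(state_mapping))
print("no mapping for:", unmapped)

# Apply the mapping and fall back to 'unspecified'
mapped = cleaned.map(state_mapping).fillna("unspecified")
print(mapped.value_counts())
```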

In [0]:
postings_clean = postings_pd
print(postings_clean.shape)

(120503, 49)


In [0]:
postings_clean.head()

Unnamed: 0,job_id,company_name,title,description,location,company_id,views,applies,original_listed_time,remote_allowed,job_posting_url,application_url,expiry,listed_time,normalized_salary,zip_code,fips,city,state,work_type_Contract,work_type_Full-time,work_type_Internship,work_type_Other,work_type_Part-time,work_type_Temporary,work_type_Volunteer,experience_Associate,experience_Director,experience_Entry level,experience_Executive,experience_Internship,experience_Mid-Senior level,experience_unspecified,pay_period_BIWEEKLY,pay_period_HOURLY,pay_period_MONTHLY,pay_period_WEEKLY,pay_period_YEARLY,currency_BBD,currency_CAD,currency_EUR,currency_GBP,currency_USD,app_type_ComplexOnsiteApply,app_type_OffsiteApply,app_type_SimpleOnsiteApply,app_type_UnknownApply,link_to_application,salary_listed
0,921716,corcoran sawyer smith,marketing coordinator,job descriptiona leading real estate firm in n...,"princeton, nj",2774458.0,20.0,2.0,2024-04-17 23:45:08,0.0,https://www.linkedin.com/jobs/view/921716/?trk...,,2024-05-17 23:45:08,2024-04-17 23:45:08,38480.0,8540.0,34021.0,princeton,nj,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,1,1,0,0,0,0,1
2,10998357,the national exemplar,assitant restaurant manager,the national exemplar is accepting application...,"cincinnati, oh",64896719.0,8.0,0.0,2024-04-16 14:26:54,0.0,https://www.linkedin.com/jobs/view/10998357/?t...,,2024-05-16 14:26:54,2024-04-16 14:26:54,55000.0,45202.0,39061.0,cincinnati,oh,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,1,0,0,0,0,1
3,23221523,"abrams fensterman, llp",senior elder law / trusts and estates associat...,senior associate attorney - elder law / trusts...,"new hyde park, ny",766262.0,16.0,0.0,2024-04-12 04:23:32,0.0,https://www.linkedin.com/jobs/view/23221523/?t...,,2024-05-12 04:23:32,2024-04-12 04:23:32,157500.0,11040.0,36059.0,new hyde park,ny,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,1,0,0,0,0,1
5,91700727,downtown raleigh alliance,economic development and planning intern,job summary:the economic development & plannin...,"raleigh, nc",1481176.0,9.0,4.0,2024-04-18 16:01:39,0.0,https://www.linkedin.com/jobs/view/91700727/?t...,,2024-05-18 16:01:39,2024-04-18 16:01:39,35360.0,27601.0,37183.0,raleigh,nc,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,1,1,0,0,0,0,1
6,103254301,raw cereal,producer,company descriptionraw cereal is a creative de...,united states,81942316.0,7.0,1.0,2024-04-11 18:43:39,1.0,https://www.linkedin.com/jobs/view/103254301/?...,,2024-05-11 18:43:39,2024-04-11 18:43:39,180000.0,,,united states,united states,1,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,1,0,0,0,0,1,0,0,1,0,0,1


In [0]:
# Replace invalid characters in column names
postings_clean.columns = postings_clean.columns.str.replace(r'[ ,;{}()\n\t=]', '_', regex=True)

# Convert uint8 columns to int
postings_clean = postings_clean.astype({col: 'int64' for col in postings_clean.select_dtypes('uint8').columns})

# Convert pandas DataFrame to PySpark DataFrame
spark_postings_clean = spark.createDataFrame(postings_clean)

# Overwrite the existing Delta table with schema migration
spark_postings_clean.write.format("delta").option("overwriteSchema", "true").mode("overwrite").saveAsTable("default.postings_csv")
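
The two preparation steps before `spark.createDataFrame` exist because Delta tables reject column names containing characters such as spaces, commas, or parentheses (unless column mapping is enabled), and presumably because Spark SQL has no unsigned integer types, hence the cast of the `uint8` dummy columns. Since pandas 2.0, `pd.get_dummies` returns `bool` columns by default, which a `select_dtypes('uint8')` filter would not catch. One option is to request an integer dtype from `get_dummies` directly, sketched here on a toy frame:

```python
import pandas as pd

# Toy frame with category values that contain spaces (illustrative)
df = pd.DataFrame({
    "formatted_experience_level": ["Entry level", "Mid-Senior level", "Director"],
    "views": [3.0, 4.0, 8.0],
})

# Ask for int64 dummies up front so the dtype does not depend on the pandas version
df = pd.get_dummies(
    df,
    columns=["formatted_experience_level"],
    prefix=["experience"],
    dtype="int64",
)

# Replace characters that Delta does not allow in column names (same pattern as above)
df.columns = df.columns.str.replace(r"[ ,;{}()\n\t=]", "_", regex=True)

print(df.columns.tolist())
print(df.dtypes)
```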

### Join cleaned tables
- Schema diagram:
![Schema diagram](/Workspace/Users/wdeforest@uchicago.edu/db_diagram.png)

#### Load cleaned tables

In [0]:
# benefits.csv
benefits_df = spark.table("default.benefits_csv")
benefits_pd = benefits_df.toPandas()

# companies.csv
companies_df = spark.table("default.companies_csv")
companies_pd = companies_df.toPandas()

# company_industries.csv
company_industries_df = spark.table("default.company_industries_1_csv")
company_industries_pd = company_industries_df.toPandas()

# company_specialties.csv
company_specialities_df = spark.table("default.company_specialities_csv")
company_specialities_pd = company_specialities_df.toPandas()

# employee_counts.csv
employee_counts_df = spark.table("default.employee_counts_1_csv")
employee_counts_pd = employee_counts_df.toPandas()

# industries.csv
industries_map_df = spark.table("default.industries_csv")
industries_map_pd = industries_map_df.toPandas()

# job_industries.csv
job_industries_df = spark.table("default.job_industries_csv")
job_industries_pd = job_industries_df.toPandas()

# job_skills.csv
job_skills_df = spark.table("default.job_skills_csv")
job_skills_pd = job_skills_df.toPandas()

# salaries.csv
salaries_df = spark.table("default.salaries_csv")
salaries_pd = salaries_df.toPandas()

# skills.csv
skills_map_df = spark.table("default.skills_csv")
skills_map_pd = skills_map_df.toPandas()

# postings.csv
# This is by far the largest file (~0.5 GB)
postings_df = spark.table("default.postings_csv")
postings_pd = postings_df.toPandas()
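
The cell above repeats the same two lines per table. A hypothetical helper that loops over the table names keeps the list in one place. In Databricks the loader could be `lambda t: spark.table(f"default.{t}").toPandas()`; here a fake in-memory catalog stands in so the sketch runs without Spark. `load_tables` and `fake_catalog` are illustrative names, not part of the notebook.

```python
from typing import Callable, Dict, Iterable

import pandas as pd


def load_tables(names: Iterable[str], loader: Callable[[str], pd.DataFrame]) -> Dict[str, pd.DataFrame]:
    """Load each table name with `loader` and return {table_name: DataFrame}."""
    return {name: loader(name) for name in names}


# Stand-in catalog so the sketch runs without Spark (hypothetical data)
fake_catalog = {
    "benefits_csv": pd.DataFrame({"job_id": [1], "benefit_401_k_": [1]}),
    "postings_csv": pd.DataFrame({"job_id": [1, 2]}),
}

tables = load_tables(fake_catalog, lambda t: fake_catalog[t].copy())
print({name: df.shape for name, df in tables.items()})
```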

#### Join all job-related tables

In [0]:
# Join job_industries_pd with industries_map_pd to get the job_industry
job_industries_with_names = job_industries_pd.merge(
    industries_map_pd[['industry_id', 'industry_name']], 
    left_on='primary_industry_id', 
    right_on='industry_id', 
    how='left'
)

# Keep only job_id and the industry name, renamed to job_industry
job_industries_with_names = job_industries_with_names[['job_id', 'industry_name']].rename(
    columns={'industry_name': 'job_industry'}
)

# Start with postings_pd as the base
jobs_compiled = postings_pd

# Join with job_skills_pd (one-hot skill columns)
jobs_compiled = jobs_compiled.merge(job_skills_pd, on='job_id', how='left')

# Join with job_industries_with_names (job_industries_pd with industry names attached)
jobs_compiled = jobs_compiled.merge(job_industries_with_names, on='job_id', how='left')

# Join with benefits_pd
jobs_compiled = jobs_compiled.merge(benefits_pd, on='job_id', how='left')

print("Joined all job-related tables:")
print(jobs_compiled.info())


Joined all job-related tables:
<class 'pandas.core.frame.DataFrame'>
Int64Index: 120503 entries, 0 to 120502
Data columns (total 97 columns):
 #   Column                           Non-Null Count   Dtype         
---  ------                           --------------   -----         
 0   job_id                           120503 non-null  int64         
 1   company_name                     120503 non-null  object        
 2   title                            120503 non-null  object        
 3   description                      120497 non-null  object        
 4   location                         120503 non-null  object        
 5   company_id                       120503 non-null  float64       
 6   views                            120503 non-null  float64       
 7   applies                          120503 non-null  float64       
 8   original_listed_time             120503 non-null  datetime64[ns]
 9   remote_allowed                   120503 non-null  float64       
 10  job_posting_u
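
The three left joins above assume job_id is unique in each job-level table; the unchanged row count (120,503) suggests it is. pandas can enforce that assumption through the `validate` argument of `merge`, which raises a `MergeError` instead of silently duplicating rows. A sketch on toy tables that chains the joins with `functools.reduce`; the helper name `left_join_one_to_one` is hypothetical.

```python
from functools import reduce

import pandas as pd

# Toy job-level tables keyed by job_id (values are illustrative)
postings = pd.DataFrame({"job_id": [1, 2, 3], "title": ["a", "b", "c"]})
job_skills = pd.DataFrame({"job_id": [1, 2], "skill_Sales": [1, 0]})
job_industries = pd.DataFrame({"job_id": [1, 3], "job_industry": ["Banking", "Retail"]})
benefits = pd.DataFrame({"job_id": [2], "benefit_401_k_": [1]})


def left_join_one_to_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # validate="one_to_one" raises MergeError if job_id repeats on either side
    return left.merge(right, on="job_id", how="left", validate="one_to_one")


jobs = reduce(left_join_one_to_one, [job_skills, job_industries, benefits], postings)

# A left join with unique keys on both sides never changes the row count
assert len(jobs) == len(postings)
print(jobs)
```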

In [0]:
jobs_compiled.head()

Unnamed: 0,job_id,company_name,title,description,location,company_id,views,applies,original_listed_time,remote_allowed,job_posting_url,application_url,expiry,listed_time,normalized_salary,zip_code,fips,city,state,work_type_Contract,work_type_Full-time,work_type_Internship,work_type_Other,work_type_Part-time,work_type_Temporary,work_type_Volunteer,experience_Associate,experience_Director,experience_Entry_level,experience_Executive,experience_Internship,experience_Mid-Senior_level,experience_unspecified,pay_period_BIWEEKLY,pay_period_HOURLY,pay_period_MONTHLY,pay_period_WEEKLY,pay_period_YEARLY,currency_BBD,currency_CAD,...,skill_Design,skill_Distribution,skill_Education,skill_Engineering,skill_Finance,skill_General_Business,skill_Health_Care_Provider,skill_Human_Resources,skill_Information_Technology,skill_Legal,skill_Management,skill_Manufacturing,skill_Marketing,skill_Other,skill_Product_Management,skill_Production,skill_Project_Management,skill_Public_Relations,skill_Purchasing,skill_Quality_Assurance,skill_Research,skill_Sales,skill_Science,skill_Strategy/Planning,skill_Supply_Chain,skill_Training,skill_Writing/Editing,job_industry,benefit_401_k_,benefit_Child_care_support,benefit_Commuter_benefits,benefit_Dental_insurance,benefit_Disability_insurance,benefit_Medical_insurance,benefit_Paid_maternity_leave,benefit_Paid_paternity_leave,benefit_Pension_plan,benefit_Student_loan_assistance,benefit_Tuition_assistance,benefit_Vision_insurance
0,3905284609,oak street health,outreach executive i,description\n\ncompany: oak street health\n\nt...,"tucson, az",3029525.0,3.0,0.0,2024-04-18 20:11:46,0.0,https://www.linkedin.com/jobs/view/3905284609/...,https://jobs.jobvite.com/oak-street-health/job...,2024-05-18 20:23:04,2024-04-18 20:23:04,,85701.0,4019.0,tucson,az,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,Hospitals And Health Care,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
1,3905284610,oak street health,"nurse practitioner, advanced practice provider",description\n\ntitle: nurse practitioner - adv...,"albuquerque, nm",3029525.0,3.0,0.0,2024-04-18 20:11:47,0.0,https://www.linkedin.com/jobs/view/3905284610/...,https://jobs.jobvite.com/oak-street-health/job...,2024-05-18 20:23:04,2024-04-18 20:23:04,,87101.0,35001.0,albuquerque,nm,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Hospitals And Health Care,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
2,3905284614,oak street health,medical assistant/phlebotomist,description\n\ncompany: oak street health\n\nt...,"aurora, co",3029525.0,4.0,0.0,2024-04-18 20:11:56,0.0,https://www.linkedin.com/jobs/view/3905284614/...,https://jobs.jobvite.com/oak-street-health/job...,2024-05-18 20:23:04,2024-04-18 20:23:04,,80010.0,8001.0,aurora,co,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Hospitals And Health Care,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,3905284616,lantheus,"director, quality systems & compliance (hybrid)","lantheus is headquartered in bedford, massachu...","billerica, ma",249830.0,4.0,0.0,2024-04-18 00:00:00,0.0,https://www.linkedin.com/jobs/view/3905284616/...,https://recruiting.ultipro.com/LAN1018LMII/Job...,2024-05-18 20:27:01,2024-04-18 20:27:01,,1821.0,25017.0,billerica,ma,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Pharmaceutical Manufacturing,,,,,,,,,,,,
4,3905284617,north american dental group (nadg),patient service rep i,patient service representative \n\njoin us in ...,"loganville, ga",2831596.0,2.0,0.0,2024-04-18 20:12:07,0.0,https://www.linkedin.com/jobs/view/3905284617/...,https://careers-nadentalgroup.icims.com/jobs/1...,2024-05-18 20:16:48,2024-04-18 20:16:48,,30052.0,,loganville,ga,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Medical Practices,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
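
Because these are left joins, jobs with no row in job_skills or benefits get NaN in every skill_ and benefit_ column (as in the lantheus row above). If the modelling step should treat "no row" as "not listed", one option is to fill those indicator columns with 0 and restore an integer dtype. Whether that interpretation holds for this dataset is an assumption; the toy frame is illustrative.

```python
import numpy as np
import pandas as pd

# Toy joined frame: job 2 had no benefits row, job 3 had no skills row (illustrative)
jobs = pd.DataFrame({
    "job_id": [1, 2, 3],
    "skill_Sales": [1.0, 0.0, np.nan],
    "benefit_401_k_": [1.0, np.nan, 0.0],
    "job_industry": ["Banking", None, "Retail"],
})

# Select only the one-hot indicator columns by prefix
indicator_cols = [c for c in jobs.columns if c.startswith(("skill_", "benefit_"))]

# Treat "no matching row" as "not listed" (an assumption), then cast back to int
for col in indicator_cols:
    jobs[col] = jobs[col].fillna(0).astype("int64")

print(jobs.dtypes)
```

Non-indicator columns such as job_industry are left untouched, so their missing values stay visible.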


#### Join all company-related tables

In [0]:
# Start with companies_pd as the base
companies_compiled = companies_pd

# Join with company_industries_pd
companies_compiled = companies_compiled.merge(company_industries_pd, on='company_id', how='left')

# Join with company_specialities_pd
companies_compiled = companies_compiled.merge(company_specialities_pd, on='company_id', how='left')

# Join with employee_counts_pd
companies_compiled = companies_compiled.merge(employee_counts_pd, on='company_id', how='left')

print("Joined all company-related tables:")
print(companies_compiled.info())


Joined all company-related tables:
<class 'pandas.core.frame.DataFrame'>
Int64Index: 24434 entries, 0 to 24433
Data columns (total 10 columns):
 #   Column                Non-Null Count  Dtype         
---  ------                --------------  -----         
 0   company_id            24434 non-null  int32         
 1   name                  24434 non-null  object        
 2   description           24434 non-null  object        
 3   company_size          24434 non-null  float64       
 4   company_hq_country    24434 non-null  object        
 5   company_industry      24328 non-null  object        
 6   company_specialities  17758 non-null  object        
 7   employee_count        24434 non-null  int32         
 8   follower_count        24434 non-null  int32         
 9   time_recorded         24434 non-null  datetime64[ns]
dtypes: datetime64[ns](1), float64(1), int32(3), object(5)
memory usage: 1.8+ MB
None


In [0]:
companies_compiled.head()

Unnamed: 0,company_id,name,description,company_size,company_hq_country,company_industry,company_specialities,employee_count,follower_count,time_recorded
0,32313,Audemars Piguet,Audemars Piguet is the oldest fine watchmaking...,5.0,CH,Retail Luxury Goods And Jewelry,"Haute Horlogerie, Luxury Products",2027,249352,2024-04-20 00:27:39
1,32336,Accentuate Staffing,"Accentuate Staffing, founded in 1996, has expe...",1.0,US,Staffing And Recruiting,"Staffing, Recruitment",167,22831,2024-04-11 18:39:47
2,32346,AMSURG,AMSURG is an independent leader in ambulatory ...,5.0,US,Hospitals And Health Care,"Health Care, Outpatient Surgery, Ambulatory Su...",948,12775,2024-04-19 21:30:05
3,32359,AgFirst Farm Credit Bank,"As part of the nationwide Farm Credit system, ...",3.0,US,Banking,"Financial Services, Technology Services, Infor...",834,9168,2024-04-18 05:12:05
4,32364,Aims Community College,Aims Community College is one of the most prog...,4.0,US,Higher Education,"Automotive, Aviation, Welding, Nursing, Busine...",1077,16325,2024-04-16 03:31:08


#### Join compiled jobs and companies tables

In [0]:
# Join jobs_compiled with companies_compiled on company_id
final_compiled = jobs_compiled.merge(companies_compiled, on='company_id', how='left')

print("Final compiled table:")
print(final_compiled.info())


Final compiled table:
<class 'pandas.core.frame.DataFrame'>
Int64Index: 120650 entries, 0 to 120649
Columns: 106 entries, job_id to time_recorded
dtypes: datetime64[ns](4), float64(57), int64(31), object(14)
memory usage: 98.5+ MB
None
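Because this is a left join, the result should have the same number of rows as `jobs_compiled` provided `company_id` is unique in `companies_compiled`. The sketch below uses small hypothetical frames to show two optional safeguards for the merge. `validate="many_to_one"` makes pandas raise `MergeError` if the right-hand key is duplicated, and `indicator=True` adds a `_merge` column that counts postings with no matching company. The sketch also shows why integer company columns such as `employee_count` come back as `float64` after the join: unmatched rows are filled with `NaN`. Casting to pandas' nullable `Int64` dtype is one option if integer semantics matter downstream.

```python
import pandas as pd

# Hypothetical stand-ins for jobs_compiled and companies_compiled
jobs = pd.DataFrame({"job_id": [10, 11, 12], "company_id": [1, 2, 99]})
companies = pd.DataFrame({
    "company_id": [1, 2],
    "employee_count": pd.Series([2027, 167], dtype="int32"),
})

merged = jobs.merge(
    companies,
    on="company_id",
    how="left",
    validate="many_to_one",  # raise MergeError if company_id repeats in companies
    indicator=True,          # adds a '_merge' column: 'both' or 'left_only'
)

# A left join against a unique right-hand key never adds or drops rows
assert len(merged) == len(jobs)

# Postings whose company_id had no match in the companies table
n_unmatched = (merged["_merge"] == "left_only").sum()
print(f"Postings without company data: {n_unmatched}")

# The NaN fill upcasts int32 to float64; nullable Int64 restores integers
print(merged["employee_count"].dtype)
merged["employee_count"] = merged["employee_count"].astype("Int64")
merged = merged.drop(columns="_merge")
```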


In [0]:
final_compiled.head()

Unnamed: 0,job_id,company_name,title,description_x,location,company_id,views,applies,original_listed_time,remote_allowed,job_posting_url,application_url,expiry,listed_time,normalized_salary,zip_code,fips,city,state,work_type_Contract,work_type_Full-time,work_type_Internship,work_type_Other,work_type_Part-time,work_type_Temporary,work_type_Volunteer,experience_Associate,experience_Director,experience_Entry_level,experience_Executive,experience_Internship,experience_Mid-Senior_level,experience_unspecified,pay_period_BIWEEKLY,pay_period_HOURLY,pay_period_MONTHLY,pay_period_WEEKLY,pay_period_YEARLY,currency_BBD,currency_CAD,...,skill_Legal,skill_Management,skill_Manufacturing,skill_Marketing,skill_Other,skill_Product_Management,skill_Production,skill_Project_Management,skill_Public_Relations,skill_Purchasing,skill_Quality_Assurance,skill_Research,skill_Sales,skill_Science,skill_Strategy/Planning,skill_Supply_Chain,skill_Training,skill_Writing/Editing,job_industry,benefit_401_k_,benefit_Child_care_support,benefit_Commuter_benefits,benefit_Dental_insurance,benefit_Disability_insurance,benefit_Medical_insurance,benefit_Paid_maternity_leave,benefit_Paid_paternity_leave,benefit_Pension_plan,benefit_Student_loan_assistance,benefit_Tuition_assistance,benefit_Vision_insurance,name,description_y,company_size,company_hq_country,company_industry,company_specialities,employee_count,follower_count,time_recorded
0,3905284609,oak street health,outreach executive i,description\n\ncompany: oak street health\n\nt...,"tucson, az",3029525.0,3.0,0.0,2024-04-18 20:11:46,0.0,https://www.linkedin.com/jobs/view/3905284609/...,https://jobs.jobvite.com/oak-street-health/job...,2024-05-18 20:23:04,2024-04-18 20:23:04,,85701.0,4019.0,tucson,az,0,1,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,Hospitals And Health Care,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,Oak Street Health,"Founded in 2012, Oak Street Health is a networ...",6.0,US,Hospitals And Health Care,"Primary Care Health Centers, Medicare, Concier...",3780.0,113611.0,2024-04-18 17:04:11
1,3905284610,oak street health,"nurse practitioner, advanced practice provider",description\n\ntitle: nurse practitioner - adv...,"albuquerque, nm",3029525.0,3.0,0.0,2024-04-18 20:11:47,0.0,https://www.linkedin.com/jobs/view/3905284610/...,https://jobs.jobvite.com/oak-street-health/job...,2024-05-18 20:23:04,2024-04-18 20:23:04,,87101.0,35001.0,albuquerque,nm,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Hospitals And Health Care,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,Oak Street Health,"Founded in 2012, Oak Street Health is a networ...",6.0,US,Hospitals And Health Care,"Primary Care Health Centers, Medicare, Concier...",3780.0,113611.0,2024-04-18 17:04:11
2,3905284614,oak street health,medical assistant/phlebotomist,description\n\ncompany: oak street health\n\nt...,"aurora, co",3029525.0,4.0,0.0,2024-04-18 20:11:56,0.0,https://www.linkedin.com/jobs/view/3905284614/...,https://jobs.jobvite.com/oak-street-health/job...,2024-05-18 20:23:04,2024-04-18 20:23:04,,80010.0,8001.0,aurora,co,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Hospitals And Health Care,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Oak Street Health,"Founded in 2012, Oak Street Health is a networ...",6.0,US,Hospitals And Health Care,"Primary Care Health Centers, Medicare, Concier...",3780.0,113611.0,2024-04-18 17:04:11
3,3905284616,lantheus,"director, quality systems & compliance (hybrid)","lantheus is headquartered in bedford, massachu...","billerica, ma",249830.0,4.0,0.0,2024-04-18 00:00:00,0.0,https://www.linkedin.com/jobs/view/3905284616/...,https://recruiting.ultipro.com/LAN1018LMII/Job...,2024-05-18 20:27:01,2024-04-18 20:27:01,,1821.0,25017.0,billerica,ma,0,1,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Pharmaceutical Manufacturing,,,,,,,,,,,,,Lantheus,About us\n\nWith more than 65 years of experie...,4.0,US,Pharmaceutical Manufacturing,"Precision Diagnostics, Targeted Therapeutics, ...",1095.0,17988.0,2024-04-18 20:31:33
4,3905284617,north american dental group (nadg),patient service rep i,patient service representative \n\njoin us in ...,"loganville, ga",2831596.0,2.0,0.0,2024-04-18 20:12:07,0.0,https://www.linkedin.com/jobs/view/3905284617/...,https://careers-nadentalgroup.icims.com/jobs/1...,2024-05-18 20:16:48,2024-04-18 20:16:48,,30052.0,,loganville,ga,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Medical Practices,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,North American Dental Group (NADG),North American Dental Group is a premier denta...,5.0,US,Hospitals And Health Care,"Dentistry, Dso, Dental Services Organization, ...",554.0,26933.0,2024-04-18 20:18:51
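Both tables contain a `description` column, so pandas applies its default suffixes and produces `description_x` (the posting text) and `description_y` (the company blurb). Passing explicit `suffixes` makes the origin of each column obvious. The names `_job` and `_company` below are only a suggested convention, and the frames are hypothetical.

```python
import pandas as pd

# Hypothetical one-row frames that share a non-key 'description' column
jobs = pd.DataFrame({"job_id": [10], "company_id": [1], "description": ["Outreach role..."]})
companies = pd.DataFrame({"company_id": [1], "description": ["Founded in 2012..."]})

# Overlapping non-key columns get these suffixes instead of the default ('_x', '_y')
merged = jobs.merge(companies, on="company_id", how="left", suffixes=("_job", "_company"))
print(merged.columns.tolist())
# ['job_id', 'company_id', 'description_job', 'description_company']
```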


In [0]:
# Convert pandas DataFrame to PySpark DataFrame
spark_final_compiled = spark.createDataFrame(final_compiled)

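# Save as a Delta table, overwriting any existing default.final_compiled table.
# overwriteSchema lets the new column layout replace the old schema;
# earlier versions remain available through the Delta table history.
(spark_final_compiled.write.format("delta")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable("default.final_compiled"))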
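Delta tables reject column names that contain any of the characters ` ,;{}()\n\t=` unless column mapping is enabled. One-hot encoded columns built from raw category labels (for example a benefit such as `401(k)`) can easily contain these characters. A small pre-check on the pandas column names can catch this before `saveAsTable` fails. The `sanitize_columns` helper below is hypothetical, and its rule of replacing each invalid character with `_` is an assumption. It is consistent with names already in this table, such as `benefit_401_k_` and `experience_Entry_level`.

```python
# Characters Delta Lake rejects in column names unless column mapping is enabled
INVALID_DELTA_CHARS = " ,;{}()\n\t="
_TO_UNDERSCORE = str.maketrans({ch: "_" for ch in INVALID_DELTA_CHARS})

def sanitize_columns(columns):
    """Hypothetical helper: replace Delta-invalid characters with '_'."""
    cleaned = [str(col).translate(_TO_UNDERSCORE) for col in columns]
    # Two different raw names can collapse to the same cleaned name
    duplicates = {c for c in cleaned if cleaned.count(c) > 1}
    if duplicates:
        raise ValueError(f"Sanitized column names collide: {sorted(duplicates)}")
    return cleaned

print(sanitize_columns(["benefit_401(k)", "experience_Entry level", "skill_Strategy/Planning"]))
# ['benefit_401_k_', 'experience_Entry_level', 'skill_Strategy/Planning']
```

In the notebook this would run on the pandas frame before conversion, e.g. `final_compiled.columns = sanitize_columns(final_compiled.columns)`, followed by `spark.createDataFrame(final_compiled)`.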

In [0]:
# To read an older version of any of these Delta tables, uncomment and run the lines below
# Alternatively, open Catalog -> table -> History

# previous_version_df = spark.read.format("delta").option("versionAsOf", 0).table("default.table_name")

# Show the old version
# previous_version_df.show()

- final_compiled combines all of the cleaned datasets (except salaries, which was not used)
- In the next notebook (EDA and Versioning), I will run EDA on final_compiled and clean it in several ways to produce separate datasets for different analyses