# Lightcast Resume Data to Transition Matrix

Here, we build a transition matrix across occupations based on our sequential Lightcast resume data. 

Due to the large size of our sequential dataset, we use the Dask package which allows us to take advantage of multiple cores and clusters to speed up our computations. 

In [1]:
# Install Dask 
#pip install "dask[complete]"

Collecting dask[complete]Note: you may need to restart the kernel to use updated packages.

  Downloading dask-2024.4.0-py3-none-any.whl.metadata (3.8 kB)
Collecting click>=8.1 (from dask[complete])
  Downloading click-8.1.7-py3-none-any.whl.metadata (3.0 kB)
Collecting cloudpickle>=1.5.0 (from dask[complete])
  Downloading cloudpickle-3.0.0-py3-none-any.whl.metadata (7.0 kB)
Collecting fsspec>=2021.09.0 (from dask[complete])
  Downloading fsspec-2024.3.1-py3-none-any.whl.metadata (6.8 kB)
Collecting partd>=1.2.0 (from dask[complete])
  Downloading partd-1.4.1-py3-none-any.whl.metadata (4.6 kB)
Collecting pyyaml>=5.3.1 (from dask[complete])
  Downloading PyYAML-6.0.1-cp312-cp312-win_amd64.whl.metadata (2.1 kB)
Collecting pyarrow>=7.0 (from dask[complete])
  Downloading pyarrow-15.0.2-cp312-cp312-win_amd64.whl.metadata (3.1 kB)
Collecting pyarrow-hotfix (from dask[complete])
  Downloading pyarrow_hotfix-0.6-py3-none-any.whl.metadata (3.6 kB)
Collecting lz4>=4.3.2 (from dask[complete])
 

## System Info

In [1]:
# Number of CPU in System

import multiprocessing

# Get the number of CPUs in the system
num_cpus = multiprocessing.cpu_count()

print(f"Number of CPUs in the system: {num_cpus}")


Number of CPUs in the system: 20


In [5]:
# Number of cores per CPU

import psutil

# Logical cores
logical_cores = psutil.cpu_count()
# Physical cores
physical_cores = psutil.cpu_count(logical=False)

print(f"Logical cores: {logical_cores}")
print(f"Physical cores: {physical_cores}")


Logical cores: 20
Physical cores: 14


## 1. Data Processing with Dask

In [1]:
from dask.distributed import Client

client = Client(n_workers=5, threads_per_worker=3, memory_limit="15GB")
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 57500 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:57500/status,

0,1
Dashboard: http://127.0.0.1:57500/status,Workers: 5
Total threads: 15,Total memory: 69.85 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:57503,Workers: 5
Dashboard: http://127.0.0.1:57500/status,Total threads: 15
Started: Just now,Total memory: 69.85 GiB

0,1
Comm: tcp://127.0.0.1:57536,Total threads: 3
Dashboard: http://127.0.0.1:57539/status,Memory: 13.97 GiB
Nanny: tcp://127.0.0.1:57506,
Local directory: C:\Users\ewp\AppData\Local\Temp\dask-scratch-space\worker-br_pmh_q,Local directory: C:\Users\ewp\AppData\Local\Temp\dask-scratch-space\worker-br_pmh_q

0,1
Comm: tcp://127.0.0.1:57526,Total threads: 3
Dashboard: http://127.0.0.1:57527/status,Memory: 13.97 GiB
Nanny: tcp://127.0.0.1:57508,
Local directory: C:\Users\ewp\AppData\Local\Temp\dask-scratch-space\worker-xx9ce3qf,Local directory: C:\Users\ewp\AppData\Local\Temp\dask-scratch-space\worker-xx9ce3qf

0,1
Comm: tcp://127.0.0.1:57530,Total threads: 3
Dashboard: http://127.0.0.1:57533/status,Memory: 13.97 GiB
Nanny: tcp://127.0.0.1:57510,
Local directory: C:\Users\ewp\AppData\Local\Temp\dask-scratch-space\worker-c_yibshc,Local directory: C:\Users\ewp\AppData\Local\Temp\dask-scratch-space\worker-c_yibshc

0,1
Comm: tcp://127.0.0.1:57535,Total threads: 3
Dashboard: http://127.0.0.1:57537/status,Memory: 13.97 GiB
Nanny: tcp://127.0.0.1:57512,
Local directory: C:\Users\ewp\AppData\Local\Temp\dask-scratch-space\worker-nnidbfa_,Local directory: C:\Users\ewp\AppData\Local\Temp\dask-scratch-space\worker-nnidbfa_

0,1
Comm: tcp://127.0.0.1:57529,Total threads: 3
Dashboard: http://127.0.0.1:57531/status,Memory: 13.97 GiB
Nanny: tcp://127.0.0.1:57514,
Local directory: C:\Users\ewp\AppData\Local\Temp\dask-scratch-space\worker-ewrlw8yn,Local directory: C:\Users\ewp\AppData\Local\Temp\dask-scratch-space\worker-ewrlw8yn


In [27]:
import dask.dataframe as dd

### 1.1 Read Files

In [3]:
df = dd.read_csv("Lightcast_jobs_filtered.csv")

In [6]:
df = df.iloc[:,1:]

In [4]:
df.dtypes

ID              string[pyarrow]
START_DATE      string[pyarrow]
END_DATE        string[pyarrow]
COMPANY_NAME    string[pyarrow]
ONET            string[pyarrow]
ONET_NAME       string[pyarrow]
NATION_RAW      string[pyarrow]
IS_CURRENT                 bool
dtype: object

In [5]:
df.head()

Unnamed: 0,ID,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
0,Bsk6xcoQF3KrXE4HUAK3BQ_0000,2000-11,2007-05,Whirlpool,11-9199.00,"Managers, All Other",united states,False
1,Bsk6xcoQF3KrXE4HUAK3BQ_0000,1984-06,2000-05,Whirlpool,51-2092.00,Team Assemblers,united states,False
2,Bsk6xcoQF3KrXE4HUAK3BQ_0000,2007-05,2024-04,Whirlpool,43-1011.00,First-Line Supervisors of Office and Administr...,united states,True
3,hHG1-DPlybOKRZuJZTPEuA_0000,2022-08,2024-04,Denver Mattress,41-1011.00,First-Line Supervisors of Retail Sales Workers,united states,True
4,hHG1-DPlybOKRZuJZTPEuA_0000,2020-05,2022-05,Energy Related Properties,49-9071.00,"Maintenance and Repair Workers, General",united states,False


In [10]:
len(df)

68337442

### 1.2 Sorting

Since our resume data is not sorted in the order that we want, we sort our data first. 

In [7]:
df1 = df.sort_values(by=['ID', 'START_DATE', 'IS_CURRENT'], ascending=[True, True, True])

In [8]:
df1.head()

Unnamed: 0,ID,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
131379,---0KdvEQZJHVrusyaBjfA_0000,2014-06,2017-04,Northern Illinois University,13-1151.00,Training and Development Specialists,united states,False
131378,---0KdvEQZJHVrusyaBjfA_0000,2017-05,2018-10,Keypath Education,25-9031.00,Instructional Coordinators,united states,False
131377,---0KdvEQZJHVrusyaBjfA_0000,2018-01,2019-12,American Intercontinental University,13-1151.00,Training and Development Specialists,united states,False
131380,---0KdvEQZJHVrusyaBjfA_0000,2020-01,2024-04,American Intercontinental University,17-2051.00,Civil Engineers,united states,True
226284,---0LzeiQbmXa7iCz6GzAw_0000,2006-01,2010-01,Ncar - The National Center For Atmospheric Res...,43-6014.00,"Secretaries and Administrative Assistants, Exc...",united states,False


YES!!!!! Sorting done. 

In [7]:
df1 = df1.persist()

In [15]:
#df1.to_parquet("Lightcast_jobs_filtered_sorted.parquet")

In [15]:
len(df1.ONET.unique())

993

### 1.3 Drop unclassified ONET code

In [274]:
dfx= dd.read_csv("Lightcast_jobs_filtered_sorted_V2.csv")

In [269]:
dfx = dfx.set_index("ID",inplace = True).persist()

In [270]:
dfx.head()

Unnamed: 0_level_0,Unnamed: 0,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
---0KdvEQZJHVrusyaBjfA_0000,0,2014-06,2017-04,Northern Illinois University,13-1151.00,Training and Development Specialists,united states,False
---0KdvEQZJHVrusyaBjfA_0000,1,2017-05,2018-10,Keypath Education,25-9031.00,Instructional Coordinators,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2,2018-01,2019-12,American Intercontinental University,13-1151.00,Training and Development Specialists,united states,False
---0KdvEQZJHVrusyaBjfA_0000,3,2020-01,2024-04,American Intercontinental University,17-2051.00,Civil Engineers,united states,True
---0LzeiQbmXa7iCz6GzAw_0000,4,2006-01,2010-01,Ncar - The National Center For Atmospheric Res...,43-6014.00,"Secretaries and Administrative Assistants, Exc...",united states,False


In [271]:
dfx['ONET'].str.startswith('99', na=False).sum().compute()

0

In [272]:
len(dfx)

51155844

In [273]:
len(dfx.index.unique())

12815753

Dropped in Axelle Replicate Network Construction V2

## 2. Building transition matrix (trial)

#### Pull data

In [3]:
import pandas as pd

In [4]:
dfx= dd.read_csv("data/raw/Lightcast_jobs_filtered_sorted_V2.csv")

In [17]:
dfx.to_parquet("Lightcast_jobs_filtered_sorted_V2.parquet")

In [5]:
sample_dfx = dfx.iloc[:,1:].loc[:10000].set_index("ID",inplace = True).persist()

In [5]:
dfx = dfx.iloc[:,1:].set_index("ID",inplace = True).persist()

In [6]:
len(dfx)

51155844

In [7]:
dfx.head()

Unnamed: 0_level_0,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
---0KdvEQZJHVrusyaBjfA_0000,2014-06,2017-04,Northern Illinois University,13-1151.00,Training and Development Specialists,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2017-05,2018-10,Keypath Education,25-9031.00,Instructional Coordinators,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2018-01,2019-12,American Intercontinental University,13-1151.00,Training and Development Specialists,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2020-01,2024-04,American Intercontinental University,17-2051.00,Civil Engineers,united states,True
---0LzeiQbmXa7iCz6GzAw_0000,2006-01,2010-01,Ncar - The National Center For Atmospheric Res...,43-6014.00,"Secretaries and Administrative Assistants, Exc...",united states,False


In [126]:
dfx.dtypes

START_DATE      string[pyarrow]
END_DATE        string[pyarrow]
COMPANY_NAME    string[pyarrow]
ONET            string[pyarrow]
ONET_NAME       string[pyarrow]
NATION_RAW      string[pyarrow]
IS_CURRENT                 bool
dtype: object

### a. With sample data

In [5]:
sample_df = dfx.get_partition(0).compute().iloc[:10000,:]

In [6]:
len(sample_df)

10000

In [7]:
len(sample_df.ID.unique())

2487

In [17]:
sample_df[sample_df.ID == '--0XnB8xjmkAiJfkGkxezw_0000']

Unnamed: 0,ID,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
218013,--0XnB8xjmkAiJfkGkxezw_0000,2013-01,2016-06,Community College Of Vermont,43-4051.00,Customer Service Representatives,united states,False
218011,--0XnB8xjmkAiJfkGkxezw_0000,2013-09,2017-05,New England Culinary Institute,11-9199.00,"Managers, All Other",united states,False
218014,--0XnB8xjmkAiJfkGkxezw_0000,2014-03,2014-03,Spaulding High School,27-2022.00,Coaches and Scouts,united states,False
218006,--0XnB8xjmkAiJfkGkxezw_0000,2016-09,2021-08,Norwich University,33-9092.00,"Lifeguards, Ski Patrol, and Other Recreational...",united states,False
218012,--0XnB8xjmkAiJfkGkxezw_0000,2016-10,2017-04,Norwich University,43-6014.00,"Secretaries and Administrative Assistants, Exc...",united states,False
218005,--0XnB8xjmkAiJfkGkxezw_0000,2017-08,2021-08,National Life,15-1244.00,Network and Computer Systems Administrators,united states,False
218009,--0XnB8xjmkAiJfkGkxezw_0000,2017-08,2019-07,National Life,43-4051.00,Customer Service Representatives,united states,False
218004,--0XnB8xjmkAiJfkGkxezw_0000,2018-09,2021-08,National Life,15-1241.00,Computer Network Architects,united states,False
218015,--0XnB8xjmkAiJfkGkxezw_0000,2019-06,2019-06,Norwich University,11-1011.00,Chief Executives,united states,False
218008,--0XnB8xjmkAiJfkGkxezw_0000,2019-07,2021-03,National Life,13-1071.00,Human Resources Specialists,united states,False


In [21]:
sample_df.groupby("ID").size().sort_values(ascending = False)

ID
--EbWMMiI7q6DxvoLrJckA_0000    19
--1KFTTA4tYBwlmKtxSzew_0000    15
--5D4wY3FZYAD2lqutZY8A_0000    14
--DGqRAugnsHNYnpChaGEw_0000    14
--DuBLa55TE-By177C8qmw_0000    14
                               ..
--P82YjGHloEejPlA1kGRw_0000     2
--B8zqrQCA-idPrr-USOBQ_0000     2
--HVhfULjLOAAYReMtvapg_0000     2
--PBuo62wHTiFALhbdV4HQ_0000     2
--NFhqI2iRG1PSlFYdPoGw_0000     2
Length: 2487, dtype: int64

In [8]:
from tqdm import tqdm

In [9]:
len(sample_df.groupby('ID'))

2487

In [37]:
sample_df

Unnamed: 0.1,Unnamed: 0,ID,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
0,0,---0KdvEQZJHVrusyaBjfA_0000,2014-06,2017-04,Northern Illinois University,13-1151.00,Training and Development Specialists,united states,False
1,1,---0KdvEQZJHVrusyaBjfA_0000,2017-05,2018-10,Keypath Education,25-9031.00,Instructional Coordinators,united states,False
2,2,---0KdvEQZJHVrusyaBjfA_0000,2018-01,2019-12,American Intercontinental University,13-1151.00,Training and Development Specialists,united states,False
3,3,---0KdvEQZJHVrusyaBjfA_0000,2020-01,2024-04,American Intercontinental University,17-2051.00,Civil Engineers,united states,True
4,4,---0LzeiQbmXa7iCz6GzAw_0000,2006-01,2010-01,Ncar - The National Center For Atmospheric Res...,43-6014.00,"Secretaries and Administrative Assistants, Exc...",united states,False
...,...,...,...,...,...,...,...,...,...
9995,9995,--X9VS6uLebARPBVIbAASg_0000,2008-02,2013-02,Tdcj,49-1011.00,"First-Line Supervisors of Mechanics, Installer...",united states,False
9996,9996,--X9VS6uLebARPBVIbAASg_0000,2013-02,2015-04,Tdcj,21-1093.00,Social and Human Service Assistants,united states,False
9997,9997,--X9pKiaIOmtdBwVyegftg_0000,1993-01,1996-01,Pennian Bank,11-2022.00,Sales Managers,united states,False
9998,9998,--X9pKiaIOmtdBwVyegftg_0000,1998-02,2000-01,Fremont Financial,11-2022.00,Sales Managers,united states,False


#### a.1 Version 1: 

Count hops to overlapping jobs. If there is no overlapping, count hop to the next job. 

In [10]:
job_hop_count_empty = pd.read_csv("data/raw/job_hop_count_empty.csv",index_col=0)

In [11]:
job_hop_count_empty

Unnamed: 0_level_0,11-1011.00,11-1011.03,11-1021.00,11-1031.00,11-2011.00,11-2021.00,11-2022.00,11-2032.00,11-2033.00,11-3012.00,...,55-2012.00,55-2013.00,55-3011.00,55-3012.00,55-3013.00,55-3014.00,55-3015.00,55-3016.00,55-3018.00,55-3019.00
ONET (row->col),Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
11-1011.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
11-1011.03,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
11-1021.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
11-1031.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
11-2011.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
55-3014.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
55-3015.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
55-3016.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
55-3018.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [12]:
# For each ID, Profile 
with tqdm(total=len(sample_df.groupby('ID'))) as pbar:
    for id_, group in sample_df.groupby('ID'):
        # For each experience for ID
        for i in range(len(group) - 1):
            # Current job experience info (ONET code, start and end dates) 
            current_st = group.iloc[i]["START_DATE"]
            current_end = group.iloc[i]["END_DATE"]
            current_ONET = group.iloc[i]["ONET"]

            # Remaining experiences in group
            remaining_group = group.iloc[i+1:]

            # Select experiences with overlapping dates 
            overlapping = remaining_group[(remaining_group['START_DATE'] > current_st) & (remaining_group['START_DATE'] < current_end)]

            # If not empty, count all experiences as hops. Hop from which ONET to which
            if not overlapping.empty: 
                # to which ONET occupations
                overlapping_ONET = overlapping["ONET"].values.tolist()
                #print(overlapping_ONET)
                
                # Find where
                #print(job_hop_count_empty.loc[current_ONET][overlapping_ONET])
                # Count!
                job_hop_count_empty.loc[current_ONET][overlapping_ONET] += 1
                #print(job_hop_count_empty.loc[current_ONET][overlapping_ONET])
            
            # If empty, count hop to next row only
            else: 
                #print('EMPTY')
                next_ONET = remaining_group.iloc[0]['ONET']
                job_hop_count_empty.loc[current_ONET][next_ONET] += 1
                #print(job_hop_count_empty.loc[current_ONET][next_ONET])
        pbar.update(1)
    

  0%|          | 0/2487 [00:00<?, ?it/s]

You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the original DataFrame or Series, because the intermediate object on which we are setting values will behave as a copy.
A typical example is when you are setting values in a column of a DataFrame, like:

df["col"][row_indexer] = value

Use `df.loc[row_indexer, "col"] = values` instead, to perform the assignment in a single step and ensure this keeps updating the original `df`.

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

  job_hop_count_empty.loc[current_ONET][next_ONET] += 1
You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the origina

In [13]:
job_hop_count_empty

Unnamed: 0_level_0,11-1011.00,11-1011.03,11-1021.00,11-1031.00,11-2011.00,11-2021.00,11-2022.00,11-2032.00,11-2033.00,11-3012.00,...,55-2012.00,55-2013.00,55-3011.00,55-3012.00,55-3013.00,55-3014.00,55-3015.00,55-3016.00,55-3018.00,55-3019.00
ONET (row->col),Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
11-1011.00,28,0,15,0,0,4,2,4,0,0,...,0,0,0,0,0,0,0,0,0,0
11-1011.03,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
11-1021.00,6,0,48,0,0,12,15,0,0,1,...,0,0,0,0,0,0,0,0,0,0
11-1031.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
11-2011.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
55-3014.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
55-3015.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
55-3016.00,0,0,0,0,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
55-3018.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [14]:
(job_hop_count_empty !=0).sum().sort_values(ascending=False)[:10]

43-1011.00    115
11-9199.00    103
15-1232.00     89
11-1021.00     82
43-4051.00     78
43-6014.00     67
13-1199.00     65
11-1011.00     63
11-2022.00     61
21-1093.00     59
dtype: int64

In [15]:
(job_hop_count_empty !=0).sum().sum()

4479

#### a.2 Version 2: 

Count hops to overlapping jobs AND to the next job. If there is no overlapping, just count hop to the next job. 

In [16]:
job_hop_count_empty1 = pd.read_csv("data/raw/job_hop_count_empty.csv",index_col=0)

In [17]:
# Revised code: including hop to the next. 
# For each ID, Profile 
with tqdm(total=len(sample_df.groupby('ID'))) as pbar:
    overlap_count = 0
    for id_, group in sample_df.groupby('ID'):
        # For each experience for ID
        for i in range(len(group) - 1):
            # Current job experience info (ONET code, start and end dates) 
            current_st = group.iloc[i]["START_DATE"]
            current_end = group.iloc[i]["END_DATE"]
            current_ONET = group.iloc[i]["ONET"]

            # Remaining experiences in group
            remaining_group = group.iloc[i+1:]

            # Select experiences with overlapping dates 
            overlapping = remaining_group[
                (remaining_group['START_DATE'] > current_st) & 
                (remaining_group['START_DATE'] < current_end)]
            notOverlapping = remaining_group[
                ~((remaining_group['START_DATE'] > current_st) & 
                  (remaining_group['START_DATE'] < current_end))]

            # If not empty, count all experiences as hops. Hop from which ONET to which
            if not overlapping.empty: 
                if not notOverlapping.empty:
                    overlap_count += 1
                # to which ONET occupations
                overlapping_ONET = overlapping["ONET"].values.tolist()
                #print(overlapping_ONET)

                # Find where
                #print(job_hop_count_empty.loc[current_ONET][overlapping_ONET])
                # Count!
                job_hop_count_empty1.loc[current_ONET][overlapping_ONET] += 1
                #print(job_hop_count_empty.loc[current_ONET][overlapping_ONET])
            
            # If empty, count hop to next row only
            # Count hop to next row
            if not notOverlapping.empty:
                #print('EMPTY')
                next_ONET = notOverlapping.iloc[0]['ONET']
                job_hop_count_empty1.loc[current_ONET][next_ONET] += 1
                #print(job_hop_count_empty.loc[current_ONET][next_ONET])
        pbar.update(1)

  0%|          | 0/2487 [00:00<?, ?it/s]

You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the original DataFrame or Series, because the intermediate object on which we are setting values will behave as a copy.
A typical example is when you are setting values in a column of a DataFrame, like:

df["col"][row_indexer] = value

Use `df.loc[row_indexer, "col"] = values` instead, to perform the assignment in a single step and ensure this keeps updating the original `df`.

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy

  job_hop_count_empty1.loc[current_ONET][next_ONET] += 1
You are setting values through chained assignment. Currently this works in certain cases, but when using Copy-on-Write (which will become the default behaviour in pandas 3.0) this will never work to update the origin

In [52]:
overlap_count

674

In [51]:
(job_hop_count_empty1 !=0).sum().sum()

4765

#### a.3 Version 3 (a): 

Revision sent on 4/25 - Count hops to overlapping jobs AND to the next job. If there is no overlapping, just count hop to the next job. 

In [18]:
job_hop_count_empty1 = pd.read_csv("data/raw/job_hop_count_empty.csv",index_col=0)

In [21]:
# Revised code: including hop to the next. 
# For each ID, Profile 
with tqdm(total=len(sample_df.groupby('ID'))) as pbar:
    difference = 0
    after = 0
    for id_, group in sample_df.groupby('ID'):
        # For each experience for ID
        for i in range(len(group) - 1):
            # Current job experience info (ONET code, start and end dates) 
            current_st = group.iloc[i]["START_DATE"]
            current_end = group.iloc[i]["END_DATE"]
            current_ONET = group.iloc[i]["ONET"]

            # Remaining experiences in group
            remaining_group = group.iloc[i+1:]

            # Select experiences with overlapping dates 
            overlapping = remaining_group[
                (remaining_group['START_DATE'] > current_st) & 
                (remaining_group['START_DATE'] < current_end)]
            notOverlapping = remaining_group[
                ~((remaining_group['START_DATE'] > current_st) & 
                  (remaining_group['START_DATE'] < current_end))]

            # if there is overlapping and non-overlapping, add 1 (intersection)
            if not overlapping.empty: 
                for overlapped_ONET in overlapping["ONET"].values:
                    job_hop_count_empty1.loc[current_ONET, overlapped_ONET] += 1
                    after += 1

                    # print("c", current_ONET, "over", overlapped_ONET)

                if not notOverlapping.empty:
                    difference += 1
                    # print("here")
            
            # If there is non-overlapping
            if not notOverlapping.empty:
                next_ONET = notOverlapping.iloc[0]['ONET']
                job_hop_count_empty1.loc[current_ONET, next_ONET] += 1
                after += 1
                # print("c", current_ONET, "n", next_ONET)
        pbar.update(1)

  0%|          | 0/2487 [00:00<?, ?it/s]

100%|██████████| 2487/2487 [00:08<00:00, 309.04it/s]


In [22]:
(job_hop_count_empty1 !=0).sum().sum()

4791

#### a.3 Version 3 (b): 

Revision sent on 4/25 - Count hops to overlapping jobs AND to the next job. If there is no overlapping, just count hop to the next job. 

In [23]:
job_hop_count_empty1 = pd.read_csv("data/raw/job_hop_count_empty.csv",index_col=0)

In [25]:
# Revised code: including hop to the next. 
# For each ID, Profile 
with tqdm(total=len(sample_df.groupby('ID'))) as pbar:
    difference = 0
    after = 0
    for id_, group in sample_df.groupby('ID'):
        # For each experience for ID
        for i in range(len(group) - 1):
            # Current job experience info (ONET code, start and end dates) 
            current_st = group.iloc[i]["START_DATE"]
            current_end = group.iloc[i]["END_DATE"]
            current_ONET = group.iloc[i]["ONET"]

            # Remaining experiences in group
            remaining_group = group.iloc[i+1:]

            # Select experiences with overlapping dates 
            remaining_group = group.iloc[i+1:]
            overlapping = remaining_group[
                (pd.to_datetime(remaining_group['START_DATE']) > current_st) & 
                (pd.to_datetime((remaining_group['START_DATE'])) < current_end)
            ]
            notOverlapping = remaining_group[
                (pd.to_datetime(remaining_group['START_DATE']) != current_st) &
                (pd.to_datetime(remaining_group['START_DATE']) >= current_end) 
            ]

            # if there is overlapping and non-overlapping, add 1 (intersection)
            if not overlapping.empty: 
                for overlapped_ONET in overlapping["ONET"].values:
                    job_hop_count_empty1.loc[current_ONET, overlapped_ONET] += 1
                    after += 1

                    # print("c", current_ONET, "over", overlapped_ONET)

                if not notOverlapping.empty:
                    difference += 1
                    # print("here")
            
            # If there is non-overlapping
            if not notOverlapping.empty:
                next_ONET = notOverlapping.iloc[0]['ONET']
                job_hop_count_empty1.loc[current_ONET, next_ONET] += 1
                after += 1
                # print("c", current_ONET, "n", next_ONET)
        pbar.update(1)

  0%|          | 0/2487 [00:00<?, ?it/s]

100%|██████████| 2487/2487 [00:24<00:00, 99.68it/s] 


In [26]:
(job_hop_count_empty1 !=0).sum().sum()

4725

### b. With full data

#### b.1 Test Code on Dask 

In [15]:
ids = dfx.index.unique().compute()

In [16]:
len(ids)

12815753

In [17]:
ids[15]

'--L7WWc7F8HIwlUVpKmgcA_0000'

In [23]:
len(dfx)

51155844

In [102]:
dfx.head()

Unnamed: 0,ID,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
0,---0KdvEQZJHVrusyaBjfA_0000,2014-06,2017-04,Northern Illinois University,13-1151.00,Training and Development Specialists,united states,False
1,---0KdvEQZJHVrusyaBjfA_0000,2017-05,2018-10,Keypath Education,25-9031.00,Instructional Coordinators,united states,False
2,---0KdvEQZJHVrusyaBjfA_0000,2018-01,2019-12,American Intercontinental University,13-1151.00,Training and Development Specialists,united states,False
3,---0KdvEQZJHVrusyaBjfA_0000,2020-01,2024-04,American Intercontinental University,17-2051.00,Civil Engineers,united states,True
4,---0LzeiQbmXa7iCz6GzAw_0000,2006-01,2010-01,Ncar - The National Center For Atmospheric Res...,43-6014.00,"Secretaries and Administrative Assistants, Exc...",united states,False


In [37]:
dfx1 = dfx.loc[dfx.ID == ids[1]]

In [50]:
# Selecting rows with loc!
dfx.loc[1].head()

Unnamed: 0,ID,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
1,---0KdvEQZJHVrusyaBjfA_0000,2017-05,2018-10,Keypath Education,25-9031.00,Instructional Coordinators,united states,False


In [63]:
dfx["START_DATE"].head(1)

0    2014-06
Name: START_DATE, dtype: object

In [70]:
dfx[dfx["START_DATE"].isna()].head()

Unnamed: 0,ID,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT


In [101]:
dfx["START_DATE"].head(1)

0    2014-06
Name: START_DATE, dtype: object

In [49]:
dfx.loc[dfx.ID == ids[0]]["START_DATE"].head()

556    1995-01
557    1997-01
558    1999-01
559    2001-01
560    2001-01
Name: START_DATE, dtype: string

In [127]:
dfx[(dfx["START_DATE"]>"2014-04") & (dfx["START_DATE"]<"2018-04")].head()

Unnamed: 0_level_0,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
---0KdvEQZJHVrusyaBjfA_0000,2014-06,2017-04,Northern Illinois University,13-1151.00,Training and Development Specialists,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2017-05,2018-10,Keypath Education,25-9031.00,Instructional Coordinators,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2018-01,2019-12,American Intercontinental University,13-1151.00,Training and Development Specialists,united states,False
---18jGh5nUfRprRcGwDqQ_0000,2016-01,2016-01,Town Of Taos,43-3031.00,"Bookkeeping, Accounting, and Auditing Clerks",united states,False
---36XuDr6y7VhJ8US4ffA_0000,2016-10,2018-07,Intermex Wire Transfer,11-9199.00,"Managers, All Other",united states,False


#### b.2 Apply code to sample dataset

In [8]:
len(dfx.groupby('ID').size())

12815753

In [103]:
dfx.loc[dfx.ID == '---0KdvEQZJHVrusyaBjfA_0000'].head()

Unnamed: 0,ID,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
0,---0KdvEQZJHVrusyaBjfA_0000,2014-06,2017-04,Northern Illinois University,13-1151.00,Training and Development Specialists,united states,False
1,---0KdvEQZJHVrusyaBjfA_0000,2017-05,2018-10,Keypath Education,25-9031.00,Instructional Coordinators,united states,False
2,---0KdvEQZJHVrusyaBjfA_0000,2018-01,2019-12,American Intercontinental University,13-1151.00,Training and Development Specialists,united states,False
3,---0KdvEQZJHVrusyaBjfA_0000,2020-01,2024-04,American Intercontinental University,17-2051.00,Civil Engineers,united states,True


In [186]:
dfx[(dfx['START_DATE'] > dfx['START_DATE'].head(1).iloc[0])].head()

Unnamed: 0_level_0,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
---0KdvEQZJHVrusyaBjfA_0000,2017-05,2018-10,Keypath Education,25-9031.00,Instructional Coordinators,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2018-01,2019-12,American Intercontinental University,13-1151.00,Training and Development Specialists,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2020-01,2024-04,American Intercontinental University,17-2051.00,Civil Engineers,united states,True
---18jGh5nUfRprRcGwDqQ_0000,2016-01,2016-01,Town Of Taos,43-3031.00,"Bookkeeping, Accounting, and Auditing Clerks",united states,False
---36XuDr6y7VhJ8US4ffA_0000,2016-10,2018-07,Intermex Wire Transfer,11-9199.00,"Managers, All Other",united states,False


In [201]:
job_hop_count_df = pd.read_csv("raw data/job_hop_count_empty.csv",index_col=0)

In [231]:
sample_dfx.head()

Unnamed: 0_level_0,START_DATE,END_DATE,COMPANY_NAME,ONET,ONET_NAME,NATION_RAW,IS_CURRENT
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
---0KdvEQZJHVrusyaBjfA_0000,2014-06,2017-04,Northern Illinois University,13-1151.00,Training and Development Specialists,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2017-05,2018-10,Keypath Education,25-9031.00,Instructional Coordinators,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2018-01,2019-12,American Intercontinental University,13-1151.00,Training and Development Specialists,united states,False
---0KdvEQZJHVrusyaBjfA_0000,2020-01,2024-04,American Intercontinental University,17-2051.00,Civil Engineers,united states,True
---0LzeiQbmXa7iCz6GzAw_0000,2006-01,2010-01,Ncar - The National Center For Atmospheric Res...,43-6014.00,"Secretaries and Administrative Assistants, Exc...",united states,False


In [7]:
def custom_function(group):
    # Update list
    updates = []
    
    group = group.reset_index(drop=True)  # Reset index without adding 'index' column
    
    for i in range(len(group) - 1):

        # Extract current job info
        current_st = group.loc[i, "START_DATE"]
        current_end = group.loc[i, "END_DATE"]
        current_ONET = group.loc[i, "ONET"]

        # Iterate through remaining experiences
        for j in range(i + 1, len(group)):
            next_st = group.loc[j, "START_DATE"]
            #next_ONET = group.loc[j, "ONET"]
            
            # Check for overlap
            if next_st > current_st and next_st < current_end:
                # If there's overlap, handle as needed
                # Update your job_hop_count_df here based on your logic
                # Ensure this DataFrame is accessible and modifiable as intended
                #job_hop_count_df.loc[current_ONET][next_ONET] += 1
                update_info = {
                    'current_ONET': current_ONET,  # The ONET code of the current job experience
                    'next_ONET': group.loc[i+1, "ONET"],  # The ONET code of the next or overlapping job
                    'update_value': 1  # Assuming a simple count increment for each hop
                }
                updates.append(update_info)

            # If the first comparison after the current job doesn't overlap, break the loop
            if j == i + 1 and not (next_st > current_st and next_st < current_end):      # For variant where we count the extra hop to the next occupation without an overlap, we would change this line (exclude j==i+1)
                #job_hop_count_df.loc[current_ONET][next_ONET] += 1
                update_info = {'current_ONET': current_ONET,  
                            'next_ONET': group.loc[i+1, "ONET"],
                            'update_value': 1}
                updates.append(update_info)
                break
    
    return pd.DataFrame(updates)

    # Return modified group if necessary or perform other operations
    #return group  # Or update global DataFrame directly if that's your intent

# Example of how to apply custom_function to each group in a Dask DataFrame, assuming 'ID' is your groupby column
# Note: This pseudocode assumes job_hop_count_df is properly initialized and accessible within custom_function
# results = ddf.groupby('ID').apply(custom_function, meta=ddf).compute()  # Adjust 'meta' as necessary


In [8]:
                update_info = {
                    'current_ONET': 'yyy',  # The ONET code of the current job experience
                    'next_ONET': 'xxx',  # The ONET code of the next or overlapping job
                    'update_value': 1  # Assuming a simple count increment for each hop
                }

In [9]:
pd.DataFrame([update_info])

Unnamed: 0,current_ONET,next_ONET,update_value
0,yyy,xxx,1


In [10]:
result = sample_dfx.groupby('ID').apply(custom_function, meta={'current_ONET': 'object', 'next_ONET': 'object', 'update_value': 'int'}).compute()

We observe that our hop counting algorithm works on our sample dataset with 10,000 rows and has running time of under 2 minutes! 

In [11]:
result

Unnamed: 0_level_0,Unnamed: 1_level_0,current_ONET,next_ONET,update_value
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
---0KdvEQZJHVrusyaBjfA_0000,0,13-1151.00,25-9031.00,1.0
---0KdvEQZJHVrusyaBjfA_0000,1,25-9031.00,13-1151.00,1.0
---0KdvEQZJHVrusyaBjfA_0000,2,13-1151.00,17-2051.00,1.0
---0LzeiQbmXa7iCz6GzAw_0000,0,43-6014.00,15-1254.00,1.0
---0Mxhb8o75R0ZZPEh4tA_0000,0,41-3091.00,13-2052.00,1.0
...,...,...,...,...
znkQJoWCe2RpJLFgA8DT7w_0000,0,15-1253.00,53-7062.00,1.0
znkR2nRglW3fdTkX2syH7Q_0000,0,27-2012.00,11-2011.00,1.0
znkR2nRglW3fdTkX2syH7Q_0000,1,11-2011.00,11-2011.00,1.0
znkR2nRglW3fdTkX2syH7Q_0000,2,11-2011.00,11-2032.00,1.0


In [255]:
len(result)

854696

In [259]:
from tqdm import tqdm

In [260]:
for _, update in tqdm(result.iterrows()):
    job_hop_count_df.loc[update['current_ONET'], update['next_ONET']] += update['update_value']

910it [00:00, 5105.64it/s]

854696it [02:44, 5187.40it/s]


In [261]:
job_hop_count_df

Unnamed: 0_level_0,11-1011.00,11-1011.03,11-1021.00,11-1031.00,11-2011.00,11-2021.00,11-2022.00,11-2032.00,11-2033.00,11-3012.00,...,55-2012.00,55-2013.00,55-3011.00,55-3012.00,55-3013.00,55-3014.00,55-3015.00,55-3016.00,55-3018.00,55-3019.00
ONET (row->col),Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
11-1011.00,4897,0,1408,8,45,597,835,394,1,42,...,0,0,0,0,0,2,0,0,0,2
11-1011.03,0,1,2,0,0,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
11-1021.00,1910,2,9766,2,35,1523,2369,227,6,106,...,0,0,0,0,0,1,0,12,0,0
11-1031.00,3,0,0,1,0,2,0,9,0,0,...,0,0,0,0,0,0,0,0,0,0
11-2011.00,35,0,45,0,110,224,92,55,0,1,...,0,0,0,0,0,0,0,0,0,4
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
55-3014.00,0,0,4,0,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,1
55-3015.00,0,0,2,0,0,0,1,0,0,0,...,0,0,0,0,0,0,0,0,0,0
55-3016.00,9,0,26,0,0,3,9,2,0,0,...,0,0,0,0,0,1,0,44,0,0
55-3018.00,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [264]:
job_hop_count_df.to_parquet("Sample_transition_matrix.parquet")

In [265]:
job_hop_count_df.to_csv("Sample_transition_matrix.csv")

In [263]:
job_hop_count_df.sum().sum()

1239214

In [262]:
(job_hop_count_df !=0).sum().sum()

87303

#### b.3 Apply code to full dataset

In [None]:
dfx= dd.read_csv("Lightcast_jobs_filtered_sorted_V2.csv")

In [6]:
dfx = dfx.iloc[:,1:].set_index("ID",inplace = True).persist()

In [7]:
len(dfx)

Task exception was never retrieved
future: <Task finished name='Task-330274' coro=<Client._gather.<locals>.wait() done, defined at c:\Users\ewp\AppData\Local\Programs\Python\Python312\Lib\site-packages\distributed\client.py:2197> exception=AllExit()>
Traceback (most recent call last):
  File "c:\Users\ewp\AppData\Local\Programs\Python\Python312\Lib\site-packages\distributed\client.py", line 2206, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-1931323' coro=<Client._gather.<locals>.wait() done, defined at c:\Users\ewp\AppData\Local\Programs\Python\Python312\Lib\site-packages\distributed\client.py:2197> exception=AllExit()>
Traceback (most recent call last):
  File "c:\Users\ewp\AppData\Local\Programs\Python\Python312\Lib\site-packages\distributed\client.py", line 2206, in wait
    raise AllExit()
distributed.client.AllExit


51155844

In [23]:
job_hop_count_df = pd.read_csv("raw data/job_hop_count_empty.csv",index_col=0)

In [8]:
def custom_function(group):
    # Update list
    updates = []

    #group = group.repartition(npartitions=min(10, len(group)))

    group = group.reset_index(drop=True)  # Reset index without adding 'index' column
    
    for i in range(len(group) - 1):

        # Extract current job info
        current_st = group.loc[i, "START_DATE"]
        current_end = group.loc[i, "END_DATE"]
        current_ONET = group.loc[i, "ONET"]

        # Iterate through remaining experiences
        for j in range(i + 1, len(group)):  
            next_st = group.loc[j, "START_DATE"]
            #next_ONET = group.loc[j, "ONET"]
            
            # Check for overlap
            if next_st > current_st and next_st < current_end:
                # If there's overlap, handle as needed
                # Update your job_hop_count_df here based on your logic
                # Ensure this DataFrame is accessible and modifiable as intended
                #job_hop_count_df.loc[current_ONET][next_ONET] += 1
                update_info = {
                    'current_ONET': current_ONET,  # The ONET code of the current job experience
                    'next_ONET': group.loc[i+1, "ONET"],  # The ONET code of the next or overlapping job
                    'update_value': 1  # Assuming a simple count increment for each hop
                }
                updates.append(update_info)

            # If the first comparison after the current job doesn't overlap, break the loop
            if j == i + 1 and not (next_st > current_st and next_st < current_end):      # For variant where we count the extra hop to the next occupation without an overlap, we would change this line (exclude j==i+1)
                #job_hop_count_df.loc[current_ONET][next_ONET] += 1
                update_info = {'current_ONET': current_ONET,  
                            'next_ONET': group.loc[i+1, "ONET"],
                            'update_value': 1}
                updates.append(update_info)
                break
    
    return pd.DataFrame(updates)

    # Return modified group if necessary or perform other operations
    #return group  # Or update global DataFrame directly if that's your intent

# Example of how to apply custom_function to each group in a Dask DataFrame, assuming 'ID' is your groupby column
# Note: This pseudocode assumes job_hop_count_df is properly initialized and accessible within custom_function
# results = ddf.groupby('ID').apply(custom_function, meta=ddf).compute()  # Adjust 'meta' as necessary


In [9]:
result = dfx.groupby('ID').apply(custom_function, meta={'current_ONET': 'object', 'next_ONET': 'object', 'update_value': 'int'})

In [11]:
result.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,current_ONET,next_ONET,update_value
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
---0KdvEQZJHVrusyaBjfA_0000,0,13-1151.00,25-9031.00,1
---0KdvEQZJHVrusyaBjfA_0000,1,25-9031.00,13-1151.00,1
---0KdvEQZJHVrusyaBjfA_0000,2,13-1151.00,17-2051.00,1
---0LzeiQbmXa7iCz6GzAw_0000,0,43-6014.00,15-1254.00,1
---0Mxhb8o75R0ZZPEh4tA_0000,0,41-3091.00,13-2052.00,1


In [12]:
type(result)

dask_expr._collection.DataFrame

Before persisting the result or computing it, we process this data as much as we can as a Dask dataframe, and then we convert it into a pandas dataframe.

Here, we sum the transitions between same pairs of ONET codes first. This gives us a smaller dataframe.

In [10]:
# Assuming ddf is your Dask DataFrame
aggregated_ddf = result.groupby(['current_ONET', 'next_ONET']).agg({'update_value': 'sum'}).reset_index()


In [11]:
aggregated_ddf

Unnamed: 0_level_0,current_ONET,next_ONET,update_value
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
,object,object,int32
,...,...,...


In [18]:
aggregated_ddf1 = aggregated_ddf.compute()

In [20]:
aggregated_ddf1

Unnamed: 0,current_ONET,next_ONET,update_value
0,41-3091.00,41-4012.00,24455
1,21-1021.00,41-3091.00,156
2,33-3051.00,43-5011.00,33
3,11-1021.00,11-3031.00,35671
4,11-2021.00,11-9111.00,7505
...,...,...,...
58037,51-6093.00,53-2021.00,1
58038,51-1011.00,51-3023.00,1
58039,29-2043.00,17-3022.00,1
58040,51-4111.00,51-8013.00,1


In [20]:
aggregated_ddf1.to_parquet("Transitions.parquet")

In [21]:
aggregated_ddf1.to_csv("Transitions.csv")

In [31]:
aggregated_ddf1[(aggregated_ddf1.current_ONET == '11-1011.00') & (aggregated_ddf1.next_ONET == '11-1011.00')]

Unnamed: 0,current_ONET,next_ONET,update_value
2,11-1011.00,11-1011.00,153583


Now, we build the transition matrix by creating a pivot table and then ensuring it is a square matrix.

In [21]:
piv = aggregated_ddf1.pivot_table(index='current_ONET', columns='next_ONET', values='update_value', fill_value=0)

In [29]:
piv

next_ONET,11-1011.00,11-1011.03,11-1021.00,11-1031.00,11-2011.00,11-2021.00,11-2022.00,11-2032.00,11-2033.00,11-3012.00,...,55-2011.00,55-2012.00,55-2013.00,55-3011.00,55-3012.00,55-3013.00,55-3014.00,55-3015.00,55-3016.00,55-3019.00
current_ONET,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
11-1011.00,153583.0,25.0,44281.0,178.0,867.0,18447.0,23859.0,11130.0,93.0,1159.0,...,0.0,6.0,1.0,0.0,0.0,0.0,8.0,9.0,34.0,24.0
11-1011.03,26.0,47.0,23.0,1.0,0.0,30.0,10.0,30.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
11-1021.00,61170.0,24.0,319037.0,71.0,1101.0,46666.0,76036.0,6822.0,73.0,3190.0,...,0.0,11.0,2.0,0.0,0.0,0.0,14.0,18.0,116.0,43.0
11-1031.00,166.0,0.0,49.0,18.0,2.0,22.0,43.0,64.0,0.0,5.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
11-2011.00,1806.0,1.0,1620.0,2.0,4329.0,7946.0,3817.0,1716.0,12.0,56.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,1.0,4.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
55-3014.00,17.0,0.0,45.0,0.0,0.0,7.0,8.0,2.0,0.0,1.0,...,0.0,3.0,0.0,0.0,0.0,0.0,27.0,3.0,17.0,19.0
55-3015.00,12.0,0.0,37.0,0.0,0.0,10.0,15.0,4.0,0.0,3.0,...,0.0,1.0,0.0,0.0,0.0,0.0,0.0,29.0,0.0,1.0
55-3016.00,276.0,0.0,777.0,3.0,3.0,143.0,296.0,46.0,0.0,25.0,...,0.0,7.0,0.0,0.0,0.0,0.0,31.0,11.0,1119.0,55.0
55-3018.00,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [24]:
# Ensure the matrix is square
all_labels = piv.index.union(job_hop_count_df.index)
pivot_table = piv.reindex(index=all_labels, columns=all_labels, fill_value=0)


In [25]:
pivot_table

Unnamed: 0,11-1011.00,11-1011.03,11-1021.00,11-1031.00,11-2011.00,11-2021.00,11-2022.00,11-2032.00,11-2033.00,11-3012.00,...,55-2012.00,55-2013.00,55-3011.00,55-3012.00,55-3013.00,55-3014.00,55-3015.00,55-3016.00,55-3018.00,55-3019.00
11-1011.00,153583.0,25.0,44281.0,178.0,867.0,18447.0,23859.0,11130.0,93.0,1159.0,...,6.0,1.0,0.0,0.0,0.0,8.0,9.0,34.0,0.0,24.0
11-1011.03,26.0,47.0,23.0,1.0,0.0,30.0,10.0,30.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
11-1021.00,61170.0,24.0,319037.0,71.0,1101.0,46666.0,76036.0,6822.0,73.0,3190.0,...,11.0,2.0,0.0,0.0,0.0,14.0,18.0,116.0,0.0,43.0
11-1031.00,166.0,0.0,49.0,18.0,2.0,22.0,43.0,64.0,0.0,5.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
11-2011.00,1806.0,1.0,1620.0,2.0,4329.0,7946.0,3817.0,1716.0,12.0,56.0,...,0.0,0.0,0.0,0.0,0.0,0.0,2.0,1.0,0.0,4.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
55-3014.00,17.0,0.0,45.0,0.0,0.0,7.0,8.0,2.0,0.0,1.0,...,3.0,0.0,0.0,0.0,0.0,27.0,3.0,17.0,0.0,19.0
55-3015.00,12.0,0.0,37.0,0.0,0.0,10.0,15.0,4.0,0.0,3.0,...,1.0,0.0,0.0,0.0,0.0,0.0,29.0,0.0,0.0,1.0
55-3016.00,276.0,0.0,777.0,3.0,3.0,143.0,296.0,46.0,0.0,25.0,...,7.0,0.0,0.0,0.0,0.0,31.0,11.0,1119.0,0.0,55.0
55-3018.00,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [37]:
pivot_table.to_csv("Transition_Matrix.csv")

--- 
### (Incompelte) 1.3 Include Education Data

In [20]:
df_edu = dd.read_csv("Lightcast_edu_filtered.csv")

In [21]:
df_edu = df_edu.iloc[:,1:]

In [37]:
len(df_edu)

25553583

In [6]:
df.columns

Index(['ID', 'START_DATE', 'END_DATE', 'COMPANY_NAME', 'ONET', 'ONET_NAME',
       'NATION_RAW', 'IS_CURRENT'],
      dtype='object')

In [7]:
df_edu.columns

Index(['ID', 'SCHOOL_NAME', 'EDUCATION_LEVEL', 'EDUCATION_LEVEL_NAME',
       'NATION_RAW', 'START_DATE', 'END_DATE'],
      dtype='object')

#### 1.3.1 Merge education and job data [Method 1]

##### a. Merge

In [65]:
# METHOD 1: Either we can combine the education and work experiences sequentially 
df_merged = dd.concat([df, df_edu],axis = 0)[['ID', 'COMPANY_NAME', 'ONET','ONET_NAME', 
                'SCHOOL_NAME','EDUCATION_LEVEL', 'EDUCATION_LEVEL_NAME',
                  'START_DATE','END_DATE','IS_CURRENT','NATION_RAW']].sort_values(by = 'ID')

In [66]:
df_merged['IS_CURRENT'] = df_merged['IS_CURRENT'] == True

In [27]:
df_merged.dtypes

ID                      string[pyarrow]
COMPANY_NAME            string[pyarrow]
ONET                    string[pyarrow]
ONET_NAME               string[pyarrow]
SCHOOL_NAME             string[pyarrow]
EDUCATION_LEVEL         string[pyarrow]
EDUCATION_LEVEL_NAME    string[pyarrow]
START_DATE              string[pyarrow]
END_DATE                string[pyarrow]
IS_CURRENT                         bool
NATION_RAW              string[pyarrow]
dtype: object

In [29]:
df_merged.head()

Unnamed: 0,ID,COMPANY_NAME,ONET,ONET_NAME,SCHOOL_NAME,EDUCATION_LEVEL,EDUCATION_LEVEL_NAME,START_DATE,END_DATE,IS_CURRENT,NATION_RAW
131377,---0KdvEQZJHVrusyaBjfA_0000,American Intercontinental University,13-1151.00,Training and Development Specialists,,,,2018-01,2019-12,False,united states
131378,---0KdvEQZJHVrusyaBjfA_0000,Keypath Education,25-9031.00,Instructional Coordinators,,,,2017-05,2018-10,False,united states
131379,---0KdvEQZJHVrusyaBjfA_0000,Northern Illinois University,13-1151.00,Training and Development Specialists,,,,2014-06,2017-04,False,united states
131380,---0KdvEQZJHVrusyaBjfA_0000,American Intercontinental University,17-2051.00,Civil Engineers,,,,2020-01,2024-04,True,united states
41517,---0KdvEQZJHVrusyaBjfA_0000,,,,Northeastern Illinois University,,Incomplete Degree/Certificate,,,False,united states


##### b. Sort

In [67]:
df_merged1 = df_merged.sort_values(by=['ID', 'START_DATE', 'IS_CURRENT'], ascending=[True, True, True], na_position='first')

In [68]:
df_merged1 = df_merged1.persist()

In [44]:
df_merged1.dtypes

ID                      string[pyarrow]
COMPANY_NAME            string[pyarrow]
ONET                    string[pyarrow]
ONET_NAME               string[pyarrow]
SCHOOL_NAME             string[pyarrow]
EDUCATION_LEVEL         string[pyarrow]
EDUCATION_LEVEL_NAME    string[pyarrow]
START_DATE              string[pyarrow]
END_DATE                string[pyarrow]
IS_CURRENT                         bool
NATION_RAW              string[pyarrow]
dtype: object

In [69]:
df_merged1.head(20)

Unnamed: 0,ID,COMPANY_NAME,ONET,ONET_NAME,SCHOOL_NAME,EDUCATION_LEVEL,EDUCATION_LEVEL_NAME,START_DATE,END_DATE,IS_CURRENT,NATION_RAW
41516,---0KdvEQZJHVrusyaBjfA_0000,,,,Northern Illinois University,,Incomplete Degree/Certificate,,,False,united states
41517,---0KdvEQZJHVrusyaBjfA_0000,,,,Northeastern Illinois University,,Incomplete Degree/Certificate,,,False,united states
41518,---0KdvEQZJHVrusyaBjfA_0000,,,,Brigham Young University,,Incomplete Degree/Certificate,,,False,united states
131379,---0KdvEQZJHVrusyaBjfA_0000,Northern Illinois University,13-1151.00,Training and Development Specialists,,,,2014-06,2017-04,False,united states
131378,---0KdvEQZJHVrusyaBjfA_0000,Keypath Education,25-9031.00,Instructional Coordinators,,,,2017-05,2018-10,False,united states
131377,---0KdvEQZJHVrusyaBjfA_0000,American Intercontinental University,13-1151.00,Training and Development Specialists,,,,2018-01,2019-12,False,united states
131380,---0KdvEQZJHVrusyaBjfA_0000,American Intercontinental University,17-2051.00,Civil Engineers,,,,2020-01,2024-04,True,united states
226284,---0LzeiQbmXa7iCz6GzAw_0000,Ncar - The National Center For Atmospheric Res...,43-6014.00,"Secretaries and Administrative Assistants, Exc...",,,,2006-01,2010-01,False,united states
226283,---0LzeiQbmXa7iCz6GzAw_0000,Ucar - The University Corporation For Atmosphe...,15-1254.00,Web Developers,,,,2006-01,2024-04,True,united states
466984,---0Mxhb8o75R0ZZPEh4tA_0000,,,,Iowa State University,CE321,Bachelor's Degree,1988-01-01,1993-01-01,False,united states


In [73]:
df_merged1.to_parquet("Lightcast_jobs_edu_filtered_sorted.parquet")

#### 1.3.2 Most recent degree [METHOD 2]

In [24]:
# METHOD 2: Or we can create a new column and save the most recent degree for the work experience 

df['MOST_RECENT_DEGREE'] = None

for index, row in df.iterrows():
    filtered_edu = df_edu[(df_edu['ID'] == row['ID']) & (df_edu['END_DATE'] < row['START_DATE'])]

    if not filtered_edu.empty:
        most_recent_degree = filtered_edu.loc[filtered_edu['END_DATE'].idxmax(), 'EDUCATION_LEVEL']
    else:
        most_recent_degree = None
    
    df.at[index, 'MOST_RECENT_DEGREE'] = most_recent_degree

AttributeNotImplementedError: Checking whether a Dask DataFrame has any rows may be expensive. However, checking the number of columns is fast. Depending on which of these results you need, use either `len(df.index) == 0` or `len(df.columns) == 0`