# Cleaning Many Datasets

In [1]:
import dask, dask.dataframe as dd
import matplotlib.pyplot as plt
import pandas as pd
import re, csv, os
import numpy as np
from dask import delayed, persist
from glob import glob

# show every column when a wide DataFrame is displayed instead of truncating it
pd.set_option('display.max_columns', None)
# raise the csv module's maximum field size
# NOTE(review): presumably needed for the very long JobText fields parsed with engine='python' further down — confirm
csv.field_size_limit(10000000)

# render matplotlib figures inside the notebook
%matplotlib inline

## What to do before running these cells

Add the path to your files to the variable **path** below.

For **partitions_out** below, think about how many GB you will be cleaning and how many files you would like to have at the end of the cleaning process. A good rule of thumb is to split large files into manageable chunks of 300 to 600 MB for analysis. If you would like to follow this approach, work out how much data you will be cleaning in MB (1 GB = 1,000 MB) and divide it by the size in MB that you would like each final file to have. For example, 3 GB (3,000 MB) divided by 300 MB gives 10 partitions.

For **partitions_in**, follow the same approach as for **partitions_out**, but use many more (and therefore much smaller) partitions. For example, if you are cleaning 100 GB of data, make about 1,000 partitions so that Dask can clean the data faster by working on very manageable chunks.

In [2]:
# folder that holds the raw CSV files; the cleaned files are written to a "clean" sub-folder of it
path = '/Volumes/LaCie SSD/bgdata/data_19/test/'
# number of partitions the raw data is split into before cleaning
partitions_in = 70
# number of partitions the cleaned data is split into right before it is saved
partitions_out = 50

The following are the variables that I have found to be the most useful. Feel free to add variables to, or remove variables from, this list before running the cells below. There is no need to update the `dtypes` dictionary further down, as it already contains all of the variables in the BG dataset.

In [3]:
# Columns to load from the raw files. The names are listed as a
# whitespace-separated block and split into a list, which keeps the
# selection easy to scan and edit (column names contain no spaces).
best_list = """
    JobID CleanJobTitle CanonCity CanonState JobDate JobText Source
    CanonEmployer Latitude Longitude CanonIntermediary CanonJobTitle
    CanonCounty DivisionCode MSA LMA InternshipFlag ConsolidatedONET
    CanonSkillClusters CanonSkills IsDuplicate CanonMinimumDegree
    CanonRequiredDegrees CIPCode MinExperience ConsolidatedInferredNAICS
    BGTOcc MaxAnnualSalary MaxHourlySalary MinAnnualSalary MinHourlySalary
    YearsOfExperience CanonJobHours CanonJobType CanonPostalCode
    CanonYearsOfExperienceCanonLevel CanonYearsOfExperienceLevel
    ConsolidatedTitle Language BGTSubOcc ConsolidatedDegreeLevels
    MaxDegreeLevel MinDegreeLevel
""".split()

Because the data is very messy, Dask cannot correctly infer all of the variables' data types without giving up the gains of parallelizing the computations. We will therefore import every variable as a string.

In [4]:
# Every column in the BG dataset, in the order used by the original dictionary.
all_columns = [
    'JobID', 'CleanJobTitle', 'JobDomain',
    'CanonCity', 'CanonCountry', 'CanonState',
    'JobText', 'JobURL', 'PostingHTML',
    'Source', 'JobReferenceID', 'Email',
    'CanonEmployer', 'Latitude', 'Longitude',
    'CanonIntermediary', 'Telephone', 'CanonJobTitle',
    'CanonCounty', 'DivisionCode', 'MSA', 'LMA',
    'InternshipFlag', 'ConsolidatedONET', 'CanonCertification',
    'CanonSkillClusters', 'CanonSkills', 'IsDuplicate',
    'IsDuplicateOf', 'CanonMaximumDegree', 'CanonMinimumDegree',
    'CanonOtherDegrees', 'CanonPreferredDegrees',
    'CanonRequiredDegrees', 'CIPCode', 'StandardMajor',
    'MaxExperience', 'MinExperience', 'ConsolidatedInferredNAICS',
    'BGTOcc', 'MaxAnnualSalary', 'MaxHourlySalary',
    'MinAnnualSalary', 'MinHourlySalary', 'YearsOfExperience',
    'CanonJobHours', 'CanonJobType', 'CanonPostalCode',
    'CanonYearsOfExperienceCanonLevel', 'CanonYearsOfExperienceLevel',
    'ConsolidatedTitle', 'Language', 'BGTSubOcc', 'JobDate',
    'ConsolidatedDegreeLevels', 'MaxDegreeLevel', 'MinDegreeLevel',
]

# Read every column as text. The builtin `str` is used instead of `np.str`:
# `np.str` was only an alias for `str`, was deprecated in NumPy 1.20 and
# removed in NumPy 1.24, where it raises AttributeError.
dtypes = {column: str for column in all_columns}

Notice the wildcard in the `os.path.join()` call of your dask dataframe `read_csv` function. That tells Dask to grab all of the files that end with `'.csv'` inside your folder. You can make it more specific by adding more characters before and after the star. For example, `'data_0*.csv'` will grab all CSV files in your folder that start with `data_0` and end with `.csv`.

Also notice that we pass in the list of variables and the dictionary of data types. With the parameter `assume_missing` we tell Dask to assume that there will be missing data. Finally, we ask for malformed lines to be skipped instead of stopping the import with an error; a message is printed for every line that is skipped.

Make sure to replace the `da` in the file pattern below with the first few letters of your own file names.

Now run everything and wait. :)

In [5]:
# Malformed lines should be skipped with a printed warning rather than aborting the import.
# pandas 1.3 introduced `on_bad_lines` for this and pandas 2.0 removed the older
# `error_bad_lines` keyword, so use whichever keyword the installed pandas understands.
pandas_version = tuple(int(part) for part in pd.__version__.split('.')[:2])
if pandas_version >= (1, 3):
    bad_line_options = {'on_bad_lines': 'warn'}
else:
    bad_line_options = {'error_bad_lines': False}

# Load every file matching the pattern into one Dask dataframe:
#   engine='python'      -> use the python parser
#   dtype=dtypes         -> read every column as text (no type inference)
#   assume_missing=True  -> expect missing values in the data
#   blocksize=None       -> do not split individual files into blocks
#   usecols=best_list    -> only load the selected columns
ddf = dd.read_csv(os.path.join(path, 'da*.csv'),
                  engine='python',
                  dtype=dtypes,
                  assume_missing=True,
                  blocksize=None,
                  usecols=best_list,
                  **bad_line_options,
                  )
ddf

Skipping line 49: unexpected end of data


Unnamed: 0_level_0,JobID,CleanJobTitle,CanonCity,CanonState,JobDate,JobText,Source,CanonEmployer,Latitude,Longitude,CanonIntermediary,CanonJobTitle,CanonCounty,DivisionCode,MSA,LMA,InternshipFlag,ConsolidatedONET,CanonSkillClusters,CanonSkills,IsDuplicate,CanonMinimumDegree,CanonRequiredDegrees,CIPCode,MinExperience,ConsolidatedInferredNAICS,BGTOcc,MaxAnnualSalary,MaxHourlySalary,MinAnnualSalary,MinHourlySalary,YearsOfExperience,CanonJobHours,CanonJobType,CanonPostalCode,CanonYearsOfExperienceCanonLevel,CanonYearsOfExperienceLevel,ConsolidatedTitle,Language,BGTSubOcc,ConsolidatedDegreeLevels,MaxDegreeLevel,MinDegreeLevel
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1
,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object,object
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [6]:
# Repartition the data into `partitions_in` partitions so that the cleaning
# steps in the following cells work on smaller chunks
ddf00 = ddf.repartition(npartitions=partitions_in)

In [7]:
%%time

# preview the first rows of the repartitioned data (timed, see the output for the cost)
ddf00.head()

CPU times: user 33 s, sys: 13.9 s, total: 46.8 s
Wall time: 53.5 s


Unnamed: 0,JobID,CleanJobTitle,CanonCity,CanonState,JobDate,JobText,Source,CanonEmployer,Latitude,Longitude,CanonIntermediary,CanonJobTitle,CanonCounty,DivisionCode,MSA,LMA,InternshipFlag,ConsolidatedONET,CanonSkillClusters,CanonSkills,IsDuplicate,CanonMinimumDegree,CanonRequiredDegrees,CIPCode,MinExperience,ConsolidatedInferredNAICS,BGTOcc,MaxAnnualSalary,MaxHourlySalary,MinAnnualSalary,MinHourlySalary,YearsOfExperience,CanonJobHours,CanonJobType,CanonPostalCode,CanonYearsOfExperienceCanonLevel,CanonYearsOfExperienceLevel,ConsolidatedTitle,Language,BGTSubOcc,ConsolidatedDegreeLevels,MaxDegreeLevel,MinDegreeLevel
0,38472243834,Sdet,San Francisco,CA,2019-01-01,SDET\n\nABOTTS Consulting\n\n-\n\nSan Francisc...,Job Board,,37.7798,-122.417,,,San Francisco,41884.0,41860: Metropolitan Statistical Area|488: Comb...,DV064188|MT064186,0,17205100,Specialized Skills|Information Technology: Sof...,"{'Analytical Skills': 'Specialized Skills', 'A...",False,Bachelor's in Computer Science,Bachelor's|Bachelor's in Computer Science,110701.0,5.0,,17-2051.00,,,,,5+ years|6 years,fulltime,temporary,94101,1-6,mid,Sdet,en,Civil Engineer,16.0,,16.0
1,38472243883,Skilled Nursing Biller,Kannapolis,NC,2019-01-01,Skilled Nursing Biller\n\nGatewood Healthcare ...,Job Board,,35.4971,-80.65,,,Cabarrus,,16740: Metropolitan Statistical Area,MT371674,0,29114100,Finance: Billing and Invoicing;Specialized Ski...,{'Billing': 'Finance: Billing and Invoicing;Sp...,False,,,,2.0,6231.0,29-1141.00,,,,,Minimum of two years,fulltime,permanent,28081,1-6,mid,Nursing Biller,en,Registered Nurse,,,
2,38472243901,Senior Engineer,Cincinnati,OH,2019-01-01,View All num of num Close (Esc)\n\nGreater Cin...,Job Board,Greater Cincinnati Water Works,39.1072,-84.5004,,,Hamilton,,17140: Metropolitan Statistical Area,MT391714,0,17205100,Specialized Skills|Architecture and Constructi...,"{'Calculation': 'Specialized Skills', 'Cost Es...",False,Bachelor's,Bachelor's,,,,17-2051.00,96535.1,46.41,71831.1,34.53,one year,fulltime,permanent,45201,,,Senior Engineer,en,Civil Engineer,16.0,,16.0
3,38472243915,Customer Service-Restaurant,Cincinnati,OH,2019-01-01,Popeyes Logo\n\nCustomer Service-Restaurant\n\...,Job Board,Popeyes,39.1072,-84.5004,,,Hamilton,,17140: Metropolitan Statistical Area,MT391714,0,43405100,Customer and Client Support: Cash Register Ope...,{'Cash Handling': 'Customer and Client Support...,False,,,,,722513.0,43-4051.00,,,,,,,,45201,,,Customer Service-Restaurant,en,Customer Service Representative (General),,,
4,38472243879,Companion Aide,Charlotte,NC,2019-01-01,Companion Aide\n\nThe Cypress of Charlotte Clu...,Job Board,,35.1943,-80.8266,,Companion Aide,Mecklenburg,,16740: Metropolitan Statistical Area,MT371674,0,39902100,Common Skills|Health Care: Basic Living Activi...,"{'Communication Skills': 'Common Skills', 'Com...",False,Higher Secondary Certificate,Higher Secondary Certificate,,1.0,7139.0,39-9021.00,,,,,1 year,parttime,permanent,28201,0-1,low,Companion Aide,en,Caregiver / Personal Care Aide,12.0,,12.0


In [8]:
%%time

# preview the last rows of the repartitioned data (timed, see the output for the cost)
ddf00.tail()

CPU times: user 57.5 s, sys: 1min 16s, total: 2min 14s
Wall time: 3min 52s


Unnamed: 0,JobID,CleanJobTitle,CanonCity,CanonState,JobDate,JobText,Source,CanonEmployer,Latitude,Longitude,CanonIntermediary,CanonJobTitle,CanonCounty,DivisionCode,MSA,LMA,InternshipFlag,ConsolidatedONET,CanonSkillClusters,CanonSkills,IsDuplicate,CanonMinimumDegree,CanonRequiredDegrees,CIPCode,MinExperience,ConsolidatedInferredNAICS,BGTOcc,MaxAnnualSalary,MaxHourlySalary,MinAnnualSalary,MinHourlySalary,YearsOfExperience,CanonJobHours,CanonJobType,CanonPostalCode,CanonYearsOfExperienceCanonLevel,CanonYearsOfExperienceLevel,ConsolidatedTitle,Language,BGTSubOcc,ConsolidatedDegreeLevels,MaxDegreeLevel,MinDegreeLevel
441294,38555508265,Sales And Visual Sales,Austin,TX,2019-07-08,"Part-time Sales and Visual Sales in Austin, Te...",Company,Container Store,30.2202,-97.7492,,,Travis,,12420: Metropolitan Statistical Area,MT481242,0,41401200,Specialized Skills|Common Skills|Specialized S...,"{'Cleaning': 'Specialized Skills', 'Communicat...",False,,,,,453998.0,41-4011.00,,,,,,parttime,permanent,73301.0,,,Visual Sales,en,Sales Representative,,,
441295,38555508266,Long Term Elementary Substitute Teacher 1 0 Ft...,,UT,2019-07-08,LONG TERM ELEMENTARY SUBSTITUTE TEACHER 1.0 FT...,Company,Granite School District,,,,Substitute Teacher,,,,,0,25309900,Education and Training: Teaching;Specialized S...,{'Lesson Planning': 'Education and Training: T...,False,,,,,6111.0,25-3099.00,,,,,,fulltime,temporary,,,,Substitute Teacher,en,Substitute Teacher,,,
441296,38555508284,Post-Doctoral Scholar In Water Resources Policy,Irvine,CA,2019-07-08,Postdoctoral Scholar in Water Resources Policy...,Job Board,Irvine,33.7425,-117.747,,,Orange,11244.0,31080: Metropolitan Statistical Area|348: Comb...,DV064204|MT063110,0,19201100,Common Skills|Specialized Skills|Analysis: Dat...,"{'Communication Skills': 'Common Skills', 'Cre...",False,Doctor of Philosophy,Doctor of Philosophy|Doctorate,,,6113.0,,,,,,,fulltime,permanent,92602.0,,,"Doctor/Scholar, Water Resources,Policy",en,,21.0,,21.0
441297,38555508301,Senior Design Engineer,San Diego,CA,2019-07-08,req11053 \n Senior Design Engineer 2 \n \n \n ...,Company,Asml Holding N V,32.7211,-117.164,,,San Diego,,41740: Metropolitan Statistical Area,MT064174,0,17214100,Finance: Budget Management;Specialized Skills|...,{'Budgeting': 'Finance: Budget Management;Spec...,False,Bachelor's,Bachelor's,141901.0,7.0,,17-2141.00,,,,,3-7 years|7 years,,,92101.0,6+,high,Senior Design Engineer,en,Mechanical Design Engineer,16.0,,16.0
441298,38555508302,Senior Supply Chain Specialist,Greensboro,NC,2019-07-08,Sr. Supply Chain Specialist\n\nCompany: N/A\n\...,Job intermediary,,35.0033,-79.3376,Belcan,Supply Chain Specialist,Guilford,,24660: Metropolitan Statistical Area,MT372466,0,13108100,Business: Business Strategy;Specialized Skills...,{'Business Planning': 'Business: Business Stra...,False,Bachelor's,Bachelor's,,2.0,,13-1081.00,,,,,,,,27395.0,1-6,mid,Supply Chain Specialist,en,Supply Chain Specialist,16.0,,16.0


In [9]:
# check how many partitions the repartitioned dataframe has (should match partitions_in)
ddf00.npartitions

70

In [10]:
# Some observations have no company name but do map to a recruiting agency
# (CanonEmployer is missing while CanonIntermediary is present).
# Flag those observations first ...
EmployerCondition = ddf00['CanonEmployer'].isnull() & ddf00['CanonIntermediary'].notnull()

# ... then overwrite only the flagged values with "Recruitment Agency";
# mask() leaves every other employer name untouched
EmployerClean = ddf00['CanonEmployer'].mask(EmployerCondition, 'Recruitment Agency')

# finally swap the original variable for the cleaned one
ddf_clean0 = ddf00.drop('CanonEmployer', axis=1)
ddf_clean01 = ddf_clean0.assign(EmployerClean=EmployerClean)

### Note: The following cell will take a while!

In [None]:
%%time

# We have a lot of missing values in this dataset so let's start by calculating those
# as a percentage of all of the samples in our datasets
missing_count = ((ddf_clean01.isna().sum() / ddf_clean01.index.size) * 100)
missing_count_pct = missing_count.compute()
missing_count_pct

In [None]:
# collect the columns in which 60% or more of the values are missing;
# these are the columns that will be dropped
cols_to_drop = [column for column, pct in missing_count_pct.items() if pct >= 60]
cols_to_drop

In [None]:
# The columns listed in cols_to_drop are missing 60% or more of their
# values, so remove them from the dataset
ddf_clean1 = ddf_clean01.drop(cols_to_drop, axis=1)

# English must be the most common language for the majority of positions in
# the USA, so missing values in the Language column are filled with 'en'
language_default = {'Language': 'en'}
ddf_clean2 = ddf_clean1.fillna(value=language_default)

In [None]:
# columns with only a few missing values (more than 0% but less than 10%):
# for these we will drop the affected rows instead of the whole column
rows_to_drop = [column for column, pct in missing_count_pct.items() if 0 < pct < 10]
rows_to_drop

In [None]:
# drop every row that has a missing value in any of the columns listed in rows_to_drop
ddf_clean3 = ddf_clean2.dropna(subset=rows_to_drop)

In [None]:
# The columns that are left with missing values (10% up to, but not including,
# 60% missing) get the placeholder "Unknown". When the data is read again later,
# "Unknown" can be mapped back to np.nan if needed.
remaining_cols_to_clean = [column for column, pct in missing_count_pct.items() if 10 <= pct < 60]

# fillna() accepts a {column: replacement} mapping, so build one entry per column
unknown_default_dict = {column: 'Unknown' for column in remaining_cols_to_clean}
unknown_default_dict

In [None]:
# fill the remaining missing values using the column -> 'Unknown' mapping in unknown_default_dict
ddf_clean4 = ddf_clean3.fillna(unknown_default_dict)

In [None]:
# Optional sanity check: if you would like to make sure that no missing values
# are left, uncomment and run the line below (it triggers a computation)

# print(ddf_clean4.isnull().sum().compute())

In [None]:
# --- Job text ---------------------------------------------------------------
# The JobText var is not formatted correctly (stray newlines, repeated spaces).
# str.split() without arguments splits on any run of whitespace and never
# returns empty strings, so re-joining the pieces with single spaces cleans it.
# `meta` describes the result (name, dtype) to Dask; the plain 'object' dtype is
# used because np.str no longer exists in NumPy >= 1.24.
clean_text = ddf_clean4['JobText'].apply(lambda text: ' '.join(text.split()),
                                         meta=('JobText', 'object'))

# drop the old JobText var and add the cleaned one as clean_text
ddf_clean5 = ddf_clean4.drop('JobText', axis=1)
ddf_clean6 = ddf_clean5.assign(clean_text=clean_text)

# --- Language ---------------------------------------------------------------
# keep only the job descriptions that are written in english
english_condition = ddf_clean6['Language'].isin(['en'])
ddf_clean7 = ddf_clean6[english_condition]

# --- Dates ------------------------------------------------------------------
# convert the JobDate var from text into a date variable,
# drop the old one and reassign the new one
dates = dd.to_datetime(ddf_clean7['JobDate'])
ddf_clean8 = ddf_clean7.drop('JobDate', axis=1)
ddf_clean9 = ddf_clean8.assign(JobDate=dates)

# --- Duplicates -------------------------------------------------------------
# Keep only the postings that are not flagged as duplicates, then drop the flag.
# IsDuplicate is read as text and the data previews earlier in this notebook show
# it spelled "False", so an exact comparison with "FALSE" would discard every row.
# Normalising whitespace and case accepts "False", "FALSE" and "false" alike.
no_duplicates = (ddf_clean9['IsDuplicate'].str.strip().str.upper() == "FALSE")
ddf_clean10 = ddf_clean9[no_duplicates]
ddf_clean11 = ddf_clean10.drop('IsDuplicate', axis=1)

### Note: The following cell will take a while!

In [None]:
%%time

# using the same folder in your path, we will create a new one for the cleaned data
# and save our new files there
if not os.path.exists(os.path.join(path, 'clean')):
    os.makedirs(os.path.join(path, 'clean'))
    

# the following lines of code will take the last dataset, repartition it,
# and save it to the desired location. Notice the wildcard "*" below. That is
# the spot Dask will use to number your files starting from 0
(ddf_clean11
 .repartition(npartitions=partitions_out)
 .to_csv(os.path.join(path, 'clean/', 'data_cleaned_*.csv'), index=False)
 )