In [6]:
# dask package installation and import of the libraries that will be used in the project
!python -m pip install "dask[dataframe]"

import pandas as pd
import numpy as np
import dask.dataframe as dd
import shutil



In [2]:
# selection of columns to be used
cols = ['CBO Ocupação 2002',    # profession code
        'Faixa Etária',         # age group        
        'Município',            # city
        'Vl Remun Média Nom',   # salary
        'Sexo Trabalhador']     # gender

In [3]:
# reading the files in a dask dataframe
df = dd.read_csv('rais_raw_data/RAIS_VINC*',sep = ";", encoding= "ISO-8859-1", usecols = cols, low_memory=False, 
                 dtype={'CBO Ocupação 2002': 'object','Faixa Etária': 'object'})

In [4]:
# removal of lines that are not of interest to the project and consequent reduction of the dataset
df = df[df['CBO Ocupação 2002'].str.startswith('2')]

In [5]:
# transforming a dask dataframe to a pandas dataframe
df = df.compute()

In [7]:
# checking the number of rows and columns of the dataframe
df.shape

(7072962, 5)

In [8]:
df.head()

Unnamed: 0,CBO Ocupação 2002,Faixa Etária,Município,Vl Remun Média Nom,Sexo Trabalhador
362,231305,5,500240,359152,1
531,231305,5,500240,375003,1
546,232105,5,500240,222463,1
547,232105,5,500240,215915,1
568,254410,7,500240,142658,1


In [9]:
# loading project supplementary data into pandas dataframes
cbo = pd.read_csv('supplementary_data/cbo.csv')
state = pd.read_csv('supplementary_data/states.csv')

In [10]:
cbo.head()

Unnamed: 0,CBO Ocupação 2002,Profession
0,251505,
1,251510,
2,251515,
3,251520,
4,251525,


In [11]:
state.head()

Unnamed: 0,Município,State
0,110001,Rondonia
1,110002,Rondonia
2,110003,Rondonia
3,110004,Rondonia
4,110005,Rondonia


In [12]:
# join of the original dataframe with the dataframe with the names of the professions
data = pd.merge(df, cbo, on='CBO Ocupação 2002', how ='left')

In [13]:
data.head()

Unnamed: 0,CBO Ocupação 2002,Faixa Etária,Município,Vl Remun Média Nom,Sexo Trabalhador,Profession
0,231305,5,500240,359152,1,
1,231305,5,500240,375003,1,
2,232105,5,500240,222463,1,
3,232105,5,500240,215915,1,
4,254410,7,500240,142658,1,


In [14]:
# removal of lines without the name of a profession
data = data[data['Profession'].notnull()]

In [15]:
data.shape

(54851, 6)

In [16]:
# join of previous dataframe with dataframe with states
data = pd.merge(data,state,on ='Município', how ='left')

In [17]:
# removal of columns with the CBO code and the municipality
data.drop(['CBO Ocupação 2002','Município', "Profession"], axis=1, inplace=True)

In [18]:
data.head()

Unnamed: 0,Faixa Etária,Vl Remun Média Nom,Sexo Trabalhador,State
0,6,268750,1,Mato Grosso do Sul
1,3,143832,1,Mato Grosso do Sul
2,6,246326,1,Mato Grosso do Sul
3,7,423710,1,Mato Grosso do Sul
4,5,169021,1,Mato Grosso do Sul


In [19]:
data['Vl Remun Média Nom']=data['Vl Remun Média Nom'].str.replace(',','.')

In [20]:
# transformation of variables with salary values from "object" to "float64"
data["Vl Remun Média Nom"] = pd.to_numeric(data["Vl Remun Média Nom"])

In [21]:
data.dtypes

Faixa Etária           object
Vl Remun Média Nom    float64
Sexo Trabalhador        int64
State                  object
dtype: object

In [22]:
# removing rows where salary equal 0
data = data[data['Vl Remun Média Nom'] > 0]

In [23]:
data.shape

(52792, 4)

In [24]:
# exchange the encoding of the column "Age Group" by the actual value of the variable "Age Group"
data['Faixa Etária'] = data['Faixa Etária'].map({'01': '10 to 14 years',
                                               '02': '15 to 17 years',
                                               '03': '18 to 24 years',
                                               '04': '25 to 29 years',
                                               '05': '30 to 39 years',
                                               '06': '40 to 49 years',
                                               '07': '50 to 64 years',
                                               '08': '65+ years'
                                              }
                                             )

In [25]:
# exchange the encoding of the "Worker Sex" column for the actual value of the "Worker Sex" variable
data['Sexo Trabalhador'] = data['Sexo Trabalhador'].map({1: 'Male',
                                                         2: 'Female',
                                                        -1: 'uninformed'
                                                        }
                                                       )

In [26]:
# renaming columnS names
data.rename(columns={'Faixa Etária': "age",
                     'Vl Remun Média Nom' : 'salary',
                     'Sexo Trabalhador': 'gender',
                     'State': 'state'
                     }, 
            inplace=True)

In [27]:
# removal of lines with probably incorrect padding
data.drop(data.loc[data['age']=='15 to 17 years'].index, inplace=True)
data.drop(data.loc[data['age']=='10 to 14 years'].index, inplace=True)

In [28]:
# resetting the index
data.reset_index(drop=True, inplace=True)

In [29]:
data.head()

Unnamed: 0,age,salary,gender,state
0,40 to 49 years,2687.5,Male,Mato Grosso do Sul
1,18 to 24 years,1438.32,Male,Mato Grosso do Sul
2,40 to 49 years,2463.26,Male,Mato Grosso do Sul
3,50 to 64 years,4237.1,Male,Mato Grosso do Sul
4,30 to 39 years,1690.21,Male,Mato Grosso do Sul


In [30]:
from sqlalchemy import create_engine

In [31]:
# creating the connection with sqlite to create the database
engine = create_engine('sqlite:///database.db', echo=True)
sqlite_connection = engine.connect()

2022-01-30 18:00:27,226 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2022-01-30 18:00:27,228 INFO sqlalchemy.engine.base.Engine ()
2022-01-30 18:00:27,230 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2022-01-30 18:00:27,232 INFO sqlalchemy.engine.base.Engine ()


In [32]:
# creating the variable that contains the database table name (salarys)
sqlite_table = "salarys"

In [33]:
# inserting data from dataframe to database
data.to_sql(sqlite_table, sqlite_connection, if_exists='fail', index=True, index_label="id")

2022-01-30 18:00:33,084 INFO sqlalchemy.engine.base.Engine PRAGMA main.table_info("salarys")
2022-01-30 18:00:33,085 INFO sqlalchemy.engine.base.Engine ()
2022-01-30 18:00:33,087 INFO sqlalchemy.engine.base.Engine PRAGMA temp.table_info("salarys")
2022-01-30 18:00:33,089 INFO sqlalchemy.engine.base.Engine ()
2022-01-30 18:00:33,092 INFO sqlalchemy.engine.base.Engine 
CREATE TABLE salarys (
	id BIGINT, 
	age TEXT, 
	salary FLOAT, 
	gender TEXT, 
	state TEXT
)


2022-01-30 18:00:33,095 INFO sqlalchemy.engine.base.Engine ()
2022-01-30 18:00:33,112 INFO sqlalchemy.engine.base.Engine COMMIT
2022-01-30 18:00:33,114 INFO sqlalchemy.engine.base.Engine CREATE INDEX ix_salarys_id ON salarys (id)
2022-01-30 18:00:33,115 INFO sqlalchemy.engine.base.Engine ()
2022-01-30 18:00:33,125 INFO sqlalchemy.engine.base.Engine COMMIT
2022-01-30 18:00:33,158 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2022-01-30 18:00:33,601 INFO sqlalchemy.engine.base.Engine INSERT INTO salarys (id, age, salary, gend

In [34]:
# running a query to check the first 5 elements of the table "salarys" from the database
engine.execute("SELECT * FROM salarys limit 5").fetchall()

2022-01-30 18:00:38,029 INFO sqlalchemy.engine.base.Engine SELECT * FROM salarys limit 5
2022-01-30 18:00:38,033 INFO sqlalchemy.engine.base.Engine ()


[(0, '40 to 49 years', 2687.5, 'Male', 'Mato Grosso do Sul'),
 (1, '18 to 24 years', 1438.32, 'Male', 'Mato Grosso do Sul'),
 (2, '40 to 49 years', 2463.26, 'Male', 'Mato Grosso do Sul'),
 (3, '50 to 64 years', 4237.1, 'Male', 'Mato Grosso do Sul'),
 (4, '30 to 39 years', 1690.21, 'Male', 'Mato Grosso do Sul')]

In [35]:
# running a query to check the table columns
records = engine.execute("PRAGMA table_info(salarys)").fetchall()  # fetches the 6 rows of data
print(records)
for row in records:
    print("Columns: ", row[1])

2022-01-30 18:00:40,864 INFO sqlalchemy.engine.base.Engine PRAGMA table_info(salarys)
2022-01-30 18:00:40,868 INFO sqlalchemy.engine.base.Engine ()
[(0, 'id', 'BIGINT', 0, None, 0), (1, 'age', 'TEXT', 0, None, 0), (2, 'salary', 'FLOAT', 0, None, 0), (3, 'gender', 'TEXT', 0, None, 0), (4, 'state', 'TEXT', 0, None, 0)]
Columns:  id
Columns:  age
Columns:  salary
Columns:  gender
Columns:  state


In [36]:
# running a query to check the total table entries
engine.execute("SELECT count(id) FROM salarys").fetchall()

2022-01-30 18:00:42,953 INFO sqlalchemy.engine.base.Engine SELECT count(id) FROM salarys
2022-01-30 18:00:42,956 INFO sqlalchemy.engine.base.Engine ()


[(52790,)]

In [41]:
shutil.move("database.db", "../database.db")

'../database.db'