In [1]:
import os

import pyspark.sql.functions as F
from pyspark.sql.types import *

In [2]:
# Root of the covid-19 repository. Defaults to the original author's checkout;
# set the COVID19_ROOT environment variable to run this notebook elsewhere.
project_root = os.environ.get('COVID19_ROOT', '/Users/IsabelGontijo/git/covid-19').rstrip('/')

# CSV input read with a header row (presumably produced by the 02_id_creation stage)
id_path = project_root + '/refined/data/02_id_creation/'
# Output directory for the standardized parquet files
std_path = project_root + '/refined/data/03_standardization/'
# Header-less data dictionary: column name, type name, optional transformation rules
dic_file = project_root + '/refined/metadata/03_standardization/dic.csv'

In [3]:
# Load the data dictionary. It has no header row, so Spark names its columns
# _c0 (column name), _c1 (type name) and _c2 (transformation rules).
dic = spark.read.option("multiLine", True).csv(dic_file)

In [4]:
# Load the id_cidacs base; the first CSV line holds the column names and
# quoted fields may span several lines.
df = spark.read.options(header=True, multiLine=True).csv(id_path)

In [5]:
# Lower-case every column name in a single projection. Calling
# withColumnRenamed in a loop adds one projection per column and bloats the plan.
df = df.toDF(*[name.lower() for name in df.columns])

In [6]:
from datetime import date
def standardizeDate(s):
    """Parse a raw date string into a ``datetime.date``.

    Everything after the first whitespace is dropped (condic dates carry a time
    after the date), then the '-' and '/' separators are removed. The remaining
    digits are read as YYYYMMDD first and, if that is not a valid calendar date,
    as DDMMYYYY. A 7-digit value is left-padded with '0' (single-digit day).

    Returns None for None/empty or whitespace-only input, non-numeric input,
    values that are not 7 or 8 digits long, impossible calendar dates, and
    years below 1000 (Spark does not allow years near 0001).
    """
    if not s:
        return None
    tokens = s.split()
    if not tokens:
        # Whitespace-only field: s.split()[0] used to raise IndexError and
        # fail the whole Spark task.
        return None
    digits = tokens[0].replace('-', '').replace('/', '')
    try:
        int(digits)
    except ValueError:
        return None
    if len(digits) == 7:
        digits = '0' + digits
    if len(digits) != 8:
        return None

    # (year, month, day) slices: YYYYMMDD first, then DDMMYYYY as a fallback
    layouts = (
        (slice(0, 4), slice(4, 6), slice(6, 8)),
        (slice(4, 8), slice(2, 4), slice(0, 2)),
    )
    for year_sl, month_sl, day_sl in layouts:
        try:
            parsed = date(int(digits[year_sl]), int(digits[month_sl]), int(digits[day_sl]))
            break
        except ValueError:
            continue
    else:
        return None

    # Spark bug: years near 0001 are not allowed
    if parsed.year < 1000:
        return None
    return parsed
# Wrap standardizeDate as a Spark UDF whose results are typed as DateType
udf_standardizeDate = F.udf(f=standardizeDate, returnType=DateType())

In [7]:
def standardizeInteger(s):
    """Parse a raw string into an int suitable for Spark's IntegerType.

    Returns the parsed value, or None when ``s`` is None/empty, is not an
    integer literal (e.g. '1.5', 'abc'), or lies outside the signed 32-bit
    range that IntegerType can hold.
    """
    if not s:
        return None
    try:
        value = int(s)
    except (ValueError, TypeError):
        return None
    # IntegerType is a signed 32-bit type; reject values it cannot store
    # instead of handing Spark an out-of-range Python int.
    if not -2**31 <= value <= 2**31 - 1:
        return None
    return value
    
# Wrap standardizeInteger as a Spark UDF whose results are typed as IntegerType
udf_standardizeInteger = F.udf(f=standardizeInteger, returnType=IntegerType())

In [8]:
def standardizeDouble(s):
    """Parse a raw string into a float for Spark's DoubleType.

    Returns ``float(s)``, or None when ``s`` is None/empty or cannot be
    parsed as a float.
    """
    if not s:
        return None
    try:
        return float(s)
    except (ValueError, TypeError):
        # Only parse failures are mapped to None; a bare except would also
        # hide unrelated errors.
        return None
    
# Wrap standardizeDouble as a Spark UDF whose results are typed as DoubleType
udf_standardizeDouble = F.udf(f=standardizeDouble, returnType=DoubleType())

In [9]:
def standardizeByte(s, t):
    """Map a raw categorical value to its standardized byte code.

    Args:
        s: raw field value (string) or None.
        t: transformation rules taken from the base dictionary. This is a
           JSON object written with single quotes, e.g. "{'1': '2'}", mapping
           raw values to standardized codes. If its 'integer_key' entry equals
           '1', ``s`` is normalized through int() before lookup (so '01'
           matches key '1').

    Returns:
        0 for None/empty input, the mapped code as int when ``s`` is a key of
        the rules, and 99 otherwise (including values that fail int()).
    """
    if not s:
        return 0
    import json
    rules = json.loads(t.replace("'", '"'))
    key = s
    if rules.get('integer_key') == '1':
        try:
            key = int(s)
        except (ValueError, TypeError):
            return 99
    key = str(key)
    return int(rules[key]) if key in rules else 99
udf_standardizeByte = F.udf(f=standardizeByte, returnType=ByteType())  # register as a ByteType UDF

In [10]:
def standardizeLong(s):
    """Parse a raw string into an int suitable for Spark's LongType.

    Returns the parsed value, or None when ``s`` is None/empty, is not an
    integer literal, or lies outside the signed 64-bit range that LongType
    can hold.
    """
    if not s:
        return None
    try:
        value = int(s)
    except (ValueError, TypeError):
        return None
    # LongType is a signed 64-bit type; Python ints are unbounded
    if not -2**63 <= value <= 2**63 - 1:
        return None
    return value
    
# Wrap standardizeLong as a Spark UDF whose results are typed as LongType
udf_standardizeLong = F.udf(f=standardizeLong, returnType=LongType())

In [11]:
def createdStandardizedFunctions(dic):
    """For each dictionary entry, build the code that standardizes its column.

    ``dic`` is the header-less dictionary DataFrame whose rows expose ``_c0``
    (column name), ``_c1`` (type name, e.g. 'Date', 'Byte') and ``_c2``
    (optional transformation rules passed to the UDF as a literal).

    Returns a dict mapping column name to either None (type 'String': the
    column is kept as read) or a Python expression string. When that string
    is evaluated with ``df``, ``F`` and the ``udf_standardize<Type>`` UDFs in
    scope, it returns ``df`` with the column replaced by its standardized
    version.

    Raises ValueError for a non-String entry with no column name or a type
    name that is not a valid identifier, because the type is spliced into
    the generated code as part of a function name.
    """
    standardizedFunctions = {}
    for item in dic.collect():
        cName = item._c0
        cType = item._c1
        cTransf = item._c2

        if cType == "String":
            standardizedFunctions[cName] = None
            continue
        if not cName or not (cType and cType.isidentifier()):
            raise ValueError("Invalid dictionary entry: column %r, type %r" % (cName, cType))

        # repr() produces a correctly quoted and escaped Python literal, so
        # quotes or backslashes in names or rules cannot break (or inject
        # into) the generated code. For ordinary values the output matches
        # the hand-quoted form byte for byte.
        args = "df[" + repr(cName) + "]"
        if cTransf:
            args += ", F.lit(" + repr(cTransf.replace("\n", "")) + ")"
        standardizedFunctions[cName] = (
            "df.withColumn(" + repr(cName) + ", udf_standardize" + cType + "(" + args + "))"
        )

    return standardizedFunctions

In [12]:
standardizedDic = createdStandardizedFunctions(dic=dic)  # column name -> standardization code (None for String)

In [13]:
# Every data column needs a dictionary entry. Fail fast with the full list of
# offenders instead of a bare KeyError on the first one.
missing = [column for column in df.columns if column not in standardizedDic]
if missing:
    raise KeyError('Columns missing from the standardization dictionary: ' + ', '.join(missing))

for column in df.columns:
    expression = standardizedDic[column]
    # String columns map to None and are kept as read
    if expression:
        # NOTE(review): eval() runs code generated from dic.csv; only use a trusted dictionary
        df = eval(expression)

# Write standardized files
df.write.parquet(std_path, mode='overwrite')

In [14]:
# Shut down the SparkContext after the standardized data has been written
if sc:
    sc.stop()