In [1]:
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import Window
import re
from pyspark.sql.types import *
import unicodedata
from google.cloud import storage
from functools import reduce
from pyspark.sql import DataFrame
import datetime
# spark.conf.set("spark.sql.crossJoin.enabled", True)

<hr />

### google mobility preprocessing

<hr />

In [2]:
# read data from gcs
gmr = spark.read.csv('gs://ai-covid19-datalake/raw/google-mobility/google-mobility_report-28-11-2021.csv', inferSchema=True, header=True)

In [3]:
# head? 

gmr.limit(5).toPandas()

Unnamed: 0,country_region_code,country_region,sub_region_1,sub_region_2,metro_area,iso_3166_2_code,census_fips_code,place_id,date,retail_and_recreation_percent_change_from_baseline,grocery_and_pharmacy_percent_change_from_baseline,parks_percent_change_from_baseline,transit_stations_percent_change_from_baseline,workplaces_percent_change_from_baseline,residential_percent_change_from_baseline
0,AE,United Arab Emirates,,,,,,ChIJvRKrsd9IXj4RpwoIwFYv0zM,2020-02-15,0,4,5,0,2,1
1,AE,United Arab Emirates,,,,,,ChIJvRKrsd9IXj4RpwoIwFYv0zM,2020-02-16,1,4,4,1,2,1
2,AE,United Arab Emirates,,,,,,ChIJvRKrsd9IXj4RpwoIwFYv0zM,2020-02-17,-1,1,5,1,2,1
3,AE,United Arab Emirates,,,,,,ChIJvRKrsd9IXj4RpwoIwFYv0zM,2020-02-18,-2,1,5,0,2,1
4,AE,United Arab Emirates,,,,,,ChIJvRKrsd9IXj4RpwoIwFYv0zM,2020-02-19,-2,0,4,-1,2,1


In [4]:
# schema
gmr.printSchema()

root
 |-- country_region_code: string (nullable = true)
 |-- country_region: string (nullable = true)
 |-- sub_region_1: string (nullable = true)
 |-- sub_region_2: string (nullable = true)
 |-- metro_area: string (nullable = true)
 |-- iso_3166_2_code: string (nullable = true)
 |-- census_fips_code: integer (nullable = true)
 |-- place_id: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- retail_and_recreation_percent_change_from_baseline: integer (nullable = true)
 |-- grocery_and_pharmacy_percent_change_from_baseline: integer (nullable = true)
 |-- parks_percent_change_from_baseline: integer (nullable = true)
 |-- transit_stations_percent_change_from_baseline: integer (nullable = true)
 |-- workplaces_percent_change_from_baseline: integer (nullable = true)
 |-- residential_percent_change_from_baseline: integer (nullable = true)



In [5]:
# change timestamp var type to date type (we do not need hours or minutes)
gmr = gmr.withColumn('date', F.col('date').cast('date'))

In [6]:
# filtering observations after 24-05-2021
gmr = gmr.filter(F.col('date') > '2021-05-24')

In [7]:
gmr.printSchema()

root
 |-- country_region_code: string (nullable = true)
 |-- country_region: string (nullable = true)
 |-- sub_region_1: string (nullable = true)
 |-- sub_region_2: string (nullable = true)
 |-- metro_area: string (nullable = true)
 |-- iso_3166_2_code: string (nullable = true)
 |-- census_fips_code: integer (nullable = true)
 |-- place_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- retail_and_recreation_percent_change_from_baseline: integer (nullable = true)
 |-- grocery_and_pharmacy_percent_change_from_baseline: integer (nullable = true)
 |-- parks_percent_change_from_baseline: integer (nullable = true)
 |-- transit_stations_percent_change_from_baseline: integer (nullable = true)
 |-- workplaces_percent_change_from_baseline: integer (nullable = true)
 |-- residential_percent_change_from_baseline: integer (nullable = true)



#### Next steps: 
* select brazillian observations, keeping the header
* make a rapid descriptive analysis
* take out special characters
* upper case the text values
* take the sufix from states names
* write the preprocessed data in a proper location
* make another descriptive analysis

<hr />

In [8]:
# select brazillian observations, keeping the header
print(gmr.count())
gmr = gmr.filter(F.col('country_region_code')=='BR')
print(gmr.count())
# Version until 24-05-2021
# 5561418
# 913701

# Updated version without Filter
# 7796287
# 1301098

2287165
396685


<hr />

#### make a rapid descriptive analysis 
* what are the states? 
* how much cities? 
* how much cities from each states? 
* how much observavions from each state? 
* max and min period for observations of each state
* null values per state on variables: census_fips_code, residential_percent_change_from_baseline, workplaces_percent_change_from_baseline, transit_stations_percent_change_from_baseline, parks_percent_change_from_baseline, and retail_and_recreation_percent_change_from_baseline

In [9]:
# what are the states?
# how much cities?
print("how much cities:", gmr.select(['sub_region_2']).distinct().count())

# # how much cities from each states?
states_and_cities = gmr.select(['sub_region_1', 'sub_region_2']).distinct().groupby('sub_region_1').count().sort('sub_region_1').withColumnRenamed('count', 'n_cities')
# states_and_cities.show(30)

# # how much observavions from each state?
n_observations = gmr.select(['sub_region_1', 'sub_region_2']).groupby('sub_region_1').count().sort('sub_region_1').withColumnRenamed('count', 'n_observations')
# n_observations.show(30)

# # max and min period for observations of each state
w = Window.partitionBy('sub_region_1')
min_date = gmr.select(['sub_region_1', 'date']).withColumn('min_date', F.min('date').over(w)).where(F.col('date') == F.col('min_date')).distinct().drop('date')
max_date = gmr.select(['sub_region_1', 'date']).withColumn('max_date', F.max('date').over(w)).where(F.col('date') == F.col('max_date')).distinct().drop('date')
# min_date.show(30)
# max_date.show(30)

# null values
columns = ['census_fips_code', 'residential_percent_change_from_baseline', 'workplaces_percent_change_from_baseline', 'transit_stations_percent_change_from_baseline', 'parks_percent_change_from_baseline', 'retail_and_recreation_percent_change_from_baseline']

nulls_census_fips_code = gmr.filter(F.col('census_fips_code').isNull()).select(['sub_region_1', 'census_fips_code']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'nulls_census_fips_code')
nulls_residential_percent_change_from_baseline = gmr.filter(F.col('residential_percent_change_from_baseline').isNull()).select(['sub_region_1', 'residential_percent_change_from_baseline']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'null_residential_percent_change_from_baseline')
nulls_workplaces_percent_change_from_baseline = gmr.filter(F.col('workplaces_percent_change_from_baseline').isNull()).select(['sub_region_1', 'workplaces_percent_change_from_baseline']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'workplaces_percent_change_from_baseline')
nulls_transit_stations_percent_change_from_baseline = gmr.filter(F.col('transit_stations_percent_change_from_baseline').isNull()).select(['sub_region_1', 'transit_stations_percent_change_from_baseline']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'transit_stations_percent_change_from_baseline')
nulls_parks_percent_change_from_baseline = gmr.filter(F.col('parks_percent_change_from_baseline').isNull()).select(['sub_region_1', 'parks_percent_change_from_baseline']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'parks_percent_change_from_baseline')
nulls_retail_and_recreation_percent_change_from_baseline = gmr.filter(F.col('retail_and_recreation_percent_change_from_baseline').isNull()).select(['sub_region_1', 'retail_and_recreation_percent_change_from_baseline']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'retail_and_recreation_percent_change_from_baseline')

nulls = nulls_census_fips_code.join(nulls_residential_percent_change_from_baseline, 'sub_region_1', how='left')\
                                .join(nulls_workplaces_percent_change_from_baseline, 'sub_region_1', how='left')\
                                .join(nulls_transit_stations_percent_change_from_baseline, 'sub_region_1', how='left')\
                                .join(nulls_parks_percent_change_from_baseline, 'sub_region_1', how='left')\
                                .join(nulls_retail_and_recreation_percent_change_from_baseline, 'sub_region_1', how='left')
                                

# joining all
desc_gmr = states_and_cities.join(n_observations, 'sub_region_1', 'left')\
                            .join(min_date, 'sub_region_1', 'left')\
                            .join(max_date, 'sub_region_1', 'left')\
                            .join(nulls, 'sub_region_1', 'left')

# writing and showing

desc_gmr.write.csv('gs://ai-covid19-datalake/raw/google-mobility/desc_pp_google_mobility_report_28_11_2021_pre_formatting_v1.csv', header=True, mode='overwrite')

desc_gmr.toPandas()

how much cities: 2252


Unnamed: 0,sub_region_1,n_cities,n_observations,min_date,max_date,nulls_census_fips_code,null_residential_percent_change_from_baseline,workplaces_percent_change_from_baseline,transit_stations_percent_change_from_baseline,parks_percent_change_from_baseline,retail_and_recreation_percent_change_from_baseline
0,State of Minas Gerais,311,52484.0,2021-05-25,2021-11-25,52484.0,31045.0,890.0,40103.0,40724.0,37392.0
1,State of Espírito Santo,60,9947.0,2021-05-25,2021-11-25,9947.0,5735.0,135.0,7573.0,6326.0,6120.0
2,State of Mato Grosso,56,9512.0,2021-05-25,2021-11-25,9512.0,5302.0,18.0,6977.0,7560.0,6502.0
3,State of Goiás,92,16181.0,2021-05-25,2021-11-25,16181.0,8879.0,547.0,11679.0,12173.0,10119.0
4,State of Rio de Janeiro,86,15307.0,2021-05-25,2021-11-25,15307.0,4794.0,172.0,7530.0,6203.0,6162.0
5,State of Roraima,3,529.0,2021-05-25,2021-11-25,529.0,159.0,,159.0,159.0,159.0
6,State of Ceará,106,17889.0,2021-05-25,2021-11-25,17889.0,11656.0,177.0,16072.0,11714.0,13121.0
7,State of Paraná,183,30702.0,2021-05-25,2021-11-25,30702.0,17107.0,878.0,22042.0,23194.0,20456.0
8,State of Paraíba,50,8155.0,2021-05-25,2021-11-25,8155.0,5834.0,441.0,7036.0,6200.0,6139.0
9,State of Pará,71,11604.0,2021-05-25,2021-11-25,11604.0,6770.0,102.0,8957.0,7920.0,8117.0


<hr />

In [10]:
# take out special characters
def remove_accents(col):
    col = str(col)
    col = col.replace('á', 'a')
    col = col.replace('à', 'a')
    col = col.replace('ã', 'a')
    col = col.replace('â', 'a')
    col = col.replace('é', 'e')
    col = col.replace('ê', 'e')
    col = col.replace('í', 'i')
    col = col.replace('ó', 'o')
    col = col.replace('õ', 'o')
    col = col.replace('ô', 'o')
    col = col.replace('ú', 'u')
    col = col.replace('ü', 'u')
    col = col.replace('ç', 'c')
    return col
udf_remove_accents = F.udf(remove_accents, StringType())

def normalize_ascii(col):
    return re.sub('[^A-Za-z0-9]+', ' ', str(col))
udf_normalize_ascii = F.udf(normalize_ascii, StringType())

def take_state_prefix(col):
    return col.replace('STATE OF ', '')
udf_take_state_prefix = F.udf(take_state_prefix, StringType())

<hr />

In [11]:
# dropping NA in state and municipality
gmr = gmr.dropna(subset=['sub_region_1', 'sub_region_2'])

# taking accents and non-ascii chars
gmr = gmr.withColumn('sub_region_1', udf_remove_accents(F.col('sub_region_1')))
gmr = gmr.withColumn('sub_region_1', udf_normalize_ascii(F.col('sub_region_1')))

gmr = gmr.withColumn('sub_region_2', udf_remove_accents(F.col('sub_region_2')))
gmr = gmr.withColumn('sub_region_2', udf_normalize_ascii(F.col('sub_region_2')))


# turning columns into upper case
gmr = gmr.withColumn('sub_region_1', F.upper(F.col('sub_region_1')))
gmr = gmr.withColumn('sub_region_2', F.upper(F.col('sub_region_2')))

# taking prefix
gmr = gmr.withColumn('sub_region_1', udf_take_state_prefix(F.col('sub_region_1')))

In [12]:
gmr.select(['sub_region_1', 'sub_region_2']).distinct().show()

+-----------------+------------------+
|     sub_region_1|      sub_region_2|
+-----------------+------------------+
|       PERNAMBUCO|        CAMARAGIBE|
|RIO GRANDE DO SUL|             GIRUA|
|   SANTA CATARINA|DIONISIO CERQUEIRA|
|          ALAGOAS|  SAO JOSE DA LAJE|
|            BAHIA|         IBIRATAIA|
|            CEARA|      QUIXERAMOBIM|
|   ESPIRITO SANTO|        JOAO NEIVA|
|      MATO GROSSO|            MATUPA|
|     MINAS GERAIS|       MATEUS LEME|
|          PARAIBA|         BOQUEIRAO|
|        SAO PAULO|             AVARE|
|        SAO PAULO|            ITAPUI|
|        SAO PAULO|           ITUPEVA|
|        SAO PAULO|           DRACENA|
|            BAHIA|            IBICUI|
|            BAHIA|             LAPAO|
|            CEARA|            AURORA|
|            CEARA|         CARIRIACU|
|            CEARA|       NOVA RUSSAS|
|     MINAS GERAIS|      ITAMARANDIBA|
+-----------------+------------------+
only showing top 20 rows



In [13]:
# what are the states?
# how much cities?
print("how much cities:", gmr.select(['sub_region_2']).distinct().count())

# # how much cities from each states?
states_and_cities = gmr.select(['sub_region_1', 'sub_region_2']).distinct().groupby('sub_region_1').count().sort('sub_region_1').withColumnRenamed('count', 'n_cities')
# states_and_cities.show(30)

# # how much observavions from each state?
n_observations = gmr.select(['sub_region_1', 'sub_region_2']).groupby('sub_region_1').count().sort('sub_region_1').withColumnRenamed('count', 'n_observations')
# n_observations.show(30)

# # max and min period for observations of each state
w = Window.partitionBy('sub_region_1')
min_date = gmr.select(['sub_region_1', 'date']).withColumn('min_date', F.min('date').over(w)).where(F.col('date') == F.col('min_date')).distinct().drop('date')
max_date = gmr.select(['sub_region_1', 'date']).withColumn('max_date', F.max('date').over(w)).where(F.col('date') == F.col('max_date')).distinct().drop('date')
# min_date.show(30)
# max_date.show(30)

# null values
columns = ['census_fips_code', 'residential_percent_change_from_baseline', 'workplaces_percent_change_from_baseline', 'transit_stations_percent_change_from_baseline', 'parks_percent_change_from_baseline', 'retail_and_recreation_percent_change_from_baseline']

nulls_census_fips_code = gmr.filter(F.col('census_fips_code').isNull()).select(['sub_region_1', 'census_fips_code']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'nulls_census_fips_code')
nulls_residential_percent_change_from_baseline = gmr.filter(F.col('residential_percent_change_from_baseline').isNull()).select(['sub_region_1', 'residential_percent_change_from_baseline']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'null_residential_percent_change_from_baseline')
nulls_workplaces_percent_change_from_baseline = gmr.filter(F.col('workplaces_percent_change_from_baseline').isNull()).select(['sub_region_1', 'workplaces_percent_change_from_baseline']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'workplaces_percent_change_from_baseline')
nulls_transit_stations_percent_change_from_baseline = gmr.filter(F.col('transit_stations_percent_change_from_baseline').isNull()).select(['sub_region_1', 'transit_stations_percent_change_from_baseline']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'transit_stations_percent_change_from_baseline')
nulls_parks_percent_change_from_baseline = gmr.filter(F.col('parks_percent_change_from_baseline').isNull()).select(['sub_region_1', 'parks_percent_change_from_baseline']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'parks_percent_change_from_baseline')
nulls_retail_and_recreation_percent_change_from_baseline = gmr.filter(F.col('retail_and_recreation_percent_change_from_baseline').isNull()).select(['sub_region_1', 'retail_and_recreation_percent_change_from_baseline']).groupBy(['sub_region_1']).count().withColumnRenamed('count', 'retail_and_recreation_percent_change_from_baseline')

nulls = nulls_census_fips_code.join(nulls_residential_percent_change_from_baseline, 'sub_region_1', how='left')\
                                .join(nulls_workplaces_percent_change_from_baseline, 'sub_region_1', how='left')\
                                .join(nulls_transit_stations_percent_change_from_baseline, 'sub_region_1', how='left')\
                                .join(nulls_parks_percent_change_from_baseline, 'sub_region_1', how='left')\
                                .join(nulls_retail_and_recreation_percent_change_from_baseline, 'sub_region_1', how='left')
                                

# joining all
desc_gmr = states_and_cities.join(n_observations, 'sub_region_1', 'left')\
                            .join(min_date, 'sub_region_1', 'left')\
                            .join(max_date, 'sub_region_1', 'left')\
                            .join(nulls, 'sub_region_1', 'left')

# writing and showing

desc_gmr.write.csv('gs://ai-covid19-datalake/standard/google-mobility/desc_pp_google_mobility_report_28_11_2021_pos_formatting_v1.csv', header=True, mode='overwrite')

desc_gmr.toPandas()

how much cities: 2248


Unnamed: 0,sub_region_1,n_cities,n_observations,min_date,max_date,nulls_census_fips_code,null_residential_percent_change_from_baseline,workplaces_percent_change_from_baseline,transit_stations_percent_change_from_baseline,parks_percent_change_from_baseline,retail_and_recreation_percent_change_from_baseline
0,RIO DE JANEIRO,85,15122,2021-05-25,2021-11-25,15122,4794,172.0,7530,6203,6162
1,RIO GRANDE DO SUL,174,29501,2021-05-25,2021-11-25,29501,12956,336.0,21182,16665,15132
2,RONDONIA,23,3903,2021-05-25,2021-11-25,3903,2229,8.0,3429,3121,2605
3,MATO GROSSO,55,9327,2021-05-25,2021-11-25,9327,5302,18.0,6977,7560,6502
4,ESPIRITO SANTO,59,9762,2021-05-25,2021-11-25,9762,5735,135.0,7573,6326,6120
5,AMAZONAS,15,2463,2021-05-25,2021-11-25,2463,1693,19.0,2257,1970,1884
6,SERGIPE,28,4740,2021-05-25,2021-11-25,4740,3232,151.0,4145,3545,3919
7,RIO GRANDE DO NORTE,37,6467,2021-05-25,2021-11-25,6467,4136,188.0,4833,3943,4622
8,SANTA CATARINA,147,24452,2021-05-25,2021-11-25,24452,12312,697.0,17939,16212,14249
9,PARAIBA,49,7970,2021-05-25,2021-11-25,7970,5834,441.0,7036,6200,6139


In [14]:
gmr.write.csv('gs://ai-covid19-datalake/standard/google-mobility/pp_google-mobility_report-28-11-2021.csv', header=True, mode='overwrite')

<hr />

### municipality ibge data

<hr />

In [15]:
# read data from gcs
ibge = spark.read.csv('gs://ai-covid19-datalake/raw/ibge-data/ibge-municipality-to-code-28-11-2021.csv', inferSchema=True, header=True)

In [16]:
# show some lines
ibge.limit(5).toPandas()

Unnamed: 0,UF,Nome_UF,Mesorregião Geográfica,Nome_Mesorregião,Microrregião Geográfica,Nome_Microrregião,Município,mun_code,mun_name
0,11,Rondônia,2,Leste Rondoniense,6,Cacoal,15,1100015,Alta Floresta D'Oeste
1,11,Rondônia,2,Leste Rondoniense,6,Cacoal,379,1100379,Alto Alegre dos Parecis
2,11,Rondônia,2,Leste Rondoniense,3,Ariquemes,403,1100403,Alto Paraíso
3,11,Rondônia,2,Leste Rondoniense,5,Alvorada D'Oeste,346,1100346,Alvorada D'Oeste
4,11,Rondônia,2,Leste Rondoniense,3,Ariquemes,23,1100023,Ariquemes


In [17]:
# schema
ibge.printSchema()

root
 |-- UF: integer (nullable = true)
 |-- Nome_UF: string (nullable = true)
 |-- Mesorregião Geográfica: integer (nullable = true)
 |-- Nome_Mesorregião: string (nullable = true)
 |-- Microrregião Geográfica: integer (nullable = true)
 |-- Nome_Microrregião: string (nullable = true)
 |-- Município: integer (nullable = true)
 |-- mun_code: integer (nullable = true)
 |-- mun_name: string (nullable = true)



In [18]:
ibge = ibge.select(['UF', 'mun_code', 'Nome_UF', 'mun_name']).withColumnRenamed('Nome_UF', 'sub_region_1').withColumnRenamed('mun_name', 'sub_region_2')

In [19]:
# dropping NA in state and municipality
ibge = ibge.dropna(subset=['sub_region_1', 'sub_region_2'])

# taking accents and non-ascii chars
ibge = ibge.withColumn('sub_region_1', udf_remove_accents(F.col('sub_region_1')))
ibge = ibge.withColumn('sub_region_1', udf_normalize_ascii(F.col('sub_region_1')))

ibge = ibge.withColumn('sub_region_2', udf_remove_accents(F.col('sub_region_2')))
ibge = ibge.withColumn('sub_region_2', udf_normalize_ascii(F.col('sub_region_2')))


# turning columns into upper case
ibge = ibge.withColumn('sub_region_1', F.upper(F.col('sub_region_1')))
ibge = ibge.withColumn('sub_region_2', F.upper(F.col('sub_region_2')))

In [20]:
# show some lines
ibge.limit(5).toPandas()

Unnamed: 0,UF,mun_code,sub_region_1,sub_region_2
0,11,1100015,RONDONIA,ALTA FLORESTA D OESTE
1,11,1100379,RONDONIA,ALTO ALEGRE DOS PARECIS
2,11,1100403,RONDONIA,ALTO PARAISO
3,11,1100346,RONDONIA,ALVORADA D OESTE
4,11,1100023,RONDONIA,ARIQUEMES


In [21]:
ibge.write.csv('gs://ai-covid19-datalake/standard/ibge-data/pp_ibge-municipality-to-code-28-11-2021.csv', header=True, mode='overwrite')

<hr />

### meteorological data

<hr />

In [22]:
# getting the list of inmet files

client = storage.Client()
BUCKET_NAME = 'ai-covid19-datalake'
bucket = client.get_bucket(BUCKET_NAME)

blobs = bucket.list_blobs()
files = []
for blob in blobs:
    uri = 'raw/inmet-data/inmet-meteorological-data-25052021-a-28112021/'
    if blob.name[0:-38] == uri:
        files.append('gs://ai-covid19-datalake/'+blob.name)

In [24]:
inmet_schema = StructType([
    StructField('measurement_date', DateType()),
    StructField('total_daily_precipitation_mm', FloatType()),
    StructField('daily_atmospheric_pression_mb', FloatType()),
    StructField('daily_avg_dew_point_temp_c', FloatType()),
    StructField('max_daily_temp_maxima_diaria_c', FloatType()),
    StructField('daily_avg_temp_c', FloatType()),
    StructField('daily_min_temp_c', FloatType()),
    StructField('daily_avg_relative_air_humidity_percent', FloatType()),
    StructField('daily_min_relative_air_humidity_percent', FloatType()),
    StructField('max_gust_wind_ms', FloatType()),
    StructField('avg_wind_velocity_ms', FloatType()),
    StructField('empty', StringType())
    
])
    
inmet_header = ['measurement_date', 'total_daily_precipitation_mm', 'daily_atmospheric_pression_mb', 'daily_avg_dew_point_temp_c',
          'max_daily_temp_maxima_diaria_c', 'daily_avg_temp_c', 'daily_min_temp_c', 'daily_avg_relative_air_humidity_percent',
          'daily_min_relative_air_humidity_percent', 'max_gust_wind_ms', 'avg_wind_velocity_ms']

def replace_comma_and_convert(line):
    positions = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    date_positions = [0]
    
    for dp in date_positions:
        # converting string do proper date type, e.g.: '2019-01-02'
        if (line[dp] != 'null') and (line[dp] != ''):
            date_time_obj = datetime.datetime.strptime(line[dp], '%Y-%m-%d')
            line[dp] = date_time_obj.date()
        else:
            line[dp] = None

    
    for p in positions:
        if (line[p] != 'null') and (line[p] != ''):
            line[p] = float(line[p].replace(',', '.'))
        else:
            line[p] = None
    return line

In [25]:
dfs = []
for f in files:
    # reading file from list
    inmet_data = sc.textFile(f).map(lambda x: x.split(';'))
    
    # taking out blank lines (serious matter)
    inmet_data = inmet_data.filter(lambda x: x != [''])
    
    # getting the station ids 
    station_name = inmet_data.filter(lambda x: "Nome:" in x[0]).collect()[0][0].split(' ')[1]
    station_code = inmet_data.filter(lambda x: "Estacao:" in x[0]).collect()[0][0].split(' ')[2]
    
    # dropping the first 10 unnecessary top lines on file
    inmet_data = inmet_data.zipWithIndex().filter(lambda tup: tup[1] > 10).map(lambda x:x[0])
    
    # converting string to float values
    inmet_data = inmet_data.map(replace_comma_and_convert)
    
    # converting rdd to a dataframe with schema 
    df = spark.createDataFrame(inmet_data, schema=inmet_schema)
    
    # creating station and municipality variables
    df = df.withColumn('station_id', F.lit(station_code)).withColumn('municipality_name', F.lit(station_name))
    
    # appending new dataframe to be joined
    dfs.append(df)
    
inmet_df = reduce(DataFrame.unionAll, dfs)

In [26]:
# built dataset
inmet_df.limit(5).toPandas()

Unnamed: 0,measurement_date,total_daily_precipitation_mm,daily_atmospheric_pression_mb,daily_avg_dew_point_temp_c,max_daily_temp_maxima_diaria_c,daily_avg_temp_c,daily_min_temp_c,daily_avg_relative_air_humidity_percent,daily_min_relative_air_humidity_percent,max_gust_wind_ms,avg_wind_velocity_ms,empty,station_id,municipality_name
0,2021-05-26,0.0,889.537476,14.070833,28.4,21.5875,15.7,64.583336,38.0,8.3,1.770833,,A001,BRASILIA
1,2021-05-27,0.0,888.337524,14.470833,28.9,22.125,15.8,64.5,39.0,6.0,1.5375,,A001,BRASILIA
2,2021-05-28,0.0,887.237488,14.083333,28.0,21.512501,15.5,65.25,40.0,8.1,1.65,,A001,BRASILIA
3,2021-05-29,0.0,888.0625,13.466667,27.9,20.891666,15.3,65.208336,37.0,5.7,1.141667,,A001,BRASILIA
4,2021-05-30,0.0,888.979187,13.683333,27.299999,20.924999,14.9,65.916664,39.0,8.0,1.6875,,A001,BRASILIA


In [27]:
# reading stations link file
stations_link = spark.read.csv('gs://ai-covid19-datalake/raw/inmet-data/inmet-estacoes-19-02-2018.csv', header=True).select(['station_id', 'sub_region_1'])

In [28]:
# list of stations and its states
stations_link.limit(5).toPandas()

Unnamed: 0,station_id,sub_region_1
0,A001,DISTRITO FEDERAL
1,A002,GOIAS
2,A003,GOIAS
3,A004,GOIAS
4,A008,DISTRITO FEDERAL


In [29]:
# bringing the state and municipality information
inmet_df = inmet_df.join(stations_link, 'station_id', 'left')

In [30]:
# now with the new state column added
inmet_df.limit(5).toPandas()

Unnamed: 0,station_id,measurement_date,total_daily_precipitation_mm,daily_atmospheric_pression_mb,daily_avg_dew_point_temp_c,max_daily_temp_maxima_diaria_c,daily_avg_temp_c,daily_min_temp_c,daily_avg_relative_air_humidity_percent,daily_min_relative_air_humidity_percent,max_gust_wind_ms,avg_wind_velocity_ms,empty,municipality_name,sub_region_1
0,A001,2021-05-26,0.0,889.537476,14.070833,28.4,21.5875,15.7,64.583336,38.0,8.3,1.770833,,BRASILIA,DISTRITO FEDERAL
1,A001,2021-05-27,0.0,888.337524,14.470833,28.9,22.125,15.8,64.5,39.0,6.0,1.5375,,BRASILIA,DISTRITO FEDERAL
2,A001,2021-05-28,0.0,887.237488,14.083333,28.0,21.512501,15.5,65.25,40.0,8.1,1.65,,BRASILIA,DISTRITO FEDERAL
3,A001,2021-05-29,0.0,888.0625,13.466667,27.9,20.891666,15.3,65.208336,37.0,5.7,1.141667,,BRASILIA,DISTRITO FEDERAL
4,A001,2021-05-30,0.0,888.979187,13.683333,27.299999,20.924999,14.9,65.916664,39.0,8.0,1.6875,,BRASILIA,DISTRITO FEDERAL


In [31]:
inmet_df.select('measurement_date').printSchema()

root
 |-- measurement_date: date (nullable = true)



In [32]:
# df.groupBy('date').count().orderBy('date').show()
maxdate, mindate = df.select(F.max("measurement_date"), F.min("measurement_date")).first()

print(mindate, maxdate)

2021-05-26 2021-11-28


In [33]:
inmet_df.write.csv('gs://ai-covid19-datalake/standard/inmet-data/pp_inmet_meteorological_data-25052021-a-28112021', header=True, mode='overwrite')