In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, regexp_replace, when
from delta import *

# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs://hdfs-nn:9000/warehouse'

builder = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Python Spark DataFrames and SQL") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .enableHiveSupport() \

spark = builder.getOrCreate() #spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [2]:
#Indica o caminho onde o csv esta armanezado
hdfs_path = "hdfs://hdfs-nn:9000/projeto/bronze/Stats_Country.csv"

In [3]:
# Le o Csv, mas ignora o cabeçalho
shoot = spark.read.option("header", True) \
                  .option("multiline", True) \
                  .option("inferSchema", True) \
                  .option("escape", '"') \
                  .csv(hdfs_path)


In [4]:
# Substituir valores nulos na coluna "2-alpha code" com "FE"
#Optei por aplicar o alfa code da ilha de jersey porque é a maior ilha e com mais população do canal da mancha.
shoot = shoot.withColumn(
    "2-alpha code",
    when(
        col("2-alpha code").isNull(),
        "FE"
    ).otherwise(col("2-alpha code"))
)

In [5]:
print(shoot.columns)


['Country Code', 'Short Name', 'Table Name', 'Long Name', '2-alpha code', 'Currency Unit', 'Special Notes', 'Region', 'Income Group', 'WB-2 code', 'National accounts base year', 'National accounts reference year', 'SNA price valuation', 'Lending category', 'Other groups', 'System of National Accounts', 'Alternative conversion factor', 'PPP survey year', 'Balance of Payments Manual in use', 'External debt Reporting status', 'System of trade', 'Government Accounting concept', 'IMF data dissemination standard', 'Latest population census', 'Latest household survey', 'Source of most recent Income and expenditure data', 'Vital registration complete', 'Latest agricultural census', 'Latest industrial data', 'Latest trade data', '_c30']


In [6]:
#Para eliminar o "\n" que desformata as linhas
shoot = shoot.withColumn("2-alpha code", regexp_replace(col("2-alpha code"), "\n", ""))

In [7]:
from pyspark.sql.functions import col, when

#No conjuntos de paises/organizações, que por serem organizações não requerem moeda propria, aplicamos o valor "uk

shoot = shoot.withColumn(
    "Currency Unit",
    when(
        col("Currency Unit").isNull(),
        "Unknown"
    ).otherwise(col("Currency Unit"))
)

In [8]:
#Substituir as notas especiais nulas para "Unknown"
shoot = shoot.withColumn(
    "Special Notes",
    when(
        col("Special Notes").isNull(),
        "Unknown"
    ).otherwise(col("Special Notes"))
)

In [9]:
#Conjuntos de países/organizações não tem localização geográfica própria, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Region",
    when(
        col("Region").isNull(),
        "Unknown"
    ).otherwise(col("Region"))
)

In [10]:
#Conjuntos de países/organizações podem ter várias regiões num mesmo conjunto, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Income Group",
    when(
        col("Income Group").isNull(),
        "Unknown"
    ).otherwise(col("Income Group"))
)

In [11]:
#Conjuntos de países/organizações não tem um ano base de uma conta nacional , logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "National accounts base year",
    when(
        col("National accounts base year").isNull(),
        "Unknown"
    ).otherwise(col("National accounts base year"))
)

In [12]:
#Nem todos os países/organizações tem um ano referência de uma conta nacional , logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "National accounts reference year",
    when(
        col("National accounts reference year").isNull(),
        "Unknown"
    ).otherwise(col("National accounts reference year"))
)

In [13]:
#Conjuntos de países/organizações não tem métodos de valoração de preços, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "SNA price valuation",
    when(
        col("SNA price valuation").isNull(),
        "Unknown"
    ).otherwise(col("SNA price valuation"))
)

In [14]:
#Nem todos os países/organizações tem público o financiamente que recebem de instituições finanaceiras internacionais, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Lending category",
    when(
        col("Lending category").isNull(),
        "Unknown"
    ).otherwise(col("Lending category"))
)

In [15]:
#Outras informações opcionais sobre os países/organizações, aos que tiverem o valor nulo  temos de mudar para "Unknown"
shoot = shoot.withColumn( 
    "Other groups",
    when(
        col("Other groups").isNull(),
        "Unknown"
    ).otherwise(col("Other groups"))
)

In [16]:
#As organizações não possuem um sistema nacional de contas , logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "System of National Accounts",
    when(
        col("System of National Accounts").isNull(),
        "Unknown"
    ).otherwise(col("System of National Accounts"))
)

In [17]:
#Nenhuma linha da coluna apresenta valor, logo iremos remover a coluna: "Alternative conversion factor"
shoot = shoot.drop("Alternative conversion factor")

In [18]:
#Nenhuma linha da coluna apresenta valor, logo iremos remover a coluna: "PPP survey year"
shoot = shoot.drop("PPP survey year")

In [19]:
#As organizações não possuem balanço de pagamentos , logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Balance of Payments Manual in use",
    when(
        col("Balance of Payments Manual in use").isNull(),
        "Unknown"
    ).otherwise(col("Balance of Payments Manual in use"))
)

In [20]:
#Nem todos os países/organizações revelam o débito externo , logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "External debt Reporting status",
    when(
        col("External debt Reporting status").isNull(),
        "Unknown"
    ).otherwise(col("External debt Reporting status"))
)

In [21]:
#Nem todos os  países/organizações não possuem um sistema para acompanhar as transações comerciais, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "System of trade",
    when(
        col("System of trade").isNull(),
        "Unknown"
    ).otherwise(col("System of trade"))
)

In [22]:
#Nem todos os países/organizações possuem conceitos contabilisticos especifico , logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Government Accounting concept",
    when(
        col("Government Accounting concept").isNull(),
        "Unknown"
    ).otherwise(col("Government Accounting concept"))
)

In [23]:
#Nem todos os países/organizações possuem padrão o de disseminação de dados usado pelo FMI, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "IMF data dissemination standard",
    when(
        col("IMF data dissemination standard").isNull(),
        "Unknown"
    ).otherwise(col("IMF data dissemination standard"))
)

In [24]:
#Nem todos os países/organizações possuem censos da população, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Latest population census",
    when(
        col("Latest population census").isNull(),
        "Unknown"
    ).otherwise(col("Latest population census"))
)

In [25]:
#Nem todos os países/organizações possuem pesquisas domiciliares, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Latest household survey",
    when(
        col("Latest household survey").isNull(),
        "Unknown"
    ).otherwise(col("Latest household survey"))
)

In [26]:
#Não há fontes mais recentes para os dados, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Source of most recent Income and expenditure data",
    when(
        col("Source of most recent Income and expenditure data").isNull(),
        "Unknown"
    ).otherwise(col("Source of most recent Income and expenditure data"))
)

In [27]:
#Nem todos os países/organizações possuem registos vitais, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Vital registration complete",
    when(
        col("Vital registration complete").isNull(),
        "Unknown"
    ).otherwise(col("Vital registration complete"))
)

In [28]:
#Nem todos os países/organizações possuem censos da agricultura, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Latest agricultural census",
    when(
        col("Latest agricultural census").isNull(),
        "Unknown"
    ).otherwise(col("Latest agricultural census"))
)

In [29]:
#Nem todos os países/organizações possuem dados industriais, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Latest industrial data",
    when(
        col("Latest industrial data").isNull(),
        "Unknown"
    ).otherwise(col("Latest industrial data"))
)

In [30]:
#Nem todos os países/organizações possuem dados comerciais, logo temos de alterar o valor nulo para "Unknown"
shoot = shoot.withColumn( 
    "Latest trade data",
    when(
        col("Latest trade data").isNull(),
        "Unknown"
    ).otherwise(col("Latest trade data"))
)

In [31]:
#Eliminar a c30 que não tem nenhuma informacao
shoot = shoot.drop("_c30")

In [32]:
shoot = shoot.withColumnRenamed("Country Code", "Country_Code") 
shoot = shoot.withColumnRenamed("Short Name", "Short_Name") 
shoot = shoot.withColumnRenamed("Table Name", "Table_Name") 
shoot = shoot.withColumnRenamed("Long Name", "Long_name") 
shoot = shoot.withColumnRenamed("2-alpha code", "2_alpha_code") 
shoot = shoot.withColumnRenamed("Currency Unit", "Currency_Unit") 
shoot = shoot.withColumnRenamed("Special Notes", "Special_Notes") 
shoot = shoot.withColumnRenamed("Income Group", "Income_Group") 
shoot = shoot.withColumnRenamed("WB-2 code", "WB_2_code") 
shoot = shoot.withColumnRenamed("National accounts base year", "Nacional_accounts_base_year") 
shoot = shoot.withColumnRenamed("National accounts reference year", "Nacional_accounts_reference_year") 
shoot = shoot.withColumnRenamed("SNA price valuation", "SNA_price_valuation") 
shoot = shoot.withColumnRenamed("Lending category", "Lending_category") 
shoot = shoot.withColumnRenamed("Other groups", "Other_groups") 
shoot = shoot.withColumnRenamed("System of National Accounts", "System_of_National_Accounts") 
shoot = shoot.withColumnRenamed("Balance of Payments Manual in use", "Balance_of_Payments_Manual_in_use") 
shoot = shoot.withColumnRenamed("External debt Reporting status", "External_debt_Reporting_status") 
shoot = shoot.withColumnRenamed("System of trade", "System_of_trade") 
shoot = shoot.withColumnRenamed("Government Accounting concept", "Government_Accounting_concept") 
shoot = shoot.withColumnRenamed("IMF data dissemination standard", "IMF_data_dissemination_standard") 
shoot = shoot.withColumnRenamed("Latest population census", "Latest_population_census") 
shoot = shoot.withColumnRenamed("Latest household survey", "Latest_household_survey") 
shoot = shoot.withColumnRenamed("Source of most recent Income and expenditure data", "Source_of_most_recent_Income_and_expenditure_data") 
shoot = shoot.withColumnRenamed("Vital registration complete", "Vital_registration_complete") 
shoot = shoot.withColumnRenamed("Latest agricultural census", "Latest_agricultural_census") 
shoot = shoot.withColumnRenamed("Latest industrial data", "Latest_industrial_census") 
shoot = shoot.withColumnRenamed("Latest trade data", "Latest_trade_data")

In [33]:
shoot.printSchema()
shoot.show()

root
 |-- Country_Code: string (nullable = true)
 |-- Short_Name: string (nullable = true)
 |-- Table_Name: string (nullable = true)
 |-- Long_name: string (nullable = true)
 |-- 2_alpha_code: string (nullable = true)
 |-- Currency_Unit: string (nullable = true)
 |-- Special_Notes: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Income_Group: string (nullable = true)
 |-- WB_2_code: string (nullable = true)
 |-- Nacional_accounts_base_year: string (nullable = true)
 |-- Nacional_accounts_reference_year: string (nullable = true)
 |-- SNA_price_valuation: string (nullable = true)
 |-- Lending_category: string (nullable = true)
 |-- Other_groups: string (nullable = true)
 |-- System_of_National_Accounts: string (nullable = true)
 |-- Balance_of_Payments_Manual_in_use: string (nullable = true)
 |-- External_debt_Reporting_status: string (nullable = true)
 |-- System_of_trade: string (nullable = true)
 |-- Government_Accounting_concept: string (nullable = true)
 |-- IMF_

In [34]:
#Definir a estrutura do datagrama
customSchema = StructType([
    StructField("Country_Code", StringType(), True),
    StructField("Short_Name", StringType(), True),
    StructField("Table_Name", StringType(), True),
    StructField("Long_name", StringType(), True),
    StructField("2-alpha_code", StringType(), True),
    StructField("Currency_Unit", StringType(), True),
    StructField("Special_Notes", StringType(), True),
    StructField("Region", StringType(), True),
    StructField("Income_Group", StringType(), True),
    StructField("WB-2_code", StringType(), True),
    StructField("Nacional_accounts_base_year", StringType(), True),
    StructField("Nacional_accounts_reference_year", StringType(), True),
    StructField("SNA_price_valuation", StringType(), True),
    StructField("Lending_category", StringType(), True),
    StructField("Other_groups", StringType(), True),
    StructField("System_of_National_Accounts", StringType(), True),
    StructField("Balance_of_Payments_Manual_in_use", StringType(), True),
    StructField("External_debt_Reporting_status", StringType(), True),
    StructField("System_of_trade", StringType(), True),
    StructField("Government_Accounting_concept", StringType(), True),
    StructField("IMF_data_dissemination_standard", StringType(), True),
    StructField("Latest_population_census", StringType(), True),
    StructField("Latest_household_survey", StringType(), True),
    StructField("Source_of_most_recent_Income_and_expenditure_data", StringType(), True),
    StructField("Vital_registration_complete", StringType(), True),
    StructField("Latest_agricultural_census", StringType(), True),
    StructField("Latest_industrial_census", StringType(), True),
    StructField("Latest_trade_data", StringType(), True)
])

#Escrever os dados no formato delta
shoot \
    .write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("hdfs://hdfs-nn:9000/warehouse/projeto.db/Stats_Country")


In [35]:
#Executa uma consulta sql para exibir todos os dados da tabela
spark.sql(
    """
    SELECT * FROM projeto.Stats_Country
    """
).show()

+------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+-------------------+---------+---------------------------+--------------------------------+--------------------+----------------+------------+---------------------------+---------------------------------+------------------------------+--------------------+-----------------------------+-------------------------------+------------------------+-----------------------+-------------------------------------------------+---------------------------+--------------------------+------------------------+-----------------+
|Country_Code|          Short_Name|          Table_Name|           Long_name|2_alpha_code|       Currency_Unit|       Special_Notes|              Region|       Income_Group|WB_2_code|Nacional_accounts_base_year|Nacional_accounts_reference_year| SNA_price_valuation|Lending_category|Other_groups|System_of_National_Accounts|Balance_