In [1]:
# imports
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, monotonically_increasing_id, regexp_replace, row_number, when
from pyspark.sql.types import DecimalType, IntegerType, StringType

In [2]:
# Start the Spark session.
# 'spark.jars.packages' pulls in hadoop-aws (s3a:// filesystem) and the
# SQL Server Spark connector; S3 credentials come from the instance profile
# credentials provider configured below.
conf = (
    SparkConf()
    .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.2,com.microsoft.azure:spark-mssql-connector_2.12:1.2.0')
    .set('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.InstanceProfileCredentialsProvider')
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()



:: loading settings :: url = jar:file:/usr/local/lib/python3.7/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.microsoft.azure#spark-mssql-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-17189f2c-cdd9-4384-834a-5d0914a9959a;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
	found com.microsoft.azure#spark-mssql-connector_2.12;1.2.0 in central
	found com.microsoft.sqlserver#mssql-jdbc;8.4.1.jre8 in central
:: resolution report :: resolve 395ms :: artifacts dl 18ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	com.microsoft.azure#spark-mssql-connector_2.12;1.2.0 from central in [default]
	com.microsoft.sqlserver#mssql-jdbc;8.4.1.jre8 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	-------------------------------------------

In [3]:
# S3 variables (both are required: a missing one raises KeyError here)
bucket_tratado = os.environ['S3-BUCKET-TRATADOS']
client_bucket_name = os.environ['S3-BUCKET-CLIENTE']

# Azure variables (empty defaults are kept when the environment is incomplete)
user = ''
password = ''

server_name = 'bd-tweet-dev.database.windows.net'
data_base_name = 'BD-TWEETS'

# server_name = 'poupay.database.windows.net'
# data_base_name = 'poupay'

try:
    # Read both values before assigning either one, so a missing PASSWORD
    # cannot leave `user` populated while `password` is still empty.
    user, password = os.environ['USER'], os.environ['PASSWORD']
except KeyError:
    # Only a missing variable is expected here; any other error should surface.
    # To define the variables: run 'jupyter kernelspec list', then edit the
    # 'kernel.json' file in the listed directory, adding an 'env' property
    # that holds a JSON object with the environment variables.
    # NOTE(review): 'USER' is commonly pre-set by the operating system to the
    # login name, so it may not hold the database user -- confirm.
    print('Defina as variáveis de ambiente USER e PASSWORD no kernel')

In [4]:
def upload_df_s3(df, client_bucket_name, file_name, type_file='csv'):
    """Write ``df`` to ``s3a://{client_bucket_name}/{file_name}``, overwriting.

    Args:
        df: object exposing the DataFrame writer API (``df.write``).
        client_bucket_name: destination bucket name.
        file_name: destination key inside the bucket.
        type_file: output format, either ``'csv'`` (default) or ``'json'``.

    Returns:
        None. For any other ``type_file`` a message is printed and nothing
        is written.
    """
    if type_file not in ('csv', 'json'):
        print('Extensão de arquivo inválida')
        return

    target = f's3a://{client_bucket_name}/{file_name}'
    writer = df.write.mode('overwrite')
    if type_file == 'csv':
        writer.csv(target)
    else:
        writer.json(target)

In [5]:
def export_df_azure(df, table_name):
    """Overwrite the SQL Server table ``table_name`` through the Spark connector.

    Connection details are read from the module-level ``server_name``,
    ``data_base_name``, ``user`` and ``password`` variables.

    NOTE(review): the ``df`` argument is not used. The dataframe written is
    the module-level ``_df``, so the result depends on whatever ``_df`` holds
    when this function is called. Callers must assign ``_df`` beforehand.

    Args:
        df: currently ignored (see the note above).
        table_name: name of the destination table.
    """
    jdbc_options = {
        'driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
        'url': f'jdbc:sqlserver://{server_name};databaseName={data_base_name};',
        'dbtable': table_name,
        'user': user,
        'password': password,
    }

    writer = _df.write.format('com.microsoft.sqlserver.jdbc.spark').mode('overwrite')
    for option_name, option_value in jdbc_options.items():
        writer = writer.option(option_name, option_value)
    writer.save()

In [6]:
def file_to_dataframe(bucket_name, file_name, delimiter=';', file_format='csv'):
    """Read ``s3a://{bucket_name}/{file_name}`` with the module-level ``spark`` session.

    Args:
        bucket_name: source bucket name.
        file_name: key of the object (or prefix) inside the bucket.
        delimiter: field separator, used only for CSV input.
        file_format: ``'csv'`` (read with a header row) or ``'json'``
            (read with the multiline option enabled).

    Returns:
        The dataframe produced by the reader, or None (after printing a
        message) when ``file_format`` is neither ``'csv'`` nor ``'json'``.
    """
    if file_format not in ('csv', 'json'):
        print('Formato inválido')
        return None

    source = f's3a://{bucket_name}/{file_name}'
    if file_format == 'json':
        return spark.read.option("multiline", "true").json(source)
    return spark.read.option('delimiter', delimiter).option('header', 'true').csv(source)

In [7]:
# Load both S3 inputs from the treated bucket and merge them.
file_name = 'meats_1991_2019'
file_name2 = 'meat_2020_2021.csv'

df_1991_2019, df_2020_2021 = (
    file_to_dataframe(bucket_tratado, source_name)
    for source_name in (file_name, file_name2)
)

# union() matches columns by position; distinct() then drops duplicated rows.
df = df_1991_2019.union(df_2020_2021).distinct()

22/06/08 01:27:24 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [8]:
# Data cleaning, expressed as a single transformation chain.
df = (
    df
    # Country column: replace the English names with the Portuguese ones.
    .withColumn('Pais', regexp_replace('Pais', 'Brazil', 'Brasil'))
    .withColumn('Pais', regexp_replace('Pais', 'Germany', 'Alemanha'))
    # Product column: replace the source labels with short Portuguese names.
    .withColumn('Produto', regexp_replace('Produto', 'Meat, cattle', 'Bovino'))
    .withColumn('Produto', regexp_replace('Produto', 'Meat, chicken', 'Frango'))
    .withColumn('Produto', regexp_replace('Produto', 'Meat, pig', 'Suino'))
    .withColumn('Produto', regexp_replace('Produto', 'Eggs, hen, in shell', 'Ovo de Galinha'))
    # Value column: swap ',' for '.' (presumably a decimal comma -- confirm
    # against the source files).
    .withColumn('Valor', regexp_replace('Valor', ',', '.'))
    # Keep only the three meat products, which removes the egg rows.
    .filter('Produto IN ("Bovino", "Frango", "Suino")')
)

In [9]:
# Rename the main table's columns to their final names, then sort the rows
# by country, product and year.
df = (
    df
    .withColumnRenamed('Pais', 'nome_pais')
    .withColumnRenamed('Produto', 'nome_produto')
    .withColumnRenamed('Ano', 'ano')
    .withColumnRenamed('Valor', 'valor')
    .sort("nome_pais", "nome_produto", "ano")
)

# df.show()

In [10]:
# Countries dataframe: one row per distinct country plus a surrogate key.
# The key is row_number() over the names in alphabetical order, giving
# consecutive ids 1..N that are the same every time the lazy plan is
# re-evaluated. monotonically_increasing_id() is avoided because it encodes
# the partition index in the upper bits of a 64-bit value, which does not
# survive the later cast to IntegerType and is not stable across evaluations.
df_paises = (
    df.select('nome_pais').distinct()
    .withColumnRenamed('nome_pais', 'pais')
    # A window without partitioning moves all rows to a single partition,
    # which is acceptable for a table holding only the distinct names.
    .withColumn('id_pais', row_number().over(Window.orderBy('pais')))
)

# df_paises.show()

In [11]:
# Products dataframe: one row per distinct product plus a surrogate key.
# The key is row_number() over the names in alphabetical order, giving
# consecutive ids 1..N that are the same every time the lazy plan is
# re-evaluated. monotonically_increasing_id() is avoided because it encodes
# the partition index in the upper bits of a 64-bit value, which does not
# survive the later cast to IntegerType and is not stable across evaluations.
df_produtos = (
    df.select('nome_produto').distinct()
    .withColumnRenamed('nome_produto', 'produto')
    # A window without partitioning moves all rows to a single partition,
    # which is acceptable for a table holding only the distinct names.
    .withColumn('id_produto', row_number().over(Window.orderBy('produto')))
)

# df_produtos.show()

In [12]:
# Records dataframe: swap the country and product names for their ids by
# inner-joining the main dataframe with the two lookup dataframes.
df_registros = (
    df
    .join(df_paises, df.nome_pais == df_paises.pais)
    .join(df_produtos, df.nome_produto == df_produtos.produto)
    .select('valor', 'ano', 'id_pais', 'id_produto')
)

# df_registros.show()

In [14]:
# Settings for the file generated in S3
file_name = 'PAISES_MEAT'
file_extension = 'json'

# Azure table that this dataframe is written to
table_name = 'TB_PAISES'

# Build the dataframe with the same field names as the table
_df = df_paises.select(
    col('id_pais').cast(IntegerType()).alias('ID_PAIS'),
    col('pais').alias('NOME_PAIS')
)

upload_df_s3(_df, client_bucket_name, file_name, file_extension)
# Pass the projected dataframe built above, not the full `df`.
export_df_azure(_df, table_name)

22/06/08 01:29:05 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
22/06/08 01:29:07 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [15]:
# Settings for the file generated in S3
file_name = 'PRODUTOS_MEAT'
file_extension = 'json'

# Azure table that this dataframe is written to
table_name = 'TB_PRODUTOS'

# Build the dataframe with the same field names as the table
_df = df_produtos.select(
    col('id_produto').cast(IntegerType()).alias('ID_PRODUTO'),
    col('produto').alias('NOME_PRODUTO')
)

upload_df_s3(_df, client_bucket_name, file_name, file_extension)
# Pass the projected dataframe built above, not the full `df`.
export_df_azure(_df, table_name)

22/06/08 01:29:11 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
22/06/08 01:29:12 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [16]:
# Settings for the file generated in S3
file_name = 'VALOR_PRODUCAO'
file_extension = 'csv'

# Azure table that this dataframe is written to
table_name = 'TB_VALOR_PRODUCAO'

# Build the dataframe with the same field names as the table
_df = df_registros.select(
    col('valor').cast(DecimalType(17,2)).alias('VALOR_PRODUCAO'),
    col('ano').cast(IntegerType()).alias('ANO_VENDA'),
    col('id_pais').cast(IntegerType()).alias('ID_PAIS'),
    col('id_produto').cast(IntegerType()).alias('ID_PRODUTO')
)

upload_df_s3(_df, client_bucket_name, file_name, file_extension)
# Pass the projected dataframe built above, not the full `df`.
export_df_azure(_df, table_name)

22/06/08 01:29:16 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
22/06/08 01:29:18 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                