In [773]:
# Import Pyself. Resources
from pyspark.sql.functions import *

# My Imports
import os
import pandas as pd
from datetime import datetime
import shutil

# PySpark Imports
from pyspark.sql import SparkSession
from pyspark.sql.types import *

class MyFunctions(object):

    def __init__(self):
        self.spark = (SparkSession.builder
                      .config('spark.jars', 'driver/postgresql-42.6.0.jar')
                      .config('spark.driver.extraClassPath', 'driver/postgresql-42.6.0.jar')
                      .appName("MyProject").getOrCreate())

        self.host = "localhost"
        self.port = "5432"
        self.database = "ensurwave"
        self.username = "postgres"
        self.password = "postgres"
        self.url = f"jdbc:postgresql://{self.host}:{self.port}/{self.database}"

        self.file = 'data/new/20230331_employees_details.json'
        self.file1 = 'data/new/20230417_employees_details.json'
    
    def rmtree_spark_directory(self):
        try:
            shutil.rmtree(f'{os.getcwd()}/spark-warehouse')
        except:
            print(f"'{os.getcwd()}/spark-warehouse' doesn\'t exists.")

In [774]:
mf = MyFunctions()

In [803]:
is_exist_table = bool(mf.spark.read.format('jdbc')
                .option('url', mf.url)
                .option('dbtable', 'information_schema.tables')
                .option('user', mf.username)
                .option('password', mf.password)
                .option('driver', 'org.postgresql.Driver').load().filter('table_name == "schema_history"').count())

print(is_exist_table)

if not is_exist_table:
    print('verdade')

False
verdade


## Carga do arquivo

In [761]:
# Create a PySpark dataframe
employees_raw = mf.spark.read.option('inferSchema', True).option(
    'multiline', 'true').json(mf.file1)

fileName = mf.file.split('/')[-1]

load_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

employees_raw.show(truncate=False)

+---------------------------------------------------------------------------------------------------------+------+---------+-------------+----------------------------------------------------------------------------------------------+
|attributes                                                                                               |id    |isDeleted|name         |salaryValues                                                                                  |
+---------------------------------------------------------------------------------------------------------+------+---------+-------------+----------------------------------------------------------------------------------------------+
|{Engineering, alice.johnson@example.com, Senior Developer, 2021-05-01T08:30:00.000Z, Active}             |abc123|false    |Alice Johnson|[{USD, Base, 100000}, {USD, Bonus, 15000}, {USD, Benefits, 5000}, {USD, Stock Options, 20000}]|
|{Sales, john.doe@example.com, Sales Manager, 2020-01-01T09:00:0

## Tratamento das colunas existentes

In [762]:
other_types = [column for column, datatype in employees_raw.dtypes if 'struct' not in datatype and 'array' not in datatype]
struct_types = [f'{column}.*' for column, datatype in employees_raw.dtypes if 'struct' in datatype and 'array' not in datatype]
array_types = [column for column, datatype in employees_raw.dtypes if 'array' in datatype]

# Columns separating structs and array types
new = other_types + struct_types + array_types

second_change = employees_raw.select(new)

for i in array_types:
    second_change = second_change.select('*',explode(i).alias(f'{i}_ex')).drop(i).withColumnRenamed(f'{i}_ex',i)
    second_change = second_change.select('*',f'{i}.*').drop(i)

new_data = second_change.withColumn('fileName',lit(fileName)).withColumn('load_timestamp', to_timestamp(lit(load_date), 'yyyy-MM-dd HH:mm:ss'))
new_dt_columns = new_data.columns
new_dt_columns.sort()

new_data = new_data.select(new_dt_columns)
new_data.show()


+--------+---------------+--------------------+--------------------+------+---------+-------------------+-------------+--------------------+--------------------+--------+-------------+------+
|currency|     department|               email|            fileName|    id|isDeleted|     load_timestamp|         name|            position|           startDate|  status|         type| value|
+--------+---------------+--------------------+--------------------+------+---------+-------------------+-------------+--------------------+--------------------+--------+-------------+------+
|     USD|    Engineering|alice.johnson@exa...|20230331_employee...|abc123|    false|2023-04-17 19:48:28|Alice Johnson|    Senior Developer|2021-05-01T08:30:...|  Active|         Base|100000|
|     USD|    Engineering|alice.johnson@exa...|20230331_employee...|abc123|    false|2023-04-17 19:48:28|Alice Johnson|    Senior Developer|2021-05-01T08:30:...|  Active|        Bonus| 15000|
|     USD|    Engineering|alice.johnson@

## Captura das colunas que vieram do arquivo

In [763]:
file_columns = new_data.schema
file_columns = [(i.name, i.dataType) for i in file_columns]
file_columns.sort()

file_columns = {"name":file_columns}
df_file_columns = pd.DataFrame(file_columns)
display(df_file_columns.head())

Unnamed: 0,name
0,"(currency, StringType())"
1,"(department, StringType())"
2,"(email, StringType())"
3,"(fileName, StringType())"
4,"(id, StringType())"


In [765]:
mf.rmtree_spark_directory()

'/Users/Marcusso/Documents/git/my_airflow_project/include/spark-warehouse' doesn't exists.


In [766]:
# Evaluating inf the table exists
is_exist_table = (mf.spark.read.format('jdbc')
         .option('url', mf.url)
         .option('dbtable', 'information_schema.tables')
         .option('user', mf.username)
         .option('password', mf.password)
         .option('driver', 'org.postgresql.Driver').load().filter('table_name == "employees"').count())

# Creating table if not exists
if not is_exist_table:
    print('Creating new table')
    (new_data
     .write
     .mode('overwrite')
     .format('jdbc')
     .option('url', mf.url)
     .option('dbtable', 'employees')
     .option('user', mf.username)
     .option('password', mf.password)
     .option('driver', 'org.postgresql.Driver')
     .save())
# Persist the data united with previous data
else:
    print('Persisting data...')
    employees = (mf.spark.read.format('jdbc').option('url', mf.url)
                    .option('dbtable', 'employees')
                    .option('user', mf.username)
                    .option('password', mf.password)
                    .option('driver', 'org.postgresql.Driver').load())

    table_columns = employees.schema
    table_columns = [(i.name, i.dataType) for i in table_columns]
    table_columns.sort()

    table_columns = {"name":table_columns}
    df_table_columns = pd.DataFrame(table_columns)

    different_columns = df_table_columns.merge(df_file_columns, on='name', how='outer', indicator=True).query("_merge not in ('both')")

    only_in_table = different_columns.query("_merge == 'left_only'")['name'].tolist()
    only_in_file = different_columns.query("_merge == 'right_only'")['name'].tolist()

    for column_name, column_type in only_in_table:
        new_data = new_data.withColumn(column_name, lit(None).cast(column_type))

    for column_name, column_type in only_in_file:
        employees = employees.withColumn(column_name,lit(None).cast(column_type))

    all_columns = employees.columns
    all_columns.sort()

    new_data = new_data.select(all_columns)
    employees = employees.select(all_columns)

    new_emp = ','.join([f"'{i[0]}'" for i in new_data.select('id').distinct().collect()])

    last_change_employees = employees.filter(f"id not in ({new_emp})")

    persist_data = last_change_employees.unionAll(new_data)
    persist_data.write.mode('overwrite').saveAsTable('tmp_employees')
    persist_data = mf.spark.table('tmp_employees')

    (persist_data
        .write
        .mode('overwrite')
        .format('jdbc')
        .option('url', mf.url)
        .option('dbtable', 'employees')
        .option('user', mf.username)
        .option('password', mf.password)
        .option('driver', 'org.postgresql.Driver')
        .save())

Persisting data...


In [767]:
mf.rmtree_spark_directory()

In [768]:
(mf.spark.read.format('jdbc').option('url', mf.url)
    .option('dbtable', 'employees')
    .option('user', mf.username)
    .option('password', mf.password)
    .option('driver', 'org.postgresql.Driver').load().show())

+--------+---------------+--------------------+--------------------+----------+---------+--------------------+-------------------+-------------+--------------------+----------------+--------------------+--------+-------------+------+
|currency|     department|               email|            fileName|        id|isDeleted|            joinedOn|     load_timestamp|         name|            position|satisfactionScoe|           startDate|  status|         type| value|
+--------+---------------+--------------------+--------------------+----------+---------+--------------------+-------------------+-------------+--------------------+----------------+--------------------+--------+-------------+------+
|     USD|    Engineering|alice.johnson@exa...|20230331_employee...|    abc123|    false|                null|2023-04-17 19:48:28|Alice Johnson|    Senior Developer|            null|2021-05-01T08:30:...|  Active|         Base|100000|
|     USD|    Engineering|alice.johnson@exa...|20230331_employee

In [None]:
mf.rmtree_spark_directory()