## Ingestion Step

**Input**: tblName -> name of the PostgreSQL table to ingest into HDFS, executionDate -> date used to partition the data in the HDFS Datalake <br>
**Output**: data in the HDFS Datalake is updated
1. Load data from the **tblName** table in PostgreSQL.
2. Update the data in the **tblName** folder with the steps below:  
 - 2.1 Get the latest record_id in the datalake (if the **tblName** folder isn't empty)
 - 2.2 Get the latest records from PostgreSQL
 - 2.3 Append the PostgreSQL records newer than the latest record_id to the HDFS Datalake
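
The steps above follow a high-watermark pattern: the largest `id` already in the datalake marks where the next load starts. The sketch below models that logic in plain Python, with hypothetical in-memory lists standing in for the HDFS folder and the PostgreSQL table. It assumes `id` only ever increases, so rows that are updated or deleted in PostgreSQL would not be picked up by this approach.

```python
from typing import Dict, List, Optional

# Plain-Python model of the incremental ingestion steps (no Spark, no HDFS).
# "datalake" and "source" are hypothetical lists of row dicts keyed by "id".

def latest_record_id(datalake: List[Dict]) -> Optional[int]:
    """Step 2.1: highest id already in the datalake, or None if it is empty."""
    return max((row["id"] for row in datalake), default=None)

def new_records(source: List[Dict], last_id: Optional[int]) -> List[Dict]:
    """Step 2.2: source rows whose id is greater than the datalake's latest id."""
    if last_id is None:
        return list(source)  # empty datalake: take everything
    return [row for row in source if row["id"] > last_id]

def ingest(datalake: List[Dict], source: List[Dict]) -> List[Dict]:
    """Step 2.3: append only the new rows to the datalake."""
    datalake.extend(new_records(source, latest_record_id(datalake)))
    return datalake

lake: List[Dict] = []
pg = [{"id": 1}, {"id": 2}, {"id": 3}]
ingest(lake, pg)       # first run copies ids 1, 2, 3
pg.append({"id": 4})
ingest(lake, pg)       # second run only appends id 4
print([row["id"] for row in lake])  # [1, 2, 3, 4]
```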

### Import Necessary Libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession
# note: this max is Spark's aggregate function and shadows Python's built-in max
from pyspark.sql.functions import lit, max
import subprocess

### Receive 2 arguments: tblName, executionDate

In [5]:
tblName = input("Enter the PostgreSQL table name to load into HDFS: ").strip()
executionDate = input("Enter the date (YYYY-MM-DD) for which to ingest data from PostgreSQL into the HDFS DataLake: ").strip()

In [6]:
executionDate

'2023-07-25'

In [7]:
runTime = executionDate.split("-")
year = runTime[0]
month = runTime[1]
day = runTime[2]
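
Splitting on `-` accepts any string that happens to contain two dashes. A minimal sketch of a stricter alternative, assuming the date is entered as `YYYY-MM-DD`: the hypothetical helper `partition_values` parses it with `datetime.strptime`, which rejects impossible dates, and returns zero-padded strings for the partition columns.

```python
from datetime import datetime
from typing import Tuple

def partition_values(execution_date: str) -> Tuple[str, str, str]:
    """Validate a YYYY-MM-DD string and return zero-padded (year, month, day)."""
    # strptime raises ValueError for malformed or impossible dates such as 2023-02-30
    parsed = datetime.strptime(execution_date, "%Y-%m-%d")
    return f"{parsed.year:04d}", f"{parsed.month:02d}", f"{parsed.day:02d}"

print(partition_values("2023-07-25"))  # ('2023', '07', '25')
```

Because `%m` and `%d` also accept single digits, an input such as `2023-7-5` is normalized to `('2023', '07', '05')`, so the partition folders stay consistently named.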

### Load data from tblName table in PostgreSQL

In [8]:
# create spark session
spark = pyspark.sql.SparkSession \
   .builder \
   .appName("Ingestion - from Postgres to HDFS") \
   .config('spark.driver.extraClassPath', "postgresql-42.6.0.jar") \
   .getOrCreate()

23/07/26 00:46:05 WARN Utils: Your hostname, bigdata-etl resolves to a loopback address: 127.0.1.1; using 192.168.85.128 instead (on interface ens33)
23/07/26 00:46:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/26 00:46:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
# read table from db using spark jdbc
df = spark.read \
   .format("jdbc") \
   .option("url", "jdbc:postgresql://localhost:5432/my_company") \
   .option("dbtable", tblName) \
   .option("user", "postgres") \
   .option("password", "loc//14122000") \
   .option("driver", "org.postgresql.Driver") \
   .load()
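
Both JDBC reads in this notebook repeat the same connection options, including a plain-text password. One possible refactor, sketched under the assumption that credentials are exported as environment variables with the hypothetical names `PG_URL`, `PG_USER` and `PG_PASSWORD`, collects the options in a single helper:

```python
import os
from typing import Dict

def jdbc_options(table: str) -> Dict[str, str]:
    """Build Spark JDBC reader options, taking credentials from the environment.

    PG_URL, PG_USER and PG_PASSWORD are hypothetical variable names; the URL and
    user defaults mirror this notebook's local setup, the password has no default.
    """
    return {
        "url": os.environ.get("PG_URL", "jdbc:postgresql://localhost:5432/my_company"),
        "dbtable": table,
        "user": os.environ.get("PG_USER", "postgres"),
        # raises KeyError if unset, instead of falling back to a hard-coded secret
        "password": os.environ["PG_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }
```

The dictionary could then be handed to the reader in one call, e.g. `spark.read.format("jdbc").options(**jdbc_options(tblName)).load()`, so both reads share one definition of the connection.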

In [10]:
df.show(10)


+---+--------+----------+----------+
| id|quantity|created_at|product_id|
+---+--------+----------+----------+
|  1|       1|2009-01-25|    331449|
|  2|       1|2019-09-13|    182256|
|  3|       2|2004-05-04|    108399|
|  4|       3|2011-02-20|     81461|
|  5|       3|2007-07-11|    136274|
|  6|       3|2003-08-01|    182938|
|  7|       2|2020-07-16|    368901|
|  8|       1|2023-05-03|    294007|
|  9|       2|2006-06-17|    201290|
| 10|       2|2012-07-25|    396152|
+---+--------+----------+----------+
only showing top 10 rows




### Update data in the tblName folder in HDFS

#### Get the latest record_id in the datalake (if the **tblName** folder isn't empty)

In [11]:
# run a shell command (used here for hdfs CLI calls) and return (return code, stdout, stderr)
def run_cmd(args_list):
    print('Running system command: {0}'.format(' '.join(args_list)))
    proc = subprocess.Popen(args_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    s_output, s_err = proc.communicate()
    s_return = proc.returncode
    return s_return, s_output, s_err
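
On Python 3.7 and later, `subprocess.run` with `capture_output=True` wraps the `Popen`/`communicate` pair used by `run_cmd`. A sketch of an equivalent version with the same return contract; it is exercised here with the current Python interpreter instead of `hdfs`, only so that it runs on a machine without Hadoop:

```python
import subprocess
import sys
from typing import List, Tuple

def run_cmd(args_list: List[str]) -> Tuple[int, bytes, bytes]:
    """Same contract as the notebook's run_cmd, written with subprocess.run."""
    print('Running system command: {0}'.format(' '.join(args_list)))
    proc = subprocess.run(args_list, capture_output=True)
    return proc.returncode, proc.stdout, proc.stderr

# demo call: any command works, e.g. ['hdfs', 'dfs', '-du', '-s', tblLocation] on a Hadoop host
ret, out, err = run_cmd([sys.executable, "-c", "print('ok')"])
```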

In [32]:
tblLocation = f'hdfs://localhost:9000/datalake/{tblName}'

In [33]:
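# check whether the folder exists and is non-empty:
# `hdfs dfs -du -s` prints "<size> ... <path>" and exits with a non-zero code when the path is missing
(ret, out, err) = run_cmd(['hdfs', 'dfs', '-du', '-s', tblLocation])
fields = out.decode().split()
exists = ret == 0 and len(fields) > 0 and int(fields[0]) > 0
print(exists)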

Running system command: hdfs dfs -du -s hdfs://localhost:9000/datalake/orders


False


In [34]:
if exists:
    datalake_df = spark.read.format('parquet').load(tblLocation)
    record_id = datalake_df.agg(max("id")).head()[0]
    # Spark's JDBC "dbtable" option accepts a table name or a parenthesised subquery with an alias
    tblQuery = f"(SELECT * FROM {tblName} WHERE id > {record_id}) AS tmp"
else:
    tblQuery = f"(SELECT * FROM {tblName}) AS tmp"

In [35]:
tblQuery

'(SELECT * FROM orders) AS tmp'

#### Get the latest records from PostgreSQL

In [36]:
# the JDBC source reads only what "dbtable" (or "query") names, so the incremental query goes there, not into load()
jdbc_df = spark.read \
   .format("jdbc") \
   .option("url", "jdbc:postgresql://localhost:5432/my_company") \
   .option("dbtable", tblQuery) \
   .option("user", "postgres") \
   .option("password", "loc//14122000") \
   .option("driver", "org.postgresql.Driver") \
   .load()

In [37]:
jdbc_df.show(5)


+---+--------+----------+----------+
| id|quantity|created_at|product_id|
+---+--------+----------+----------+
|  1|       1|2009-01-25|    331449|
|  2|       1|2019-09-13|    182256|
|  3|       2|2004-05-04|    108399|
|  4|       3|2011-02-20|     81461|
|  5|       3|2007-07-11|    136274|
+---+--------+----------+----------+
only showing top 5 rows
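
`tblName` comes straight from `input()` and is interpolated into SQL, and Spark's JDBC source expects `dbtable` to be either a table name or a parenthesised subquery with an alias. A sketch of a hypothetical `build_dbtable` helper that enforces both, assuming plain unqualified table names (a schema-qualified name such as `public.orders` would be rejected by this regex). Since Spark 2.4 the JDBC source also offers a `query` option that takes the bare `SELECT` without parentheses or alias; it cannot be combined with `dbtable`.

```python
import re
from typing import Optional

# plain SQL identifier: letter or underscore, then letters, digits or underscores
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def build_dbtable(tbl_name: str, last_id: Optional[int]) -> str:
    """Build a value for the JDBC "dbtable" option: a parenthesised subquery with an alias."""
    # tbl_name is user input, so reject anything that is not a plain identifier
    if not _IDENTIFIER.fullmatch(tbl_name):
        raise ValueError(f"invalid table name: {tbl_name!r}")
    if last_id is None:
        return f"(SELECT * FROM {tbl_name}) AS tmp"
    # int() guarantees that only a number is interpolated into the WHERE clause
    return f"(SELECT * FROM {tbl_name} WHERE id > {int(last_id)}) AS tmp"

print(build_dbtable("orders", None))  # (SELECT * FROM orders) AS tmp
print(build_dbtable("orders", 10))    # (SELECT * FROM orders WHERE id > 10) AS tmp
```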




#### Append the PostgreSQL records newer than the latest record_id to the HDFS Datalake

In [38]:
# add partition columns derived from executionDate, then append the new rows under tblLocation
output_df = jdbc_df.withColumn("year", lit(year)).withColumn("month", lit(month)).withColumn("day", lit(day))
output_df.write.partitionBy("year", "month", "day").mode("append").parquet(tblLocation)
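
`partitionBy("year", "month", "day")` writes each row under Hive-style `key=value` subdirectories of `tblLocation`. The hypothetical helper below only computes that path, which can be handy for checking with `hdfs dfs -ls` whether a given `executionDate` has already been ingested:

```python
def partition_path(table_location: str, year: str, month: str, day: str) -> str:
    """Directory that partitionBy("year", "month", "day") writes a row into (Hive-style key=value)."""
    return f"{table_location.rstrip('/')}/year={year}/month={month}/day={day}"

print(partition_path("hdfs://localhost:9000/datalake/orders", "2023", "07", "25"))
# hdfs://localhost:9000/datalake/orders/year=2023/month=07/day=25
```

When the folder is read back, Spark infers partition column types by default, so `month=07` comes back as the integer `7`.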
