In [16]:
from pyspark.sql import SparkSession 
from pyspark import SparkConf
from pyspark.sql.types import DateType
from pyspark.sql.functions import col, count, when, isnan, max, sum ,to_date, year,month
import os 

In [56]:
#Connection details with PostgreSQL
PSQL_SERVERNAME = "localhost"
PSQL_PORTNUMBER = 5432
PSQL_DBNAME = "mydb"
PSQL_USRRNAME = "lamdo"
PSQL_PASSWORD = "lamdo1"
URL = f"jdbc:postgresql://{PSQL_SERVERNAME}:{PSQL_PORTNUMBER}/{PSQL_DBNAME}"

In [4]:
# Name_Table you want to create in the database 
TABLE_POSTGRES = "test_1"

In [5]:
# Connect SparkSession
conf = SparkConf()
path = "/home/lamdo/spark-cluster/data/postgresql-42.2.22.jar"
conf.set("spark.jars", path) 
spark = SparkSession.builder.config(conf=conf).appName("test").getOrCreate()



In [52]:
def extract_data(spark):
    path = "/home/lamdo/spark-cluster/data/data.csv"
    df = spark.read.option('inferSchema',True).option('header' , True).csv(path)
    return df


In [12]:
df.show(5)

+---------+----+----------+--------+-------+---------+--------------+-------+---------+----------+
|Direction|Year|      Date| Weekday|Country|Commodity|Transport_Mode|Measure|    Value|Cumulative|
+---------+----+----------+--------+-------+---------+--------------+-------+---------+----------+
|  Exports|2015|01/01/2015|Thursday|    All|      All|           All|      $|104000000| 104000000|
|  Exports|2015|02/01/2015|  Friday|    All|      All|           All|      $| 96000000| 200000000|
|  Exports|2015|03/01/2015|Saturday|    All|      All|           All|      $| 61000000| 262000000|
|  Exports|2015|04/01/2015|  Sunday|    All|      All|           All|      $| 74000000| 336000000|
|  Exports|2015|05/01/2015|  Monday|    All|      All|           All|      $|105000000| 442000000|
+---------+----+----------+--------+-------+---------+--------------+-------+---------+----------+
only showing top 5 rows



In [53]:
def transform_data(df):
    # Define Schema
    df_write = df.select('Date','Country','Commodity','Value','Cumulative')
    df_write = df_write.withColumn("Date",to_date(df_write["Date"],"dd/MM/yyyy"))
    df_write.printSchema()
    # Find all data of United States about Meat and edible offal.
    df_write = df_write.select(year(df_write["Date"]).alias("Year"),month(df_write["Date"]).alias("Month"),'*').\
                drop(df_write["Date"]).\
                filter((df_write.Country =="United States") & (df_write.Commodity=="Meat and edible offal"))
    '''
    #Check null:
    df_write.select([count(when(col(c).contains('Null') | \
                                (col(c) == '' ) | \
                                col(c).isNull() | \
                                isnan(c), c 
                               )).alias(c)
                        for c in df_write.columns]).show()
    '''

    # Caculator Value per month and year
    df_write = df_write.groupBy(df_write["Year"],df_write["Month"]).\
                        agg(sum("Value").alias("Value"),max("Cumulative").alias("Cumulative")).\
                        orderBy("Year", "Month")
    df_write.show()
    
    
    return(df_write)

In [54]:
def load_data(df_write):
    df_write.write\
        .format("jdbc")\
        .option("url", URL)\
        .option("dbtable", TABLE_POSTGRES)\
        .option("user", PSQL_USRRNAME)\
        .option("password", PSQL_PASSWORD)\
        .option("driver", "org.postgresql.Driver") \
        .save()
    print('-------------------')
    print('|Load Successfully|')
    print('-------------------')

In [57]:
if __name__ == "__main__":
    extract = extract_data(spark)
    transform = transform_data(extract)
    load_data(transform)

root
 |-- Date: date (nullable = true)
 |-- Country: string (nullable = true)
 |-- Commodity: string (nullable = true)
 |-- Value: integer (nullable = true)
 |-- Cumulative: long (nullable = true)

+----+-----+---------+----------+
|Year|Month|    Value|Cumulative|
+----+-----+---------+----------+
|2015|    1|201027000| 204000000|
|2015|    2|198025000| 403000000|
|2015|    3|248034000| 650000000|
|2015|    4|167024000| 818000000|
|2015|    5|212032000|1028000000|
|2015|    6|183025000|1216000000|
|2015|    7|136018000|1353000000|
|2015|    8|106011000|1458000000|
|2015|    9|113009000|1571000000|
|2015|   10| 64002000|1636000000|
|2015|   11|137016000|1774000000|
|2015|   12|141020000|1916000000|
|2016|    1|154021000| 155000000|
|2016|    2|155018000| 310000000|
|2016|    3|177024000| 486000000|
|2016|    4|176025000| 661000000|
|2016|    5|197027000| 859000000|
|2016|    6|180024000|1038000000|
|2016|    7|108011000|1145000000|
|2016|    8| 81009000|1226000000|
+----+-----+--------