In [1]:
import os

import pandas as pd
from pyspark.sql import SparkSession

In [19]:
# Build (or reuse) a Spark session running in local mode, with the PostgreSQL
# JDBC driver jar passed through `spark.jars`.
spark = (
    SparkSession.builder
    .master("local")
    .appName("PySpark PostgreSQL")
    .config("spark.jars", "postgresql-42.3.6.jar")
    .getOrCreate()
)

# The jar can also be referenced by its full path, for example:
#   C:\Local-Spark\spark-3.2.1-bin-hadoop3.2\jars\postgresql-42.3.6.jar

# The cells below load data through the JDBC driver.

In [20]:
# Load a database table over JDBC: read `app_incident` into a Spark DataFrame.
# NOTE(review): the database user and password are hard-coded in this cell;
# consider supplying them from the environment or a secrets store instead.
app_incident = (
    spark.read.format("jdbc")
    .options(
        url="jdbc:postgresql://localhost:5432/SMartDB",
        dbtable="app_incident",
        user="postgres",
        password="P@ssw0rd",
        driver="org.postgresql.Driver",
    )
    .load()
)

# Print the column names and types Spark derived for the table.
app_incident.printSchema()

root
 |-- id: integer (nullable = true)
 |-- incident_no: string (nullable = true)
 |-- incident_datetime: timestamp (nullable = true)
 |-- incident_reference_customer_caseNo: string (nullable = true)
 |-- incident_customer_support: string (nullable = true)
 |-- incident_problem_start: timestamp (nullable = true)
 |-- incident_problem_end: timestamp (nullable = true)
 |-- incident_description: string (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- incident_severity_id: integer (nullable = true)
 |-- incident_status_id: integer (nullable = true)
 |-- inventory_id: integer (nullable = true)
 |-- incident_close_datetime: timestamp (nullable = true)
 |-- incident_type_id: integer (nullable = true)
 |-- service_type_id: integer (nullable = true)
 |-- incident_subject: string (nullable = true)



In [11]:
# Register the incident DataFrame as the Spark SQL temp view `incident_table`.
app_incident.createOrReplaceTempView("incident_table")

# At most 20 incidents whose incident_datetime is on or after 2022-05-01.
sql = """
select incident_no,incident_datetime ,incident_close_datetime ,incident_subject
from incident_table where   incident_datetime >='2022-05-01'
limit 20
"""

df_incident = spark.sql(sql)
df_incident.show()

+-------------+-------------------+-----------------------+--------------------+
|  incident_no|  incident_datetime|incident_close_datetime|    incident_subject|
+-------------+-------------------+-----------------------+--------------------+
|SR-ES-22-0753|2022-05-17 13:00:00|    2022-05-17 21:00:00|พบ 3PAR 9540 SFP ...|
|SR-ES-22-0530|2022-05-02 15:29:00|    2022-05-04 16:30:00|DBK-01iMedApp and...|
|SR-ES-22-0783|2022-05-20 13:38:00|    2022-05-20 15:00:00|HA Group Notifica...|
|SR-ES-22-0788|2022-05-18 09:10:00|    2022-05-18 11:30:00|     Battery expired|
|SR-ES-22-0651|2022-05-09 18:00:00|                   null|Migrate load bala...|
|SR-ES-22-0578|2022-05-05 17:05:00|    2022-05-06 11:51:00|BSN-MSA2050-01 Di...|
|SR-ES-22-0792|2022-05-22 11:15:00|    2022-05-23 14:00:00|BHQ-3PAR8200-S2 D...|
|SR-ES-22-0754|2022-05-17 09:58:00|    2022-05-17 11:40:00|vCenter alert "Lo...|
|SR-ES-22-0617|2022-05-06 18:30:00|    2022-05-07 05:00:00|ตรวจสอบระบบ เนื่อ...|
|SR-ES-22-0755|2022-05-13 10

In [22]:
# Load a database view over JDBC: read `dim_inventory` into a Spark DataFrame.
# NOTE(review): the database user and password are hard-coded in this cell;
# consider supplying them from the environment or a secrets store instead.
dim_inventory = (
    spark.read.format("jdbc")
    .options(
        url="jdbc:postgresql://localhost:5432/SMartDB",
        dbtable="dim_inventory",
        user="postgres",
        password="P@ssw0rd",
        driver="org.postgresql.Driver",
    )
    .load()
)

# Print the column names and types Spark derived for the view.
dim_inventory.printSchema()

root
 |-- id: integer (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- product_type: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- model: string (nullable = true)
 |-- customer_warranty_start: date (nullable = true)
 |-- customer_warranty_end: date (nullable = true)
 |-- customer_sla: string (nullable = true)
 |-- is_dummy: boolean (nullable = true)



In [25]:
# Register the inventory DataFrame as the Spark SQL temp view `view_inventory`.
dim_inventory.createOrReplaceTempView("view_inventory")

# Count serial numbers per (product_type, brand) for inventory whose customer
# warranty starts on or after 2022-05-01.
sql = """
select product_type,brand,count(serial_number) as no_inventory
from view_inventory  where  customer_warranty_start>='2022-05-01'
group by  product_type,brand
"""

# Note: this Python name now holds the aggregated result; the temp view of the
# same name is unaffected.
view_inventory = spark.sql(sql)
view_inventory.show()

+------------+------+------------+
|product_type| brand|no_inventory|
+------------+------+------------+
|      Switch|   HPE|           5|
|     Storage|NetApp|           2|
|    Software|VMWare|           1|
|      Server|   HPE|          49|
+------------+------+------------+



In [26]:
# Number of incidents per status id, ordered by status id
# (queries the `incident_table` temp view).
sql_count_status = """
select incident_status_id,count(incident_status_id) as count_status
from incident_table group by  incident_status_id  order by incident_status_id 
"""

df_count_status = spark.sql(sql_count_status)
df_count_status.show()



+------------------+------------+
|incident_status_id|count_status|
+------------------+------------+
|                 2|          35|
|                 3|           2|
|                 4|         727|
|                 5|          17|
+------------------+------------+



In [45]:
# Load multiple tables.
#
# Shared JDBC reader: every connection option except `dbtable` is set once
# here, and the table name is supplied separately for each load.
#
# The chain is wrapped in parentheses rather than backslash continuations, so
# it no longer depends on a blank line following a trailing `\`. The JDBC
# driver class is set explicitly, matching the other JDBC readers in this
# notebook.
# NOTE(review): the database user and password are hard-coded in this cell;
# consider supplying them from the environment or a secrets store instead.
reader_db = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/SMartDB")
    .option("user", "postgres")
    .option("password", "P@ssw0rd")
    .option("driver", "org.postgresql.Driver")
)

reader_db

<pyspark.sql.readwriter.DataFrameReader at 0x1fa0025daf0>

In [46]:
# Load each table through the shared reader and register it as a Spark SQL temp
# view named after the table.
# createOrReplaceTempView is used so that re-running this cell replaces the
# existing views instead of failing because they already exist.
for tablename in ['app_brand', 'app_model']:
    reader_db.option("dbtable", tablename).load().createOrReplaceTempView(tablename)
    

In [48]:
# Select every column of the `app_brand` temp view and print the result.
sql_brand = "\nselect * from app_brand\n"

df_brand = spark.sql(sql_brand)
df_brand.show()

+---+-----------+
| id| brand_name|
+---+-----------+
|  1|        Yit|
|  2|     NetApp|
|  3|     VMWare|
|  4|        HPE|
|  5|      Cisco|
|  6|       DELL|
|  7|    Nutanix|
|  8|     RedHat|
|  9|   Symantec|
| 10| TrendMicro|
| 11|     Oracle|
| 12|Super Micro|
| 13|      Veeam|
| 15|   Broadcom|
| 16|  Palo Alto|
| 17|         F5|
| 18| CheckPoint|
| 19|    Veritas|
| 20|        EMC|
| 21|        IBM|
+---+-----------+
only showing top 20 rows



In [49]:
# Select every column of the `app_model` temp view and print the result.
sql_model = "\nselect * from app_model\n"

df_model = spark.sql(sql_model)
df_model.show()

+---+--------------------+---------+--------------------+--------------------+--------+
| id|          model_name|is_active|   model_description|    model_updated_at|brand_id|
+---+--------------------+---------+--------------------+--------------------+--------+
|  1|               dummy|    false|This model is for...|2021-05-27 17:33:...|       1|
|  4|ProLiant BL460c Gen9|     true|              Server|2021-06-08 21:33:...|       4|
|  6|             FAS2554|     true|             Storage|2021-06-08 21:33:...|       2|
|  9|            AFF A220|     true|             Storage|2021-06-08 21:33:...|       2|
| 11|              AVA400|     true|             Storage|2021-06-08 21:33:...|       2|
| 12|              AVA-v8|     true|             Storage|2021-06-08 21:33:...|       2|
| 13|             FAS2520|     true|             Storage|2021-06-08 21:33:...|       2|
| 18|             FAS8200|     true|             Storage|2021-06-08 21:33:...|       2|
| 19|              SG5612|     t

In [27]:
# Load with psycopg2

# https://www.geeksforgeeks.org/connecting-postgresql-with-sqlalchemy-in-python/
# https://kontext.tech/article/636/connect-to-postgresql-in-spark-pyspark

In [33]:
# Load by view

import psycopg2
import psycopg2.extras as extras
 
def get_postgres_conn():
    """Open and return a new psycopg2 connection to the PostgreSQL database.

    Each setting is taken from an environment variable when it is set and
    otherwise falls back to the value this notebook has always used:

    * ``PGDATABASE`` (default ``SMartDB``)
    * ``PGUSER``     (default ``postgres``)
    * ``PGPASSWORD`` (default kept for backward compatibility)
    * ``PGHOST``     (default ``localhost``)

    NOTE(review): the password fallback keeps existing runs working, but it
    should be removed once the environment variable is set everywhere.

    Returns:
        The connection object returned by ``psycopg2.connect``. The caller
        owns it and is responsible for closing it.

    Raises:
        Any exception raised by ``psycopg2.connect``; it is printed and then
        re-raised unchanged.
    """
    try:
        return psycopg2.connect(
            database=os.environ.get("PGDATABASE", "SMartDB"),
            user=os.environ.get("PGUSER", "postgres"),
            password=os.environ.get("PGPASSWORD", "P@ssw0rd"),
            host=os.environ.get("PGHOST", "localhost"),
        )
    except Exception as error:
        print(error)
        # Bare `raise` re-raises the active exception with its traceback intact.
        raise
    
def list_data(sql, params, connection):
    """Execute a query and return its result set as a pandas DataFrame.

    Args:
        sql: SQL text to execute.
        params: Parameters forwarded to ``cursor.execute``; when ``None`` the
            statement is executed without a parameter argument.
        connection: Open connection whose ``cursor()`` result is usable as a
            context manager. The connection itself is not closed here.

    Returns:
        A DataFrame with one column per result column, in the order reported
        by ``cursor.description``. A query that returns zero rows still yields
        a DataFrame carrying those column names. A statement that produces no
        result set at all (``cursor.description`` is ``None``) yields an empty
        DataFrame.
    """
    with connection.cursor() as cursor:
        if params is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, params)

        # No result set (e.g. a statement that returns nothing): there are no
        # columns to read and nothing to fetch.
        if cursor.description is None:
            return pd.DataFrame()

        columns = [col[0] for col in cursor.description]
        rows = cursor.fetchall()

    # Build the frame from rows plus explicit column names (not per-row dicts)
    # so the header survives an empty result and duplicate names are kept.
    return pd.DataFrame(rows, columns=columns)

In [39]:
# Incident report query, run directly against PostgreSQL.
# - Selects incident ids/numbers with the names of the related product type,
#   brand, model, severity, incident type, status and service type, plus the
#   inventory serial number, datacenter name and incident subject.
# - Formats the open/close timestamps in the 'Asia/Bangkok' time zone as
#   'DD Mon YYYY HH24:MI' text.
# - Inner-joins the incident to its lookup tables and to inventory, and
#   inventory to datacenter, brand, model, product type, customer, project and
#   company, so rows missing any of these links are dropped.
# - Filters: company id 2; warranty start on or before 2022-12-31 and warranty
#   end on or after 2022-01-01; incident status id other than 3; severity id
#   other than 4.
# - Results are ordered by id.
sql_all="""

select  incident.id, incident.incident_no, product_type.productype_name,brand.brand_name,model.model_name
,incident.incident_severity_id,severity.severity_name,incident.incident_type_id,xtype.incident_type_name
,incident.incident_status_id,status.incident_status_name,incident.service_type_id,service.service_type_name

,inventory.serial_number,datacenter.datacenter_name,incident.incident_subject

,TO_CHAR(incident.incident_datetime  AT TIME ZONE 'Asia/Bangkok','DD Mon YYYY HH24:MI') as open_datetime
,TO_CHAR(incident.incident_close_datetime  AT TIME ZONE 'Asia/Bangkok','DD Mon YYYY HH24:MI') as close_datetime


from app_incident as incident
inner join app_incident_type as  xtype on incident.incident_type_id = xtype.id
inner join  app_incident_status as status on incident.incident_status_id = status.id
inner join  app_incident_severity as severity on  incident.incident_severity_id = severity.id
inner join  app_service_type as service on incident.service_type_id= service.id

inner join app_inventory as inventory on incident.inventory_id = inventory.id
inner join app_datacenter as datacenter on inventory.datacenter_id = datacenter.id
inner join app_brand as brand on inventory.brand_id = brand.id
inner join app_model as model on inventory.model_id = model.id
inner join app_product_type as product_type on inventory.product_type_id = product_type.id
inner join app_customer as cust_support on inventory.customer_support_id = cust_support.id



inner join app_project as project on inventory.project_id = project.id
inner join app_company as company on project.company_id = company.id

where company.id=2
and inventory.customer_warranty_start<='2022-12-31'
and inventory.customer_warranty_end>='2022-01-01'

and incident.incident_status_id <>3  
and incident.incident_severity_id<>4

order by id

"""

# Run the report query on a dedicated connection and always close it afterwards,
# even if the query fails: list_data only closes the cursor, not the connection.
pg_conn = get_postgres_conn()
try:
    df_all = list_data(sql_all, None, pg_conn)
finally:
    pg_conn.close()

# Show the last rows of the result.
df_all.tail()

Unnamed: 0,id,incident_no,productype_name,brand_name,model_name,incident_severity_id,severity_name,incident_type_id,incident_type_name,incident_status_id,incident_status_name,service_type_id,service_type_name,serial_number,datacenter_name,incident_subject,open_datetime,close_datetime
125,789,SR-ES-22-0789,Software,VMWare,vRealize Automation Advanced Per OSI,3,Minor,1,General Incident,4,Closed,1,Incident,-,SILA1,[Cenprod] ขอรบกวนช่วยตรวจสอบ VM Cloning Issue,20 May 2022 09:38,20 May 2022 14:25
126,790,SR-ES-22-0790,Software,TrendMicro,Smart Protection for Endpoints,3,Minor,1,General Incident,4,Closed,2,Request,-,SILA1,[Trendmicro Apex1] สอบถาม Clean Malware ไฟล์ m...,22 May 2022 07:31,22 May 2022 13:00
127,791,SR-ES-22-0791,Software,VMWare,vRealize Automation Advanced Per OSI,3,Minor,1,General Incident,4,Closed,2,Request,-,SILA1,[CenProd] รบกวนช่วยตรวจสอบ DNS บน VM,23 May 2022 10:11,23 May 2022 14:04
128,799,SR-ES-22-0799,Storage,NetApp,FAS8020,3,Minor,1,General Incident,2,In Progress,1,Incident,451548000236,SILA1,รบกวนทีม NetApp Support ช่วยหา Solution ในการ ...,13 May 2022 13:23,
129,800,SR-ES-22-0800,Software,VMWare,vRealize Automation Advanced Per OSI,3,Minor,8,Software,2,In Progress,2,Request,-,SILA1,[MOM] ประชุมเพื่อหาแนวทางในการเพิ่ม Network In...,20 May 2022 09:00,


In [42]:
# Enable Arrow-based conversion between pandas and Spark DataFrames.
# "spark.sql.execution.arrow.pyspark.enabled" is the current name of this
# setting; the former "spark.sql.execution.arrow.enabled" key is deprecated
# as of Spark 3.0.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Convert the pandas query result into a Spark DataFrame.
# (The variable name is kept as-is because other cells may refer to it.)
sprak_df_all = spark.createDataFrame(df_all)

sprak_df_all.select(['incident_no','model_name','incident_status_name','severity_name']).show()

+-------------+--------------------+--------------------+-------------+
|  incident_no|          model_name|incident_status_name|severity_name|
+-------------+--------------------+--------------------+-------------+
|SR-ES-22-0015|vRealize Automati...|              Closed|        Minor|
|SR-ES-22-0016|vRealize Automati...|              Closed|        Minor|
|SR-ES-22-0017|vRealize Automati...|              Closed|        Minor|
|SR-ES-22-0018|vRealize Automati...|              Closed|        Minor|
|SR-ES-22-0019|vRealize Automati...|              Closed|        Minor|
|SR-ES-22-0020|vRealize Automati...|              Closed|        Minor|
|SR-ES-22-0021|vRealize Automati...|              Closed|        Minor|
|SR-ES-22-0022|vRealize Automati...|              Closed|        Minor|
|SR-ES-22-0030|              SG5712|              Closed|        Major|
|SR-ES-22-0031|             FAS8020|              Closed|        Major|
|SR-ES-22-0032|             FAS8020|              Closed|     Cr

In [None]:
# Load with SQLAlchemy