In [1]:
import findspark
# NOTE(review): findspark.init() is deliberately called before `import pyspark`,
# presumably so it can locate the local Spark installation and put pyspark on
# sys.path first — keep this ordering.
findspark.init()

import pyspark

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum
from pyspark.sql import functions as F
import pandas as pd
import json
import os
from elasticsearch import Elasticsearch
import datetime

In [3]:
### Create Spark-Session

# Build a SparkSession named "Bulk-Data" (or reuse one that is already running
# in this kernel); the bare `spark` on the last line renders it as cell output.
spark = (
    SparkSession.builder
    .appName("Bulk-Data")
    .getOrCreate()
)
spark

### Read Files

In [6]:
# Directory holding the Seoul air-pollution CSVs. Set the AIRPOLUTION_DATA_DIR
# environment variable to point elsewhere instead of editing the notebook; the
# fallback is the original author's local path.
path_files = os.environ.get(
    "AIRPOLUTION_DATA_DIR",
    r'C:\Users\samfs\Desktop\Projetos-Samuel\AirPolutionSeoul\Python\Resources',
)

measurement_item_file = os.path.join(path_files, "Data_Measurement_item_info.csv")
measurement_station_file = os.path.join(path_files, "Data_Measurement_station_info.csv")
measurement_info_file = os.path.join(path_files, "Measurement_info.csv")
measurement_file = os.path.join(path_files, "Measurement_summary.csv")

In [11]:
### Read the Files:

# Item metadata: one row per pollutant with its unit and the Good/Normal/Bad/
# Very-bad thresholds. No schema is inferred, so every column is read as a string.
item_info = spark.read.csv(measurement_item_file, header=True)
item_info.show(2)

+---------+---------+-------------------+----------+-------------+-----------+-------------+
|Item code|Item name|Unit of measurement|Good(Blue)|Normal(Green)|Bad(Yellow)|Very bad(Red)|
+---------+---------+-------------------+----------+-------------+-----------+-------------+
|        1|      SO2|                ppm|      0.02|         0.05|       0.15|          1.0|
|        3|      NO2|                ppm|      0.03|         0.06|        0.2|          2.0|
+---------+---------+-------------------+----------+-------------+-----------+-------------+
only showing top 2 rows



In [10]:
# Station metadata: code, district name, address and coordinates per station.
station_file = spark.read.csv(measurement_station_file, header=True)
station_file.show(2)

+------------+----------------------+--------------------+------------------+------------------+
|Station code|Station name(district)|             Address|          Latitude|         Longitude|
+------------+----------------------+--------------------+------------------+------------------+
|         101|             Jongno-gu|19, Jong-ro 35ga-...|37.572016399999995|127.00500749999999|
|         102|               Jung-gu|15, Deoksugung-gi...|37.564262899999996|126.97467569999999|
+------------+----------------------+--------------------+------------------+------------------+
only showing top 2 rows



In [12]:
# Long-format readings: one row per (date, station, item) with its average value.
info_file = spark.read.csv(measurement_info_file, header=True)
info_file.show(2)

+----------------+------------+---------+--------------------+-----------------+
|Measurement date|Station code|Item code|       Average value|Instrument status|
+----------------+------------+---------+--------------------+-----------------+
|2017-01-01 00:00|         101|        1|               0.004|                0|
|2017-01-01 00:00|         101|        3|0.059000000000000004|                0|
+----------------+------------+---------+--------------------+-----------------+
only showing top 2 rows



In [13]:
# Wide-format summary: one row per (date, station) with a column per pollutant.
measurement = spark.read.csv(measurement_file, header=True)
measurement.show(2)

+----------------+------------+--------------------+----------+-----------+-----+-----+-----+---+----+-----+
|Measurement date|Station code|             Address|  Latitude|  Longitude|  SO2|  NO2|   O3| CO|PM10|PM2.5|
+----------------+------------+--------------------+----------+-----------+-----+-----+-----+---+----+-----+
|2017-01-01 00:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.004|0.059|0.002|1.2|73.0| 57.0|
|2017-01-01 01:00|         101|19, Jong-ro 35ga-...|37.5720164|127.0050075|0.004|0.058|0.002|1.2|71.0| 59.0|
+----------------+------------+--------------------+----------+-----------+-----+-----+-----+---+----+-----+
only showing top 2 rows



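### Equipment class

Each `equipament` object holds one pollutant's unit and level thresholds and grades readings against them.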

In [15]:
class equipament:
    """A pollutant measuring item plus the thresholds used to grade its readings.

    Instances are built from one row of the item-info CSV: item code, item
    name, unit of measurement, and the four level thresholds (which arrive
    as strings and are converted to float).
    """

    def __init__(self, code,name,unit,level1,level2,level3,level4):
        self.code = code
        self.name = name
        self.unit = unit
        # Thresholds, in increasing order of severity.
        self.good = float(level1)
        self.normal = float(level2)
        self.bad = float(level3)
        # Top of the "Very bad" band; kept for reference, not needed to classify.
        self.veryBad = float(level4)
        # Numeric band of the most recent getLevel() call (0 = not graded yet).
        self.color = 0

    def getLevel(self,value):
        """Grade ``value`` and record the band number in ``self.color``.

        Bands use exclusive upper bounds (as before):
        ``value < good`` -> "Good" (1), ``< normal`` -> "Normal" (2),
        ``< bad`` -> "Bad" (3), anything higher -> "Very Bad" (4).

        Previously, readings at or above ``veryBad`` matched no branch. They
        returned ``None`` and left ``self.color`` holding the previous call's
        band. They are now graded "Very Bad". NaN cannot be graded: it
        returns ``None`` and resets ``self.color`` to 0.

        NOTE(review): negative readings are graded "Good". Confirm whether the
        source data uses negatives as "missing" sentinels.
        """
        import math

        if math.isnan(value):
            self.color = 0
            return None
        if value < self.good:
            self.color = 1
            return "Good"
        if value < self.normal:
            self.color = 2
            return "Normal"
        if value < self.bad:
            self.color = 3
            return "Bad"
        self.color = 4
        return "Very Bad"

    def __repr__(self):
        return(f"Equipament Code = {str(self.code)}, Name = {str(self.name)}, Unit of Measurement = {str(self.unit)}")

In [16]:
# The item-info table is tiny, so pull it into pandas and build one
# `equipament` per row (code, name, unit, four thresholds).
item_df = item_info.toPandas()

# itertuples(index=False, name=None) yields plain positional tuples. That avoids
# `row[0]` on a label-indexed Series, which pandas has deprecated as positional
# access. name=None sidesteps namedtuple renaming of column names with spaces.
list_equipaments = [
    equipament(*fields[:7])
    for fields in item_df.itertuples(index=False, name=None)
]

In [17]:
# Index the equipment by pollutant name instead of by CSV row position, so the
# mapping stays correct if the item-info file is reordered.
equipaments = {str(eq.name).strip(): eq for eq in list_equipaments}

# Fail fast if names collided or a pollutant used downstream is absent.
assert len(equipaments) == len(list_equipaments), "duplicate item names in item info"
_missing_pollutants = {"SO2", "NO2", "CO", "O3", "PM10", "PM2.5"} - equipaments.keys()
assert not _missing_pollutants, f"item info is missing pollutants: {sorted(_missing_pollutants)}"

equipaments

{'SO2': Equipament Code = 1, Name = SO2, Unit of Measurement = ppm,
 'NO2': Equipament Code = 3, Name = NO2, Unit of Measurement = ppm,
 'CO': Equipament Code = 5, Name = CO, Unit of Measurement = ppm,
 'O3': Equipament Code = 6, Name = O3, Unit of Measurement = ppm,
 'PM10': Equipament Code = 8, Name = PM10, Unit of Measurement = Mircrogram/m3,
 'PM2.5': Equipament Code = 9, Name = PM2.5, Unit of Measurement = Mircrogram/m3}

In [18]:
# Spot check: 0.01 ppm is below SO2's first threshold, so it should grade "Good".
so2_equipament = equipaments['SO2']
so2_equipament.getLevel(0.01)

'Good'

### Adding pollution-level columns

In [25]:
## udf functions
from pyspark.sql.functions import col


def _make_level_udf(pollutant):
    """Wrap ``equipaments[pollutant].getLevel`` as a string-returning Spark UDF.

    The CSVs are read without a schema, so values arrive as strings and are
    converted with ``float`` here. A null cell yields a null level instead of
    raising ``TypeError`` from ``float(None)`` inside the Spark task.
    """
    equipament_for_pollutant = equipaments[pollutant]

    def _level(value):
        if value is None:
            return None
        return equipament_for_pollutant.getLevel(float(value))

    return udf(_level, StringType())


udf_risk_SO2 = _make_level_udf('SO2')
udf_risk_NO2 = _make_level_udf('NO2')
udf_risk_CO = _make_level_udf('CO')
udf_risk_O3 = _make_level_udf('O3')
udf_risk_PM10 = _make_level_udf('PM10')
# PM2.5 also has thresholds in `equipaments`; exposed so it can be graded too.
udf_risk_PM25 = _make_level_udf('PM2.5')

In [26]:
# Append one Level_<pollutant> column per pollutant, in this order, graded by
# the matching UDF from the previous cell.
level_columns = [
    ("Level_SO2", udf_risk_SO2, "SO2"),
    ("Level_NO2", udf_risk_NO2, "NO2"),
    ("Level_CO", udf_risk_CO, "CO"),
    ("Level_O3", udf_risk_O3, "O3"),
    ("Level_PM10", udf_risk_PM10, "PM10"),
]

complete_measurements = measurement
for new_column, level_udf, source_column in level_columns:
    complete_measurements = complete_measurements.withColumn(new_column, level_udf(source_column))

In [31]:
# Bring every enriched Row back to the driver as a Python list; later cells
# index into it and iterate over it to build the bulk request.
# NOTE(review): this materialises the whole table in driver memory.
complete_measurements_list = [measurement_row for measurement_row in complete_measurements.collect()]

AttributeError: 'list' object has no attribute 'toList'

In [40]:
# Only the last expression of a cell is displayed, so the bare `len(...)` was
# evaluated and discarded; print the row count explicitly, then show one row.
print(f"{len(complete_measurements_list)} rows collected")
complete_measurements_list[1]

Row(Measurement date='2017-01-01 01:00', Station code='101', Address='19, Jong-ro 35ga-gil, Jongno-gu, Seoul, Republic of Korea', Latitude='37.5720164', Longitude='127.0050075', SO2='0.004', NO2='0.058', O3='0.002', CO='1.2', PM10='71.0', PM2.5='59.0', Level_SO2='Good', Level_NO2='Normal', Level_CO='Good', Level_O3='Good', Level_PM10='Normal')

## Connect to Elasticsearch

In [39]:
from elasticsearch import helpers

# Use a full URL: elasticsearch-py 8.x rejects host strings without a scheme,
# and 7.x accepts the URL form too. Set ELASTICSEARCH_URL to target another
# cluster without editing the notebook.
es = Elasticsearch(os.environ.get("ELASTICSEARCH_URL", "http://localhost:9200"))
es

<Elasticsearch([{'host': 'localhost', 'port': 9200}])>

In [None]:
### Convert to Bulk-JSON

def _to_float(value):
    """Convert a CSV string cell to float, keeping nulls as ``None``."""
    return None if value is None else float(value)


def _measurement_to_action(row, doc_id):
    """Build one Elasticsearch bulk index action from an enriched measurement Row.

    Fields are read by column name, not by position. The Level_* columns
    follow PM2.5 in the DataFrame (Level_SO2, Level_NO2, Level_CO, Level_O3,
    Level_PM10), so the old positional lookups had sent PM2.5's value to
    Level_SO2 and shifted every other level by one column.

    Level values are passed through unchanged, so a null level is indexed as
    JSON null rather than the string "None".
    """
    return {
        "_index": "seoul_measurements",
        "_id": doc_id,
        "_source": {
            # NOTE(review): kept in the CSV's "YYYY-MM-DD HH:MM" form; confirm the
            # index mapping parses it as a date (default date detection may not).
            "timestamp": row["Measurement date"],
            "station_code": int(row["Station code"]),
            "address": row["Address"],
            "coordinate": {"lat": float(row["Latitude"]), "lon": float(row["Longitude"])},
            "SO2": _to_float(row["SO2"]),
            "NO2": _to_float(row["NO2"]),
            "O3": _to_float(row["O3"]),
            "CO": _to_float(row["CO"]),
            "PM10": _to_float(row["PM10"]),
            # NOTE(review): PM2.5 is in the row but not indexed. If it is added, avoid
            # a dotted field name: Elasticsearch expands dots into nested objects.
            "Level_SO2": row["Level_SO2"],
            "Level_NO2": row["Level_NO2"],
            "Level_O3": row["Level_O3"],
            "Level_CO": row["Level_CO"],
            "Level_PM10": row["Level_PM10"],
        },
    }


# Document ids are the rows' positions in the collected list, as before.
actions = [
    _measurement_to_action(row, doc_id)
    for doc_id, row in enumerate(complete_measurements_list)
]

helpers.bulk(es, actions)
