<a href="https://colab.research.google.com/github/withswim/data-engineering-batch5/blob/master/%5BSooyoung%5DAirflow_2%EC%A3%BC%EC%B0%A8_%EC%88%99%EC%A0%9C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **1. name, gender 읽어오기 ETL 버전 완성**

In [None]:
import psycopg2
import requests
import csv

def extract(url, timeout=None):
    """Download *url* and return the response body as text.

    Args:
        url: address of the resource to fetch (here, a CSV file).
        timeout: optional number of seconds forwarded to ``requests.get``.
            ``None`` (the default) keeps the previous behavior of waiting
            without a timeout.

    Returns:
        The response body decoded as text.

    Raises:
        requests.HTTPError: for 4xx/5xx responses, so an error page is never
            handed to ``transform`` as if it were CSV data.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.text

def transform(data):
    """Turn raw CSV text into a ``csv.reader`` over its lines.

    Leading and trailing whitespace of the whole payload is trimmed first, then
    the text is split on newline characters and each piece is handed to
    ``csv.reader`` as one input line.

    Args:
        data: the full CSV payload as a string.

    Returns:
        A lazy ``csv.reader`` yielding one list of fields per line.
    """
    trimmed = data.strip()
    lines = trimmed.split("\n")
    parser = csv.reader(lines)
    return parser

# Redshift connection helper
def get_Redshift_connection():
    """Open an autocommit connection to Redshift and return a cursor on it.

    Every connection parameter can be overridden with an environment variable:
    REDSHIFT_HOST, REDSHIFT_USER, REDSHIFT_PASSWORD, REDSHIFT_PORT and
    REDSHIFT_DBNAME. When a variable is unset, the previously hard-coded value
    is used, so existing callers behave as before.

    Nothing here closes the connection. Callers can close it through
    ``cursor.connection.close()``.

    Returns:
        A psycopg2 cursor bound to a connection with autocommit enabled.
    """
    import os

    # NOTE(review): a plaintext password is committed in this notebook. It
    # should be rotated and supplied only through REDSHIFT_PASSWORD.
    host = os.environ.get(
        "REDSHIFT_HOST", "ssde.cnqux5xggmn5.us-east-2.redshift.amazonaws.com"
    )
    redshift_user = os.environ.get("REDSHIFT_USER", "withswim")
    redshift_pass = os.environ.get("REDSHIFT_PASSWORD", "Withswim!1")
    port = int(os.environ.get("REDSHIFT_PORT", 5439))
    dbname = os.environ.get("REDSHIFT_DBNAME", "dev")

    # Keyword arguments let psycopg2 quote each value itself. A hand-built
    # "key=value" DSN string breaks when a value contains spaces or quotes.
    conn = psycopg2.connect(
        dbname=dbname,
        user=redshift_user,
        password=redshift_pass,
        host=host,
        port=port,
    )
    # Autocommit, so the explicit BEGIN/END/COMMIT/ROLLBACK statements issued
    # by callers control the transactions.
    conn.set_session(autocommit=True)
    return conn.cursor()


def load(csv_reader):
    """Full-refresh the ``name_gender`` table from *csv_reader* in one transaction.

    The table is created if missing. Its current contents are deleted, and
    every row after the first (header) row of *csv_reader* is inserted as a
    (name, gender) pair. All of this runs between an explicit ``BEGIN`` and
    ``END``.

    As a safety check, when the table already held rows, the row count after
    the reload must equal the count before it. Otherwise the transaction is
    rolled back.

    Errors are not re-raised. A connection failure or any failure inside the
    transaction is printed, and in the latter case the transaction is rolled
    back. The connection is always closed before returning.

    Args:
        csv_reader: an iterator of rows (e.g. from ``transform``) whose first
            row is a header and whose remaining rows have exactly two fields.

    Returns:
        None.
    """
    try:
        # create a connection
        cur = get_Redshift_connection()
    except Exception as err:
        print("Exception occured while creating a connection", err)
    else:
        try:
            sql = """BEGIN;

                    CREATE TABLE IF NOT EXISTS name_gender (
                    name varchar(64),
                    gender varchar(16)
                    );

                    SELECT COUNT(1) FROM name_gender;
                    """
            cur.execute(sql)
            bef_cnt = cur.fetchone()[0]

            cur.execute("""DELETE FROM name_gender;""")

            # Skip the header row. The default avoids a message-less
            # StopIteration on empty input; the count check below still
            # rolls back if a non-empty table would be emptied.
            next(csv_reader, None)
            insert_sql = """INSERT INTO NAME_GENDER (NAME, GENDER) VALUES (%s, %s);"""
            for name, gender in csv_reader:
                cur.execute(insert_sql, (name, gender))

            cur.execute("""SELECT COUNT(1) FROM name_gender;""")
            aft_cnt = cur.fetchone()[0]

            # Idempotency check: reloading the same source must not change the
            # row count (skipped on the first load into an empty table).
            if bef_cnt != aft_cnt and bef_cnt != 0:
                raise Exception(
                    "row count changed from {} to {}; rolling back".format(
                        bef_cnt, aft_cnt
                    )
                )

            cur.execute("END")
        except Exception as err:
            # Report the failure instead of silently swallowing it.
            print("err: ", err)
            cur.execute("ROLLBACK")
        finally:
            cur.connection.close()


In [None]:
# Run the name/gender ETL end to end: download the CSV, parse it, load it.
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"

data = extract(url=link)
csv_reader = transform(data=data)
load(csv_reader=csv_reader)

# **2. 서울 7일 평균/최소/최대 온도 읽어오기 (Full Refresh 버전)**

## **API Key 숨기기**
---
*   API key 정보를 담은 config.txt 파일을 만들어 Colab에 직접 업로드하거나, Google Drive에 올린 뒤 Drive를 mount
*   config 파일을 열어 API key 값을 읽어 온 후, 요청 URL 끝에 붙여서 사용
*   문제점: Colab에 직접 업로드할 경우 노트북을 새로 열 때마다 파일을 다시 업로드해야 하고, Google Drive에 올릴 경우 mount할 때마다 Google 계정 인증을 거쳐야 함

In [None]:
# Mount Google Drive at /content/drive so files stored on Drive can be read.
# NOTE(review): the weather cells below open 'config.txt' relative to the
# working directory, not a path under /content/drive. If config.txt is kept on
# Drive, those cells need the full mounted path. Confirm where it is stored.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import requests
from datetime import datetime, date
import json

from google.colab import files

# Read the OpenWeatherMap API key from the first line of config.txt.
# strip() drops the trailing newline that readline() keeps. Without it the key
# ends in "\n" and corrupts the request URL.
with open('config.txt', 'r') as f:
    api_key = f.readline().strip()

# Seoul is at roughly 37°34'N 126°58'E (37.5665, 126.9780 in decimal degrees).
# The previous lat=37.34&lon=126.57 appears to be the degree-minute figures
# written as decimals, which points about 40 km southwest of Seoul.
url = "https://api.openweathermap.org/data/2.5/onecall?lat=37.5665&lon=126.9780&exclude=current,minutely,hourly,alerts&appid="+api_key

response = requests.get(url)
# Fail fast on HTTP errors (e.g. a bad API key) rather than with a KeyError
# on "daily" below.
response.raise_for_status()
data = response.json()

daily = data["daily"]

try:
    # create a connection
    cur = get_Redshift_connection()
except Exception as err:
    print("Exception occured while creating a connection", err)
else:
    try:
        # Full refresh: create the table if needed, then empty it, all inside
        # one transaction so readers never see a half-loaded table.
        sql = """BEGIN;
                 CREATE TABLE IF NOT EXISTS weather_forecast_full (
                    date DATE PRIMARY KEY,
                    day float,
                    min float,
                    max float
                );
                DELETE FROM weather_forecast_full;"""
        cur.execute(sql)

        insert_sql = """INSERT INTO weather_forecast_full (date, day, min, max) VALUES (%s, %s, %s, %s);"""
        for entry in daily:
            dt = datetime.fromtimestamp(entry["dt"]).date()
            # Named temp_min/temp_max so the builtins min()/max() are not
            # shadowed in the notebook's global namespace.
            day = entry["temp"]["day"]       # average temperature
            temp_min = entry["temp"]["min"]  # minimum temperature
            temp_max = entry["temp"]["max"]  # maximum temperature
            cur.execute(insert_sql, (dt, day, temp_min, temp_max))

        cur.execute("COMMIT")
    except Exception as err:
        # Report the failure instead of silently swallowing it.
        print("err: ", err)
        cur.execute("ROLLBACK")
    finally:
        cur.connection.close()


# **3. 서울 7일 평균/최소/최대 온도 읽어오기 (Incremental Update 버전)**

In [None]:
import requests
from datetime import datetime, date
import json

from google.colab import files

# Read the OpenWeatherMap API key from the first line of config.txt.
# strip() drops the trailing newline that readline() keeps. Without it the key
# ends in "\n" and corrupts the request URL.
with open('config.txt', 'r') as f:
    api_key = f.readline().strip()

# Seoul is at roughly 37°34'N 126°58'E (37.5665, 126.9780 in decimal degrees).
# The previous lat=37.34&lon=126.57 appears to be the degree-minute figures
# written as decimals, which points about 40 km southwest of Seoul.
url = "https://api.openweathermap.org/data/2.5/onecall?lat=37.5665&lon=126.9780&exclude=current,minutely,hourly,alerts&appid="+api_key

response = requests.get(url)
# Fail fast on HTTP errors (e.g. a bad API key) rather than with a KeyError
# on "daily" below.
response.raise_for_status()
data = response.json()

daily = data["daily"]

try:
    # create a connection
    cur = get_Redshift_connection()
except Exception as err:
    print("Exception occured while creating a connection", err)
else:
    try:
        # Stage 1: copy the existing rows into a session temp table.
        sql = """BEGIN;
                    CREATE TABLE IF NOT EXISTS weather_forecast_inc (
                      date DATE PRIMARY KEY,
                      day float,
                      min float,
                      max float,
                      created timestamp DEFAULT sysdate
                    );

                    CREATE TABLE IF NOT EXISTS #temp_weather_forecast (
                      date DATE PRIMARY KEY,
                      day float,
                      min float,
                      max float,
                      created timestamp DEFAULT sysdate
                    );

                    INSERT INTO #temp_weather_forecast
                    SELECT date, day, min, max, created 
                    FROM weather_forecast_inc; 
                     
                    SELECT COUNT(1) FROM #temp_weather_forecast;
                """
        cur.execute(sql)
        bef_cnt = cur.fetchone()[0]

        # Stage 2: append the freshly fetched forecast rows to the temp table.
        insert_sql = """INSERT INTO #temp_weather_forecast (date, day, min, max, created) VALUES (%s, %s, %s, %s, sysdate);"""
        for entry in daily:
            dt = datetime.fromtimestamp(entry["dt"]).date()
            # Named temp_min/temp_max so the builtins min()/max() are not
            # shadowed in the notebook's global namespace.
            day = entry["temp"]["day"]       # average temperature
            temp_min = entry["temp"]["min"]  # minimum temperature
            temp_max = entry["temp"]["max"]  # maximum temperature
            cur.execute(insert_sql, (dt, day, temp_min, temp_max))

        cur.execute("""SELECT COUNT(1) FROM #temp_weather_forecast;""")
        aft_cnt = cur.fetchone()[0]

        # Nothing was added: go to the except branch and roll back.
        if bef_cnt == aft_cnt:
            raise Exception("no forecast rows were added to the staging table")

        # Stage 3: rebuild the target keeping only the newest row per date.
        sql = """
              DELETE FROM weather_forecast_inc;

              INSERT INTO weather_forecast_inc
              SELECT date, day, min, max, created 
              FROM (SELECT date, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created DESC) nn, day, min, max, created
                FROM #temp_weather_forecast
                ORDER BY date, nn)
              WHERE nn = 1; 

              SELECT date, COUNT(1) FROM weather_forecast_inc GROUP BY date HAVING COUNT(1)>1;
              """
        cur.execute(sql)
        result = cur.fetchone()

        # Verify uniqueness of the date column: any returned row is a duplicate.
        if result is not None:
            raise Exception(
                "duplicate date in weather_forecast_inc: {}".format(result)
            )

        cur.execute("COMMIT")
    except Exception as err:
        print("err: ", err)
        cur.execute("ROLLBACK")
    finally:
        # Closing the session also drops the #temp_weather_forecast table.
        cur.connection.close()

        