## Slack에 무언가 보내기

webhook은 개인용으로 하나 만들어서 사용하세요. 메시지를 너무 많이 보내면 민폐예요.

**메시지**

In [None]:
import requests
import os

# Look up the Slack webhook URL in the environment (placeholder when unset).
webhook = os.getenv('SLACK_WEBHOOK', 'YOUR_WEBHOOK_URL_HERE')
requests.post(
    webhook,
    json={'text': '테스트 안녕하세요!'},
)

**함수**

In [None]:
import os

webhook = os.getenv('SLACK_WEBHOOK', 'YOUR_WEBHOOK_URL_HERE')

def send_slack(message):
    """Post ``message`` with a fixed test suffix appended to the Slack webhook."""
    body = {'text': f"{message}테스트입니다."}
    requests.post(webhook, json=body)

In [4]:
# With send_slack as defined in the cell above, this posts "환율테스트입니다."
# (the suffix is appended directly after the argument, with no separator).
send_slack("환율")

**이미지**

In [None]:
import os

webhook = os.environ.get('SLACK_WEBHOOK', 'YOUR_WEBHOOK_URL_HERE')

# Slack reads Block Kit layouts from the top-level "blocks" key (plural); a
# "block" key is not part of the payload schema, so the image never showed up.
# A section block also needs its own "text" object next to the accessory.
requests.post(webhook, json={
    # Plain-text fallback used for notifications.
    'text': '지난 수업 복습',
    'blocks': [{
        'type': 'section',
        'text': {'type': 'mrkdwn', 'text': '지난 수업 복습'},
        'accessory': {
            'type': 'image',
            # Placeholder: replace with a publicly reachable image URL.
            'image_url': 'url',
            'alt_text': 'img',
        },
    }],
})

**오류 확인**

In [None]:
import os

webhook = os.environ.get('SLACK_WEBHOOK', 'YOUR_WEBHOOK_URL_HERE')

def send_slack(message, image_url=None):
    """Post a message to the Slack webhook and report whether it succeeded.

    Args:
        message: Text to send. It is also used as the section text when an
            image is attached.
        image_url: Optional image URL. When truthy, a single section block
            with an image accessory is added to the payload.

    Returns:
        True when the request completed with a non-error HTTP status,
        False when the request failed or Slack answered with an error status.
    """
    payload = {'text': message}

    if image_url:
        payload['blocks'] = [{
            'type': 'section',
            'text': {'type': 'mrkdwn', 'text': message},
            'accessory': {
                'type': 'image',
                'image_url': image_url,
                'alt_text': 'image'
            }
        }]

    try:
        resp = requests.post(webhook, json=payload, timeout=5)
        resp.raise_for_status()
    except requests.RequestException:
        # Only transport/HTTP failures count as "not sent". A bare except
        # would also hide KeyboardInterrupt and genuine programming errors.
        return False
    return True

In [None]:
# Usage: print whether the Slack post went through.
print('전송 성공' if send_slack("테스트") else "전송 실패")

### Slack + 환율 API 연동

In [None]:
import requests
import os

webhook = os.environ.get('SLACK_WEBHOOK', 'YOUR_WEBHOOK_URL_HERE')

# Fetch the latest USD -> KRW rate (usage is described on the API site).
# The timeout keeps the cell from hanging on a stalled connection, and
# raise_for_status() reports an HTTP error here rather than as a KeyError
# when the payload is read in the next cell.
resp = requests.get('https://api.frankfurter.dev/latest?from=USD&to=KRW', timeout=10)
resp.raise_for_status()
data = resp.json()

In [None]:
# Send the fetched KRW rate to Slack.
rate = data['rates']['KRW']
requests.post(webhook, json={'text': f'오늘의 환율은 {rate}원!'})

### 환율 API -> csv, json

**기간 생성 함수**

In [65]:
from datetime import datetime, timedelta

def generate_date_list(start_date, end_date):
    """Return every calendar day from start_date through end_date, inclusive.

    Both arguments are 'YYYY-MM-DD' strings and the result is a list of
    strings in the same format. The list is empty when start_date falls
    after end_date.
    """
    fmt = '%Y-%m-%d'
    first = datetime.strptime(start_date, fmt)
    last = datetime.strptime(end_date, fmt)
    span_days = (last - first).days

    return [
        (first + timedelta(days=offset)).strftime(fmt)
        for offset in range(span_days + 1)
    ]

**환율 반환 함수**  
날짜(list 또는 str) -> 환율

In [66]:
import csv
import json
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime
import time
import random
import requests

def get_exchange_rate(date, base="USD", save_format=None):
    """Look up the KRW rate for one date or for a list of dates.

    Args:
        date: A 'YYYY-MM-DD' string, or a list of such strings.
        base: Currency code sent as the request's ``base`` query parameter.
        save_format: 'csv' or 'json' to also write the collected rows via
            save_to_csv / save_to_json. Only used for list input; any other
            value skips saving.

    Returns:
        For a string: None when the date is a Saturday or Sunday (no request
        is made), otherwise a dict with 'date' (the requested date),
        'response_time' (the response's Date header shifted by +9 hours) and
        'rates' (the KRW value from the payload).
        For a list: the list of such dicts, with weekend dates left out.
        For any other input type: None.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    if isinstance(date, str):
        dt = datetime.strptime(date, '%Y-%m-%d')
        if dt.weekday() >= 5:
            # Saturday/Sunday: skip without touching the network.
            return None

        base_url = 'https://api.frankfurter.dev/v1'
        url = f"{base_url}/{date}?base={base}"

        # Bound the wait and fail loudly on HTTP errors instead of hitting a
        # KeyError on an error payload further down.
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()

        # Response time: take the Date header and shift it by nine hours so
        # the formatted wall-clock value reads as KST.
        utc_time = resp.headers['Date']
        kst_time = parsedate_to_datetime(utc_time) + timedelta(hours=9)
        req_time = kst_time.strftime('%Y-%m-%d %H:%M:%S')

        rates = resp.json()['rates']['KRW']

        return {
            'date': date,
            'response_time': req_time,
            'rates': rates
        }

    if isinstance(date, list):
        results = []
        last_index = len(date) - 1
        for index, each_date in enumerate(date):
            result = get_exchange_rate(each_date, base)
            if result is None:
                # No request was sent for this date, so there is nothing to
                # throttle.
                continue
            results.append(result)
            if index < last_index:
                # Pause between real requests only; never after the final one.
                time.sleep(random.uniform(5, 10))

        # Optional persistence of the collected rows.
        if save_format == 'csv':
            save_to_csv(results)
        elif save_format == 'json':
            save_to_json(results)

        return results

    return None
    
def save_to_csv(data_list, filename='rates.csv'):
    """Write the rate rows to ``filename`` as UTF-8 CSV with a header line.

    Each element of ``data_list`` is a dict keyed by 'date', 'response_time'
    and 'rates'. A confirmation line is printed once the file is written.
    """
    columns = ['date', 'response_time', 'rates']
    with open(filename, 'w', newline='', encoding='utf-8') as out:
        csv_writer = csv.DictWriter(out, fieldnames=columns)
        csv_writer.writeheader()
        for row in data_list:
            csv_writer.writerow(row)
    print(f'{filename} 저장 완료!')

def save_to_json(data_list, filename='rates.json'):
    """Write ``data_list`` to ``filename`` as UTF-8 JSON with 2-space indent.

    Non-ASCII characters are written as-is rather than escaped. A
    confirmation line is printed once the file is written.
    """
    with open(filename, 'w', encoding='utf-8') as out:
        out.write(json.dumps(data_list, ensure_ascii=False, indent=2))
    print(f"{filename} 저장 완료!")

In [68]:
# Build the 2025-10-17 .. 2025-10-21 date list and look up each day's rate.
temp_date = generate_date_list('2025-10-17', '2025-10-21')
get_exchange_rate(temp_date)

[{'date': '2025-10-17',
  'response_time': '2025-10-22 19:43:41',
  'rates': 1420.58},
 {'date': '2025-10-20',
  'response_time': '2025-10-22 19:44:01',
  'rates': 1423.54},
 {'date': '2025-10-21',
  'response_time': '2025-10-22 19:44:11',
  'rates': 1430.5}]

### Slack + AWS

In [None]:
import json
import urllib.request
from datetime import datetime, timedelta
from email.utils import parsedate_to_datetime
import time
import random
import boto3
from io import StringIO
import csv
import os

def generate_date_list(start_date, end_date):
    """Build the inclusive list of 'YYYY-MM-DD' strings between two dates.

    Returns an empty list when start_date is later than end_date.
    """
    date_format = '%Y-%m-%d'
    cursor = datetime.strptime(start_date, date_format)
    last_day = datetime.strptime(end_date, date_format)
    one_day = timedelta(days=1)

    days = []
    while not cursor > last_day:
        days.append(cursor.strftime(date_format))
        cursor = cursor + one_day
    return days

def get_exchange_rate(date, base="USD"):
    """Look up the KRW rate for one date or for a list of dates.

    Args:
        date: A 'YYYY-MM-DD' string, or a list of such strings.
        base: Currency code sent as the request's ``base`` query parameter.

    Returns:
        For a string: None when the date is a Saturday or Sunday (no request
        is made), otherwise a dict with 'date' (the requested date),
        'response_time' (the response's Date header shifted by +9 hours) and
        'rates' (the KRW value from the payload).
        For a list: the list of such dicts, with weekend dates left out.
        For any other input type: None.
    """
    if isinstance(date, str):
        dt = datetime.strptime(date, '%Y-%m-%d')
        if dt.weekday() >= 5:
            # Saturday/Sunday: skip without touching the network.
            return None

        url = f"https://api.frankfurter.dev/v1/{date}?base={base}"

        # The timeout stops a stalled connection from holding the caller
        # until its own execution limit is reached.
        with urllib.request.urlopen(url, timeout=10) as response:
            # Response time: take the Date header and shift it by nine hours
            # so the formatted wall-clock value reads as KST.
            utc_time = response.headers['Date']
            kst_time = parsedate_to_datetime(utc_time) + timedelta(hours=9)
            req_time = kst_time.strftime('%Y-%m-%d %H:%M:%S')

            # Payload
            data = json.loads(response.read())
            rates = data['rates']['KRW']

        return {
            'date': date,
            'response_time': req_time,
            'rates': rates
        }

    if isinstance(date, list):
        results = []
        last_index = len(date) - 1
        for index, each_date in enumerate(date):
            result = get_exchange_rate(each_date, base)
            if result is None:
                # No request was sent for this date, so there is nothing to
                # throttle.
                continue
            results.append(result)
            if index < last_index:
                # Pause between real requests only; never after the final one.
                time.sleep(random.uniform(5, 10))
        return results

    return None

def upload_to_s3(data_list, bucket_name, file_format='csv'):
    """Serialize the rate rows and upload them to an S3 bucket.

    Args:
        data_list: List of dicts keyed by 'date', 'response_time' and 'rates'.
        bucket_name: Name of the destination bucket.
        file_format: 'csv' (default) or 'json'.

    Returns:
        The object key that was written, of the form
        'exchange_rates_<YYYYmmdd_HHMMSS>.<file_format>'.

    Raises:
        ValueError: if file_format is neither 'csv' nor 'json'.
    """
    # Reject unknown formats up front. Previously they fell through both
    # branches and failed on the unbound ``filename`` at the return statement.
    if file_format not in ('csv', 'json'):
        raise ValueError(
            f"unsupported file_format {file_format!r}; expected 'csv' or 'json'"
        )

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    filename = f'exchange_rates_{timestamp}.{file_format}'

    if file_format == 'csv':
        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=['date', 'response_time', 'rates'])
        writer.writeheader()
        writer.writerows(data_list)
        body = output.getvalue()
        content_type = 'text/csv'
    else:
        body = json.dumps(data_list, ensure_ascii=False, indent=2)
        content_type = 'application/json'

    s3 = boto3.client('s3')
    s3.put_object(
        Bucket=bucket_name,
        Key=filename,
        # Encode explicitly so the uploaded bytes are always UTF-8.
        Body=body.encode('utf-8'),
        ContentType=content_type
    )

    return filename

def send_slack(message):
    """Send ``message`` as the text of a Slack incoming-webhook post.

    The webhook URL is read from the SLACK_WEBHOOK environment variable.

    Raises:
        KeyError: if SLACK_WEBHOOK is not set.
        urllib.error.URLError: if the request fails or Slack answers with an
            error status (HTTPError is a subclass).
    """
    webhook = os.environ['SLACK_WEBHOOK']

    req = urllib.request.Request(
        webhook,
        data=json.dumps({'text': message}).encode('utf-8'),
        headers={'Content-Type': 'application/json'}
    )
    # Close the response deterministically and bound the wait so a stalled
    # connection cannot hang the caller.
    with urllib.request.urlopen(req, timeout=10):
        pass

def lambda_handler(event, context):
    """Lambda entry point: fetch rates, upload them to S3, notify Slack.

    The bucket name is read from the BUCKET_NAME environment variable. The
    ``event`` and ``context`` arguments are not used.

    Returns:
        A dict with 'statusCode' 200 and a JSON body containing the uploaded
        filename on success, or 'statusCode' 500 and a JSON body containing
        the error text on failure.
    """
    try:
        # Fetch the rates
        dates = generate_date_list('2025-08-15', '2025-08-30')
        results = get_exchange_rate(dates, base="USD")

        # Upload to S3
        bucket_name = os.environ['BUCKET_NAME']
        filename = upload_to_s3(results, bucket_name, file_format='csv')

        # Slack notification
        send_slack(f'환율 데이터 저장 완료!\n파일: {filename}\n레코드 수: {len(results)}')

        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Success', 'filename': filename})
        }

    except Exception as e:
        # Log first so the cause is recorded even if the Slack alert fails.
        print(f'에러 상세: {str(e)}')
        try:
            send_slack('환율 데이터 저장 실패')
        except Exception as notify_error:
            # The alert is best-effort: a failure here must not mask the
            # original error or prevent the 500 response below.
            print(f'Slack 알림 전송 실패: {notify_error}')
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }