/
app.py
113 lines (92 loc) · 3.22 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from flask import Flask, render_template, request, url_for, flash, redirect
import os
from boto3.dynamodb.conditions import Key
from boto3 import resource
from werkzeug.exceptions import abort
from datetime import datetime
dynamodb = resource('dynamodb')
posts_table = dynamodb.Table(os.environ["PostsTable"])
app = Flask(__name__)
app.config['SECRET_KEY'] = '454543gtgdfgdfgfdgfd'
@app.route('/')
def index():
posts = ''
try:
response = posts_table.scan()
posts = response['Items']
except Exception as error:
print("dynamo scan failed:", error, flush=True)
return render_template('index.html', posts=posts)
@app.route('/<int:post_id>')
def post(post_id):
post = get_post(post_id)
return render_template('post.html', post=post)
def get_post(post_id):
try:
response = posts_table.get_item(Key={'id': post_id})
post = response['Item']
except Exception as error:
print("dynamo get post failed:", error, flush=True)
abort(404)
return post
@app.route('/create', methods=('GET', 'POST'))
def create():
if request.method == 'POST':
title = request.form['title']
content = request.form['content']
created = str(datetime.now())
id = int(datetime.now().timestamp())
if not title:
flash('Title is required!')
else:
try:
#insert new post into dynamodb
posts_table.put_item(
Item={
'id': id,
'title': title,
'content': content,
'created': created
}
)
except Exception as error:
print("dynamo PUT failed:", error, flush=True)
return redirect(url_for('index'))
return render_template('create.html')
@app.route('/<int:id>/edit', methods=('GET', 'POST'))
def edit(id):
post = get_post(id)
if request.method == 'POST':
title = request.form['title']
content = request.form['content']
if not title:
flash('Title is required!')
else:
try:
posts_table.update_item(
Key={
'id': id
},
UpdateExpression="set title = :title, content = :content",
ExpressionAttributeValues={
':title': title,
':content': content
}
)
except Exception as error:
print("dynamo update failed:", error, flush=True)
return redirect(url_for('index'))
return render_template('edit.html', post=post)
@app.route('/<int:id>/delete', methods=('POST',))
def delete(id):
post = get_post(id)
try:
posts_table.delete_item(
Key={
'id': id
}
)
flash('"{}" was successfully deleted!'.format(post['title']))
except Exception as error:
print("dynamo delete failed:", error, flush=True)
return redirect(url_for('index'))