# app.py (forked from coleifer/peewee)
import datetime
from functools import wraps
from hashlib import md5

from flask import Flask
from flask import abort
from flask import flash
from flask import g
from flask import redirect
from flask import render_template
from flask import request
from flask import session
from flask import url_for
from peewee import *
# config - aside from our database, the rest is for use by Flask
DATABASE = 'tweepee.db'
# NOTE(review): debug mode is switched on in source -- presumably intended
# for local development only; confirm before deploying.
DEBUG = True
# NOTE(review): the secret key is committed in this file -- presumably a
# placeholder that a real deployment overrides; confirm.
SECRET_KEY = 'hin6bab8ge25*r=x&+5$0kn=-#log$pt^#@vrqjld!^2ci@g*b'
# create a flask application - this ``app`` object will be used to handle
# inbound requests, routing them to the proper 'view' functions, etc
app = Flask(__name__)
# hand this module to Flask as its configuration source, so the constants
# defined above become the application's settings
app.config.from_object(__name__)
# create a peewee database instance -- our models will use this database to
# persist information
database = SqliteDatabase(DATABASE)
# model definitions -- the standard "pattern" is to define a base model class
# that specifies which database to use. then, any subclasses will automatically
# use the correct storage. for more information, see:
# http://charlesleifer.com/docs/peewee/peewee/models.html#model-api-smells-like-django
class BaseModel(Model):
    """Common parent for every model; binds subclasses to ``database``."""

    class Meta:
        # every model inheriting from BaseModel is stored in this database
        database = database
# the user model specifies its fields (or columns) declaratively, like django
class User(BaseModel):
    """A registered account, plus helpers for walking the follow graph."""

    username = CharField()
    password = CharField()
    email = CharField()
    join_date = DateTimeField()

    class Meta:
        order_by = ('username',)

    # convenience methods live on the model instance, e.g. "give me all the
    # users this user is following"
    def following(self):
        """Return a query of the users this user follows."""
        # reach the followed users through the "relationship" table
        followed = User.select().join(
            Relationship, on=Relationship.to_user,
        )
        return followed.where(Relationship.from_user == self)

    def followers(self):
        """Return a query of the users who follow this user."""
        fans = User.select().join(
            Relationship, on=Relationship.from_user,
        )
        return fans.where(Relationship.to_user == self)

    def is_following(self, user):
        """Return True when a relationship row links this user to ``user``."""
        from_me = Relationship.from_user == self
        to_them = Relationship.to_user == user
        matches = Relationship.select().where(from_me & to_them)
        return matches.count() > 0

    def gravatar_url(self, size=80):
        """Build the Gravatar image URL for this user's email address.

        The email is stripped, lower-cased and md5-hashed to form the avatar
        id; ``size`` is passed through as the ``s`` query parameter.
        """
        normalized = self.email.strip().lower()
        digest = md5(normalized.encode('utf-8')).hexdigest()
        return 'http://www.gravatar.com/avatar/%s?d=identicon&s=%d' % \
            (digest, size)
# this model contains two foreign keys to user -- it essentially allows us to
# model a "many-to-many" relationship between users. by querying and joining
# on different columns we can expose who a user is "related to" and who is
# "related to" a given user
class Relationship(BaseModel):
    """Directed "follows" edge between two users.

    ``from_user`` is the follower and ``to_user`` is the user being followed.
    """

    from_user = ForeignKeyField(User, related_name='relationships')
    to_user = ForeignKeyField(User, related_name='related_to')
# a dead simple one-to-many relationship: one user has 0..n messages, exposed by
# the foreign key. because we didn't specify, a users messages will be accessible
# as a special attribute, User.message_set
class Message(BaseModel):
    """A single post written by a user."""

    user = ForeignKeyField(User)
    content = TextField()
    pub_date = DateTimeField()

    class Meta:
        # leading '-' sorts descending, i.e. newest messages first
        order_by = ('-pub_date',)
# simple utility function to create tables
def create_tables():
    """Connect to the database and create one table per model."""
    database.connect()
    # User goes first: the other two models hold foreign keys to it
    for model in (User, Relationship, Message):
        model.create_table()
# flask provides a "session" object, which allows us to store information across
# requests (stored by default in a secure cookie). this function allows us to
# mark a user as being logged-in by setting some values in the session data:
def auth_user(user):
    """Record ``user`` as logged in on the session and flash a confirmation."""
    name = user.username
    session['logged_in'] = True
    # NOTE(review): this stores the model instance itself in the session --
    # presumably relies on a session serializer that can handle it; confirm.
    session['user'] = user
    session['username'] = name
    flash('You are logged in as %s' % name)
# view decorator which indicates that the requesting user must be authenticated
# before they can access the view. it checks the session to see if they're
# logged in, and if not redirects them to the login view.
def login_required(f):
    """Wrap view ``f`` so anonymous visitors are redirected to ``login``.

    The wrapper calls ``f`` unchanged when the session carries a truthy
    ``logged_in`` flag.
    """
    @wraps(f)
    def inner(*args, **kwargs):
        if session.get('logged_in'):
            return f(*args, **kwargs)
        # not authenticated: bounce to the login view instead
        return redirect(url_for('login'))
    return inner
# given a template and a SelectQuery instance, render a paginated list of
# objects from the query inside the template
def object_list(template_name, qr, var_name='object_list', **kwargs):
    """Render ``template_name`` with one page of results from ``qr``.

    template_name -- template handed to ``render_template``.
    qr            -- query object providing ``count()`` and ``paginate()``.
    var_name      -- template variable that receives the current page of rows.
    **kwargs      -- extra template context, passed through untouched.

    The template additionally receives ``page`` (the current 1-based page,
    read from the ``page`` query-string argument) and ``pages`` (the total
    number of pages, never less than 1).
    """
    per_page = 20

    # a missing, non-numeric or non-positive ?page= falls back to page 1
    # instead of raising and turning into a server error
    try:
        page = int(request.args.get('page', 1))
    except (TypeError, ValueError):
        page = 1
    page = max(page, 1)

    # ceiling division: 20 rows is exactly one page, 21 rows is two; an empty
    # result set still reports a single (empty) page
    total = qr.count()
    pages = max(1, (total + per_page - 1) // per_page)

    kwargs.update(page=page, pages=pages)
    kwargs[var_name] = qr.paginate(page, per_page)
    return render_template(template_name, **kwargs)
# retrieve a single object matching the specified query or 404 -- this uses the
# shortcut "get" method on model, which retrieves a single object or raises a
# DoesNotExist exception if no matching object exists
# http://charlesleifer.com/docs/peewee/peewee/models.html#Model.get)
def get_object_or_404(model, **kwargs):
    """Return the single ``model`` row matching ``kwargs`` or abort with 404."""
    try:
        found = model.get(**kwargs)
    except model.DoesNotExist:
        # no matching row: short-circuit the request with a 404 response
        abort(404)
    else:
        return found
# custom template filter -- flask allows you to define these functions and then
# they are accessible in the template -- this one returns a boolean whether the
# given user is following another user.
@app.template_filter('is_following')
def is_following(from_user, to_user):
    """Template filter: report whether ``from_user`` follows ``to_user``."""
    follows = from_user.is_following(to_user)
    return follows
# request handlers -- these two hooks are provided by flask and we will use them
# to create and tear down a database connection on each request. peewee will do
# this for us, but its generally a good idea to be explicit.
@app.before_request
def before_request():
    """Expose the database on ``g`` and open its connection for the request."""
    g.db = database
    database.connect()
@app.after_request
def after_request(response):
    """Close the request's database connection; return ``response`` as-is."""
    connection = g.db
    connection.close()
    return response
# views -- these are the actual mappings of url to view function
@app.route('/')
def homepage():
    """Show the private timeline to logged-in users, the public one otherwise."""
    # pick the view based on the session's login flag, then delegate to it
    view = private_timeline if session.get('logged_in') else public_timeline
    return view()
@app.route('/private/')
@login_required
def private_timeline():
    """Render messages written by the users the current user follows.

    Guarded by ``login_required``: without it, an anonymous request to
    ``/private/`` would fail on the ``session['user']`` lookup instead of
    being redirected to the login view.
    """
    # the private timeline exemplifies the use of a subquery -- we are asking
    # for messages where the person who created the message is someone the
    # current user is following.
    user = session['user']
    messages = Message.select().where(
        Message.user << user.following()
    )
    return object_list('private_messages.html', messages, 'message_list')
@app.route('/public/')
def public_timeline():
    """Render every message in the system, paginated."""
    return object_list(
        'public_messages.html',
        Message.select(),
        var_name='message_list',
    )
@app.route('/join/', methods=['GET', 'POST'])
def join():
    """Sign-up view: create a new user from the posted form and log them in.

    On GET, or on a POST with an empty username, the join form is rendered.
    If the username already exists a message is flashed and the form is shown
    again; otherwise the user is created, authenticated and redirected to the
    homepage.
    """
    if request.method == 'POST' and request.form['username']:
        try:
            # use the .get() method to quickly see if a user with that name
            # exists; the row itself is not needed
            User.get(username=request.form['username'])
        except User.DoesNotExist:
            # hashlib digests operate on bytes, so encode the form text first
            # (gravatar_url hashes the email the same way).
            # NOTE(review): unsalted md5 is not a safe password hash; it is
            # kept because login() compares against this same digest.
            password_hash = md5(
                request.form['password'].encode('utf-8')
            ).hexdigest()
            # the name is free: store the form data on a new model
            user = User.create(
                username=request.form['username'],
                password=password_hash,
                email=request.form['email'],
                join_date=datetime.datetime.now()
            )
            # mark the user as being 'authenticated' by setting the session vars
            auth_user(user)
            return redirect(url_for('homepage'))
        else:
            flash('That username is already taken')
    return render_template('join.html')
@app.route('/login/', methods=['GET', 'POST'])
def login():
    """Login view: authenticate the posted username/password pair.

    On GET, or on a POST with an empty username, the login form is rendered.
    A matching user is marked as authenticated and redirected to the homepage;
    otherwise an error is flashed and the form is shown again.
    """
    if request.method == 'POST' and request.form['username']:
        # hashlib digests operate on bytes, so encode the form text first;
        # this must produce the same digest that join() stored.
        # NOTE(review): unsalted md5 is not a safe password hash.
        password_hash = md5(
            request.form['password'].encode('utf-8')
        ).hexdigest()
        try:
            user = User.get(
                username=request.form['username'],
                password=password_hash
            )
        except User.DoesNotExist:
            flash('The password entered is incorrect')
        else:
            auth_user(user)
            return redirect(url_for('homepage'))
    return render_template('login.html')
@app.route('/logout/')
def logout():
    """Log the current user out and redirect to the homepage.

    Clears every key that ``auth_user`` sets, so no stale identity is left in
    the session once the ``logged_in`` flag is gone.
    """
    for key in ('logged_in', 'user', 'username'):
        # a default of None makes this safe when the key was never set
        session.pop(key, None)
    flash('You were logged out')
    return redirect(url_for('homepage'))
@app.route('/following/')
@login_required
def following():
    """Render the paginated list of users the current user follows."""
    current_user = session['user']
    followed = current_user.following()
    return object_list('user_following.html', followed, 'user_list')
@app.route('/followers/')
@login_required
def followers():
    """Render the paginated list of users who follow the current user."""
    current_user = session['user']
    fans = current_user.followers()
    return object_list('user_followers.html', fans, 'user_list')
@app.route('/users/')
def user_list():
    """Render the paginated list of all users."""
    return object_list('user_list.html', User.select(), var_name='user_list')
@app.route('/users/<username>/')
def user_detail(username):
    """Render one user's profile with a paginated list of their messages."""
    # resolve the username to a user, or short-circuit with a 404 when no
    # such user exists in the db
    profile = get_object_or_404(User, username=username)
    # ``message_set`` is the accessor exposing this user's messages through
    # the Message.user foreign key
    return object_list(
        'user_detail.html',
        profile.message_set,
        'message_list',
        user=profile,
    )
@app.route('/users/<username>/follow/', methods=['POST'])
@login_required
def user_follow(username):
    """Make the current user follow ``username``, then show that profile."""
    target = get_object_or_404(User, username=username)
    # get_or_create: reuse the relationship row if it is already there
    Relationship.get_or_create(from_user=session['user'], to_user=target)
    flash('You are now following %s' % target.username)
    return redirect(url_for('user_detail', username=target.username))
@app.route('/users/<username>/unfollow/', methods=['POST'])
@login_required
def user_unfollow(username):
    """Make the current user stop following ``username``, then show that profile."""
    target = get_object_or_404(User, username=username)
    # delete only the edge that points from the current user to the target
    from_me = Relationship.from_user == session['user']
    to_target = Relationship.to_user == target
    Relationship.delete().where(from_me & to_target).execute()
    flash('You are no longer following %s' % target.username)
    return redirect(url_for('user_detail', username=target.username))
@app.route('/create/', methods=['GET', 'POST'])
@login_required
def create():
    """Message-creation view.

    On GET, or on a POST with empty content, the form is rendered. A POST
    with content stores a new message for the current user, flashes a
    confirmation and redirects to that user's profile page.
    """
    user = session['user']
    if request.method == 'POST' and request.form['content']:
        # the created row is not used afterwards, so it is not bound to a name
        Message.create(
            user=user,
            content=request.form['content'],
            pub_date=datetime.datetime.now()
        )
        flash('Your message has been created')
        return redirect(url_for('user_detail', username=user.username))
    return render_template('create.html')
# allow running from the command line
if __name__ == '__main__':
    # start the Flask server only when this file is executed directly
    app.run()