Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Newer
Older
100644 256 lines (212 sloc) 8.481 kb
33850c0 @mitsuhiko Initial checkin of stuff that exists so far.
authored
1 # -*- coding: utf-8 -*-
03168a5 @mitsuhiko Removed the useless apishowcase example
authored
2 """
3 MiniTwit
4 ~~~~~~~~
5
6 A microblogging application written with Flask and sqlite3.
7
8 :copyright: (c) 2010 by Armin Ronacher.
9 :license: BSD, see LICENSE for more details.
10 """
33850c0 @mitsuhiko Initial checkin of stuff that exists so far.
authored
11 from __future__ import with_statement
12 import time
1640679 @mitsuhiko Updated examples to work with pypy which has a incomplete sqlite3 in …
authored
13 from sqlite3 import dbapi2 as sqlite3
33850c0 @mitsuhiko Initial checkin of stuff that exists so far.
authored
14 from hashlib import md5
15 from datetime import datetime
16 from flask import Flask, request, session, url_for, redirect, \
c2e5799 @mitsuhiko Updated examples to new sqlite patterns and added new section to appc…
authored
17 render_template, abort, g, flash, _app_ctx_stack
3b36bef Improved documentation, added a contextmanager for request binding
Armin Ronacher authored
18 from werkzeug import check_password_hash, generate_password_hash
33850c0 @mitsuhiko Initial checkin of stuff that exists so far.
authored
19
20
# configuration: module-level defaults, loaded into app.config below
DATABASE = '/tmp/minitwit.db'
PER_PAGE = 30
DEBUG = True
SECRET_KEY = 'development key'

# create our little application :)
app = Flask(__name__)
# Load the defaults defined in this module first; the file named by the
# MINITWIT_SETTINGS environment variable is applied afterwards and is
# silently skipped when the variable is not set.
app.config.from_object(__name__)
app.config.from_envvar('MINITWIT_SETTINGS', silent=True)
33850c0 @mitsuhiko Initial checkin of stuff that exists so far.
authored
31
32
c2e5799 @mitsuhiko Updated examples to new sqlite patterns and added new section to appc…
authored
def get_db():
    """Return the sqlite connection for the current application context.

    The connection is opened on first use and cached on the context
    object, so later calls within the same context reuse it.
    """
    ctx = _app_ctx_stack.top
    if hasattr(ctx, 'sqlite_db'):
        return ctx.sqlite_db
    connection = sqlite3.connect(app.config['DATABASE'])
    # sqlite3.Row lets callers address columns by name as well as by index.
    connection.row_factory = sqlite3.Row
    ctx.sqlite_db = connection
    return connection
42
43
@app.teardown_appcontext
def close_database(exception):
    """Close the context's cached database connection, if one was opened."""
    ctx = _app_ctx_stack.top
    if not hasattr(ctx, 'sqlite_db'):
        # get_db() was never called in this context; nothing to release.
        return
    ctx.sqlite_db.close()
33850c0 @mitsuhiko Initial checkin of stuff that exists so far.
authored
50
51
def init_db():
    """Create the database tables by executing ``schema.sql``.

    Pushes its own application context so it can be called outside of a
    request (for example from the ``__main__`` guard or an interactive
    shell).
    """
    with app.app_context():
        db = get_db()
        with app.open_resource('schema.sql') as f:
            script = f.read()
        # open_resource() opens the file in binary mode by default, but
        # on Python 3 executescript() only accepts text and raises
        # TypeError for bytes, so decode before executing.
        if isinstance(script, bytes):
            script = script.decode('utf-8')
        db.cursor().executescript(script)
        db.commit()
59
60
def query_db(query, args=(), one=False):
    """Run *query* with *args* and return the fetched rows.

    With ``one=False`` (the default) the full list of rows is returned.
    With ``one=True`` only the first row is returned, or ``None`` when the
    query produced no rows.
    """
    rows = get_db().execute(query, args).fetchall()
    if not one:
        return rows
    if rows:
        return rows[0]
    return None
66
67
def get_user_id(username):
    """Return the ``user_id`` stored for *username*, or None if unknown."""
    row = query_db('select user_id from user where username = ?',
                   [username], one=True)
    if not row:
        return None
    return row[0]
73
74
def format_datetime(timestamp):
    """Render a POSIX timestamp as a ``YYYY-MM-DD @ HH:MM`` string in UTC."""
    moment = datetime.utcfromtimestamp(timestamp)
    return moment.strftime('%Y-%m-%d @ %H:%M')
78
79
def gravatar_url(email, size=80):
    """Return the gravatar image URL for the given email address.

    The address is stripped of surrounding whitespace and lower-cased
    before hashing. *size* is the requested image edge length in pixels.
    """
    digest = md5(email.strip().lower().encode('utf-8')).hexdigest()
    # Use HTTPS so the avatar is not blocked as mixed content on pages
    # served over TLS and the email hash is not sent in cleartext.
    return 'https://www.gravatar.com/avatar/%s?d=identicon&s=%d' % \
        (digest, size)
33850c0 @mitsuhiko Initial checkin of stuff that exists so far.
authored
84
85
fb2d2e4 @mitsuhiko request_init -> before_request and request_shutdown -> after_request
authored
@app.before_request
def before_request():
    """Populate ``g.user`` with the logged-in user's row before each request.

    ``g.user`` stays ``None`` when the session carries no ``user_id``.
    """
    g.user = None
    if 'user_id' not in session:
        return
    g.user = query_db('select * from user where user_id = ?',
                      [session['user_id']], one=True)
92
93
@app.route('/')
def timeline():
    """Show the logged-in user's timeline.

    Anonymous visitors are redirected to the public timeline. The
    timeline lists the user's own messages together with the messages of
    every user they follow, newest first.
    """
    if not g.user:
        return redirect(url_for('public_timeline'))
    user_id = session['user_id']
    messages = query_db('''
        select message.*, user.* from message, user
        where message.author_id = user.user_id and (
            user.user_id = ? or
            user.user_id in (select whom_id from follower
                                    where who_id = ?))
        order by message.pub_date desc limit ?''',
        [user_id, user_id, PER_PAGE])
    return render_template('timeline.html', messages=messages)
110
111
@app.route('/public')
def public_timeline():
    """Render the most recent messages from all users."""
    latest = query_db('''
        select message.*, user.* from message, user
        where message.author_id = user.user_id
        order by message.pub_date desc limit ?''', [PER_PAGE])
    return render_template('timeline.html', messages=latest)
119
120
@app.route('/<username>')
def user_timeline(username):
    """Display a user's tweets.

    Aborts with 404 when *username* does not exist. For a logged-in
    visitor the template also receives whether they follow this user.
    """
    profile = query_db('select * from user where username = ?',
                       [username], one=True)
    if profile is None:
        abort(404)

    followed = False
    if g.user:
        follow_row = query_db('''select 1 from follower where
            follower.who_id = ? and follower.whom_id = ?''',
            [session['user_id'], profile['user_id']],
            one=True)
        followed = follow_row is not None

    messages = query_db('''
            select message.*, user.* from message, user where
            user.user_id = message.author_id and user.user_id = ?
            order by message.pub_date desc limit ?''',
            [profile['user_id'], PER_PAGE])
    return render_template('timeline.html', messages=messages,
                           followed=followed, profile_user=profile)
140
141
@app.route('/<username>/follow')
def follow_user(username):
    """Make the current user a follower of *username*.

    Aborts with 401 when nobody is logged in and with 404 when the target
    user does not exist.
    """
    if not g.user:
        abort(401)
    target_id = get_user_id(username)
    if target_id is None:
        abort(404)
    connection = get_db()
    connection.execute(
        'insert into follower (who_id, whom_id) values (?, ?)',
        [session['user_id'], target_id])
    connection.commit()
    flash('You are now following "%s"' % username)
    return redirect(url_for('user_timeline', username=username))
156
157
@app.route('/<username>/unfollow')
def unfollow_user(username):
    """Stop the current user from following *username*.

    Aborts with 401 when nobody is logged in and with 404 when the target
    user does not exist.
    """
    if not g.user:
        abort(401)
    target_id = get_user_id(username)
    if target_id is None:
        abort(404)
    connection = get_db()
    connection.execute(
        'delete from follower where who_id=? and whom_id=?',
        [session['user_id'], target_id])
    connection.commit()
    flash('You are no longer following "%s"' % username)
    return redirect(url_for('user_timeline', username=username))
172
173
4aa7621 Updated documentation. Starting to look pretty good
Armin Ronacher authored
@app.route('/add_message', methods=['POST'])
def add_message():
    """Store a new message for the logged-in user.

    Aborts with 401 when no user is logged in. An empty ``text`` field is
    ignored; in either case the client is redirected to the timeline.
    """
    if 'user_id' not in session:
        abort(401)
    text = request.form['text']
    if text:
        connection = get_db()
        connection.execute('''insert into message (author_id, text, pub_date)
          values (?, ?, ?)''', (session['user_id'], text, int(time.time())))
        connection.commit()
        flash('Your message was recorded')
    return redirect(url_for('timeline'))
187
188
4aa7621 Updated documentation. Starting to look pretty good
Armin Ronacher authored
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Log the user in.

    GET renders the login form. POST checks the submitted credentials and,
    on success, stores the user's id in the session and redirects to the
    timeline; otherwise the form is re-rendered with an error message.
    """
    if g.user:
        return redirect(url_for('timeline'))
    if request.method != 'POST':
        return render_template('login.html', error=None)

    user = query_db('''select * from user where
        username = ?''', [request.form['username']], one=True)
    if user is None:
        error = 'Invalid username'
    elif not check_password_hash(user['pw_hash'], request.form['password']):
        error = 'Invalid password'
    else:
        flash('You were logged in')
        session['user_id'] = user['user_id']
        return redirect(url_for('timeline'))
    return render_template('login.html', error=error)
208
209
4aa7621 Updated documentation. Starting to look pretty good
Armin Ronacher authored
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Register a new user.

    GET renders the registration form. POST validates the submitted
    fields in order (username, email, password, password confirmation,
    username availability); the first failing check is reported back on
    the form. When all checks pass the user is stored with a hashed
    password and the client is redirected to the login page.
    """
    if g.user:
        return redirect(url_for('timeline'))
    if request.method != 'POST':
        return render_template('register.html', error=None)

    form = request.form
    if not form['username']:
        error = 'You have to enter a username'
    elif not form['email'] or '@' not in form['email']:
        error = 'You have to enter a valid email address'
    elif not form['password']:
        error = 'You have to enter a password'
    elif form['password'] != form['password2']:
        error = 'The two passwords do not match'
    elif get_user_id(form['username']) is not None:
        error = 'The username is already taken'
    else:
        connection = get_db()
        connection.execute('''insert into user (
          username, email, pw_hash) values (?, ?, ?)''',
          [form['username'], form['email'],
           generate_password_hash(form['password'])])
        connection.commit()
        flash('You were successfully registered and can login now')
        return redirect(url_for('login'))
    return render_template('register.html', error=error)
238
239
@app.route('/logout')
def logout():
    """Log the user out and redirect to the public timeline."""
    flash('You were logged out')
    # pop() with a default keeps this a no-op when nobody was logged in.
    session.pop('user_id', None)
    destination = url_for('public_timeline')
    return redirect(destination)
246
247
dfecc86 @mitsuhiko Ported examples over to new config. documented upgrading
authored
# add some filters to jinja: expose the formatting helpers to templates
# under the names 'datetimeformat' and 'gravatar'
app.jinja_env.filters.update(
    datetimeformat=format_datetime,
    gravatar=gravatar_url,
)
251
252
if __name__ == '__main__':
    # When executed as a script: run schema.sql to set up the tables,
    # then start Flask's built-in server.
    init_db()
    app.run()
Something went wrong with that request. Please try again.