Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Newer
Older
100644 165 lines (130 sloc) 5.124 kb
803d209 @mitsuhiko Added as example app
authored
1 from datetime import datetime
2 from flask import Flask, request, url_for, redirect, g, session, flash, \
3 abort, render_template
4 from flask.ext.sqlalchemy import SQLAlchemy
5 from flask.ext.oauth import OAuth
6 from juggernaut import Juggernaut
7
8
# Application-wide objects: the Flask app (configured from ``config.cfg``),
# the SQLAlchemy handle bound to it, the OAuth registry and the Juggernaut
# client used to publish notifications.
app = Flask(__name__)
app.config.from_pyfile('config.cfg')
db = SQLAlchemy(app)
oauth = OAuth()
jug = Juggernaut()

# Facebook OAuth remote application.
#
# The consumer credentials can be overridden from ``config.cfg`` through the
# ``FACEBOOK_APP_ID`` / ``FACEBOOK_APP_SECRET`` keys; the literals below are
# only fallbacks that keep existing deployments working unchanged.
# NOTE(review): a consumer secret committed to source control should be
# treated as compromised -- rotate it and supply the new one via config.
facebook = oauth.remote_app(
    'facebook',
    base_url='https://graph.facebook.com/',
    request_token_url=None,
    access_token_url='/oauth/access_token',
    authorize_url='https://www.facebook.com/dialog/oauth',
    consumer_key=app.config.get('FACEBOOK_APP_ID', '188477911223606'),
    consumer_secret=app.config.get('FACEBOOK_APP_SECRET',
                                   '621413ddea2bcc5b2e83d42fc40495de'),
    request_token_params={'scope': 'email'},
)
24
25
def url_for_other_page(page):
    """Return the URL of the current endpoint with ``page`` replaced.

    The view arguments of the active request are copied, their ``page``
    entry is set to *page*, and the result is passed to ``url_for`` for the
    current endpoint.
    """
    params = request.view_args.copy()
    params.update(page=page)
    return url_for(request.endpoint, **params)


# Expose the helper to templates as a Jinja global.
app.jinja_env.globals['url_for_other_page'] = url_for_other_page
31
32
class Paste(db.Model):
    """A pasted piece of code, optionally a reply to another paste."""

    id = db.Column(db.Integer, primary_key=True)
    code = db.Column(db.Text)
    pub_date = db.Column(db.DateTime)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # Self-referential link: ``parent`` is the paste this one replies to,
    # and the backref exposes the replies as ``children``.
    parent_id = db.Column(db.Integer, db.ForeignKey('paste.id'))
    parent = db.relationship(
        'Paste',
        remote_side=[id],
        uselist=False,
        backref='children',
        lazy=True,
    )

    def __init__(self, user, code, parent=None):
        """Create a paste of *code* owned by *user*.

        ``pub_date`` is set to the current UTC time; *parent* is the paste
        being replied to, or ``None``.
        """
        self.user = user
        self.code = code
        self.pub_date = datetime.utcnow()
        self.parent = parent
47
48
class User(db.Model):
    """An account identified by a unique Facebook id."""

    id = db.Column(db.Integer, primary_key=True)
    display_name = db.Column(db.String(120))
    fb_id = db.Column(db.String(30), unique=True)
    # Dynamic relationship to the user's pastes; the backref gives every
    # ``Paste`` a ``user`` attribute.
    pastes = db.relationship(Paste, backref='user', lazy='dynamic')
54
55
def send_new_paste_notifications(paste, reply):
    """Publish a notification that *reply* was posted in response to *paste*.

    The payload is always published on the ``paste-replies:<paste id>``
    channel. When *paste* has a user whose id is not ``None`` it is also
    published on ``user-replies:<user id>``.
    """
    author = paste.user
    if author:
        author_name, author_id = author.display_name, author.id
    else:
        author_name = author_id = None

    payload = {
        'paste_id': paste.id,
        'reply_id': reply.id,
        'user': author_name,
    }
    jug.publish('paste-replies:%d' % paste.id, payload)
    if author_id is not None:
        jug.publish('user-replies:%d' % author_id, payload)
67
68
@app.before_request
def check_user_status():
    """Populate ``g.user`` from the session before each request.

    ``g.user`` is ``None`` unless the session carries a ``user_id``, in which
    case it is whatever ``User.query.get`` returns for that id.
    """
    g.user = None
    if 'user_id' not in session:
        return
    g.user = User.query.get(session['user_id'])
74
75
@app.route('/', methods=['GET', 'POST'])
def new_paste():
    """Show the paste form and create a paste on a non-empty POST.

    An optional integer ``reply_to`` query argument selects the parent paste.
    After a successful submission the reply notification is sent (when there
    is a parent) and the client is redirected to the new paste.
    """
    reply_to = request.args.get('reply_to', type=int)
    parent = Paste.query.get(reply_to) if reply_to is not None else None

    submitted = request.method == 'POST' and request.form['code']
    if not submitted:
        return render_template('new_paste.html', parent=parent)

    paste = Paste(g.user, request.form['code'], parent=parent)
    db.session.add(paste)
    db.session.commit()
    if parent is not None:
        send_new_paste_notifications(parent, paste)
    return redirect(url_for('show_paste', paste_id=paste.id))
90
91
@app.route('/<int:paste_id>')
def show_paste(paste_id):
    """Render a single paste, or respond with 404 if it does not exist."""
    # Load the replies together with the paste itself.
    query = Paste.query.options(db.eagerload('children'))
    paste = query.get_or_404(paste_id)
    return render_template('show_paste.html', paste=paste)
96
97
@app.route('/<int:paste_id>/delete', methods=['GET', 'POST'])
def delete_paste(paste_id):
    """Ask for confirmation and delete a paste owned by the current user.

    Responds with 404 for an unknown paste and aborts with 401 unless the
    logged-in user is the paste's user. GET renders the confirmation page;
    POST deletes the paste only when the form contains ``yes``.
    """
    paste = Paste.query.get_or_404(paste_id)
    if g.user is None or g.user != paste.user:
        abort(401)

    if request.method != 'POST':
        return render_template('delete_paste.html', paste=paste)

    if 'yes' not in request.form:
        # Deletion was not confirmed: go back to the paste.
        return redirect(url_for('show_paste', paste_id=paste.id))

    db.session.delete(paste)
    db.session.commit()
    flash('Paste was deleted')
    return redirect(url_for('new_paste'))
112
113
@app.route('/my-pastes/', defaults={'page': 1})
@app.route('/my-pastes/page/<int:page>')
def my_pastes(page):
    """List the current user's pastes, one page at a time.

    Anonymous visitors are redirected to the login view, with the current
    URL passed along as ``next``.
    """
    if g.user is None:
        login_url = url_for('login', next=request.url)
        return redirect(login_url)

    own_pastes = Paste.query.filter_by(user=g.user)
    pagination = own_pastes.paginate(page)
    return render_template('my_pastes.html', pagination=pagination)
121
122
@app.route('/login')
def login():
    """Start the Facebook authorization flow.

    The callback URL carries a ``next`` value taken from the ``next`` query
    argument, falling back to the referrer, or ``None`` when neither is set.
    """
    target = request.args.get('next') or request.referrer or None
    callback_url = url_for('facebook_authorized', next=target,
                           _external=True)
    return facebook.authorize(callback=callback_url)
128
129
@app.route('/logout')
def logout():
    """Clear the session and redirect to the index page."""
    session.clear()
    # Flash only after clearing, otherwise the message would be wiped too.
    flash('You were logged out')
    index_url = url_for('new_paste')
    return redirect(index_url)
135
136
def _is_safe_redirect_target(target):
    """Return True if *target* points back at this application.

    Accepted are site-relative paths (``/foo``) and absolute URLs that start
    with the current request's ``host_url``. Scheme-relative forms such as
    ``//evil.example`` or ``/\\evil.example`` are rejected because browsers
    resolve them to another host.
    """
    if not target:
        return False
    if target.startswith('/'):
        return not target.startswith(('//', '/\\'))
    # ``host_url`` ends with a slash, so a look-alike such as
    # ``http://host.evil.example/`` cannot match the prefix.
    return target.startswith(request.host_url)


@app.route('/login/authorized')
@facebook.authorized_handler
def facebook_authorized(resp):
    """Handle the OAuth callback from Facebook and log the user in.

    *resp* is ``None`` when the user denied the login. Otherwise the access
    token is stored in the session, the ``/me`` profile is fetched, the
    matching ``User`` is created if needed and its display name refreshed,
    and the user's id is stored in the session.

    The client is finally redirected to the ``next`` query argument. Because
    that value is user-controlled, it is only honoured when it points back at
    this application; anything else falls back to the index page so the
    login flow cannot be abused as an open redirect.
    """
    next_url = request.args.get('next')
    if not _is_safe_redirect_target(next_url):
        next_url = url_for('new_paste')

    if resp is None:
        flash('You denied the login')
        return redirect(next_url)

    # Stored as a (token, secret) pair; the secret is left empty here.
    session['fb_access_token'] = (resp['access_token'], '')

    me = facebook.get('/me')
    user = User.query.filter_by(fb_id=me.data['id']).first()
    if user is None:
        user = User()
        user.fb_id = me.data['id']
        db.session.add(user)

    # Refresh the display name on every login.
    user.display_name = me.data['name']
    db.session.commit()
    session['user_id'] = user.id

    flash('You are now logged in as %s' % user.display_name)
    return redirect(next_url)
160
161
@facebook.tokengetter
def get_facebook_oauth_token():
    """Return the Facebook token stored in the session, or ``None``."""
    token = session.get('fb_access_token')
    return token
Something went wrong with that request. Please try again.