Skip to content

Commit

Permalink
Inbound RSS
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobobryant committed Feb 10, 2024
1 parent c5e9090 commit 6a0f3ee
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 4 deletions.
2 changes: 2 additions & 0 deletions deps.edn
@@ -1,6 +1,8 @@
{:paths ["src" "resources" "target/resources"]
:deps {com.biffweb/biff #:git{:url "https://github.com/jacobobryant/biff", :sha "529660f", :tag "v1.0.0"}
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"}
remus/remus {:mvn/version "0.2.2"}
org.jsoup/jsoup {:mvn/version "1.11.3"}
metosin/muuntaja {:mvn/version "0.6.8"}
ring/ring-defaults {:mvn/version "0.3.4"}
org.clojure/clojure {:mvn/version "1.11.1"}
Expand Down
2 changes: 2 additions & 0 deletions dev/repl.clj
Expand Up @@ -74,6 +74,8 @@
'{:find (pull message [*])
:where [[message :message/text]]}))

(com.eelchat.subscriptions/fetch-rss (get-context))

;; Call this function if you make a change to main/initial-system,
;; main/components, :tasks, :queues, config.env, or deps.edn.
(main/refresh)
Expand Down
2 changes: 2 additions & 0 deletions src/com/eelchat.clj
Expand Up @@ -5,6 +5,7 @@
[com.eelchat.home :as home]
[com.eelchat.middleware :as mid]
[com.eelchat.ui :as ui]
[com.eelchat.subscriptions :as sub]
[com.eelchat.schema :as schema]
[clojure.test :as test]
[clojure.tools.logging :as log]
Expand All @@ -18,6 +19,7 @@
[app/module
(biff/authentication-module {})
home/module
sub/module
schema/module])

(def routes [["" {:middleware [mid/wrap-site-defaults]}
Expand Down
63 changes: 60 additions & 3 deletions src/com/eelchat/app.clj
@@ -1,7 +1,9 @@
(ns com.eelchat.app
(:require [com.biffweb :as biff :refer [q]]
[com.eelchat.subscriptions :as sub]
[com.eelchat.middleware :as mid]
[com.eelchat.ui :as ui]
[clojure.string :as str]
[ring.adapter.jetty9 :as jetty]
[rum.core :as rum]
[xtdb.api :as xt]))
Expand Down Expand Up @@ -95,22 +97,64 @@
[:div {:class "grow-[1.75]"}]]))))

(defn message-view [{:message/keys [membership text created-at]}]
(let [username (str "User " (subs (str membership) 0 4))]
(let [username (if (= :system membership)
"🎅🏻 System 🎅🏻"
(str "User " (subs (str membership) 0 4)))]
[:div
[:.text-sm
[:span.font-bold username]
[:span.w-2.inline-block]
[:span.text-gray-600 (biff/format-date created-at "d MMM h:mm aa")]]
[:p.whitespace-pre-wrap.mb-6 text]]))

(defn command-tx [{:keys [biff/db channel roles params]}]
(let [subscribe-url (second (re-find #"^/subscribe ([^\s]+)" (:text params)))
unsubscribe-url (second (re-find #"^/unsubscribe ([^\s]+)" (:text params)))
list-command (= (str/trimr (:text params)) "/list")
message (fn [text]
{:db/doc-type :message
:message/membership :system
:message/channel (:xt/id channel)
:message/text text
;; Make sure this message comes after the user's message.
:message/created-at (biff/add-seconds (java.util.Date.) 1)})]
(cond
list-command
[(message (apply
str
"Subscriptions:"
(for [url (->> (q db
'{:find (pull subscription [:subscription/url])
:in [channel]
:where [[subscription :subscription/channel channel]]}
(:xt/id channel))
(map :subscription/url)
sort)]
(str "\n - " url))))]

(not (contains? roles :admin))
nil

subscribe-url
[{:db/doc-type :subscription
:db.op/upsert {:subscription/url subscribe-url
:subscription/channel (:xt/id channel)}}
(message (str "Subscribed to " subscribe-url))]

unsubscribe-url
[{:db/op :delete
:xt/id (biff/lookup-id db :subscription/channel (:xt/id channel) :subscription/url unsubscribe-url)}
(message (str "Unsubscribed from " unsubscribe-url))])))

(defn new-message [{:keys [channel membership params] :as ctx}]
(let [message {:xt/id (random-uuid)
:message/membership (:xt/id membership)
:message/channel (:xt/id channel)
:message/created-at (java.util.Date.)
:message/text (:text params)}]
(biff/submit-tx (assoc ctx :biff.xtdb/retry false)
[(assoc message :db/doc-type :message)])
(concat [(assoc message :db/doc-type :message)]
(command-tx ctx)))
[:<>]))

(defn channel-page [{:keys [biff/db community channel] :as ctx}]
Expand Down Expand Up @@ -169,6 +213,19 @@
ws (get @chat-clients (:message/channel doc))]
(jetty/send! ws html))))

(defn on-new-subscription [{:keys [biff.xtdb/node] :as ctx} tx]
(let [db-before (xt/db node {::xt/tx-id (dec (::xt/tx-id tx))})]
(doseq [[op & args] (::xt/tx-ops tx)
:when (= op ::xt/put)
:let [[doc] args]
:when (and (contains? doc :subscription/url)
(nil? (xt/entity db-before (:xt/id doc))))]
(biff/submit-job ctx :fetch-rss (assoc doc :biff/priority 0)))))

(defn on-tx [ctx tx]
(on-new-message ctx tx)
(on-new-subscription ctx tx))

(defn wrap-community [handler]
(fn [{:keys [biff/db user path-params] :as ctx}]
(if-some [community (xt/entity db (parse-uuid (:id path-params)))]
Expand Down Expand Up @@ -203,4 +260,4 @@
:post new-message
:delete delete-channel}]
["/connect" {:get connect}]]]]
:on-tx on-new-message})
:on-tx on-tx})
12 changes: 11 additions & 1 deletion src/com/eelchat/schema.clj
Expand Up @@ -25,10 +25,20 @@
[:channel/title :string]
[:channel/community :community/id]]

:subscription/id :uuid
:subscription [:map {:closed true}
[:xt/id :subscription/id]
[:subscription/url :string]
[:subscription/channel :channel/id]
[:subscription/last-post-uri {:optional true} :string]
[:subscription/fetched-at {:optional true} inst?]
[:subscription/last-modified {:optional true} :string]
[:subscription/etag {:optional true} :string]]

:message/id :uuid
:message [:map {:closed true}
[:xt/id :message/id]
[:message/membership :membership/id]
[:message/membership [:or :membership/id [:enum :system]]]
[:message/text :string]
[:message/channel :channel/id]
[:message/created-at inst?]]})
Expand Down
73 changes: 73 additions & 0 deletions src/com/eelchat/subscriptions.clj
@@ -0,0 +1,73 @@
(ns com.eelchat.subscriptions
(:require [com.biffweb :as biff :refer [q]]
[remus :as remus])
(:import [org.jsoup Jsoup]))

(defn every-n-minutes [n]
(iterate #(biff/add-seconds % (* n 60)) (java.util.Date.)))

(defn subscriptions-to-update [db]
(q db
'{:find (pull subscription [*])
:in [t]
:where [[subscription :subscription/url]
[(get-attr subscription :subscription/fetched-at #inst "1970")
[fetched-at ...]]
[(<= fetched-at t)]]}
(biff/add-seconds (java.util.Date.) (* -60 30))))

(defn assoc-result [{:keys [biff/base-url]} {:subscription/keys [url last-modified etag] :as subscription}]
(assoc subscription ::result (biff/catchall-verbose
(remus/parse-url
url
{:headers (biff/assoc-some
{"User-Agent" base-url}
"If-None-Match" etag
"If-Modified-Since" last-modified)
:socket-timeout 5000
:connection-timeout 5000}))))

(defn format-post [{:keys [title author published-date updated-date link contents]}]
(let [text-body (some-> contents
first
:value
(Jsoup/parse)
(.text))
text-body (if (and text-body (< 300 (count text-body)))
(str (subs text-body 0 300) "...")
text-body)]
(str title " | " author " | " (or published-date updated-date) "\n"
text-body "\n"
link)))

(defn subscription-tx [{:subscription/keys [channel last-post-uri] :keys [xt/id ::result]}]
(let [post (-> result :feed :entries first)
uri ((some-fn :uri :link) post)]
(concat [(biff/assoc-some
{:db/doc-type :subscription
:db/op :update
:xt/id id
:subscription/fetched-at :db/now}
:subscription/last-post-uri uri
:subscription/last-modified (get-in result [:response :headers "Last-Modified"])
:subscription/etag (get-in result [:response :headers "Etag"]))]
(when (and (some? uri) (not= uri last-post-uri))
[{:db/doc-type :message
:message/membership :system
:message/channel channel
:message/created-at :db/now
:message/text (format-post post)}]))))

(defn fetch-rss [{:keys [biff/db] :as ctx}]
(doseq [subscription (subscriptions-to-update db)]
(biff/submit-job ctx :fetch-rss subscription)))

(defn fetch-rss-consumer [{:keys [biff/job] :as ctx}]
(biff/submit-tx ctx
(subscription-tx (assoc-result ctx job))))

(def module
{:tasks [{:task #'fetch-rss
:schedule #(every-n-minutes 5)}]
:queues [{:id :fetch-rss
:consumer #'fetch-rss-consumer}]})

0 comments on commit 6a0f3ee

Please sign in to comment.