Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/upload original content #3839

Merged
merged 20 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/run-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ jobs:
yarn test
timeout-minutes: 10
env:
API_ENV: local
PG_HOST: localhost
PG_PORT: ${{ job.services.postgres.ports[5432] }}
PG_USER: app_user
Expand Down
2 changes: 1 addition & 1 deletion packages/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,4 @@
"volta": {
"extends": "../../package.json"
}
}
}
16 changes: 4 additions & 12 deletions packages/api/src/jobs/process-youtube-video.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,7 @@ export const processYouTubeVideo = async (
undefined,
jobData.userId
)
if (
!libraryItem ||
libraryItem.state !== LibraryItemState.Succeeded ||
!libraryItem.originalContent
) {
if (!libraryItem || libraryItem.state !== LibraryItemState.Succeeded) {
logger.info(
`Not ready to get YouTube metadata job state: ${
libraryItem?.state ?? 'null'
Expand Down Expand Up @@ -382,7 +378,7 @@ export const processYouTubeVideo = async (
// enqueue a job to process the full transcript
const updatedContent = await addTranscriptPlaceholdReadableContent(
libraryItem.originalUrl,
libraryItem.originalContent
libraryItem.readableContent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, surprised this worked before

)

if (updatedContent) {
Expand Down Expand Up @@ -438,11 +434,7 @@ export const processYouTubeTranscript = async (
undefined,
jobData.userId
)
if (
!libraryItem ||
libraryItem.state !== LibraryItemState.Succeeded ||
!libraryItem.originalContent
) {
if (!libraryItem || libraryItem.state !== LibraryItemState.Succeeded) {
logger.info(
`Not ready to get YouTube metadata job state: ${
libraryItem?.state ?? 'null'
Expand Down Expand Up @@ -481,7 +473,7 @@ export const processYouTubeTranscript = async (
)
const updatedContent = await addTranscriptToReadableContent(
libraryItem.originalUrl,
libraryItem.originalContent,
libraryItem.readableContent,
transcriptHTML
)

Expand Down
24 changes: 16 additions & 8 deletions packages/api/src/jobs/rss/refreshFeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import axios from 'axios'
import crypto from 'crypto'
import { parseHTML } from 'linkedom'
import Parser, { Item } from 'rss-parser'
import { v4 as uuid } from 'uuid'
import { FetchContentType } from '../../entity/subscription'
import { env } from '../../env'
import { ArticleSavingRequestStatus } from '../../generated/graphql'
Expand Down Expand Up @@ -72,13 +73,14 @@ export type RssFeedItem = Item & {
link: string
}

interface User {
interface UserConfig {
id: string
folder: FolderType
libraryItemId: string
}

interface FetchContentTask {
users: Map<string, User> // userId -> User
users: Map<string, UserConfig> // userId -> User
item: RssFeedItem
}

Expand Down Expand Up @@ -160,8 +162,12 @@ const getThumbnail = (item: RssFeedItem) => {
return item['media:thumbnail'].$.url
}

return item['media:content']?.find((media) => media.$.medium === 'image')?.$
.url
if (item['media:content']) {
return item['media:content'].find((media) => media.$?.medium === 'image')?.$
.url
}

return undefined
}

export const fetchAndChecksum = async (url: string) => {
Expand Down Expand Up @@ -276,13 +282,16 @@ const addFetchContentTask = (
) => {
const url = item.link
const task = fetchContentTasks.get(url)
const libraryItemId = uuid()
const userConfig = { id: userId, folder, libraryItemId }

if (!task) {
fetchContentTasks.set(url, {
users: new Map([[userId, { id: userId, folder }]]),
users: new Map([[userId, userConfig]]),
item,
})
} else {
task.users.set(userId, { id: userId, folder })
task.users.set(userId, userConfig)
}

return true
Expand Down Expand Up @@ -315,15 +324,14 @@ const createTask = async (
}

const fetchContentAndCreateItem = async (
users: User[],
users: UserConfig[],
feedUrl: string,
item: RssFeedItem
) => {
const payload = {
users,
source: 'rss-feeder',
url: item.link.trim(),
saveRequestId: '',
labels: [{ name: 'RSS' }],
rssFeedUrl: feedUrl,
savedAt: item.isoDate,
Expand Down
123 changes: 52 additions & 71 deletions packages/api/src/jobs/save_page.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ import {
ArticleSavingRequestStatus,
CreateLabelInput,
} from '../generated/graphql'
import { redisDataSource } from '../redis_data_source'
import { userRepository } from '../repository/user'
import { saveFile } from '../services/save_file'
import { savePage } from '../services/save_page'
import { uploadFile } from '../services/upload_file'
import { logError, logger } from '../utils/logger'
import { downloadFromUrl, uploadToSignedUrl } from '../utils/uploads'
import {
contentFilePath,
downloadFromBucket,
downloadFromUrl,
isFileExists,
uploadToSignedUrl,
} from '../utils/uploads'

const signToken = promisify(jwt.sign)

Expand All @@ -27,27 +32,19 @@ interface Data {
url: string
finalUrl: string
articleSavingRequestId: string
title: string
contentType: string
savedAt: string

state?: string
labels?: CreateLabelInput[]
source: string
folder: string
rssFeedUrl?: string
savedAt?: string
publishedAt?: string
taskId?: string
}

// Shape of a cached page-fetch result (JSON-serialized into Redis by the
// fetcher). Only `finalUrl` is required; the rest may be absent for failed
// or partial fetches.
interface FetchResult {
  finalUrl: string
  title?: string
  content?: string
  contentType?: string
}

/**
 * Type guard validating that an unknown value (typically the result of
 * JSON.parse on a cache entry) is a FetchResult.
 *
 * Fix: the previous check only verified that a `finalUrl` key was *present*,
 * so a corrupted entry such as `{ finalUrl: 42 }` would slip through and be
 * treated as a valid FetchResult downstream. We now also require the field
 * to actually be a string.
 */
const isFetchResult = (obj: unknown): obj is FetchResult => {
  return (
    typeof obj === 'object' &&
    obj !== null &&
    'finalUrl' in obj &&
    typeof (obj as { finalUrl: unknown }).finalUrl === 'string'
  )
}

const uploadPdf = async (
url: string,
userId: string,
Expand Down Expand Up @@ -120,32 +117,6 @@ const sendImportStatusUpdate = async (
}
}

const getCachedFetchResult = async (url: string) => {
const key = `fetch-result:${url}`
if (!redisDataSource.redisClient || !redisDataSource.workerRedisClient) {
throw new Error('redis client is not initialized')
}

let result = await redisDataSource.redisClient.get(key)
if (!result) {
logger.debug(`fetch result is not cached in cache redis ${url}`)
// fallback to worker redis client if the result is not found
result = await redisDataSource.workerRedisClient.get(key)
if (!result) {
throw new Error('fetch result is not cached')
}
}

const fetchResult = JSON.parse(result) as unknown
if (!isFetchResult(fetchResult)) {
throw new Error('fetch result is not valid')
}

logger.info('fetch result is cached', url)

return fetchResult
}

export const savePageJob = async (data: Data, attemptsMade: number) => {
const {
userId,
Expand All @@ -159,33 +130,29 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
taskId,
url,
finalUrl,
title,
contentType,
state,
} = data
let isImported,
isSaved,
state = data.state
let isImported, isSaved

try {
logger.info('savePageJob', {
logger.info('savePageJob', {
userId,
url,
finalUrl,
})

const user = await userRepository.findById(userId)
if (!user) {
logger.error('Unable to save job, user can not be found.', {
userId,
url,
finalUrl,
})
// if the user is not found, we do not retry
return false
}

// get the fetch result from cache
const fetchedResult = await getCachedFetchResult(finalUrl)
const { title, contentType } = fetchedResult
let content = fetchedResult.content

const user = await userRepository.findById(userId)
if (!user) {
logger.error('Unable to save job, user can not be found.', {
userId,
url,
})
// if the user is not found, we do not retry
return false
}

try {
// for pdf content, we need to upload the pdf
if (contentType === 'application/pdf') {
const uploadResult = await uploadPdf(
Expand All @@ -198,7 +165,7 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
{
url: finalUrl,
uploadFileId: uploadResult.uploadFileId,
state: state ? (state as ArticleSavingRequestStatus) : undefined,
state: (state as ArticleSavingRequestStatus) || undefined,
labels,
source,
folder,
Expand All @@ -218,27 +185,41 @@ export const savePageJob = async (data: Data, attemptsMade: number) => {
return true
}

if (!content) {
logger.info(`content is not fetched: ${finalUrl}`)
// set the state to failed if we don't have content
content = 'Failed to fetch content'
state = ArticleSavingRequestStatus.Failed
// download the original content
const filePath = contentFilePath(
userId,
articleSavingRequestId,
new Date(savedAt).getTime(),
'original'
)
const exists = await isFileExists(filePath)
if (!exists) {
logger.error('Original content file does not exist', {
finalUrl,
filePath,
})

throw new Error('Original content file does not exist')
}

// for non-pdf content, we need to save the page
const content = (await downloadFromBucket(filePath)).toString()
console.log('Downloaded original content from:', filePath)

// for non-pdf content, we need to save the content
const result = await savePage(
{
url: finalUrl,
clientRequestId: articleSavingRequestId,
title,
originalContent: content,
state: state ? (state as ArticleSavingRequestStatus) : undefined,
labels: labels,
state: (state as ArticleSavingRequestStatus) || undefined,
labels,
rssFeedUrl,
savedAt: savedAt ? new Date(savedAt) : new Date(),
savedAt,
publishedAt: publishedAt ? new Date(publishedAt) : null,
source,
folder,
originalContentUploaded: true,
},
user
)
Expand Down
19 changes: 14 additions & 5 deletions packages/api/src/routers/content_router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import { getClaimsByToken, getTokenByRequest } from '../utils/auth'
import { corsConfig } from '../utils/corsConfig'
import { enqueueBulkUploadContentJob } from '../utils/createTask'
import { logger } from '../utils/logger'
import { generateDownloadSignedUrl, isFileExists } from '../utils/uploads'
import {
contentFilePath,
generateDownloadSignedUrl,
isFileExists,
} from '../utils/uploads'

export function contentRouter() {
const router = Router()
Expand Down Expand Up @@ -58,7 +62,7 @@ export function contentRouter() {
const userId = claims.uid

const libraryItems = await findLibraryItemsByIds(libraryItemIds, userId, {
select: ['id', 'updatedAt'],
select: ['id', 'updatedAt', 'savedAt'],
})
if (libraryItems.length === 0) {
logger.error('Library items not found')
Expand All @@ -68,9 +72,14 @@ export function contentRouter() {
// generate signed url for each library item
const data = await Promise.all(
libraryItems.map(async (libraryItem) => {
const filePath = `content/${userId}/${
libraryItem.id
}.${libraryItem.updatedAt.getTime()}.${format}`
const date =
format === 'original' ? libraryItem.savedAt : libraryItem.updatedAt
const filePath = contentFilePath(
userId,
libraryItem.id,
date.getTime(),
format
)

try {
const downloadUrl = await generateDownloadSignedUrl(filePath, {
Expand Down
1 change: 0 additions & 1 deletion packages/api/src/routers/svc/following.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ export function followingServiceRouter() {
userId,
slug,
croppedPathname,
originalHtml: req.body.feedContent,
itemType: parsedResult?.pageType || PageType.Unknown,
canonicalUrl: url,
folder: FOLDER,
Expand Down
Loading
Loading