Skip to content

Commit

Permalink
Merge 271dd5f into c7ca6ef
Browse files Browse the repository at this point in the history
  • Loading branch information
zkochan committed May 3, 2020
2 parents c7ca6ef + 271dd5f commit a0a3108
Show file tree
Hide file tree
Showing 18 changed files with 173 additions and 51 deletions.
9 changes: 9 additions & 0 deletions .changeset/violet-trainers-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@pnpm/cafs": minor
"@pnpm/fetcher-base": minor
"@pnpm/package-requester": minor
"pnpm": minor
"@pnpm/tarball-fetcher": minor
---

When a new package is being added to the store, its manifest is already streamed through memory. So instead of reading the manifest back from the filesystem afterwards, we can parse it directly from that in-memory stream.
2 changes: 2 additions & 0 deletions packages/cafs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"dependencies": {
"@pnpm/fetcher-base": "workspace:7.0.0-alpha.1",
"@zkochan/rimraf": "1.0.0",
"concat-stream": "2.0.0",
"decompress-maybe": "^1.0.0",
"get-stream": "5.1.0",
"mz": "2.7.0",
Expand All @@ -27,6 +28,7 @@
"tar-stream": "^2.1.2"
},
"devDependencies": {
"@types/concat-stream": "1.6.0",
"@types/mz": "2.7.0",
"@types/node": "^13.13.4",
"@types/ssri": "^6.0.2",
Expand Down
25 changes: 18 additions & 7 deletions packages/cafs/src/addFilesFromDir.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { FilesIndex } from '@pnpm/fetcher-base'
import { DeferredManifestPromise, FilesIndex } from '@pnpm/fetcher-base'
import fs = require('mz/fs')
import pLimit from 'p-limit'
import path = require('path')
import ssri = require('ssri')
import deferredManifestParsing from './deferredManifestParsing'

const limit = pLimit(20)

Expand All @@ -14,9 +15,10 @@ export default async function (
addBuffer: (buffer: Buffer, mode: number) => Promise<ssri.Integrity>,
},
dirname: string,
manifest?: DeferredManifestPromise,
) {
const index = {}
await _retrieveFileIntegrities(cafs, dirname, dirname, index)
await _retrieveFileIntegrities(cafs, dirname, dirname, index, manifest)
return index
}

Expand All @@ -28,6 +30,7 @@ async function _retrieveFileIntegrities (
rootDir: string,
currDir: string,
index: FilesIndex,
deferredManifest?: DeferredManifestPromise,
) {
try {
const files = await fs.readdir(currDir)
Expand All @@ -40,12 +43,20 @@ async function _retrieveFileIntegrities (
}
if (stat.isFile()) {
const relativePath = path.relative(rootDir, fullPath)
const generatingIntegrity = limit(async () => {
if (deferredManifest && rootDir === currDir && file === 'package.json') {
const buffer = await fs.readFile(fullPath)
deferredManifestParsing(buffer, deferredManifest)
return cafs.addBuffer(buffer, stat.mode)
}
if (stat.size < MAX_BULK_SIZE) {
const buffer = await fs.readFile(fullPath)
return cafs.addBuffer(buffer, stat.mode)
}
return cafs.addStream(fs.createReadStream(fullPath), stat.mode)
})
index[relativePath] = {
generatingIntegrity: limit(() => {
return stat.size < MAX_BULK_SIZE
? fs.readFile(fullPath).then((buffer) => cafs.addBuffer(buffer, stat.mode))
: cafs.addStream(fs.createReadStream(fullPath), stat.mode)
}),
generatingIntegrity,
mode: stat.mode,
size: stat.size,
}
Expand Down
10 changes: 9 additions & 1 deletion packages/cafs/src/addFilesFromTarball.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import { FilesIndex } from '@pnpm/fetcher-base'
import { DeferredManifestPromise, FilesIndex } from '@pnpm/fetcher-base'
import concatStream = require('concat-stream')
import decompress = require('decompress-maybe')
import ssri = require('ssri')
import { Duplex, PassThrough } from 'stream'
import tar = require('tar-stream')
import deferredManifestParsing from './deferredManifestParsing'

export default async function (
addStreamToCafs: (fileStream: PassThrough, mode: number) => Promise<ssri.Integrity>,
_ignore: null | ((filename: string) => Boolean),
stream: NodeJS.ReadableStream,
manifest?: DeferredManifestPromise,
): Promise<FilesIndex> {
const ignore = _ignore ? _ignore : () => false
const extract = tar.extract()
Expand All @@ -20,6 +23,11 @@ export default async function (
next()
return
}
if (filename === 'package.json' && manifest) {
fileStream.pipe(
concatStream((buffer) => deferredManifestParsing(buffer, manifest)),
)
}
const generatingIntegrity = addStreamToCafs(fileStream, header.mode!)
filesIndex[filename] = {
generatingIntegrity,
Expand Down
14 changes: 12 additions & 2 deletions packages/cafs/src/checkFilesIntegrity.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import { DeferredManifestPromise } from '@pnpm/fetcher-base'
import rimraf = require('@zkochan/rimraf')
import fs = require('mz/fs')
import pLimit from 'p-limit'
import ssri = require('ssri')
import { getFilePathInCafs } from '.'
import deferredManifestParsing from './deferredManifestParsing'

const limit = pLimit(20)
const MAX_BULK_SIZE = 1 * 1024 * 1024 // 1MB

export default async function (
cafsDir: string,
integrityObj: Record<string, { size: number, mode: number, integrity: string }>,
manifest?: DeferredManifestPromise,
) {
let verified = true
await Promise.all(
Expand All @@ -24,6 +27,7 @@ export default async function (
!await verifyFile(
getFilePathInCafs(cafsDir, fstat),
fstat,
f === 'package.json' ? manifest : undefined,
)
) {
verified = false
Expand All @@ -34,8 +38,12 @@ export default async function (
return verified
}

async function verifyFile (filename: string, fstat: { size: number, integrity: string }) {
if (fstat.size > MAX_BULK_SIZE) {
async function verifyFile (
filename: string,
fstat: { size: number, integrity: string },
deferredManifest?: DeferredManifestPromise,
) {
if (fstat.size > MAX_BULK_SIZE && !deferredManifest) {
try {
const ok = Boolean(await ssri.checkStream(fs.createReadStream(filename), fstat.integrity))
if (!ok) {
Expand All @@ -60,6 +68,8 @@ async function verifyFile (filename: string, fstat: { size: number, integrity: s
const ok = Boolean(ssri.checkData(data, fstat.integrity))
if (!ok) {
await rimraf(filename)
} else if (deferredManifest) {
deferredManifestParsing(data, deferredManifest)
}
return ok
} catch (err) {
Expand Down
12 changes: 12 additions & 0 deletions packages/cafs/src/deferredManifestParsing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { DeferredManifestPromise } from '@pnpm/fetcher-base'

/**
 * Parses the raw bytes of a `package.json` file and settles the given
 * deferred with the outcome.
 *
 * Failures are reported through `deferred.reject` instead of being thrown,
 * so callers can invoke this from stream callbacks without a try/catch.
 *
 * @param buffer - Raw contents of the manifest file.
 * @param deferred - Receives the parsed manifest via `resolve`, or the
 *   failure via `reject`.
 */
export default function deferredManifestParsing (
  buffer: Buffer,
  deferred: DeferredManifestPromise,
) {
  try {
    let text = buffer.toString()
    // A UTF-8 byte order mark decodes to U+FEFF, which JSON.parse rejects
    // as an unexpected token. Drop it so manifests saved with a BOM by some
    // editors are still readable.
    if (text.charCodeAt(0) === 0xFEFF) {
      text = text.slice(1)
    }
    deferred.resolve(JSON.parse(text))
  } catch (err) {
    // `reject` is declared to take an Error, so wrap any other thrown value.
    deferred.reject(err instanceof Error ? err : new Error(String(err)))
  }
}
3 changes: 3 additions & 0 deletions packages/cafs/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"references": [
{
"path": "../fetcher-base"
},
{
"path": "../types"
}
]
}
1 change: 1 addition & 0 deletions packages/fetcher-base/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"homepage": "https://github.com/pnpm/pnpm/blob/master/packages/fetcher-base#readme",
"dependencies": {
"@pnpm/resolver-base": "workspace:7.0.0",
"@pnpm/types": "workspace:5.0.0",
"@types/ssri": "^6.0.2"
},
"funding": "https://opencollective.com/pnpm"
Expand Down
11 changes: 9 additions & 2 deletions packages/fetcher-base/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
import { Resolution } from '@pnpm/resolver-base'
import { DependencyManifest } from '@pnpm/types'
import { Integrity } from 'ssri'

export type Cafs = {
addFilesFromDir: (dir: string) => Promise<FilesIndex>,
addFilesFromTarball: (stream: NodeJS.ReadableStream) => Promise<FilesIndex>,
addFilesFromDir: (dir: string, manifest?: DeferredManifestPromise) => Promise<FilesIndex>,
addFilesFromTarball: (stream: NodeJS.ReadableStream, manifest?: DeferredManifestPromise) => Promise<FilesIndex>,
}

/**
 * Options handed to a fetcher alongside the package resolution.
 */
export interface FetchOptions {
// Location of the cached tarball for this package.
// NOTE(review): whether the fetcher reads it, writes it, or both is not visible here; confirm.
cachedTarballLocation: string,
// Optional deferred that is settled with the package's manifest once its
// `package.json` has been read during the fetch. Omit it when the manifest is not needed.
manifest?: DeferredManifestPromise,
// Directory of the lockfile the request originates from.
lockfileDir: string,
// Progress hook; receives the total size (null presumably means unknown; confirm)
// and the attempt number.
onStart?: (totalSize: number | null, attempt: number) => void,
// Progress hook; receives the amount downloaded so far.
onProgress?: (downloaded: number) => void,
}

/**
 * The settling half of a deferred promise for a package manifest.
 *
 * Code that comes across the bytes of a `package.json` while adding files to
 * the store uses this to hand the parsed manifest (or the failure) to whoever
 * awaits the matching promise. Only `resolve` and `reject` are required, so an
 * object with extra members (for example one that also carries the promise)
 * can be passed as well.
 */
export type DeferredManifestPromise = {
// Called with the parsed manifest on success.
resolve: (manifest: DependencyManifest) => void,
// Called with the error when the manifest could not be produced.
reject: (err: Error) => void,
}

export type FetchFunction = (
cafs: Cafs,
resolution: Resolution,
Expand Down
3 changes: 3 additions & 0 deletions packages/fetcher-base/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
"references": [
{
"path": "../resolver-base"
},
{
"path": "../types"
}
]
}
4 changes: 3 additions & 1 deletion packages/git-fetcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
},
"devDependencies": {
"@pnpm/cafs": "workspace:1.0.0-alpha.1",
"@pnpm/git-fetcher": "link:"
"@pnpm/git-fetcher": "link:",
"@pnpm/types": "workspace:5.0.0",
"p-defer": "3.0.0"
},
"funding": "https://opencollective.com/pnpm"
}
5 changes: 3 additions & 2 deletions packages/git-fetcher/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Cafs } from '@pnpm/fetcher-base'
import { Cafs, DeferredManifestPromise } from '@pnpm/fetcher-base'
import rimraf = require('@zkochan/rimraf')
import execa = require('execa')
import path = require('path')
Expand All @@ -13,6 +13,7 @@ export default () => {
},
opts: {
cafs: Cafs,
manifest?: DeferredManifestPromise,
},
) {
const tempLocation = tempy.directory()
Expand All @@ -21,7 +22,7 @@ export default () => {
// removing /.git to make directory integrity calculation faster
await rimraf(path.join(tempLocation, '.git'))
return {
filesIndex: await opts.cafs.addFilesFromDir(tempLocation),
filesIndex: await opts.cafs.addFilesFromDir(tempLocation, opts.manifest),
}
},
}
Expand Down
11 changes: 8 additions & 3 deletions packages/git-fetcher/test/index.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
///<reference path="../../../typings/index.d.ts"/>
import createCafs from '@pnpm/cafs'
import createFetcher from '@pnpm/git-fetcher'
import { DependencyManifest } from '@pnpm/types'
import pDefer = require('p-defer')
import test = require('tape')
import tempy = require('tempy')

test('fetch', async t => {
const cafsDir = tempy.directory()
t.comment(`cafs at ${cafsDir}`)
const fetch = createFetcher().git
const fetchResult = await fetch({
const manifest = pDefer<DependencyManifest>()
const { filesIndex } = await fetch({
commit: 'c9b30e71d704cd30fa71f2edd1ecc7dcc4985493',
repo: 'https://github.com/kevva/is-positive.git',
}, {
cafs: createCafs(cafsDir),
manifest,
})
t.ok(fetchResult.filesIndex['package.json'])
t.ok(await fetchResult.filesIndex['package.json'].generatingIntegrity)
t.ok(filesIndex['package.json'])
t.ok(await filesIndex['package.json'].generatingIntegrity)
t.equal((await manifest.promise).name, 'is-positive')
t.end()
})
61 changes: 36 additions & 25 deletions packages/package-requester/src/packageRequester.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import createCafs, {
import { fetchingProgressLogger } from '@pnpm/core-loggers'
import {
Cafs,
DeferredManifestPromise,
FetchFunction,
FetchOptions,
FetchResult,
Expand Down Expand Up @@ -242,7 +243,10 @@ async function resolveAndFetch (

function fetchToStore (
ctx: {
checkFilesIntegrity: (integrity: Record<string, { size: number, mode: number, integrity: string }>) => Promise<boolean>,
checkFilesIntegrity: (
integrity: Record<string, { size: number, mode: number, integrity: string }>,
manifest?: DeferredManifestPromise,
) => Promise<boolean>,
fetch: (
packageId: string,
resolution: Resolution,
Expand Down Expand Up @@ -386,23 +390,30 @@ function fetchToStore (
// ignoring. It is fine if the integrity file is not present. Just refetch the package
}
// if target exists and it wasn't modified, then no need to refetch it
if (integrity && await ctx.checkFilesIntegrity(integrity)) {
files.resolve({
filesIndex: integrity,
fromStore: true,
})
if (opts.fetchRawManifest) {
readBundledManifest(ctx.getFilePathInCafs(integrity['package.json']))
.then(bundledManifest.resolve)
.catch(bundledManifest.reject)

if (integrity) {
const manifest = opts.fetchRawManifest
? pDefer<DependencyManifest>()
: undefined
const verified = await ctx.checkFilesIntegrity(integrity, manifest)
if (verified) {
files.resolve({
filesIndex: integrity,
fromStore: true,
})
if (manifest) {
manifest.promise
.then((manifest) => bundledManifest.resolve(pickBundledManifest(manifest)))
.catch(bundledManifest.reject)
}
finishing.resolve(undefined)
return
}
finishing.resolve(undefined)
return
packageRequestLogger.warn({
message: `Refetching ${target} to store. It was either modified or had no integrity checksums`,
prefix: opts.lockfileDir,
})
}
packageRequestLogger.warn({
message: `Refetching ${target} to store. It was either modified or had no integrity checksums`,
prefix: opts.lockfileDir,
})
}

// We fetch into targetStage directory first and then fs.rename() it to the
Expand All @@ -414,12 +425,21 @@ function fetchToStore (
// As much tarballs should be downloaded simultaneously as possible.
const priority = (++ctx.requestsQueue['counter'] % ctx.requestsQueue['concurrency'] === 0 ? -1 : 1) * 1000 // tslint:disable-line

const fetchManifest = opts.fetchRawManifest
? pDefer<DependencyManifest>()
: undefined
if (fetchManifest) {
fetchManifest.promise
.then((manifest) => bundledManifest.resolve(pickBundledManifest(manifest)))
.catch(bundledManifest.reject)
}
const fetchedPackage = await ctx.requestsQueue.add(() => ctx.fetch(
opts.pkgId,
opts.resolution,
{
cachedTarballLocation: path.join(ctx.storeDir, opts.pkgId, 'packed.tgz'),
lockfileDir: opts.lockfileDir,
manifest: fetchManifest,
onProgress: (downloaded) => {
fetchingProgressLogger.debug({
downloaded,
Expand Down Expand Up @@ -458,15 +478,6 @@ function fetchToStore (
await writeJsonFile(path.join(target, 'integrity.json'), integrity, { indent: undefined })
finishing.resolve(undefined)

let pkgName: string | undefined = opts.pkgName
if (!pkgName || opts.fetchRawManifest) {
const manifest = await readPackage(ctx.getFilePathInCafs(integrity['package.json'])) as DependencyManifest
bundledManifest.resolve(pickBundledManifest(manifest))
if (!pkgName) {
pkgName = manifest.name
}
}

if (isLocalTarballDep && opts.resolution['integrity']) { // tslint:disable-line:no-string-literal
await fs.writeFile(path.join(target, TARBALL_INTEGRITY_FILENAME), opts.resolution['integrity'], 'utf8') // tslint:disable-line:no-string-literal
}
Expand Down
Loading

0 comments on commit a0a3108

Please sign in to comment.