Skip to content

Commit

Permalink
feat: Data deduplication
Browse files Browse the repository at this point in the history
  • Loading branch information
simonas-notcat committed Dec 10, 2019
1 parent 1928125 commit c5c10b1
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 66 deletions.
28 changes: 17 additions & 11 deletions packages/daf-cli/src/data-explorer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ program
const answers = await inquirer.prompt([
{
type: 'list',
name: 'sub',
name: 'did',
choices: identities,
message: 'Identity',
},
Expand All @@ -43,16 +43,20 @@ program

switch (answers.type) {
case 'Sent Messages':
showMessageList(await dataStore.findMessages({ iss: answers.sub }))
showMessageList(await dataStore.findMessages({ sender: answers.did }))
break
case 'Received Messages':
showMessageList(await dataStore.findMessages({ sub: answers.sub }))
showMessageList(await dataStore.findMessages({ receiver: answers.did }))
break
case 'Credentials':
showCredentials(answers.sub)
showCredentials(answers.did)
break
}
}

if (cmd.messages) {
showMessageList(await dataStore.findMessages({}))
}
})

const showMessageList = async (messages: any) => {
Expand All @@ -64,23 +68,25 @@ const showMessageList = async (messages: any) => {
const answers = await inquirer.prompt([
{
type: 'list',
name: 'hash',
name: 'id',
choices: messages.map((item: any) => ({
name: `${formatDistanceToNow(item.nbf * 1000)} ${item.type}`,
value: item.hash,
name: `${formatDistanceToNow(item.timestamp * 1000)} ${item.type} from: ${item.sender?.did} to: ${
item.receiver?.did
}`,
value: item.id,
})),
message: 'Message',
},
])
showMessage(answers.hash)
showMessage(answers.id)
}

const showMessage = async (hash: string) => {
const message = await dataStore.findMessage(hash)
const showMessage = async (id: string) => {
const message = await dataStore.findMessage(id)
console.log(message)

const table = []
const credentials = await dataStore.credentialsForMessageHash(hash)
const credentials = await dataStore.credentialsForMessageId(id)
if (credentials.length > 0) {
for (const credential of credentials) {
const fields = await dataStore.credentialsFieldsForClaimHash(credential.hash)
Expand Down
17 changes: 16 additions & 1 deletion packages/daf-core/src/graphql-base-type-defs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,24 @@ export const baseTypeDefs = `
}
type Message {
hash: ID!
id: ID!
threadId: String
rowId: String!
type: String!
sender: Identity
receiver: Identity
raw: String!
data: String
timestamp: Int
metaData: [MessageMetaData]
thread: [Message]
}
type MessageMetaData {
rowId: String!
type: String!
id: String
data: String
}
`
113 changes: 82 additions & 31 deletions packages/daf-data-store/src/data-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ export class DataStore {
return rows.map((row: any) => ({
rowId: `${row.rowid}`,
hash: row.hash,
parentHash: row.parent_hash,
iss: { did: row.iss },
sub: { did: row.sub },
jwt: row.jwt,
Expand All @@ -46,19 +45,19 @@ export class DataStore {
}))
}

async credentialsForMessageHash(hash: string) {
async credentialsForMessageId(id: string) {
const query = sql
.select('rowid', '*')
.from('verifiable_credentials')
.where({ parent_hash: hash })
.select('md.rowid', 'vc.*')
.from('verifiable_credentials_meta_data as md')
.leftJoin('verifiable_credentials as vc', { 'md.hash': 'vc.hash' })
.where({ 'md.message_id': id })
.toParams()

const rows = await this.db.rows(query.text, query.values)

return rows.map((row: any) => ({
rowId: `${row.rowid}`,
hash: row.hash,
parentHash: row.parent_hash,
iss: { did: row.iss },
sub: { did: row.sub },
jwt: row.jwt,
Expand Down Expand Up @@ -125,7 +124,6 @@ export class DataStore {
return rows2.map((row: any) => ({
rowId: `${row.rowid}`,
hash: row.hash,
parentHash: row.parent_hash,
iss: { did: row.iss },
sub: { did: row.sub },
jwt: row.jwt,
Expand All @@ -135,23 +133,35 @@ export class DataStore {
}))
}

async findMessages({ iss, sub, tag, limit }: { iss?: string; sub?: string; tag?: string; limit?: number }) {
async findMessages({
sender,
receiver,
threadId,
limit,
}: {
sender?: string
receiver?: string
threadId?: string
limit?: number
}) {
let where = {}

if (iss && sub) {
where = sql.or(where, { iss, sub })
if (sender && receiver) {
where = sql.or(where, { sender, receiver })
} else {
if (iss) where = sql.and(where, { iss })
if (sub) where = sql.and(where, { sub })
if (sender) where = sql.and(where, { sender })
if (receiver) where = sql.and(where, { receiver })
}
if (tag) where = sql.and(where, { tag })
where = sql.or(where, { sub: null })
if (sender || receiver) {
where = sql.or(where, { receiver: null })
}
if (threadId) where = sql.and(where, { thread_id: threadId })

let query = sql
.select('rowid', '*')
.from('messages')
.where(where)
.orderBy('nbf desc')
.orderBy('timestamp desc')

if (limit) {
query = query.limit(limit)
Expand All @@ -162,37 +172,36 @@ export class DataStore {
const rows = await this.db.rows(query.text, query.values)
return rows.map((row: any) => ({
rowId: `${row.rowid}`,
hash: row.hash,
iss: { did: row.iss },
sub: row.sub ? { did: row.sub } : null,
id: row.id,
sender: row.sender ? { did: row.sender } : null,
receiver: row.receiver ? { did: row.receiver } : null,
type: row.type,
tag: row.tag,
threadId: row.thread_id,
data: row.data,
jwt: row.jwt,
nbf: row.nbf,
iat: row.iat,
raw: row.raw,
timestamp: row.timestamp,
}))
}

async findMessage(hash: string) {
async findMessage(id: string) {
const query = sql
.select('rowid', '*')
.from('messages')
.where({ hash })
.where({ id })
.toParams()

const rows = await this.db.rows(query.text, query.values)

const mapped = rows.map((row: any) => ({
rowId: `${row.rowid}`,
hash: row.hash,
iss: { did: row.iss },
sub: row.sub ? { did: row.sub } : null,
id: row.id,
sender: row.sender ? { did: row.sender } : null,
receiver: row.receiver ? { did: row.receiver } : null,
type: row.type,
tag: row.tag,
jwt: row.jwt,
threadId: row.thread_id,
data: row.data,
nbf: row.nbf,
raw: row.raw,
timestamp: row.timestamp,
}))

return mapped[0]
Expand Down Expand Up @@ -239,7 +248,9 @@ export class DataStore {
let query = sql
.select('count(*) as count')
.from('messages')
.where(sql.or(sql.and({ iss: did1 }, { sub: did2 }), sql.and({ iss: did2 }, { sub: did1 })))
.where(
sql.or(sql.and({ sender: did1 }, { receiver: did2 }), sql.and({ sender: did2 }, { receiver: did1 })),
)
.toParams()
const rows = await this.db.rows(query.text, query.values)

Expand Down Expand Up @@ -419,6 +430,46 @@ export class DataStore {
return vcHash
}

async findMessagesByVC(hash: string) {
let query = sql
.select('md.rowid', 'm.*')
.from('verifiable_credentials_meta_data as md')
.leftJoin('messages as m', { 'md.message_id': 'm.id' })
.where({ 'md.hash': hash })

query = query.toParams()

const rows = await this.db.rows(query.text, query.values)
return rows.map((row: any) => ({
rowId: `${row.rowid}`,
id: row.id,
sender: row.sender ? { did: row.sender } : null,
receiver: row.receiver ? { did: row.receiver } : null,
type: row.type,
threadId: row.thread_id,
data: row.data,
raw: row.raw,
timestamp: row.timestamp,
}))
}

async messageMetaData(id: string) {
let query = sql
.select('rowid', '*')
.from('messages_meta_data')
.where({ message_id: id })

query = query.toParams()

const rows = await this.db.rows(query.text, query.values)
return rows.map((row: any) => ({
rowId: `${row.rowid}`,
type: row.type,
id: row.id,
data: row.data,
}))
}

deleteMessage(hash: string) {
return this.db.run('DELETE FROM messages where hash=?', [hash])
}
Expand Down
45 changes: 24 additions & 21 deletions packages/daf-data-store/src/graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,17 @@ interface Context {

export const resolvers = {
Message: {
vc: async (message: any, {}, { dataStore }: Context) => dataStore.credentialsForMessageHash(message.hash),
vc: async (message: any, {}, { dataStore }: Context) => dataStore.credentialsForMessageId(message.id),
metaData: async (message: any, {}, { dataStore }: Context) => dataStore.messageMetaData(message.id),
thread: async (message: any, {}, { dataStore }: Context) => {
const messages = await dataStore.findMessages({ threadId: message.threadId })
console.log('AAAA', messages)
return messages.filter((item: any) => item.id !== message.id)
},
},
VerifiableClaim: {
fields: async (vc: any, {}, { dataStore }: Context) => dataStore.credentialsFieldsForClaimHash(vc.hash),
inMessages: async (vc: any, {}, { dataStore }: Context) => dataStore.findMessagesByVC(vc.hash),
},
Identity: {
shortId: async (identity: any, {}, { dataStore }: Context) => dataStore.shortId(identity.did),
Expand Down Expand Up @@ -47,13 +54,13 @@ export const resolvers = {
return dataStore.findCredentials({ iss: identity.did, sub: identity.did })
},
messagesSent: async (identity: any, args: any, { dataStore }: Context) => {
return dataStore.findMessages({ iss: identity.did })
return dataStore.findMessages({ sender: identity.did })
},
messagesReceived: async (identity: any, args: any, { dataStore }: Context) => {
return dataStore.findMessages({ sub: identity.did })
return dataStore.findMessages({ receiver: identity.did })
},
messagesAll: async (identity: any, args: any, { dataStore }: Context) => {
return dataStore.findMessages({ iss: identity.did, sub: identity.did })
return dataStore.findMessages({ sender: identity.did, receiver: identity.did })
},
},
Query: {
Expand All @@ -64,30 +71,34 @@ export const resolvers = {
},
messages: async (
_: any,
{ iss, sub, tag, limit }: { iss: string; sub: string; tag: string; limit: number },
{
sender,
receiver,
threadId,
limit,
}: { sender: string; receiver: string; threadId: string; limit: number },
{ dataStore }: Context,
) => {
return dataStore.findMessages({ iss, sub, tag, limit })
return dataStore.findMessages({ sender, receiver, threadId, limit })
},
message: async (_: any, { hash }: { hash: string }, { dataStore }: Context) =>
dataStore.findMessage(hash),
message: async (_: any, { id }: { id: string }, { dataStore }: Context) => dataStore.findMessage(id),
credentials: async (_: any, { iss, sub }: { iss: string; sub: string }, { dataStore }: Context) => {
const res = await dataStore.findCredentials({ iss, sub })
return res
},
},
Mutation: {
deleteMessage: async (_: any, { hash }: { hash: string }, { dataStore }: Context) =>
dataStore.deleteMessage(hash),
deleteMessage: async (_: any, { id }: { id: string }, { dataStore }: Context) =>
dataStore.deleteMessage(id),
},
}

export const typeDefs = `
extend type Query {
identity(did: ID!): Identity
identities(dids: [ID!]): [Identity]
messages(iss: ID, sub: ID, tag: String, limit: Int): [Message]
message(hash: ID!): Message!
messages(sender: ID, reveiver: ID, threadId: String, limit: Int): [Message]
message(id: ID!): Message!
credentials(iss: ID, sub: ID): [VerifiableClaim]
}
Expand All @@ -112,20 +123,11 @@ export const typeDefs = `
}
extend type Message {
iss: Identity!
sub: Identity
jwt: String!
data: String!
iat: Int
nbf: Int
vis: String
tag: String
vc: [VerifiableClaim]
}
type VerifiableClaim {
hash: ID!
parentHash: ID!
rowId: String!
iss: Identity!
sub: Identity!
Expand All @@ -135,6 +137,7 @@ export const typeDefs = `
iat: Int
exp: Int
fields: [VerifiableClaimField]
inMessages: [Message]
}
type VerifiableClaimField {
Expand Down
Loading

0 comments on commit c5c10b1

Please sign in to comment.